123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381 |
- package ossClient
- import (
- "app.yhyue.com/moapp/jybase/api"
- "bytes"
- "context"
- "encoding/json"
- "errors"
- "google.golang.org/grpc"
- "google.golang.org/grpc/keepalive"
- "io"
- "jygit.jydev.jianyu360.cn/BaseService/ossClient/constant"
- "jygit.jydev.jianyu360.cn/BaseService/ossClient/entity"
- "jygit.jydev.jianyu360.cn/BaseService/ossClient/pb"
- "log"
- "mime/multipart"
- "net/http"
- "net/rpc"
- "strconv"
- "sync"
- "time"
- )
- var (
- GrpcCoon *grpc.ClientConn
- GrpcCoonOnce = &sync.Once{}
- )
- /* restful方式上传
- * @param domain 域名,例如:https://ossservice.jianyu360.cn
- * @param bucketId 桶id
- * @param objectName 对象名称
- * @param stream 文件流
- * @param gzip 是否压缩
- * @return {"error_code":0,"error_msg":"上传成功"}
- */
- func UpLoadByRestful(domain, bucketId, objectName string, stream []byte, gzip bool) (reply *api.Result) {
- reply = &api.Result{Error_code: -1}
- // 创建一个缓冲区来存储表单数据
- body := &bytes.Buffer{}
- writer := multipart.NewWriter(body)
- writer.WriteField("bucket_id", bucketId)
- writer.WriteField("object_name", objectName)
- writer.WriteField("gzip", strconv.FormatBool(gzip))
- // 创建表单字段
- part, err := writer.CreateFormFile("file", objectName)
- if err != nil {
- reply.Error_msg = err.Error()
- return
- }
- // 模拟文件流
- fileStream := bytes.NewReader(stream)
- // 将文件流复制到表单字段
- _, err = io.Copy(part, fileStream)
- if err != nil {
- reply.Error_msg = err.Error()
- return
- }
- // 创建 HTTP 请求
- if respBody, err := Post(domain+constant.UploadUrl, writer, body); err != nil {
- reply.Error_msg = err.Error()
- } else {
- json.Unmarshal(respBody, &reply)
- }
- return
- }
- /* restful方式下载
- * @param domain 域名,例如:https://ossservice.jianyu360.cn
- * @param bucketId 桶id
- * @param objectName 对象名称
- * @return {"error_code":0,"error_msg":"下载成功","data":文件流}
- */
- func DownloadByRestful(domain, bucketId, objectName string) (reply *api.Result) {
- reply = &api.Result{}
- // 创建一个缓冲区来存储表单数据
- body := &bytes.Buffer{}
- writer := multipart.NewWriter(body)
- writer.WriteField("bucket_id", bucketId)
- writer.WriteField("object_name", objectName)
- if respBody, err := Post(domain+constant.DownloadUrl, writer, body); err != nil {
- reply.Error_msg = err.Error()
- } else {
- reply.Error_msg = constant.DownloadSuccess
- reply.Data = respBody
- }
- return
- }
- /* restful方式删除
- * @param domain 域名,例如:https://ossservice.jianyu360.cn
- * @param bucketId 桶id
- * @param objectName 对象名称
- * @return {"error_code":0,"error_msg":"删除成功"}
- */
- func DeleteByRestful(domain, bucketId, objectName string) (reply *api.Result) {
- reply = &api.Result{}
- // 创建一个缓冲区来存储表单数据
- body := &bytes.Buffer{}
- writer := multipart.NewWriter(body)
- writer.WriteField("bucket_id", bucketId)
- writer.WriteField("object_name", objectName)
- if respBody, err := Post(domain+constant.DeleteUrl, writer, body); err != nil {
- reply.Error_msg = err.Error()
- } else {
- json.Unmarshal(respBody, &reply)
- }
- return
- }
- /* restful方式获取标讯正文
- * @param domain 域名,例如:https://ossservice.jianyu360.cn
- * @param bucketId 桶id
- * @param objectName 对象名称
- * @return {"error_code":0,"error_msg":"获取正文成功","data":"正文内容"}
- */
- func GetBidDetailByRestful(domain, bucketId, objectName string) (reply *api.Result) {
- reply = &api.Result{}
- // 创建一个缓冲区来存储表单数据
- body := &bytes.Buffer{}
- writer := multipart.NewWriter(body)
- writer.WriteField("bucket_id", bucketId)
- writer.WriteField("object_name", objectName)
- if respBody, err := Post(domain+constant.GetBidDetailUrl, writer, body); err != nil {
- reply.Error_msg = err.Error()
- } else {
- reply.Error_msg = constant.GetBidDetailSuccess
- reply.Data = string(respBody)
- }
- return
- }
- func Post(url string, writer *multipart.Writer, body *bytes.Buffer) ([]byte, error) {
- // 关闭表单写入器
- if err := writer.Close(); err != nil {
- return nil, err
- }
- // 创建 HTTP 请求
- req, err := http.NewRequest("POST", url, body)
- if err != nil {
- log.Println("Error creating request:", err)
- return nil, err
- }
- // 设置请求头
- req.Header.Set("Content-Type", writer.FormDataContentType())
- // 发送请求
- client := &http.Client{}
- resp, err := client.Do(req)
- if err != nil {
- return nil, err
- }
- defer resp.Body.Close()
- // 读取响应
- respBody, err := io.ReadAll(resp.Body)
- if err != nil {
- return nil, err
- }
- if resp.StatusCode != http.StatusOK {
- return nil, errors.New(string(respBody))
- }
- return respBody, nil
- }
- /* rpc方式上传
- * @param address 域名,例如:192.168.3.206:8110
- * @param args 参数
- * @param args.BucketID 文件名
- * @param args.objectName 对象名称
- * @param args.Stream 文件流
- * @param args.Gzip 是否压缩
- * @return {"error_code":0,"error_msg":"上传成功"}
- * @return error 错误信息
- */
- func UpLoadByRpc(address string, args *entity.UploadArgs) (*api.Result, error) {
- reply := &api.Result{}
- err := RpcCall(address, "OSSService.Upload", args, reply)
- return reply, err
- }
- /*
- *rpc方式下载
- * @param address 域名,例如:192.168.3.206:8110
- * @param args 参数
- * @param args.BucketID 文件名
- * @param args.objectName 对象名称
- * @return {"error_code":0,"error_msg":"下载成功","data":文件流}
- * @return error 错误信息
- */
- func DownloadByRpc(address string, args *entity.Args) (*api.Result, error) {
- reply := &api.Result{}
- err := RpcCall(address, "OSSService.Download", args, reply)
- return reply, err
- }
- /* rpc方式删除
- * @param address 域名,例如:192.168.3.206:8110
- * @param args 参数
- * @param args.BucketID 文件名
- * @param args.objectName 对象名称
- * @return {"error_code":0,"error_msg":"删除成功"}
- * @return error 错误信息
- */
- func DeleteByRpc(address string, args *entity.Args) (*api.Result, error) {
- reply := &api.Result{}
- err := RpcCall(address, "OSSService.Delete", args, reply)
- return reply, err
- }
- /*
- *rpc方式获取标讯正文
- * @param address 域名,例如:192.168.3.206:8110
- * @param args 参数
- * @param args.BucketID 文件名
- * @param args.objectName 对象名称
- * @return {"error_code":0,"error_msg":"获取成功","data":"正文内容"}
- * @return error 错误信息
- */
- func GetBidDetailByRpc(address string, args *entity.Args) (*api.Result, error) {
- reply := &api.Result{}
- err := RpcCall(address, "OSSService.GetBidDetail", args, reply)
- return reply, err
- }
- // rpc call
- func RpcCall(address, serviceMethod string, args any, reply any) error {
- conn, err := rpc.DialHTTP("tcp", address)
- if err != nil {
- log.Println(err)
- return err
- }
- defer conn.Close()
- err = conn.Call(serviceMethod, args, reply)
- if err != nil {
- log.Println(err)
- return err
- }
- return nil
- }
- // grpc call
- func GrpcCall(address string, timeout int64) (pb.ServiceClient, context.Context, func(), error) {
- var err error
- GrpcCoonOnce.Do(func() {
- GrpcCoon, err = grpc.Dial(address, grpc.WithKeepaliveParams(keepalive.ClientParameters{
- Time: 10 * time.Second, // 发送ping的间隔
- Timeout: 5 * time.Second, // 等待pong响应的超时时间
- PermitWithoutStream: true, // 是否在没有活跃流的情况下发送pings
- }))
- })
- var cancel context.CancelFunc
- if err != nil {
- log.Println(err)
- return nil, nil, cancel, err
- }
- var ctx context.Context
- if timeout < 0 {
- ctx = context.Background()
- } else if timeout == 0 {
- timeout = 5
- }
- if timeout > 0 {
- ctx, cancel = context.WithTimeout(context.TODO(), time.Duration(timeout)*time.Second)
- }
- return pb.NewServiceClient(GrpcCoon), ctx, cancel, nil
- }
- /* grpc方式上传
- * @param address 域名,例如:192.168.3.206:8110
- * @param args 参数
- * @param args.BucketID 文件名
- * @param args.objectName 对象名称
- * @param args.Stream 文件流
- * @param args.Gzip 是否压缩
- * @return {"error_code":0,"error_msg":"上传成功"}
- * @return error 错误信息
- */
- func UpLoadByGrpc(address string, argsFunc func() (*pb.UploadRequest, bool), timeout int64) (*pb.Response, error) {
- serviceClient, ctx, cancel, err := GrpcCall(address, timeout)
- defer cancel()
- result := &pb.Response{}
- if err != nil {
- return result, err
- }
- stream, err := serviceClient.Upload(ctx)
- if err != nil {
- log.Println(err)
- return nil, err
- }
- for {
- if args, ok := argsFunc(); ok {
- stream.Send(args)
- } else {
- break
- }
- }
- return result, err
- }
- /*
- *grpc方式下载
- * @param address 域名,例如:192.168.3.206:8110
- * @param args 参数
- * @param args.BucketID 文件名
- * @param args.objectName 对象名称
- * @return []byte
- * @return error 错误信息
- */
- func DownloadByGrpc(address string, args *entity.Args, timeout int64) (*pb.Response, error) {
- serviceClient, ctx, cancel, err := GrpcCall(address, timeout)
- defer cancel()
- result := &pb.Response{}
- if err != nil {
- return result, err
- }
- resp, err := serviceClient.Download(ctx, &pb.DownloadRequest{BucketID: args.BucketID, ObjectName: args.ObjectName})
- if err != nil {
- log.Println(err)
- return nil, err
- }
- b := &bytes.Buffer{}
- for {
- reply, err := resp.Recv()
- if err == io.EOF {
- break
- } else if err != nil {
- log.Println(err)
- return nil, err
- }
- result.ErrorMsg = reply.ErrorMsg
- result.ErrorCode = reply.ErrorCode
- b.Write(reply.Data)
- }
- result.Data = b.Bytes()
- return result, nil
- }
- /*
- *grpc方式获取标讯正文
- * @param address 域名,例如:192.168.3.206:8110
- * @param args 参数
- * @param args.BucketID 文件名
- * @param args.objectName 对象名称
- * @return {"error_code":0,"error_msg":"获取成功","data":"正文内容"}
- * @return error 错误信息
- */
- func GetBidDetailByGrpc(address string, args *entity.Args) (*api.Result, error) {
- serviceClient, ctx, cancel, err := GrpcCall(address, 0)
- defer cancel()
- result := &api.Result{}
- if err != nil {
- return result, err
- }
- resp, err := serviceClient.GetBidDetail(ctx, &pb.DownloadRequest{BucketID: args.BucketID, ObjectName: args.ObjectName})
- if err != nil {
- log.Println(err)
- return nil, err
- }
- b := &bytes.Buffer{}
- for {
- reply, err := resp.Recv()
- if err == io.EOF {
- break
- } else if err != nil {
- log.Println(err)
- return nil, err
- }
- result.Error_msg = reply.ErrorMsg
- result.Error_code = int(reply.ErrorCode)
- b.Write(reply.Data)
- }
- result.Data = string(b.Bytes())
- return result, nil
- }
|