// Package ossClient is a client for the OSS service (OSSService). The same
// operations (upload, download, delete, fetch bid detail) are exposed over
// three transports: multipart HTTP ("Restful"), net/rpc over HTTP ("Rpc") and
// gRPC streaming ("Grpc").
package ossClient

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"io"
	"log"
	"mime/multipart"
	"net/http"
	"net/rpc"
	"strconv"
	"sync"
	"time"

	"app.yhyue.com/moapp/jybase/api"
	"google.golang.org/grpc"
	"google.golang.org/grpc/keepalive"
	"jygit.jydev.jianyu360.cn/BaseService/ossClient/constant"
	"jygit.jydev.jianyu360.cn/BaseService/ossClient/entity"
	"jygit.jydev.jianyu360.cn/BaseService/ossClient/pb"
)

const (
	// clientErrorCode is the Error_code reported when a Restful call fails on
	// the client side, before a usable service reply was obtained.
	clientErrorCode = -1
	// defaultGrpcTimeoutSeconds is the deadline GrpcCall applies when the
	// caller passes a timeout of 0.
	defaultGrpcTimeoutSeconds = 5
)

var (
	// GrpcCoon is the gRPC connection shared by every *ByGrpc call in this
	// package. It is dialed lazily by GrpcCall.
	GrpcCoon *grpc.ClientConn
	// GrpcCoonOnce guards the one-time dial that initializes GrpcCoon.
	GrpcCoonOnce = &sync.Once{}
	// grpcDialErr remembers the outcome of that dial so that calls made after
	// a failed first dial report the failure instead of using a nil connection.
	grpcDialErr error
)

// fail marks reply as a client-side failure carrying err's message and
// returns it.
func fail(reply *api.Result, err error) *api.Result {
	reply.Error_code = clientErrorCode
	reply.Error_msg = err.Error()
	return reply
}

// newObjectForm starts a multipart form holding the "bucket_id" and
// "object_name" fields shared by every Restful endpoint. The writer is left
// open so callers can append more parts; Post closes it.
func newObjectForm(bucketId, objectName string) (*bytes.Buffer, *multipart.Writer, error) {
	body := &bytes.Buffer{}
	writer := multipart.NewWriter(body)
	if err := writer.WriteField("bucket_id", bucketId); err != nil {
		return nil, nil, err
	}
	if err := writer.WriteField("object_name", objectName); err != nil {
		return nil, nil, err
	}
	return body, writer, nil
}

// postObjectForm posts a form containing only bucket_id and object_name to
// url and returns the raw response body.
func postObjectForm(url, bucketId, objectName string) ([]byte, error) {
	body, writer, err := newObjectForm(bucketId, objectName)
	if err != nil {
		return nil, err
	}
	return Post(url, writer, body)
}

// decodeResult decodes the service's JSON reply into reply. A body that is not
// valid JSON for api.Result is reported as a client-side failure rather than
// being silently ignored.
func decodeResult(respBody []byte, reply *api.Result) {
	// Decode into the struct itself (not a pointer to the pointer) so that a
	// JSON "null" body cannot turn the returned reply into a nil pointer.
	if err := json.Unmarshal(respBody, reply); err != nil {
		fail(reply, errors.New("decode response: "+err.Error()))
	}
}

// UpLoadByRestful uploads stream as a multipart form to domain+constant.UploadUrl.
//
//   - domain: service base URL, e.g. https://ossservice.jianyu360.cn
//   - bucketId: bucket id
//   - objectName: object name (also used as the uploaded file name)
//   - stream: file content
//   - gzip: sent as the "gzip" form field
//
// The reply is the service's decoded JSON answer, e.g.
// {"error_code":0,"error_msg":"上传成功"}. Any client-side failure yields
// Error_code -1 with the error text in Error_msg.
func UpLoadByRestful(domain, bucketId, objectName string, stream []byte, gzip bool) (reply *api.Result) {
	// Start from the failure code; a successful decode overwrites it with the
	// code supplied by the service.
	reply = &api.Result{Error_code: clientErrorCode}

	body, writer, err := newObjectForm(bucketId, objectName)
	if err != nil {
		return fail(reply, err)
	}
	if err = writer.WriteField("gzip", strconv.FormatBool(gzip)); err != nil {
		return fail(reply, err)
	}
	part, err := writer.CreateFormFile("file", objectName)
	if err != nil {
		return fail(reply, err)
	}
	if _, err = part.Write(stream); err != nil {
		return fail(reply, err)
	}

	respBody, err := Post(domain+constant.UploadUrl, writer, body)
	if err != nil {
		return fail(reply, err)
	}
	decodeResult(respBody, reply)
	return reply
}

// DownloadByRestful downloads an object through domain+constant.DownloadUrl.
//
//   - domain: service base URL, e.g. https://ossservice.jianyu360.cn
//   - bucketId: bucket id
//   - objectName: object name
//
// On success Error_msg is constant.DownloadSuccess and Data holds the raw
// response body as []byte. On failure Error_code is -1 and Error_msg holds the
// error text.
func DownloadByRestful(domain, bucketId, objectName string) (reply *api.Result) {
	reply = &api.Result{}
	respBody, err := postObjectForm(domain+constant.DownloadUrl, bucketId, objectName)
	if err != nil {
		return fail(reply, err)
	}
	reply.Error_msg = constant.DownloadSuccess
	reply.Data = respBody
	return reply
}

// DeleteByRestful deletes an object through domain+constant.DeleteUrl.
//
//   - domain: service base URL, e.g. https://ossservice.jianyu360.cn
//   - bucketId: bucket id
//   - objectName: object name
//
// The reply is the service's decoded JSON answer, e.g.
// {"error_code":0,"error_msg":"删除成功"}. On a client-side failure Error_code
// is -1 and Error_msg holds the error text.
func DeleteByRestful(domain, bucketId, objectName string) (reply *api.Result) {
	reply = &api.Result{}
	respBody, err := postObjectForm(domain+constant.DeleteUrl, bucketId, objectName)
	if err != nil {
		return fail(reply, err)
	}
	decodeResult(respBody, reply)
	return reply
}

// GetBidDetailByRestful fetches the bid detail text through
// domain+constant.GetBidDetailUrl.
//
//   - domain: service base URL, e.g. https://ossservice.jianyu360.cn
//   - bucketId: bucket id
//   - objectName: object name
//
// On success Error_msg is constant.GetBidDetailSuccess and Data holds the
// response body as a string. On failure Error_code is -1 and Error_msg holds
// the error text.
func GetBidDetailByRestful(domain, bucketId, objectName string) (reply *api.Result) {
	reply = &api.Result{}
	respBody, err := postObjectForm(domain+constant.GetBidDetailUrl, bucketId, objectName)
	if err != nil {
		return fail(reply, err)
	}
	reply.Error_msg = constant.GetBidDetailSuccess
	reply.Data = string(respBody)
	return reply
}

// Post closes writer (which terminates the multipart form held in body), sends
// body as a POST request to url and returns the response body.
//
// A response whose status is not 200 is returned as an error whose text is the
// response body, or the HTTP status line when the body is empty.
func Post(url string, writer *multipart.Writer, body *bytes.Buffer) ([]byte, error) {
	// The closing boundary must be written before the body is sent.
	if err := writer.Close(); err != nil {
		return nil, err
	}

	req, err := http.NewRequest(http.MethodPost, url, body)
	if err != nil {
		log.Println("Error creating request:", err)
		return nil, err
	}
	req.Header.Set("Content-Type", writer.FormDataContentType())

	// NOTE(review): this client has no timeout, so a stalled server blocks the
	// caller indefinitely. A limit was not added here because a suitable value
	// for large transfers is not known from this file.
	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	respBody, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}
	if resp.StatusCode != http.StatusOK {
		msg := string(respBody)
		if msg == "" {
			// Never hand back an error with an empty message.
			msg = resp.Status
		}
		return nil, errors.New(msg)
	}
	return respBody, nil
}

// UpLoadByRpc uploads through the "OSSService.Upload" net/rpc method.
//
//   - address: host:port of the RPC server, e.g. 192.168.3.206:8110
//   - args: upload arguments (BucketID, ObjectName, Stream, Gzip)
//
// It returns the service reply, e.g. {"error_code":0,"error_msg":"上传成功"},
// together with any dial or call error.
func UpLoadByRpc(address string, args *entity.UploadArgs) (*api.Result, error) {
	reply := &api.Result{}
	err := RpcCall(address, "OSSService.Upload", args, reply)
	return reply, err
}

// DownloadByRpc downloads through the "OSSService.Download" net/rpc method.
//
//   - address: host:port of the RPC server, e.g. 192.168.3.206:8110
//   - args: BucketID and ObjectName of the object
//
// It returns the service reply, e.g.
// {"error_code":0,"error_msg":"下载成功","data":<file content>}, together with
// any dial or call error.
func DownloadByRpc(address string, args *entity.Args) (*api.Result, error) {
	reply := &api.Result{}
	err := RpcCall(address, "OSSService.Download", args, reply)
	return reply, err
}

// DeleteByRpc deletes through the "OSSService.Delete" net/rpc method.
//
//   - address: host:port of the RPC server, e.g. 192.168.3.206:8110
//   - args: BucketID and ObjectName of the object
//
// It returns the service reply, e.g. {"error_code":0,"error_msg":"删除成功"},
// together with any dial or call error.
func DeleteByRpc(address string, args *entity.Args) (*api.Result, error) {
	reply := &api.Result{}
	err := RpcCall(address, "OSSService.Delete", args, reply)
	return reply, err
}

// GetBidDetailByRpc fetches the bid detail text through the
// "OSSService.GetBidDetail" net/rpc method.
//
//   - address: host:port of the RPC server, e.g. 192.168.3.206:8110
//   - args: BucketID and ObjectName of the object
//
// It returns the service reply, e.g.
// {"error_code":0,"error_msg":"获取成功","data":"<detail text>"}, together
// with any dial or call error.
func GetBidDetailByRpc(address string, args *entity.Args) (*api.Result, error) {
	reply := &api.Result{}
	err := RpcCall(address, "OSSService.GetBidDetail", args, reply)
	return reply, err
}

// RpcCall dials address with rpc.DialHTTP, invokes serviceMethod with args,
// stores the answer in reply and closes the connection again. Dial and call
// errors are logged and returned.
func RpcCall(address, serviceMethod string, args any, reply any) error {
	conn, err := rpc.DialHTTP("tcp", address)
	if err != nil {
		log.Println(err)
		return err
	}
	defer conn.Close()

	if err = conn.Call(serviceMethod, args, reply); err != nil {
		log.Println(err)
		return err
	}
	return nil
}

// GrpcCall returns a service client bound to the shared connection GrpcCoon,
// plus a context and its cancel function for a single call.
//
// The connection is dialed only once (guarded by GrpcCoonOnce); address is
// therefore only used by the first call, and later calls reuse that connection
// whatever address they pass.
//
// timeout is in seconds: a negative value means no deadline, 0 selects the
// default of 5 seconds, and a positive value is used as given.
//
// The returned cancel function is never nil, including on error, so callers
// may defer it before inspecting the error.
func GrpcCall(address string, timeout int64) (pb.ServiceClient, context.Context, func(), error) {
	noop := func() {}

	GrpcCoonOnce.Do(func() {
		GrpcCoon, grpcDialErr = grpc.Dial(address,
			// NOTE(review): grpc.Dial returns an error when no transport
			// credentials option is supplied, so the dial needs one. Plaintext
			// is an assumption here; switch to WithTransportCredentials if the
			// service is served over TLS.
			grpc.WithInsecure(),
			grpc.WithKeepaliveParams(keepalive.ClientParameters{
				Time:                10 * time.Second, // interval between pings
				Timeout:             5 * time.Second,  // how long to wait for the ping ack
				PermitWithoutStream: true,             // ping even when no stream is active
			}))
		if grpcDialErr != nil {
			log.Println(grpcDialErr)
		}
	})
	// The dial result is kept at package level: a local error variable would
	// only be visible to the one call that actually ran the dial.
	if grpcDialErr != nil {
		return nil, nil, noop, grpcDialErr
	}
	if GrpcCoon == nil {
		return nil, nil, noop, errors.New("grpc connection is not initialized")
	}

	client := pb.NewServiceClient(GrpcCoon)
	if timeout == 0 {
		timeout = defaultGrpcTimeoutSeconds
	}
	if timeout < 0 {
		return client, context.Background(), noop, nil
	}
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Second)
	return client, ctx, cancel, nil
}

// UpLoadByGrpc uploads through the gRPC Upload stream.
//
//   - address: host:port of the gRPC server, e.g. 192.168.3.206:8110
//   - argsFunc: called repeatedly for the next message to send; it returns
//     false once there is nothing left to send
//   - timeout: see GrpcCall
//
// After the last message the send side is closed and the service's reply is
// read and returned. NOTE(review): this assumes the Upload RPC answers with a
// single *pb.Response — confirm against the proto definition.
func UpLoadByGrpc(address string, argsFunc func() (*pb.UploadRequest, bool), timeout int64) (*pb.Response, error) {
	result := &pb.Response{}
	if argsFunc == nil {
		return result, errors.New("argsFunc is nil")
	}

	serviceClient, ctx, cancel, err := GrpcCall(address, timeout)
	defer cancel()
	if err != nil {
		return result, err
	}

	stream, err := serviceClient.Upload(ctx)
	if err != nil {
		log.Println(err)
		return nil, err
	}

	for {
		args, ok := argsFunc()
		if !ok {
			break
		}
		if err = stream.Send(args); err != nil {
			if err == io.EOF {
				// The stream was ended by the other side; the real status
				// is reported by RecvMsg below.
				break
			}
			log.Println(err)
			return nil, err
		}
	}

	// Half-close and wait for the reply. Returning without doing so would let
	// the deferred cancel abort the RPC before the service has answered.
	if err = stream.CloseSend(); err != nil {
		log.Println(err)
		return nil, err
	}
	if err = stream.RecvMsg(result); err != nil {
		log.Println(err)
		return nil, err
	}
	return result, nil
}

// DownloadByGrpc downloads through the gRPC Download stream.
//
//   - address: host:port of the gRPC server, e.g. 192.168.3.206:8110
//   - args: BucketID and ObjectName of the object
//   - timeout: see GrpcCall
//
// The Data of every received message is concatenated into the returned
// Response; ErrorMsg and ErrorCode are taken from the last message received.
func DownloadByGrpc(address string, args *entity.Args, timeout int64) (*pb.Response, error) {
	result := &pb.Response{}
	if args == nil {
		return result, errors.New("args is nil")
	}

	serviceClient, ctx, cancel, err := GrpcCall(address, timeout)
	defer cancel()
	if err != nil {
		return result, err
	}

	resp, err := serviceClient.Download(ctx, &pb.DownloadRequest{BucketID: args.BucketID, ObjectName: args.ObjectName})
	if err != nil {
		log.Println(err)
		return nil, err
	}

	var data bytes.Buffer
	for {
		reply, err := resp.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Println(err)
			return nil, err
		}
		result.ErrorMsg = reply.ErrorMsg
		result.ErrorCode = reply.ErrorCode
		data.Write(reply.Data)
	}
	result.Data = data.Bytes()
	return result, nil
}

// GetBidDetailByGrpc fetches the bid detail text through the gRPC GetBidDetail
// stream, using GrpcCall's default timeout.
//
//   - address: host:port of the gRPC server, e.g. 192.168.3.206:8110
//   - args: BucketID and ObjectName of the object
//
// The Data of every received message is concatenated and returned as a string
// in Data; Error_msg and Error_code are taken from the last message received.
func GetBidDetailByGrpc(address string, args *entity.Args) (*api.Result, error) {
	result := &api.Result{}
	if args == nil {
		return result, errors.New("args is nil")
	}

	serviceClient, ctx, cancel, err := GrpcCall(address, 0)
	defer cancel()
	if err != nil {
		return result, err
	}

	resp, err := serviceClient.GetBidDetail(ctx, &pb.DownloadRequest{BucketID: args.BucketID, ObjectName: args.ObjectName})
	if err != nil {
		log.Println(err)
		return nil, err
	}

	var data bytes.Buffer
	for {
		reply, err := resp.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Println(err)
			return nil, err
		}
		result.Error_msg = reply.ErrorMsg
		result.Error_code = int(reply.ErrorCode)
		data.Write(reply.Data)
	}
	result.Data = data.String()
	return result, nil
}