client.go 10 KB


  1. package ossClient
  2. import (
  3. "app.yhyue.com/moapp/jybase/api"
  4. "bytes"
  5. "context"
  6. "encoding/json"
  7. "errors"
  8. "google.golang.org/grpc"
  9. "google.golang.org/grpc/keepalive"
  10. "io"
  11. "jygit.jydev.jianyu360.cn/BaseService/ossClient/constant"
  12. "jygit.jydev.jianyu360.cn/BaseService/ossClient/entity"
  13. "jygit.jydev.jianyu360.cn/BaseService/ossClient/pb"
  14. "log"
  15. "mime/multipart"
  16. "net/http"
  17. "net/rpc"
  18. "strconv"
  19. "sync"
  20. "time"
  21. )
  22. var (
  23. GrpcCoon *grpc.ClientConn
  24. GrpcCoonOnce = &sync.Once{}
  25. )
  26. /* restful方式上传
  27. * @param domain 域名,例如:https://ossservice.jianyu360.cn
  28. * @param args 参数
  29. * @param args.BucketID 文件名
  30. * @param args.objectName 对象名称
  31. * @param args.Stream 文件流
  32. * @param args.Gzip 是否压缩
  33. * @return {"error_code":0,"error_msg":"上传成功"}
  34. */
  35. func UpLoadByRestful(domain string, args *entity.UploadArgs) (reply *api.Result) {
  36. reply = &api.Result{Error_code: -1}
  37. // 创建一个缓冲区来存储表单数据
  38. body := &bytes.Buffer{}
  39. writer := multipart.NewWriter(body)
  40. writer.WriteField("bucket_id", args.BucketID)
  41. writer.WriteField("object_name", args.ObjectName)
  42. writer.WriteField("gzip", strconv.FormatBool(args.Gzip))
  43. // 创建表单字段
  44. part, err := writer.CreateFormFile("file", args.ObjectName)
  45. if err != nil {
  46. reply.Error_msg = err.Error()
  47. return
  48. }
  49. // 模拟文件流
  50. fileStream := bytes.NewReader(args.Stream)
  51. // 将文件流复制到表单字段
  52. _, err = io.Copy(part, fileStream)
  53. if err != nil {
  54. reply.Error_msg = err.Error()
  55. return
  56. }
  57. // 创建 HTTP 请求
  58. if respBody, err := Post(domain+constant.UploadUrl, writer, body); err != nil {
  59. reply.Error_msg = err.Error()
  60. } else {
  61. json.Unmarshal(respBody, &reply)
  62. }
  63. return
  64. }
  65. /* restful方式下载
  66. * @param domain 域名,例如:https://ossservice.jianyu360.cn
  67. * @param args 参数
  68. * @param args.BucketID 文件名
  69. * @param args.objectName 对象名称
  70. * @return {"error_code":0,"error_msg":"下载成功","data":文件流}
  71. */
  72. func DownloadByRestful(domain string, args *entity.Args) (reply *api.Result) {
  73. reply = &api.Result{}
  74. // 创建一个缓冲区来存储表单数据
  75. body := &bytes.Buffer{}
  76. writer := multipart.NewWriter(body)
  77. writer.WriteField("bucket_id", args.BucketID)
  78. writer.WriteField("object_name", args.ObjectName)
  79. if respBody, err := Post(domain+constant.DownloadUrl, writer, body); err != nil {
  80. reply.Error_msg = err.Error()
  81. } else {
  82. reply.Error_msg = constant.DownloadSuccess
  83. reply.Data = respBody
  84. }
  85. return
  86. }
  87. /* restful方式删除
  88. * @param domain 域名,例如:https://ossservice.jianyu360.cn
  89. * @param args 参数
  90. * @param args.BucketID 文件名
  91. * @param args.objectName 对象名称
  92. * @return {"error_code":0,"error_msg":"删除成功"}
  93. */
  94. func DeleteByRestful(domain string, args *entity.Args) (reply *api.Result) {
  95. reply = &api.Result{}
  96. // 创建一个缓冲区来存储表单数据
  97. body := &bytes.Buffer{}
  98. writer := multipart.NewWriter(body)
  99. writer.WriteField("bucket_id", args.BucketID)
  100. writer.WriteField("object_name", args.ObjectName)
  101. if respBody, err := Post(domain+constant.DeleteUrl, writer, body); err != nil {
  102. reply.Error_msg = err.Error()
  103. } else {
  104. json.Unmarshal(respBody, &reply)
  105. }
  106. return
  107. }
  108. /* restful方式获取标讯正文
  109. * @param domain 域名,例如:https://ossservice.jianyu360.cn
  110. * @param args 参数
  111. * @param args.BucketID 文件名
  112. * @param args.objectName 对象名称
  113. * @return {"error_code":0,"error_msg":"获取正文成功","data":"正文内容"}
  114. */
  115. func GetBidDetailByRestful(domain string, args *entity.Args) (reply *api.Result) {
  116. reply = &api.Result{}
  117. // 创建一个缓冲区来存储表单数据
  118. body := &bytes.Buffer{}
  119. writer := multipart.NewWriter(body)
  120. writer.WriteField("bucket_id", args.BucketID)
  121. writer.WriteField("object_name", args.ObjectName)
  122. if respBody, err := Post(domain+constant.GetBidDetailUrl, writer, body); err != nil {
  123. reply.Error_msg = err.Error()
  124. } else {
  125. reply.Error_msg = constant.GetBidDetailSuccess
  126. reply.Data = string(respBody)
  127. }
  128. return
  129. }
  // postHTTPClient is shared by every Post call instead of building a client
  // per request. It has no Timeout, exactly like the zero-value client used
  // before, so large transfers are not cut off.
  var postHTTPClient = &http.Client{}

  // Post finalizes the multipart form and sends it as an HTTP POST.
  //
  // Parameters:
  //   - url: full request URL.
  //   - writer: multipart writer that produced body; it is closed here, so the
  //     caller must have finished adding fields and parts.
  //   - body: buffer the writer wrote into; used as the request body.
  //
  // It returns the raw response body when the status is 200 OK. Any other
  // status is reported as an error whose text is the response body, or the
  // HTTP status line when the body is empty (so the error is never blank).
  func Post(url string, writer *multipart.Writer, body *bytes.Buffer) ([]byte, error) {
  	// Closing the writer emits the terminating multipart boundary.
  	if err := writer.Close(); err != nil {
  		return nil, err
  	}

  	req, err := http.NewRequest(http.MethodPost, url, body)
  	if err != nil {
  		log.Println("Error creating request:", err)
  		return nil, err
  	}
  	// The content type carries the boundary chosen by the writer.
  	req.Header.Set("Content-Type", writer.FormDataContentType())

  	resp, err := postHTTPClient.Do(req)
  	if err != nil {
  		return nil, err
  	}
  	defer resp.Body.Close()

  	respBody, err := io.ReadAll(resp.Body)
  	if err != nil {
  		return nil, err
  	}
  	if resp.StatusCode != http.StatusOK {
  		msg := string(respBody)
  		if msg == "" {
  			msg = "unexpected http status: " + resp.Status
  		}
  		return nil, errors.New(msg)
  	}
  	return respBody, nil
  }
  160. /* rpc方式上传
  161. * @param address 域名,例如:192.168.3.206:8110
  162. * @param args 参数
  163. * @param args.BucketID 文件名
  164. * @param args.objectName 对象名称
  165. * @param args.Stream 文件流
  166. * @param args.Gzip 是否压缩
  167. * @return {"error_code":0,"error_msg":"上传成功"}
  168. * @return error 错误信息
  169. */
  170. func UpLoadByRpc(address string, args *entity.UploadArgs) (*api.Result, error) {
  171. reply := &api.Result{}
  172. err := RpcCall(address, "OSSService.Upload", args, reply)
  173. return reply, err
  174. }
  175. /*
  176. *rpc方式下载
  177. * @param address 域名,例如:192.168.3.206:8110
  178. * @param args 参数
  179. * @param args.BucketID 文件名
  180. * @param args.objectName 对象名称
  181. * @return {"error_code":0,"error_msg":"下载成功","data":文件流}
  182. * @return error 错误信息
  183. */
  184. func DownloadByRpc(address string, args *entity.Args) (*api.Result, error) {
  185. reply := &api.Result{}
  186. err := RpcCall(address, "OSSService.Download", args, reply)
  187. return reply, err
  188. }
  189. /* rpc方式删除
  190. * @param address 域名,例如:192.168.3.206:8110
  191. * @param args 参数
  192. * @param args.BucketID 文件名
  193. * @param args.objectName 对象名称
  194. * @return {"error_code":0,"error_msg":"删除成功"}
  195. * @return error 错误信息
  196. */
  197. func DeleteByRpc(address string, args *entity.Args) (*api.Result, error) {
  198. reply := &api.Result{}
  199. err := RpcCall(address, "OSSService.Delete", args, reply)
  200. return reply, err
  201. }
  202. /*
  203. *rpc方式获取标讯正文
  204. * @param address 域名,例如:192.168.3.206:8110
  205. * @param args 参数
  206. * @param args.BucketID 文件名
  207. * @param args.objectName 对象名称
  208. * @return {"error_code":0,"error_msg":"获取成功","data":"正文内容"}
  209. * @return error 错误信息
  210. */
  211. func GetBidDetailByRpc(address string, args *entity.Args) (*api.Result, error) {
  212. reply := &api.Result{}
  213. err := RpcCall(address, "OSSService.GetBidDetail", args, reply)
  214. return reply, err
  215. }
  // RpcCall dials the net/rpc HTTP server at address over TCP, invokes
  // serviceMethod with args, stores the answer in reply and closes the
  // connection again. Dial and call errors are logged and returned.
  func RpcCall(address, serviceMethod string, args any, reply any) error {
  	client, dialErr := rpc.DialHTTP("tcp", address)
  	if dialErr != nil {
  		log.Println(dialErr)
  		return dialErr
  	}
  	// A fresh connection is opened per call, so release it when done.
  	defer client.Close()

  	callErr := client.Call(serviceMethod, args, reply)
  	if callErr != nil {
  		log.Println(callErr)
  	}
  	return callErr
  }
  231. // grpc call
  232. func GrpcCall(address string, timeout int64) (pb.ServiceClient, context.Context, func(), error) {
  233. var err error
  234. GrpcCoonOnce.Do(func() {
  235. GrpcCoon, err = grpc.Dial(address, grpc.WithInsecure(), grpc.WithKeepaliveParams(keepalive.ClientParameters{
  236. Time: 10 * time.Second, // 发送ping的间隔
  237. Timeout: 5 * time.Second, // 等待pong响应的超时时间
  238. PermitWithoutStream: true, // 是否在没有活跃流的情况下发送pings
  239. }))
  240. })
  241. var cancel context.CancelFunc
  242. if err != nil {
  243. log.Println(err)
  244. return nil, nil, cancel, err
  245. }
  246. var ctx context.Context
  247. if timeout < 0 {
  248. ctx = context.Background()
  249. } else if timeout == 0 {
  250. timeout = 5
  251. }
  252. if timeout > 0 {
  253. ctx, cancel = context.WithTimeout(context.TODO(), time.Duration(timeout)*time.Second)
  254. }
  255. return pb.NewServiceClient(GrpcCoon), ctx, cancel, nil
  256. }
  257. /* grpc方式上传
  258. * @param address 域名,例如:192.168.3.206:8110
  259. * @param args 参数
  260. * @param args.BucketID 文件名
  261. * @param args.objectName 对象名称
  262. * @param args.Stream 文件流
  263. * @param args.Gzip 是否压缩
  264. * @param timeout 超时时间
  265. * @return {"error_code":0,"error_msg":"上传成功"}
  266. * @return error 错误信息
  267. */
  268. func UpLoadByGrpc(address string, args *entity.UploadArgs, timeout int64) (*pb.Response, error) {
  269. serviceClient, ctx, cancel, err := GrpcCall(address, timeout)
  270. defer cancel()
  271. result := &pb.Response{}
  272. if err != nil {
  273. return result, err
  274. }
  275. return serviceClient.Upload(ctx, &pb.UploadRequest{
  276. Stream: args.Stream,
  277. Gzip: args.Gzip,
  278. BucketID: args.BucketID,
  279. ObjectName: args.ObjectName,
  280. })
  281. }
  282. /*
  283. *grpc方式下载
  284. * @param address 域名,例如:192.168.3.206:8110
  285. * @param args 参数
  286. * @param args.BucketID 文件名
  287. * @param args.objectName 对象名称
  288. * @return []byte
  289. * @return error 错误信息
  290. */
  291. func DownloadByGrpc(address string, args *entity.Args, timeout int64) (*pb.Response, error) {
  292. serviceClient, ctx, cancel, err := GrpcCall(address, timeout)
  293. defer cancel()
  294. result := &pb.Response{}
  295. if err != nil {
  296. return result, err
  297. }
  298. resp, err := serviceClient.Download(ctx, &pb.DownloadRequest{BucketID: args.BucketID, ObjectName: args.ObjectName})
  299. if err != nil {
  300. log.Println(err)
  301. return nil, err
  302. }
  303. b := &bytes.Buffer{}
  304. for {
  305. reply, err := resp.Recv()
  306. if err == io.EOF {
  307. break
  308. } else if err != nil {
  309. log.Println(err)
  310. return nil, err
  311. }
  312. result.ErrorMsg = reply.ErrorMsg
  313. result.ErrorCode = reply.ErrorCode
  314. b.Write(reply.Data)
  315. }
  316. result.Data = b.Bytes()
  317. return result, nil
  318. }
  319. /*
  320. *grpc方式获取标讯正文
  321. * @param address 域名,例如:192.168.3.206:8110
  322. * @param args 参数
  323. * @param args.BucketID 文件名
  324. * @param args.objectName 对象名称
  325. * @return {"error_code":0,"error_msg":"获取成功","data":"正文内容"}
  326. * @return error 错误信息
  327. */
  328. func GetBidDetailByGrpc(address string, args *entity.Args) (*api.Result, error) {
  329. serviceClient, ctx, cancel, err := GrpcCall(address, 0)
  330. defer cancel()
  331. result := &api.Result{}
  332. if err != nil {
  333. return result, err
  334. }
  335. resp, err := serviceClient.GetBidDetail(ctx, &pb.DownloadRequest{BucketID: args.BucketID, ObjectName: args.ObjectName})
  336. if err != nil {
  337. log.Println(err)
  338. return nil, err
  339. }
  340. b := &bytes.Buffer{}
  341. for {
  342. reply, err := resp.Recv()
  343. if err == io.EOF {
  344. break
  345. } else if err != nil {
  346. log.Println(err)
  347. return nil, err
  348. }
  349. result.Error_msg = reply.ErrorMsg
  350. result.Error_code = int(reply.ErrorCode)
  351. b.Write(reply.Data)
  352. }
  353. result.Data = string(b.Bytes())
  354. return result, nil
  355. }