client.go 10 KB


  1. package ossClient
  2. import (
  3. "app.yhyue.com/moapp/jybase/api"
  4. "bytes"
  5. "context"
  6. "encoding/json"
  7. "errors"
  8. "google.golang.org/grpc"
  9. "google.golang.org/grpc/keepalive"
  10. "io"
  11. "jygit.jydev.jianyu360.cn/BaseService/ossClient/constant"
  12. "jygit.jydev.jianyu360.cn/BaseService/ossClient/entity"
  13. "jygit.jydev.jianyu360.cn/BaseService/ossClient/pb"
  14. "log"
  15. "mime/multipart"
  16. "net/http"
  17. "net/rpc"
  18. "strconv"
  19. "sync"
  20. "time"
  21. )
  22. var (
  23. GrpcCoon *grpc.ClientConn
  24. GrpcCoonOnce = &sync.Once{}
  25. )
  26. /* restful方式上传
  27. * @param domain 域名,例如:https://ossservice.jianyu360.cn
  28. * @param bucketId 桶id
  29. * @param objectName 对象名称
  30. * @param stream 文件流
  31. * @param gzip 是否压缩
  32. * @return {"error_code":0,"error_msg":"上传成功"}
  33. */
  34. func UpLoadByRestful(domain, bucketId, objectName string, stream []byte, gzip bool) (reply *api.Result) {
  35. reply = &api.Result{Error_code: -1}
  36. // 创建一个缓冲区来存储表单数据
  37. body := &bytes.Buffer{}
  38. writer := multipart.NewWriter(body)
  39. writer.WriteField("bucket_id", bucketId)
  40. writer.WriteField("object_name", objectName)
  41. writer.WriteField("gzip", strconv.FormatBool(gzip))
  42. // 创建表单字段
  43. part, err := writer.CreateFormFile("file", objectName)
  44. if err != nil {
  45. reply.Error_msg = err.Error()
  46. return
  47. }
  48. // 模拟文件流
  49. fileStream := bytes.NewReader(stream)
  50. // 将文件流复制到表单字段
  51. _, err = io.Copy(part, fileStream)
  52. if err != nil {
  53. reply.Error_msg = err.Error()
  54. return
  55. }
  56. // 创建 HTTP 请求
  57. if respBody, err := Post(domain+constant.UploadUrl, writer, body); err != nil {
  58. reply.Error_msg = err.Error()
  59. } else {
  60. json.Unmarshal(respBody, &reply)
  61. }
  62. return
  63. }
  64. /* restful方式下载
  65. * @param domain 域名,例如:https://ossservice.jianyu360.cn
  66. * @param bucketId 桶id
  67. * @param objectName 对象名称
  68. * @return {"error_code":0,"error_msg":"下载成功","data":文件流}
  69. */
  70. func DownloadByRestful(domain, bucketId, objectName string) (reply *api.Result) {
  71. reply = &api.Result{}
  72. // 创建一个缓冲区来存储表单数据
  73. body := &bytes.Buffer{}
  74. writer := multipart.NewWriter(body)
  75. writer.WriteField("bucket_id", bucketId)
  76. writer.WriteField("object_name", objectName)
  77. if respBody, err := Post(domain+constant.DownloadUrl, writer, body); err != nil {
  78. reply.Error_msg = err.Error()
  79. } else {
  80. reply.Error_msg = constant.DownloadSuccess
  81. reply.Data = respBody
  82. }
  83. return
  84. }
  85. /* restful方式删除
  86. * @param domain 域名,例如:https://ossservice.jianyu360.cn
  87. * @param bucketId 桶id
  88. * @param objectName 对象名称
  89. * @return {"error_code":0,"error_msg":"删除成功"}
  90. */
  91. func DeleteByRestful(domain, bucketId, objectName string) (reply *api.Result) {
  92. reply = &api.Result{}
  93. // 创建一个缓冲区来存储表单数据
  94. body := &bytes.Buffer{}
  95. writer := multipart.NewWriter(body)
  96. writer.WriteField("bucket_id", bucketId)
  97. writer.WriteField("object_name", objectName)
  98. if respBody, err := Post(domain+constant.DeleteUrl, writer, body); err != nil {
  99. reply.Error_msg = err.Error()
  100. } else {
  101. json.Unmarshal(respBody, &reply)
  102. }
  103. return
  104. }
  105. /* restful方式获取标讯正文
  106. * @param domain 域名,例如:https://ossservice.jianyu360.cn
  107. * @param bucketId 桶id
  108. * @param objectName 对象名称
  109. * @return {"error_code":0,"error_msg":"获取正文成功","data":"正文内容"}
  110. */
  111. func GetBidDetailByRestful(domain, bucketId, objectName string) (reply *api.Result) {
  112. reply = &api.Result{}
  113. // 创建一个缓冲区来存储表单数据
  114. body := &bytes.Buffer{}
  115. writer := multipart.NewWriter(body)
  116. writer.WriteField("bucket_id", bucketId)
  117. writer.WriteField("object_name", objectName)
  118. if respBody, err := Post(domain+constant.GetBidDetailUrl, writer, body); err != nil {
  119. reply.Error_msg = err.Error()
  120. } else {
  121. reply.Error_msg = constant.GetBidDetailSuccess
  122. reply.Data = string(respBody)
  123. }
  124. return
  125. }
  126. func Post(url string, writer *multipart.Writer, body *bytes.Buffer) ([]byte, error) {
  127. // 关闭表单写入器
  128. if err := writer.Close(); err != nil {
  129. return nil, err
  130. }
  131. // 创建 HTTP 请求
  132. req, err := http.NewRequest("POST", url, body)
  133. if err != nil {
  134. log.Println("Error creating request:", err)
  135. return nil, err
  136. }
  137. // 设置请求头
  138. req.Header.Set("Content-Type", writer.FormDataContentType())
  139. // 发送请求
  140. client := &http.Client{}
  141. resp, err := client.Do(req)
  142. if err != nil {
  143. return nil, err
  144. }
  145. defer resp.Body.Close()
  146. // 读取响应
  147. respBody, err := io.ReadAll(resp.Body)
  148. if err != nil {
  149. return nil, err
  150. }
  151. if resp.StatusCode != http.StatusOK {
  152. return nil, errors.New(string(respBody))
  153. }
  154. return respBody, nil
  155. }
  156. /* rpc方式上传
  157. * @param address 域名,例如:192.168.3.206:8110
  158. * @param args 参数
  159. * @param args.BucketID 文件名
  160. * @param args.objectName 对象名称
  161. * @param args.Stream 文件流
  162. * @param args.Gzip 是否压缩
  163. * @return {"error_code":0,"error_msg":"上传成功"}
  164. * @return error 错误信息
  165. */
  166. func UpLoadByRpc(address string, args *entity.UploadArgs) (*api.Result, error) {
  167. reply := &api.Result{}
  168. err := RpcCall(address, "OSSService.Upload", args, reply)
  169. return reply, err
  170. }
  171. /*
  172. *rpc方式下载
  173. * @param address 域名,例如:192.168.3.206:8110
  174. * @param args 参数
  175. * @param args.BucketID 文件名
  176. * @param args.objectName 对象名称
  177. * @return {"error_code":0,"error_msg":"下载成功","data":文件流}
  178. * @return error 错误信息
  179. */
  180. func DownloadByRpc(address string, args *entity.Args) (*api.Result, error) {
  181. reply := &api.Result{}
  182. err := RpcCall(address, "OSSService.Download", args, reply)
  183. return reply, err
  184. }
  185. /* rpc方式删除
  186. * @param address 域名,例如:192.168.3.206:8110
  187. * @param args 参数
  188. * @param args.BucketID 文件名
  189. * @param args.objectName 对象名称
  190. * @return {"error_code":0,"error_msg":"删除成功"}
  191. * @return error 错误信息
  192. */
  193. func DeleteByRpc(address string, args *entity.Args) (*api.Result, error) {
  194. reply := &api.Result{}
  195. err := RpcCall(address, "OSSService.Delete", args, reply)
  196. return reply, err
  197. }
  198. /*
  199. *rpc方式获取标讯正文
  200. * @param address 域名,例如:192.168.3.206:8110
  201. * @param args 参数
  202. * @param args.BucketID 文件名
  203. * @param args.objectName 对象名称
  204. * @return {"error_code":0,"error_msg":"获取成功","data":"正文内容"}
  205. * @return error 错误信息
  206. */
  207. func GetBidDetailByRpc(address string, args *entity.Args) (*api.Result, error) {
  208. reply := &api.Result{}
  209. err := RpcCall(address, "OSSService.GetBidDetail", args, reply)
  210. return reply, err
  211. }
  212. // rpc call
  213. func RpcCall(address, serviceMethod string, args any, reply any) error {
  214. conn, err := rpc.DialHTTP("tcp", address)
  215. if err != nil {
  216. log.Println(err)
  217. return err
  218. }
  219. defer conn.Close()
  220. err = conn.Call(serviceMethod, args, reply)
  221. if err != nil {
  222. log.Println(err)
  223. return err
  224. }
  225. return nil
  226. }
  227. // grpc call
  228. func GrpcCall(address string, timeout int64) (pb.ServiceClient, context.Context, func(), error) {
  229. var err error
  230. GrpcCoonOnce.Do(func() {
  231. GrpcCoon, err = grpc.Dial(address, grpc.WithKeepaliveParams(keepalive.ClientParameters{
  232. Time: 10 * time.Second, // 发送ping的间隔
  233. Timeout: 5 * time.Second, // 等待pong响应的超时时间
  234. PermitWithoutStream: true, // 是否在没有活跃流的情况下发送pings
  235. }))
  236. })
  237. var cancel context.CancelFunc
  238. if err != nil {
  239. log.Println(err)
  240. return nil, nil, cancel, err
  241. }
  242. var ctx context.Context
  243. if timeout < 0 {
  244. ctx = context.Background()
  245. } else if timeout == 0 {
  246. timeout = 5
  247. }
  248. if timeout > 0 {
  249. ctx, cancel = context.WithTimeout(context.TODO(), time.Duration(timeout)*time.Second)
  250. }
  251. return pb.NewServiceClient(GrpcCoon), ctx, cancel, nil
  252. }
  253. /* grpc方式上传
  254. * @param address 域名,例如:192.168.3.206:8110
  255. * @param args 参数
  256. * @param args.BucketID 文件名
  257. * @param args.objectName 对象名称
  258. * @param args.Stream 文件流
  259. * @param args.Gzip 是否压缩
  260. * @return {"error_code":0,"error_msg":"上传成功"}
  261. * @return error 错误信息
  262. */
  263. func UpLoadByGrpc(address string, argsFunc func() (*pb.UploadRequest, bool), timeout int64) (*pb.Response, error) {
  264. serviceClient, ctx, cancel, err := GrpcCall(address, timeout)
  265. defer cancel()
  266. result := &pb.Response{}
  267. if err != nil {
  268. return result, err
  269. }
  270. stream, err := serviceClient.Upload(ctx)
  271. if err != nil {
  272. log.Println(err)
  273. return nil, err
  274. }
  275. for {
  276. if args, ok := argsFunc(); ok {
  277. stream.Send(args)
  278. } else {
  279. break
  280. }
  281. }
  282. return result, err
  283. }
  284. /*
  285. *grpc方式下载
  286. * @param address 域名,例如:192.168.3.206:8110
  287. * @param args 参数
  288. * @param args.BucketID 文件名
  289. * @param args.objectName 对象名称
  290. * @return []byte
  291. * @return error 错误信息
  292. */
  293. func DownloadByGrpc(address string, args *entity.Args, timeout int64) (*pb.Response, error) {
  294. serviceClient, ctx, cancel, err := GrpcCall(address, timeout)
  295. defer cancel()
  296. result := &pb.Response{}
  297. if err != nil {
  298. return result, err
  299. }
  300. resp, err := serviceClient.Download(ctx, &pb.DownloadRequest{BucketID: args.BucketID, ObjectName: args.ObjectName})
  301. if err != nil {
  302. log.Println(err)
  303. return nil, err
  304. }
  305. b := &bytes.Buffer{}
  306. for {
  307. reply, err := resp.Recv()
  308. if err == io.EOF {
  309. break
  310. } else if err != nil {
  311. log.Println(err)
  312. return nil, err
  313. }
  314. result.ErrorMsg = reply.ErrorMsg
  315. result.ErrorCode = reply.ErrorCode
  316. b.Write(reply.Data)
  317. }
  318. result.Data = b.Bytes()
  319. return result, nil
  320. }
  321. /*
  322. *grpc方式获取标讯正文
  323. * @param address 域名,例如:192.168.3.206:8110
  324. * @param args 参数
  325. * @param args.BucketID 文件名
  326. * @param args.objectName 对象名称
  327. * @return {"error_code":0,"error_msg":"获取成功","data":"正文内容"}
  328. * @return error 错误信息
  329. */
  330. func GetBidDetailByGrpc(address string, args *entity.Args) (*api.Result, error) {
  331. serviceClient, ctx, cancel, err := GrpcCall(address, 0)
  332. defer cancel()
  333. result := &api.Result{}
  334. if err != nil {
  335. return result, err
  336. }
  337. resp, err := serviceClient.GetBidDetail(ctx, &pb.DownloadRequest{BucketID: args.BucketID, ObjectName: args.ObjectName})
  338. if err != nil {
  339. log.Println(err)
  340. return nil, err
  341. }
  342. b := &bytes.Buffer{}
  343. for {
  344. reply, err := resp.Recv()
  345. if err == io.EOF {
  346. break
  347. } else if err != nil {
  348. log.Println(err)
  349. return nil, err
  350. }
  351. result.Error_msg = reply.ErrorMsg
  352. result.Error_code = int(reply.ErrorCode)
  353. b.Write(reply.Data)
  354. }
  355. result.Data = string(b.Bytes())
  356. return result, nil
  357. }