client.go 10 KB


  1. package ossClient
  2. import (
  3. "app.yhyue.com/moapp/jybase/api"
  4. "bytes"
  5. "context"
  6. "encoding/json"
  7. "errors"
  8. "google.golang.org/grpc"
  9. "io"
  10. "jygit.jydev.jianyu360.cn/BaseService/ossClient/constant"
  11. "jygit.jydev.jianyu360.cn/BaseService/ossClient/entity"
  12. "jygit.jydev.jianyu360.cn/BaseService/ossClient/pb"
  13. "log"
  14. "mime/multipart"
  15. "net/http"
  16. "net/rpc"
  17. "strconv"
  18. "sync"
  19. "time"
  20. )
  21. var (
  22. GrpcCoon *grpc.ClientConn
  23. GrpcCoonOnce = &sync.Once{}
  24. )
  25. /* restful方式上传
  26. * @param domain 域名,例如:https://ossservice.jianyu360.cn
  27. * @param bucketId 桶id
  28. * @param objectName 对象名称
  29. * @param stream 文件流
  30. * @param gzip 是否压缩
  31. * @return {"error_code":0,"error_msg":"上传成功"}
  32. */
  33. func UpLoadByRestful(domain, bucketId, objectName string, stream []byte, gzip bool) (reply *api.Result) {
  34. reply = &api.Result{Error_code: -1}
  35. // 创建一个缓冲区来存储表单数据
  36. body := &bytes.Buffer{}
  37. writer := multipart.NewWriter(body)
  38. writer.WriteField("bucket_id", bucketId)
  39. writer.WriteField("object_name", objectName)
  40. writer.WriteField("gzip", strconv.FormatBool(gzip))
  41. // 创建表单字段
  42. part, err := writer.CreateFormFile("file", objectName)
  43. if err != nil {
  44. reply.Error_msg = err.Error()
  45. return
  46. }
  47. // 模拟文件流
  48. fileStream := bytes.NewReader(stream)
  49. // 将文件流复制到表单字段
  50. _, err = io.Copy(part, fileStream)
  51. if err != nil {
  52. reply.Error_msg = err.Error()
  53. return
  54. }
  55. // 创建 HTTP 请求
  56. if respBody, err := Post(domain+constant.UploadUrl, writer, body); err != nil {
  57. reply.Error_msg = err.Error()
  58. } else {
  59. json.Unmarshal(respBody, &reply)
  60. }
  61. return
  62. }
  63. /* restful方式下载
  64. * @param domain 域名,例如:https://ossservice.jianyu360.cn
  65. * @param bucketId 桶id
  66. * @param objectName 对象名称
  67. * @return {"error_code":0,"error_msg":"下载成功","data":文件流}
  68. */
  69. func DownloadByRestful(domain, bucketId, objectName string) (reply *api.Result) {
  70. reply = &api.Result{}
  71. // 创建一个缓冲区来存储表单数据
  72. body := &bytes.Buffer{}
  73. writer := multipart.NewWriter(body)
  74. writer.WriteField("bucket_id", bucketId)
  75. writer.WriteField("object_name", objectName)
  76. if respBody, err := Post(domain+constant.DownloadUrl, writer, body); err != nil {
  77. reply.Error_msg = err.Error()
  78. } else {
  79. reply.Error_msg = constant.DownloadSuccess
  80. reply.Data = respBody
  81. }
  82. return
  83. }
  84. /* restful方式删除
  85. * @param domain 域名,例如:https://ossservice.jianyu360.cn
  86. * @param bucketId 桶id
  87. * @param objectName 对象名称
  88. * @return {"error_code":0,"error_msg":"删除成功"}
  89. */
  90. func DeleteByRestful(domain, bucketId, objectName string) (reply *api.Result) {
  91. reply = &api.Result{}
  92. // 创建一个缓冲区来存储表单数据
  93. body := &bytes.Buffer{}
  94. writer := multipart.NewWriter(body)
  95. writer.WriteField("bucket_id", bucketId)
  96. writer.WriteField("object_name", objectName)
  97. if respBody, err := Post(domain+constant.DeleteUrl, writer, body); err != nil {
  98. reply.Error_msg = err.Error()
  99. } else {
  100. json.Unmarshal(respBody, &reply)
  101. }
  102. return
  103. }
  104. /* restful方式获取标讯正文
  105. * @param domain 域名,例如:https://ossservice.jianyu360.cn
  106. * @param bucketId 桶id
  107. * @param objectName 对象名称
  108. * @return {"error_code":0,"error_msg":"获取正文成功","data":"正文内容"}
  109. */
  110. func GetBidDetailByRestful(domain, bucketId, objectName string) (reply *api.Result) {
  111. reply = &api.Result{}
  112. // 创建一个缓冲区来存储表单数据
  113. body := &bytes.Buffer{}
  114. writer := multipart.NewWriter(body)
  115. writer.WriteField("bucket_id", bucketId)
  116. writer.WriteField("object_name", objectName)
  117. if respBody, err := Post(domain+constant.GetBidDetailUrl, writer, body); err != nil {
  118. reply.Error_msg = err.Error()
  119. } else {
  120. reply.Error_msg = constant.GetBidDetailSuccess
  121. reply.Data = string(respBody)
  122. }
  123. return
  124. }
  125. func Post(url string, writer *multipart.Writer, body *bytes.Buffer) ([]byte, error) {
  126. // 关闭表单写入器
  127. if err := writer.Close(); err != nil {
  128. return nil, err
  129. }
  130. // 创建 HTTP 请求
  131. req, err := http.NewRequest("POST", url, body)
  132. if err != nil {
  133. log.Println("Error creating request:", err)
  134. return nil, err
  135. }
  136. // 设置请求头
  137. req.Header.Set("Content-Type", writer.FormDataContentType())
  138. // 发送请求
  139. client := &http.Client{}
  140. resp, err := client.Do(req)
  141. if err != nil {
  142. return nil, err
  143. }
  144. defer resp.Body.Close()
  145. // 读取响应
  146. respBody, err := io.ReadAll(resp.Body)
  147. if err != nil {
  148. return nil, err
  149. }
  150. if resp.StatusCode != http.StatusOK {
  151. return nil, errors.New(string(respBody))
  152. }
  153. return respBody, nil
  154. }
  155. /* rpc方式上传
  156. * @param address 域名,例如:192.168.3.206:8110
  157. * @param args 参数
  158. * @param args.BucketID 文件名
  159. * @param args.objectName 对象名称
  160. * @param args.Stream 文件流
  161. * @param args.Gzip 是否压缩
  162. * @return {"error_code":0,"error_msg":"上传成功"}
  163. * @return error 错误信息
  164. */
  165. func UpLoadByRpc(address string, args *entity.UploadArgs) (*api.Result, error) {
  166. reply := &api.Result{}
  167. err := RpcCall(address, "OSSService.Upload", args, reply)
  168. return reply, err
  169. }
  170. /*
  171. *rpc方式下载
  172. * @param address 域名,例如:192.168.3.206:8110
  173. * @param args 参数
  174. * @param args.BucketID 文件名
  175. * @param args.objectName 对象名称
  176. * @return {"error_code":0,"error_msg":"下载成功","data":文件流}
  177. * @return error 错误信息
  178. */
  179. func DownloadByRpc(address string, args *entity.Args) (*api.Result, error) {
  180. reply := &api.Result{}
  181. err := RpcCall(address, "OSSService.Download", args, reply)
  182. return reply, err
  183. }
  184. /* rpc方式删除
  185. * @param address 域名,例如:192.168.3.206:8110
  186. * @param args 参数
  187. * @param args.BucketID 文件名
  188. * @param args.objectName 对象名称
  189. * @return {"error_code":0,"error_msg":"删除成功"}
  190. * @return error 错误信息
  191. */
  192. func DeleteByRpc(address string, args *entity.Args) (*api.Result, error) {
  193. reply := &api.Result{}
  194. err := RpcCall(address, "OSSService.Delete", args, reply)
  195. return reply, err
  196. }
  197. /*
  198. *rpc方式获取标讯正文
  199. * @param address 域名,例如:192.168.3.206:8110
  200. * @param args 参数
  201. * @param args.BucketID 文件名
  202. * @param args.objectName 对象名称
  203. * @return {"error_code":0,"error_msg":"获取成功","data":"正文内容"}
  204. * @return error 错误信息
  205. */
  206. func GetBidDetailByRpc(address string, args *entity.Args) (*api.Result, error) {
  207. reply := &api.Result{}
  208. err := RpcCall(address, "OSSService.GetBidDetail", args, reply)
  209. return reply, err
  210. }
  211. // rpc call
  212. func RpcCall(address, serviceMethod string, args any, reply any) error {
  213. conn, err := rpc.DialHTTP("tcp", address)
  214. if err != nil {
  215. log.Println(err)
  216. return err
  217. }
  218. defer conn.Close()
  219. err = conn.Call(serviceMethod, args, reply)
  220. if err != nil {
  221. log.Println(err)
  222. return err
  223. }
  224. return nil
  225. }
  226. // grpc call
  227. func GrpcCall(address string, timeout int64) (pb.ServiceClient, context.Context, func(), error) {
  228. var err error
  229. GrpcCoonOnce.Do(func() {
  230. //GrpcCoon, err = grpc.Dial(address, grpc.WithInsecure(), grpc.WithKeepaliveParams(keepalive.ClientParameters{
  231. // Time: 10 * time.Second, // 发送ping的间隔
  232. // Timeout: 5 * time.Second, // 等待pong响应的超时时间
  233. // PermitWithoutStream: true, // 是否在没有活跃流的情况下发送pings
  234. //}))
  235. GrpcCoon, err = grpc.Dial(address, grpc.WithInsecure())
  236. })
  237. var cancel context.CancelFunc
  238. if err != nil {
  239. log.Println(err)
  240. return nil, nil, cancel, err
  241. }
  242. var ctx context.Context
  243. if timeout < 0 {
  244. ctx = context.Background()
  245. } else if timeout == 0 {
  246. timeout = 5
  247. }
  248. if timeout > 0 {
  249. ctx, cancel = context.WithTimeout(context.TODO(), time.Duration(timeout)*time.Second)
  250. }
  251. return pb.NewServiceClient(GrpcCoon), ctx, cancel, nil
  252. }
  253. /* grpc方式上传
  254. * @param address 域名,例如:192.168.3.206:8110
  255. * @param args 参数
  256. * @param args.BucketID 文件名
  257. * @param args.objectName 对象名称
  258. * @param args.Stream 文件流
  259. * @param args.Gzip 是否压缩
  260. * @return {"error_code":0,"error_msg":"上传成功"}
  261. * @return error 错误信息
  262. */
  263. func UpLoadByGrpc(address string, argsFunc func() (*pb.UploadRequest, bool), timeout int64) (*pb.Response, error) {
  264. serviceClient, ctx, cancel, err := GrpcCall(address, timeout)
  265. defer cancel()
  266. result := &pb.Response{}
  267. if err != nil {
  268. return result, err
  269. }
  270. stream, err := serviceClient.Upload(ctx)
  271. if err != nil {
  272. log.Println(err)
  273. return nil, err
  274. }
  275. for {
  276. if args, ok := argsFunc(); ok {
  277. stream.Send(args)
  278. } else {
  279. break
  280. }
  281. }
  282. return result, err
  283. }
  284. /*
  285. *grpc方式下载
  286. * @param address 域名,例如:192.168.3.206:8110
  287. * @param args 参数
  288. * @param args.BucketID 文件名
  289. * @param args.objectName 对象名称
  290. * @return []byte
  291. * @return error 错误信息
  292. */
  293. func DownloadByGrpc(address string, args *entity.Args, timeout int64) (*pb.Response, error) {
  294. serviceClient, ctx, cancel, err := GrpcCall(address, timeout)
  295. defer cancel()
  296. result := &pb.Response{}
  297. if err != nil {
  298. return result, err
  299. }
  300. resp, err := serviceClient.Download(ctx, &pb.DownloadRequest{BucketID: args.BucketID, ObjectName: args.ObjectName})
  301. if err != nil {
  302. log.Println(err)
  303. return nil, err
  304. }
  305. b := &bytes.Buffer{}
  306. for {
  307. reply, err := resp.Recv()
  308. if err == io.EOF {
  309. break
  310. } else if err != nil {
  311. log.Println(err)
  312. return nil, err
  313. }
  314. result.ErrorMsg = reply.ErrorMsg
  315. result.ErrorCode = reply.ErrorCode
  316. b.Write(reply.Data)
  317. }
  318. result.Data = b.Bytes()
  319. return result, nil
  320. }
  321. /*
  322. *grpc方式获取标讯正文
  323. * @param address 域名,例如:192.168.3.206:8110
  324. * @param args 参数
  325. * @param args.BucketID 文件名
  326. * @param args.objectName 对象名称
  327. * @return {"error_code":0,"error_msg":"获取成功","data":"正文内容"}
  328. * @return error 错误信息
  329. */
  330. func GetBidDetailByGrpc(address string, args *entity.Args) (*api.Result, error) {
  331. serviceClient, ctx, cancel, err := GrpcCall(address, 0)
  332. defer cancel()
  333. result := &api.Result{}
  334. if err != nil {
  335. return result, err
  336. }
  337. resp, err := serviceClient.GetBidDetail(ctx, &pb.DownloadRequest{BucketID: args.BucketID, ObjectName: args.ObjectName})
  338. if err != nil {
  339. log.Println(err)
  340. return nil, err
  341. }
  342. b := &bytes.Buffer{}
  343. for {
  344. reply, err := resp.Recv()
  345. if err == io.EOF {
  346. break
  347. } else if err != nil {
  348. log.Println(err)
  349. return nil, err
  350. }
  351. result.Error_msg = reply.ErrorMsg
  352. result.Error_code = int(reply.ErrorCode)
  353. b.Write(reply.Data)
  354. }
  355. result.Data = string(b.Bytes())
  356. return result, nil
  357. }