oss.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. package oss
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "fmt"
  6. "github.com/gogf/gf/v2/frame/g"
  7. "github.com/gogf/gf/v2/os/gctx"
  8. "io"
  9. "io/ioutil"
  10. "log"
  11. "net/http"
  12. "sync"
  13. "app.yhyue.com/moapp/jybase/es"
  14. ossSDK "github.com/aliyun/aliyun-oss-go-sdk/oss"
  15. "jygit.jydev.jianyu360.cn/BaseService/ossService/config"
  16. )
  17. // accountMap 用来缓存OSS帐号信息
  18. var accountMap sync.Map
  19. // GetBucket 根据bucketID获取bucket信息,如果没有则模拟查询数据库(这里只查询一次)
  20. func GetBucket(bucketID string) (config.BucketInfo, error) {
  21. var buckets []*config.BucketInfo
  22. g.Config().MustGet(gctx.New(), "buckets").Structs(&buckets)
  23. for _, v := range buckets {
  24. if v.BucketID == bucketID {
  25. return *v, nil
  26. }
  27. }
  28. log.Println("Bucket %s not found in cache, querying database", bucketID)
  29. // 模拟数据库查询:实际业务中这里应查询数据库
  30. return config.BucketInfo{}, fmt.Errorf("bucket id %s not found", bucketID)
  31. }
  32. // bucketPool 缓存 bucket 对象,避免重复创建
  33. var bucketPool sync.Map // key: bucketID, value: *ossSDK.Bucket
  34. // GetCachedBucket 对外暴露统一入口,根据 bucketID 获取 *ossSDK.Bucket 对象
  35. func GetCachedBucket(bucketID string) (*ossSDK.Bucket, error) {
  36. // 优先从对象池取
  37. if cached, ok := bucketPool.Load(bucketID); ok {
  38. return cached.(*ossSDK.Bucket), nil
  39. }
  40. // 如果不存在则依次读取 bucketInfo 和 OSS client 信息
  41. bucketInfo, err := GetBucket(bucketID)
  42. if err != nil {
  43. return nil, err
  44. }
  45. client, err := getOSSClient(bucketInfo.AccountID)
  46. if err != nil {
  47. return nil, err
  48. }
  49. bucket, err := client.Bucket(bucketInfo.BucketName)
  50. if err != nil {
  51. return nil, err
  52. }
  53. // 存入对象池
  54. bucketPool.Store(bucketID, bucket)
  55. return bucket, nil
  56. }
  57. // getOSSClient 根据OSS帐号ID从accountMap中获取OSS Client
  58. func getOSSClient(accountID string) (*ossSDK.Client, error) {
  59. if val, ok := accountMap.Load(accountID); ok {
  60. acc := val.(config.OSSAccount)
  61. client, err := ossSDK.New(acc.Endpoint, acc.AccessKeyId, acc.AccessKeySecret)
  62. if err != nil {
  63. return nil, err
  64. }
  65. return client, nil
  66. }
  67. return nil, fmt.Errorf("oss account %s not found", accountID)
  68. }
  69. // UploadAttachment 上传附件;如果gzipEnabled为true,则进行gzip压缩(使用 io.Pipe 实现流式压缩)
  70. func UploadAttachment(bucketID, objectName string, data io.Reader, gzipEnabled bool) error {
  71. bucket, err := GetCachedBucket(bucketID)
  72. if err != nil {
  73. return err
  74. }
  75. var reader io.Reader
  76. if gzipEnabled {
  77. pr, pw := io.Pipe()
  78. go func() {
  79. gzipWriter := gzip.NewWriter(pw)
  80. _, err := io.Copy(gzipWriter, data)
  81. // 注意:关闭 gzipWriter 时会将数据刷新到 pw 中
  82. gzipWriter.Close()
  83. pw.CloseWithError(err)
  84. }()
  85. reader = pr
  86. } else {
  87. reader = data
  88. }
  89. // 上传到OSS
  90. return bucket.PutObject(objectName, reader)
  91. }
  92. // DownloadAttachment 下载附件,如果检测到数据为gzip压缩,则自动解压后返回
  93. func DownloadAttachment(bucketID, objectName string) ([]byte, http.Header, error) {
  94. bucket, err := GetCachedBucket(bucketID)
  95. if err != nil {
  96. return nil, nil, err
  97. }
  98. result, err := bucket.DoGetObject(&ossSDK.GetObjectRequest{objectName}, nil)
  99. if err != nil {
  100. return nil, nil, err
  101. }
  102. defer result.Response.Body.Close()
  103. data, err := ioutil.ReadAll(result.Response.Body)
  104. if err != nil {
  105. return nil, nil, err
  106. }
  107. // 判断是否为gzip压缩格式(判断前两个字节)
  108. if len(data) >= 2 && data[0] == 0x1f && data[1] == 0x8b {
  109. gzipReader, err := gzip.NewReader(bytes.NewReader(data))
  110. if err != nil {
  111. return nil, nil, err
  112. }
  113. defer gzipReader.Close()
  114. data, err = ioutil.ReadAll(gzipReader)
  115. if err != nil {
  116. return nil, nil, err
  117. }
  118. return data, result.Response.Headers, nil
  119. }
  120. return data, result.Response.Headers, nil
  121. }
  122. // DeleteAttachment 删除附件
  123. func DeleteAttachment(bucketID, objectName string) error {
  124. bucket, err := GetCachedBucket(bucketID)
  125. if err != nil {
  126. return err
  127. }
  128. return bucket.DeleteObject(objectName)
  129. }
  130. // LoadOSSAccounts 加载OSS帐号信息到缓存
  131. func LoadOSSAccounts() {
  132. var oas []*config.OSSAccount
  133. g.Config().MustGet(gctx.New(), "oss_accounts").Structs(&oas)
  134. accountMap.Clear()
  135. for _, acc := range oas {
  136. accountMap.Store(acc.ID, *acc)
  137. }
  138. bucketPool.Clear()
  139. }
  140. // 获取标讯正文,优先从oss中取,再从es中取
  141. func GetBidDetail(bucketID, objectName string) []byte {
  142. b, _, e := DownloadAttachment(bucketID, objectName)
  143. if e == nil && b != nil && len(b) > 0 {
  144. return b
  145. }
  146. indexName := g.Config().MustGet(gctx.New(), "elasticSearch.indexName").String()
  147. list := es.VarEs.Get(indexName, indexName, fmt.Sprintf(`{"query":{"bool":{"filter":{"term":{"_id":"%s"}}}},"_source":["detail"]}`, objectName))
  148. if list != nil && len(*list) > 0 {
  149. detail, _ := (*list)[0]["detail"].(string)
  150. return []byte(detail)
  151. }
  152. return nil
  153. }