oss.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. package oss
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "fmt"
  6. "github.com/gogf/gf/v2/frame/g"
  7. "github.com/gogf/gf/v2/os/gctx"
  8. "io"
  9. "io/ioutil"
  10. "log"
  11. "net/http"
  12. "sync"
  13. "sync/atomic"
  14. "app.yhyue.com/moapp/jybase/es"
  15. ossSDK "github.com/aliyun/aliyun-oss-go-sdk/oss"
  16. "jygit.jydev.jianyu360.cn/BaseService/ossService/config"
  17. "jygit.jydev.jianyu360.cn/BaseService/ossService/util"
  18. )
  19. var (
  20. // accountMap 用来缓存OSS帐号信息
  21. accountMap sync.Map
  22. DownLoadPool = make(chan bool, g.Config().MustGet(gctx.New(), "downLoadPoolSize").Int())
  23. GetDetailFromEsPool = make(chan bool, g.Config().MustGet(gctx.New(), "getDetailFromEsPoolSize").Int())
  24. GetDetailFromMgoPool = make(chan bool, g.Config().MustGet(gctx.New(), "getDetailFromMgoPoolSize").Int())
  25. getDetail = map[string]func(bucketID, objectName string) []byte{
  26. "oss": func(bucketID, objectName string) []byte {
  27. b, _, e := DownloadAttachment(bucketID, objectName)
  28. if e == nil && b != nil && len(b) > 0 {
  29. return b
  30. }
  31. return nil
  32. },
  33. "es": func(bucketID, objectName string) []byte {
  34. GetDetailFromEsPool <- true
  35. defer func() {
  36. <-GetDetailFromEsPool
  37. }()
  38. indexName := g.Config().MustGet(gctx.New(), "elasticSearch.indexName").String()
  39. list := es.VarEs.Get(indexName, indexName, fmt.Sprintf(`{"query":{"bool":{"filter":{"term":{"_id":"%s"}}}},"_source":["detail"]}`, objectName))
  40. if list != nil && len(*list) > 0 {
  41. detail, _ := (*list)[0]["detail"].(string)
  42. return []byte(detail)
  43. }
  44. return nil
  45. },
  46. "mgo": func(bucketID, objectName string) []byte {
  47. GetDetailFromMgoPool <- true
  48. defer func() {
  49. <-GetDetailFromMgoPool
  50. }()
  51. data, _ := config.Mgo.FindOneByField(g.Config().MustGet(gctx.New(), "mongodb.collection").String(), objectName, `{"detail":1}`)
  52. if data != nil && len(*data) > 0 {
  53. detail, _ := (*data)["detail"].(string)
  54. return []byte(detail)
  55. }
  56. return nil
  57. },
  58. }
  59. )
  60. // GetBucket 根据bucketID获取bucket信息,如果没有则模拟查询数据库(这里只查询一次)
  61. func GetBucket(bucketID string) (config.BucketInfo, error) {
  62. var buckets []*config.BucketInfo
  63. g.Config().MustGet(gctx.New(), "buckets").Structs(&buckets)
  64. for _, v := range buckets {
  65. if v.BucketID == bucketID {
  66. return *v, nil
  67. }
  68. }
  69. log.Println("Bucket %s not found in cache, querying database", bucketID)
  70. // 模拟数据库查询:实际业务中这里应查询数据库
  71. return config.BucketInfo{}, fmt.Errorf("bucket id %s not found", bucketID)
  72. }
  73. // bucketPool 缓存 bucket 对象,避免重复创建
  74. var bucketPool sync.Map // key: bucketID, value: *ossSDK.Bucket
  75. // GetCachedBucket 对外暴露统一入口,根据 bucketID 获取 *ossSDK.Bucket 对象
  76. func GetCachedBucket(bucketID string) (*ossSDK.Bucket, error) {
  77. // 优先从对象池取
  78. if cached, ok := bucketPool.Load(bucketID); ok {
  79. return cached.(*ossSDK.Bucket), nil
  80. }
  81. // 如果不存在则依次读取 bucketInfo 和 OSS client 信息
  82. bucketInfo, err := GetBucket(bucketID)
  83. if err != nil {
  84. return nil, err
  85. }
  86. client, err := getOSSClient(bucketInfo.AccountID)
  87. if err != nil {
  88. return nil, err
  89. }
  90. bucket, err := client.Bucket(bucketInfo.BucketName)
  91. if err != nil {
  92. return nil, err
  93. }
  94. // 存入对象池
  95. bucketPool.Store(bucketID, bucket)
  96. return bucket, nil
  97. }
  98. // getOSSClient 根据OSS帐号ID从accountMap中获取OSS Client
  99. func getOSSClient(accountID string) (*ossSDK.Client, error) {
  100. if val, ok := accountMap.Load(accountID); ok {
  101. acc := val.(config.OSSAccount)
  102. client, err := ossSDK.New(acc.Endpoint, acc.AccessKeyId, acc.AccessKeySecret)
  103. if err != nil {
  104. return nil, err
  105. }
  106. return client, nil
  107. }
  108. return nil, fmt.Errorf("oss account %s not found", accountID)
  109. }
  110. // UploadAttachment 上传附件;如果gzipEnabled为true,则进行gzip压缩(使用 io.Pipe 实现流式压缩)
  111. func UploadAttachment(bucketID, objectName string, data io.Reader, gzipEnabled bool) error {
  112. bucket, err := GetCachedBucket(bucketID)
  113. if err != nil {
  114. return err
  115. }
  116. var reader io.Reader
  117. if gzipEnabled {
  118. pr, pw := io.Pipe()
  119. go func() {
  120. gzipWriter := gzip.NewWriter(pw)
  121. _, err := io.Copy(gzipWriter, data)
  122. // 注意:关闭 gzipWriter 时会将数据刷新到 pw 中
  123. gzipWriter.Close()
  124. pw.CloseWithError(err)
  125. }()
  126. reader = pr
  127. } else {
  128. reader = data
  129. }
  130. // 上传到OSS
  131. return bucket.PutObject(objectName, reader)
  132. }
  133. // DownloadAttachment 下载附件,如果检测到数据为gzip压缩,则自动解压后返回
  134. func DownloadAttachment(bucketID, objectName string) ([]byte, http.Header, error) {
  135. atomic.AddInt64(&util.DownLoadCounter, 1)
  136. DownLoadPool <- true
  137. defer func() {
  138. <-DownLoadPool
  139. atomic.AddInt64(&util.DownLoadCounter, -1)
  140. }()
  141. bucket, err := GetCachedBucket(bucketID)
  142. if err != nil {
  143. return nil, nil, err
  144. }
  145. result, err := bucket.DoGetObject(&ossSDK.GetObjectRequest{objectName}, nil)
  146. if err != nil {
  147. return nil, nil, err
  148. }
  149. defer result.Response.Body.Close()
  150. data, err := ioutil.ReadAll(result.Response.Body)
  151. if err != nil {
  152. return nil, nil, err
  153. }
  154. // 判断是否为gzip压缩格式(判断前两个字节)
  155. if len(data) >= 2 && data[0] == 0x1f && data[1] == 0x8b {
  156. gzipReader, err := gzip.NewReader(bytes.NewReader(data))
  157. if err != nil {
  158. return nil, nil, err
  159. }
  160. defer gzipReader.Close()
  161. data, err = ioutil.ReadAll(gzipReader)
  162. if err != nil {
  163. return nil, nil, err
  164. }
  165. return data, result.Response.Headers, nil
  166. }
  167. return data, result.Response.Headers, nil
  168. }
  169. // DeleteAttachment 删除附件
  170. func DeleteAttachment(bucketID, objectName string) error {
  171. bucket, err := GetCachedBucket(bucketID)
  172. if err != nil {
  173. return err
  174. }
  175. return bucket.DeleteObject(objectName)
  176. }
  177. // LoadOSSAccounts 加载OSS帐号信息到缓存
  178. func LoadOSSAccounts() {
  179. var oas []*config.OSSAccount
  180. g.Config().MustGet(gctx.New(), "oss_accounts").Structs(&oas)
  181. accountMap.Clear()
  182. for _, acc := range oas {
  183. accountMap.Store(acc.ID, *acc)
  184. }
  185. bucketPool.Clear()
  186. }
  187. // 获取标讯正文,优先从oss中取,再从es中取
  188. func GetBidDetail(bucketID, objectName string) []byte {
  189. atomic.AddInt64(&util.GetDetailCounter, 1)
  190. defer atomic.AddInt64(&util.GetDetailCounter, -1)
  191. for _, v := range g.Config().MustGet(gctx.New(), "getDetailOrder").Strings() {
  192. detail := getDetail[v](bucketID, objectName)
  193. if detail != nil && len(detail) > 0 {
  194. return detail
  195. }
  196. }
  197. return nil
  198. }