oss.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. package oss
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "fmt"
  6. "github.com/gogf/gf/v2/frame/g"
  7. "github.com/gogf/gf/v2/os/gctx"
  8. "io"
  9. "io/ioutil"
  10. "log"
  11. "sync"
  12. "app.yhyue.com/moapp/jybase/es"
  13. ossSDK "github.com/aliyun/aliyun-oss-go-sdk/oss"
  14. "jygit.jydev.jianyu360.cn/BaseService/ossService/config"
  15. )
  16. // accountMap 用来缓存OSS帐号信息
  17. var accountMap sync.Map
  18. // GetBucket 根据bucketID获取bucket信息,如果没有则模拟查询数据库(这里只查询一次)
  19. func GetBucket(bucketID string) (config.BucketInfo, error) {
  20. var buckets []*config.BucketInfo
  21. g.Config().MustGet(gctx.New(), "buckets").Structs(&buckets)
  22. for _, v := range buckets {
  23. if v.BucketID == bucketID {
  24. return *v, nil
  25. }
  26. }
  27. log.Println("Bucket %s not found in cache, querying database", bucketID)
  28. // 模拟数据库查询:实际业务中这里应查询数据库
  29. return config.BucketInfo{}, fmt.Errorf("bucket id %s not found", bucketID)
  30. }
  31. // bucketPool 缓存 bucket 对象,避免重复创建
  32. var bucketPool sync.Map // key: bucketID, value: *ossSDK.Bucket
  33. // GetCachedBucket 对外暴露统一入口,根据 bucketID 获取 *ossSDK.Bucket 对象
  34. func GetCachedBucket(bucketID string) (*ossSDK.Bucket, error) {
  35. // 优先从对象池取
  36. if cached, ok := bucketPool.Load(bucketID); ok {
  37. return cached.(*ossSDK.Bucket), nil
  38. }
  39. // 如果不存在则依次读取 bucketInfo 和 OSS client 信息
  40. bucketInfo, err := GetBucket(bucketID)
  41. if err != nil {
  42. return nil, err
  43. }
  44. client, err := getOSSClient(bucketInfo.AccountID)
  45. if err != nil {
  46. return nil, err
  47. }
  48. bucket, err := client.Bucket(bucketInfo.BucketName)
  49. if err != nil {
  50. return nil, err
  51. }
  52. // 存入对象池
  53. bucketPool.Store(bucketID, bucket)
  54. return bucket, nil
  55. }
  56. // getOSSClient 根据OSS帐号ID从accountMap中获取OSS Client
  57. func getOSSClient(accountID string) (*ossSDK.Client, error) {
  58. if val, ok := accountMap.Load(accountID); ok {
  59. acc := val.(config.OSSAccount)
  60. client, err := ossSDK.New(acc.Endpoint, acc.AccessKeyId, acc.AccessKeySecret)
  61. if err != nil {
  62. return nil, err
  63. }
  64. return client, nil
  65. }
  66. return nil, fmt.Errorf("oss account %s not found", accountID)
  67. }
  68. // UploadAttachment 上传附件;如果gzipEnabled为true,则进行gzip压缩(使用 io.Pipe 实现流式压缩)
  69. func UploadAttachment(bucketID, objectName string, data io.Reader, gzipEnabled bool) error {
  70. bucket, err := GetCachedBucket(bucketID)
  71. if err != nil {
  72. return err
  73. }
  74. var reader io.Reader
  75. if gzipEnabled {
  76. pr, pw := io.Pipe()
  77. go func() {
  78. gzipWriter := gzip.NewWriter(pw)
  79. _, err := io.Copy(gzipWriter, data)
  80. // 注意:关闭 gzipWriter 时会将数据刷新到 pw 中
  81. gzipWriter.Close()
  82. pw.CloseWithError(err)
  83. }()
  84. reader = pr
  85. } else {
  86. reader = data
  87. }
  88. // 上传到OSS
  89. return bucket.PutObject(objectName, reader)
  90. }
  91. // DownloadAttachment 下载附件,如果检测到数据为gzip压缩,则自动解压后返回
  92. func DownloadAttachment(bucketID, objectName string) ([]byte, error) {
  93. bucket, err := GetCachedBucket(bucketID)
  94. if err != nil {
  95. return nil, err
  96. }
  97. body, err := bucket.GetObject(objectName)
  98. if err != nil {
  99. return nil, err
  100. }
  101. defer body.Close()
  102. data, err := ioutil.ReadAll(body)
  103. if err != nil {
  104. return nil, err
  105. }
  106. // 判断是否为gzip压缩格式(判断前两个字节)
  107. if len(data) >= 2 && data[0] == 0x1f && data[1] == 0x8b {
  108. gzipReader, err := gzip.NewReader(bytes.NewReader(data))
  109. if err != nil {
  110. return nil, err
  111. }
  112. defer gzipReader.Close()
  113. return ioutil.ReadAll(gzipReader)
  114. }
  115. return data, nil
  116. }
  117. // DeleteAttachment 删除附件
  118. func DeleteAttachment(bucketID, objectName string) error {
  119. bucket, err := GetCachedBucket(bucketID)
  120. if err != nil {
  121. return err
  122. }
  123. return bucket.DeleteObject(objectName)
  124. }
  125. // LoadOSSAccounts 加载OSS帐号信息到缓存
  126. func LoadOSSAccounts() {
  127. var oas []*config.OSSAccount
  128. g.Config().MustGet(gctx.New(), "oss_accounts").Structs(&oas)
  129. for _, acc := range oas {
  130. accountMap.LoadOrStore(acc.ID, *acc)
  131. }
  132. }
  133. // 获取标讯正文,优先从oss中取,再从es中取
  134. func GetBidDetail(bucketID, objectName string) []byte {
  135. b, e := DownloadAttachment(bucketID, objectName)
  136. if e == nil && b != nil && len(b) > 0 {
  137. return b
  138. }
  139. indexName := g.Config().MustGet(gctx.New(), "elasticSearch.indexName").String()
  140. list := es.VarEs.Get(indexName, indexName, fmt.Sprintf(`{"query":{"bool":{"filter":{"term":{"_id":"%s"}}}},"_source":["detail"]}`, objectName))
  141. if list != nil && len(*list) > 0 {
  142. detail, _ := (*list)[0]["detail"].(string)
  143. return []byte(detail)
  144. }
  145. return nil
  146. }