oss.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. package oss
import (
	"bytes"
	"compress/gzip"
	"encoding/json"
	"fmt"
	"io"
	"io/ioutil"
	"log"
	"sync"

	"app.yhyue.com/moapp/jybase/es"
	ossSDK "github.com/aliyun/aliyun-oss-go-sdk/oss"
	"jygit.jydev.jianyu360.cn/BaseService/ossService/server/config"
)
  14. // bucketMap 用来缓存bucket配置,避免重复查询数据库
  15. var bucketMap sync.Map
  16. // accountMap 用来缓存OSS帐号信息
  17. var accountMap sync.Map
  18. // InitBuckets 从配置中加载bucket信息
  19. func InitBuckets() {
  20. for _, bucket := range config.AppConfig.Buckets {
  21. bucketMap.Store(bucket.BucketID, bucket)
  22. }
  23. }
  24. // GetBucket 根据bucketID获取bucket信息,如果没有则模拟查询数据库(这里只查询一次)
  25. func GetBucket(bucketID string) (config.BucketInfo, error) {
  26. if val, ok := bucketMap.Load(bucketID); ok {
  27. return val.(config.BucketInfo), nil
  28. }
  29. log.Println("Bucket %s not found in cache, querying database", bucketID)
  30. // 模拟数据库查询:实际业务中这里应查询数据库
  31. return config.BucketInfo{}, fmt.Errorf("bucket id %s not found", bucketID)
  32. }
  33. // bucketPool 缓存 bucket 对象,避免重复创建
  34. var bucketPool sync.Map // key: bucketID, value: *ossSDK.Bucket
  35. // GetCachedBucket 对外暴露统一入口,根据 bucketID 获取 *ossSDK.Bucket 对象
  36. func GetCachedBucket(bucketID string) (*ossSDK.Bucket, error) {
  37. // 优先从对象池取
  38. if cached, ok := bucketPool.Load(bucketID); ok {
  39. return cached.(*ossSDK.Bucket), nil
  40. }
  41. // 如果不存在则依次读取 bucketInfo 和 OSS client 信息
  42. bucketInfo, err := GetBucket(bucketID)
  43. if err != nil {
  44. return nil, err
  45. }
  46. client, err := getOSSClient(bucketInfo.AccountID)
  47. if err != nil {
  48. return nil, err
  49. }
  50. bucket, err := client.Bucket(bucketInfo.BucketName)
  51. if err != nil {
  52. return nil, err
  53. }
  54. // 存入对象池
  55. bucketPool.Store(bucketID, bucket)
  56. return bucket, nil
  57. }
  58. // getOSSClient 根据OSS帐号ID从accountMap中获取OSS Client
  59. func getOSSClient(accountID string) (*ossSDK.Client, error) {
  60. if val, ok := accountMap.Load(accountID); ok {
  61. acc := val.(config.OSSAccount)
  62. client, err := ossSDK.New(acc.Endpoint, acc.AccessKeyId, acc.AccessKeySecret)
  63. if err != nil {
  64. return nil, err
  65. }
  66. return client, nil
  67. }
  68. return nil, fmt.Errorf("oss account %s not found", accountID)
  69. }
  70. // UploadAttachment 上传附件;如果gzipEnabled为true,则进行gzip压缩(使用 io.Pipe 实现流式压缩)
  71. func UploadAttachment(bucketID, objectName string, data io.Reader, gzipEnabled bool) error {
  72. bucket, err := GetCachedBucket(bucketID)
  73. if err != nil {
  74. return err
  75. }
  76. var reader io.Reader
  77. if gzipEnabled {
  78. pr, pw := io.Pipe()
  79. go func() {
  80. gzipWriter := gzip.NewWriter(pw)
  81. _, err := io.Copy(gzipWriter, data)
  82. // 注意:关闭 gzipWriter 时会将数据刷新到 pw 中
  83. gzipWriter.Close()
  84. pw.CloseWithError(err)
  85. }()
  86. reader = pr
  87. } else {
  88. reader = data
  89. }
  90. // 上传到OSS
  91. return bucket.PutObject(objectName, reader)
  92. }
  93. // DownloadAttachment 下载附件,如果检测到数据为gzip压缩,则自动解压后返回
  94. func DownloadAttachment(bucketID, objectName string) ([]byte, error) {
  95. bucket, err := GetCachedBucket(bucketID)
  96. if err != nil {
  97. return nil, err
  98. }
  99. body, err := bucket.GetObject(objectName)
  100. if err != nil {
  101. return nil, err
  102. }
  103. defer body.Close()
  104. data, err := ioutil.ReadAll(body)
  105. if err != nil {
  106. return nil, err
  107. }
  108. // 判断是否为gzip压缩格式(判断前两个字节)
  109. if len(data) >= 2 && data[0] == 0x1f && data[1] == 0x8b {
  110. gzipReader, err := gzip.NewReader(bytes.NewReader(data))
  111. if err != nil {
  112. return nil, err
  113. }
  114. defer gzipReader.Close()
  115. return ioutil.ReadAll(gzipReader)
  116. }
  117. return data, nil
  118. }
  119. // DeleteAttachment 删除附件
  120. func DeleteAttachment(bucketID, objectName string) error {
  121. bucket, err := GetCachedBucket(bucketID)
  122. if err != nil {
  123. return err
  124. }
  125. return bucket.DeleteObject(objectName)
  126. }
  127. // LoadOSSAccounts 加载OSS帐号信息到缓存
  128. func LoadOSSAccounts() {
  129. for _, acc := range config.AppConfig.OSSAccounts {
  130. accountMap.Store(acc.ID, acc)
  131. }
  132. }
  133. // 获取标讯正文,优先从oss中取,再从es中取
  134. func GetBidDetail(bucketID, objectName string) []byte {
  135. b, e := DownloadAttachment(bucketID, objectName)
  136. if e == nil && b != nil && len(b) > 0 {
  137. return b
  138. }
  139. list := es.VarEs.Get(config.AppConfig.ElasticSearch.IndexName, config.AppConfig.ElasticSearch.IndexName, fmt.Sprintf(`{"query":{"bool":{"filter":{"term":{"_id":"%s"}}}},"_source":["detail"]}`, objectName))
  140. if list != nil && len(*list) > 0 {
  141. detail, _ := (*list)[0]["detail"].(string)
  142. return []byte(detail)
  143. }
  144. return nil
  145. }