oss.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. package oss
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "fmt"
  6. "io"
  7. "io/ioutil"
  8. "log"
  9. "sync"
  10. ossSDK "github.com/aliyun/aliyun-oss-go-sdk/oss"
  11. "jygit.jydev.jianyu360.cn/BaseService/ossService/config"
  12. )
  13. // bucketMap 用来缓存bucket配置,避免重复查询数据库
  14. var bucketMap sync.Map
  15. // accountMap 用来缓存OSS帐号信息
  16. var accountMap sync.Map
  17. // InitBuckets 从配置中加载bucket信息
  18. func InitBuckets() {
  19. for _, bucket := range config.AppConfig.Buckets {
  20. bucketMap.Store(bucket.BucketID, bucket)
  21. }
  22. }
  23. // GetBucket 根据bucketID获取bucket信息,如果没有则模拟查询数据库(这里只查询一次)
  24. func GetBucket(bucketID string) (config.BucketInfo, error) {
  25. if val, ok := bucketMap.Load(bucketID); ok {
  26. return val.(config.BucketInfo), nil
  27. }
  28. log.Println("Bucket %s not found in cache, querying database", bucketID)
  29. // 模拟数据库查询:实际业务中这里应查询数据库
  30. return config.BucketInfo{}, fmt.Errorf("bucket id %s not found", bucketID)
  31. }
  32. // bucketPool 缓存 bucket 对象,避免重复创建
  33. var bucketPool sync.Map // key: bucketID, value: *ossSDK.Bucket
  34. // GetCachedBucket 对外暴露统一入口,根据 bucketID 获取 *ossSDK.Bucket 对象
  35. func GetCachedBucket(bucketID string) (*ossSDK.Bucket, error) {
  36. // 优先从对象池取
  37. if cached, ok := bucketPool.Load(bucketID); ok {
  38. return cached.(*ossSDK.Bucket), nil
  39. }
  40. // 如果不存在则依次读取 bucketInfo 和 OSS client 信息
  41. bucketInfo, err := GetBucket(bucketID)
  42. if err != nil {
  43. return nil, err
  44. }
  45. client, err := getOSSClient(bucketInfo.AccountID)
  46. if err != nil {
  47. return nil, err
  48. }
  49. bucket, err := client.Bucket(bucketInfo.BucketName)
  50. if err != nil {
  51. return nil, err
  52. }
  53. // 存入对象池
  54. bucketPool.Store(bucketID, bucket)
  55. return bucket, nil
  56. }
  57. // getOSSClient 根据OSS帐号ID从accountMap中获取OSS Client
  58. func getOSSClient(accountID string) (*ossSDK.Client, error) {
  59. if val, ok := accountMap.Load(accountID); ok {
  60. acc := val.(config.OSSAccount)
  61. client, err := ossSDK.New(acc.Endpoint, acc.AccessKeyId, acc.AccessKeySecret)
  62. if err != nil {
  63. return nil, err
  64. }
  65. return client, nil
  66. }
  67. return nil, fmt.Errorf("oss account %s not found", accountID)
  68. }
  69. // UploadAttachment 上传附件;如果gzipEnabled为true,则进行gzip压缩(使用 io.Pipe 实现流式压缩)
  70. func UploadAttachment(bucketID, objectName string, data io.Reader, gzipEnabled bool) error {
  71. bucket, err := GetCachedBucket(bucketID)
  72. if err != nil {
  73. return err
  74. }
  75. var reader io.Reader
  76. if gzipEnabled {
  77. pr, pw := io.Pipe()
  78. go func() {
  79. gzipWriter := gzip.NewWriter(pw)
  80. _, err := io.Copy(gzipWriter, data)
  81. // 注意:关闭 gzipWriter 时会将数据刷新到 pw 中
  82. gzipWriter.Close()
  83. pw.CloseWithError(err)
  84. }()
  85. reader = pr
  86. } else {
  87. reader = data
  88. }
  89. // 上传到OSS
  90. return bucket.PutObject(objectName, reader)
  91. }
  92. // DownloadAttachment 下载附件,如果检测到数据为gzip压缩,则自动解压后返回
  93. func DownloadAttachment(bucketID, objectName string) ([]byte, error) {
  94. bucket, err := GetCachedBucket(bucketID)
  95. if err != nil {
  96. return nil, err
  97. }
  98. body, err := bucket.GetObject(objectName)
  99. if err != nil {
  100. return nil, err
  101. }
  102. defer body.Close()
  103. data, err := ioutil.ReadAll(body)
  104. if err != nil {
  105. return nil, err
  106. }
  107. // 判断是否为gzip压缩格式(判断前两个字节)
  108. if len(data) >= 2 && data[0] == 0x1f && data[1] == 0x8b {
  109. gzipReader, err := gzip.NewReader(bytes.NewReader(data))
  110. if err != nil {
  111. return nil, err
  112. }
  113. defer gzipReader.Close()
  114. return ioutil.ReadAll(gzipReader)
  115. }
  116. return data, nil
  117. }
  118. // DeleteAttachment 删除附件
  119. func DeleteAttachment(bucketID, objectName string) error {
  120. bucket, err := GetCachedBucket(bucketID)
  121. if err != nil {
  122. return err
  123. }
  124. return bucket.DeleteObject(objectName)
  125. }
  126. // LoadOSSAccounts 加载OSS帐号信息到缓存
  127. func LoadOSSAccounts() {
  128. for _, acc := range config.AppConfig.OSSAccounts {
  129. accountMap.Store(acc.ID, acc)
  130. }
  131. }