oss.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. package oss
  2. import (
  3. "app.yhyue.com/moapp/jybase/api"
  4. "bytes"
  5. "compress/gzip"
  6. "errors"
  7. "fmt"
  8. "github.com/gogf/gf/v2/frame/g"
  9. "github.com/gogf/gf/v2/os/gctx"
  10. "io"
  11. "io/ioutil"
  12. "jygit.jydev.jianyu360.cn/BaseService/ossClient/entity"
  13. "log"
  14. "net/http"
  15. "sync"
  16. "sync/atomic"
  17. "app.yhyue.com/moapp/jybase/es"
  18. ossSDK "github.com/aliyun/aliyun-oss-go-sdk/oss"
  19. "jygit.jydev.jianyu360.cn/BaseService/ossService/config"
  20. "jygit.jydev.jianyu360.cn/BaseService/ossService/util"
  21. )
  22. var (
  23. // accountMap 用来缓存OSS帐号信息
  24. accountMap sync.Map
  25. DownLoadPool = make(chan bool, g.Config().MustGet(gctx.New(), "downLoadPoolSize").Int())
  26. GetDetailFromEsPool = make(chan bool, g.Config().MustGet(gctx.New(), "getDetailFromEsPoolSize").Int())
  27. GetDetailFromMgoPool = make(chan bool, g.Config().MustGet(gctx.New(), "getDetailFromMgoPoolSize").Int())
  28. getDetail = map[string]func(w io.Writer, bucketID, objectName string){
  29. "oss": func(w io.Writer, bucketID, objectName string) {
  30. Download(w, 0, bucketID, objectName)
  31. },
  32. "es": func(w io.Writer, bucketID, objectName string) {
  33. GetDetailFromEsPool <- true
  34. defer func() {
  35. <-GetDetailFromEsPool
  36. }()
  37. indexName := g.Config().MustGet(gctx.New(), "elasticSearch.indexName").String()
  38. list := es.VarEs.Get(indexName, indexName, fmt.Sprintf(`{"query":{"bool":{"filter":{"term":{"_id":"%s"}}}},"_source":["detail"]}`, objectName))
  39. if list != nil && len(*list) > 0 {
  40. detail, _ := (*list)[0]["detail"].(string)
  41. io.Copy(w, bytes.NewReader([]byte(detail)))
  42. }
  43. },
  44. "mgo": func(w io.Writer, bucketID, objectName string) {
  45. GetDetailFromMgoPool <- true
  46. defer func() {
  47. <-GetDetailFromMgoPool
  48. }()
  49. data, _ := config.Mgo.FindOneByField(g.Config().MustGet(gctx.New(), "mongodb.collection").String(), objectName, `{"detail":1}`)
  50. if data != nil && len(*data) > 0 {
  51. detail, _ := (*data)["detail"].(string)
  52. io.Copy(w, bytes.NewReader([]byte(detail)))
  53. }
  54. },
  55. }
  56. )
  57. // GetBucket 根据bucketID获取bucket信息,如果没有则模拟查询数据库(这里只查询一次)
  58. func GetBucket(bucketID string) (config.BucketInfo, error) {
  59. var buckets []*config.BucketInfo
  60. g.Config().MustGet(gctx.New(), "buckets").Structs(&buckets)
  61. for _, v := range buckets {
  62. if v.BucketID == bucketID {
  63. return *v, nil
  64. }
  65. }
  66. log.Println("Bucket %s not found in cache, querying database", bucketID)
  67. // 模拟数据库查询:实际业务中这里应查询数据库
  68. return config.BucketInfo{}, fmt.Errorf("bucket id %s not found", bucketID)
  69. }
  70. // bucketPool 缓存 bucket 对象,避免重复创建
  71. //var bucketPool sync.Map // key: bucketID, value: *ossSDK.Bucket
  72. // GetCachedBucket 对外暴露统一入口,根据 bucketID 获取 *ossSDK.Bucket 对象
  73. func GetCachedBucket(bucketID string) (*ossSDK.Bucket, error) {
  74. // 优先从对象池取
  75. //if cached, ok := bucketPool.Load(bucketID); ok {
  76. // return cached.(*ossSDK.Bucket), nil
  77. //}
  78. // 如果不存在则依次读取 bucketInfo 和 OSS client 信息
  79. bucketInfo, err := GetBucket(bucketID)
  80. if err != nil {
  81. return nil, err
  82. }
  83. client, err := getOSSClient(bucketInfo.AccountID)
  84. if err != nil {
  85. return nil, err
  86. }
  87. bucket, err := client.Bucket(bucketInfo.BucketName)
  88. if err != nil {
  89. return nil, err
  90. }
  91. // 存入对象池
  92. //bucketPool.Store(bucketID, bucket)
  93. return bucket, nil
  94. }
  95. // getOSSClient 根据OSS帐号ID从accountMap中获取OSS Client
  96. func getOSSClient(accountID string) (*ossSDK.Client, error) {
  97. if val, ok := accountMap.Load(accountID); ok {
  98. acc := val.(config.OSSAccount)
  99. client, err := ossSDK.New(acc.Endpoint, acc.AccessKeyId, acc.AccessKeySecret)
  100. if err != nil {
  101. return nil, err
  102. }
  103. return client, nil
  104. }
  105. return nil, fmt.Errorf("oss account %s not found", accountID)
  106. }
  107. // UploadAttachment 上传附件;如果gzipEnabled为true,则进行gzip压缩(使用 io.Pipe 实现流式压缩)
  108. func Upload(bucketID, objectName string, data io.Reader, gzipEnabled bool) error {
  109. bucket, err := GetCachedBucket(bucketID)
  110. if err != nil {
  111. return err
  112. }
  113. var reader io.Reader
  114. if gzipEnabled {
  115. pr, pw := io.Pipe()
  116. go func() {
  117. gzipWriter := gzip.NewWriter(pw)
  118. _, err := io.Copy(gzipWriter, data)
  119. // 注意:关闭 gzipWriter 时会将数据刷新到 pw 中
  120. gzipWriter.Close()
  121. pw.CloseWithError(err)
  122. }()
  123. reader = pr
  124. } else {
  125. reader = data
  126. }
  127. // 上传到OSS
  128. return bucket.PutObject(objectName, reader)
  129. }
  130. // downloadAttachment 下载,如果检测到数据为gzip压缩,则自动解压后返回
  131. func Download(w io.Writer, autoExtract int, bucketID, objectName string) error {
  132. atomic.AddInt64(&util.DownLoadCounter, 1)
  133. DownLoadPool <- true
  134. defer func() {
  135. <-DownLoadPool
  136. atomic.AddInt64(&util.DownLoadCounter, -1)
  137. }()
  138. bucket, err := GetCachedBucket(bucketID)
  139. if err != nil {
  140. return err
  141. }
  142. result, err := bucket.DoGetObject(&ossSDK.GetObjectRequest{objectName}, nil)
  143. if err != nil {
  144. return err
  145. }
  146. defer result.Response.Body.Close()
  147. if rw, ok := w.(http.ResponseWriter); ok && result.Response.Headers.Get("Content-Type") != "text/plain" {
  148. rw.Header().Set("Content-Type", result.Response.Headers.Get("Content-Type")) // 根据文件类型调整此行
  149. rw.Header().Set("Content-Disposition", "attachment; filename="+objectName)
  150. }
  151. if autoExtract == 1 {
  152. data, err := ioutil.ReadAll(result.Response.Body)
  153. if err != nil {
  154. return err
  155. }
  156. // 判断是否为gzip压缩格式(判断前两个字节)
  157. if len(data) >= 2 && data[0] == 0x1f && data[1] == 0x8b {
  158. gzipReader, err := gzip.NewReader(bytes.NewReader(data))
  159. if err != nil {
  160. return err
  161. }
  162. defer gzipReader.Close()
  163. if _, err = io.Copy(w, gzipReader); err != nil {
  164. return err
  165. }
  166. return nil
  167. }
  168. } else {
  169. if _, err = io.Copy(w, result.Response.Body); err != nil {
  170. return err
  171. }
  172. }
  173. return nil
  174. }
  175. // DeleteAttachment 删除附件
  176. func Delete(bucketID, objectName string) error {
  177. bucket, err := GetCachedBucket(bucketID)
  178. if err != nil {
  179. return err
  180. }
  181. return bucket.DeleteObject(objectName)
  182. }
  183. // LoadOSSAccounts 加载OSS帐号信息到缓存
  184. func LoadOSSAccounts() {
  185. var oas []*config.OSSAccount
  186. g.Config().MustGet(gctx.New(), "oss_accounts").Structs(&oas)
  187. accountMap.Clear()
  188. for _, acc := range oas {
  189. accountMap.Store(acc.ID, *acc)
  190. }
  191. //bucketPool.Clear()
  192. }
  193. // 获取标讯正文,优先从oss中取,再从es中取
  194. func GetBidDetail(w io.Writer, bucketID, objectName string) {
  195. atomic.AddInt64(&util.GetDetailCounter, 1)
  196. defer atomic.AddInt64(&util.GetDetailCounter, -1)
  197. for _, v := range g.Config().MustGet(gctx.New(), "getDetailOrder").Strings() {
  198. getDetail[v](w, bucketID, objectName)
  199. }
  200. }
  201. // 检查入参
  202. func checkArgs(args *entity.Args) error {
  203. if args.BucketID == "" {
  204. return errors.New(api.Error_msg_1002 + "BucketID")
  205. } else if args.ObjectName == "" {
  206. return errors.New(api.Error_msg_1002 + "ObjectName")
  207. }
  208. return nil
  209. }