oss.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277
  1. package oss
  2. import (
  3. "app.yhyue.com/moapp/jybase/api"
  4. "bytes"
  5. "compress/gzip"
  6. "errors"
  7. "fmt"
  8. "github.com/gogf/gf/v2/frame/g"
  9. "github.com/gogf/gf/v2/os/gctx"
  10. "io"
  11. "io/ioutil"
  12. "jygit.jydev.jianyu360.cn/BaseService/ossClient/entity"
  13. "log"
  14. "net/http"
  15. "sync"
  16. "sync/atomic"
  17. "time"
  18. "app.yhyue.com/moapp/jybase/es"
  19. ossSDK "github.com/aliyun/aliyun-oss-go-sdk/oss"
  20. "jygit.jydev.jianyu360.cn/BaseService/ossService/config"
  21. "jygit.jydev.jianyu360.cn/BaseService/ossService/util"
  22. )
  23. var (
  24. // accountMap 用来缓存OSS帐号信息
  25. accountMap sync.Map
  26. UploadPool = make(chan bool, g.Config().MustGet(gctx.New(), "uploadPoolSize").Int())
  27. DownloadPool = make(chan bool, g.Config().MustGet(gctx.New(), "downloadPoolSize").Int())
  28. GetDetailFromEsPool = make(chan bool, g.Config().MustGet(gctx.New(), "getDetailFromEsPoolSize").Int())
  29. GetDetailFromMgoPool = make(chan bool, g.Config().MustGet(gctx.New(), "getDetailFromMgoPoolSize").Int())
  30. getDetail = map[string]func(w io.Writer, bucketID, objectName string) error{
  31. "oss": func(w io.Writer, bucketID, objectName string) error {
  32. return Download(w, 0, bucketID, objectName+".txt")
  33. },
  34. "es": func(w io.Writer, bucketID, objectName string) error {
  35. GetDetailFromEsPool <- true
  36. defer func() {
  37. <-GetDetailFromEsPool
  38. }()
  39. indexName := g.Config().MustGet(gctx.New(), "elasticSearch.indexName").String()
  40. list := es.VarEs.Get(indexName, indexName, fmt.Sprintf(`{"query":{"bool":{"filter":{"term":{"_id":"%s"}}}},"_source":["detail"]}`, objectName))
  41. if list != nil && len(*list) > 0 {
  42. detail, _ := (*list)[0]["detail"].(string)
  43. buf := bufferPool.Get().(*[]byte)
  44. defer bufferPool.Put(buf)
  45. _, err := io.CopyBuffer(w, bytes.NewReader([]byte(detail)), *buf)
  46. return err
  47. }
  48. return errors.New("not find from es")
  49. },
  50. "mgo": func(w io.Writer, bucketID, objectName string) error {
  51. GetDetailFromMgoPool <- true
  52. defer func() {
  53. <-GetDetailFromMgoPool
  54. }()
  55. data, _ := config.Mgo.FindById(g.Config().MustGet(gctx.New(), "mongodb.collection").String(), objectName, `{"detail":1}`)
  56. if data != nil && len(*data) > 0 {
  57. detail, _ := (*data)["detail"].(string)
  58. buf := bufferPool.Get().(*[]byte)
  59. defer bufferPool.Put(buf)
  60. _, err := io.CopyBuffer(w, bytes.NewReader([]byte(detail)), *buf)
  61. return err
  62. }
  63. return errors.New("not find from mgo")
  64. },
  65. }
  66. bufferPool = sync.Pool{
  67. New: func() interface{} {
  68. buf := make([]byte, 512*1024) // 512KB 缓冲区
  69. return &buf
  70. },
  71. }
  72. )
  73. // GetBucket 根据bucketID获取bucket信息,如果没有则模拟查询数据库(这里只查询一次)
  74. func GetBucket(bucketID string) (config.BucketInfo, error) {
  75. var buckets []*config.BucketInfo
  76. g.Config().MustGet(gctx.New(), "buckets").Structs(&buckets)
  77. for _, v := range buckets {
  78. if v.BucketID == bucketID {
  79. return *v, nil
  80. }
  81. }
  82. // 模拟数据库查询:实际业务中这里应查询数据库
  83. return config.BucketInfo{}, fmt.Errorf("bucket id %s not found", bucketID)
  84. }
// bucketPool caches bucket objects so they are not created again on every call.
var bucketPool sync.Map // key: bucketID, value: *ossSDK.Bucket
  87. // GetCachedBucket 对外暴露统一入口,根据 bucketID 获取 *ossSDK.Bucket 对象
  88. func GetCachedBucket(bucketID string) (*ossSDK.Bucket, error) {
  89. // 优先从对象池取
  90. if cached, ok := bucketPool.Load(bucketID); ok {
  91. return cached.(*ossSDK.Bucket), nil
  92. }
  93. // 如果不存在则依次读取 bucketInfo 和 OSS client 信息
  94. bucketInfo, err := GetBucket(bucketID)
  95. if err != nil {
  96. log.Println(err)
  97. return nil, err
  98. }
  99. client, err := getOSSClient(bucketInfo.AccountID)
  100. if err != nil {
  101. log.Println(err)
  102. return nil, err
  103. }
  104. bucket, err := client.Bucket(bucketInfo.BucketName)
  105. if err != nil {
  106. log.Println(err)
  107. return nil, err
  108. }
  109. // 存入对象池
  110. bucketPool.Store(bucketID, bucket)
  111. return bucket, nil
  112. }
  113. // getOSSClient 根据OSS帐号ID从accountMap中获取OSS Client
  114. func getOSSClient(accountID string) (*ossSDK.Client, error) {
  115. if val, ok := accountMap.Load(accountID); ok {
  116. acc := val.(config.OSSAccount)
  117. client, err := ossSDK.New(acc.Endpoint, acc.AccessKeyId, acc.AccessKeySecret)
  118. if err != nil {
  119. return nil, err
  120. }
  121. return client, nil
  122. }
  123. return nil, fmt.Errorf("oss account %s not found", accountID)
  124. }
  125. // UploadAttachment 上传附件;如果gzipEnabled为true,则进行gzip压缩(使用 io.Pipe 实现流式压缩)
  126. func Upload(bucketID, objectName string, data io.Reader, gzipEnabled bool) error {
  127. atomic.AddInt64(&util.UploadCounter, 1)
  128. UploadPool <- true
  129. defer func() {
  130. <-UploadPool
  131. atomic.AddInt64(&util.UploadCounter, -1)
  132. }()
  133. bucket, err := GetCachedBucket(bucketID)
  134. if err != nil {
  135. return err
  136. }
  137. var reader io.Reader
  138. if gzipEnabled {
  139. pr, pw := io.Pipe()
  140. go func() {
  141. gzipWriter := gzip.NewWriter(pw)
  142. _, err := io.Copy(gzipWriter, data)
  143. // 注意:关闭 gzipWriter 时会将数据刷新到 pw 中
  144. gzipWriter.Close()
  145. pw.CloseWithError(err)
  146. }()
  147. reader = pr
  148. } else {
  149. reader = data
  150. }
  151. // 上传到OSS
  152. retry := 0
  153. for {
  154. if err := bucket.PutObject(objectName, reader); err != nil {
  155. retrySleep := g.Config().MustGet(gctx.New(), "retrySleep", 3).Int()
  156. log.Println(bucketID, objectName, "第", retry+1, "次上传出错,", retrySleep, "秒后重试", err)
  157. if retry >= g.Config().MustGet(gctx.New(), "retry", 3).Int() {
  158. return err
  159. }
  160. retry++
  161. time.Sleep(time.Duration(retrySleep) * time.Second)
  162. } else {
  163. break
  164. }
  165. }
  166. return nil
  167. }
  168. // downloadAttachment 下载,如果检测到数据为gzip压缩,则自动解压后返回
  169. func Download(w io.Writer, autoExtract int, bucketID, objectName string) error {
  170. atomic.AddInt64(&util.DownloadCounter, 1)
  171. DownloadPool <- true
  172. defer func() {
  173. <-DownloadPool
  174. atomic.AddInt64(&util.DownloadCounter, -1)
  175. }()
  176. bucket, err := GetCachedBucket(bucketID)
  177. if err != nil {
  178. return err
  179. }
  180. result, err := bucket.DoGetObject(&ossSDK.GetObjectRequest{objectName}, nil)
  181. if err != nil {
  182. log.Println(bucketID, objectName, "下载出错", err)
  183. return err
  184. }
  185. defer result.Response.Body.Close()
  186. if rw, ok := w.(http.ResponseWriter); ok && result.Response.Headers.Get("Content-Type") != "text/plain" {
  187. rw.Header().Set("Content-Type", result.Response.Headers.Get("Content-Type")) // 根据文件类型调整此行
  188. rw.Header().Set("Content-Disposition", "attachment; filename="+objectName)
  189. }
  190. buf := bufferPool.Get().(*[]byte)
  191. defer bufferPool.Put(buf)
  192. if autoExtract == 1 {
  193. data, err := ioutil.ReadAll(result.Response.Body)
  194. if err != nil {
  195. log.Println(bucketID, objectName, "下载后ioutil.ReadAll出错", err)
  196. return err
  197. }
  198. // 判断是否为gzip压缩格式(判断前两个字节)
  199. if len(data) >= 2 && data[0] == 0x1f && data[1] == 0x8b {
  200. gzipReader, err := gzip.NewReader(bytes.NewReader(data))
  201. if err != nil {
  202. log.Println(bucketID, objectName, "下载后gzip.NewReader出错", err)
  203. return err
  204. }
  205. defer gzipReader.Close()
  206. if _, err = io.CopyBuffer(w, gzipReader, *buf); err != nil {
  207. log.Println(bucketID, objectName, "下载解压后io.Copy出错", err)
  208. return err
  209. }
  210. return nil
  211. }
  212. } else {
  213. if _, err = io.CopyBuffer(w, result.Response.Body, *buf); err != nil {
  214. log.Println(bucketID, objectName, "下载后io.Copy出错", err)
  215. return err
  216. }
  217. }
  218. return nil
  219. }
  220. // DeleteAttachment 删除附件
  221. func Delete(bucketID, objectName string) error {
  222. bucket, err := GetCachedBucket(bucketID)
  223. if err != nil {
  224. return err
  225. }
  226. if err := bucket.DeleteObject(objectName); err != nil {
  227. log.Println(bucketID, objectName, "删除出错", err)
  228. }
  229. return err
  230. }
  231. // LoadOSSAccounts 加载OSS帐号信息到缓存
  232. func LoadOSSAccounts() {
  233. var oas []*config.OSSAccount
  234. g.Config().MustGet(gctx.New(), "oss_accounts").Structs(&oas)
  235. accountMap.Clear()
  236. for _, acc := range oas {
  237. accountMap.Store(acc.ID, *acc)
  238. }
  239. bucketPool.Clear()
  240. }
  241. // 获取标讯正文,优先从oss中取,再从es中取
  242. func GetBidDetail(w io.Writer, bucketID, objectName string) {
  243. atomic.AddInt64(&util.GetDetailCounter, 1)
  244. defer atomic.AddInt64(&util.GetDetailCounter, -1)
  245. for _, v := range g.Config().MustGet(gctx.New(), "getDetailOrder").Strings() {
  246. if err := getDetail[v](w, bucketID, objectName); err == nil {
  247. break
  248. }
  249. }
  250. }
  251. // 检查入参
  252. func checkArgs(args *entity.Args) error {
  253. if args.BucketID == "" {
  254. return errors.New(api.Error_msg_1002 + "BucketID")
  255. } else if args.ObjectName == "" {
  256. return errors.New(api.Error_msg_1002 + "ObjectName")
  257. }
  258. return nil
  259. }