123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266 |
- package oss
- import (
- "app.yhyue.com/moapp/jybase/api"
- "bytes"
- "compress/gzip"
- "errors"
- "fmt"
- "github.com/gogf/gf/v2/frame/g"
- "github.com/gogf/gf/v2/os/gctx"
- "io"
- "io/ioutil"
- "jygit.jydev.jianyu360.cn/BaseService/ossClient/entity"
- "log"
- "net/http"
- "sync"
- "sync/atomic"
- "app.yhyue.com/moapp/jybase/es"
- ossSDK "github.com/aliyun/aliyun-oss-go-sdk/oss"
- "jygit.jydev.jianyu360.cn/BaseService/ossService/config"
- "jygit.jydev.jianyu360.cn/BaseService/ossService/util"
- )
- var (
- // accountMap 用来缓存OSS帐号信息
- accountMap sync.Map
- UploadPool = make(chan bool, g.Config().MustGet(gctx.New(), "uploadPoolSize").Int())
- DownloadPool = make(chan bool, g.Config().MustGet(gctx.New(), "downloadPoolSize").Int())
- GetDetailFromEsPool = make(chan bool, g.Config().MustGet(gctx.New(), "getDetailFromEsPoolSize").Int())
- GetDetailFromMgoPool = make(chan bool, g.Config().MustGet(gctx.New(), "getDetailFromMgoPoolSize").Int())
- getDetail = map[string]func(w io.Writer, bucketID, objectName string) error{
- "oss": func(w io.Writer, bucketID, objectName string) error {
- return Download(w, 0, bucketID, objectName+".txt")
- },
- "es": func(w io.Writer, bucketID, objectName string) error {
- GetDetailFromEsPool <- true
- defer func() {
- <-GetDetailFromEsPool
- }()
- indexName := g.Config().MustGet(gctx.New(), "elasticSearch.indexName").String()
- list := es.VarEs.Get(indexName, indexName, fmt.Sprintf(`{"query":{"bool":{"filter":{"term":{"_id":"%s"}}}},"_source":["detail"]}`, objectName))
- if list != nil && len(*list) > 0 {
- detail, _ := (*list)[0]["detail"].(string)
- buf := bufferPool.Get().(*[]byte)
- defer bufferPool.Put(buf)
- _, err := io.CopyBuffer(w, bytes.NewReader([]byte(detail)), *buf)
- return err
- }
- return errors.New("not find from es")
- },
- "mgo": func(w io.Writer, bucketID, objectName string) error {
- GetDetailFromMgoPool <- true
- defer func() {
- <-GetDetailFromMgoPool
- }()
- data, _ := config.Mgo.FindById(g.Config().MustGet(gctx.New(), "mongodb.collection").String(), objectName, `{"detail":1}`)
- if data != nil && len(*data) > 0 {
- detail, _ := (*data)["detail"].(string)
- buf := bufferPool.Get().(*[]byte)
- defer bufferPool.Put(buf)
- _, err := io.CopyBuffer(w, bytes.NewReader([]byte(detail)), *buf)
- return err
- }
- return errors.New("not find from mgo")
- },
- }
- bufferPool = sync.Pool{
- New: func() interface{} {
- buf := make([]byte, 512*1024) // 512KB 缓冲区
- return &buf
- },
- }
- )
- // GetBucket 根据bucketID获取bucket信息,如果没有则模拟查询数据库(这里只查询一次)
- func GetBucket(bucketID string) (config.BucketInfo, error) {
- var buckets []*config.BucketInfo
- g.Config().MustGet(gctx.New(), "buckets").Structs(&buckets)
- for _, v := range buckets {
- if v.BucketID == bucketID {
- return *v, nil
- }
- }
- // 模拟数据库查询:实际业务中这里应查询数据库
- return config.BucketInfo{}, fmt.Errorf("bucket id %s not found", bucketID)
- }
- // bucketPool 缓存 bucket 对象,避免重复创建
- var bucketPool sync.Map // key: bucketID, value: *ossSDK.Bucket
- // GetCachedBucket 对外暴露统一入口,根据 bucketID 获取 *ossSDK.Bucket 对象
- func GetCachedBucket(bucketID string) (*ossSDK.Bucket, error) {
- // 优先从对象池取
- if cached, ok := bucketPool.Load(bucketID); ok {
- return cached.(*ossSDK.Bucket), nil
- }
- // 如果不存在则依次读取 bucketInfo 和 OSS client 信息
- bucketInfo, err := GetBucket(bucketID)
- if err != nil {
- log.Println(err)
- return nil, err
- }
- client, err := getOSSClient(bucketInfo.AccountID)
- if err != nil {
- log.Println(err)
- return nil, err
- }
- bucket, err := client.Bucket(bucketInfo.BucketName)
- if err != nil {
- log.Println(err)
- return nil, err
- }
- // 存入对象池
- bucketPool.Store(bucketID, bucket)
- return bucket, nil
- }
- // getOSSClient 根据OSS帐号ID从accountMap中获取OSS Client
- func getOSSClient(accountID string) (*ossSDK.Client, error) {
- if val, ok := accountMap.Load(accountID); ok {
- acc := val.(config.OSSAccount)
- client, err := ossSDK.New(acc.Endpoint, acc.AccessKeyId, acc.AccessKeySecret)
- if err != nil {
- return nil, err
- }
- return client, nil
- }
- return nil, fmt.Errorf("oss account %s not found", accountID)
- }
- // UploadAttachment 上传附件;如果gzipEnabled为true,则进行gzip压缩(使用 io.Pipe 实现流式压缩)
- func Upload(bucketID, objectName string, data io.Reader, gzipEnabled bool) error {
- atomic.AddInt64(&util.UploadCounter, 1)
- UploadPool <- true
- defer func() {
- <-UploadPool
- atomic.AddInt64(&util.UploadCounter, -1)
- }()
- bucket, err := GetCachedBucket(bucketID)
- if err != nil {
- return err
- }
- var reader io.Reader
- if gzipEnabled {
- pr, pw := io.Pipe()
- go func() {
- gzipWriter := gzip.NewWriter(pw)
- _, err := io.Copy(gzipWriter, data)
- // 注意:关闭 gzipWriter 时会将数据刷新到 pw 中
- gzipWriter.Close()
- pw.CloseWithError(err)
- }()
- reader = pr
- } else {
- reader = data
- }
- // 上传到OSS
- if err := bucket.PutObject(objectName, reader); err != nil {
- log.Println(bucketID, objectName, "上传出错", err)
- return err
- }
- return nil
- }
- // downloadAttachment 下载,如果检测到数据为gzip压缩,则自动解压后返回
- func Download(w io.Writer, autoExtract int, bucketID, objectName string) error {
- atomic.AddInt64(&util.DownloadCounter, 1)
- DownloadPool <- true
- defer func() {
- <-DownloadPool
- atomic.AddInt64(&util.DownloadCounter, -1)
- }()
- bucket, err := GetCachedBucket(bucketID)
- if err != nil {
- return err
- }
- result, err := bucket.DoGetObject(&ossSDK.GetObjectRequest{objectName}, nil)
- if err != nil {
- log.Println(bucketID, objectName, "下载出错", err)
- return err
- }
- defer result.Response.Body.Close()
- if rw, ok := w.(http.ResponseWriter); ok && result.Response.Headers.Get("Content-Type") != "text/plain" {
- rw.Header().Set("Content-Type", result.Response.Headers.Get("Content-Type")) // 根据文件类型调整此行
- rw.Header().Set("Content-Disposition", "attachment; filename="+objectName)
- }
- buf := bufferPool.Get().(*[]byte)
- defer bufferPool.Put(buf)
- if autoExtract == 1 {
- data, err := ioutil.ReadAll(result.Response.Body)
- if err != nil {
- log.Println(bucketID, objectName, "下载后ioutil.ReadAll出错", err)
- return err
- }
- // 判断是否为gzip压缩格式(判断前两个字节)
- if len(data) >= 2 && data[0] == 0x1f && data[1] == 0x8b {
- gzipReader, err := gzip.NewReader(bytes.NewReader(data))
- if err != nil {
- log.Println(bucketID, objectName, "下载后gzip.NewReader出错", err)
- return err
- }
- defer gzipReader.Close()
- if _, err = io.CopyBuffer(w, gzipReader, *buf); err != nil {
- log.Println(bucketID, objectName, "下载解压后io.Copy出错", err)
- return err
- }
- return nil
- }
- } else {
- if _, err = io.CopyBuffer(w, result.Response.Body, *buf); err != nil {
- log.Println(bucketID, objectName, "下载后io.Copy出错", err)
- return err
- }
- }
- return nil
- }
- // DeleteAttachment 删除附件
- func Delete(bucketID, objectName string) error {
- bucket, err := GetCachedBucket(bucketID)
- if err != nil {
- return err
- }
- if err := bucket.DeleteObject(objectName); err != nil {
- log.Println(bucketID, objectName, "删除出错", err)
- }
- return err
- }
- // LoadOSSAccounts 加载OSS帐号信息到缓存
- func LoadOSSAccounts() {
- var oas []*config.OSSAccount
- g.Config().MustGet(gctx.New(), "oss_accounts").Structs(&oas)
- accountMap.Clear()
- for _, acc := range oas {
- accountMap.Store(acc.ID, *acc)
- }
- bucketPool.Clear()
- }
- // 获取标讯正文,优先从oss中取,再从es中取
- func GetBidDetail(w io.Writer, bucketID, objectName string) {
- atomic.AddInt64(&util.GetDetailCounter, 1)
- defer atomic.AddInt64(&util.GetDetailCounter, -1)
- for _, v := range g.Config().MustGet(gctx.New(), "getDetailOrder").Strings() {
- if err := getDetail[v](w, bucketID, objectName); err == nil {
- break
- }
- }
- }
- // 检查入参
- func checkArgs(args *entity.Args) error {
- if args.BucketID == "" {
- return errors.New(api.Error_msg_1002 + "BucketID")
- } else if args.ObjectName == "" {
- return errors.New(api.Error_msg_1002 + "ObjectName")
- }
- return nil
- }
|