package oss import ( "app.yhyue.com/moapp/jybase/api" "bytes" "compress/gzip" "errors" "fmt" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/os/gctx" "io" "io/ioutil" "jygit.jydev.jianyu360.cn/BaseService/ossClient/entity" "log" "net/http" "sync" "sync/atomic" "time" "app.yhyue.com/moapp/jybase/es" ossSDK "github.com/aliyun/aliyun-oss-go-sdk/oss" "jygit.jydev.jianyu360.cn/BaseService/ossService/config" "jygit.jydev.jianyu360.cn/BaseService/ossService/util" ) var ( // accountMap 用来缓存OSS帐号信息 accountMap sync.Map UploadPool = make(chan bool, g.Config().MustGet(gctx.New(), "uploadPoolSize").Int()) DownloadPool = make(chan bool, g.Config().MustGet(gctx.New(), "downloadPoolSize").Int()) GetDetailFromEsPool = make(chan bool, g.Config().MustGet(gctx.New(), "getDetailFromEsPoolSize").Int()) GetDetailFromMgoPool = make(chan bool, g.Config().MustGet(gctx.New(), "getDetailFromMgoPoolSize").Int()) getDetail = map[string]func(w io.Writer, bucketID, objectName string) error{ "oss": func(w io.Writer, bucketID, objectName string) error { return Download(w, 0, bucketID, objectName+".txt") }, "es": func(w io.Writer, bucketID, objectName string) error { GetDetailFromEsPool <- true defer func() { <-GetDetailFromEsPool }() indexName := g.Config().MustGet(gctx.New(), "elasticSearch.indexName").String() list := es.VarEs.Get(indexName, indexName, fmt.Sprintf(`{"query":{"bool":{"filter":{"term":{"_id":"%s"}}}},"_source":["detail"]}`, objectName)) if list != nil && len(*list) > 0 { detail, _ := (*list)[0]["detail"].(string) buf := bufferPool.Get().(*[]byte) defer bufferPool.Put(buf) _, err := io.CopyBuffer(w, bytes.NewReader([]byte(detail)), *buf) return err } return errors.New("not find from es") }, "mgo": func(w io.Writer, bucketID, objectName string) error { GetDetailFromMgoPool <- true defer func() { <-GetDetailFromMgoPool }() data, _ := config.Mgo.FindById(g.Config().MustGet(gctx.New(), "mongodb.collection").String(), objectName, `{"detail":1}`) if data != nil && len(*data) > 0 { detail, _ := (*data)["detail"].(string) buf := bufferPool.Get().(*[]byte) defer bufferPool.Put(buf) _, err := io.CopyBuffer(w, bytes.NewReader([]byte(detail)), *buf) return err } return errors.New("not find from mgo") }, } bufferPool = sync.Pool{ New: func() interface{} { buf := make([]byte, 512*1024) // 512KB 缓冲区 return &buf }, } ) // GetBucket 根据bucketID获取bucket信息,如果没有则模拟查询数据库(这里只查询一次) func GetBucket(bucketID string) (config.BucketInfo, error) { var buckets []*config.BucketInfo g.Config().MustGet(gctx.New(), "buckets").Structs(&buckets) for _, v := range buckets { if v.BucketID == bucketID { return *v, nil } } // 模拟数据库查询:实际业务中这里应查询数据库 return config.BucketInfo{}, fmt.Errorf("bucket id %s not found", bucketID) } // bucketPool 缓存 bucket 对象,避免重复创建 var bucketPool sync.Map // key: bucketID, value: *ossSDK.Bucket // GetCachedBucket 对外暴露统一入口,根据 bucketID 获取 *ossSDK.Bucket 对象 func GetCachedBucket(bucketID string) (*ossSDK.Bucket, error) { // 优先从对象池取 if cached, ok := bucketPool.Load(bucketID); ok { return cached.(*ossSDK.Bucket), nil } // 如果不存在则依次读取 bucketInfo 和 OSS client 信息 bucketInfo, err := GetBucket(bucketID) if err != nil { log.Println(err) return nil, err } client, err := getOSSClient(bucketInfo.AccountID) if err != nil { log.Println(err) return nil, err } bucket, err := client.Bucket(bucketInfo.BucketName) if err != nil { log.Println(err) return nil, err } // 存入对象池 bucketPool.Store(bucketID, bucket) return bucket, nil } // getOSSClient 根据OSS帐号ID从accountMap中获取OSS Client func getOSSClient(accountID string) (*ossSDK.Client, error) { if val, ok := accountMap.Load(accountID); ok { acc := val.(config.OSSAccount) client, err := ossSDK.New(acc.Endpoint, acc.AccessKeyId, acc.AccessKeySecret) if err != nil { return nil, err } return client, nil } return nil, fmt.Errorf("oss account %s not found", accountID) } // UploadAttachment 上传附件;如果gzipEnabled为true,则进行gzip压缩(使用 io.Pipe 实现流式压缩) func Upload(bucketID, objectName string, data io.Reader, gzipEnabled bool) error { atomic.AddInt64(&util.UploadCounter, 1) UploadPool <- true defer func() { <-UploadPool atomic.AddInt64(&util.UploadCounter, -1) }() bucket, err := GetCachedBucket(bucketID) if err != nil { return err } var reader io.Reader if gzipEnabled { pr, pw := io.Pipe() go func() { gzipWriter := gzip.NewWriter(pw) _, err := io.Copy(gzipWriter, data) // 注意:关闭 gzipWriter 时会将数据刷新到 pw 中 gzipWriter.Close() pw.CloseWithError(err) }() reader = pr } else { reader = data } // 上传到OSS retry := 0 for { if err := bucket.PutObject(objectName, reader); err != nil { retrySleep := g.Config().MustGet(gctx.New(), "retrySleep", 3).Int() log.Println(bucketID, objectName, "第", retry+1, "次上传出错,", retrySleep, "秒后重试", err) if retry >= g.Config().MustGet(gctx.New(), "retry", 3).Int() { return err } retry++ time.Sleep(time.Duration(retrySleep) * time.Second) } else { break } } return nil } // downloadAttachment 下载,如果检测到数据为gzip压缩,则自动解压后返回 func Download(w io.Writer, autoExtract int, bucketID, objectName string) error { atomic.AddInt64(&util.DownloadCounter, 1) DownloadPool <- true defer func() { <-DownloadPool atomic.AddInt64(&util.DownloadCounter, -1) }() bucket, err := GetCachedBucket(bucketID) if err != nil { return err } result, err := bucket.DoGetObject(&ossSDK.GetObjectRequest{objectName}, nil) if err != nil { log.Println(bucketID, objectName, "下载出错", err) return err } defer result.Response.Body.Close() if rw, ok := w.(http.ResponseWriter); ok && result.Response.Headers.Get("Content-Type") != "text/plain" { rw.Header().Set("Content-Type", result.Response.Headers.Get("Content-Type")) // 根据文件类型调整此行 rw.Header().Set("Content-Disposition", "attachment; filename="+objectName) } buf := bufferPool.Get().(*[]byte) defer bufferPool.Put(buf) if autoExtract == 1 { data, err := ioutil.ReadAll(result.Response.Body) if err != nil { log.Println(bucketID, objectName, "下载后ioutil.ReadAll出错", err) return err } // 判断是否为gzip压缩格式(判断前两个字节) if len(data) >= 2 && data[0] == 0x1f && data[1] == 0x8b { gzipReader, err := gzip.NewReader(bytes.NewReader(data)) if err != nil { log.Println(bucketID, objectName, "下载后gzip.NewReader出错", err) return err } defer gzipReader.Close() if _, err = io.CopyBuffer(w, gzipReader, *buf); err != nil { log.Println(bucketID, objectName, "下载解压后io.Copy出错", err) return err } return nil } } else { if _, err = io.CopyBuffer(w, result.Response.Body, *buf); err != nil { log.Println(bucketID, objectName, "下载后io.Copy出错", err) return err } } return nil } // DeleteAttachment 删除附件 func Delete(bucketID, objectName string) error { bucket, err := GetCachedBucket(bucketID) if err != nil { return err } if err := bucket.DeleteObject(objectName); err != nil { log.Println(bucketID, objectName, "删除出错", err) } return err } // LoadOSSAccounts 加载OSS帐号信息到缓存 func LoadOSSAccounts() { var oas []*config.OSSAccount g.Config().MustGet(gctx.New(), "oss_accounts").Structs(&oas) accountMap.Clear() for _, acc := range oas { accountMap.Store(acc.ID, *acc) } bucketPool.Clear() } // 获取标讯正文,优先从oss中取,再从es中取 func GetBidDetail(w io.Writer, bucketID, objectName string) { atomic.AddInt64(&util.GetDetailCounter, 1) defer atomic.AddInt64(&util.GetDetailCounter, -1) for _, v := range g.Config().MustGet(gctx.New(), "getDetailOrder").Strings() { if err := getDetail[v](w, bucketID, objectName); err == nil { break } } } // 检查入参 func checkArgs(args *entity.Args) error { if args.BucketID == "" { return errors.New(api.Error_msg_1002 + "BucketID") } else if args.ObjectName == "" { return errors.New(api.Error_msg_1002 + "ObjectName") } return nil }