wangchuanjin 1 mês atrás
pai
commit
8c49f7a5ff
7 arquivos alterados com 101 adições e 45 exclusões
  1. 6 3
      config.yaml
  2. 8 6
      main.go
  3. 1 1
      oss/grpc.go
  4. 1 1
      oss/http.go
  5. 52 23
      oss/oss.go
  6. 3 3
      oss/rpc.go
  7. 30 8
      util/warn.go

+ 6 - 3
config.yaml

@@ -56,13 +56,16 @@ logger:
   maxBackups: 3
   maxAge: 3
   compress: true
-downLoadPoolSize: 200
+uploadPoolSize: 200
+downloadPoolSize: 200
 getDetailFromEsPoolSize: 100
 getDetailFromMgoPoolSize: 100
 onlineNodesWarn: "在线节点数少于%d,当前在线节点数:%d"
-downLoadQueueWarn: "下载并发数%d,排队数量超过%d,请检查!"
+uploadQueueWarn: "下载并发数%d,排队数量超过%d,请检查!"
+downloadQueueWarn: "下载并发数%d,排队数量超过%d,请检查!"
 getDetailQueueWarn: "获取正文oss并发数%d,es并发数%d,mgo并发数%d,排队数量超过%d,请检查!"
-downLoadLineUpWarnSize: 10
+uploadLineUpWarnSize: 10
+downloadLineUpWarnSize: 10
 getDetailLineUpWarnSize: 10
 getDetailOrder:  #mgo es oss
   - es

+ 8 - 6
main.go

@@ -25,7 +25,7 @@ func main() {
 	// 初始化OSS帐号与bucket信息
 	ossService.LoadOSSAccounts()
 	// 注册一个回调函数,当配置发生变更时会被调用
-	downLoadPoolSize := g.Config().MustGet(gctx.New(), "downLoadPoolSize").Int()
+	downloadPoolSize := g.Config().MustGet(gctx.New(), "downloadPoolSize").Int()
 	getDetailFromEsPoolSize := g.Config().MustGet(gctx.New(), "getDetailFromEsPoolSize").Int()
 	getDetailFromMgoPoolSize := g.Config().MustGet(gctx.New(), "getDetailFromMgoPoolSize").Int()
 	gfsnotify.Add("./config.yaml", func(event *gfsnotify.Event) {
@@ -33,9 +33,9 @@ func main() {
 		if event.IsWrite() || event.IsChmod() || event.IsRename() {
 			log.Println("配置文件有变化,更新内存。。。")
 			ossService.LoadOSSAccounts()
-			if poolSize := g.Config().MustGet(gctx.New(), "downLoadPoolSize").Int(); downLoadPoolSize != poolSize {
-				downLoadPoolSize = poolSize
-				ossService.DownLoadPool = make(chan bool, downLoadPoolSize)
+			if poolSize := g.Config().MustGet(gctx.New(), "downloadPoolSize").Int(); downloadPoolSize != poolSize {
+				downloadPoolSize = poolSize
+				ossService.DownloadPool = make(chan bool, downloadPoolSize)
 			}
 			if poolSize := g.Config().MustGet(gctx.New(), "getDetailFromEsPoolSize").Int(); getDetailFromEsPoolSize != poolSize {
 				getDetailFromEsPoolSize = poolSize
@@ -50,12 +50,14 @@ func main() {
 	// 启动心跳检测协程,每5秒发送一次心跳,同时检查在线节点数量
 	go func() {
 		ticker := time.NewTicker(5 * time.Second)
-		var onlineNodesPrevWarn, downLoadQueuePrevWarn int64
+		var onlineNodesPrevWarn, downloadQueuePrevWarn, uploadQueuePrevWarn, getDetailQueuePrevWarn int64
 		ctx := gctx.New()
 		for range ticker.C {
 			util.SendHeartbeat(ctx)
 			util.CheckOnlineNodes(ctx, &onlineNodesPrevWarn)
-			util.CheckDownLoadQueue(ctx, &downLoadQueuePrevWarn)
+			util.CheckDownloadQueue(ctx, &downloadQueuePrevWarn)
+			util.CheckUploadQueue(ctx, &uploadQueuePrevWarn)
+			util.CheckGetDetailQueue(ctx, &getDetailQueuePrevWarn)
 		}
 	}()
 	go func() {

+ 1 - 1
oss/grpc.go

@@ -33,7 +33,7 @@ func (g *Grpc) Download(req *pb.DownloadRequest, resp pb.Service_DownloadServer)
 
 func (g *Grpc) GetBidDetail(req *pb.DownloadRequest, resp pb.Service_DownloadServer) error {
 	if err := checkArgs(&entity.Args{BucketID: req.BucketID, ObjectName: req.ObjectName}); err != nil {
-		log.Println("grpc方式获取正文", err)
+		log.Println("grpc方式", fmt.Sprintf(constant.GetBidDetailFail, err))
 		return err
 	}
 	log.Println("grpc方式获取正文", req.BucketID, req.ObjectName)

+ 1 - 1
oss/http.go

@@ -77,7 +77,7 @@ func DownloadHandler(w http.ResponseWriter, r *http.Request) {
 	}
 	log.Println(GetIp(r), "restful方式下载文件", bucketID, objectName, autoExtract)
 	if err := Download(w, autoExtract, bucketID, objectName); err != nil {
-		log.Println(constant.DownloadFail, err)
+		log.Println(bucketID, objectName, "restful方式", fmt.Sprintf(constant.DownloadFail, err))
 		http.Error(w, fmt.Sprintf(constant.DownloadFail, err), http.StatusInternalServerError)
 		return
 	}

+ 52 - 23
oss/oss.go

@@ -25,7 +25,8 @@ import (
 var (
 	// accountMap 用来缓存OSS帐号信息
 	accountMap           sync.Map
-	DownLoadPool         = make(chan bool, g.Config().MustGet(gctx.New(), "downLoadPoolSize").Int())
+	UploadPool           = make(chan bool, g.Config().MustGet(gctx.New(), "uploadPoolSize").Int())
+	DownloadPool         = make(chan bool, g.Config().MustGet(gctx.New(), "downloadPoolSize").Int())
 	GetDetailFromEsPool  = make(chan bool, g.Config().MustGet(gctx.New(), "getDetailFromEsPoolSize").Int())
 	GetDetailFromMgoPool = make(chan bool, g.Config().MustGet(gctx.New(), "getDetailFromMgoPoolSize").Int())
 	getDetail            = map[string]func(w io.Writer, bucketID, objectName string) error{
@@ -41,7 +42,9 @@ var (
 			list := es.VarEs.Get(indexName, indexName, fmt.Sprintf(`{"query":{"bool":{"filter":{"term":{"_id":"%s"}}}},"_source":["detail"]}`, objectName))
 			if list != nil && len(*list) > 0 {
 				detail, _ := (*list)[0]["detail"].(string)
-				_, err := io.Copy(w, bytes.NewReader([]byte(detail)))
+				buf := bufferPool.Get().(*[]byte)
+				defer bufferPool.Put(buf)
+				_, err := io.CopyBuffer(w, bytes.NewReader([]byte(detail)), *buf)
 				return err
 			}
 			return errors.New("not find from es")
@@ -54,12 +57,20 @@ var (
 			data, _ := config.Mgo.FindById(g.Config().MustGet(gctx.New(), "mongodb.collection").String(), objectName, `{"detail":1}`)
 			if data != nil && len(*data) > 0 {
 				detail, _ := (*data)["detail"].(string)
-				_, err := io.Copy(w, bytes.NewReader([]byte(detail)))
+				buf := bufferPool.Get().(*[]byte)
+				defer bufferPool.Put(buf)
+				_, err := io.CopyBuffer(w, bytes.NewReader([]byte(detail)), *buf)
 				return err
 			}
 			return errors.New("not find from mgo")
 		},
 	}
+	bufferPool = sync.Pool{
+		New: func() interface{} {
+			buf := make([]byte, 512*1024) // 512KB 缓冲区
+			return &buf
+		},
+	}
 )
 
 // GetBucket 根据bucketID获取bucket信息,如果没有则模拟查询数据库(这里只查询一次)
@@ -71,40 +82,41 @@ func GetBucket(bucketID string) (config.BucketInfo, error) {
 			return *v, nil
 		}
 	}
-	err := fmt.Errorf("bucket id %s not found", bucketID)
-	log.Println(err)
 	// 模拟数据库查询:实际业务中这里应查询数据库
-	return config.BucketInfo{}, err
+	return config.BucketInfo{}, fmt.Errorf("bucket id %s not found", bucketID)
 }
 
 // bucketPool 缓存 bucket 对象,避免重复创建
-//var bucketPool sync.Map // key: bucketID, value: *ossSDK.Bucket
+var bucketPool sync.Map // key: bucketID, value: *ossSDK.Bucket
 
 // GetCachedBucket 对外暴露统一入口,根据 bucketID 获取 *ossSDK.Bucket 对象
 func GetCachedBucket(bucketID string) (*ossSDK.Bucket, error) {
 	// 优先从对象池取
-	//if cached, ok := bucketPool.Load(bucketID); ok {
-	//	return cached.(*ossSDK.Bucket), nil
-	//}
+	if cached, ok := bucketPool.Load(bucketID); ok {
+		return cached.(*ossSDK.Bucket), nil
+	}
 
 	// 如果不存在则依次读取 bucketInfo 和 OSS client 信息
 	bucketInfo, err := GetBucket(bucketID)
 	if err != nil {
+		log.Println(err)
 		return nil, err
 	}
 
 	client, err := getOSSClient(bucketInfo.AccountID)
 	if err != nil {
+		log.Println(err)
 		return nil, err
 	}
 
 	bucket, err := client.Bucket(bucketInfo.BucketName)
 	if err != nil {
+		log.Println(err)
 		return nil, err
 	}
 
 	// 存入对象池
-	//bucketPool.Store(bucketID, bucket)
+	bucketPool.Store(bucketID, bucket)
 	return bucket, nil
 }
 
@@ -123,6 +135,12 @@ func getOSSClient(accountID string) (*ossSDK.Client, error) {
 
 // UploadAttachment 上传附件;如果gzipEnabled为true,则进行gzip压缩(使用 io.Pipe 实现流式压缩)
 func Upload(bucketID, objectName string, data io.Reader, gzipEnabled bool) error {
+	atomic.AddInt64(&util.UploadCounter, 1)
+	UploadPool <- true
+	defer func() {
+		<-UploadPool
+		atomic.AddInt64(&util.UploadCounter, -1)
+	}()
 	bucket, err := GetCachedBucket(bucketID)
 	if err != nil {
 		return err
@@ -142,26 +160,29 @@ func Upload(bucketID, objectName string, data io.Reader, gzipEnabled bool) error
 	} else {
 		reader = data
 	}
-
 	// 上传到OSS
-	return bucket.PutObject(objectName, reader)
+	if err := bucket.PutObject(objectName, reader); err != nil {
+		log.Println(bucketID, objectName, "上传出错", err)
+		return err
+	}
+	return nil
 }
 
 // downloadAttachment 下载,如果检测到数据为gzip压缩,则自动解压后返回
 func Download(w io.Writer, autoExtract int, bucketID, objectName string) error {
-	atomic.AddInt64(&util.DownLoadCounter, 1)
-	DownLoadPool <- true
+	atomic.AddInt64(&util.DownloadCounter, 1)
+	DownloadPool <- true
 	defer func() {
-		<-DownLoadPool
-		atomic.AddInt64(&util.DownLoadCounter, -1)
+		<-DownloadPool
+		atomic.AddInt64(&util.DownloadCounter, -1)
 	}()
 	bucket, err := GetCachedBucket(bucketID)
 	if err != nil {
 		return err
 	}
-
 	result, err := bucket.DoGetObject(&ossSDK.GetObjectRequest{objectName}, nil)
 	if err != nil {
+		log.Println(bucketID, objectName, "下载出错", err)
 		return err
 	}
 	defer result.Response.Body.Close()
@@ -169,25 +190,31 @@ func Download(w io.Writer, autoExtract int, bucketID, objectName string) error {
 		rw.Header().Set("Content-Type", result.Response.Headers.Get("Content-Type")) // 根据文件类型调整此行
 		rw.Header().Set("Content-Disposition", "attachment; filename="+objectName)
 	}
+	buf := bufferPool.Get().(*[]byte)
+	defer bufferPool.Put(buf)
 	if autoExtract == 1 {
 		data, err := ioutil.ReadAll(result.Response.Body)
 		if err != nil {
+			log.Println(bucketID, objectName, "下载后ioutil.ReadAll出错", err)
 			return err
 		}
 		// 判断是否为gzip压缩格式(判断前两个字节)
 		if len(data) >= 2 && data[0] == 0x1f && data[1] == 0x8b {
 			gzipReader, err := gzip.NewReader(bytes.NewReader(data))
 			if err != nil {
+				log.Println(bucketID, objectName, "下载后gzip.NewReader出错", err)
 				return err
 			}
 			defer gzipReader.Close()
-			if _, err = io.Copy(w, gzipReader); err != nil {
+			if _, err = io.CopyBuffer(w, gzipReader, *buf); err != nil {
+				log.Println(bucketID, objectName, "下载解压后io.Copy出错", err)
 				return err
 			}
 			return nil
 		}
 	} else {
-		if _, err = io.Copy(w, result.Response.Body); err != nil {
+		if _, err = io.CopyBuffer(w, result.Response.Body, *buf); err != nil {
+			log.Println(bucketID, objectName, "下载后io.Copy出错", err)
 			return err
 		}
 	}
@@ -200,8 +227,10 @@ func Delete(bucketID, objectName string) error {
 	if err != nil {
 		return err
 	}
-
-	return bucket.DeleteObject(objectName)
+	if err := bucket.DeleteObject(objectName); err != nil {
+		log.Println(bucketID, objectName, "删除出错", err)
+	}
+	return err
 }
 
 // LoadOSSAccounts 加载OSS帐号信息到缓存
@@ -212,7 +241,7 @@ func LoadOSSAccounts() {
 	for _, acc := range oas {
 		accountMap.Store(acc.ID, *acc)
 	}
-	//bucketPool.Clear()
+	bucketPool.Clear()
 }
 
 // 获取标讯正文,优先从oss中取,再从es中取

+ 3 - 3
oss/rpc.go

@@ -30,17 +30,17 @@ type OSSService struct {
  */
 func (s *OSSService) Upload(args *entity.UploadArgs, reply *api.Result) error {
 	if err := checkArgs(&entity.Args{BucketID: args.BucketID, ObjectName: args.ObjectName}); err != nil {
-		log.Println("rpc方式", fmt.Sprintf(constant.UploadUrl, err))
+		log.Println("rpc方式", fmt.Sprintf(constant.UploadFail, err))
 		return err
 	} else if args.Stream == nil || len(args.Stream) == 0 {
 		err := errors.New(api.Error_msg_1002 + "Stream")
-		log.Println("rpc方式", fmt.Sprintf(constant.UploadUrl, err))
+		log.Println("rpc方式", fmt.Sprintf(constant.UploadFail, err))
 		return err
 	}
 	log.Println("rpc方式上传文件", args.BucketID, args.ObjectName, args.Gzip, len(args.Stream))
 	err := Upload(args.BucketID, args.ObjectName, ioutil.NopCloser(bytes.NewReader(args.Stream)), args.Gzip)
 	if err != nil {
-		log.Println(args.BucketID, args.ObjectName, "rpc方式", fmt.Sprintf(constant.UploadUrl, err))
+		log.Println(args.BucketID, args.ObjectName, "rpc方式", fmt.Sprintf(constant.UploadFail, err))
 		return err
 	}
 	reply.Error_msg = constant.UploadSuccess

+ 30 - 8
util/warn.go

@@ -11,8 +11,11 @@ import (
 	"time"
 )
 
-var DownLoadCounter int64
-var GetDetailCounter int64
+var (
+	UploadCounter    int64
+	DownloadCounter  int64
+	GetDetailCounter int64
+)
 
 const OssService = "ossService"
 
@@ -73,10 +76,27 @@ func GetOnlineNodes(ctx context.Context) ([]NodeInfo, error) {
 	return nodes, nil
 }
 
-func CheckDownLoadQueue(ctx context.Context, prevWarn *int64) {
-	warnSize := g.Config().MustGet(ctx, "downLoadLineUpWarnSize").Int64()
-	if atomic.LoadInt64(&DownLoadCounter) >= warnSize {
-		alertMsg := fmt.Sprintf(g.Config().MustGet(ctx, "downLoadQueueWarn").String(), g.Config().MustGet(ctx, "downLoadPoolSize").Int(), warnSize)
+func CheckUploadQueue(ctx context.Context, prevWarn *int64) {
+	warnSize := g.Config().MustGet(ctx, "uploadLineUpWarnSize").Int64()
+	counter := atomic.LoadInt64(&UploadCounter)
+	log.Println("当前上传并发数", counter)
+	if counter >= warnSize {
+		alertMsg := fmt.Sprintf(g.Config().MustGet(ctx, "uploadQueueWarn").String(), g.Config().MustGet(ctx, "uploadPoolSize").Int(), warnSize)
+		log.Println(alertMsg)
+		if nowUnix := time.Now().Unix(); nowUnix-*prevWarn > g.Config().MustGet(ctx, "warnInterval").Int64() {
+			*prevWarn = nowUnix
+			SendWeixinNotification(alertMsg)
+			SendEmailNotification(alertMsg)
+		}
+	}
+}
+
+func CheckDownloadQueue(ctx context.Context, prevWarn *int64) {
+	warnSize := g.Config().MustGet(ctx, "downloadLineUpWarnSize").Int64()
+	counter := atomic.LoadInt64(&DownloadCounter)
+	log.Println("当前下载并发数", counter)
+	if counter >= warnSize {
+		alertMsg := fmt.Sprintf(g.Config().MustGet(ctx, "downloadQueueWarn").String(), g.Config().MustGet(ctx, "downloadPoolSize").Int(), warnSize)
 		log.Println(alertMsg)
 		if nowUnix := time.Now().Unix(); nowUnix-*prevWarn > g.Config().MustGet(ctx, "warnInterval").Int64() {
 			*prevWarn = nowUnix
@@ -88,8 +108,10 @@ func CheckDownLoadQueue(ctx context.Context, prevWarn *int64) {
 
 func CheckGetDetailQueue(ctx context.Context, prevWarn *int64) {
 	warnSize := g.Config().MustGet(ctx, "getDetailLineUpWarnSize").Int64()
-	if atomic.LoadInt64(&DownLoadCounter) >= warnSize {
-		alertMsg := fmt.Sprintf(g.Config().MustGet(ctx, "getDetailQueueWarn").String(), g.Config().MustGet(ctx, "downLoadPoolSize").Int(), g.Config().MustGet(ctx, "getDetailFromEsPoolSize").Int(), g.Config().MustGet(ctx, "getDetailFromMgoPoolSize").Int(), warnSize)
+	counter := atomic.LoadInt64(&DownloadCounter)
+	log.Println("当前获取正文并发数", counter)
+	if counter >= warnSize {
+		alertMsg := fmt.Sprintf(g.Config().MustGet(ctx, "getDetailQueueWarn").String(), g.Config().MustGet(ctx, "downloadPoolSize").Int(), g.Config().MustGet(ctx, "getDetailFromEsPoolSize").Int(), g.Config().MustGet(ctx, "getDetailFromMgoPoolSize").Int(), warnSize)
 		log.Println(alertMsg)
 		if nowUnix := time.Now().Unix(); nowUnix-*prevWarn > g.Config().MustGet(ctx, "warnInterval").Int64() {
 			*prevWarn = nowUnix