Bladeren bron

wip:线程池增加限制

wangkaiyue 2 jaren geleden
bovenliggende
commit
5f5f775d48
5 gewijzigde bestanden met toevoegingen van 70 en 30 verwijderingen
  1. 17 16
      config.yaml
  2. 3 1
      internal/cmd/cmd.go
  3. 2 1
      internal/consts/consts.go
  4. 47 11
      internal/service/manager.go
  5. 1 1
      internal/service/middle.go

+ 17 - 16
config.yaml

@@ -7,10 +7,11 @@ server:
 elasticSearch:
   address:
     - "http://192.168.3.241:9205"
-  pool: 5
-  keepAlive: 60
-  maxIdleConns: 30
-
+  pool:
+    simple: 20
+    aggs: 10
+    complex: 10
+  waitTime: 5 #单位秒
   complexQueryLen: 200 #查询条件复杂长度
 
   reverseProxy:
@@ -23,7 +24,7 @@ elasticSearch:
     maxIdleConnsPerHost: 2
 
   queryState:
-    cron: "*/60 * * * * *"
+    cron: "*/10 * * * * *"
     curlAddr: "http://192.168.3.241:9205/_nodes/stats?pretty&human&filter_path=nodes.*.thread_pool.search"
 
     #elasticsearch: #查询es状态携带账户密码
@@ -35,15 +36,15 @@ redis:
     address: 192.168.3.11:1712
 
 logger:
-  path:                  "logs/"               # 日志文件路径。默认为空,表示关闭,仅输出到终端
-  file:                  "{Y-m-d}.log"         # 日志文件格式。默认为"{Y-m-d}.log"
-  level:                 "all"                 # 日志输出级别
-  stdout:                true                  # 日志是否同时输出到终端。默认true
-  rotateExpire:          3                     # 按照日志文件时间间隔对文件滚动切分。默认为0,表示关闭滚动切分特性
-  rotateBackupLimit:     5                     # 按照切分的文件数量清理切分文件,当滚动切分特性开启时有效。默认为0,表示不备份,切分则删除
-  rotateBackupExpire:    5                     # 按照切分的文件有效期清理切分文件,当滚动切分特性开启时有效。默认为0,表示不备份,切分则删除
-  rotateBackupCompress:  0                     # 滚动切分文件的压缩比(0-9)。默认为0,表示不压缩
-  rotateCheckInterval:   "1h"                  # 滚动切分的时间检测间隔,一般不需要设置。默认为1小时
-  stdoutColorDisabled:   false                 # 关闭终端的颜色打印。默认开启
-  writerColorEnable:     false                 # 日志文件是否带上颜色。默认false,表示不带颜色
+  path: "logs/"               # 日志文件路径。默认为空,表示关闭,仅输出到终端
+  file: "{Y-m-d}.log"         # 日志文件格式。默认为"{Y-m-d}.log"
+  level: "all"                 # 日志输出级别
+  stdout: true                  # 日志是否同时输出到终端。默认true
+  rotateExpire: 3                     # 按照日志文件时间间隔对文件滚动切分。默认为0,表示关闭滚动切分特性
+  rotateBackupLimit: 5                     # 按照切分的文件数量清理切分文件,当滚动切分特性开启时有效。默认为0,表示不备份,切分则删除
+  rotateBackupExpire: 5                     # 按照切分的文件有效期清理切分文件,当滚动切分特性开启时有效。默认为0,表示不备份,切分则删除
+  rotateBackupCompress: 0                     # 滚动切分文件的压缩比(0-9)。默认为0,表示不压缩
+  rotateCheckInterval: "1h"                  # 滚动切分的时间检测间隔,一般不需要设置。默认为1小时
+  stdoutColorDisabled: false                 # 关闭终端的颜色打印。默认开启
+  writerColorEnable: false                 # 日志文件是否带上颜色。默认false,表示不带颜色
 

+ 3 - 1
internal/cmd/cmd.go

@@ -18,12 +18,14 @@ var (
 			s := g.Server()
 			s.Use(service.Middleware)
 			s.BindHandler("/*", func(r *ghttp.Request) {
-				rp, err := service.EsProxyManager.GetProxy(r.Context(), r.GetCtxVar(consts.QueryLevelKey).Int())
+				queryLevel := r.GetCtxVar(consts.QueryLevelKey).Int()
+				rp, err := service.EsProxyManager.GetProxy(r.Context(), queryLevel)
 				if err != nil {
 					r.Response.Status = 500
 					g.Log().Errorf(r.Context(), "GetProxy Error:%v", err)
 					return
 				}
+				defer service.EsProxyManager.Release(r.Context(), queryLevel)
 				rp.ServeHTTP(r.Response.Writer, r.Request)
 			})
 			s.Run()

+ 2 - 1
internal/consts/consts.go

@@ -12,8 +12,9 @@ const (
 type QueryLevel int
 
 const (
-	QueryLevelKey                = "queryLevel"
 	QueryLevelSimple  QueryLevel = 0 + iota //简单查询
 	QueryLevelAggs                          //聚合查询
 	QueryLevelComplex                       //复杂查询
+
+	QueryLevelKey = "queryLevel"
 )

+ 47 - 11
internal/service/manager.go

@@ -3,12 +3,14 @@ package service
 import (
 	"context"
 	"esproxy/internal/consts"
+	"fmt"
 	"github.com/gogf/gf/v2/frame/g"
 	"github.com/gogf/gf/v2/os/glog"
 	"net/http/httputil"
 	"net/url"
 	"os"
 	"sync"
+	"time"
 )
 
 var (
@@ -16,9 +18,10 @@ var (
 )
 
 type esProxyManager struct {
-	lock      sync.Mutex
-	esStatus  consts.EsStatus
-	proxyPool chan *httputil.ReverseProxy
+	lock                                             sync.Mutex
+	esStatus                                         consts.EsStatus
+	proxyPool                                        chan *httputil.ReverseProxy
+	simpleQueryPool, aggsQueryPool, complexQueryPool chan struct{}
 }
 
 // initManager 初始化代理Manager
@@ -39,9 +42,12 @@ func initManager(ctx context.Context) *esProxyManager {
 	}
 
 	emp := &esProxyManager{
-		lock:      sync.Mutex{},
-		esStatus:  consts.EsStatus_Free,
-		proxyPool: pool,
+		lock:             sync.Mutex{},
+		esStatus:         consts.EsStatus_Free,
+		proxyPool:        pool,
+		simpleQueryPool:  makeEmptyChan(g.Cfg().MustGet(ctx, "elasticSearch.pool.simple", 10).Int()),
+		aggsQueryPool:    makeEmptyChan(g.Cfg().MustGet(ctx, "elasticSearch.pool.aggs", 10).Int()),
+		complexQueryPool: makeEmptyChan(g.Cfg().MustGet(ctx, "elasticSearch.pool.complex", 10).Int()),
 	}
 
 	go emp.UpdateEsStatus(ctx)
@@ -49,6 +55,14 @@ func initManager(ctx context.Context) *esProxyManager {
 	return emp
 }
 
+func makeEmptyChan(total int) chan struct{} {
+	c := make(chan struct{}, total)
+	for i := 0; i < total; i++ {
+		c <- struct{}{}
+	}
+	return c
+}
+
 // UpdateEsStatus 更新es状态
 func (epm *esProxyManager) UpdateEsStatus(ctx context.Context) {
 	if cron := g.Cfg().MustGet(ctx, "elasticSearch.queryState.cron").String(); cron != "" {
@@ -66,10 +80,32 @@ func (epm *esProxyManager) UpdateEsStatus(ctx context.Context) {
 
 // GetProxy 获取代理对象
 func (epm *esProxyManager) GetProxy(ctx context.Context, queryLevel int) (*httputil.ReverseProxy, error) {
-	//epm.lock.Lock()
-	//defer epm.lock.Unlock()
+	if queryLevel+int(epm.esStatus) > 2 {
+		return nil, fmt.Errorf("server is busy")
+	}
+	select {
+	case <-epm.getPool(queryLevel):
+		c := <-epm.proxyPool
+		epm.proxyPool <- c
+		return c, nil
+	case <-time.After(time.Second * time.Duration(g.Cfg().MustGet(ctx, "elasticSearch.queryState.waitTime", 5).Int())):
+		return nil, fmt.Errorf("wait time out")
+	}
+}
+
+// Release 归还链接池
+func (epm *esProxyManager) Release(ctx context.Context, queryLevel int) {
+	epm.getPool(queryLevel) <- struct{}{}
+}
 
-	c := <-epm.proxyPool
-	epm.proxyPool <- c
-	return c, nil
+// 获取对应查询复杂度的链接池
+func (epm *esProxyManager) getPool(queryLevel int) chan struct{} {
+	switch queryLevel {
+	case int(consts.QueryLevelSimple):
+		return epm.simpleQueryPool
+	case int(consts.QueryLevelAggs):
+		return epm.aggsQueryPool
+	default:
+		return epm.complexQueryPool
+	}
 }

+ 1 - 1
internal/service/middle.go

@@ -28,7 +28,7 @@ func Middleware(r *ghttp.Request) {
 	queryLevel := getQueryLevel(bodyBytes)
 	r.SetCtxVar(consts.QueryLevelKey, queryLevel)
 	r.Middleware.Next()
-	g.Log().Infof(r.Context(), "status:%d  time:%fs level:%d from:%s  query:%s", r.Response.Status, time.Since(now).Seconds(), queryLevel, r.GetClientIp(), string(bodyBytes))
+	g.Log().Infof(r.Context(), "status:%d  time:%fs  level:%d  from:%s  query:%s", r.Response.Status, time.Since(now).Seconds(), queryLevel, r.GetClientIp(), string(bodyBytes))
 }
 
 var (