소스 검색

wip:查询sql复杂度提交

wangkaiyue 2 년 전
부모
커밋
d564b8d2d2
6개의 변경된 파일52개의 추가작업 그리고 9개의 파일을 삭제
  1. 14 2
      config.yaml
  2. 2 1
      internal/cmd/cmd.go
  3. 10 0
      internal/consts/consts.go
  4. 4 4
      internal/service/esQuery.go
  5. 2 1
      internal/service/manager.go
  6. 20 1
      internal/service/middle.go

+ 14 - 2
config.yaml

@@ -11,6 +11,8 @@ elasticSearch:
   keepAlive: 60
   maxIdleConns: 30
 
+  complexQueryLen: 200 #查询条件复杂长度
+
   reverseProxy:
     timeout: 15
     keepAlive: 60
@@ -33,5 +35,15 @@ redis:
     address: 192.168.3.11:1712
 
 logger:
-  level: "all"
-  stdout: true
+  path:                  "logs/"               # 日志文件路径。默认为空,表示关闭,仅输出到终端
+  file:                  "{Y-m-d}.log"         # 日志文件格式。默认为"{Y-m-d}.log"
+  level:                 "all"                 # 日志输出级别
+  stdout:                true                  # 日志是否同时输出到终端。默认true
+  rotateExpire:          3                     # 按照日志文件时间间隔对文件滚动切分。默认为0,表示关闭滚动切分特性
+  rotateBackupLimit:     5                     # 按照切分的文件数量清理切分文件,当滚动切分特性开启时有效。默认为0,表示不备份,切分则删除
+  rotateBackupExpire:    5                     # 按照切分的文件有效期清理切分文件,当滚动切分特性开启时有效。默认为0,表示不备份,切分则删除
+  rotateBackupCompress:  0                     # 滚动切分文件的压缩比(0-9)。默认为0,表示不压缩
+  rotateCheckInterval:   "1h"                  # 滚动切分的时间检测间隔,一般不需要设置。默认为1小时
+  stdoutColorDisabled:   false                 # 关闭终端的颜色打印。默认开启
+  writerColorEnable:     false                 # 日志文件是否带上颜色。默认false,表示不带颜色
+

+ 2 - 1
internal/cmd/cmd.go

@@ -2,6 +2,7 @@ package cmd
 
 import (
 	"context"
+	"esproxy/internal/consts"
 	"esproxy/internal/service"
 	"github.com/gogf/gf/v2/frame/g"
 	"github.com/gogf/gf/v2/net/ghttp"
@@ -17,7 +18,7 @@ var (
 			s := g.Server()
 			s.Use(service.Middleware)
 			s.BindHandler("/*", func(r *ghttp.Request) {
-				rp, err := service.EsProxyManager.GetProxy(r.Context())
+				rp, err := service.EsProxyManager.GetProxy(r.Context(), r.GetCtxVar(consts.QueryLevelKey).Int())
 				if err != nil {
 					r.Response.Status = 500
 					g.Log().Errorf(r.Context(), "GetProxy Error:%v", err)

+ 10 - 0
internal/consts/consts.go

@@ -6,4 +6,14 @@ const (
 	EsStatus_Free  EsStatus = +iota //空闲
 	EsStatus_Busy                   //忙碌
 	EsStatus_block                  //阻塞
+
+)
+
+type QueryLevel int
+
+const (
+	QueryLevelKey                = "queryLevel"
+	QueryLevelSimple  QueryLevel = 0 + iota //简单查询
+	QueryLevelAggs                          //聚合查询
+	QueryLevelComplex                       //复杂查询
 )

+ 4 - 4
internal/service/esQuery.go

@@ -52,7 +52,7 @@ func (*esStatusQuery) GetEsStatus(ctx context.Context) (esStatus consts.EsStatus
 		g.Log().Errorf(ctx, "请求结果异常 err:%v", err)
 		return
 	}
-	maxQueue, maxActive, finalStatus := 0, 0, esStatus
+	maxQueue, maxActive := 0, 0
 
 	for _, val := range rs.Nodes {
 		if val.ThreadPool.Search.Queue > maxQueue {
@@ -64,12 +64,12 @@ func (*esStatusQuery) GetEsStatus(ctx context.Context) (esStatus consts.EsStatus
 	}
 
 	if maxQueue == 0 && maxActive < 6 {
-		finalStatus = consts.EsStatus_Free
+		esStatus = consts.EsStatus_Free
 	} else if maxQueue < 6 && maxActive < 10 {
-		finalStatus = consts.EsStatus_Busy
+		esStatus = consts.EsStatus_Busy
 	}
 
-	g.Log().Debugf(ctx, "now maxQueue:%d maxActive:%d finalStatus:%d", maxQueue, maxActive, finalStatus)
+	g.Log().Debugf(ctx, "now maxQueue:%d maxActive:%d finalStatus:%d", maxQueue, maxActive, esStatus)
 	return
 }
 

+ 2 - 1
internal/service/manager.go

@@ -65,9 +65,10 @@ func (epm *esProxyManager) UpdateEsStatus(ctx context.Context) {
 }
 
 // GetProxy 获取代理对象
-func (epm *esProxyManager) GetProxy(ctx context.Context) (*httputil.ReverseProxy, error) {
+func (epm *esProxyManager) GetProxy(ctx context.Context, queryLevel int) (*httputil.ReverseProxy, error) {
 	//epm.lock.Lock()
 	//defer epm.lock.Unlock()
+
 	c := <-epm.proxyPool
 	epm.proxyPool <- c
 	return c, nil

+ 20 - 1
internal/service/middle.go

@@ -2,6 +2,8 @@ package service
 
 import (
 	"bytes"
+	"context"
+	"esproxy/internal/consts"
 	"fmt"
 	"github.com/gogf/gf/v2/frame/g"
 	"github.com/gogf/gf/v2/net/ghttp"
@@ -23,6 +25,23 @@ func Middleware(r *ghttp.Request) {
 		r.Request.Header.Set("Content-Length", fmt.Sprintf("%d", len(finalBytes)))
 		r.Request.Body = io.NopCloser(bytes.NewReader(finalBytes))
 	}
+	queryLevel := getQueryLevel(bodyBytes)
+	r.SetCtxVar(consts.QueryLevelKey, queryLevel)
 	r.Middleware.Next()
-	g.Log().Infof(r.Context(), "status:%d  time:%fs  from:%s  query:%s", r.Response.Status, time.Since(now).Seconds(), r.GetClientIp(), string(bodyBytes))
+	g.Log().Infof(r.Context(), "status:%d  time:%fs level:%d from:%s  query:%s", r.Response.Status, time.Since(now).Seconds(), queryLevel, r.GetClientIp(), string(bodyBytes))
+}
+
+var (
+	aggsFlag = []byte("aggs")
+)
+
+// getQueryLevel 获取sql查询复杂度
+func getQueryLevel(detail []byte) consts.QueryLevel {
+	if isAggs := bytes.Contains(detail, aggsFlag); !isAggs {
+		return consts.QueryLevelSimple
+	}
+	if queryLen := len(string(detail)); queryLen < g.Cfg().MustGet(context.Background(), "elasticSearch.complexQueryLen", 200).Int() {
+		return consts.QueryLevelAggs
+	}
+	return consts.QueryLevelComplex
 }