middle.go 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849
  1. package service
  2. import (
  3. "bytes"
  4. "context"
  5. "esproxy/internal/consts"
  6. "fmt"
  7. "github.com/gogf/gf/v2/frame/g"
  8. "github.com/gogf/gf/v2/net/ghttp"
  9. "github.com/gogf/gf/v2/util/gconv"
  10. "io"
  11. "time"
  12. )
  13. // Middleware 分析查询记录日志中间件
  14. // 获取请求时间、查询和响应
  15. // 记录查询到数据库、区分查询类型:简单查询、聚合查询、复杂查询
  16. // 根据es当前状态,使用4个通道,进行调度
  17. func Middleware(r *ghttp.Request) {
  18. now := time.Now()
  19. bodyBytes, err := io.ReadAll(r.Request.Body)
  20. if len(bodyBytes) > 0 && err == nil {
  21. finalBytes := bodyBytes
  22. r.ContentLength = gconv.Int64(len(finalBytes))
  23. r.Request.Header.Set("Content-Length", fmt.Sprintf("%d", len(finalBytes)))
  24. r.Request.Body = io.NopCloser(bytes.NewReader(finalBytes))
  25. }
  26. queryLevel := getQueryLevel(bodyBytes)
  27. r.SetCtxVar(consts.QueryLevelKey, queryLevel)
  28. r.Middleware.Next()
  29. if r.RequestURI != "/" { //打印非建立链接的日志
  30. g.Log().Infof(r.Context(), "status:%d time:%fs req:%s waitPool:%fs level:%d from:%s query:%s", r.Response.Status, time.Since(now).Seconds(), r.RequestURI, r.GetCtxVar(consts.QueryWaitPoolTime).Float64(), queryLevel, r.GetClientIp(), string(bodyBytes))
  31. }
  32. }
  // aggsFlag is the byte sequence searched for in a request body to decide
  // whether the query contains an aggregation.
  var aggsFlag = []byte("aggs")
  36. // getQueryLevel 获取sql查询复杂度
  37. func getQueryLevel(detail []byte) consts.QueryLevel {
  38. if isAggs := bytes.Contains(detail, aggsFlag); !isAggs {
  39. return consts.QueryLevelSimple
  40. }
  41. if queryLen := len(string(detail)); queryLen < g.Cfg().MustGet(context.Background(), "elasticSearch.complexQueryLen", 200).Int() {
  42. return consts.QueryLevelAggs
  43. }
  44. return consts.QueryLevelComplex
  45. }