esQuery.go 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. package service
  2. import (
  3. "context"
  4. "esproxy/internal/consts"
  5. "github.com/gogf/gf/v2/frame/g"
  6. "github.com/gogf/gf/v2/os/gcron"
  7. "github.com/gogf/gf/v2/util/gconv"
  8. )
  9. var (
  10. EsStatusQuery *esStatusQuery = &esStatusQuery{}
  11. )
  12. type (
  13. esStatusQuery struct {
  14. }
  15. ThreadPool struct {
  16. ThreadPool struct {
  17. Search struct {
  18. Threads int `json:"threads"`
  19. Queue int `json:"queue"`
  20. Active int `json:"active"`
  21. Rejected int `json:"rejected"`
  22. Largest int `json:"largest"`
  23. Completed int `json:"completed"`
  24. } `json:"search"`
  25. } `json:"thread_pool"`
  26. }
  27. ResponseStruct struct {
  28. Nodes map[string]ThreadPool `json:"nodes"`
  29. }
  30. )
  31. // GetEsStatus 查询获取Es状态
  32. func (*esStatusQuery) GetEsStatus(ctx context.Context) (esStatus consts.EsStatus) {
  33. esStatus = consts.EsStatus_block
  34. client := g.Client()
  35. username, password := g.Cfg().MustGet(ctx, "elasticSearch.queryState.elasticsearch.username").String(), g.Cfg().MustGet(ctx, "elasticSearch.queryState.elasticsearch.password").String()
  36. if username != "" || password != "" {
  37. client.SetBasicAuth(username, password)
  38. }
  39. r, err := client.Get(ctx, g.Cfg().MustGet(ctx, "elasticSearch.queryState.curlAddr").String())
  40. if err != nil {
  41. g.Log().Errorf(ctx, "请求异常 err:%v", err)
  42. return
  43. }
  44. defer r.Close()
  45. rs := &ResponseStruct{}
  46. if err := gconv.Struct(r.ReadAllString(), rs); err != nil || rs == nil || len(rs.Nodes) == 0 {
  47. g.Log().Errorf(ctx, "请求结果异常 err:%v", err)
  48. return
  49. }
  50. maxQueue, maxActive, threshold := 0, 0, g.Cfg().MustGet(ctx, "elasticSearch.threshold", 13).Int()
  51. for _, val := range rs.Nodes {
  52. if val.ThreadPool.Search.Queue > maxQueue {
  53. maxQueue = val.ThreadPool.Search.Queue
  54. }
  55. if val.ThreadPool.Search.Active > maxActive {
  56. maxActive = val.ThreadPool.Search.Active
  57. }
  58. if val.ThreadPool.Search.Largest > threshold {
  59. threshold = val.ThreadPool.Search.Largest
  60. }
  61. }
  62. if maxQueue == 0 && maxActive < threshold/2 {
  63. esStatus = consts.EsStatus_Free
  64. } else if maxQueue < threshold/2 && maxActive < threshold {
  65. esStatus = consts.EsStatus_Busy
  66. }
  67. g.Log().Debugf(ctx, "now maxQueue:%d maxActive:%d finalStatus:%d", maxQueue, maxActive, esStatus)
  68. return
  69. }
  70. // GetEsStatusRunning es状态持续查询
  71. func (*esStatusQuery) GetEsStatusRunning(cron string, job func(ctx context.Context)) error {
  72. e, err := gcron.New().Add(context.Background(), cron, job)
  73. if err != nil {
  74. return err
  75. }
  76. e.Start()
  77. select {}
  78. }