1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586 |
- package service
- import (
- "context"
- "esproxy/internal/consts"
- "github.com/gogf/gf/v2/frame/g"
- "github.com/gogf/gf/v2/os/gcron"
- "github.com/gogf/gf/v2/util/gconv"
- )
- var (
- EsStatusQuery *esStatusQuery = &esStatusQuery{}
- )
// esStatusQuery is the field-less receiver type that the Elasticsearch status
// helpers are attached to.
type esStatusQuery struct{}

// ThreadPool mirrors the "thread_pool" object of a single node entry in the
// decoded response. Only the "search" pool counters are mapped.
type ThreadPool struct {
	ThreadPool struct {
		Search struct {
			Threads   int `json:"threads"`
			Queue     int `json:"queue"`
			Active    int `json:"active"`
			Rejected  int `json:"rejected"`
			Largest   int `json:"largest"`
			Completed int `json:"completed"`
		} `json:"search"`
	} `json:"thread_pool"`
}

// ResponseStruct is the top-level shape of the decoded response: a "nodes"
// object keyed by string, each value carrying that node's thread-pool data.
type ResponseStruct struct {
	Nodes map[string]ThreadPool `json:"nodes"`
}
- // GetEsStatus 查询获取Es状态
- func (*esStatusQuery) GetEsStatus(ctx context.Context) (esStatus consts.EsStatus) {
- esStatus = consts.EsStatus_block
- client := g.Client()
- username, password := g.Cfg().MustGet(ctx, "elasticSearch.queryState.elasticsearch.username").String(), g.Cfg().MustGet(ctx, "elasticSearch.queryState.elasticsearch.password").String()
- if username != "" || password != "" {
- client.SetBasicAuth(username, password)
- }
- r, err := client.Get(ctx, g.Cfg().MustGet(ctx, "elasticSearch.queryState.curlAddr").String())
- if err != nil {
- g.Log().Errorf(ctx, "请求异常 err:%v", err)
- return
- }
- defer r.Close()
- rs := &ResponseStruct{}
- if err := gconv.Struct(r.ReadAllString(), rs); err != nil || rs == nil || len(rs.Nodes) == 0 {
- g.Log().Errorf(ctx, "请求结果异常 err:%v", err)
- return
- }
- maxQueue, maxActive, threshold := 0, 0, g.Cfg().MustGet(ctx, "elasticSearch.threshold", 13).Int()
- for _, val := range rs.Nodes {
- if val.ThreadPool.Search.Queue > maxQueue {
- maxQueue = val.ThreadPool.Search.Queue
- }
- if val.ThreadPool.Search.Active > maxActive {
- maxActive = val.ThreadPool.Search.Active
- }
- if val.ThreadPool.Search.Largest > threshold {
- threshold = val.ThreadPool.Search.Largest
- }
- }
- if maxQueue == 0 && maxActive < threshold/2 {
- esStatus = consts.EsStatus_Free
- } else if maxQueue < threshold/2 && maxActive < threshold {
- esStatus = consts.EsStatus_Busy
- }
- g.Log().Debugf(ctx, "now maxQueue:%d maxActive:%d finalStatus:%d", maxQueue, maxActive, esStatus)
- return
- }
- // GetEsStatusRunning es状态持续查询
- func (*esStatusQuery) GetEsStatusRunning(cron string, job func(ctx context.Context)) error {
- e, err := gcron.New().Add(context.Background(), cron, job)
- if err != nil {
- return err
- }
- e.Start()
- select {}
- }
|