Răsfoiți Sursa

wip:cron 定时任务提交

wangkaiyue 2 ani în urmă
părinte
comite
8740c2c554
1 a modificat fișierele cu 53 adăugiri și 13 ștergeri
  1. 53 13
      main.go

+ 53 - 13
main.go

@@ -2,31 +2,71 @@ package main
 
 import (
 	"context"
-	"fmt"
 	_ "github.com/gogf/gf/contrib/nosql/redis/v2"
 	"github.com/gogf/gf/v2/frame/g"
 	"github.com/gogf/gf/v2/os/gcron"
 	"github.com/gogf/gf/v2/os/gctx"
-	"log"
+	"github.com/gogf/gf/v2/util/gconv"
 )
 
-func main() {
-	ctx := gctx.New()
-	e, err := gcron.New().Add(ctx, g.Cfg().MustGet(ctx, "cron").String(), job)
-	if err != nil {
-		panic(err)
-	}
-	e.Start()
-	select {}
+type ResponseStruct struct {
+	Nodes map[string]ThreadPool `json:"nodes"`
+}
+
+type ThreadPool struct {
+	ThreadPool struct {
+		Search struct {
+			Threads   int `json:"threads"`
+			Queue     int `json:"queue"`
+			Active    int `json:"active"`
+			Rejected  int `json:"rejected"`
+			Largest   int `json:"largest"`
+			Completed int `json:"completed"`
+		} `json:"search"`
+	} `json:"thread_pool"`
 }
 
 func job(ctx context.Context) {
-	log.Println("do curl")
 	r, err := g.Client().Get(ctx, g.Cfg().MustGet(ctx, "curlAddr").String())
 	if err != nil {
-		g.Log().Errorf(ctx, "请求异常", err)
+		g.Log().Errorf(ctx, "请求异常 err:%v", err)
+		return
 	}
 	defer r.Close()
-	fmt.Println(r.ReadAllString())
 
+	rs := &ResponseStruct{}
+	if err := gconv.Struct(r.ReadAllString(), rs); err != nil || rs == nil || len(rs.Nodes) == 0 {
+		g.Log().Errorf(ctx, "请求结果异常 err:%v", err)
+		return
+	}
+	maxQueue, maxActive, finalStatus := 0, 0, 2
+
+	for _, val := range rs.Nodes {
+		if val.ThreadPool.Search.Queue > maxQueue {
+			maxQueue = val.ThreadPool.Search.Queue
+		}
+		if val.ThreadPool.Search.Active > maxActive {
+			maxActive = val.ThreadPool.Search.Active
+		}
+	}
+
+	if maxQueue == 0 && maxActive < 0 {
+		finalStatus = 0
+	} else if maxQueue < 6 && maxActive < 10 {
+		finalStatus = 1
+	}
+
+	g.Log().Debugf(ctx, "now maxQueue:%d maxActive:%d finalStatus:%d", maxQueue, maxActive, finalStatus)
+
+	_, _ = g.Redis().Set(ctx, "es_status", finalStatus)
+}
+
+func main() {
+	ctx := gctx.New()
+	e, err := gcron.New().Add(ctx, g.Cfg().MustGet(ctx, "cron").String(), job)
+	if err != nil {
+		panic(err)
+	}
+	e.Start()
+	select {}
 }