package util

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"sync/atomic"
	"time"

	"github.com/gogf/gf/v2/frame/g"
	. "jygit.jydev.jianyu360.cn/BaseService/ossService/config"
)

// Counters compared against the configured queue warning sizes. The checks in
// this file read them with atomic.LoadInt64; they are presumably updated
// elsewhere in the service — verify against the writers.
var (
	DownLoadCounter  int64
	GetDetailCounter int64
)

// OssService is the Redis key prefix under which node heartbeats are stored.
const OssService = "ossService"

// NodeInfo describes a single node heartbeat record.
type NodeInfo struct {
	NodeName  string `json:"node_name"`
	Timestamp int64  `json:"timestamp"`
}

// SendHeartbeat writes this node's heartbeat to Redis under the key
// "ossService:<node_name>" with a 10 second expiration. The node name is read
// from the "node.node_name" configuration entry. Failures are logged and
// otherwise ignored, so a missed heartbeat never interrupts the caller.
func SendHeartbeat(ctx context.Context) {
	nodeName := g.Config().MustGet(ctx, "node.node_name").String()
	node := NodeInfo{
		NodeName:  nodeName,
		Timestamp: time.Now().Unix(),
	}
	data, err := json.Marshal(node)
	if err != nil {
		log.Println("Heartbeat marshal error", err)
		return
	}
	// Surface write failures: a heartbeat that silently fails to land makes
	// this node look offline to CheckOnlineNodes with no trace in the logs.
	if err := Rdb.Set(ctx, OssService+":"+nodeName, data, 10*time.Second).Err(); err != nil {
		log.Println("Heartbeat set error", err)
	}
}

// CheckOnlineNodes raises an alert when the number of nodes with a live
// heartbeat is below the configured "warnMaxNodeNum". The alert text is built
// from the "onlineNodesWarn" format string. prevWarn holds the Unix time of
// the last notification and is updated when a new one is sent.
func CheckOnlineNodes(ctx context.Context, prevWarn *int64) {
	// The error is already logged inside GetOnlineNodes. On failure the list
	// is empty, so the comparison below still raises the alert.
	nodes, _ := GetOnlineNodes(ctx)
	warnMaxNodeNum := g.Config().MustGet(ctx, "warnMaxNodeNum").Int()
	if len(nodes) >= warnMaxNodeNum {
		return
	}
	alertMsg := fmt.Sprintf(g.Config().MustGet(ctx, "onlineNodesWarn").String(), warnMaxNodeNum, len(nodes))
	sendThrottledAlert(ctx, prevWarn, alertMsg)
}

// GetOnlineNodes returns the heartbeat records currently stored in Redis under
// the "ossService:" prefix. Keys that cannot be read (for example because they
// expired between listing and fetching) and values that are not valid NodeInfo
// JSON are logged and skipped. A non-nil error is returned only when the key
// listing itself fails, in which case the returned slice is nil.
//
// NOTE(review): KEYS walks the whole keyspace on the Redis server; consider
// SCAN if the instance holds many keys.
func GetOnlineNodes(ctx context.Context) ([]NodeInfo, error) {
	keys, err := Rdb.Keys(ctx, OssService+":*").Result()
	if err != nil {
		log.Println("GetOnlineNodes keys error", err)
		return nil, err
	}

	var nodes []NodeInfo
	for _, key := range keys {
		data, err := Rdb.Get(ctx, key).Result()
		if err != nil {
			log.Println("GetOnlineNodes key error", err)
			continue
		}
		var node NodeInfo
		// A malformed value must not be counted as an online node.
		if err := json.Unmarshal([]byte(data), &node); err != nil {
			log.Println("GetOnlineNodes unmarshal error", err)
			continue
		}
		nodes = append(nodes, node)
	}
	return nodes, nil
}

// CheckDownLoadQueue raises an alert when DownLoadCounter has reached the
// configured "downLoadLineUpWarnSize". The alert text is built from the
// "downLoadQueueWarn" format string. prevWarn holds the Unix time of the last
// notification and is updated when a new one is sent.
func CheckDownLoadQueue(ctx context.Context, prevWarn *int64) {
	cfg := g.Config()
	warnSize := cfg.MustGet(ctx, "downLoadLineUpWarnSize").Int64()
	if atomic.LoadInt64(&DownLoadCounter) < warnSize {
		return
	}
	alertMsg := fmt.Sprintf(
		cfg.MustGet(ctx, "downLoadQueueWarn").String(),
		cfg.MustGet(ctx, "downLoadPoolSize").Int(),
		warnSize,
	)
	sendThrottledAlert(ctx, prevWarn, alertMsg)
}

// CheckGetDetailQueue raises an alert when GetDetailCounter has reached the
// configured "getDetailLineUpWarnSize". The alert text is built from the
// "getDetailQueueWarn" format string. prevWarn holds the Unix time of the last
// notification and is updated when a new one is sent.
func CheckGetDetailQueue(ctx context.Context, prevWarn *int64) {
	cfg := g.Config()
	warnSize := cfg.MustGet(ctx, "getDetailLineUpWarnSize").Int64()
	// This check belongs to the get-detail queue, so it must read
	// GetDetailCounter; reading DownLoadCounter here was a copy-paste slip.
	if atomic.LoadInt64(&GetDetailCounter) < warnSize {
		return
	}
	alertMsg := fmt.Sprintf(
		cfg.MustGet(ctx, "getDetailQueueWarn").String(),
		cfg.MustGet(ctx, "downLoadPoolSize").Int(),
		cfg.MustGet(ctx, "getDetailFromEsPoolSize").Int(),
		cfg.MustGet(ctx, "getDetailFromMgoPoolSize").Int(),
		warnSize,
	)
	sendThrottledAlert(ctx, prevWarn, alertMsg)
}

// sendThrottledAlert always logs alertMsg, and forwards it to the Weixin and
// email notifiers only when more than "warnInterval" seconds have passed since
// *prevWarn. When it notifies, it stores the current Unix time in *prevWarn.
func sendThrottledAlert(ctx context.Context, prevWarn *int64, alertMsg string) {
	log.Println(alertMsg)
	nowUnix := time.Now().Unix()
	if nowUnix-*prevWarn <= g.Config().MustGet(ctx, "warnInterval").Int64() {
		return
	}
	*prevWarn = nowUnix
	SendWeixinNotification(alertMsg)
	SendEmailNotification(alertMsg)
}