123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122 |
- package util
- import (
- "context"
- "encoding/json"
- "fmt"
- "github.com/gogf/gf/v2/frame/g"
- . "jygit.jydev.jianyu360.cn/BaseService/ossService/config"
- "log"
- "sync/atomic"
- "time"
- )
- var (
- UploadCounter int64
- DownloadCounter int64
- GetDetailCounter int64
- )
- const OssService = "ossService"
- // NodeInfo 描述节点心跳信息
- type NodeInfo struct {
- NodeName string `json:"node_name"`
- Timestamp int64 `json:"timestamp"`
- }
- // SendHeartbeat 将节点心跳信息写入redis,过期时间为10秒
- func SendHeartbeat(ctx context.Context) {
- nodeName := g.Config().MustGet(ctx, "node.node_name").String()
- node := NodeInfo{
- NodeName: nodeName,
- Timestamp: time.Now().Unix(),
- }
- data, err := json.Marshal(node)
- if err != nil {
- log.Println("Heartbeat marshal error", err)
- return
- }
- Rdb.Set(ctx, OssService+":"+nodeName, data, 10*time.Second)
- }
- // CheckOnlineNodes 检查在线节点数
- func CheckOnlineNodes(ctx context.Context, prevWarn *int64) {
- nodes, _ := GetOnlineNodes(ctx)
- warnMaxNodeNum := g.Config().MustGet(ctx, "warnMaxNodeNum").Int()
- if len(nodes) < warnMaxNodeNum {
- alertMsg := fmt.Sprintf(g.Config().MustGet(ctx, "onlineNodesWarn").String(), warnMaxNodeNum, len(nodes))
- log.Println(alertMsg)
- if nowUnix := time.Now().Unix(); nowUnix-*prevWarn > g.Config().MustGet(ctx, "warnInterval").Int64() {
- *prevWarn = nowUnix
- SendWeixinNotification(alertMsg)
- SendEmailNotification(alertMsg)
- }
- }
- }
- func GetOnlineNodes(ctx context.Context) ([]NodeInfo, error) {
- keys, err := Rdb.Keys(ctx, OssService+":*").Result()
- var nodes []NodeInfo
- if err != nil {
- log.Println("GetOnlineNodes keys error", err)
- return nodes, err
- } else {
- for _, key := range keys {
- data, err := Rdb.Get(ctx, key).Result()
- if err != nil {
- log.Println("GetOnlineNodes key error", err)
- continue
- }
- var node NodeInfo
- json.Unmarshal([]byte(data), &node)
- nodes = append(nodes, node)
- }
- }
- return nodes, nil
- }
- func CheckUploadQueue(ctx context.Context, prevWarn *int64) {
- warnSize := g.Config().MustGet(ctx, "uploadLineUpWarnSize").Int64()
- counter := atomic.LoadInt64(&UploadCounter)
- log.Println("当前上传并发数", counter)
- if counter >= warnSize {
- alertMsg := fmt.Sprintf(g.Config().MustGet(ctx, "uploadQueueWarn").String(), g.Config().MustGet(ctx, "uploadPoolSize").Int(), warnSize)
- log.Println(alertMsg)
- if nowUnix := time.Now().Unix(); nowUnix-*prevWarn > g.Config().MustGet(ctx, "warnInterval").Int64() {
- *prevWarn = nowUnix
- SendWeixinNotification(alertMsg)
- SendEmailNotification(alertMsg)
- }
- }
- }
- func CheckDownloadQueue(ctx context.Context, prevWarn *int64) {
- warnSize := g.Config().MustGet(ctx, "downloadLineUpWarnSize").Int64()
- counter := atomic.LoadInt64(&DownloadCounter)
- log.Println("当前下载并发数", counter)
- if counter >= warnSize {
- alertMsg := fmt.Sprintf(g.Config().MustGet(ctx, "downloadQueueWarn").String(), g.Config().MustGet(ctx, "downloadPoolSize").Int(), warnSize)
- log.Println(alertMsg)
- if nowUnix := time.Now().Unix(); nowUnix-*prevWarn > g.Config().MustGet(ctx, "warnInterval").Int64() {
- *prevWarn = nowUnix
- SendWeixinNotification(alertMsg)
- SendEmailNotification(alertMsg)
- }
- }
- }
- func CheckGetDetailQueue(ctx context.Context, prevWarn *int64) {
- warnSize := g.Config().MustGet(ctx, "getDetailLineUpWarnSize").Int64()
- counter := atomic.LoadInt64(&DownloadCounter)
- log.Println("当前获取正文并发数", counter)
- if counter >= warnSize {
- alertMsg := fmt.Sprintf(g.Config().MustGet(ctx, "getDetailQueueWarn").String(), g.Config().MustGet(ctx, "downloadPoolSize").Int(), g.Config().MustGet(ctx, "getDetailFromEsPoolSize").Int(), g.Config().MustGet(ctx, "getDetailFromMgoPoolSize").Int(), warnSize)
- log.Println(alertMsg)
- if nowUnix := time.Now().Unix(); nowUnix-*prevWarn > g.Config().MustGet(ctx, "warnInterval").Int64() {
- *prevWarn = nowUnix
- SendWeixinNotification(alertMsg)
- SendEmailNotification(alertMsg)
- }
- }
- }
|