warn.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. package util
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/gogf/gf/v2/frame/g"
  7. . "jygit.jydev.jianyu360.cn/BaseService/ossService/config"
  8. "log"
  9. "sync/atomic"
  10. "time"
  11. )
  12. var DownLoadCounter int64
  13. var GetDetailCounter int64
  14. const OssService = "ossService"
  15. // NodeInfo 描述节点心跳信息
  16. type NodeInfo struct {
  17. NodeName string `json:"node_name"`
  18. Timestamp int64 `json:"timestamp"`
  19. }
  20. // SendHeartbeat 将节点心跳信息写入redis,过期时间为10秒
  21. func SendHeartbeat(ctx context.Context) {
  22. nodeName := g.Config().MustGet(ctx, "node.node_name").String()
  23. node := NodeInfo{
  24. NodeName: nodeName,
  25. Timestamp: time.Now().Unix(),
  26. }
  27. data, err := json.Marshal(node)
  28. if err != nil {
  29. log.Println("Heartbeat marshal error", err)
  30. return
  31. }
  32. Rdb.Set(ctx, OssService+":"+nodeName, data, 10*time.Second)
  33. }
  34. // CheckOnlineNodes 检查在线节点数
  35. func CheckOnlineNodes(ctx context.Context, prevWarn *int64) {
  36. nodes, _ := GetOnlineNodes(ctx)
  37. warnMaxNodeNum := g.Config().MustGet(ctx, "warnMaxNodeNum").Int()
  38. if len(nodes) < warnMaxNodeNum {
  39. alertMsg := fmt.Sprintf(g.Config().MustGet(ctx, "onlineNodesWarn").String(), warnMaxNodeNum, len(nodes))
  40. log.Println(alertMsg)
  41. if nowUnix := time.Now().Unix(); nowUnix-*prevWarn > g.Config().MustGet(ctx, "warnInterval").Int64() {
  42. *prevWarn = nowUnix
  43. SendWeixinNotification(alertMsg)
  44. SendEmailNotification(alertMsg)
  45. }
  46. }
  47. }
  48. func GetOnlineNodes(ctx context.Context) ([]NodeInfo, error) {
  49. keys, err := Rdb.Keys(ctx, OssService+":*").Result()
  50. var nodes []NodeInfo
  51. if err != nil {
  52. log.Println("GetOnlineNodes keys error", err)
  53. return nodes, err
  54. } else {
  55. for _, key := range keys {
  56. data, err := Rdb.Get(ctx, key).Result()
  57. if err != nil {
  58. log.Println("GetOnlineNodes key error", err)
  59. continue
  60. }
  61. var node NodeInfo
  62. json.Unmarshal([]byte(data), &node)
  63. nodes = append(nodes, node)
  64. }
  65. }
  66. return nodes, nil
  67. }
  68. func CheckDownLoadQueue(ctx context.Context, prevWarn *int64) {
  69. warnSize := g.Config().MustGet(ctx, "downLoadLineUpWarnSize").Int64()
  70. if atomic.LoadInt64(&DownLoadCounter) >= warnSize {
  71. alertMsg := fmt.Sprintf(g.Config().MustGet(ctx, "downLoadQueueWarn").String(), g.Config().MustGet(ctx, "downLoadPoolSize").Int(), warnSize)
  72. log.Println(alertMsg)
  73. if nowUnix := time.Now().Unix(); nowUnix-*prevWarn > g.Config().MustGet(ctx, "warnInterval").Int64() {
  74. *prevWarn = nowUnix
  75. SendWeixinNotification(alertMsg)
  76. SendEmailNotification(alertMsg)
  77. }
  78. }
  79. }
  80. func CheckGetDetailQueue(ctx context.Context, prevWarn *int64) {
  81. warnSize := g.Config().MustGet(ctx, "getDetailLineUpWarnSize").Int64()
  82. if atomic.LoadInt64(&DownLoadCounter) >= warnSize {
  83. alertMsg := fmt.Sprintf(g.Config().MustGet(ctx, "getDetailQueueWarn").String(), g.Config().MustGet(ctx, "downLoadPoolSize").Int(), g.Config().MustGet(ctx, "getDetailFromEsPoolSize").Int(), g.Config().MustGet(ctx, "getDetailFromMgoPoolSize").Int(), warnSize)
  84. log.Println(alertMsg)
  85. if nowUnix := time.Now().Unix(); nowUnix-*prevWarn > g.Config().MustGet(ctx, "warnInterval").Int64() {
  86. *prevWarn = nowUnix
  87. SendWeixinNotification(alertMsg)
  88. SendEmailNotification(alertMsg)
  89. }
  90. }
  91. }