warn.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. package util
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/gogf/gf/v2/frame/g"
  7. . "jygit.jydev.jianyu360.cn/BaseService/ossService/config"
  8. "log"
  9. "sync/atomic"
  10. "time"
  11. )
  12. var (
  13. UploadCounter int64
  14. DownloadCounter int64
  15. GetDetailCounter int64
  16. )
  17. const OssService = "ossService"
  18. // NodeInfo 描述节点心跳信息
  19. type NodeInfo struct {
  20. NodeName string `json:"node_name"`
  21. Timestamp int64 `json:"timestamp"`
  22. }
  23. // SendHeartbeat 将节点心跳信息写入redis,过期时间为10秒
  24. func SendHeartbeat(ctx context.Context) {
  25. nodeName := g.Config().MustGet(ctx, "node.node_name").String()
  26. node := NodeInfo{
  27. NodeName: nodeName,
  28. Timestamp: time.Now().Unix(),
  29. }
  30. data, err := json.Marshal(node)
  31. if err != nil {
  32. log.Println("Heartbeat marshal error", err)
  33. return
  34. }
  35. Rdb.Set(ctx, OssService+":"+nodeName, data, 10*time.Second)
  36. }
  37. // CheckOnlineNodes 检查在线节点数
  38. func CheckOnlineNodes(ctx context.Context, prevWarn *int64) {
  39. nodes, _ := GetOnlineNodes(ctx)
  40. warnMaxNodeNum := g.Config().MustGet(ctx, "warnMaxNodeNum").Int()
  41. if len(nodes) < warnMaxNodeNum {
  42. alertMsg := fmt.Sprintf(g.Config().MustGet(ctx, "onlineNodesWarn").String(), warnMaxNodeNum, len(nodes))
  43. log.Println(alertMsg)
  44. if nowUnix := time.Now().Unix(); nowUnix-*prevWarn > g.Config().MustGet(ctx, "warnInterval").Int64() {
  45. *prevWarn = nowUnix
  46. SendWeixinNotification(alertMsg)
  47. SendEmailNotification(alertMsg)
  48. }
  49. }
  50. }
  51. func GetOnlineNodes(ctx context.Context) ([]NodeInfo, error) {
  52. keys, err := Rdb.Keys(ctx, OssService+":*").Result()
  53. var nodes []NodeInfo
  54. if err != nil {
  55. log.Println("GetOnlineNodes keys error", err)
  56. return nodes, err
  57. } else {
  58. for _, key := range keys {
  59. data, err := Rdb.Get(ctx, key).Result()
  60. if err != nil {
  61. log.Println("GetOnlineNodes key error", err)
  62. continue
  63. }
  64. var node NodeInfo
  65. json.Unmarshal([]byte(data), &node)
  66. nodes = append(nodes, node)
  67. }
  68. }
  69. return nodes, nil
  70. }
  71. func CheckUploadQueue(ctx context.Context, prevWarn *int64) {
  72. warnSize := g.Config().MustGet(ctx, "uploadLineUpWarnSize").Int64()
  73. counter := atomic.LoadInt64(&UploadCounter)
  74. log.Println("当前上传并发数", counter)
  75. if counter >= warnSize {
  76. alertMsg := fmt.Sprintf(g.Config().MustGet(ctx, "uploadQueueWarn").String(), g.Config().MustGet(ctx, "uploadPoolSize").Int(), warnSize)
  77. log.Println(alertMsg)
  78. if nowUnix := time.Now().Unix(); nowUnix-*prevWarn > g.Config().MustGet(ctx, "warnInterval").Int64() {
  79. *prevWarn = nowUnix
  80. SendWeixinNotification(alertMsg)
  81. SendEmailNotification(alertMsg)
  82. }
  83. }
  84. }
  85. func CheckDownloadQueue(ctx context.Context, prevWarn *int64) {
  86. warnSize := g.Config().MustGet(ctx, "downloadLineUpWarnSize").Int64()
  87. counter := atomic.LoadInt64(&DownloadCounter)
  88. log.Println("当前下载并发数", counter)
  89. if counter >= warnSize {
  90. alertMsg := fmt.Sprintf(g.Config().MustGet(ctx, "downloadQueueWarn").String(), g.Config().MustGet(ctx, "downloadPoolSize").Int(), warnSize)
  91. log.Println(alertMsg)
  92. if nowUnix := time.Now().Unix(); nowUnix-*prevWarn > g.Config().MustGet(ctx, "warnInterval").Int64() {
  93. *prevWarn = nowUnix
  94. SendWeixinNotification(alertMsg)
  95. SendEmailNotification(alertMsg)
  96. }
  97. }
  98. }
  99. func CheckGetDetailQueue(ctx context.Context, prevWarn *int64) {
  100. warnSize := g.Config().MustGet(ctx, "getDetailLineUpWarnSize").Int64()
  101. counter := atomic.LoadInt64(&DownloadCounter)
  102. log.Println("当前获取正文并发数", counter)
  103. if counter >= warnSize {
  104. alertMsg := fmt.Sprintf(g.Config().MustGet(ctx, "getDetailQueueWarn").String(), g.Config().MustGet(ctx, "downloadPoolSize").Int(), g.Config().MustGet(ctx, "getDetailFromEsPoolSize").Int(), g.Config().MustGet(ctx, "getDetailFromMgoPoolSize").Int(), warnSize)
  105. log.Println(alertMsg)
  106. if nowUnix := time.Now().Unix(); nowUnix-*prevWarn > g.Config().MustGet(ctx, "warnInterval").Int64() {
  107. *prevWarn = nowUnix
  108. SendWeixinNotification(alertMsg)
  109. SendEmailNotification(alertMsg)
  110. }
  111. }
  112. }