task.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/cron"
  5. "go.mongodb.org/mongo-driver/bson"
  6. "mongodb"
  7. "net/http"
  8. "qfw/util"
  9. "strconv"
  10. "time"
  11. )
  12. var taskA = 0 // 增量统计跳过
  13. func TimeTask() {
  14. //StartTask()
  15. c := cron.New()
  16. cronstr := "0 0 * * * ?" // 每小时执行一次
  17. _ = c.AddFunc(cronstr, func() { StartTask() })
  18. c.Start()
  19. }
  20. func StartTask() {
  21. util.Debug("Start Task...")
  22. result := MgoCount()
  23. result = EsCheck(result)
  24. if result != "" {
  25. util.Debug(result)
  26. SendMail(result)
  27. }
  28. util.Debug("Task Over...")
  29. }
  30. // 统计mgo数据
  31. func MgoCount() string {
  32. defer util.Catch()
  33. sess := Mgo.GetMgoConn()
  34. defer Mgo.DestoryMongoConn(sess)
  35. result := ""
  36. // biddingback
  37. count := Mgo.Count("bidding_back", nil)
  38. util.Debug("bidding_back---", BiddingBackSize, count)
  39. if int64(count) != BiddingBackSize {
  40. result = fmt.Sprintf("bindding_back数据异常,统计结果%d,往常数据量%d", count, BiddingBackSize)
  41. result += "<br>"
  42. }
  43. // bidding
  44. st := time.Now().Unix() - 3600
  45. currentLastId := fmt.Sprintf("%x0000000000000000", st)
  46. // 对比内存中存量id的统计结果
  47. q1 := bson.M{"_id": bson.M{"$lte": mongodb.StringTOBsonId(LastStockId)}}
  48. count1 := Mgo.Count("bidding", q1)
  49. util.Debug("bidding---", LastStockSize, count1, q1)
  50. if int64(count1) != LastStockSize {
  51. result = fmt.Sprintf("bindding数据存量统计异常,统计条件%v,统计结果%d,上次统计数据量%d", q1, count1, LastStockSize)
  52. result += "<br>"
  53. }else {
  54. // 无异常,替换成当前时间点(当前时间点往前一个小时的id)存量的统计结果
  55. LastStockId = currentLastId
  56. q2 := bson.M{"_id": bson.M{"$lte": mongodb.StringTOBsonId(LastStockId)}}
  57. count2 := Mgo.Count("bidding", q2)
  58. LastStockSize = int64(count2)
  59. }
  60. // 增量
  61. et := time.Now().Unix()
  62. currentId := fmt.Sprintf("%x0000000000000000", et)
  63. q3 := bson.M{"_id": bson.M{"$gt": mongodb.StringTOBsonId(currentLastId), "$lte": mongodb.StringTOBsonId(currentId)}}
  64. count3 := Mgo.Count("bidding", q3)
  65. util.Debug("bidding---", count3, q3)
  66. if int64(count3) == 0 {
  67. if taskA > 1 {
  68. result += fmt.Sprintf("bindding数据增量统计异常,统计条件%v,统计结果%d", q3, count3)
  69. result += "<br><br>"
  70. taskA = 0
  71. }else {
  72. taskA ++
  73. }
  74. }
  75. return result
  76. }
  77. func EsCheck(result string) string {
  78. client := Es.GetEsConn()
  79. defer Es.DestoryEsConn(client)
  80. resp, _ := client.ClusterHealth().Do()
  81. util.Debug(*resp)
  82. if resp.Status != "green" {
  83. result += "<br>" + "检索库异常,异常内容:" + "<br>" +
  84. "&nbsp;&nbsp;&nbsp;&nbsp;" + "cluster_name:" + resp.ClusterName + "<br>" +
  85. "&nbsp;&nbsp;&nbsp;&nbsp;" + "status:" + resp.Status + "<br>" +
  86. "&nbsp;&nbsp;&nbsp;&nbsp;" + "number_of_nodes:" + strconv.Itoa(resp.NumberOfNodes) + "<br>" +
  87. "&nbsp;&nbsp;&nbsp;&nbsp;" + "number_of_data_nodes:" + strconv.Itoa(resp.NumberOfDataNodes) + "<br>" +
  88. "&nbsp;&nbsp;&nbsp;&nbsp;" + "number_of_data_nodes:" + strconv.Itoa(resp.ActivePrimaryShards) + "<br>" +
  89. "&nbsp;&nbsp;&nbsp;&nbsp;" + "active_shards:" + strconv.Itoa(resp.ActiveShards) + "<br>" +
  90. "&nbsp;&nbsp;&nbsp;&nbsp;" + "relocating_shards:" + strconv.Itoa(resp.RelocatingShards) + "<br>" +
  91. "&nbsp;&nbsp;&nbsp;&nbsp;" + "initialized_shards:" + strconv.Itoa(resp.InitializedShards) + "<br>" +
  92. "&nbsp;&nbsp;&nbsp;&nbsp;" + "unassigned_shards:" + strconv.Itoa(resp.UnassignedShards) + "<br>"
  93. }
  94. return result
  95. }
  96. func SendMail(report string) {
  97. _, _ = http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", api, to, "异常报告!", report))
  98. }