task.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. package main
  import (
  	"bytes"
  	"encoding/json"
  	"fmt"
  	"io/ioutil"
  	"net/http"
  	"net/url"
  	"strconv"
  	"time"

  	"github.com/cron"
  	"go.mongodb.org/mongo-driver/bson"
  	"mongodb"
  	"qfw/util"
  )
  // Package-level alert configuration and counters shared by the monitoring tasks.
  var (
  	// taskA counts MgoCount runs whose incremental "bidding" count was zero
  	// (original note: incremental statistics skip). MgoCount reports an
  	// anomaly once the counter exceeds 1 and then resets it.
  	taskA = 0

  	// WebUrl is the webhook endpoint SendMsg posts alert messages to.
  	WebUrl = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=45962efc-ca87-4996-9ffa-08bf6608ab7a"

  	// WarningStr1 is the alert template used by Attachment; %d receives the
  	// number of accumulated records.
  	WarningStr1 = "机器智能识别提醒,超过30分钟未有数据入库,bidding表数据已积累%d条。"

  	// WarningStr2 is an alert template taking a number of minutes. It is not
  	// referenced by the code visible in this part of the file.
  	WarningStr2 = "数据采集提醒,%d分钟未有数据入库。"

  	// skip records that the first stale detection was swallowed by Attachment.
  	skip = 0

  	// send counts the stale detections handled after the skipped one;
  	// Attachment sends a message only while it is at most 3.
  	send = 0
  )
  22. func TimeTask() {
  23. //StartTask()
  24. c := cron.New()
  25. //cronstr := "0 0 * * * ?" // 每小时执行一次
  26. cronstr := "0 */10 * * * ?" // 每10min执行一次
  27. //_ = c.AddFunc(cronstr, func() { StartTask() })
  28. _ = c.AddFunc(cronstr, func() {
  29. util.Debug("Start Task...")
  30. Attachment()
  31. })
  32. c.Start()
  33. }
  34. func StartTask() {
  35. util.Debug("Start Task...")
  36. result := MgoCount()
  37. result = EsCheck(result)
  38. if result != "" {
  39. util.Debug(result)
  40. }
  41. util.Debug("Task Over...")
  42. }
  43. // @Description 附件识别监控
  44. // @Author J 2022/6/9 5:58 PM
  45. func Attachment() {
  46. info, _ := Mgo.Find("ocr_goods_over", nil, map[string]interface{}{"_id": -1}, nil, true, -1, -1)
  47. lastT := util.Int64All((*info)[0]["import_time"])
  48. if time.Now().Unix()-lastT > 30*60 {
  49. lteid := util.ObjToString((*info)[0]["lteid"])
  50. if skip == 0 {
  51. skip += 1
  52. util.Debug("跳过第一次", (*info)[0]["_id"])
  53. } else {
  54. send++
  55. if send <= 3 {
  56. //SendMsg("当前最后一个id段数据累积时间已经超过30分钟, 最后一段数据入库时间为:" + util.FormatDateByInt64(&lastT, util.Date_Full_Layout))
  57. c := Mgo.Count("ocr_goods_over", map[string]interface{}{"_id": map[string]interface{}{"$gt": mongodb.StringTOBsonId(lteid)}})
  58. SendMsg(fmt.Sprintf(WarningStr1, c))
  59. }
  60. }
  61. } else {
  62. if skip != 0 || send != 0 {
  63. skip = 0
  64. send = 0
  65. }
  66. }
  67. }
  68. // 统计mgo数据
  69. func MgoCount() string {
  70. defer util.Catch()
  71. sess := Mgo.GetMgoConn()
  72. defer Mgo.DestoryMongoConn(sess)
  73. result := ""
  74. // biddingback
  75. count := Mgo.Count("bidding_back", nil)
  76. util.Debug("bidding_back---", BiddingBackSize, count)
  77. if int64(count) != BiddingBackSize {
  78. result = fmt.Sprintf("bindding_back数据异常,统计结果%d,往常数据量%d", count, BiddingBackSize)
  79. result += "<br>"
  80. }
  81. // bidding
  82. st := time.Now().Unix() - 3600
  83. currentLastId := fmt.Sprintf("%x0000000000000000", st)
  84. // 对比内存中存量id的统计结果
  85. q1 := bson.M{"_id": bson.M{"$lte": mongodb.StringTOBsonId(LastStockId)}}
  86. count1 := Mgo.Count("bidding", q1)
  87. util.Debug("bidding---", LastStockSize, count1, q1)
  88. if int64(count1) != LastStockSize {
  89. result = fmt.Sprintf("bindding数据存量统计异常,统计条件%v,统计结果%d,上次统计数据量%d", q1, count1, LastStockSize)
  90. result += "<br>"
  91. } else {
  92. // 无异常,替换成当前时间点(当前时间点往前一个小时的id)存量的统计结果
  93. LastStockId = currentLastId
  94. q2 := bson.M{"_id": bson.M{"$lte": mongodb.StringTOBsonId(LastStockId)}}
  95. count2 := Mgo.Count("bidding", q2)
  96. LastStockSize = int64(count2)
  97. }
  98. // 增量
  99. et := time.Now().Unix()
  100. currentId := fmt.Sprintf("%x0000000000000000", et)
  101. q3 := bson.M{"_id": bson.M{"$gt": mongodb.StringTOBsonId(currentLastId), "$lte": mongodb.StringTOBsonId(currentId)}}
  102. count3 := Mgo.Count("bidding", q3)
  103. util.Debug("bidding---", count3, q3)
  104. if int64(count3) == 0 {
  105. if taskA > 1 {
  106. result += fmt.Sprintf("bindding数据增量统计异常,统计条件%v,统计结果%d", q3, count3)
  107. result += "<br><br>"
  108. taskA = 0
  109. } else {
  110. taskA++
  111. }
  112. }
  113. return result
  114. }
  115. func EsCheck(result string) string {
  116. client := Es.GetEsConn()
  117. defer Es.DestoryEsConn(client)
  118. resp, _ := client.ClusterHealth().Do()
  119. util.Debug(*resp)
  120. if resp.Status != "green" {
  121. result += "<br>" + "检索库异常,异常内容:" + "<br>" +
  122. "cluster_name:" + resp.ClusterName + "<br>" +
  123. "status:" + resp.Status + "<br>" +
  124. "number_of_nodes:" + strconv.Itoa(resp.NumberOfNodes) + "<br>" +
  125. "number_of_data_nodes:" + strconv.Itoa(resp.NumberOfDataNodes) + "<br>" +
  126. "number_of_data_nodes:" + strconv.Itoa(resp.ActivePrimaryShards) + "<br>" +
  127. "active_shards:" + strconv.Itoa(resp.ActiveShards) + "<br>" +
  128. "relocating_shards:" + strconv.Itoa(resp.RelocatingShards) + "<br>" +
  129. "initialized_shards:" + strconv.Itoa(resp.InitializedShards) + "<br>" +
  130. "unassigned_shards:" + strconv.Itoa(resp.UnassignedShards) + "<br>"
  131. }
  132. return result
  133. }
  134. func SendMail(report string) {
  135. _, _ = http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", api, to, "异常报告!", report))
  136. }
  137. func SendMsg(content string) {
  138. client := &http.Client{}
  139. data := map[string]interface{}{"msgtype": "text", "text": map[string]interface{}{
  140. "content": content, "mentioned_mobile_list": []string{"13373929153,15090279371,15639297172"},
  141. }}
  142. bytesData, _ := json.Marshal(data)
  143. req, _ := http.NewRequest("POST", WebUrl, bytes.NewReader(bytesData))
  144. resp, _ := client.Do(req)
  145. body, _ := ioutil.ReadAll(resp.Body)
  146. fmt.Println(string(body))
  147. }