udptask.go 2.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. package udp
  2. import (
  3. "data_ai/extract"
  4. "fmt"
  5. log "github.com/donnie4w/go-logger/logger"
  6. qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  7. "time"
  8. )
  9. // 监听-获取-分发任务
  10. func getRepeatTask() {
  11. for {
  12. if len(taskList) > 0 && !isGetask {
  13. getasklock.Lock()
  14. isGetask = true
  15. len_list := len(taskList)
  16. if len_list > 1 {
  17. first_id := qu.ObjToString(taskList[0]["sid"])
  18. end_id := qu.ObjToString(taskList[len_list-1]["eid"])
  19. if first_id != "" && end_id != "" {
  20. taskList = taskList[len_list:]
  21. log.Debug("合并段落~正常~", first_id, "~", end_id, "~剩余任务池~", len(taskList), taskList)
  22. extract.ExtractFieldInfo(first_id, end_id, "bidding")
  23. log.Debug("AI识别数据完成...发送下节点udp...")
  24. sendNextNode(first_id, end_id)
  25. } else {
  26. log.Debug("合并段落~错误~正常取段落~~~")
  27. mapInfo := taskList[0]
  28. if mapInfo != nil {
  29. taskList = taskList[1:]
  30. log.Debug("获取任务段处理中~~~剩余任务池~~~", len(taskList), taskList)
  31. sid := qu.ObjToString(mapInfo["sid"])
  32. eid := qu.ObjToString(mapInfo["eid"])
  33. extract.ExtractFieldInfo(sid, eid, "bidding")
  34. log.Debug("AI识别数据完成...发送下节点udp...")
  35. sendNextNode(sid, eid)
  36. } else {
  37. sendErrMailApi("AI识别接收段落错误", "获取任务段落异常...跳过段落...")
  38. isGetask = false
  39. }
  40. }
  41. } else {
  42. mapInfo := taskList[0]
  43. if mapInfo != nil {
  44. taskList = taskList[1:]
  45. log.Debug("获取任务段处理中~~~剩余任务池~~~", len(taskList), taskList)
  46. sid := qu.ObjToString(mapInfo["sid"])
  47. eid := qu.ObjToString(mapInfo["eid"])
  48. extract.ExtractFieldInfo(sid, eid, "bidding")
  49. log.Debug("AI识别数据完成...发送下节点udp...")
  50. sendNextNode(sid, eid)
  51. } else {
  52. sendErrMailApi("AI识别获取段落错误", "获取任务段落异常...跳过段落...")
  53. isGetask = false
  54. }
  55. }
  56. getasklock.Unlock()
  57. } else {
  58. time.Sleep(10 * time.Second)
  59. }
  60. }
  61. }
  62. // 监控~上节点~长时间未响应
  63. func lastUdpMonitoring() {
  64. for {
  65. responselock.Lock()
  66. if !isGetask && time.Now().Unix()-lastNodeResponse >= 1800 {
  67. sendErrMailApi("AI识别程序~流程超时~告警", fmt.Sprintf("半小时~没有新段落数据进入流程...请检查..."))
  68. lastNodeResponse = time.Now().Unix() //重置时间
  69. }
  70. responselock.Unlock()
  71. time.Sleep(600 * time.Second)
  72. }
  73. }
  74. // 监控~下节点
  75. func nextUdpMonitoring() {
  76. for {
  77. udptaskmap.Range(func(k, v interface{}) bool {
  78. now := time.Now().Unix()
  79. node, _ := v.(*udpNode)
  80. if now-node.timestamp > 120 {
  81. udptaskmap.Delete(k)
  82. sendErrMailApi("AI识别程序~下节点未响应~警告", fmt.Sprintf("下节点~数据清洗~未及时响应...请检查..."))
  83. }
  84. return true
  85. })
  86. time.Sleep(10 * time.Second)
  87. }
  88. }