// udprocess.go 4.4 KB

  1. package udp
  2. import (
  3. "data_ai/ul"
  4. "encoding/json"
  5. log "github.com/donnie4w/go-logger/logger"
  6. qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  7. mu "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
  8. "net"
  9. "sync"
  10. "time"
  11. )
  12. var (
  13. nextNode []map[string]interface{}
  14. udpclient mu.UdpClient
  15. udplock, nextlock sync.Mutex
  16. responselock, getasklock sync.Mutex
  17. lastNodeResponse int64
  18. taskList []map[string]interface{}
  19. isGetask, isAction, isStop bool
  20. )
  21. func InitProcessVar() {
  22. //初始化···
  23. isGetask = false
  24. isStop = false
  25. nextNode = qu.ObjArrToMapArr(ul.SysConfig["nextNode"].([]interface{}))
  26. lastNodeResponse = time.Now().Unix()
  27. taskList = []map[string]interface{}{}
  28. //执行监控
  29. go lastUdpMonitoring()
  30. go nextUdpMonitoring()
  31. go getDataAiTask()
  32. //监听···
  33. updport := ul.SysConfig["udpport"].(string)
  34. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  35. udpclient.Listen(ProcessUdpMsg)
  36. log.Debug("Udp服务监听", updport)
  37. }
  38. // udp接收
  39. func ProcessUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  40. switch act {
  41. case mu.OP_TYPE_DATA:
  42. var mapInfo map[string]interface{}
  43. err := json.Unmarshal(data, &mapInfo)
  44. if err != nil {
  45. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  46. } else if mapInfo != nil {
  47. sid, eid := qu.ObjToString(mapInfo["gtid"]), qu.ObjToString(mapInfo["lteid"])
  48. stype := qu.ObjToString(mapInfo["stype"])
  49. if stype == "monitor" {
  50. log.Debug("收到监测......")
  51. key := qu.ObjToString(mapInfo["key"])
  52. udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  53. return
  54. }
  55. if stype == "stop" {
  56. log.Debug("收到停止信号...")
  57. isStop = true
  58. }
  59. if stype == "start" {
  60. log.Debug("收到开始信号...")
  61. isStop = false
  62. }
  63. if sid == "" || eid == "" {
  64. log.Debug("接收id段异常-err ", "sid=", sid, ",eid=", eid)
  65. } else {
  66. lastNodeResponse = time.Now().Unix()
  67. key := sid + "-" + eid + "-" + qu.ObjToString(mapInfo["stype"])
  68. go udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  69. udplock.Lock()
  70. if isStop { //停止信号...
  71. for {
  72. if len(taskList) == 0 {
  73. break
  74. }
  75. log.Debug("检测到停止信号...等待任务结束...")
  76. time.Sleep(time.Second * 30)
  77. }
  78. log.Debug("检测到停止信号...通知下节点...")
  79. sendNextNode(sid, eid)
  80. } else { //插入任务...
  81. taskList = append(taskList, map[string]interface{}{
  82. "sid": sid,
  83. "eid": eid,
  84. })
  85. log.Debug("udp收到任务...数量:", len(taskList), "具体任务:", sid, "~", eid)
  86. }
  87. udplock.Unlock()
  88. }
  89. }
  90. case mu.OP_NOOP: //下个节点回应
  91. nextlock.Lock()
  92. str := string(data)
  93. udptaskmap.Delete(str)
  94. log.Debug("其他节点回应:", str)
  95. nextlock.Unlock()
  96. }
  97. }
  98. // 下节点发送
  99. func sendNextNode(sid string, eid string) {
  100. //更新记录状态
  101. updateProcessUdpIdsInfo(sid, eid)
  102. for _, node := range nextNode {
  103. key := sid + "-" + eid + "-" + qu.ObjToString(node["stype"])
  104. by, _ := json.Marshal(map[string]interface{}{
  105. "gtid": sid,
  106. "lteid": eid,
  107. "stype": qu.ObjToString(node["stype"]),
  108. "key": key,
  109. })
  110. addr := &net.UDPAddr{
  111. IP: net.ParseIP(node["addr"].(string)),
  112. Port: qu.IntAll(node["port"]),
  113. }
  114. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr) //发送下节点
  115. new_node := &udpNode{by, addr, time.Now().Unix()}
  116. udptaskmap.Store(key, new_node) //监控···节点
  117. }
  118. log.Debug("udp通知抽取完成...通知下阶段udp-敏感词,补城市", sid, "~", eid)
  119. isGetask = false //此段落彻底完毕~继续获取任务
  120. }
  121. // 更新流程记录id段落
  122. func updateProcessUdpIdsInfo(sid string, eid string) {
  123. query := map[string]interface{}{
  124. "gtid": map[string]interface{}{
  125. "$gte": sid,
  126. },
  127. "lteid": map[string]interface{}{
  128. "$lte": eid,
  129. },
  130. }
  131. task_coll := "bidding_processing_ids"
  132. datas, _ := ul.BidMgo.Find(task_coll, query, nil, nil)
  133. if len(datas) > 0 {
  134. log.Debug("开始更新流程段落记录~~", len(datas), "段")
  135. for _, v := range datas {
  136. up_id := ul.BsonTOStringId(v["_id"])
  137. if up_id != "" {
  138. update := map[string]interface{}{
  139. "$set": map[string]interface{}{
  140. "dataprocess": 4,
  141. "updatetime": time.Now().Unix(),
  142. },
  143. }
  144. ul.BidMgo.UpdateById(task_coll, up_id, update)
  145. log.Debug("流程段落记录~~更新完毕~", update)
  146. }
  147. }
  148. } else {
  149. log.Debug("未查询到记录id段落~", query)
  150. }
  151. }