updprocess.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. log "github.com/donnie4w/go-logger/logger"
  6. mu "mfw/util"
  7. "net"
  8. qu "qfw/util"
  9. "strings"
  10. "sync"
  11. "time"
  12. )
  13. var (
  14. nextNode []map[string]interface{}
  15. udpclient mu.UdpClient
  16. udplock sync.Mutex
  17. nextlock sync.Mutex
  18. extractAction map[string]map[string]interface{}
  19. heartAction map[string]interface{}
  20. isAction bool
  21. isGetask bool
  22. using_ext_node, standby_ext_node, invalid_ext_node []map[string]interface{}
  23. )
  24. //udp接收
  25. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  26. switch act {
  27. case mu.OP_TYPE_DATA:
  28. var mapInfo map[string]interface{}
  29. err := json.Unmarshal(data, &mapInfo)
  30. if err != nil {
  31. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  32. } else if mapInfo != nil {
  33. sid, eid := qu.ObjToString(mapInfo["gtid"]), qu.ObjToString(mapInfo["lteid"])
  34. if sid == "" || eid == "" {
  35. log.Debug("接收id段异常-err ", "sid=", sid, ",eid=", eid)
  36. } else {
  37. lastNodeResponse = time.Now().Unix()
  38. key := sid + "-" + eid + "-" + qu.ObjToString(mapInfo["stype"])
  39. go udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  40. //插入任务
  41. udplock.Lock()
  42. taskList = append(taskList, map[string]interface{}{
  43. "sid": sid,
  44. "eid": eid,
  45. })
  46. log.Debug("udp收到任务...数量:", len(taskList), "具体任务:", taskList)
  47. udplock.Unlock()
  48. }
  49. }
  50. case mu.OP_NOOP: //下个节点回应
  51. //抽取多节点
  52. nextlock.Lock()
  53. str := string(data)
  54. if isAction {
  55. if strings.Contains(str, "heart_extract") {
  56. dealWithHeartBackUdpData(strings.ReplaceAll(str, "heart_extract", ""))
  57. } else {
  58. dealWithCallBackUdpData(str)
  59. }
  60. } else {
  61. log.Debug("其他节点回应:", str)
  62. udptaskmap.Delete(str)
  63. }
  64. nextlock.Unlock()
  65. }
  66. }
  67. //处理~新接收抽取段~
  68. func dealWithExtUdpData(sid, eid string) {
  69. //获取最新-抽取节点状态
  70. initExtractNode()
  71. log.Debug("处理当前段落~~~需拆分", len(using_ext_node), "组", sid, "~~", eid)
  72. if len(using_ext_node) > 0 {
  73. //拆分段落方法~并附加抽取状态标记~有效期等
  74. splitArr, lifeArr := splitIdMethod(sid, eid)
  75. log.Debug("最终分", len(splitArr), "段")
  76. extractAction = map[string]map[string]interface{}{}
  77. heartAction = map[string]interface{}{}
  78. for k, v := range using_ext_node {
  79. skey := fmt.Sprintf("%s:%d:%s", v["addr"], qu.IntAll(v["port"]), v["stype"])
  80. extractAction[skey] = map[string]interface{}{
  81. "life": lifeArr[k],
  82. "action": 0,
  83. "uid": BsonTOStringId(v["_id"]),
  84. }
  85. heartAction[skey] = 0
  86. }
  87. extractAction["extract_ids"] = map[string]interface{}{
  88. "sid": sid,
  89. "eid": eid,
  90. }
  91. sendRunExtractNode(splitArr) //通知抽取
  92. } else {
  93. log.Debug("无有效机器抽取...程序停止于此...")
  94. }
  95. }
  96. //处理回调udp~相关数据
  97. func dealWithCallBackUdpData(str string) {
  98. if extractAction[str] != nil {
  99. extractAction[str]["action"] = 1
  100. log.Debug("抽取节点回应:", str)
  101. f := validExtractFinish()
  102. if f {
  103. sid := qu.ObjToString(extractAction["extract_ids"]["sid"])
  104. eid := qu.ObjToString(extractAction["extract_ids"]["eid"])
  105. isAction = false
  106. lastNodeResponse = time.Now().Unix()
  107. sendNextNode(sid, eid)
  108. }
  109. } else {
  110. log.Debug("其他节点回应:", str)
  111. udptaskmap.Delete(str)
  112. }
  113. }
  114. //处理-心跳回调
  115. func dealWithHeartBackUdpData(str string) {
  116. if heartAction[str] != nil {
  117. heartAction[str] = 0
  118. }
  119. }
  120. //通知所有节点~进行抽取~
  121. func sendRunExtractNode(splitArr []map[string]interface{}) {
  122. for index, node := range using_ext_node {
  123. tmp := splitArr[index]
  124. skey := fmt.Sprintf("%s:%d:%s", node["addr"], qu.IntAll(node["port"]), node["stype"])
  125. by, _ := json.Marshal(map[string]interface{}{
  126. "gtid": qu.ObjToString(tmp["sid"]),
  127. "lteid": qu.ObjToString(tmp["eid"]),
  128. "stype": skey,
  129. })
  130. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
  131. IP: net.ParseIP(node["addr"].(string)),
  132. Port: qu.IntAll(node["port"]),
  133. })
  134. }
  135. isAction = true
  136. log.Debug("通知抽取udp...等待抽取...回应...")
  137. }
  138. //通知所有抽取节点~结束抽取
  139. func sendStopExtractNode(splitArr []map[string]interface{}) {
  140. for _, node := range using_ext_node {
  141. by, _ := json.Marshal(map[string]interface{}{
  142. "stype": "stop_extract",
  143. })
  144. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
  145. IP: net.ParseIP(node["addr"].(string)),
  146. Port: qu.IntAll(node["port"]),
  147. })
  148. }
  149. }
  150. //发送下阶段节点~
  151. func sendNextNode(sid string, eid string) {
  152. //更新记录状态
  153. updateProcessUdpIdsInfo(sid, eid)
  154. for _, node := range nextNode {
  155. by, _ := json.Marshal(map[string]interface{}{
  156. "gtid": sid,
  157. "lteid": eid,
  158. "stype": qu.ObjToString(node["stype"]),
  159. })
  160. addr := &net.UDPAddr{
  161. IP: net.ParseIP(node["addr"].(string)),
  162. Port: qu.IntAll(node["port"]),
  163. }
  164. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr) //发送下节点
  165. //只监控清洗流程
  166. if qu.IntAll(node["port"]) == 1799 {
  167. new_node := &udpNode{by, addr, time.Now().Unix()}
  168. udptaskmap.Store(string(by), new_node)
  169. }
  170. }
  171. log.Debug("udp通知抽取完成...通知下阶段udp-敏感词,补城市", sid, "~", eid)
  172. //此段落彻底完毕~继续获取任务
  173. isGetask = false
  174. }
  175. //发送单节点~
  176. func sendSingleOtherNode(by []byte, addr string, port string) {
  177. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
  178. IP: net.ParseIP(addr),
  179. Port: qu.IntAll(port),
  180. })
  181. }
  182. //更新流程记录id段落
  183. func updateProcessUdpIdsInfo(sid string, eid string) {
  184. query := map[string]interface{}{
  185. "gtid": sid,
  186. "lteid": eid,
  187. }
  188. log.Debug("开始更新流程段落记录~~", query)
  189. data := source_mgo.FindOne("bidding_processing_ids", query)
  190. if len(data) > 0 {
  191. up_id := BsonTOStringId(data["_id"])
  192. if up_id != "" {
  193. update := map[string]interface{}{
  194. "$set": map[string]interface{}{
  195. "dataprocess": 3,
  196. "updatetime": time.Now().Unix(),
  197. },
  198. }
  199. source_mgo.UpdateById("bidding_processing_ids", up_id, update)
  200. log.Debug("流程段落记录~~更新完毕~", update)
  201. }
  202. } else {
  203. log.Debug("未查询到记录id段落~", query)
  204. }
  205. }