updprocess.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. log "github.com/donnie4w/go-logger/logger"
  6. mu "mfw/util"
  7. "net"
  8. qu "qfw/util"
  9. "strings"
  10. "sync"
  11. "time"
  12. )
  13. var (
  14. nextNode []map[string]interface{}
  15. udpclient mu.UdpClient
  16. udplock sync.Mutex
  17. nextlock sync.Mutex
  18. extractAction map[string]map[string]interface{}
  19. heartAction map[string]interface{}
  20. isAction bool
  21. isGetask bool
  22. using_ext_node, standby_ext_node, invalid_ext_node []map[string]interface{}
  23. )
  24. //udp接收
  25. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  26. switch act {
  27. case mu.OP_TYPE_DATA:
  28. var mapInfo map[string]interface{}
  29. err := json.Unmarshal(data, &mapInfo)
  30. if err != nil {
  31. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  32. } else if mapInfo != nil {
  33. sid, eid := qu.ObjToString(mapInfo["gtid"]), qu.ObjToString(mapInfo["lteid"])
  34. stype := qu.ObjToString(mapInfo["stype"])
  35. if stype == "monitor" {
  36. log.Debug("收到监测......")
  37. key := qu.ObjToString(mapInfo["key"])
  38. udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  39. return
  40. }
  41. if sid == "" || eid == "" {
  42. log.Debug("接收id段异常-err ", "sid=", sid, ",eid=", eid)
  43. } else {
  44. lastNodeResponse = time.Now().Unix()
  45. key := sid + "-" + eid + "-" + qu.ObjToString(mapInfo["stype"])
  46. go udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  47. //插入任务
  48. udplock.Lock()
  49. taskList = append(taskList, map[string]interface{}{
  50. "sid": sid,
  51. "eid": eid,
  52. })
  53. log.Debug("udp收到任务...数量:", len(taskList), "具体任务:", taskList)
  54. udplock.Unlock()
  55. }
  56. }
  57. case mu.OP_NOOP: //下个节点回应
  58. //抽取多节点
  59. nextlock.Lock()
  60. str := string(data)
  61. if isAction {
  62. if strings.Contains(str, "heart_extract") {
  63. dealWithHeartBackUdpData(strings.ReplaceAll(str, "heart_extract", ""))
  64. } else {
  65. dealWithCallBackUdpData(str)
  66. }
  67. } else {
  68. log.Debug("其他节点回应:", str)
  69. udptaskmap.Delete(str)
  70. }
  71. nextlock.Unlock()
  72. }
  73. }
  74. //处理~新接收抽取段~
  75. func dealWithExtUdpData(sid, eid string) {
  76. //获取最新-抽取节点状态
  77. initExtractNode()
  78. log.Debug("处理当前段落~~~需拆分", len(using_ext_node), "组", sid, "~~", eid)
  79. if len(using_ext_node) > 0 {
  80. //拆分段落方法~并附加抽取状态标记~有效期等
  81. splitArr, lifeArr := splitIdMethod(sid, eid)
  82. log.Debug("最终分", len(splitArr), "段")
  83. extractAction = map[string]map[string]interface{}{}
  84. heartAction = map[string]interface{}{}
  85. for k, v := range using_ext_node {
  86. skey := fmt.Sprintf("%s:%d:%s", v["addr"], qu.IntAll(v["port"]), v["stype"])
  87. extractAction[skey] = map[string]interface{}{
  88. "life": lifeArr[k],
  89. "action": 0,
  90. "uid": BsonTOStringId(v["_id"]),
  91. }
  92. heartAction[skey] = 0
  93. }
  94. extractAction["extract_ids"] = map[string]interface{}{
  95. "sid": sid,
  96. "eid": eid,
  97. }
  98. sendRunExtractNode(splitArr) //通知抽取
  99. } else {
  100. log.Debug("无有效机器抽取...程序停止于此...")
  101. }
  102. }
  103. //处理回调udp~相关数据
  104. func dealWithCallBackUdpData(str string) {
  105. if extractAction[str] != nil {
  106. extractAction[str]["action"] = 1
  107. log.Debug("抽取节点回应:", str)
  108. f := validExtractFinish()
  109. if f {
  110. sid := qu.ObjToString(extractAction["extract_ids"]["sid"])
  111. eid := qu.ObjToString(extractAction["extract_ids"]["eid"])
  112. isAction = false
  113. lastNodeResponse = time.Now().Unix()
  114. sendNextNode(sid, eid)
  115. }
  116. } else {
  117. log.Debug("其他节点回应:", str)
  118. udptaskmap.Delete(str)
  119. }
  120. }
  121. //处理-心跳回调
  122. func dealWithHeartBackUdpData(str string) {
  123. if heartAction[str] != nil {
  124. heartAction[str] = 0
  125. }
  126. }
  127. //通知所有节点~进行抽取~
  128. func sendRunExtractNode(splitArr []map[string]interface{}) {
  129. for index, node := range using_ext_node {
  130. tmp := splitArr[index]
  131. skey := fmt.Sprintf("%s:%d:%s", node["addr"], qu.IntAll(node["port"]), node["stype"])
  132. by, _ := json.Marshal(map[string]interface{}{
  133. "gtid": qu.ObjToString(tmp["sid"]),
  134. "lteid": qu.ObjToString(tmp["eid"]),
  135. "stype": skey,
  136. })
  137. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
  138. IP: net.ParseIP(node["addr"].(string)),
  139. Port: qu.IntAll(node["port"]),
  140. })
  141. }
  142. isAction = true
  143. log.Debug("通知抽取udp...等待抽取...回应...")
  144. }
  145. //通知所有抽取节点~结束抽取
  146. func sendStopExtractNode(splitArr []map[string]interface{}) {
  147. for _, node := range using_ext_node {
  148. by, _ := json.Marshal(map[string]interface{}{
  149. "stype": "stop_extract",
  150. })
  151. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
  152. IP: net.ParseIP(node["addr"].(string)),
  153. Port: qu.IntAll(node["port"]),
  154. })
  155. }
  156. }
  157. //发送下阶段节点~
  158. func sendNextNode(sid string, eid string) {
  159. //更新记录状态
  160. updateProcessUdpIdsInfo(sid, eid)
  161. for _, node := range nextNode {
  162. key := sid + "-" + eid + "-" + qu.ObjToString(node["stype"])
  163. by, _ := json.Marshal(map[string]interface{}{
  164. "gtid": sid,
  165. "lteid": eid,
  166. "stype": qu.ObjToString(node["stype"]),
  167. "key": key,
  168. })
  169. addr := &net.UDPAddr{
  170. IP: net.ParseIP(node["addr"].(string)),
  171. Port: qu.IntAll(node["port"]),
  172. }
  173. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr) //发送下节点
  174. //只监控清洗流程
  175. if qu.IntAll(node["port"]) == 1799 {
  176. new_node := &udpNode{by, addr, time.Now().Unix()}
  177. udptaskmap.Store(key, new_node)
  178. }
  179. }
  180. log.Debug("udp通知抽取完成...通知下阶段udp-敏感词,补城市", sid, "~", eid)
  181. //此段落彻底完毕~继续获取任务
  182. isGetask = false
  183. }
  184. //发送单节点~
  185. func sendSingleOtherNode(by []byte, addr string, port string) {
  186. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
  187. IP: net.ParseIP(addr),
  188. Port: qu.IntAll(port),
  189. })
  190. }
  191. //更新流程记录id段落
  192. func updateProcessUdpIdsInfo(sid string, eid string) {
  193. query := map[string]interface{}{
  194. "gtid": sid,
  195. "lteid": eid,
  196. }
  197. log.Debug("开始更新流程段落记录~~", query)
  198. data := source_mgo.FindOne("bidding_processing_ids", query)
  199. if len(data) > 0 {
  200. up_id := BsonTOStringId(data["_id"])
  201. if up_id != "" {
  202. update := map[string]interface{}{
  203. "$set": map[string]interface{}{
  204. "dataprocess": 3,
  205. "updatetime": time.Now().Unix(),
  206. },
  207. }
  208. source_mgo.UpdateById("bidding_processing_ids", up_id, update)
  209. log.Debug("流程段落记录~~更新完毕~", update)
  210. }
  211. } else {
  212. log.Debug("未查询到记录id段落~", query)
  213. }
  214. }