package main import ( "encoding/json" "fmt" log "github.com/donnie4w/go-logger/logger" mu "mfw/util" "net" qu "qfw/util" "strings" "sync" "time" ) var ( nextNode []map[string]interface{} udpclient mu.UdpClient udplock sync.Mutex nextlock sync.Mutex extractAction map[string]map[string]interface{} heartAction map[string]interface{} isAction bool isGetask bool using_ext_node, standby_ext_node, invalid_ext_node []map[string]interface{} ) //udp接收 func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { switch act { case mu.OP_TYPE_DATA: var mapInfo map[string]interface{} err := json.Unmarshal(data, &mapInfo) if err != nil { udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra) } else if mapInfo != nil { sid, eid := qu.ObjToString(mapInfo["gtid"]), qu.ObjToString(mapInfo["lteid"]) if sid == "" || eid == "" { log.Debug("接收id段异常-err ", "sid=", sid, ",eid=", eid) } else { lastNodeResponse = time.Now().Unix() key := sid + "-" + eid + "-" + qu.ObjToString(mapInfo["stype"]) go udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra) //插入任务 udplock.Lock() taskList = append(taskList, map[string]interface{}{ "sid": sid, "eid": eid, }) log.Debug("udp收到任务...数量:", len(taskList), "具体任务:", taskList) udplock.Unlock() } } case mu.OP_NOOP: //下个节点回应 //抽取多节点 nextlock.Lock() str := string(data) if isAction { if strings.Contains(str, "heart_extract") { dealWithHeartBackUdpData(strings.ReplaceAll(str, "heart_extract", "")) } else { dealWithCallBackUdpData(str) } } else { log.Debug("其他节点回应:", str) udptaskmap.Delete(str) } nextlock.Unlock() } } //处理~新接收抽取段~ func dealWithExtUdpData(sid, eid string) { //获取最新-抽取节点状态 initExtractNode() log.Debug("处理当前段落~~~需拆分", len(using_ext_node), "组", sid, "~~", eid) if len(using_ext_node) > 0 { //拆分段落方法~并附加抽取状态标记~有效期等 splitArr, lifeArr := splitIdMethod(sid, eid) log.Debug("最终分", len(splitArr), "段") extractAction = map[string]map[string]interface{}{} heartAction = map[string]interface{}{} for k, v := range using_ext_node { skey := fmt.Sprintf("%s:%d:%s", v["addr"], qu.IntAll(v["port"]), v["stype"]) extractAction[skey] = map[string]interface{}{ "life": lifeArr[k], "action": 0, "uid": BsonTOStringId(v["_id"]), } heartAction[skey] = 0 } extractAction["extract_ids"] = map[string]interface{}{ "sid": sid, "eid": eid, } sendRunExtractNode(splitArr) //通知抽取 } else { log.Debug("无有效机器抽取...程序停止于此...") } } //处理回调udp~相关数据 func dealWithCallBackUdpData(str string) { if extractAction[str] != nil { extractAction[str]["action"] = 1 log.Debug("抽取节点回应:", str) f := validExtractFinish() if f { sid := qu.ObjToString(extractAction["extract_ids"]["sid"]) eid := qu.ObjToString(extractAction["extract_ids"]["eid"]) isAction = false lastNodeResponse = time.Now().Unix() sendNextNode(sid, eid) } } else { log.Debug("其他节点回应:", str) udptaskmap.Delete(str) } } //处理-心跳回调 func dealWithHeartBackUdpData(str string) { if heartAction[str] != nil { heartAction[str] = 0 } } //通知所有节点~进行抽取~ func sendRunExtractNode(splitArr []map[string]interface{}) { for index, node := range using_ext_node { tmp := splitArr[index] skey := fmt.Sprintf("%s:%d:%s", node["addr"], qu.IntAll(node["port"]), node["stype"]) by, _ := json.Marshal(map[string]interface{}{ "gtid": qu.ObjToString(tmp["sid"]), "lteid": qu.ObjToString(tmp["eid"]), "stype": skey, }) udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{ IP: net.ParseIP(node["addr"].(string)), Port: qu.IntAll(node["port"]), }) } isAction = true log.Debug("通知抽取udp...等待抽取...回应...") } //通知所有抽取节点~结束抽取 func sendStopExtractNode(splitArr []map[string]interface{}) { for _, node := range using_ext_node { by, _ := json.Marshal(map[string]interface{}{ "stype": "stop_extract", }) udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{ IP: net.ParseIP(node["addr"].(string)), Port: qu.IntAll(node["port"]), }) } } //发送下阶段节点~ func sendNextNode(sid string, eid string) { //更新记录状态 updateProcessUdpIdsInfo(sid, eid) for _, node := range nextNode { by, _ := json.Marshal(map[string]interface{}{ "gtid": sid, "lteid": eid, "stype": qu.ObjToString(node["stype"]), }) addr := &net.UDPAddr{ IP: net.ParseIP(node["addr"].(string)), Port: qu.IntAll(node["port"]), } udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr) //发送下节点 //只监控清洗流程 if qu.IntAll(node["port"]) == 1799 { new_node := &udpNode{by, addr, time.Now().Unix()} udptaskmap.Store(string(by), new_node) } } log.Debug("udp通知抽取完成...通知下阶段udp-敏感词,补城市", sid, "~", eid) //此段落彻底完毕~继续获取任务 isGetask = false } //发送单节点~ func sendSingleOtherNode(by []byte, addr string, port string) { udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{ IP: net.ParseIP(addr), Port: qu.IntAll(port), }) } //更新流程记录id段落 func updateProcessUdpIdsInfo(sid string, eid string) { query := map[string]interface{}{ "gtid": sid, "lteid": eid, } log.Debug("开始更新流程段落记录~~", query) data := source_mgo.FindOne("bidding_processing_ids", query) if len(data) > 0 { up_id := BsonTOStringId(data["_id"]) if up_id != "" { update := map[string]interface{}{ "$set": map[string]interface{}{ "dataprocess": 3, "updatetime": time.Now().Unix(), }, } source_mgo.UpdateById("bidding_processing_ids", up_id, update) log.Debug("流程段落记录~~更新完毕~", update) } } else { log.Debug("未查询到记录id段落~", query) } }