package udp import ( "data_ai/ul" "encoding/json" log "github.com/donnie4w/go-logger/logger" qu "jygit.jydev.jianyu360.cn/data_processing/common_utils" mu "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp" "net" "sync" "time" ) var ( nextNode []map[string]interface{} udpclient mu.UdpClient udplock, nextlock sync.Mutex responselock, getasklock sync.Mutex lastNodeResponse int64 taskList []map[string]interface{} isGetask, isAction, isStop bool ) func InitProcessVar() { //初始化··· isGetask = false isStop = false nextNode = qu.ObjArrToMapArr(ul.SysConfig["nextNode"].([]interface{})) lastNodeResponse = time.Now().Unix() taskList = []map[string]interface{}{} //执行监控 go lastUdpMonitoring() go nextUdpMonitoring() go getRepeatTask() //监听··· updport := ul.SysConfig["udpport"].(string) udpclient = mu.UdpClient{Local: updport, BufSize: 1024} udpclient.Listen(ProcessUdpMsg) log.Debug("Udp服务监听", updport) } // udp接收 func ProcessUdpMsg(act byte, data []byte, ra *net.UDPAddr) { switch act { case mu.OP_TYPE_DATA: var mapInfo map[string]interface{} err := json.Unmarshal(data, &mapInfo) if err != nil { udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra) } else if mapInfo != nil { sid, eid := qu.ObjToString(mapInfo["gtid"]), qu.ObjToString(mapInfo["lteid"]) stype := qu.ObjToString(mapInfo["stype"]) if stype == "monitor" { log.Debug("收到监测......") key := qu.ObjToString(mapInfo["key"]) udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra) return } if stype == "stop" { log.Debug("收到停止信号...") isStop = true } if stype == "start" { log.Debug("收到开始信号...") isStop = false } if sid == "" || eid == "" { log.Debug("接收id段异常-err ", "sid=", sid, ",eid=", eid) } else { lastNodeResponse = time.Now().Unix() key := sid + "-" + eid + "-" + qu.ObjToString(mapInfo["stype"]) go udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra) udplock.Lock() if isStop { //停止信号... for { if len(taskList) == 0 { break } log.Debug("检测到停止信号...等待任务结束...") time.Sleep(time.Second * 30) } log.Debug("检测到停止信号...通知下节点...") sendNextNode(sid, eid) } else { //插入任务... taskList = append(taskList, map[string]interface{}{ "sid": sid, "eid": eid, }) log.Debug("udp收到任务...数量:", len(taskList), "具体任务:", taskList) } udplock.Unlock() } } case mu.OP_NOOP: //下个节点回应 nextlock.Lock() str := string(data) udptaskmap.Delete(str) log.Debug("其他节点回应:", str) nextlock.Unlock() } } // 下节点发送 func sendNextNode(sid string, eid string) { for _, node := range nextNode { key := sid + "-" + eid + "-" + qu.ObjToString(node["stype"]) by, _ := json.Marshal(map[string]interface{}{ "gtid": sid, "lteid": eid, "stype": qu.ObjToString(node["stype"]), "key": key, }) addr := &net.UDPAddr{ IP: net.ParseIP(node["addr"].(string)), Port: qu.IntAll(node["port"]), } udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr) //发送下节点 new_node := &udpNode{by, addr, time.Now().Unix()} udptaskmap.Store(key, new_node) //监控···节点 } log.Debug("udp通知抽取完成...通知下阶段udp-敏感词,补城市", sid, "~", eid) isGetask = false //此段落彻底完毕~继续获取任务 }