123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126 |
- package udp
- import (
- "data_ai/ul"
- "encoding/json"
- log "github.com/donnie4w/go-logger/logger"
- qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- mu "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
- "net"
- "sync"
- "time"
- )
- var (
- nextNode []map[string]interface{}
- udpclient mu.UdpClient
- udplock, nextlock sync.Mutex
- responselock, getasklock sync.Mutex
- lastNodeResponse int64
- taskList []map[string]interface{}
- isGetask, isAction, isStop bool
- )
- func InitProcessVar() {
- //初始化···
- isGetask = false
- isStop = false
- nextNode = qu.ObjArrToMapArr(ul.SysConfig["nextNode"].([]interface{}))
- lastNodeResponse = time.Now().Unix()
- taskList = []map[string]interface{}{}
- //执行监控
- go lastUdpMonitoring()
- go nextUdpMonitoring()
- go getRepeatTask()
- //监听···
- updport := ul.SysConfig["udpport"].(string)
- udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
- udpclient.Listen(ProcessUdpMsg)
- log.Debug("Udp服务监听", updport)
- }
- // udp接收
- func ProcessUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- switch act {
- case mu.OP_TYPE_DATA:
- var mapInfo map[string]interface{}
- err := json.Unmarshal(data, &mapInfo)
- if err != nil {
- udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
- } else if mapInfo != nil {
- sid, eid := qu.ObjToString(mapInfo["gtid"]), qu.ObjToString(mapInfo["lteid"])
- stype := qu.ObjToString(mapInfo["stype"])
- if stype == "monitor" {
- log.Debug("收到监测......")
- key := qu.ObjToString(mapInfo["key"])
- udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
- return
- }
- if stype == "stop" {
- log.Debug("收到停止信号...")
- isStop = true
- }
- if stype == "start" {
- log.Debug("收到开始信号...")
- isStop = false
- }
- if sid == "" || eid == "" {
- log.Debug("接收id段异常-err ", "sid=", sid, ",eid=", eid)
- } else {
- lastNodeResponse = time.Now().Unix()
- key := sid + "-" + eid + "-" + qu.ObjToString(mapInfo["stype"])
- go udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
- udplock.Lock()
- if isStop { //停止信号...
- for {
- if len(taskList) == 0 {
- break
- }
- log.Debug("检测到停止信号...等待任务结束...")
- time.Sleep(time.Second * 30)
- }
- log.Debug("检测到停止信号...通知下节点...")
- sendNextNode(sid, eid)
- } else { //插入任务...
- taskList = append(taskList, map[string]interface{}{
- "sid": sid,
- "eid": eid,
- })
- log.Debug("udp收到任务...数量:", len(taskList), "具体任务:", taskList)
- }
- udplock.Unlock()
- }
- }
- case mu.OP_NOOP: //下个节点回应
- nextlock.Lock()
- str := string(data)
- udptaskmap.Delete(str)
- log.Debug("其他节点回应:", str)
- nextlock.Unlock()
- }
- }
- // 下节点发送
- func sendNextNode(sid string, eid string) {
- for _, node := range nextNode {
- key := sid + "-" + eid + "-" + qu.ObjToString(node["stype"])
- by, _ := json.Marshal(map[string]interface{}{
- "gtid": sid,
- "lteid": eid,
- "stype": qu.ObjToString(node["stype"]),
- "key": key,
- })
- addr := &net.UDPAddr{
- IP: net.ParseIP(node["addr"].(string)),
- Port: qu.IntAll(node["port"]),
- }
- udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr) //发送下节点
- new_node := &udpNode{by, addr, time.Now().Unix()}
- udptaskmap.Store(key, new_node) //监控···节点
- }
- log.Debug("udp通知抽取完成...通知下阶段udp-敏感词,补城市", sid, "~", eid)
- isGetask = false //此段落彻底完毕~继续获取任务
- }
|