package main

import (
	"encoding/json"
	"net"
	"time"

	"go.uber.org/zap"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
	"medical_project/config"
)

var (
	// udpClient is the shared UDP endpoint; it is configured in main and used
	// by processUdpMsg to send replies.
	udpClient udp.UdpClient

	// SingleThread is a one-slot semaphore: main fills the slot before
	// starting a task goroutine and the goroutine drains it when it finishes,
	// so at most one task runs at a time.
	SingleThread = make(chan bool, 1)

	// UdpChan queues decoded UDP payloads (up to 500) between processUdpMsg,
	// which produces them, and the loop in main, which consumes them.
	UdpChan = make(chan map[string]interface{}, 500)
)

// main starts the background queue updater, performs the initial data load,
// registers processUdpMsg as the UDP handler, and then runs one task per
// payload received on UdpChan, never more than one at a time.
func main() {
	go updateAllQueue()
	loadData(0)

	udpClient = udp.UdpClient{
		Local:   config.Conf.Udp.LocPort,
		BufSize: 1024,
	}
	udpClient.Listen(processUdpMsg)
	log.Info("Udp服务监听", zap.String("port:", config.Conf.Udp.LocPort))

	for {
		msg, open := <-UdpChan
		if !open {
			// NOTE(review): nothing visible here closes UdpChan; if it were
			// ever closed, this branch would spin without blocking.
			continue
		}

		// Take the single slot; this blocks while a previous task still runs.
		SingleThread <- true
		go func(payload map[string]interface{}) {
			// Free the slot once the task is done, even if it panics.
			defer func() {
				<-SingleThread
			}()

			currentType = "project"
			pici = time.Now().Unix()
			taskQl(payload)
		}(msg)
	}
}

// processUdpMsg is the handler passed to udpClient.Listen.
//
// act is the operation code of the received packet, data is its payload and
// ra is the address replies are written to.
//
// For udp.OP_TYPE_DATA the payload is decoded as a JSON object. A decode
// failure is reported back to ra; a successfully decoded, non-nil object is
// acknowledged with its "key" value ("udpok" when absent or empty) and then
// queued on UdpChan. For udp.OP_NOOP a non-empty payload is only logged.
// Any other operation code is ignored.
func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
	switch act {
	case udp.OP_TYPE_DATA:
		// Data from the previous node.
		var mapInfo map[string]interface{}
		err := json.Unmarshal(data, &mapInfo)
		log.Info("err:", zap.Error(err), zap.Any("mapInfo:", mapInfo))

		if err != nil {
			_ = udpClient.WriteUdp([]byte("err:"+err.Error()), udp.OP_NOOP, ra)
			return
		}
		if mapInfo == nil {
			// A JSON null decodes without error but leaves nothing to queue.
			return
		}

		ack := "udpok"
		if key, _ := mapInfo["key"].(string); key != "" {
			ack = key
		}
		// The acknowledgement is sent from its own goroutine; the send on
		// UdpChan below blocks while the queue is full.
		go udpClient.WriteUdp([]byte(ack), udp.OP_NOOP, ra)
		UdpChan <- mapInfo

	case udp.OP_NOOP:
		// Response from the next node.
		if reply := string(data); reply != "" {
			log.Info("re", zap.String("ok:", reply))
		}
	}
}