1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768 |
- package main
- import (
- "encoding/json"
- "go.uber.org/zap"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
- "medical_project/config"
- "net"
- "time"
- )
- var (
- udpClient udp.UdpClient
- SingleThread = make(chan bool, 1)
- UdpChan = make(chan map[string]interface{}, 500)
- )
- func main() {
- go updateAllQueue()
- loadData(0)
- udpClient = udp.UdpClient{Local: config.Conf.Udp.LocPort, BufSize: 1024}
- udpClient.Listen(processUdpMsg)
- log.Info("Udp服务监听", zap.String("port:", config.Conf.Udp.LocPort))
- for {
- mapinfo, ok := <-UdpChan
- if !ok {
- continue
- }
- SingleThread <- true
- go func() {
- defer func() {
- <-SingleThread
- }()
- currentType = "project"
- pici = time.Now().Unix()
- taskQl(mapinfo)
- }()
- }
- }
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- switch act {
- case udp.OP_TYPE_DATA: //上个节点的数据
- var mapInfo map[string]interface{}
- err := json.Unmarshal(data, &mapInfo)
- log.Info("err:", zap.Error(err), zap.Any("mapInfo:", mapInfo))
- if err != nil {
- _ = udpClient.WriteUdp([]byte("err:"+err.Error()), udp.OP_NOOP, ra)
- } else if mapInfo != nil {
- key, _ := mapInfo["key"].(string)
- if key == "" {
- key = "udpok"
- }
- go udpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
- UdpChan <- mapInfo
- }
- case udp.OP_NOOP: //下个节点回应
- ok := string(data)
- if ok != "" {
- log.Info("re", zap.String("ok:", ok))
- }
- }
- }
|