123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293 |
- package main
- import (
- util "app.yhyue.com/data_processing/common_utils"
- "app.yhyue.com/data_processing/common_utils/log"
- "app.yhyue.com/data_processing/common_utils/udp"
- "encoding/json"
- "go.uber.org/zap"
- "medical_project/config"
- "net"
- "time"
- )
- var (
- udpClient udp.UdpClient
- SingleThread = make(chan bool, 1)
- UdpChan = make(chan map[string]interface{}, 500)
- )
- func main() {
- go updateAllQueue()
- loadData(0)
- udpClient = udp.UdpClient{Local: config.Conf.Udp.LocPort, BufSize: 1024}
- udpClient.Listen(processUdpMsg)
- log.Info("Udp服务监听", zap.String("port:", config.Conf.Udp.LocPort))
- for {
- mapinfo, ok := <-UdpChan
- if !ok {
- continue
- }
- SingleThread <- true
- tasktype := util.ObjToString(mapinfo["stype"])
- switch tasktype {
- case "ql": //全量合并
- go func() {
- defer func() {
- <-SingleThread
- }()
- currentType = tasktype
- pici = time.Now().Unix()
- taskQl(mapinfo)
- }()
- case "project": //增量合并,
- go func() {
- defer func() {
- <-SingleThread
- }()
- currentType = tasktype
- pici = time.Now().Unix()
- taskZl(mapinfo)
- }()
- case "project_history": //增量合并, id段历史数据
- go func() {
- defer func() {
- <-SingleThread
- }()
- currentType = tasktype
- pici = time.Now().Unix()
- taskZl(mapinfo)
- }()
- default:
- <-SingleThread
- }
- }
- }
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- switch act {
- case udp.OP_TYPE_DATA: //上个节点的数据
- var mapInfo map[string]interface{}
- err := json.Unmarshal(data, &mapInfo)
- log.Info("err:", zap.Error(err), zap.Any("mapInfo:", mapInfo))
- if err != nil {
- _ = udpClient.WriteUdp([]byte("err:"+err.Error()), udp.OP_NOOP, ra)
- } else if mapInfo != nil {
- key, _ := mapInfo["key"].(string)
- if key == "" {
- key = "udpok"
- }
- go udpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
- UdpChan <- mapInfo
- }
- case udp.OP_NOOP: //下个节点回应
- ok := string(data)
- if ok != "" {
- log.Info("re", zap.String("ok:", ok))
- }
- }
- }
|