12345678910111213141516171819202122232425262728293031323334353637383940 |
- package main
- import (
- "encoding/json"
- "fmt"
- "go.uber.org/zap"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
- "net"
- )
- var (
- UdpClient udp.UdpClient
- )
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- defer util.Catch()
- switch act {
- case udp.OP_TYPE_DATA: //上个节点的数据
- var mapInfo map[string]interface{}
- err := json.Unmarshal(data, &mapInfo)
- log.Info("processUdpMsg", zap.Any("mapInfo:", mapInfo))
- gtid, _ := mapInfo["gtid"].(string)
- lteid, _ := mapInfo["lteid"].(string)
- if err != nil {
- UdpClient.WriteUdp([]byte("tidb udp error"), udp.OP_NOOP, ra) //udp失败回写
- } else {
- //udp成功回写
- if k := util.ObjToString(mapInfo["key"]); k != "" {
- UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra)
- } else {
- k = fmt.Sprintf("%s-%s-%s", gtid, lteid, util.ObjToString(mapInfo["stype"]))
- UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra)
- }
- log.Info("start sync ...")
- //doBiddingTask(gtid, lteid, mapInfo)
- }
- }
- }
|