package main import ( "encoding/json" "fmt" "go.uber.org/zap" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/log" "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp" "net" ) var ( UdpClient udp.UdpClient ) func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { defer util.Catch() switch act { case udp.OP_TYPE_DATA: //上个节点的数据 var mapInfo map[string]interface{} err := json.Unmarshal(data, &mapInfo) log.Info("processUdpMsg", zap.Any("mapInfo:", mapInfo)) gtid, _ := mapInfo["gtid"].(string) lteid, _ := mapInfo["lteid"].(string) if err != nil { UdpClient.WriteUdp([]byte("tidb udp error"), udp.OP_NOOP, ra) //udp失败回写 } else { //udp成功回写 if k := util.ObjToString(mapInfo["key"]); k != "" { UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra) } else { k = fmt.Sprintf("%s-%s-%s", gtid, lteid, util.ObjToString(mapInfo["stype"])) UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra) } log.Info("start sync ...") //doBiddingTask(gtid, lteid, mapInfo) } } }