udp.go 1.1 KB

123456789101112131415161718192021222324252627282930313233343536
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "go.uber.org/zap"
  6. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  7. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  8. "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
  9. "net"
  10. )
  11. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  12. defer util.Catch()
  13. switch act {
  14. case udp.OP_TYPE_DATA: //上个节点的数据
  15. var mapInfo map[string]interface{}
  16. err := json.Unmarshal(data, &mapInfo)
  17. log.Info("processUdpMsg", zap.Any("mapInfo:", mapInfo))
  18. gtid, _ := mapInfo["gtid"].(string)
  19. lteid, _ := mapInfo["lteid"].(string)
  20. if err != nil {
  21. UdpClient.WriteUdp([]byte("tidb udp error"), udp.OP_NOOP, ra) //udp失败回写
  22. } else {
  23. //udp成功回写
  24. if k := util.ObjToString(mapInfo["key"]); k != "" {
  25. UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra)
  26. } else {
  27. k = fmt.Sprintf("%s-%s-%s", gtid, lteid, util.ObjToString(mapInfo["stype"]))
  28. UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra)
  29. }
  30. log.Info("start sync ...")
  31. //doBiddingTask(gtid, lteid, mapInfo)
  32. }
  33. }
  34. }