udp.go 1.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "go.uber.org/zap"
  6. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  7. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  8. "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
  9. "net"
  10. )
  11. var (
  12. UdpClient udp.UdpClient
  13. )
  14. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  15. defer util.Catch()
  16. switch act {
  17. case udp.OP_TYPE_DATA: //上个节点的数据
  18. var mapInfo map[string]interface{}
  19. err := json.Unmarshal(data, &mapInfo)
  20. log.Info("processUdpMsg", zap.Any("mapInfo:", mapInfo))
  21. gtid, _ := mapInfo["gtid"].(string)
  22. lteid, _ := mapInfo["lteid"].(string)
  23. if err != nil {
  24. UdpClient.WriteUdp([]byte("tidb udp error"), udp.OP_NOOP, ra) //udp失败回写
  25. } else {
  26. //udp成功回写
  27. if k := util.ObjToString(mapInfo["key"]); k != "" {
  28. UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra)
  29. } else {
  30. k = fmt.Sprintf("%s-%s-%s", gtid, lteid, util.ObjToString(mapInfo["stype"]))
  31. UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra)
  32. }
  33. log.Info("start sync ...")
  34. //doBiddingTask(gtid, lteid, mapInfo)
  35. }
  36. }
  37. }