main.go 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. package main
  2. import (
  3. "encoding/json"
  4. "go.uber.org/zap"
  5. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  6. "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
  7. "medical_project/config"
  8. "net"
  9. "time"
  10. )
  11. var (
  12. udpClient udp.UdpClient
  13. SingleThread = make(chan bool, 1)
  14. UdpChan = make(chan map[string]interface{}, 500)
  15. )
  16. func main() {
  17. go updateAllQueue()
  18. loadData(0)
  19. udpClient = udp.UdpClient{Local: config.Conf.Udp.LocPort, BufSize: 1024}
  20. udpClient.Listen(processUdpMsg)
  21. log.Info("Udp服务监听", zap.String("port:", config.Conf.Udp.LocPort))
  22. for {
  23. mapinfo, ok := <-UdpChan
  24. if !ok {
  25. continue
  26. }
  27. SingleThread <- true
  28. go func() {
  29. defer func() {
  30. <-SingleThread
  31. }()
  32. currentType = "project"
  33. pici = time.Now().Unix()
  34. taskQl(mapinfo)
  35. }()
  36. }
  37. }
  38. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  39. switch act {
  40. case udp.OP_TYPE_DATA: //上个节点的数据
  41. var mapInfo map[string]interface{}
  42. err := json.Unmarshal(data, &mapInfo)
  43. log.Info("err:", zap.Error(err), zap.Any("mapInfo:", mapInfo))
  44. if err != nil {
  45. _ = udpClient.WriteUdp([]byte("err:"+err.Error()), udp.OP_NOOP, ra)
  46. } else if mapInfo != nil {
  47. key, _ := mapInfo["key"].(string)
  48. if key == "" {
  49. key = "udpok"
  50. }
  51. go udpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
  52. UdpChan <- mapInfo
  53. }
  54. case udp.OP_NOOP: //下个节点回应
  55. ok := string(data)
  56. if ok != "" {
  57. log.Info("re", zap.String("ok:", ok))
  58. }
  59. }
  60. }