main.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. package main
  2. import (
  3. util "app.yhyue.com/data_processing/common_utils"
  4. "app.yhyue.com/data_processing/common_utils/log"
  5. "app.yhyue.com/data_processing/common_utils/udp"
  6. "encoding/json"
  7. "go.uber.org/zap"
  8. "medical_project/config"
  9. "net"
  10. "time"
  11. )
  12. var (
  13. udpClient udp.UdpClient
  14. SingleThread = make(chan bool, 1)
  15. UdpChan = make(chan map[string]interface{}, 500)
  16. )
  17. func main() {
  18. go updateAllQueue()
  19. loadData(0)
  20. udpClient = udp.UdpClient{Local: config.Conf.Udp.LocPort, BufSize: 1024}
  21. udpClient.Listen(processUdpMsg)
  22. log.Info("Udp服务监听", zap.String("port:", config.Conf.Udp.LocPort))
  23. for {
  24. mapinfo, ok := <-UdpChan
  25. if !ok {
  26. continue
  27. }
  28. SingleThread <- true
  29. tasktype := util.ObjToString(mapinfo["stype"])
  30. switch tasktype {
  31. case "ql": //全量合并
  32. go func() {
  33. defer func() {
  34. <-SingleThread
  35. }()
  36. currentType = tasktype
  37. pici = time.Now().Unix()
  38. taskQl(mapinfo)
  39. }()
  40. case "project": //增量合并,
  41. go func() {
  42. defer func() {
  43. <-SingleThread
  44. }()
  45. currentType = tasktype
  46. pici = time.Now().Unix()
  47. taskZl(mapinfo)
  48. }()
  49. case "project_history": //增量合并, id段历史数据
  50. go func() {
  51. defer func() {
  52. <-SingleThread
  53. }()
  54. currentType = tasktype
  55. pici = time.Now().Unix()
  56. taskZl(mapinfo)
  57. }()
  58. default:
  59. <-SingleThread
  60. }
  61. }
  62. }
  63. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  64. switch act {
  65. case udp.OP_TYPE_DATA: //上个节点的数据
  66. var mapInfo map[string]interface{}
  67. err := json.Unmarshal(data, &mapInfo)
  68. log.Info("err:", zap.Error(err), zap.Any("mapInfo:", mapInfo))
  69. if err != nil {
  70. _ = udpClient.WriteUdp([]byte("err:"+err.Error()), udp.OP_NOOP, ra)
  71. } else if mapInfo != nil {
  72. key, _ := mapInfo["key"].(string)
  73. if key == "" {
  74. key = "udpok"
  75. }
  76. go udpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
  77. UdpChan <- mapInfo
  78. }
  79. case udp.OP_NOOP: //下个节点回应
  80. ok := string(data)
  81. if ok != "" {
  82. log.Info("re", zap.String("ok:", ok))
  83. }
  84. }
  85. }