main.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. package main
  2. import (
  3. qu "app.yhyue.com/moapp/jybase/common"
  4. "dataPrefer/db"
  5. "dataPrefer/service"
  6. "encoding/json"
  7. "flag"
  8. "github.com/gogf/gf/v2/frame/g"
  9. "github.com/gogf/gf/v2/os/gctx"
  10. "gopkg.in/natefinch/lumberjack.v2"
  11. "io"
  12. mu "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
  13. "log"
  14. "net"
  15. "os"
  16. "sync"
  17. "time"
  18. )
  19. var udpclient mu.UdpClient
  20. var lock = &sync.Mutex{}
  21. func main() {
  22. model := flag.Int("m", 0, "1:非定时任务")
  23. flag.Parse()
  24. var logger *lumberjack.Logger
  25. ctx := gctx.New()
  26. g.Config().MustGet(ctx, "logger").Struct(&logger)
  27. writers := []io.Writer{logger}
  28. if g.Config().MustGet(ctx, "logger.console").Bool() {
  29. writers = append(writers, os.Stdout)
  30. }
  31. log.SetOutput(io.MultiWriter(writers...))
  32. udpPort := g.Config().MustGet(ctx, "udpPort").String()
  33. udpclient = mu.UdpClient{Local: udpPort, BufSize: 1024}
  34. udpclient.Listen(ProcessUdpMsg)
  35. log.Println("Udp服务监听", g.Config().MustGet(ctx, "udpPort").String())
  36. service.IncDataById("684a9c215f834436f09c2710", "684a9c215f834436f09c2710")
  37. //service.Tj()
  38. //service.Hz()
  39. *model = 1
  40. if *model == 0 {
  41. <-chan bool(nil)
  42. }
  43. }
  44. // udp接收
  45. func ProcessUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  46. switch act {
  47. case mu.OP_TYPE_DATA:
  48. var mapInfo map[string]interface{}
  49. err := json.Unmarshal(data, &mapInfo)
  50. if err != nil {
  51. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  52. } else if mapInfo != nil {
  53. log.Println("接收到udp消息", string(data))
  54. stype := qu.ObjToString(mapInfo["stype"])
  55. sid, eid := qu.ObjToString(mapInfo["gtid"]), qu.ObjToString(mapInfo["lteid"])
  56. key := sid + "-" + eid + "-" + stype
  57. go udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  58. if stype == "bidding" {
  59. lock.Lock()
  60. service.IncDataById(sid, eid)
  61. sendNextNode(nil, sid, eid)
  62. for {
  63. time.Sleep(10 * time.Second)
  64. if db.Mgo_Main.Count("bidding_processing_ids", map[string]interface{}{"lteid": eid, "dataprocess": 9}) > 0 {
  65. break
  66. }
  67. log.Println(string(data), "es索引未生完,等待中。。。")
  68. }
  69. lock.Unlock()
  70. } else {
  71. sendNextNode(data, "", "")
  72. }
  73. }
  74. case mu.OP_NOOP: //下个节点回应
  75. str := string(data)
  76. log.Println("其他节点回应:", str)
  77. }
  78. }
  79. // 下节点发送
  80. func sendNextNode(by []byte, sid string, eid string) {
  81. if by == nil {
  82. stype := g.Config().MustGet(gctx.New(), "nextNode.stype").String()
  83. key := sid + "-" + eid + "-" + stype
  84. by, _ = json.Marshal(map[string]interface{}{
  85. "gtid": sid,
  86. "lteid": eid,
  87. "stype": stype,
  88. "key": key,
  89. })
  90. }
  91. addr := &net.UDPAddr{
  92. IP: net.ParseIP(g.Config().MustGet(gctx.New(), "nextNode.addr").String()),
  93. Port: g.Config().MustGet(gctx.New(), "nextNode.port").Int(),
  94. }
  95. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr) //发送下节点
  96. }