main.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. package main
  2. import (
  3. qu "app.yhyue.com/moapp/jybase/common"
  4. "dataPrefer/db"
  5. "dataPrefer/service"
  6. "encoding/json"
  7. "flag"
  8. "github.com/gogf/gf/v2/frame/g"
  9. "github.com/gogf/gf/v2/os/gctx"
  10. "gopkg.in/natefinch/lumberjack.v2"
  11. "io"
  12. mu "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
  13. "log"
  14. "net"
  15. "os"
  16. "sync"
  17. "time"
  18. )
  19. var udpclient mu.UdpClient
  20. var lock = &sync.Mutex{}
  21. func main() {
  22. model := flag.Int("m", 0, "1:非定时任务")
  23. flag.Parse()
  24. var logger *lumberjack.Logger
  25. ctx := gctx.New()
  26. g.Config().MustGet(ctx, "logger").Struct(&logger)
  27. writers := []io.Writer{logger}
  28. if g.Config().MustGet(ctx, "logger.console").Bool() {
  29. writers = append(writers, os.Stdout)
  30. }
  31. log.SetOutput(io.MultiWriter(writers...))
  32. udpPort := g.Config().MustGet(ctx, "udpPort").String()
  33. udpclient = mu.UdpClient{Local: udpPort, BufSize: 1024}
  34. udpclient.Listen(ProcessUdpMsg)
  35. log.Println("Udp服务监听", g.Config().MustGet(ctx, "udpPort").String())
  36. go repair()
  37. //service.IncDataById("684a9c215f834436f09c2710", "684a9c215f834436f09c2710")
  38. //service.Tj()
  39. //service.Hz()
  40. //*model = 1
  41. if *model == 0 {
  42. <-chan bool(nil)
  43. }
  44. }
  45. // udp接收
  46. func ProcessUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  47. switch act {
  48. case mu.OP_TYPE_DATA:
  49. var mapInfo map[string]interface{}
  50. err := json.Unmarshal(data, &mapInfo)
  51. if err != nil {
  52. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  53. } else if mapInfo != nil {
  54. go udpclient.WriteUdp([]byte(qu.ObjToString(mapInfo["key"])), mu.OP_NOOP, ra)
  55. log.Println("接收到udp消息", string(data))
  56. sid, eid := qu.ObjToString(mapInfo["gtid"]), qu.ObjToString(mapInfo["lteid"])
  57. if qu.ObjToString(mapInfo["stype"]) == "bidding" {
  58. lock.Lock()
  59. service.IncDataById(sid, eid)
  60. sendNextNode(nil, sid, eid)
  61. for {
  62. time.Sleep(10 * time.Second)
  63. if db.Mgo_Main.Count("bidding_processing_ids", map[string]interface{}{"lteid": eid, "dataprocess": 9}) > 0 {
  64. break
  65. }
  66. log.Println(string(data), "es索引未生完,等待中。。。")
  67. }
  68. lock.Unlock()
  69. } else {
  70. sendNextNode(data, "", "")
  71. }
  72. }
  73. case mu.OP_NOOP: //下个节点回应
  74. str := string(data)
  75. log.Println("其他节点回应:", str)
  76. }
  77. }
  78. // 下节点发送
  79. func sendNextNode(by []byte, sid string, eid string) {
  80. if by == nil {
  81. stype := g.Config().MustGet(gctx.New(), "nextNode.stype").String()
  82. key := sid + "-" + eid + "-" + stype
  83. by, _ = json.Marshal(map[string]interface{}{
  84. "gtid": sid,
  85. "lteid": eid,
  86. "stype": stype,
  87. "key": key,
  88. })
  89. }
  90. addr := &net.UDPAddr{
  91. IP: net.ParseIP(g.Config().MustGet(gctx.New(), "nextNode.addr").String()),
  92. Port: g.Config().MustGet(gctx.New(), "nextNode.port").Int(),
  93. }
  94. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr) //发送下节点
  95. }
  96. func repair() {
  97. }