main.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/robfig/cron/v3"
  6. "go.mongodb.org/mongo-driver/bson/primitive"
  7. "go.uber.org/zap"
  8. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
  11. "net"
  12. "time"
  13. )
  14. var (
  15. MgoB *mongodb.MongodbSim
  16. UdpClient udp.UdpClient
  17. nextAddr *net.UDPAddr
  18. Repeat = false //抽取是否回复
  19. )
  20. //processUdpMsg 处理udp
  21. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  22. switch act {
  23. case udp.OP_TYPE_DATA:
  24. //
  25. //var mapInfo map[string]interface{}
  26. //err := json.Unmarshal(data, &mapInfo)
  27. //log.Info("processUdpMsg", zap.Any("mapInfo", mapInfo))
  28. //if err != nil {
  29. // fmt.Println(err)
  30. //}
  31. //if mapInfo != nil {
  32. // key, _ := mapInfo["key"].(string)
  33. // if key == "" {
  34. // key = "udpok"
  35. // }
  36. // go UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
  37. //}
  38. case udp.OP_NOOP:
  39. da := string(data)
  40. log.Info("收到回复数据", zap.String("data", da))
  41. if da == "ok" {
  42. Repeat = true
  43. }
  44. default:
  45. fmt.Println("current_listen : processUdpMsg =====", act)
  46. }
  47. }
  48. func main() {
  49. local, _ := time.LoadLocation("Asia/Shanghai")
  50. c := cron.New(cron.WithLocation(local), cron.WithSeconds())
  51. if GF.Env.SpecType == "day" {
  52. _, err := c.AddFunc(GF.Env.Spec, dealIndexByDay)
  53. if err != nil {
  54. log.Info("main", zap.Any("AddFunc err", err))
  55. }
  56. } else if GF.Env.SpecType == "month" {
  57. _, err := c.AddFunc(GF.Env.Spec, dealIndexByMonth)
  58. if err != nil {
  59. log.Info("main", zap.Any("AddFunc err", err))
  60. }
  61. }
  62. ////切换别名-定时任务
  63. //_, err := c.AddFunc(GF.Env.SwitchSpec, SwitchAlias)
  64. //if err != nil {
  65. // log.Info("main", zap.Any("AddFunc err", err))
  66. //}
  67. c.Start()
  68. defer c.Stop()
  69. //发送数据给抽取
  70. if GF.Env.Send {
  71. go SendPreData()
  72. }
  73. log.Info("main", zap.String("监听端口:", GF.Env.LocalPort))
  74. select {}
  75. }
  76. //SendPreData 发送预处理数据给 抽取程序
  77. func SendPreData() {
  78. f_sid := ""
  79. n_sid := ""
  80. f_lid := "" //file 最后一个分类结束ID
  81. n_lid := ""
  82. where := map[string]interface{}{
  83. "extracttype": 9,
  84. "biddingid": map[string]interface{}{
  85. "$exists": 1,
  86. },
  87. }
  88. fileDataa, _ := MgoB.Find("bidding_file", where, `{"_id":-1}`, map[string]interface{}{"_id": 1}, true, -1, 1)
  89. fileDaa := *fileDataa
  90. f_sid = BsonIdToSId(fileDaa[0]["_id"])
  91. nomalDataa, _ := MgoB.Find("bidding_nomal", where, `{"_id":-1}`, map[string]interface{}{"_id": 1}, true, -1, 1)
  92. nomalDaa := *nomalDataa
  93. n_sid = BsonIdToSId(nomalDaa[0]["_id"])
  94. for {
  95. fileData, _ := MgoB.Find("bidding_file", map[string]interface{}{"extracttype": 9}, `{"_id":-1}`, map[string]interface{}{"_id": 1}, true, -1, 1)
  96. fileDa := *fileData
  97. fid := BsonIdToSId(fileDa[0]["_id"])
  98. if fid != "" {
  99. f_lid = fid
  100. }
  101. nomalData, _ := MgoB.Find("bidding_nomal", map[string]interface{}{"extracttype": 9}, `{"_id":-1}`, map[string]interface{}{"_id": 1}, true, -1, 1)
  102. nomalDa := *nomalData
  103. nid := BsonIdToSId(nomalDa[0]["_id"])
  104. if nid != "" {
  105. n_lid = nid
  106. }
  107. //起始ID 等于 结束ID
  108. if f_lid == f_sid && n_lid == n_sid {
  109. time.Sleep(time.Second * 5)
  110. } else if f_lid == "" && n_lid == "" {
  111. time.Sleep(time.Second * 5)
  112. } else {
  113. //log.Info("main", zap.String("f_lid", f_lid), zap.String("n_lid", n_lid))
  114. data := map[string]interface{}{
  115. "file": fmt.Sprintf("%s-%s", f_sid, f_lid),
  116. "nomal": fmt.Sprintf("%s-%s", n_sid, n_lid),
  117. }
  118. SendUdpMsg(data, nextAddr)
  119. time.Sleep(time.Second * 3)
  120. if !Repeat {
  121. time.Sleep(time.Second * 10)
  122. SendUdpMsg(data, nextAddr)
  123. time.Sleep(time.Second * 10)
  124. SendUdpMsg(data, nextAddr)
  125. log.Error("没有收到回复", zap.Any("data", data))
  126. }
  127. f_sid = f_lid
  128. n_sid = n_lid
  129. }
  130. }
  131. }
  132. //BsonIdToSId 根据bsonID转string
  133. func BsonIdToSId(uid interface{}) string {
  134. if uid == nil {
  135. return ""
  136. } else if u, ok := uid.(string); ok {
  137. return u
  138. } else if u, ok := uid.(primitive.ObjectID); ok {
  139. return u.Hex()
  140. } else {
  141. return ""
  142. }
  143. }
  144. //SendUdpMsg 通知处理企业新增数据
  145. func SendUdpMsg(data map[string]interface{}, target *net.UDPAddr) {
  146. bytes, _ := json.Marshal(data)
  147. UdpClient.WriteUdp(bytes, udp.OP_TYPE_DATA, target)
  148. log.Info("SendUdpMsg", zap.Any("data", data))
  149. log.Info("SendUdpMsg", zap.Any("target", target))
  150. }