main.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/robfig/cron/v3"
  6. "go.mongodb.org/mongo-driver/bson/primitive"
  7. "go.uber.org/zap"
  8. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
  11. "net"
  12. "time"
  13. )
  14. var (
  15. MgoB *mongodb.MongodbSim
  16. UdpClient udp.UdpClient
  17. nextAddr *net.UDPAddr
  18. Repeat = false //抽取是否回复
  19. )
  20. //processUdpMsg 处理udp
  21. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  22. switch act {
  23. case udp.OP_TYPE_DATA:
  24. //
  25. //var mapInfo map[string]interface{}
  26. //err := json.Unmarshal(data, &mapInfo)
  27. //log.Info("processUdpMsg", zap.Any("mapInfo", mapInfo))
  28. //if err != nil {
  29. // fmt.Println(err)
  30. //}
  31. //if mapInfo != nil {
  32. // key, _ := mapInfo["key"].(string)
  33. // if key == "" {
  34. // key = "udpok"
  35. // }
  36. // go UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
  37. //}
  38. case udp.OP_NOOP:
  39. da := string(data)
  40. log.Info("收到回复数据", zap.String("data", da))
  41. if da == "ok" {
  42. Repeat = true
  43. }
  44. default:
  45. fmt.Println("current_listen : processUdpMsg =====", act)
  46. }
  47. }
  48. func main() {
  49. local, _ := time.LoadLocation("Asia/Shanghai")
  50. c := cron.New(cron.WithLocation(local), cron.WithSeconds())
  51. if GF.Env.SpecType == "day" {
  52. _, err := c.AddFunc(GF.Env.Spec, dealIndexByDay)
  53. if err != nil {
  54. log.Info("main", zap.Any("AddFunc err", err))
  55. }
  56. } else if GF.Env.SpecType == "month" {
  57. _, err := c.AddFunc(GF.Env.Spec, dealIndexByMonth)
  58. if err != nil {
  59. log.Info("main", zap.Any("AddFunc err", err))
  60. }
  61. }
  62. //切换别名-定时任务
  63. _, err := c.AddFunc(GF.Env.SwitchSpec, SwitchAlias)
  64. if err != nil {
  65. log.Info("main", zap.Any("AddFunc err", err))
  66. }
  67. c.Start()
  68. defer c.Stop()
  69. //发送数据给抽取
  70. if GF.Env.Send {
  71. go SendPreData()
  72. }
  73. select {}
  74. }
  75. //SendPreData 发送预处理数据给 抽取程序
  76. func SendPreData() {
  77. f_sid := ""
  78. n_sid := ""
  79. f_lid := "" //file 最后一个分类结束ID
  80. n_lid := ""
  81. where := map[string]interface{}{
  82. "extracttype": 9,
  83. "biddingid": map[string]interface{}{
  84. "$exists": 1,
  85. },
  86. }
  87. fileDataa, _ := MgoB.Find("bidding_file", where, `{"_id":-1}`, map[string]interface{}{"_id": 1}, true, -1, 1)
  88. fileDaa := *fileDataa
  89. f_sid = BsonIdToSId(fileDaa[0]["_id"])
  90. nomalDataa, _ := MgoB.Find("bidding_nomal", where, `{"_id":-1}`, map[string]interface{}{"_id": 1}, true, -1, 1)
  91. nomalDaa := *nomalDataa
  92. n_sid = BsonIdToSId(nomalDaa[0]["_id"])
  93. for {
  94. fileData, _ := MgoB.Find("bidding_file", map[string]interface{}{"extracttype": 9}, `{"_id":-1}`, map[string]interface{}{"_id": 1}, true, -1, 1)
  95. fileDa := *fileData
  96. fid := BsonIdToSId(fileDa[0]["_id"])
  97. if fid != "" {
  98. f_lid = fid
  99. }
  100. nomalData, _ := MgoB.Find("bidding_nomal", map[string]interface{}{"extracttype": 9}, `{"_id":-1}`, map[string]interface{}{"_id": 1}, true, -1, 1)
  101. nomalDa := *nomalData
  102. nid := BsonIdToSId(nomalDa[0]["_id"])
  103. if nid != "" {
  104. n_lid = nid
  105. }
  106. //起始ID 等于 结束ID
  107. if f_lid == f_sid && n_lid == n_sid {
  108. time.Sleep(time.Second * 5)
  109. } else if f_lid == "" && n_lid == "" {
  110. time.Sleep(time.Second * 5)
  111. } else {
  112. //log.Info("main", zap.String("f_lid", f_lid), zap.String("n_lid", n_lid))
  113. data := map[string]interface{}{
  114. "file": fmt.Sprintf("%s-%s", f_sid, f_lid),
  115. "nomal": fmt.Sprintf("%s-%s", n_sid, n_lid),
  116. }
  117. SendUdpMsg(data, nextAddr)
  118. time.Sleep(time.Second * 3)
  119. if !Repeat {
  120. time.Sleep(time.Second * 10)
  121. SendUdpMsg(data, nextAddr)
  122. time.Sleep(time.Second * 10)
  123. SendUdpMsg(data, nextAddr)
  124. log.Error("没有收到回复", zap.Any("data", data))
  125. }
  126. f_sid = f_lid
  127. n_sid = n_lid
  128. }
  129. }
  130. }
  131. //BsonIdToSId 根据bsonID转string
  132. func BsonIdToSId(uid interface{}) string {
  133. if uid == nil {
  134. return ""
  135. } else if u, ok := uid.(string); ok {
  136. return u
  137. } else if u, ok := uid.(primitive.ObjectID); ok {
  138. return u.Hex()
  139. } else {
  140. return ""
  141. }
  142. }
  143. //SendUdpMsg 通知处理企业新增数据
  144. func SendUdpMsg(data map[string]interface{}, target *net.UDPAddr) {
  145. bytes, _ := json.Marshal(data)
  146. UdpClient.WriteUdp(bytes, udp.OP_TYPE_DATA, target)
  147. log.Info("SendUdpMsg", zap.Any("data", data))
  148. log.Info("SendUdpMsg", zap.Any("target", target))
  149. }