main.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/robfig/cron/v3"
  6. "go.mongodb.org/mongo-driver/bson/primitive"
  7. "go.uber.org/zap"
  8. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
  11. "net"
  12. "time"
  13. )
var (
	// MgoB is the MongoDB handle that SendPreData queries.
	MgoB *mongodb.MongodbSim
	// UdpClient is the UDP client SendUdpMsg writes through.
	UdpClient udp.UdpClient
	// nextAddr is the UDP address SendPreData passes to SendUdpMsg as the target.
	nextAddr *net.UDPAddr
	// Repeat reports whether the extraction program has replied; processUdpMsg
	// sets it to true when an "ok" reply arrives.
	Repeat = false
)
  20. //processUdpMsg 处理udp
  21. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  22. switch act {
  23. case udp.OP_TYPE_DATA:
  24. //
  25. //var mapInfo map[string]interface{}
  26. //err := json.Unmarshal(data, &mapInfo)
  27. //log.Info("processUdpMsg", zap.Any("mapInfo", mapInfo))
  28. //if err != nil {
  29. // fmt.Println(err)
  30. //}
  31. //if mapInfo != nil {
  32. // key, _ := mapInfo["key"].(string)
  33. // if key == "" {
  34. // key = "udpok"
  35. // }
  36. // go UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
  37. //}
  38. case udp.OP_NOOP:
  39. da := string(data)
  40. log.Info("收到回复数据", zap.String("data", da))
  41. if da == "ok" {
  42. Repeat = true
  43. }
  44. default:
  45. fmt.Println("current_listen : processUdpMsg =====", act)
  46. }
  47. }
  48. func main() {
  49. local, _ := time.LoadLocation("Asia/Shanghai")
  50. c := cron.New(cron.WithLocation(local), cron.WithSeconds())
  51. if GF.Env.SpecType == "day" {
  52. _, err := c.AddFunc(GF.Env.Spec, dealIndexByDay)
  53. if err != nil {
  54. log.Info("main", zap.Any("AddFunc err", err))
  55. }
  56. } else if GF.Env.SpecType == "month" {
  57. _, err := c.AddFunc(GF.Env.Spec, dealIndexByMonth)
  58. if err != nil {
  59. log.Info("main", zap.Any("AddFunc err", err))
  60. }
  61. }
  62. c.Start()
  63. defer c.Stop()
  64. if GF.Env.Send {
  65. go SendPreData()
  66. }
  67. select {}
  68. }
  69. //SendPreData 发送预处理数据给 抽取程序
  70. func SendPreData() {
  71. f_sid := ""
  72. n_sid := ""
  73. f_lid := "" //file 最后一个分类结束ID
  74. n_lid := ""
  75. where := map[string]interface{}{
  76. "extracttype": 9,
  77. "biddingid": map[string]interface{}{
  78. "$exists": 1,
  79. },
  80. }
  81. fileDataa, _ := MgoB.Find("bidding_file", where, `{"_id":-1}`, map[string]interface{}{"_id": 1}, true, -1, 1)
  82. fileDaa := *fileDataa
  83. f_sid = BsonIdToSId(fileDaa[0]["_id"])
  84. nomalDataa, _ := MgoB.Find("bidding_nomal", where, `{"_id":-1}`, map[string]interface{}{"_id": 1}, true, -1, 1)
  85. nomalDaa := *nomalDataa
  86. n_sid = BsonIdToSId(nomalDaa[0]["_id"])
  87. for {
  88. fileData, _ := MgoB.Find("bidding_file", map[string]interface{}{"extracttype": 9}, `{"_id":-1}`, map[string]interface{}{"_id": 1}, true, -1, 1)
  89. fileDa := *fileData
  90. fid := BsonIdToSId(fileDa[0]["_id"])
  91. if fid != "" {
  92. f_lid = fid
  93. }
  94. nomalData, _ := MgoB.Find("bidding_nomal", map[string]interface{}{"extracttype": 9}, `{"_id":-1}`, map[string]interface{}{"_id": 1}, true, -1, 1)
  95. nomalDa := *nomalData
  96. nid := BsonIdToSId(nomalDa[0]["_id"])
  97. if nid != "" {
  98. n_lid = nid
  99. }
  100. //起始ID 等于 结束ID
  101. if f_lid == f_sid && n_lid == n_sid {
  102. time.Sleep(time.Second * 5)
  103. } else if f_lid == "" && n_lid == "" {
  104. time.Sleep(time.Second * 5)
  105. } else {
  106. //log.Info("main", zap.String("f_lid", f_lid), zap.String("n_lid", n_lid))
  107. data := map[string]interface{}{
  108. "file": fmt.Sprintf("%s-%s", f_sid, f_lid),
  109. "nomal": fmt.Sprintf("%s-%s", n_sid, n_lid),
  110. }
  111. SendUdpMsg(data, nextAddr)
  112. time.Sleep(time.Second * 3)
  113. if !Repeat {
  114. time.Sleep(time.Second * 10)
  115. SendUdpMsg(data, nextAddr)
  116. time.Sleep(time.Second * 10)
  117. SendUdpMsg(data, nextAddr)
  118. log.Error("没有收到回复", zap.Any("data", data))
  119. }
  120. f_sid = f_lid
  121. n_sid = n_lid
  122. }
  123. }
  124. }
  125. //BsonIdToSId 根据bsonID转string
  126. func BsonIdToSId(uid interface{}) string {
  127. if uid == nil {
  128. return ""
  129. } else if u, ok := uid.(string); ok {
  130. return u
  131. } else if u, ok := uid.(primitive.ObjectID); ok {
  132. return u.Hex()
  133. } else {
  134. return ""
  135. }
  136. }
  137. //SendUdpMsg 通知处理企业新增数据
  138. func SendUdpMsg(data map[string]interface{}, target *net.UDPAddr) {
  139. bytes, _ := json.Marshal(data)
  140. UdpClient.WriteUdp(bytes, udp.OP_TYPE_DATA, target)
  141. log.Info("SendUdpMsg", zap.Any("data", data))
  142. log.Info("SendUdpMsg", zap.Any("target", target))
  143. }