main.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "go.mongodb.org/mongo-driver/bson/primitive"
  6. "go.uber.org/zap"
  7. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  8. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
  10. "net"
  11. "time"
  12. )
  13. var (
  14. MgoB *mongodb.MongodbSim
  15. UdpClient udp.UdpClient
  16. nextAddr *net.UDPAddr
  17. Repeat = false //抽取是否回复
  18. )
  19. //processUdpMsg 处理udp
  20. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  21. switch act {
  22. case udp.OP_TYPE_DATA:
  23. //
  24. //var mapInfo map[string]interface{}
  25. //err := json.Unmarshal(data, &mapInfo)
  26. //log.Info("processUdpMsg", zap.Any("mapInfo", mapInfo))
  27. //if err != nil {
  28. // fmt.Println(err)
  29. //}
  30. //if mapInfo != nil {
  31. // key, _ := mapInfo["key"].(string)
  32. // if key == "" {
  33. // key = "udpok"
  34. // }
  35. // go UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
  36. //}
  37. case udp.OP_NOOP:
  38. da := string(data)
  39. log.Info("收到回复数据", zap.String("data", da))
  40. if da == "ok" {
  41. Repeat = true
  42. }
  43. default:
  44. fmt.Println("current_listen : processUdpMsg =====", act)
  45. }
  46. }
  47. func main() {
  48. f_sid := ""
  49. n_sid := ""
  50. f_lid := "" //file 最后一个分类结束ID
  51. n_lid := ""
  52. where := map[string]interface{}{
  53. "extracttype": 9,
  54. "biddingid": map[string]interface{}{
  55. "$exists": 1,
  56. },
  57. }
  58. fileDataa, _ := MgoB.Find("bidding_file", where, `{"_id":-1}`, map[string]interface{}{"_id": 1}, true, -1, 1)
  59. fileDaa := *fileDataa
  60. f_sid = BsonIdToSId(fileDaa[0]["_id"])
  61. nomalDataa, _ := MgoB.Find("bidding_nomal", where, `{"_id":-1}`, map[string]interface{}{"_id": 1}, true, -1, 1)
  62. nomalDaa := *nomalDataa
  63. n_sid = BsonIdToSId(nomalDaa[0]["_id"])
  64. for {
  65. fileData, _ := MgoB.Find("bidding_file", map[string]interface{}{"extracttype": 9}, `{"_id":-1}`, map[string]interface{}{"_id": 1}, true, -1, 1)
  66. fileDa := *fileData
  67. fid := BsonIdToSId(fileDa[0]["_id"])
  68. if fid != "" {
  69. f_lid = fid
  70. }
  71. nomalData, _ := MgoB.Find("bidding_nomal", map[string]interface{}{"extracttype": 9}, `{"_id":-1}`, map[string]interface{}{"_id": 1}, true, -1, 1)
  72. nomalDa := *nomalData
  73. nid := BsonIdToSId(nomalDa[0]["_id"])
  74. if nid != "" {
  75. n_lid = nid
  76. }
  77. //起始ID 等于 结束ID
  78. if f_lid == f_sid && n_lid == n_sid {
  79. time.Sleep(time.Second * 5)
  80. } else if f_lid == "" && n_lid == "" {
  81. time.Sleep(time.Second * 5)
  82. } else {
  83. //log.Info("main", zap.String("f_lid", f_lid), zap.String("n_lid", n_lid))
  84. data := map[string]interface{}{
  85. "file": fmt.Sprintf("%s-%s", f_sid, f_lid),
  86. "nomal": fmt.Sprintf("%s-%s", n_sid, n_lid),
  87. }
  88. SendUdpMsg(data, nextAddr)
  89. time.Sleep(time.Second * 3)
  90. if !Repeat {
  91. time.Sleep(time.Second * 10)
  92. SendUdpMsg(data, nextAddr)
  93. time.Sleep(time.Second * 10)
  94. SendUdpMsg(data, nextAddr)
  95. log.Error("没有收到回复", zap.Any("data", data))
  96. }
  97. f_sid = f_lid
  98. n_sid = n_lid
  99. }
  100. }
  101. }
  102. func deletePreData() {
  103. }
  104. //BsonIdToSId 根据bsonID转string
  105. func BsonIdToSId(uid interface{}) string {
  106. if uid == nil {
  107. return ""
  108. } else if u, ok := uid.(string); ok {
  109. return u
  110. } else if u, ok := uid.(primitive.ObjectID); ok {
  111. return u.Hex()
  112. } else {
  113. return ""
  114. }
  115. }
  116. //SendUdpMsg 通知处理企业新增数据
  117. func SendUdpMsg(data map[string]interface{}, target *net.UDPAddr) {
  118. bytes, _ := json.Marshal(data)
  119. UdpClient.WriteUdp(bytes, udp.OP_TYPE_DATA, target)
  120. log.Info("SendUdpMsg", zap.Any("data", data))
  121. log.Info("SendUdpMsg", zap.Any("target", target))
  122. }