main6.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "go.mongodb.org/mongo-driver/bson/primitive"
  6. "go.mongodb.org/mongo-driver/mongo"
  7. "go.mongodb.org/mongo-driver/mongo/options"
  8. "log"
  9. "mfw/util"
  10. "net"
  11. "time"
  12. )
  13. var sid, eid primitive.ObjectID
  14. var udpclient util.UdpClient //udp对象
  15. func main() {
  16. udpclient = util.UdpClient{Local: "10.171.112.160:1199", BufSize: 1024}//10.171.112.160:1199
  17. udpclient.Listen(processUdpMsg)
  18. log.Printf("Udp listening port: %s:%s\n", "10.171.112.160", "1199")
  19. client, err := mongo.NewClient(options.Client().ApplyURI("mongodb://10.30.94.175:27081,10.81.232.246:27082,10.172.242.243:27080")) //192.168.3.207 10.30.94.175
  20. if err != nil {
  21. log.Println(17, err)
  22. return
  23. }
  24. ctx := context.Background()
  25. err = client.Connect(ctx)
  26. if err != nil {
  27. log.Println(23, err)
  28. return
  29. }
  30. result := client.Database("qfw").Collection("bidding").FindOne(ctx, primitive.M{}, options.FindOne().SetSort(primitive.M{"_id": -1}).SetProjection(primitive.M{"_id": 1}))
  31. tmp := make(map[string]primitive.ObjectID)
  32. err = result.Decode(&tmp)
  33. if err != nil {
  34. log.Println(31, err)
  35. return
  36. }
  37. log.Println("start id",tmp)
  38. timer := time.NewTimer(time.Minute * 1)
  39. var isfive bool
  40. for {
  41. select {
  42. case <-timer.C:
  43. if sid.IsZero() {
  44. sid = tmp["_id"]
  45. } else {
  46. if !eid.IsZero() && eid != sid{
  47. sid = eid
  48. }else {
  49. log.Println(sid,eid,"为空或者id一致")
  50. timer.Reset(time.Minute)
  51. continue
  52. }
  53. }
  54. result2 := client.Database("qfw").Collection("bidding").FindOne(ctx, primitive.M{}, options.FindOne().SetSort(primitive.M{"_id": -1}).SetProjection(primitive.M{"_id": 1}))
  55. tmp2 := make(map[string]primitive.ObjectID)
  56. err := result2.Decode(&tmp2)
  57. if err != nil {
  58. log.Println(44, err)
  59. timer.Reset(time.Minute)
  60. continue
  61. }
  62. countDocuments, err := client.Database("qfw").Collection("bidding").CountDocuments(ctx, primitive.M{"_id": primitive.M{
  63. "$gte": sid,
  64. "$lte": tmp2["_id"],
  65. }})
  66. if err != nil {
  67. log.Println(52, err)
  68. timer.Reset(time.Minute)
  69. continue
  70. }
  71. if countDocuments <= 100 && !isfive{
  72. isfive = true
  73. log.Println("数据不够100条",sid,tmp2["_id"],countDocuments)
  74. timer.Reset(time.Minute*5)
  75. continue
  76. }
  77. eid = tmp2["_id"]
  78. tmpmap := map[string]string{
  79. "gtid": sid.Hex(),
  80. "lteid": eid.Hex(),
  81. }
  82. tmpbyte, _ := json.Marshal(tmpmap)
  83. log.Println("发送",string(tmpbyte),"udp到10.171.112.160:1109 ",udpclient.WriteUdp(tmpbyte, util.OP_TYPE_DATA, &net.UDPAddr{
  84. IP: net.ParseIP("10.171.112.160"),
  85. Port: 1109,
  86. }))
  87. //log.Println("发送",string(tmpbyte),"udp到10.171.112.160:1109 ")
  88. log.Println(sid, tmp2["_id"], countDocuments)
  89. timer.Reset(time.Minute)
  90. isfive = false
  91. }
  92. }
  93. }
  94. func processUdpMsg(b byte, bytes []byte, addr *net.UDPAddr) {
  95. switch b {
  96. case util.OP_NOOP:
  97. log.Println("节点接收成功", string(bytes), addr.String())
  98. }
  99. }