main6.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "go.mongodb.org/mongo-driver/bson/primitive"
  6. "go.mongodb.org/mongo-driver/mongo"
  7. "go.mongodb.org/mongo-driver/mongo/options"
  8. "log"
  9. "mfw/util"
  10. "net"
  11. "time"
  12. )
  13. var sid, eid primitive.ObjectID
  14. var udpclient util.UdpClient //udp对象
  15. func main() {
  16. udpclient = util.UdpClient{Local: "10.171.112.160:1199", BufSize: 1024}//10.171.112.160:1199
  17. udpclient.Listen(processUdpMsg)
  18. log.Printf("Udp listening port: %s:%s\n", "10.171.112.160", "1199")
  19. client, err := mongo.NewClient(options.Client().ApplyURI("mongodb://10.30.94.175:27081,10.81.232.246:27082,10.172.242.243:27080")) //192.168.3.207 10.30.94.175
  20. if err != nil {
  21. log.Println(17, err)
  22. return
  23. }
  24. ctx := context.Background()
  25. err = client.Connect(ctx)
  26. if err != nil {
  27. log.Println(23, err)
  28. return
  29. }
  30. result := client.Database("qfw").Collection("bidding").FindOne(ctx, primitive.M{}, options.FindOne().SetSort(primitive.M{"_id": -1}).SetProjection(primitive.M{"_id": 1}))
  31. tmp := make(map[string]primitive.ObjectID)
  32. err = result.Decode(&tmp)
  33. if err != nil {
  34. log.Println(31, err)
  35. return
  36. }
  37. log.Println("start id",tmp)
  38. timer := time.NewTimer(time.Minute * 1)
  39. var isfive bool
  40. for {
  41. select {
  42. case <-timer.C:
  43. if sid.IsZero() {
  44. sid = tmp["_id"]
  45. } else {
  46. if !eid.IsZero(){
  47. sid = eid
  48. }
  49. }
  50. result2 := client.Database("qfw").Collection("bidding").FindOne(ctx, primitive.M{}, options.FindOne().SetSort(primitive.M{"_id": -1}).SetProjection(primitive.M{"_id": 1}))
  51. tmp2 := make(map[string]primitive.ObjectID)
  52. err := result2.Decode(&tmp2)
  53. if err != nil {
  54. log.Println(44, err)
  55. timer.Reset(time.Minute)
  56. continue
  57. }
  58. countDocuments, err := client.Database("qfw").Collection("bidding").CountDocuments(ctx, primitive.M{"_id": primitive.M{
  59. "$gte": sid,
  60. "$lte": tmp2["_id"],
  61. }})
  62. if err != nil {
  63. log.Println(52, err)
  64. timer.Reset(time.Minute)
  65. continue
  66. }
  67. if countDocuments <= 100 && !isfive{
  68. isfive = true
  69. log.Println("数据不够100条",sid,tmp2["_id"],countDocuments)
  70. timer.Reset(time.Minute*5)
  71. continue
  72. }
  73. eid = tmp2["_id"]
  74. tmpmap := map[string]string{
  75. "gtid": sid.Hex(),
  76. "lteid": eid.Hex(),
  77. }
  78. tmpbyte, _ := json.Marshal(tmpmap)
  79. log.Println("发送",string(tmpbyte),"udp到10.171.112.160:1109 ",udpclient.WriteUdp(tmpbyte, util.OP_TYPE_DATA, &net.UDPAddr{
  80. IP: net.ParseIP("10.171.112.160"),
  81. Port: 1109,
  82. }))
  83. //log.Println("发送",string(tmpbyte),"udp到10.171.112.160:1109 ")
  84. log.Println(sid, tmp2["_id"], countDocuments)
  85. timer.Reset(time.Minute)
  86. isfive = false
  87. }
  88. }
  89. }
  90. func processUdpMsg(b byte, bytes []byte, addr *net.UDPAddr) {
  91. switch b {
  92. case util.OP_NOOP:
  93. log.Println("节点接收成功", string(bytes), addr.String())
  94. }
  95. }