123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105 |
- package main
- import (
- "context"
- "encoding/json"
- "go.mongodb.org/mongo-driver/bson/primitive"
- "go.mongodb.org/mongo-driver/mongo"
- "go.mongodb.org/mongo-driver/mongo/options"
- "log"
- "mfw/util"
- "net"
- "time"
- )
- var sid, eid primitive.ObjectID
- var udpclient util.UdpClient //udp对象
- func main() {
- udpclient = util.UdpClient{Local: "10.171.112.160:1199", BufSize: 1024}//10.171.112.160:1199
- udpclient.Listen(processUdpMsg)
- log.Printf("Udp listening port: %s:%s\n", "10.171.112.160", "1199")
- client, err := mongo.NewClient(options.Client().ApplyURI("mongodb://10.30.94.175:27081,10.81.232.246:27082,10.172.242.243:27080")) //192.168.3.207 10.30.94.175
- if err != nil {
- log.Println(17, err)
- return
- }
- ctx := context.Background()
- err = client.Connect(ctx)
- if err != nil {
- log.Println(23, err)
- return
- }
- result := client.Database("qfw").Collection("bidding").FindOne(ctx, primitive.M{}, options.FindOne().SetSort(primitive.M{"_id": -1}).SetProjection(primitive.M{"_id": 1}))
- tmp := make(map[string]primitive.ObjectID)
- err = result.Decode(&tmp)
- if err != nil {
- log.Println(31, err)
- return
- }
- log.Println("start id",tmp)
- timer := time.NewTimer(time.Minute * 1)
- var isfive bool
- for {
- select {
- case <-timer.C:
- if sid.IsZero() {
- sid = tmp["_id"]
- } else {
- if !eid.IsZero() && eid != sid{
- sid = eid
- }else {
- log.Println(sid,eid,"为空或者id一致")
- timer.Reset(time.Minute)
- continue
- }
- }
- result2 := client.Database("qfw").Collection("bidding").FindOne(ctx, primitive.M{}, options.FindOne().SetSort(primitive.M{"_id": -1}).SetProjection(primitive.M{"_id": 1}))
- tmp2 := make(map[string]primitive.ObjectID)
- err := result2.Decode(&tmp2)
- if err != nil {
- log.Println(44, err)
- timer.Reset(time.Minute)
- continue
- }
- countDocuments, err := client.Database("qfw").Collection("bidding").CountDocuments(ctx, primitive.M{"_id": primitive.M{
- "$gte": sid,
- "$lte": tmp2["_id"],
- }})
- if err != nil {
- log.Println(52, err)
- timer.Reset(time.Minute)
- continue
- }
- if countDocuments <= 100 && !isfive{
- isfive = true
- log.Println("数据不够100条",sid,tmp2["_id"],countDocuments)
- timer.Reset(time.Minute*5)
- continue
- }
- eid = tmp2["_id"]
- tmpmap := map[string]string{
- "gtid": sid.Hex(),
- "lteid": eid.Hex(),
- }
- tmpbyte, _ := json.Marshal(tmpmap)
- log.Println("发送",string(tmpbyte),"udp到10.171.112.160:1109 ",udpclient.WriteUdp(tmpbyte, util.OP_TYPE_DATA, &net.UDPAddr{
- IP: net.ParseIP("10.171.112.160"),
- Port: 1109,
- }))
- //log.Println("发送",string(tmpbyte),"udp到10.171.112.160:1109 ")
- log.Println(sid, tmp2["_id"], countDocuments)
- timer.Reset(time.Minute)
- isfive = false
- }
- }
- }
- func processUdpMsg(b byte, bytes []byte, addr *net.UDPAddr) {
- switch b {
- case util.OP_NOOP:
- log.Println("节点接收成功", string(bytes), addr.String())
- }
- }
|