package main import ( "context" "encoding/json" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" "log" "mfw/util" "net" "time" ) var sid, eid primitive.ObjectID var udpclient util.UdpClient //udp对象 func main() { udpclient = util.UdpClient{Local: "10.171.112.160:1199", BufSize: 1024}//10.171.112.160:1199 udpclient.Listen(processUdpMsg) log.Printf("Udp listening port: %s:%s\n", "10.171.112.160", "1199") client, err := mongo.NewClient(options.Client().ApplyURI("mongodb://10.30.94.175:27081,10.81.232.246:27082,10.172.242.243:27080")) //192.168.3.207 10.30.94.175 if err != nil { log.Println(17, err) return } ctx := context.Background() err = client.Connect(ctx) if err != nil { log.Println(23, err) return } result := client.Database("qfw").Collection("bidding").FindOne(ctx, primitive.M{}, options.FindOne().SetSort(primitive.M{"_id": -1}).SetProjection(primitive.M{"_id": 1})) tmp := make(map[string]primitive.ObjectID) err = result.Decode(&tmp) if err != nil { log.Println(31, err) return } log.Println("start id",tmp) timer := time.NewTimer(time.Minute * 1) var isfive bool for { select { case <-timer.C: if sid.IsZero() { sid = tmp["_id"] } else { if !eid.IsZero() && eid != sid{ sid = eid }else { log.Println(sid,eid,"为空或者id一致") timer.Reset(time.Minute) continue } } result2 := client.Database("qfw").Collection("bidding").FindOne(ctx, primitive.M{}, options.FindOne().SetSort(primitive.M{"_id": -1}).SetProjection(primitive.M{"_id": 1})) tmp2 := make(map[string]primitive.ObjectID) err := result2.Decode(&tmp2) if err != nil { log.Println(44, err) timer.Reset(time.Minute) continue } countDocuments, err := client.Database("qfw").Collection("bidding").CountDocuments(ctx, primitive.M{"_id": primitive.M{ "$gte": sid, "$lte": tmp2["_id"], }}) if err != nil { log.Println(52, err) timer.Reset(time.Minute) continue } if countDocuments <= 100 && !isfive{ isfive = true log.Println("数据不够100条",sid,tmp2["_id"],countDocuments) timer.Reset(time.Minute*5) continue } eid = tmp2["_id"] tmpmap := map[string]string{ "gtid": sid.Hex(), "lteid": eid.Hex(), } tmpbyte, _ := json.Marshal(tmpmap) log.Println("发送",string(tmpbyte),"udp到10.171.112.160:1109 ",udpclient.WriteUdp(tmpbyte, util.OP_TYPE_DATA, &net.UDPAddr{ IP: net.ParseIP("10.171.112.160"), Port: 1109, })) //log.Println("发送",string(tmpbyte),"udp到10.171.112.160:1109 ") log.Println(sid, tmp2["_id"], countDocuments) timer.Reset(time.Minute) isfive = false } } } func processUdpMsg(b byte, bytes []byte, addr *net.UDPAddr) { switch b { case util.OP_NOOP: log.Println("节点接收成功", string(bytes), addr.String()) } }