فهرست منبع

定时查询数据,发送

fengweiqiang 5 سال پیش
والد
کامیت
ca4ff4da5c
1فایلهای تغییر یافته به همراه101 افزوده شده و 0 حذف شده
  1. 101 0
      udp_ocr_conter/crondata/main6.go

+ 101 - 0
udp_ocr_conter/crondata/main6.go

@@ -0,0 +1,101 @@
+package main
+
+import (
+	"context"
+	"encoding/json"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+	"go.mongodb.org/mongo-driver/mongo"
+	"go.mongodb.org/mongo-driver/mongo/options"
+	"log"
+	"mfw/util"
+	"net"
+	"time"
+)
+
+var sid, eid primitive.ObjectID
+var udpclient util.UdpClient //udp对象
+func main() {
+	udpclient = util.UdpClient{Local: "10.171.112.160:1199", BufSize: 1024}//10.171.112.160:1199
+	udpclient.Listen(processUdpMsg)
+	log.Printf("Udp listening port: %s:%s\n", "10.171.112.160", "1199")
+	client, err := mongo.NewClient(options.Client().ApplyURI("mongodb://10.30.94.175:27081,10.81.232.246:27082,10.172.242.243:27080")) //192.168.3.207  10.30.94.175
+	if err != nil {
+		log.Println(17, err)
+		return
+	}
+	ctx := context.Background()
+	err = client.Connect(ctx)
+	if err != nil {
+		log.Println(23, err)
+		return
+	}
+
+	result := client.Database("qfw").Collection("bidding").FindOne(ctx, primitive.M{}, options.FindOne().SetSort(primitive.M{"_id": -1}).SetProjection(primitive.M{"_id": 1}))
+	tmp := make(map[string]primitive.ObjectID)
+	err = result.Decode(&tmp)
+	if err != nil {
+		log.Println(31, err)
+		return
+	}
+	log.Println("start id",tmp)
+	timer := time.NewTimer(time.Minute * 1)
+	var isfive bool
+	for  {
+		select {
+		case <-timer.C:
+			if sid.IsZero() {
+				sid = tmp["_id"]
+			} else {
+				if !eid.IsZero(){
+					sid = eid
+				}
+			}
+			result2 := client.Database("qfw").Collection("bidding").FindOne(ctx, primitive.M{}, options.FindOne().SetSort(primitive.M{"_id": -1}).SetProjection(primitive.M{"_id": 1}))
+			tmp2 := make(map[string]primitive.ObjectID)
+			err := result2.Decode(&tmp2)
+			if err != nil {
+				log.Println(44, err)
+				timer.Reset(time.Minute)
+				continue
+			}
+
+			countDocuments, err := client.Database("qfw").Collection("bidding").CountDocuments(ctx, primitive.M{"_id": primitive.M{
+				"$gte": sid,
+				"$lte": tmp2["_id"],
+			}})
+			if err != nil {
+				log.Println(52, err)
+				timer.Reset(time.Minute)
+				continue
+			}
+			if countDocuments <= 100 && !isfive{
+				isfive = true
+				log.Println("数据不够100条",sid,tmp2["_id"],countDocuments)
+				timer.Reset(time.Minute*5)
+				continue
+			}
+			eid = tmp2["_id"]
+			tmpmap := map[string]string{
+				"gtid":  sid.Hex(),
+				"lteid": eid.Hex(),
+			}
+			tmpbyte, _ := json.Marshal(tmpmap)
+			log.Println("发送",string(tmpbyte),"udp到10.171.112.160:1109 ",udpclient.WriteUdp(tmpbyte, util.OP_TYPE_DATA, &net.UDPAddr{
+				IP:   net.ParseIP("10.171.112.160"),
+				Port: 1109,
+			}))
+			//log.Println("发送",string(tmpbyte),"udp到10.171.112.160:1109 ")
+
+			log.Println(sid, tmp2["_id"], countDocuments)
+			timer.Reset(time.Minute)
+			isfive = false
+		}
+	}
+}
+
+func processUdpMsg(b byte, bytes []byte, addr *net.UDPAddr) {
+	switch b {
+	case util.OP_NOOP:
+		log.Println("节点接收成功", string(bytes), addr.String())
+	}
+}