apple vor 5 Jahren
Ursprung
Commit
929535b596
2 geänderte Dateien mit 47 neuen und 2 gelöschten Zeilen
  1. 23 1
      udp_winner/timedTaskAgency.go
  2. 24 1
      udp_winner/timedTaskBuyer.go

+ 23 - 1
udp_winner/timedTaskAgency.go

@@ -6,6 +6,8 @@ import (
 	"github.com/garyburd/redigo/redis"
 	"gopkg.in/mgo.v2/bson"
 	"log"
+	util2 "mfw/util"
+	"net"
 	"qfw/util"
 	"sort"
 	"strings"
@@ -58,8 +60,11 @@ func TaskAgency(mapinfo *map[string]interface{}) {
 		//key:企业名  value:json结构体{"agency": 1, "agencytel": 1, "agencyperson": 1,"topscopeclass": 1, "agencyaddr": 1,"_id":1}
 		tmp := make(map[string]interface{})
 		var num int
+		var tmpRangeId string
 		for cursor.Next(&tmp) {
 			num++
+			mgoId := tmp["_id"].(bson.ObjectId).Hex()
+			tmpRangeId = mgoId
 			agency, ok := tmp["agency"].(string)
 			if !ok || utf8.RuneCountInString(agency) < 4 {
 				continue
@@ -67,7 +72,6 @@ func TaskAgency(mapinfo *map[string]interface{}) {
 			//判断redis key是否存在
 			e_num := conn.Exists(agency).Val()
 			//获取字符串_id
-			mgoId := tmp["_id"].(bson.ObjectId).Hex()
 			//替换_id
 			tmp["_id"] = mgoId
 			//创建value数组
@@ -85,6 +89,24 @@ func TaskAgency(mapinfo *map[string]interface{}) {
 			}
 		}
 		log.Println("存量 agency mongo遍历完成:",num)
+
+		if tmpRangeId != lteid{
+			by, _ := json.Marshal(map[string]interface{}{
+				"gtid":  tmpRangeId,
+				"lteid": lteid,
+				"data_info":"save",
+				"stype": "agency",
+			})
+			if e := udpclient.WriteUdp(by, util2.OP_TYPE_DATA, &net.UDPAddr{
+				IP:   net.ParseIP("127.0.0.1"),
+				Port: Updport,
+			}); e != nil {
+				log.Println(e)
+			}
+			SourceClient.DestoryMongoConn(SourceClientcc)
+			return
+		}
+
 		SourceClient.DestoryMongoConn(SourceClientcc)
 
 		//遍历redis

+ 24 - 1
udp_winner/timedTaskBuyer.go

@@ -6,6 +6,8 @@ import (
 	"github.com/garyburd/redigo/redis"
 	"gopkg.in/mgo.v2/bson"
 	"log"
+	util2 "mfw/util"
+	"net"
 	"qfw/util"
 	"sort"
 	"strings"
@@ -58,8 +60,11 @@ func TaskBuyer(mapinfo *map[string]interface{}) {
 		//key:企业名  value:json结构体{"buyer": 1, "buyertel": 1, "buyerperson": 1,"topscopeclass": 1, "buyeraddr": 1,"_id":1}
 		tmp := make(map[string]interface{})
 		var num int
+		var tmpRangeId string
 		for cursor.Next(&tmp) {
 			num++
+			mgoId := tmp["_id"].(bson.ObjectId).Hex()
+			tmpRangeId = mgoId
 			buyer, ok := tmp["buyer"].(string)
 			if !ok || utf8.RuneCountInString(buyer) < 4 {
 				continue
@@ -67,7 +72,6 @@ func TaskBuyer(mapinfo *map[string]interface{}) {
 			//判断redis key是否存在
 			e_num := conn.Exists(buyer).Val()
 			//获取字符串_id
-			mgoId := tmp["_id"].(bson.ObjectId).Hex()
 			//替换_id
 			tmp["_id"] = mgoId
 			//创建value数组
@@ -85,6 +89,25 @@ func TaskBuyer(mapinfo *map[string]interface{}) {
 			}
 		}
 		log.Println("存量buyer mongo遍历完成:",num)
+
+
+		if tmpRangeId != lteid{
+			by, _ := json.Marshal(map[string]interface{}{
+				"gtid":  tmpRangeId,
+				"lteid": lteid,
+				"data_info":"save",
+				"stype": "buyer",
+			})
+			if e := udpclient.WriteUdp(by, util2.OP_TYPE_DATA, &net.UDPAddr{
+				IP:   net.ParseIP("127.0.0.1"),
+				Port: Updport,
+			}); e != nil {
+				log.Println(e)
+			}
+			SourceClient.DestoryMongoConn(SourceClientcc)
+			return
+		}
+
 		SourceClient.DestoryMongoConn(SourceClientcc)
 
 		//遍历redis