Przeglądaj źródła

迭代器中断

fengweiqiang 5 lat temu
rodzic
commit
33b6df4466
2 zmienionych plików z 25 dodań i 3 usunięć
  1. 4 2
      udp_winner/main.go
  2. 21 1
      udp_winner/timedTaskWinner.go

+ 4 - 2
udp_winner/main.go

@@ -72,16 +72,18 @@ func init() {
 	initRdis()
 	initMongo()
 	initReg()
+	Updport, err = strconv.Atoi(Config["port"])
+	if err != nil{
+		log.Fatalln(err)
+	}
 }
 
 func main() {
 	//udp
 	updport := Config["udpport"]
-	Updport, _ = strconv.Atoi(Config["port"])
 	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
 	udpclient.Listen(processUdpMsg)
 	log.Println("Udp服务监听", updport)
-	log.Println("发送端口port:", Updport)
 	go TimedTaskWinner() //定时任务
 	go TimedTaskBuyer()  //定时任务
 	go TimedTaskAgency() //定时任务

+ 21 - 1
udp_winner/timedTaskWinner.go

@@ -6,6 +6,8 @@ import (
 	"github.com/garyburd/redigo/redis"
 	"gopkg.in/mgo.v2/bson"
 	"log"
+	util2 "mfw/util"
+	"net"
 	"qfw/util"
 	"sort"
 	"strings"
@@ -58,8 +60,11 @@ func TaskWinner(mapinfo *map[string]interface{}) {
 		//key:企业名  value:json结构体{"winner": 1, "winnertel": 1, "winnerperson": 1,"topscopeclass": 1, "winneraddr": 1,"_id":1}
 		tmp := make(map[string]interface{})
 		var num int
+		var tmpRangeId string
 		for cursor.Next(&tmp) {
 			num++
+			mgoId := tmp["_id"].(bson.ObjectId).Hex()
+			tmpRangeId = mgoId
 			winner, ok := tmp["winner"].(string)
 			if !ok || utf8.RuneCountInString(winner) < 4 {
 				continue
@@ -67,7 +72,6 @@ func TaskWinner(mapinfo *map[string]interface{}) {
 			//判断redis key是否存在
 			e_num := conn.Exists(winner).Val()
 			//获取字符串_id
-			mgoId := tmp["_id"].(bson.ObjectId).Hex()
 			//替换_id
 			tmp["_id"] = mgoId
 			//创建value数组
@@ -132,6 +136,22 @@ func TaskWinner(mapinfo *map[string]interface{}) {
 			}
 		}
 		log.Println("存量 winner mongo遍历完成:", num)
+		if tmpRangeId != lteid{
+			by, _ := json.Marshal(map[string]interface{}{
+				"gtid":  tmpRangeId,
+				"lteid": lteid,
+				"data_info":"save",
+				"stype": "",
+			})
+			if e := udpclient.WriteUdp(by, util2.OP_TYPE_DATA, &net.UDPAddr{
+				IP:   net.ParseIP("127.0.0.1"),
+				Port: Updport,
+			}); e != nil {
+				log.Println(e)
+			}
+			SourceClient.DestoryMongoConn(SourceClientcc)
+			return
+		}
 		SourceClient.DestoryMongoConn(SourceClientcc)
 		//遍历redis
 		if scan := conn.Scan(0, "", 100); scan.Err() != nil {