|
@@ -182,15 +182,6 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
|
|
|
|
|
|
|
|
|
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
//开始判重程序
|
|
|
func task(data []byte, mapInfo map[string]interface{}) {
|
|
|
log.Println("开始数据判重")
|
|
@@ -366,7 +357,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
|
time.Sleep(60 * time.Second)
|
|
|
|
|
|
//任务完成,开始发送广播通知下面节点
|
|
|
- if n > repeateN && mapInfo["stop"] == nil {
|
|
|
+ if n >= repeateN && mapInfo["stop"] == nil {
|
|
|
log.Println("判重任务完成发送udp")
|
|
|
for _, to := range nextNode {
|
|
|
sid, _ := mapInfo["gtid"].(string)
|
|
@@ -637,7 +628,7 @@ func historyTaskDay() {
|
|
|
|
|
|
|
|
|
//任务完成,开始发送广播通知下面节点 发udp 去升索引待定 + 合并
|
|
|
- if n > repeateN {
|
|
|
+ if n >= repeateN && gtid!=lteid{
|
|
|
for _, to := range nextNode {
|
|
|
next_sid := util.BsonIdToSId(gtid)
|
|
|
next_eid := util.BsonIdToSId(lteid)
|
|
@@ -748,11 +739,11 @@ func moveOnceTimeOut() {
|
|
|
}
|
|
|
log.Println("save and delete", " ok index", index)
|
|
|
|
|
|
+}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
-}
|
|
|
|
|
|
|
|
|
|