|
@@ -3,11 +3,12 @@ package main
|
|
|
import (
|
|
|
"encoding/json"
|
|
|
"log"
|
|
|
+ mu "mfw/util"
|
|
|
+ "net"
|
|
|
"qfw/util"
|
|
|
"regexp"
|
|
|
"strings"
|
|
|
"sync"
|
|
|
- //"strings"
|
|
|
"time"
|
|
|
|
|
|
"github.com/robfig/cron"
|
|
@@ -203,7 +204,6 @@ func (p *ProjectTask) taskQl(udpInfo map[string]interface{}) {
|
|
|
if bcon {
|
|
|
//生成查询语句执行
|
|
|
p.enter(db, coll, map[string]interface{}{})
|
|
|
-
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -225,11 +225,13 @@ func (p *ProjectTask) taskZl(udpInfo map[string]interface{}) {
|
|
|
}
|
|
|
//开始id和结束id
|
|
|
q, _ := udpInfo["query"].(map[string]interface{})
|
|
|
+ gtid := udpInfo["gtid"].(string)
|
|
|
+ lteid := udpInfo["lteid"].(string)
|
|
|
if q == nil {
|
|
|
q = map[string]interface{}{
|
|
|
"_id": map[string]interface{}{
|
|
|
- "$gt": util.StringTOBsonId(udpInfo["gtid"].(string)),
|
|
|
- "$lte": util.StringTOBsonId(udpInfo["lteid"].(string)),
|
|
|
+ "$gt": util.StringTOBsonId(gtid), //util.StringTOBsonId(udpInfo["gtid"].(string)),
|
|
|
+ "$lte": util.StringTOBsonId(lteid), //util.StringTOBsonId(udpInfo["lteid"].(string)),
|
|
|
},
|
|
|
}
|
|
|
}
|
|
@@ -237,7 +239,24 @@ func (p *ProjectTask) taskZl(udpInfo map[string]interface{}) {
|
|
|
//生成查询语句执行
|
|
|
p.enter(db, coll, q)
|
|
|
}
|
|
|
+ nextNode(gtid, lteid, "project")
|
|
|
+}
|
|
|
|
|
|
+//通知下个节点nextNode
|
|
|
+func nextNode(gtid, lteid, stype string) {
|
|
|
+ by, _ := json.Marshal(map[string]interface{}{
|
|
|
+ "gtid": gtid,
|
|
|
+ "lteid": lteid,
|
|
|
+ "stype": stype,
|
|
|
+ })
|
|
|
+ for _, v := range NextNode {
|
|
|
+ if node, ok := v.(map[string]interface{}); ok {
|
|
|
+ udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
|
|
|
+ IP: net.ParseIP(node["addr"].(string)),
|
|
|
+ Port: util.IntAll(node["port"]),
|
|
|
+ })
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
func (p *ProjectTask) enter(db, coll string, q map[string]interface{}) {
|