|
@@ -53,7 +53,6 @@ var (
|
|
|
updatelock sync.Mutex //锁4
|
|
|
userName,passWord string //mongo -用户密码
|
|
|
taskList []map[string]interface{} //任务池
|
|
|
- isRunningRepeat bool
|
|
|
)
|
|
|
|
|
|
//udp通道
|
|
@@ -213,7 +212,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
|
|
|
//插入任务-判断任务-是否存在
|
|
|
updatelock.Lock()
|
|
|
taskList = append(taskList,mapInfo)
|
|
|
- log.Println("udp收到任务...数量:",len(taskList))
|
|
|
+ log.Println("udp收到任务...数量:",len(taskList),"具体任务:",taskList)
|
|
|
updatelock.Unlock()
|
|
|
}
|
|
|
case mu.OP_NOOP: //下个节点回应
|
|
@@ -283,16 +282,16 @@ func getRepeatTask() {
|
|
|
for {
|
|
|
if len(taskList)>0 {
|
|
|
updatelock.Lock()
|
|
|
- log.Println("准备执行判重任务...")
|
|
|
+ //log.Println("准备执行判重任务...")
|
|
|
mapInfo := taskList[0]
|
|
|
if mapInfo != nil {
|
|
|
taskRepeat(mapInfo) //判重方法
|
|
|
}
|
|
|
taskList = taskList[1:]
|
|
|
- log.Println("当前任务池...",len(taskList),taskList)
|
|
|
+ log.Println("此段落结束当前任务池...",len(taskList),taskList)
|
|
|
updatelock.Unlock()
|
|
|
}else {
|
|
|
- log.Println("无任务...睡眠15s")
|
|
|
+ //log.Println("无任务...睡眠15s")
|
|
|
time.Sleep(15 * time.Second)
|
|
|
}
|
|
|
}
|
|
@@ -324,7 +323,10 @@ func taskRepeat(mapInfo map[string]interface{}) {
|
|
|
},
|
|
|
}
|
|
|
}
|
|
|
+ //临时赋值
|
|
|
log.Println("开始数据判重~查询条件:",mgo.DbName, extract, q)
|
|
|
+
|
|
|
+
|
|
|
sess := mgo.GetMgoConn()
|
|
|
defer mgo.DestoryMongoConn(sess)
|
|
|
it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
|
|
@@ -367,16 +369,16 @@ func taskRepeat(mapInfo map[string]interface{}) {
|
|
|
//替换数据池-更新
|
|
|
DM.replacePoolData(source)
|
|
|
|
|
|
- Update.updatePool <- []map[string]interface{}{//原始数据打标签
|
|
|
- map[string]interface{}{
|
|
|
- "_id": StringTOBsonId(source.id),
|
|
|
- },
|
|
|
- map[string]interface{}{
|
|
|
- "$set": map[string]interface{}{
|
|
|
- "repeat_ids": repeat_ids,
|
|
|
- },
|
|
|
- },
|
|
|
- }
|
|
|
+ //Update.updatePool <- []map[string]interface{}{//原始数据打标签
|
|
|
+ // map[string]interface{}{
|
|
|
+ // "_id": StringTOBsonId(source.id),
|
|
|
+ // },
|
|
|
+ // map[string]interface{}{
|
|
|
+ // "$set": map[string]interface{}{
|
|
|
+ // "repeat_ids": repeat_ids,
|
|
|
+ // },
|
|
|
+ // },
|
|
|
+ //}
|
|
|
Update.updatePool <- []map[string]interface{}{//重复数据打标签
|
|
|
updateID,
|
|
|
map[string]interface{}{
|
|
@@ -400,27 +402,29 @@ func taskRepeat(mapInfo map[string]interface{}) {
|
|
|
//睡眠时间30s 目的是让数据池更新所有数据...
|
|
|
time.Sleep(15 * time.Second)
|
|
|
//更新Ocr的标记
|
|
|
- updateOcrFileData(mapInfo["lteid"].(string))
|
|
|
- //任务完成,开始发送广播通知下面节点
|
|
|
- if n >= repeateN && mapInfo["stop"] == nil {
|
|
|
- log.Println("判重任务完成发送udp")
|
|
|
- for _, to := range nextNode {
|
|
|
- sid, _ := mapInfo["gtid"].(string)
|
|
|
- eid, _ := mapInfo["lteid"].(string)
|
|
|
- key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
|
|
|
- by, _ := json.Marshal(map[string]interface{}{
|
|
|
- "gtid": sid,
|
|
|
- "lteid": eid,
|
|
|
- "stype": util.ObjToString(to["stype"]),
|
|
|
- "key": key,
|
|
|
- })
|
|
|
- addr := &net.UDPAddr{
|
|
|
- IP: net.ParseIP(to["addr"].(string)),
|
|
|
- Port: util.IntAll(to["port"]),
|
|
|
+ if !IsFull {
|
|
|
+ updateOcrFileData(mapInfo["lteid"].(string))
|
|
|
+ //任务完成,开始发送广播通知下面节点
|
|
|
+ if n >= repeateN && mapInfo["stop"] == nil {
|
|
|
+ log.Println("判重任务完成发送udp")
|
|
|
+ for _, to := range nextNode {
|
|
|
+ sid, _ := mapInfo["gtid"].(string)
|
|
|
+ eid, _ := mapInfo["lteid"].(string)
|
|
|
+ key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
|
|
|
+ by, _ := json.Marshal(map[string]interface{}{
|
|
|
+ "gtid": sid,
|
|
|
+ "lteid": eid,
|
|
|
+ "stype": util.ObjToString(to["stype"]),
|
|
|
+ "key": key,
|
|
|
+ })
|
|
|
+ addr := &net.UDPAddr{
|
|
|
+ IP: net.ParseIP(to["addr"].(string)),
|
|
|
+ Port: util.IntAll(to["port"]),
|
|
|
+ }
|
|
|
+ node := &udpNode{by, addr, time.Now().Unix(), 0}
|
|
|
+ udptaskmap.Store(key, node)
|
|
|
+ udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
|
|
|
}
|
|
|
- node := &udpNode{by, addr, time.Now().Unix(), 0}
|
|
|
- udptaskmap.Store(key, node)
|
|
|
- udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
|
|
|
}
|
|
|
}
|
|
|
|