|
@@ -58,7 +58,7 @@ var (
|
|
|
|
|
|
|
|
|
func init() {
|
|
|
- //return
|
|
|
+
|
|
|
flag.StringVar(&lastid, "id", "", "增量加载的lastid") //增量
|
|
|
flag.StringVar(>id, "gtid", "", "历史增量的起始id") //历史
|
|
|
flag.StringVar(>ept, "gtept", "", "全量gte发布时间")//全量区间pt
|
|
@@ -131,7 +131,6 @@ func init() {
|
|
|
|
|
|
|
|
|
func main() {
|
|
|
-
|
|
|
go checkMapJob()
|
|
|
updport := Sysconfig["udpport"].(string)
|
|
|
udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
|
|
@@ -250,27 +249,10 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
|
wg := &sync.WaitGroup{}
|
|
|
n, repeateN := 0, 0
|
|
|
for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
|
|
|
- if n%10000 == 0 {
|
|
|
+ if n%100 == 0 {
|
|
|
log.Println("current:", n, tmp["_id"],tmp["publishtime"], "repeateN:", repeateN)
|
|
|
}
|
|
|
- //source := util.ObjToMap(tmp["jsondata"]) //前置-jsondata判重
|
|
|
- //if util.IntAll((*source)["sourcewebsite"]) == 1 {
|
|
|
- // repeateN++
|
|
|
- // Update.updatePool <- []map[string]interface{}{
|
|
|
- // map[string]interface{}{
|
|
|
- // "_id": tmp["_id"],
|
|
|
- // },
|
|
|
- // map[string]interface{}{
|
|
|
- // "$set": map[string]interface{}{
|
|
|
- // "repeat": 1,
|
|
|
- // "dataging":0,
|
|
|
- // "repeat_reason": "sourcewebsite为1,重复",
|
|
|
- // },
|
|
|
- // },
|
|
|
- // }
|
|
|
- // tmp = make(map[string]interface{})
|
|
|
- // continue
|
|
|
- //}
|
|
|
+
|
|
|
if util.IntAll(tmp["repeat"]) == 1 {
|
|
|
repeateN++
|
|
|
tmp = make(map[string]interface{})
|
|
@@ -319,7 +301,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
|
"repeat": 1,
|
|
|
"repeat_reason": reason,
|
|
|
"repeat_id": source.id,
|
|
|
- "dataging": "0",
|
|
|
+ "dataging": 0,
|
|
|
},
|
|
|
},
|
|
|
}
|
|
@@ -333,6 +315,10 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
|
|
|
|
time.Sleep(30 * time.Second)
|
|
|
|
|
|
+ //更新Ocr的标记
|
|
|
+ updateOcrFileData(mapInfo["lteid"].(string))
|
|
|
+
|
|
|
+
|
|
|
//任务完成,开始发送广播通知下面节点
|
|
|
if n >= repeateN && mapInfo["stop"] == nil {
|
|
|
log.Println("判重任务完成发送udp")
|
|
@@ -357,6 +343,47 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+func updateOcrFileData(cur_lteid string) {
|
|
|
+ //更新ocr 分类表-判重的状态
|
|
|
+ log.Println("开始更新Ocr表-标记",cur_lteid)
|
|
|
+ task_sess := task_mgo.GetMgoConn()
|
|
|
+ defer task_mgo.DestoryMongoConn(task_sess)
|
|
|
+ q_task:=map[string]interface{}{}
|
|
|
+ it_last := task_sess.DB(task_mgo.DbName).C(task_collName).Find(&q_task).Sort("-_id").Iter()
|
|
|
+ isUpdateOcr:=false
|
|
|
+ updateOcrFile:=[][]map[string]interface{}{}
|
|
|
+ for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
|
|
|
+ cur_id := BsonTOStringId(tmp["_id"])
|
|
|
+ lteid:=util.ObjToString(tmp["lteid"])
|
|
|
+ if (lteid==cur_lteid) { //需要更新
|
|
|
+ log.Println("找到该lteid数据",cur_lteid,cur_id)
|
|
|
+ isUpdateOcr = true
|
|
|
+ updateOcrFile = append(updateOcrFile, []map[string]interface{}{//重复数据打标签
|
|
|
+ map[string]interface{}{
|
|
|
+ "_id": tmp["_id"],
|
|
|
+ },
|
|
|
+ map[string]interface{}{
|
|
|
+ "$set": map[string]interface{}{
|
|
|
+ "is_repeat_status": 1,
|
|
|
+ "is_repeat_time" : util.Int64All(time.Now().Unix()),
|
|
|
+ },
|
|
|
+ },
|
|
|
+ })
|
|
|
+ tmp = make(map[string]interface{})
|
|
|
+ break
|
|
|
+ }else {
|
|
|
+ tmp = make(map[string]interface{})
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if !isUpdateOcr {
|
|
|
+ log.Println("出现异常问题,查询不到ocr的lteid",cur_lteid)
|
|
|
+ }else {
|
|
|
+ if len(updateOcrFile) > 0 {
|
|
|
+ task_mgo.UpSertBulk(task_collName, updateOcrFile...)
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
//历史判重
|
|
|
func historyTaskDay() {
|
|
|
defer util.Catch()
|
|
@@ -379,18 +406,32 @@ func historyTaskDay() {
|
|
|
//查询表最后一个id
|
|
|
task_sess := task_mgo.GetMgoConn()
|
|
|
defer task_mgo.DestoryMongoConn(task_sess)
|
|
|
- q:=map[string]interface{}{
|
|
|
- "isused":true,
|
|
|
- }
|
|
|
+ q:=map[string]interface{}{}
|
|
|
between_time := time.Now().Unix() - (86400 * timingPubScope)//两年周期
|
|
|
it_last := task_sess.DB(task_mgo.DbName).C(task_collName).Find(&q).Sort("-_id").Iter()
|
|
|
+
|
|
|
+ isRepeatStatus:=false
|
|
|
for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
|
|
|
- lteid = util.ObjToString(tmp["gtid"])
|
|
|
- log.Println("查询的最后一个任务Id:",lteid)
|
|
|
- break
|
|
|
+ is_repeat_status:=util.IntAll(tmp["is_repeat_status"])
|
|
|
+ if is_repeat_status == 1 {
|
|
|
+ lteid = util.ObjToString(tmp["lteid"])
|
|
|
+ log.Println("查询的最后一个已标记的任务lteid:",lteid)
|
|
|
+ isRepeatStatus = true
|
|
|
+ tmp = make(map[string]interface{})
|
|
|
+ break
|
|
|
+ }else {
|
|
|
+ tmp = make(map[string]interface{})
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- log.Println("查询完毕-先睡眠5分钟",gtid,lteid)
|
|
|
+ if !isRepeatStatus {
|
|
|
+ log.Println("查询不到有标记的lteid数据")
|
|
|
+ log.Println("睡眠5分钟 gtid:",gtid,"lteid:",lteid)
|
|
|
+ time.Sleep(5 * time.Minute)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ log.Println("查询完毕-找到有标记的lteid-先睡眠5分钟",gtid,lteid)
|
|
|
time.Sleep(5 * time.Minute)
|
|
|
|
|
|
sess := mgo.GetMgoConn()//连接器
|
|
@@ -411,26 +452,6 @@ func historyTaskDay() {
|
|
|
if num%10000 == 0 {
|
|
|
log.Println("正序遍历:", num)
|
|
|
}
|
|
|
- //source := util.ObjToMap(tmp["jsondata"])
|
|
|
- //if util.IntAll((*source)["sourcewebsite"]) == 1 {
|
|
|
- // outnum++
|
|
|
- // Update.updatePool <- []map[string]interface{}{//重复数据打标签
|
|
|
- // map[string]interface{}{
|
|
|
- // "_id": tmp["_id"],
|
|
|
- // },
|
|
|
- // map[string]interface{}{
|
|
|
- // "$set": map[string]interface{}{
|
|
|
- // "repeat": 1,
|
|
|
- // "dataging": 0,
|
|
|
- // "history_updatetime":util.Int64All(time.Now().Unix()),
|
|
|
- // "repeat_reason": "sourcewebsite为1 重复",
|
|
|
- // },
|
|
|
- // },
|
|
|
- // }
|
|
|
- // tmp = make(map[string]interface{})
|
|
|
- // continue
|
|
|
- //}
|
|
|
-
|
|
|
//取-符合-发布时间X年内的数据
|
|
|
if util.IntAll(tmp["dataging"]) == 1 {
|
|
|
pubtime := util.Int64All(tmp["publishtime"])
|
|
@@ -641,7 +662,7 @@ func judgeIsCurIds (gtid string,lteid string,curid string) bool {
|
|
|
gt_time, _ := strconv.ParseInt(gtid[:8], 16, 64)
|
|
|
lte_time, _ := strconv.ParseInt(lteid[:8], 16, 64)
|
|
|
cur_time, _ := strconv.ParseInt(curid[:8], 16, 64)
|
|
|
- if cur_time>gt_time&&cur_time<=lte_time {
|
|
|
+ if cur_time>=gt_time&&cur_time<=lte_time {
|
|
|
return true
|
|
|
}
|
|
|
return false
|
|
@@ -732,5 +753,7 @@ func moveOnceTimeOut() {
|
|
|
|
|
|
|
|
|
|
|
|
+
|
|
|
+
|
|
|
|
|
|
|