Bladeren bron

调整判重

zhengkun 3 jaren geleden
bovenliggende
commit
ca060f9e95

+ 0 - 1
udpfilterdup/go_build_main_go

@@ -1 +0,0 @@
-ELF

+ 1 - 1
udpfilterdup/src/config.json

@@ -13,7 +13,7 @@
         }
     },
     "task_mongodb": {
-        "task_addrName": "192.168.3.207:27092",
+        "task_addrName": "127.0.0.1:27017",
         "task_dbName": "zhengkun",
         "task_collName": "repeat_test",
         "pool": 10

+ 0 - 421
udpfilterdup/src/dataMethodMerge.go

@@ -1,421 +0,0 @@
-package main
-
-import "qfw/util"
-
-
-func mergeDataFields(source *Info, info *Info) (*Info,map[string]interface{} ,bool) {
-	update_map := map[string]interface{}{
-		"$set": map[string]interface{}{},
-	}
-	mergeMap :=source.mergemap
-	isReplace:=false
-	//项目名称
-	if source.projectname == "" && info.projectname != "" {
-		mergeMap["projectname"] = map[string]interface{}{
-			"projectname":info.projectname,
-			"id":info.id,
-		}
-		update_map["$set"].(map[string]interface{})["projectname"] = info.projectname
-		source.projectname = info.projectname
-		isReplace = true
-	}
-
-	//项目编号
-	if source.projectcode == "" && info.projectcode != "" {
-		mergeMap["projectcode"] = map[string]interface{}{
-			"projectcode":info.projectcode,
-			"id":info.id,
-		}
-		update_map["$set"].(map[string]interface{})["projectcode"] = info.projectcode
-		source.projectcode = info.projectcode
-		isReplace = true
-	}
-
-	//采购单位
-	if source.buyer == "" && info.buyer != "" {
-		mergeMap["buyer"] = map[string]interface{}{
-			"buyer":info.buyer,
-			"id":info.id,
-		}
-		update_map["$set"].(map[string]interface{})["buyer"] = info.buyer
-		source.buyer = info.buyer
-		isReplace = true
-	}
-
-	//预算
-	if source.budget == 0 && info.budget != 0 {
-		mergeMap["budget"] = map[string]interface{}{
-			"budget":info.budget,
-			"id":info.id,
-		}
-		update_map["$set"].(map[string]interface{})["budget"] = info.budget
-		source.budget = info.budget
-		isReplace = true
-	}
-
-	//中标单位
-	if source.winner == "" && info.winner != "" {
-		mergeMap["winner"] = map[string]interface{}{
-			"winner":info.winner,
-			"id":info.id,
-		}
-		update_map["$set"].(map[string]interface{})["winner"] = info.winner
-		source.winner = info.winner
-		isReplace = true
-	}
-
-	//中标金额
-	if source.bidamount == 0 && info.bidamount != 0 {
-		mergeMap["bidamount"] = map[string]interface{}{
-			"bidamount":info.bidamount,
-			"id":info.id,
-		}
-		update_map["$set"].(map[string]interface{})["bidamount"] = info.bidamount
-		source.bidamount = info.bidamount
-		isReplace = true
-	}
-
-	//开标时间
-	if source.bidopentime == 0 && info.bidopentime != 0 {
-		mergeMap["bidopentime"] = map[string]interface{}{
-			"bidopentime":info.bidopentime,
-			"id":info.id,
-		}
-		update_map["$set"].(map[string]interface{})["bidopentime"] = info.bidopentime
-		source.bidopentime = info.bidopentime
-		isReplace = true
-	}
-
-	//合同编号
-	if source.contractnumber == "" && info.contractnumber != "" {
-		mergeMap["contractnumber"] = map[string]interface{}{
-			"contractnumber":info.contractnumber,
-			"id":info.id,
-		}
-		update_map["$set"].(map[string]interface{})["contractnumber"] = info.contractnumber
-		source.contractnumber = info.contractnumber
-		isReplace = true
-	}
-
-	//代理机构
-	if source.agency == "" && info.agency != "" {
-		mergeMap["agency"] = map[string]interface{}{
-			"agency":info.agency,
-			"id":info.id,
-		}
-		update_map["$set"].(map[string]interface{})["agency"] = info.agency
-		source.agency = info.agency
-		isReplace = true
-	}
-
-	source.mergemap = mergeMap
-	update_map["$set"].(map[string]interface{})["merge"] = mergeMap
-
-	return source,update_map,isReplace
-}
-
-
-
-
-
-
-
-//合并字段-并更新merge字段的值-
-func mergeDataFieldsArr(source *Info, info *Info) (*Info, []int64, bool) {
-
-	merge_recordMap := make(map[string]interface{}, 0)
-	mergeArr := make([]int64, 0)
-	//是否替换数据了-记录原始的数据
-	is_replace := false
-	//1、城市
-	if source.area == "" || source.area == "全国" {
-		//为空
-		if info.area != "全国" && info.area != "" {
-			merge_recordMap["area"] = source.area
-			merge_recordMap["city"] = source.city
-			source.area = info.area
-			source.city = info.city
-			mergeArr = append(mergeArr, 1)
-			is_replace = true
-		}
-	} else {
-		//不为空-查看站点相关-有值必替换
-		if source.is_site {
-			//是站点替换的城市
-			merge_recordMap["site_area"] = source.area
-			merge_recordMap["site_city"] = source.city
-			mergeArr = append(mergeArr, 0)
-			is_replace = true
-			source.is_site = false
-
-		}
-	}
-	//2、项目名称
-	if source.projectname == "" && info.projectname != "" {
-		merge_recordMap["projectname"] = source.projectname
-		source.projectname = info.projectname
-		mergeArr = append(mergeArr, 2)
-		is_replace = true
-	}
-	//3、项目编号
-	if source.projectcode == "" && info.projectcode != "" {
-		merge_recordMap["projectcode"] = source.projectcode
-		source.projectcode = info.projectcode
-		mergeArr = append(mergeArr, 3)
-		is_replace = true
-	}
-	//4、采购单位
-	if source.buyer == "" && info.buyer != "" {
-		merge_recordMap["buyer"] = source.buyer
-		source.buyer = info.buyer
-		mergeArr = append(mergeArr, 4)
-		is_replace = true
-	}
-	//5、预算
-	if source.budget == 0 && info.budget != 0 {
-		merge_recordMap["budget"] = source.budget
-		source.budget = info.budget
-		mergeArr = append(mergeArr, 5)
-		is_replace = true
-	}
-	//6、中标单位
-	if source.winner == "" && info.winner != "" {
-		merge_recordMap["winner"] = source.winner
-		source.winner = info.winner
-		mergeArr = append(mergeArr, 6)
-		is_replace = true
-	}
-
-	//7、中标金额
-	if source.bidamount == 0 && info.bidamount != 0 {
-		merge_recordMap["bidamount"] = source.bidamount
-		source.bidamount = info.bidamount
-		mergeArr = append(mergeArr, 7)
-		is_replace = true
-	}
-	//8、开标时间-地点
-	if source.bidopentime == 0 && info.bidopentime != 0 {
-		merge_recordMap["bidopentime"] = source.bidopentime
-		source.bidopentime = info.bidopentime
-		mergeArr = append(mergeArr, 8)
-		is_replace = true
-	}
-
-	//9、合同编号
-	if source.contractnumber == "" && info.contractnumber != "" {
-		merge_recordMap["contractnumber"] = source.contractnumber
-		source.contractnumber = info.contractnumber
-		mergeArr = append(mergeArr, 9)
-		is_replace = true
-	}
-
-	//10、发布时间
-	if source.publishtime == 0 && info.publishtime != 0 {
-		merge_recordMap["publishtime"] = source.publishtime
-		source.publishtime = info.publishtime
-		mergeArr = append(mergeArr, 10)
-		is_replace = true
-	}
-	//11、代理机构
-	if source.agency == "" && info.agency != "" {
-		merge_recordMap["agency"] = source.agency
-		source.agency = info.agency
-		mergeArr = append(mergeArr, 11)
-		is_replace = true
-	}
-
-	if is_replace { //有过替换更新
-		//总次数+1
-		source.mergemap["total_num"] = util.Int64All(source.mergemap["total_num"]) + 1
-		merge_recordMap["num"] = util.Int64All(source.mergemap["total_num"])
-		//和哪一个数据id进行非空替换的-记录
-		key := info.id
-		source.mergemap[key] = merge_recordMap
-	}
-
-	//待进一步优化
-	return source, mergeArr, is_replace
-}
-
-//权重评估
-func basicDataScore(v *Info, info *Info) bool {
-
-	/*
-	  权重评估
-	  网站优先级判定规则:
-	  1、国家>省级>市级>县区
-	  2、政府采购>公共资源>官方网站|政府门户>社会公共招标平台|企业招标平台
-	  3、同sitetype-分析weight
-	  4、要素打分-分析
-	*/
-	v_score, info_score := -1, -1
-	dict_v := SiteMap[v.site]
-	dict_info := SiteMap[info.site]
-	//先判断level
-	if dict_v != nil {
-		v_level := util.ObjToString(dict_v["level"])
-		if v_level == "国家" {
-			v_score = 4
-		} else if v_level == "省级" {
-			v_score = 3
-		} else if v_level == "市级" {
-			v_score = 2
-		} else if v_level == "县区" {
-			v_score = 1
-		} else if v_level == "" {
-		} else {
-			v_score = 0
-		}
-	}
-
-	if dict_info != nil {
-		info_level := util.ObjToString(dict_info["level"])
-		if info_level == "国家" {
-			info_score = 4
-		} else if info_level == "省级" {
-			info_score = 3
-		} else if info_level == "市级" {
-			info_score = 2
-		} else if info_level == "县区" {
-			info_score = 1
-		} else if info_level == "" {
-
-		} else {
-			v_score = 0
-		}
-	}
-
-	if v_score > info_score {
-		return true
-	}
-	if v_score < info_score {
-		return false
-	}
-
-	//判断sitetype
-	if dict_v != nil {
-		v_sitetype := util.ObjToString(dict_v["sitetype"])
-		if v_sitetype == "政府采购" {
-			v_score = 4
-		} else if v_sitetype == "公共资源" {
-			v_score = 3
-		} else if v_sitetype == "官方网站"|| v_sitetype == "政府门户" {
-			v_score = 2
-		} else if v_sitetype == "社会公共招标平台" || v_sitetype == "企业招标平台" {
-			v_score = 1
-		} else if v_sitetype == "" {
-		} else {
-			v_score = 0
-		}
-	}
-
-	if dict_info != nil {
-		info_sitetype := util.ObjToString(dict_info["sitetype"])
-		if info_sitetype == "政府采购" {
-			info_score = 4
-		} else if info_sitetype == "公共资源" {
-			info_score = 3
-		} else if info_sitetype == "官方网站"|| info_sitetype == "政府门户" {
-			info_score = 2
-		} else if info_sitetype == "社会公共招标平台" || info_sitetype == "企业招标平台" {
-			info_score = 1
-		} else if info_sitetype == "" {
-		} else {
-			info_score = 0
-		}
-	}
-
-	if v_score > info_score {
-		return true
-	}
-	if v_score < info_score {
-		return false
-	}
-
-	if v_score == info_score {//同sitetype 情况下   分析weight
-		v_weight := util.IntAll(dict_v["weight"])
-		info_weight := util.IntAll(dict_info["weight"])
-		if v_weight>info_weight {
-			return true
-		}
-		if info_weight>v_weight {
-			return false
-		}
-	}
-
-	//网站评估
-	m, n := 0, 0
-	if v.projectname != "" {
-		m++
-	}
-	if v.buyer != "" {
-		m++
-	}
-	if v.projectcode != "" || v.contractnumber != "" {
-		m++
-	}
-	if v.budget != 0 {
-		m++
-	}
-	if v.bidamount != 0 {
-		m++
-	}
-	if v.winner != "" {
-		m++
-	}
-	if v.bidopentime != 0 {
-		m++
-	}
-	if v.bidopenaddress != "" {
-		m++
-	}
-	if v.agency != "" {
-		m = m + 2
-	}
-	if v.city != "" {
-		m = m + 2
-	}
-
-	if info.projectname != "" {
-		n++
-	}
-	if info.buyer != "" {
-		n++
-	}
-	if info.projectcode != "" || info.contractnumber != "" {
-		n++
-	}
-	if info.budget != 0 {
-		n++
-	}
-	if info.bidamount != 0 {
-		n++
-	}
-	if info.winner != "" {
-		n++
-	}
-	if info.bidopentime != 0 {
-		n++
-	}
-	if info.bidopenaddress != "" {
-		n++
-	}
-	if info.agency != "" {
-		n = n + 2
-	}
-	if info.city != "" {
-		n = n + 2
-	}
-
-	if m > n {
-		return true
-	} else if m == n {
-		if v.publishtime >= info.publishtime {
-			return true
-		} else {
-			return false
-		}
-	} else {
-		return false
-	}
-}

+ 0 - 3
udpfilterdup/src/datamap.go

@@ -264,10 +264,7 @@ func (d *datamap) check(info *Info) (b bool, source *Info, reasons string) {
 			keys = append(keys, fmt.Sprintf("%s_%s_%s", k, info.subtype, info.area))
 		}
 		keys = append(keys, fmt.Sprintf("%s_%s_%s", k, info.subtype, "全国"))
-
 	}
-
-
 	d.lock.Unlock()
 L:
 	for _, k := range keys {

+ 374 - 0
udpfilterdup/src/historyRepeat.go

@@ -0,0 +1,374 @@
+package main
+
+import (
+	"encoding/json"
+	"github.com/cron"
+	"gopkg.in/mgo.v2/bson"
+	"log"
+	mu "mfw/util"
+	"net"
+	"os"
+	"qfw/util"
+	"strconv"
+	"sync"
+	"time"
+)
+
+// historyTaskDay loops forever running history de-duplication: it waits for a task record flagged is_repeat_status==1, checks the extract documents with _id in (gtid, lteid] and dataging==1 against a per-group data pool, queues the resulting updates, and notifies nextNode over UDP.
+func historyTaskDay() {
+	defer util.Catch()
+
+	for {
+		start:=time.Now().Unix()
+
+		if gtid=="" {
+			log.Println("请传gtid,否则无法运行")
+			os.Exit(0)
+			return
+		}
+		if lteid!="" {
+			// migrate the previous segment's data first
+			log.Println("开启一次迁移任务",gtid,lteid)
+			moveHistoryData(gtid,lteid)
+			gtid = lteid // the previous end id becomes the new start id
+		}
+
+		// scan the task collection from the newest _id to the oldest
+		task_sess := task_mgo.GetMgoConn()
+		defer task_mgo.DestoryMongoConn(task_sess) // NOTE(review): a defer inside this endless for loop only runs when the function returns, so each iteration keeps its connection — confirm and release per iteration
+		q:=map[string]interface{}{}
+		between_time := time.Now().Unix() - (86400 * timingPubScope)// cutoff: now minus timingPubScope days (original note: two-year window)
+		it_last := task_sess.DB(task_mgo.DbName).C(task_collName).Find(&q).Sort("-_id").Iter()
+
+		isRepeatStatus:=false
+		for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
+			is_repeat_status:=util.IntAll(tmp["is_repeat_status"])
+			if is_repeat_status == 1 {
+				lteid = util.ObjToString(tmp["lteid"])
+				log.Println("查询的最后一个已标记的任务lteid:",lteid)
+				isRepeatStatus = true
+				tmp = make(map[string]interface{})
+				break
+			}else  {
+				tmp = make(map[string]interface{})
+			}
+		}
+
+		if !isRepeatStatus {
+			log.Println("查询不到有标记的lteid数据")
+			log.Println("睡眠5分钟 gtid:",gtid,"lteid:",lteid)
+			time.Sleep(5 * time.Minute)
+			continue
+		}
+
+		log.Println("查询完毕-找到有标记的lteid-先睡眠5分钟",gtid,lteid)
+		time.Sleep(5 * time.Minute)
+
+		sess := mgo.GetMgoConn()// connection used to read the extract collection
+		defer mgo.DestoryMongoConn(sess) // NOTE(review): same defer-in-loop concern as for task_sess above
+		// build the de-duplication query for this id segment
+		q = map[string]interface{}{
+			"_id": map[string]interface{}{
+				"$gt": StringTOBsonId(gtid),
+				"$lte": StringTOBsonId(lteid),
+			},
+		}
+		log.Println("历史判重查询条件:",q,"时间:", between_time)
+		it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
+		num,oknum,outnum, deterTime:= int64(0),int64(0),int64(0),int64(0) // counters; deterTime is the publishtime that opened the current group
+		pendAllArr:=[][]map[string]interface{}{}// groups waiting to be processed
+		dayArr := []map[string]interface{}{}
+		for tmp := make(map[string]interface{}); it.Next(&tmp); num++ {
+			if num%10000 == 0 {
+				log.Println("正序遍历:", num)
+			}
+			// only dataging==1 records are handled; those published at or after the cutoff are grouped
+			if util.IntAll(tmp["dataging"]) == 1 {
+				pubtime := util.Int64All(tmp["publishtime"])
+				if pubtime > 0 && pubtime >= between_time {
+					oknum++
+					if deterTime==0 {
+						log.Println("找到第一条符合条件的数据")
+						deterTime = util.Int64All(tmp["publishtime"])
+						dayArr = append(dayArr,tmp)
+					}else {
+						if pubtime-deterTime >timingSpanDay*86400 {
+							// span exceeded: close the current group and start a new one with this record
+							pendAllArr = append(pendAllArr,dayArr)
+							dayArr = []map[string]interface{}{}
+							deterTime = util.Int64All(tmp["publishtime"])
+							dayArr = append(dayArr,tmp)
+						}else {
+							dayArr = append(dayArr,tmp)
+						}
+					}
+				}else {
+					outnum++
+					// outside the window: still clear the dataging flag
+					Update.updatePool <- []map[string]interface{}{// queue an update that resets dataging
+						map[string]interface{}{
+							"_id": tmp["_id"],
+						},
+						map[string]interface{}{
+							"$set": map[string]interface{}{
+								"dataging": 0,
+								"history_updatetime":util.Int64All(time.Now().Unix()),
+							},
+						},
+					}
+				}
+			}
+			tmp = make(map[string]interface{})
+		}
+
+		if len(dayArr)>0 {
+			pendAllArr = append(pendAllArr,dayArr)
+			dayArr = []map[string]interface{}{}
+		}
+
+		log.Println("查询数量:",num,"符合条件:",oknum,"未在两年内:",outnum)
+
+		if len(pendAllArr) <= 0 {
+			log.Println("没找到dataging==1的数据")
+		}
+
+		// sanity check: log each group's size and the overall total
+		testNum:=0
+		for k,v:=range pendAllArr {
+			log.Println("第",k,"组--","数量:",len(v))
+			testNum = testNum+len(v)
+		}
+		log.Println("本地构建分组完成:",len(pendAllArr),"组","测试-总计数量:",testNum)
+
+		n, repeateN := 0, 0
+		log.Println("线程数:",threadNum)
+		pool := make(chan bool, threadNum)
+		wg := &sync.WaitGroup{}
+		for k,v:=range pendAllArr { // one goroutine per group; batched cross-table updates are flushed when a group ends
+			pool <- true
+			wg.Add(1)
+			go func(k int, v []map[string]interface{}) {
+				defer func() {
+					<-pool
+					wg.Done()
+				}()
+				// updates for source records outside the current id segment (written to extract_back)
+				groupOtherExtract := [][]map[string]interface{}{}
+
+				// build the data pool for this group
+				log.Println("构建第", k, "组---(数据池)")
+				// NOTE(review): the original comment says "first publish time of the group", but the last element of v is read — confirm which is intended
+				first_pt := util.Int64All(v[len(v)-1]["publishtime"])
+				curTM := TimedTaskDatamap(dupdays+int(timingSpanDay)+1, first_pt+86400, int(k))
+				log.Println("开始遍历判重第", k, "组  共计数量:", len(v))
+				n = n + len(v) // NOTE(review): n and repeateN are shared by all group goroutines and changed outside updatelock — possible data race, confirm
+				log.Println("统计目前总数量:", n, "重复数量:", repeateN)
+				for _, tmp := range v {
+					info := NewInfo(tmp)
+					b, source, reason := curTM.check(info)
+					if b { // duplicate found: build the update statements
+						repeateN++
+						// record this id on the matched source record
+						repeat_ids:=source.repeat_ids
+						repeat_ids =  append(repeat_ids,info.id)
+						source.repeat_ids = repeat_ids
+
+						updatelock.Lock()
+						// refresh the source record in the data pool; NOTE(review): this writes to DM although the check ran against curTM — confirm
+						DM.replacePoolData(source)
+						// update the stored source record
+						// is the source record inside the current id segment?
+						if judgeIsCurIds(gtid,lteid,source.id) {
+							Update.updatePool <- []map[string]interface{}{// in segment: queue the repeat_ids update
+								map[string]interface{}{
+									"_id": StringTOBsonId(source.id),
+								},
+								map[string]interface{}{
+									"$set": map[string]interface{}{
+										"repeat_ids": repeat_ids,
+									},
+								},
+							}
+						}else {
+							groupOtherExtract = append(groupOtherExtract, []map[string]interface{}{// out of segment: batch the repeat_ids update for extract_back
+								map[string]interface{}{
+									"_id": StringTOBsonId(source.id),
+								},
+								map[string]interface{}{
+									"$set": map[string]interface{}{
+										"repeat_ids": repeat_ids,
+									},
+								},
+							})
+						}
+						Update.updatePool <- []map[string]interface{}{// flag the current record as a duplicate
+							map[string]interface{}{
+								"_id": tmp["_id"],
+							},
+							map[string]interface{}{
+								"$set": map[string]interface{}{
+									"repeat":        1,
+									"repeat_reason": reason,
+									"repeat_id":     source.id,
+									"dataging":      0,
+									"history_updatetime":util.Int64All(time.Now().Unix()),
+								},
+							},
+						}
+						if len(groupOtherExtract) >= 500 {
+							mgo.UpSertBulk(extract_back, groupOtherExtract...)
+							groupOtherExtract = [][]map[string]interface{}{}
+						}
+
+						updatelock.Unlock()
+
+
+					} else {
+						Update.updatePool <- []map[string]interface{}{// not a duplicate: only clear dataging
+							map[string]interface{}{
+								"_id": tmp["_id"],
+							},
+							map[string]interface{}{
+								"$set": map[string]interface{}{
+									"dataging": 0, // every processed record ends with dataging==0
+									"history_updatetime":util.Int64All(time.Now().Unix()),
+								},
+							},
+						}
+					}
+				}
+				// group finished: flush the remaining batched updates
+				updatelock.Lock()
+				if len(groupOtherExtract) > 0 {
+					mgo.UpSertBulk(extract_back, groupOtherExtract...)
+				}
+				updatelock.Unlock()
+
+			}(k, v)
+
+		}
+
+		wg.Wait()
+
+		log.Println("this timeTask over.", n, "repeateN:", repeateN,gtid,lteid)
+
+		time.Sleep(30 * time.Second)
+		// task done: notify the downstream nodes over UDP (original note: index upgrade still pending, plus merge)
+		if n >= repeateN && gtid!=lteid{
+			for _, to := range nextNode {
+				next_sid := util.BsonIdToSId(gtid)
+				next_eid := util.BsonIdToSId(lteid)
+				key := next_sid + "-" + next_eid + "-" + util.ObjToString(to["stype"])
+				by, _ := json.Marshal(map[string]interface{}{
+					"gtid":  next_sid,
+					"lteid": next_eid,
+					"stype": util.ObjToString(to["stype"]),
+					"key":   key,
+				})
+				addr := &net.UDPAddr{
+					IP:   net.ParseIP(to["addr"].(string)),
+					Port: util.IntAll(to["port"]),
+				}
+				node := &udpNode{by, addr, time.Now().Unix(), 0}
+				udptaskmap.Store(key, node)
+				udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
+			}
+		}
+
+		end:=time.Now().Unix()
+
+		log.Println(gtid,lteid)
+
+		if end-start<60*5 {
+			log.Println("睡眠.............")
+			time.Sleep(5 * time.Minute)
+		}
+		log.Println("继续下一段的历史判重")
+	}
+}
+// judgeIsCurIds reports whether curid lies in the current id segment: the first 8 hex characters of each id (presumably the ObjectId creation-time prefix — confirm) are parsed and curid's value must be within [gtid, lteid]; ids shorter than 8 characters yield false.
+func judgeIsCurIds(gtid string, lteid string, curid string) bool {
+	// Slicing id[:8] panics on a shorter string (e.g. an empty lteid), so such ids are treated as outside the segment.
+	if len(gtid) < 8 || len(lteid) < 8 || len(curid) < 8 {
+		return false
+	}
+	gtTime, _ := strconv.ParseInt(gtid[:8], 16, 64) // a parse failure keeps the previous fallback value of 0
+	lteTime, _ := strconv.ParseInt(lteid[:8], 16, 64)
+	curTime, _ := strconv.ParseInt(curid[:8], 16, 64)
+	return curTime >= gtTime && curTime <= lteTime
+}
+// moveHistoryData saves every extract document with _id in (startid, endid] into extract_back, then passes mgo.Delete a filter matching extract documents whose comeintime is earlier than local midnight today minus 2*(dupdays+1) days.
+func moveHistoryData(startid string,endid string) {
+	sess := mgo.GetMgoConn()
+	defer mgo.DestoryMongoConn(sess)
+	year, month, day := time.Now().Date()
+	q := map[string]interface{}{
+		"_id": map[string]interface{}{
+			"$gt": StringTOBsonId(startid),
+			"$lte": StringTOBsonId(endid),
+		},
+	}
+	log.Println(q)
+	it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
+	index := 0
+	for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
+		mgo.Save(extract_back, tmp)
+		tmp = map[string]interface{}{} // fresh map so the next document is not decoded into the one just saved
+		if index%1000 == 0 {
+			log.Println("index", index)
+		}
+	}
+	log.Println("save to", extract_back, " ok index", index)
+
+	qv := map[string]interface{}{
+		"comeintime": map[string]interface{}{
+			"$lt": time.Date(year, month, day, 0, 0, 0, 0, time.Local).Add(-time.Duration(dupdays+1) * 24 * time.Hour*2).Unix(), // (dupdays+1) * 48h before local midnight
+		},
+	}
+	delnum := mgo.Delete(extract, qv)
+	log.Println("remove from ", extract, delnum)
+
+}
+
+
+
+
+
+
+
+
+// moveTimeoutData registers moveOnceTimeOut with a cron scheduler under the spec "0 0 0 * * ?" and starts the scheduler; the original note marks this function as temporarily unused.
+func moveTimeoutData()  {
+	log.Println("部署迁移定时任务")
+	c := cron.New()
+	c.AddFunc("0 0 0 * * ?", func() { moveOnceTimeOut() })
+	c.Start()
+}
+func moveOnceTimeOut()  { // moveOnceTimeOut walks result_20200714 documents whose _id is below an ObjectId built for local midnight two years ago, saves each into result_20200713 and calls DeleteById on the original
+	log.Println("执行一次迁移超时数据")
+	sess := mgo.GetMgoConn()
+	defer mgo.DestoryMongoConn(sess)
+	now:=time.Now()
+
+	move_time := time.Date(now.Year()-2, now.Month(), now.Day(), 0, 0, 0, 0, time.Local) // same calendar day two years back, at 00:00 local time
+	task_id := util.BsonIdToSId(bson.NewObjectIdWithTime(move_time))
+	q := map[string]interface{}{
+		"_id": map[string]interface{}{
+			"$lt": StringTOBsonId(task_id),
+		},
+	}
+
+	it := sess.DB(mgo.DbName).C("result_20200714").Find(&q).Iter()
+	index := 0
+	for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
+		if index%10000 == 0 {
+			log.Println("index", index)
+		}
+		del_id:=BsonTOStringId(tmp["_id"])
+		mgo.Save("result_20200713", tmp)
+		mgo.DeleteById("result_20200714",del_id)
+		tmp = map[string]interface{}{} // fresh map for the next document
+	}
+	log.Println("save and delete", " ok index", index)
+
+}

+ 169 - 0
udpfilterdup/src/increaseRepeat.go

@@ -0,0 +1,169 @@
+package main
+
+import (
+	"encoding/json"
+	"log"
+	mu "mfw/util"
+	"net"
+	"qfw/util"
+	"sync"
+	"time"
+)
+
+// taskRepeat runs one de-duplication pass: it iterates the extract documents selected by mapInfo's gtid/lteid (or by the gtept/ltept publishtime range in full mode), checks each against the shared pool DM in up to threadNum goroutines, queues a "repeat" update for every duplicate, and, when not in full mode, marks the task record and notifies nextNode over UDP.
+func taskRepeat(mapInfo map[string]interface{}) {
+	defer util.Catch()
+	// default query: _id in (gtid, lteid]
+	q := map[string]interface{}{
+		"_id": map[string]interface{}{
+			"$gt":  StringTOBsonId(mapInfo["gtid"].(string)),
+			"$lte": StringTOBsonId(mapInfo["lteid"].(string)),
+		},
+	}
+	// full mode: select by publishtime in [gtept, ltept] instead
+	if IsFull && gtept!="" && ltept!=""{
+		log.Println("执行全量分段模式:",gtept,"---",ltept)
+		q = map[string]interface{}{
+			"publishtime": map[string]interface{}{
+				"$gte": util.Int64All(gtept),
+				"$lte": util.Int64All(ltept),
+			},
+		}
+	}
+	// (original note: temporary assignment)
+	log.Println("开始数据判重~查询条件:",mgo.DbName, extract, q)
+
+	sess := mgo.GetMgoConn()
+	defer mgo.DestoryMongoConn(sess)
+	it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
+	pool := make(chan bool, threadNum)
+	wg := &sync.WaitGroup{}
+	n, repeateN := 0, 0
+	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
+		if n%1000 == 0 {
+			log.Println("current:", n, tmp["_id"],tmp["publishtime"], "repeateN:", repeateN)
+		}
+		if util.IntAll(tmp["repeat"]) == 1 {
+			repeateN++
+			tmp = make(map[string]interface{})
+			continue
+		}
+		if util.IntAll(tmp["dataging"]) == 1 && !IsFull{
+			tmp = make(map[string]interface{})
+			continue
+		}
+
+		// (original note: group the data by category — nothing is implemented here)
+
+
+
+		pool <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-pool
+				wg.Done()
+			}()
+			info := NewInfo(tmp)
+			// regular duplicate check against the shared pool
+			b, source, reason := DM.check(info)
+			if b {
+				repeateN++ // NOTE(review): incremented from several goroutines (and the loop above) without synchronization — confirm
+				var updateID = map[string]interface{}{} // selector of the record to flag
+				updateID["_id"] = StringTOBsonId(info.id)
+				repeat_ids:=source.repeat_ids
+				repeat_ids =  append(repeat_ids,info.id)
+				source.repeat_ids = repeat_ids
+				// write the updated source record back into the pool
+				DM.replacePoolData(source)
+				Update.updatePool <- []map[string]interface{}{// flag the record as a duplicate
+					updateID,
+					map[string]interface{}{
+						"$set": map[string]interface{}{
+							"repeat":        1,
+							"repeat_reason": reason,
+							"repeat_id":     source.id,
+							"dataging":		 0,
+							"updatetime_repeat" :util.Int64All(time.Now().Unix()),
+						},
+					},
+				}
+			}
+		}(tmp)
+		tmp = make(map[string]interface{})
+	}
+	wg.Wait()
+
+	log.Println("this current task over.", n, "repeateN:", repeateN, mapInfo["stop"])
+	//log.Println("当前数据池的数量:",DM.currentTotalCount())
+	// pause so the data pool can finish applying updates (the original note says 30s, the code sleeps 15s)
+	time.Sleep(15 * time.Second)
+	// mark the task record as de-duplicated
+	if !IsFull {
+		updateOcrFileData(mapInfo["lteid"].(string))
+		// task done: notify the downstream nodes
+		if n >= repeateN && mapInfo["stop"] == nil {
+			log.Println("判重任务完成发送udp")
+			for _, to := range nextNode {
+				sid, _ := mapInfo["gtid"].(string)
+				eid, _ := mapInfo["lteid"].(string)
+				key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
+				by, _ := json.Marshal(map[string]interface{}{
+					"gtid":  sid,
+					"lteid": eid,
+					"stype": util.ObjToString(to["stype"]),
+					"key":   key,
+				})
+				addr := &net.UDPAddr{
+					IP:   net.ParseIP(to["addr"].(string)),
+					Port: util.IntAll(to["port"]),
+				}
+				node := &udpNode{by, addr, time.Now().Unix(), 0}
+				udptaskmap.Store(key, node)
+				udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
+			}
+		}
+	}
+
+}
+// updateOcrFileData scans task_collName from the newest _id to the oldest for the first record whose lteid equals cur_lteid and hands task_mgo.UpSertBulk an update setting is_repeat_status=1 and is_repeat_time on it; when no record matches it only logs a message.
+func updateOcrFileData(cur_lteid string)  {
+	// update the de-duplication status in the OCR task table
+	log.Println("开始更新Ocr表-标记",cur_lteid)
+	task_sess := task_mgo.GetMgoConn()
+	defer task_mgo.DestoryMongoConn(task_sess)
+	q_task:=map[string]interface{}{}
+	it_last := task_sess.DB(task_mgo.DbName).C(task_collName).Find(&q_task).Sort("-_id").Iter()
+	isUpdateOcr:=false
+	updateOcrFile:=[][]map[string]interface{}{}
+	for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
+		cur_id := BsonTOStringId(tmp["_id"])
+		lteid:=util.ObjToString(tmp["lteid"])
+		if (lteid==cur_lteid) { // match: this record needs updating
+			log.Println("找到该lteid数据",cur_lteid,cur_id)
+			isUpdateOcr = true
+			updateOcrFile = append(updateOcrFile, []map[string]interface{}{// set the status flag and its timestamp
+				map[string]interface{}{
+					"_id": tmp["_id"],
+				},
+				map[string]interface{}{
+					"$set": map[string]interface{}{
+						"is_repeat_status": 1,
+						"is_repeat_time" : util.Int64All(time.Now().Unix()),
+					},
+				},
+			})
+			tmp = make(map[string]interface{})
+			break
+		}else {
+			tmp = make(map[string]interface{})
+		}
+	}
+	if !isUpdateOcr {
+		log.Println("出现异常问题,查询不到ocr的lteid",cur_lteid)
+	}else {
+		if len(updateOcrFile) > 0 {
+			task_mgo.UpSertBulk(task_collName, updateOcrFile...)
+		}
+	}
+}

+ 0 - 556
udpfilterdup/src/main.go

@@ -7,15 +7,12 @@ package main
 import (
 	"encoding/json"
 	"flag"
-	"github.com/cron"
-	"gopkg.in/mgo.v2/bson"
 	"log"
 	mu "mfw/util"
 	"net"
 	"os"
 	"qfw/util"
 	"regexp"
-	"strconv"
 	"sync"
 	"time"
 )
@@ -190,7 +187,6 @@ func main() {
 
 		log.Println("测试:全量判重-准备开始")
 		taskRepeat(mapinfo)
-		
 		time.Sleep(99999 * time.Hour)
 	}
 }
@@ -208,7 +204,6 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 				key = "udpok"
 			}
 			udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
-
 			//插入任务-判断任务-是否存在
 			updatelock.Lock()
 			taskList = append(taskList,mapInfo)
@@ -225,58 +220,6 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 }
 
 
-//upd接收
-//func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
-//	select {
-//	case udptask <- struct{}{}:
-//		log.Println("...接收段落,通道正常...")
-//		switch act {
-//		case mu.OP_TYPE_DATA: //上个节点的数据
-//			var mapInfo map[string]interface{}
-//			err := json.Unmarshal(data, &mapInfo)
-//			if err != nil {
-//				log.Println("error data:", err)
-//				udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
-//			} else if mapInfo != nil {
-//				key, _ := mapInfo["key"].(string)
-//				if key == "" {
-//					key = "udpok"
-//				}
-//				log.Println("当前段落,需要判重...",mapInfo)
-//				udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
-//				task(data, mapInfo)
-//			}
-//			log.Println("此段任务结束...",err,mapInfo)
-//			<-udptask
-//		case mu.OP_NOOP: //下个节点回应
-//			ok := string(data)
-//			if ok != "" {
-//				log.Println("下节点回应-ok:", ok)
-//				udptaskmap.Delete(ok)
-//			}
-//			<-udptask
-//		}
-//	case <-time.After(2 * time.Second):
-//		switch act {
-//		case mu.OP_TYPE_DATA: //上个节点的数据
-//			log.Println("通道堵塞中...上节点")
-//			udpclient.WriteUdp([]byte("repeat_busy"), mu.OP_NOOP, ra)
-//		case mu.OP_NOOP: //下个节点回应
-//			log.Println("通道堵塞中...下节点")
-//			ok := string(data)
-//			if ok != "" {
-//				log.Println("下节点回应-ok:", ok)
-//				udptaskmap.Delete(ok)
-//			}
-//		}
-//	}
-//
-//	//udptask <- struct{}{}
-//	//defer func() {
-//	//	<-udptask
-//	//}()
-//}
-
 //监听-获取-分发判重任务
 func getRepeatTask()  {
 	for  {
@@ -302,527 +245,28 @@ func getRepeatTask()  {
 
 
 
-//开始判重程序
-func taskRepeat(mapInfo map[string]interface{}) {
-	defer util.Catch()
-	//区间id
-	q := map[string]interface{}{
-		"_id": map[string]interface{}{
-			"$gt":  StringTOBsonId(mapInfo["gtid"].(string)),
-			"$lte": StringTOBsonId(mapInfo["lteid"].(string)),
-		},
-	}
-	//全量
-	if IsFull && gtept!="" && ltept!=""{
-		log.Println("执行全量分段模式")
-		log.Println(gtept,"---",ltept)
-		q = map[string]interface{}{
-			"publishtime": map[string]interface{}{
-				"$gte": util.Int64All(gtept),
-				"$lte": util.Int64All(ltept),
-			},
-		}
-	}
-	//临时赋值
-	log.Println("开始数据判重~查询条件:",mgo.DbName, extract, q)
-
-
-	sess := mgo.GetMgoConn()
-	defer mgo.DestoryMongoConn(sess)
-	it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
-	pool := make(chan bool, threadNum)
-	wg := &sync.WaitGroup{}
-	n, repeateN := 0, 0
-	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
-		if n%1000 == 0 {
-			log.Println("current:", n, tmp["_id"],tmp["publishtime"], "repeateN:", repeateN)
-		}
 
-		if util.IntAll(tmp["repeat"]) == 1 {
-			repeateN++
-			tmp = make(map[string]interface{})
-			continue
-		}
 
-		if util.IntAll(tmp["dataging"]) == 1 && !IsFull{
-			tmp = make(map[string]interface{})
-			continue
-		}
 
-		pool <- true
-		wg.Add(1)
-		go func(tmp map[string]interface{}) {
-			defer func() {
-				<-pool
-				wg.Done()
-			}()
-			info := NewInfo(tmp)
-			//正常判重
-			b, source, reason := DM.check(info)
-			if b {
-				repeateN++
-				var updateID = map[string]interface{}{} //记录更新判重的
-				updateID["_id"] = StringTOBsonId(info.id)
-				repeat_ids:=source.repeat_ids
-				repeat_ids =  append(repeat_ids,info.id)
-				source.repeat_ids = repeat_ids
-				//替换数据池-更新
-				DM.replacePoolData(source)
-
-				//Update.updatePool <- []map[string]interface{}{//原始数据打标签
-				//	map[string]interface{}{
-				//		"_id": StringTOBsonId(source.id),
-				//	},
-				//	map[string]interface{}{
-				//		"$set": map[string]interface{}{
-				//			"repeat_ids": repeat_ids,
-				//		},
-				//	},
-				//}
-				Update.updatePool <- []map[string]interface{}{//重复数据打标签
-					updateID,
-					map[string]interface{}{
-						"$set": map[string]interface{}{
-							"repeat":        1,
-							"repeat_reason": reason,
-							"repeat_id":     source.id,
-							"dataging":		 0,
-							"updatetime_repeat" :util.Int64All(time.Now().Unix()),
-						},
-					},
-				}
-			}
-		}(tmp)
-		tmp = make(map[string]interface{})
-	}
-	wg.Wait()
-
-	log.Println("this current task over.", n, "repeateN:", repeateN, mapInfo["stop"])
-	//log.Println("当前数据池的数量:",DM.currentTotalCount())
-	//睡眠时间30s 目的是让数据池更新所有数据...
-	time.Sleep(15 * time.Second)
-	//更新Ocr的标记
-	if !IsFull {
-		updateOcrFileData(mapInfo["lteid"].(string))
-		//任务完成,开始发送广播通知下面节点
-		if n >= repeateN && mapInfo["stop"] == nil {
-			log.Println("判重任务完成发送udp")
-			for _, to := range nextNode {
-				sid, _ := mapInfo["gtid"].(string)
-				eid, _ := mapInfo["lteid"].(string)
-				key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
-				by, _ := json.Marshal(map[string]interface{}{
-					"gtid":  sid,
-					"lteid": eid,
-					"stype": util.ObjToString(to["stype"]),
-					"key":   key,
-				})
-				addr := &net.UDPAddr{
-					IP:   net.ParseIP(to["addr"].(string)),
-					Port: util.IntAll(to["port"]),
-				}
-				node := &udpNode{by, addr, time.Now().Unix(), 0}
-				udptaskmap.Store(key, node)
-				udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
-			}
-		}
-	}
 
-}
 
-func updateOcrFileData(cur_lteid string)  {
-	//更新ocr 分类表-判重的状态
-	log.Println("开始更新Ocr表-标记",cur_lteid)
-	task_sess := task_mgo.GetMgoConn()
-	defer task_mgo.DestoryMongoConn(task_sess)
-	q_task:=map[string]interface{}{}
-	it_last := task_sess.DB(task_mgo.DbName).C(task_collName).Find(&q_task).Sort("-_id").Iter()
-	isUpdateOcr:=false
-	updateOcrFile:=[][]map[string]interface{}{}
-	for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
-		cur_id := BsonTOStringId(tmp["_id"])
-		lteid:=util.ObjToString(tmp["lteid"])
-		if (lteid==cur_lteid) { //需要更新
-			log.Println("找到该lteid数据",cur_lteid,cur_id)
-			isUpdateOcr = true
-			updateOcrFile = append(updateOcrFile, []map[string]interface{}{//重复数据打标签
-				map[string]interface{}{
-					"_id": tmp["_id"],
-				},
-				map[string]interface{}{
-					"$set": map[string]interface{}{
-						"is_repeat_status": 1,
-						"is_repeat_time" : util.Int64All(time.Now().Unix()),
-					},
-				},
-			})
-			tmp = make(map[string]interface{})
-			break
-		}else {
-			tmp = make(map[string]interface{})
-		}
-	}
-	if !isUpdateOcr {
-		log.Println("出现异常问题,查询不到ocr的lteid",cur_lteid)
-	}else {
-		if len(updateOcrFile) > 0 {
-			task_mgo.UpSertBulk(task_collName, updateOcrFile...)
-		}
-	}
-}
 
-//历史判重
-func historyTaskDay() {
-	defer util.Catch()
 
-	for {
-		start:=time.Now().Unix()
 
-		if gtid=="" {
-			log.Println("请传gtid,否则无法运行")
-			os.Exit(0)
-			return
-		}
-		if lteid!="" {
-			//先进行数据迁移
-			log.Println("开启一次迁移任务",gtid,lteid)
-			moveHistoryData(gtid,lteid)
-			gtid = lteid //替换数据
-		}
 
-		//查询表最后一个id
-		task_sess := task_mgo.GetMgoConn()
-		defer task_mgo.DestoryMongoConn(task_sess)
-		q:=map[string]interface{}{}
-		between_time := time.Now().Unix() - (86400 * timingPubScope)//两年周期
-		it_last := task_sess.DB(task_mgo.DbName).C(task_collName).Find(&q).Sort("-_id").Iter()
-
-		isRepeatStatus:=false
-		for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
-			is_repeat_status:=util.IntAll(tmp["is_repeat_status"])
-			if is_repeat_status == 1 {
-				lteid = util.ObjToString(tmp["lteid"])
-				log.Println("查询的最后一个已标记的任务lteid:",lteid)
-				isRepeatStatus = true
-				tmp = make(map[string]interface{})
-				break
-			}else  {
-				tmp = make(map[string]interface{})
-			}
-		}
 
-		if !isRepeatStatus {
-			log.Println("查询不到有标记的lteid数据")
-			log.Println("睡眠5分钟 gtid:",gtid,"lteid:",lteid)
-			time.Sleep(5 * time.Minute)
-			continue
-		}
 
-		log.Println("查询完毕-找到有标记的lteid-先睡眠5分钟",gtid,lteid)
-		time.Sleep(5 * time.Minute)
-
-		sess := mgo.GetMgoConn()//连接器
-		defer mgo.DestoryMongoConn(sess)
-		//开始判重
-		q = map[string]interface{}{
-			"_id": map[string]interface{}{
-				"$gt": StringTOBsonId(gtid),
-				"$lte": StringTOBsonId(lteid),
-			},
-		}
-		log.Println("历史判重查询条件:",q,"时间:", between_time)
-		it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
-		num,oknum,outnum, deterTime:= int64(0),int64(0),int64(0),int64(0) //计数
-		pendAllArr:=[][]map[string]interface{}{}//待处理数组
-		dayArr := []map[string]interface{}{}
-		for tmp := make(map[string]interface{}); it.Next(&tmp); num++ {
-			if num%10000 == 0 {
-				log.Println("正序遍历:", num)
-			}
-			//取-符合-发布时间X年内的数据
-			if util.IntAll(tmp["dataging"]) == 1 {
-				pubtime := util.Int64All(tmp["publishtime"])
-				if pubtime > 0 && pubtime >= between_time {
-					oknum++
-					if deterTime==0 {
-						log.Println("找到第一条符合条件的数据")
-						deterTime = util.Int64All(tmp["publishtime"])
-						dayArr = append(dayArr,tmp)
-					}else {
-						if pubtime-deterTime >timingSpanDay*86400 {
-							//新数组重新构建,当前组数据加到全部组数据
-							pendAllArr = append(pendAllArr,dayArr)
-							dayArr = []map[string]interface{}{}
-							deterTime = util.Int64All(tmp["publishtime"])
-							dayArr = append(dayArr,tmp)
-						}else {
-							dayArr = append(dayArr,tmp)
-						}
-					}
-				}else {
-					outnum++
-					//不在两年内的也清标记
-					Update.updatePool <- []map[string]interface{}{//重复数据打标签
-						map[string]interface{}{
-							"_id": tmp["_id"],
-						},
-						map[string]interface{}{
-							"$set": map[string]interface{}{
-								"dataging": 0,
-								"history_updatetime":util.Int64All(time.Now().Unix()),
-							},
-						},
-					}
-				}
-			}
-			tmp = make(map[string]interface{})
-		}
 
-		if len(dayArr)>0 {
-			pendAllArr = append(pendAllArr,dayArr)
-			dayArr = []map[string]interface{}{}
-		}
 
-		log.Println("查询数量:",num,"符合条件:",oknum,"未在两年内:",outnum)
 
-		if len(pendAllArr) <= 0 {
-			log.Println("没找到dataging==1的数据")
-		}
 
-		//测试分组数量是否正确
-		testNum:=0
-		for k,v:=range pendAllArr {
-			log.Println("第",k,"组--","数量:",len(v))
-			testNum = testNum+len(v)
-		}
-		log.Println("本地构建分组完成:",len(pendAllArr),"组","测试-总计数量:",testNum)
-
-		n, repeateN := 0, 0
-		log.Println("线程数:",threadNum)
-		pool := make(chan bool, threadNum)
-		wg := &sync.WaitGroup{}
-		for k,v:=range pendAllArr { //每组结束更新一波数据
-			pool <- true
-			wg.Add(1)
-			go func(k int, v []map[string]interface{}) {
-				defer func() {
-					<-pool
-					wg.Done()
-				}()
-				//相关ids 跨表
-				groupOtherExtract := [][]map[string]interface{}{}
-
-				//构建当前组的数据池
-				log.Println("构建第", k, "组---(数据池)")
-				//当前组的第一个发布时间
-				first_pt := util.Int64All(v[len(v)-1]["publishtime"])
-				curTM := TimedTaskDatamap(dupdays+int(timingSpanDay)+1, first_pt+86400, int(k))
-				log.Println("开始遍历判重第", k, "组  共计数量:", len(v))
-				n = n + len(v)
-				log.Println("统计目前总数量:", n, "重复数量:", repeateN)
-				for _, tmp := range v {
-					info := NewInfo(tmp)
-					b, source, reason := curTM.check(info)
-					if b { //有重复,生成更新语句,更新抽取和更新招标
-						repeateN++
-						//重复数据打标签
-						repeat_ids:=source.repeat_ids
-						repeat_ids =  append(repeat_ids,info.id)
-						source.repeat_ids = repeat_ids
-
-						updatelock.Lock()
-						//替换数据池-更新
-						DM.replacePoolData(source)
-						//更新数据源
-						//判断是否在当前段落
-						if judgeIsCurIds(gtid,lteid,source.id) {
-							Update.updatePool <- []map[string]interface{}{//重复数据打标签
-								map[string]interface{}{
-									"_id": StringTOBsonId(source.id),
-								},
-								map[string]interface{}{
-									"$set": map[string]interface{}{
-										"repeat_ids": repeat_ids,
-									},
-								},
-							}
-						}else {
-							groupOtherExtract = append(groupOtherExtract, []map[string]interface{}{//重复数据打标签
-								map[string]interface{}{
-									"_id": StringTOBsonId(source.id),
-								},
-								map[string]interface{}{
-									"$set": map[string]interface{}{
-										"repeat_ids": repeat_ids,
-									},
-								},
-							})
-						}
-						Update.updatePool <- []map[string]interface{}{//重复数据打标签
-							map[string]interface{}{
-								"_id": tmp["_id"],
-							},
-							map[string]interface{}{
-								"$set": map[string]interface{}{
-									"repeat":        1,
-									"repeat_reason": reason,
-									"repeat_id":     source.id,
-									"dataging":      0,
-									"history_updatetime":util.Int64All(time.Now().Unix()),
-								},
-							},
-						}
-						if len(groupOtherExtract) >= 500 {
-							mgo.UpSertBulk(extract_back, groupOtherExtract...)
-							groupOtherExtract = [][]map[string]interface{}{}
-						}
-
-						updatelock.Unlock()
-
-
-					} else {
-						Update.updatePool <- []map[string]interface{}{//重复数据打标签
-							map[string]interface{}{
-								"_id": tmp["_id"],
-							},
-							map[string]interface{}{
-								"$set": map[string]interface{}{
-									"dataging": 0, //符合条件的都为dataging==0
-									"history_updatetime":util.Int64All(time.Now().Unix()),
-								},
-							},
-						}
-					}
-				}
-				//每组数据结束-更新数据
-				updatelock.Lock()
-				if len(groupOtherExtract) > 0 {
-					mgo.UpSertBulk(extract_back, groupOtherExtract...)
-				}
-				updatelock.Unlock()
-
-			}(k, v)
 
-		}
 
-		wg.Wait()
-
-		log.Println("this timeTask over.", n, "repeateN:", repeateN,gtid,lteid)
-
-		time.Sleep(30 * time.Second)
-		//任务完成,开始发送广播通知下面节点 发udp 去升索引待定 + 合并
-		if n >= repeateN && gtid!=lteid{
-			for _, to := range nextNode {
-				next_sid := util.BsonIdToSId(gtid)
-				next_eid := util.BsonIdToSId(lteid)
-				key := next_sid + "-" + next_eid + "-" + util.ObjToString(to["stype"])
-				by, _ := json.Marshal(map[string]interface{}{
-					"gtid":  next_sid,
-					"lteid": next_eid,
-					"stype": util.ObjToString(to["stype"]),
-					"key":   key,
-				})
-				addr := &net.UDPAddr{
-					IP:   net.ParseIP(to["addr"].(string)),
-					Port: util.IntAll(to["port"]),
-				}
-				node := &udpNode{by, addr, time.Now().Unix(), 0}
-				udptaskmap.Store(key, node)
-				udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
-			}
-		}
 
-		end:=time.Now().Unix()
 
-		log.Println(gtid,lteid)
 
-		if end-start<60*5 {
-			log.Println("睡眠.............")
-			time.Sleep(5 * time.Minute)
-		}
-		log.Println("继续下一段的历史判重")
-	}
-}
 
-//判断是否在当前id段落
-func judgeIsCurIds (gtid string,lteid string,curid string) bool {
 
-	gt_time, _ := strconv.ParseInt(gtid[:8], 16, 64)
-	lte_time, _ := strconv.ParseInt(lteid[:8], 16, 64)
-	cur_time, _ := strconv.ParseInt(curid[:8], 16, 64)
-	if cur_time>=gt_time&&cur_time<=lte_time {
-		return true
-	}
-	return false
-}
-
-//迁移上一段数据
-func moveHistoryData(startid string,endid string) {
-	sess := mgo.GetMgoConn()
-	defer mgo.DestoryMongoConn(sess)
-	year, month, day := time.Now().Date()
-	q := map[string]interface{}{
-		"_id": map[string]interface{}{
-			"$gt": StringTOBsonId(startid),
-			"$lte": StringTOBsonId(endid),
-		},
-	}
-	log.Println(q)
-	it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
-	index := 0
-	for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
-		mgo.Save(extract_back, tmp)
-		tmp = map[string]interface{}{}
-		if index%1000 == 0 {
-			log.Println("index", index)
-		}
-	}
-	log.Println("save to", extract_back, " ok index", index)
-
-	qv := map[string]interface{}{
-		"comeintime": map[string]interface{}{
-			"$lt": time.Date(year, month, day, 0, 0, 0, 0, time.Local).Add(-time.Duration(dupdays+1) * 24 * time.Hour*2).Unix(),
-	},
-	}
-	delnum := mgo.Delete(extract, qv)
-	log.Println("remove from ", extract, delnum)
-
-}
-
-func moveTimeoutData()  {
-	log.Println("部署迁移定时任务")
-	c := cron.New()
-	c.AddFunc("0 0 0 * * ?", func() { moveOnceTimeOut() })
-	c.Start()
-}
-
-func moveOnceTimeOut()  {
-	log.Println("执行一次迁移超时数据")
-	sess := mgo.GetMgoConn()
-	defer mgo.DestoryMongoConn(sess)
-	now:=time.Now()
-	move_time := time.Date(now.Year()-2, now.Month(), now.Day(), 0, 0, 0, 0, time.Local)
-	task_id := util.BsonIdToSId(bson.NewObjectIdWithTime(move_time))
-	q := map[string]interface{}{
-		"_id": map[string]interface{}{
-			"$lt": StringTOBsonId(task_id),
-		},
-	}
-
-	it := sess.DB(mgo.DbName).C("result_20200714").Find(&q).Iter()
-	index := 0
-	for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
-		if index%10000 == 0 {
-			log.Println("index", index)
-		}
-		del_id:=BsonTOStringId(tmp["_id"])
-		mgo.Save("result_20200713", tmp)
-		mgo.DeleteById("result_20200714",del_id)
-		tmp = map[string]interface{}{}
-	}
-	log.Println("save and delete", " ok index", index)
-
-}