apple 5 年之前
父節點
當前提交
795b26a030
共有 4 個文件被更改,包括 88 次插入442 次删除
  1. 77 209
      udpfilterdup/src/README.md
  2. 2 2
      udpfilterdup/src/config.json
  3. 4 8
      udpfilterdup/src/datamap.go
  4. 5 223
      udpfilterdup/src/main.go

+ 77 - 209
udpfilterdup/src/README.md

@@ -1,42 +1,36 @@
-{
-    "udpport": ":1785",
-    "dupdays": 5,
-    "mongodb": {
-        "addr": "172.17.4.187:27083",
-        "pool": 5,
-        "db": "qfw",
-        "extract": "result_file_20200410",
-        "extract_back": "result_file_20200409",
-        "site": {
-            "dbname": "qfw",
-            "coll": "site"
-        }
-    },
-    "jkmail": {
-        "to": "zhengkun@topnet.net.cn,zhangjinkun@topnet.net.cn",
-        "api": "http://10.171.112.160:19281/_send/_mail"
-    },
-    "nextNode": [
-        {
-            "addr": "127.0.0.1",
-            "port": 1783,
-            "stype": "bidding",
-            "memo": "创建招标数据索引new"
-        }
-    ],
-    "threads": 1,
-    "isMerger": true,
-    "lowHeavy":true,
-    "timingTask":false,
-    "timingSpanDay": 3,
-    "timingPubScope": 720,
-    "specialwords": "(重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研)",
-    "specialtitle_0": "(包|标段|标包)[((]?[0-9a-zA-Z一二三四五六七八九十零123456789][))]?",
-    "specialtitle_1": "[0-9a-zA-Z一二三四五六七八九十零123456789](次|包|标段|标包|批|期)",
-    "specialtitle_2": "项目[((][0-9a-zA-Z一二三四五六七八九十零123456789][))]",
-    "beifen": "[((]?[0-9一二三四五六七八九十零123456789再][))]?[子分]?[次批标包]|重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研"
+func moveTimeoutData()  {
+	log.Println("部署迁移定时任务")
+	c := cron.New()
+	c.AddFunc("0 0 0 * * ?", func() { moveOnceTimeOut() })
+	c.Start()
 }
 
+func moveOnceTimeOut()  {
+	log.Println("执行一次迁移超时数据")
+
+	sess := mgo.GetMgoConn()
+	defer mgo.DestoryMongoConn(sess)
+	now:=time.Now()
+	move_time := time.Date(now.Year()-2, now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local).Unix()
+	q := map[string]interface{}{
+		"comeintime": map[string]interface{}{
+			"$lt": move_time,
+		},
+	}
+	log.Println(q)
+	it := sess.DB(mgo.DbName).C("result_20200714").Find(&q).Iter()
+	index := 0
+	for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
+		if index%10000 == 0 {
+			log.Println("index", index)
+		}
+		del_id:=BsonTOStringId(tmp["_id"])
+		mgo.Save("result_20200713", tmp)
+		mgo.DeleteById("result_20200714",del_id)
+		tmp = map[string]interface{}{}
+	}
+	log.Println("save and delete", " ok index", index)
+}
 
 
 
@@ -44,105 +38,37 @@
 
 
 
-{
-    "udpport": ":17859",
-    "dupdays": 5,
-    "mongodb": {
-        "addr": "192.168.3.207:27092",
-        "pool": 5,
-        "db": "extract_kf",
-        "extract": "a_testbidding",
-        "extract_back": "a_testbidding",
-        "site": {
-            "dbname": "extract_kf",
-            "coll": "site"
-        }
-    },
-    "jkmail": {
-        "to": "zhengkun@topnet.net.cn,zhangjinkun@topnet.net.cn",
-        "api": "http://10.171.112.160:19281/_send/_mail"
-    },
-    "nextNode": [
-    ],
-    "threads": 1,
-    "isMerger": true,
-    "lowHeavy":true,
-    "timingTask":false,
-    "timingSpanDay": 3,
-    "timingPubScope": 720,
-    "specialwords": "(重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研)",
-    "specialtitle_0": "(包|标段|标包)[((]?[0-9a-zA-Z一二三四五六七八九十零123456789][))]?",
-    "specialtitle_1": "[0-9a-zA-Z一二三四五六七八九十零123456789](次|包|标段|标包|批|期)",
-    "specialtitle_2": "项目[((][0-9a-zA-Z一二三四五六七八九十零123456789][))]",
-    "beifen": "[((]?[0-9一二三四五六七八九十零123456789再][))]?[子分]?[次批标包]|重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研"
-}
 
 
-"nextNode": [
-                        {
-                            "addr": "172.17.145.179",
-                            "port": 1782,
-                            "stype": "project",
-                            "memo": "合并项目"
-                        },
-                        {
-                            "addr": "127.0.0.1",
-                            "port": 1783,
-                            "stype": "bidding",
-                            "memo": "创建招标数据索引new"
-                        }
-               ],
 
 
 
 
 
-//定时任务--定时任务--定时任务--暂未用
-func timedTaskDay() {
-	log.Println("部署定时任务")
-	c := cron.New()
-	c.AddFunc("0 0 */4 * * ?", func() { timedTaskOnce() })
-	c.Start()
-	//timedTaskOnce()
-}
-func timedTaskOnce() {
 
+
+func repairHistory() {
 	defer util.Catch()
-	log.Println("开始一次迁移任务")
-	movedata()
-	log.Println("开始一次任务判重")
-	//当前时间-8   -4 小时
-	now := time.Now()
-	log.Println(now)
-	preTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour()-8, 0, 0, 0, time.Local)
-	curTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour()-4, 0, 0, 0, time.Local)
-	log.Println(preTime,curTime)
-	task_sid := util.BsonIdToSId(bson.NewObjectIdWithTime(preTime))
-	task_eid := util.BsonIdToSId(bson.NewObjectIdWithTime(curTime))
-	between_time := curTime.Unix() - (86400 * timingPubScope)
-	log.Println("id区间:",task_sid, task_eid,"时间:", between_time)
-	//区间id
-	q_start := map[string]interface{}{
+	log.Println("执行修复程序")
+	sess := mgo.GetMgoConn()
+	defer mgo.DestoryMongoConn(sess)
+	q:=map[string]interface{}{}
+	between_time := time.Now().Unix() - (86400 * timingPubScope)//两年周期
+	//开始判重
+	q = map[string]interface{}{
 		"_id": map[string]interface{}{
-			"$gt": StringTOBsonId(task_sid),
-			"$lte": StringTOBsonId(task_eid),
+			"$gt": StringTOBsonId("5f15bf800000000000000000"),
+			"$lte": StringTOBsonId("5f2375b2a120e23754be1039"),
 		},
 	}
-	//q_start = map[string]interface{}{
-	//	"_id": map[string]interface{}{
-	//		"$gte": StringTOBsonId("5f184cd552c1d9fbf84519d3"),
-	//		"$lte": StringTOBsonId("5f184d3852c1d9fbf8451a2a"),
-	//	},
-	//}
+	log.Println("历史判重查询条件:",q,"时间:", between_time)
 
-	sess := mgo.GetMgoConn()
-	defer mgo.DestoryMongoConn(sess)
-	it_start := sess.DB(mgo.DbName).C(extract).Find(&q_start).Sort("publishtime").Iter()
+	it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
 	num,oknum, deterTime:= int64(0),int64(0),int64(0) //计数
 	updateExtract := [][]map[string]interface{}{}//批量更新mongo数组
 	pendAllArr:=[][]map[string]interface{}{}//待处理数组
 	dayArr := []map[string]interface{}{}
-	for tmp := make(map[string]interface{}); it_start.Next(&tmp); num++ {
+	for tmp := make(map[string]interface{}); it.Next(&tmp); num++ {
 		if num%10000 == 0 {
 			log.Println("正序遍历:", num)
 		}
@@ -164,8 +90,6 @@ func timedTaskOnce() {
 				mgo.UpSertBulk(extract, updateExtract...)
 				updateExtract = [][]map[string]interface{}{}
 			}
-
-
 			tmp = make(map[string]interface{})
 			continue
 		}
@@ -228,7 +152,6 @@ func timedTaskOnce() {
 
 	if len(pendAllArr) <= 0 {
 		log.Println("没找到dataging==1的数据")
-		return
 	}
 
 	//测试分组数量是否正确
@@ -240,30 +163,32 @@ func timedTaskOnce() {
 	log.Println("本地构建分组完成:",len(pendAllArr),"组","测试-总计数量:",testNum)
 
 	n, repeateN := 0, 0
-	pool := make(chan bool, 4)
+	pool := make(chan bool, 2)
 	wg := &sync.WaitGroup{}
 	for k,v:=range pendAllArr { //每组结束更新一波数据
 		pool <- true
 		wg.Add(1)
-		go func(k int,v []map[string]interface{}) {
+		go func(k int, v []map[string]interface{}) {
 			defer func() {
 				<-pool
 				wg.Done()
 			}()
+			//每组临时数组 -  互不干扰
+			groupUpdateExtract := [][]map[string]interface{}{}
 			//构建当前组的数据池
-			log.Println("构建第",k,"组---(数据池)")
+			log.Println("构建第", k, "组---(数据池)")
 			//当前组的第一个发布时间
-			first_pt :=util.Int64All(v[0]["publishtime"])
-			curTM := TimedTaskDatamap(dupdays, first_pt,int(k))
-			log.Println("开始遍历判重第",k,"组  共计数量:",len(v))
-			n = n+len(v)
-			log.Println("统计目前总数量:",n,"重复数量:",repeateN)
-			for _,tmp:=range v {
+			first_pt := util.Int64All(v[len(v)-1]["publishtime"])
+			curTM := TimedTaskDatamap(dupdays+int(timingSpanDay)+1, first_pt+86400, int(k))
+			log.Println("开始遍历判重第", k, "组  共计数量:", len(v))
+			n = n + len(v)
+			log.Println("统计目前总数量:", n, "重复数量:", repeateN)
+			for _, tmp := range v {
 				info := NewInfo(tmp)
 				if !LowHeavy { //是否进行低质量数据判重
 					if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
 						log.Println("无效数据")
-						updateExtract = append(updateExtract, []map[string]interface{}{
+						groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
 							map[string]interface{}{
 								"_id": tmp["_id"],
 							},
@@ -274,19 +199,18 @@ func timedTaskOnce() {
 								},
 							},
 						})
-						if len(updateExtract) > 50 {
-							mgo.UpSertBulk(extract, updateExtract...)
-							updateExtract = [][]map[string]interface{}{}
+						if len(groupUpdateExtract) > 50 {
+							mgo.UpSertBulk(extract, groupUpdateExtract...)
+							groupUpdateExtract = [][]map[string]interface{}{}
 						}
 						return
 					}
 				}
 				b, source, reason := curTM.check(info)
 				if b { //有重复,生成更新语句,更新抽取和更新招标
-					log.Println("判重结果", b, reason,"目标id",info.id)
 					repeateN++
 					//重复数据打标签
-					updateExtract = append(updateExtract, []map[string]interface{}{
+					groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
 						map[string]interface{}{
 							"_id": tmp["_id"],
 						},
@@ -299,92 +223,36 @@ func timedTaskOnce() {
 							},
 						},
 					})
-				}else {
-					updateExtract = append(updateExtract, []map[string]interface{}{
+				} else {
+					groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
 						map[string]interface{}{
 							"_id": tmp["_id"],
 						},
 						map[string]interface{}{
 							"$set": map[string]interface{}{
-								"dataging": 0,//符合条件的都为dataging==0
+								"dataging": 0, //符合条件的都为dataging==0
 							},
 						},
 					})
 				}
-				if len(updateExtract) > 50 {
-					mgo.UpSertBulk(extract, updateExtract...)
-					updateExtract = [][]map[string]interface{}{}
+				if len(groupUpdateExtract) > 50 {
+					mgo.UpSertBulk(extract, groupUpdateExtract...)
+					groupUpdateExtract = [][]map[string]interface{}{}
 				}
 			}
-		}(k,v)
+			//每组数据结束-更新数据
+			if len(groupUpdateExtract) > 0 {
+				mgo.UpSertBulk(extract, groupUpdateExtract...)
+			}
+		}(k, v)
 
-		//每组数据结束-更新数据
-		if len(updateExtract) > 0 {
-			mgo.UpSertBulk(extract, updateExtract...)
-			updateExtract = [][]map[string]interface{}{}
-		}
 	}
 
+	wg.Wait()
 
 
-	if len(updateExtract) > 0 {
-		mgo.UpSertBulk(extract, updateExtract...)
-		updateExtract = [][]map[string]interface{}{}
-	}
-	log.Println("this timeTask over.", n, "repeateN:", repeateN)
-
-	//任务完成,开始发送广播通知下面节点 发udp 去升索引待定 + 合并
-	if n > repeateN {
-		for _, to := range nextNode {
-			next_sid := util.BsonIdToSId(task_sid)
-			next_eid := util.BsonIdToSId(task_eid)
-			key := next_sid + "-" + next_eid + "-" + util.ObjToString(to["stype"])
-			by, _ := json.Marshal(map[string]interface{}{
-				"gtid":  next_sid,
-				"lteid": next_eid,
-				"stype": util.ObjToString(to["stype"]),
-				"key":   key,
-			})
-			addr := &net.UDPAddr{
-				IP:   net.ParseIP(to["addr"].(string)),
-				Port: util.IntAll(to["port"]),
-			}
-			node := &udpNode{by, addr, time.Now().Unix(), 0}
-			udptaskmap.Store(key, node)
-			udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
-		}
-	}
-}
-
+	time.Sleep(30 * time.Second)
+	log.Println("this repair over.", n, "repeateN:", repeateN,gtid,lteid)
+	log.Println("修复结束")
 
-//迁移数据dupdays+5之前的数据
-func movedata() {
-	sess := mgo.GetMgoConn()
-	defer mgo.DestoryMongoConn(sess)
-	year, month, day := time.Now().Date()
-	now:=time.Now()
-	move_time := time.Date(now.Year(), now.Month(), now.Day(), now.Hour()-8, 0, 0, 0, time.Local).Unix()
-	q := map[string]interface{}{
-		"comeintime": map[string]interface{}{
-			"$lt": move_time,
-		},
-	}
-	log.Println(q)
-	it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
-	index := 0
-	for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
-		mgo.Save(extract_back, tmp)
-		tmp = map[string]interface{}{}
-		if index%1000 == 0 {
-			log.Println("index", index)
-		}
-	}
-	log.Println("save to", extract_back, " ok index", index)
-	qv := map[string]interface{}{
-		"comeintime": map[string]interface{}{
-			"$lt": time.Date(year, month, day, 0, 0, 0, 0, time.Local).Add(-time.Duration(dupdays) * 24 * time.Hour*2).Unix(),
-		},
-	}
-	delnum := mgo.Delete(extract, qv)
-	log.Println("remove from ", extract, delnum)
-}
+}

+ 2 - 2
udpfilterdup/src/config.json

@@ -5,8 +5,8 @@
         "addr": "192.168.3.207:27092",
         "pool": 5,
         "db": "extract_kf",
-        "extract": "zk",
-        "extract_back": "zk",
+        "extract": "zk_zk_test",
+        "extract_back": "zk_zk_test",
         "site": {
             "dbname": "extract_kf",
             "coll": "site"

+ 4 - 8
udpfilterdup/src/datamap.go

@@ -140,7 +140,7 @@ func NewDatamap(days int, lastid string) *datamap {
 	}}
 	log.Println("query", query)
 	it := sess.DB(mgo.DbName).C(extract).Find(query).Sort("-publishtime").Iter()
-	now1 := int64(0)
+	nowTime := time.Now().Unix()//当前时间的时间戳
 	n, continuSum := 0, 0
 	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
 		if qutil.IntAll(tmp["repeat"]) == 1 || qutil.IntAll(tmp["repeat"]) == -1{
@@ -154,11 +154,7 @@ func NewDatamap(days int, lastid string) *datamap {
 			if pt_time > time.Now().Unix() {
 				continue
 			}
-			if now1 == 0 {
-				now1 = pt_time
-			}
-
-			if qutil.Float64All(now1-pt_time) < datelimit {
+			if qutil.Float64All(nowTime-pt_time) <= datelimit {
 				continuSum++
 				info := NewInfo(tmp)
 				dkey := qutil.FormatDateWithObj(&pt, qutil.Date_yyyyMMdd)
@@ -463,9 +459,9 @@ L:
 func (d *datamap) update(t int64) {
 
 	if TimingTask {
-		//d.keymap = d.GetLatelyFiveDay(t)
+
 	}else {
-		//d.keymap = d.GetLatelyFiveDay(t)//测试全量数据采用
+		//d.keymap = d.GetLatelyFiveDay(t)		//测试全量数据采用
 		d.keymap = d.GetLatelyFiveDayDouble(t)
 	}
 	m := map[string]bool{}

+ 5 - 223
udpfilterdup/src/main.go

@@ -48,7 +48,6 @@ var (
 )
 
 func init() {
-
 	flag.StringVar(&lastid, "id", "", "最后加载id") //以小于等于此id开始加载最近几天的数据
 	flag.StringVar(&gtid, "gtid", "", "历史的起始id")
 
@@ -117,8 +116,7 @@ func main() {
 
 //测试组人员使用
 func mainT() {
-
-	repairHistory()
+	testRepairData11()
 	return
 
 	if TimingTask {
@@ -332,6 +330,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
 				}
 			}
 		}(tmp)
+
 		if len(updateExtract) >= 200 {
 			mgo.UpSertBulk(extract, updateExtract...)
 			updateExtract = [][]map[string]interface{}{}
@@ -344,6 +343,8 @@ func task(data []byte, mapInfo map[string]interface{}) {
 	}
 	log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
 
+	time.Sleep(60 * time.Second)
+
 	//任务完成,开始发送广播通知下面节点
 	if n > repeateN && mapInfo["stop"] == nil {
 		log.Println("判重任务完成发送udp")
@@ -666,226 +667,6 @@ func moveHistoryData(startid string,endid string) {
 	}
 	delnum := mgo.Delete(extract, qv)
 	log.Println("remove from ", extract, delnum)
-}
-
-
-
-
-
-
-
-
-
-
-
-
-func repairHistory() {
-	defer util.Catch()
-	log.Println("执行修复程序")
-	sess := mgo.GetMgoConn()
-	defer mgo.DestoryMongoConn(sess)
-	q:=map[string]interface{}{}
-	between_time := time.Now().Unix() - (86400 * timingPubScope)//两年周期
-	//开始判重
-	q = map[string]interface{}{
-		"_id": map[string]interface{}{
-			"$gt": StringTOBsonId("5f15bf800000000000000000"),
-			"$lte": StringTOBsonId("5f2375b2a120e23754be1039"),
-		},
-	}
-	log.Println("历史判重查询条件:",q,"时间:", between_time)
-
-	it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
-	num,oknum, deterTime:= int64(0),int64(0),int64(0) //计数
-	updateExtract := [][]map[string]interface{}{}//批量更新mongo数组
-	pendAllArr:=[][]map[string]interface{}{}//待处理数组
-	dayArr := []map[string]interface{}{}
-	for tmp := make(map[string]interface{}); it.Next(&tmp); num++ {
-		if num%10000 == 0 {
-			log.Println("正序遍历:", num)
-		}
-		source := util.ObjToMap(tmp["jsondata"])
-		if util.IntAll((*source)["sourcewebsite"]) == 1 {
-			updateExtract = append(updateExtract, []map[string]interface{}{
-				map[string]interface{}{
-					"_id": tmp["_id"],
-				},
-				map[string]interface{}{
-					"$set": map[string]interface{}{
-						"repeat": 1,
-						"dataging": 0,
-						"repeat_reason": "sourcewebsite为1 重复",
-					},
-				},
-			})
-			if len(updateExtract) > 50 {
-				mgo.UpSertBulk(extract, updateExtract...)
-				updateExtract = [][]map[string]interface{}{}
-			}
-			tmp = make(map[string]interface{})
-			continue
-		}
-
-		//取-符合-发布时间X年内的数据
-		if util.IntAll(tmp["dataging"]) == 1 {
-			pubtime := util.Int64All(tmp["publishtime"])
-			if pubtime > 0 && pubtime >= between_time {
-				oknum++
-				if deterTime==0 {
-					log.Println("找到第一条符合条件的数据")
-					deterTime = util.Int64All(tmp["publishtime"])
-					dayArr = append(dayArr,tmp)
-				}else {
-					if pubtime-deterTime >timingSpanDay*86400 {
-						//新数组重新构建,当前组数据加到全部组数据
-						pendAllArr = append(pendAllArr,dayArr)
-						dayArr = []map[string]interface{}{}
-						deterTime = util.Int64All(tmp["publishtime"])
-						dayArr = append(dayArr,tmp)
-					}else {
-						dayArr = append(dayArr,tmp)
-					}
-				}
-			}else {
-				//不在两年内的也清标记
-				updateExtract = append(updateExtract, []map[string]interface{}{
-					map[string]interface{}{
-						"_id": tmp["_id"],
-					},
-					map[string]interface{}{
-						"$set": map[string]interface{}{
-							"dataging": 0,
-						},
-					},
-				})
-				if len(updateExtract) > 50 {
-					mgo.UpSertBulk(extract, updateExtract...)
-					updateExtract = [][]map[string]interface{}{}
-				}
-
-			}
-		}
-		tmp = make(map[string]interface{})
-	}
-
-
-	//批量更新标记
-	if len(updateExtract) > 0 {
-		mgo.UpSertBulk(extract, updateExtract...)
-		updateExtract = [][]map[string]interface{}{}
-	}
-
-	if len(dayArr)>0 {
-		pendAllArr = append(pendAllArr,dayArr)
-		dayArr = []map[string]interface{}{}
-	}
-
-	log.Println("查询数量:",num,"符合条件:",oknum)
-
-	if len(pendAllArr) <= 0 {
-		log.Println("没找到dataging==1的数据")
-	}
-
-	//测试分组数量是否正确
-	testNum:=0
-	for k,v:=range pendAllArr {
-		log.Println("第",k,"组--","数量:",len(v))
-		testNum = testNum+len(v)
-	}
-	log.Println("本地构建分组完成:",len(pendAllArr),"组","测试-总计数量:",testNum)
-
-	n, repeateN := 0, 0
-	pool := make(chan bool, 2)
-	wg := &sync.WaitGroup{}
-	for k,v:=range pendAllArr { //每组结束更新一波数据
-		pool <- true
-		wg.Add(1)
-		go func(k int, v []map[string]interface{}) {
-			defer func() {
-				<-pool
-				wg.Done()
-			}()
-			//每组临时数组 -  互不干扰
-			groupUpdateExtract := [][]map[string]interface{}{}
-			//构建当前组的数据池
-			log.Println("构建第", k, "组---(数据池)")
-			//当前组的第一个发布时间
-			first_pt := util.Int64All(v[len(v)-1]["publishtime"])
-			curTM := TimedTaskDatamap(dupdays+int(timingSpanDay)+1, first_pt+86400, int(k))
-			log.Println("开始遍历判重第", k, "组  共计数量:", len(v))
-			n = n + len(v)
-			log.Println("统计目前总数量:", n, "重复数量:", repeateN)
-			for _, tmp := range v {
-				info := NewInfo(tmp)
-				if !LowHeavy { //是否进行低质量数据判重
-					if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
-						log.Println("无效数据")
-						groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
-							map[string]interface{}{
-								"_id": tmp["_id"],
-							},
-							map[string]interface{}{
-								"$set": map[string]interface{}{
-									"repeat":   -1, //无效数据标签
-									"dataging": 0,
-								},
-							},
-						})
-						if len(groupUpdateExtract) > 50 {
-							mgo.UpSertBulk(extract, groupUpdateExtract...)
-							groupUpdateExtract = [][]map[string]interface{}{}
-						}
-						return
-					}
-				}
-				b, source, reason := curTM.check(info)
-				if b { //有重复,生成更新语句,更新抽取和更新招标
-					repeateN++
-					//重复数据打标签
-					groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
-						map[string]interface{}{
-							"_id": tmp["_id"],
-						},
-						map[string]interface{}{
-							"$set": map[string]interface{}{
-								"repeat":        1,
-								"repeat_reason": reason,
-								"repeat_id":     source.id,
-								"dataging":      0,
-							},
-						},
-					})
-				} else {
-					groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
-						map[string]interface{}{
-							"_id": tmp["_id"],
-						},
-						map[string]interface{}{
-							"$set": map[string]interface{}{
-								"dataging": 0, //符合条件的都为dataging==0
-							},
-						},
-					})
-				}
-				if len(groupUpdateExtract) > 50 {
-					mgo.UpSertBulk(extract, groupUpdateExtract...)
-					groupUpdateExtract = [][]map[string]interface{}{}
-				}
-			}
-			//每组数据结束-更新数据
-			if len(groupUpdateExtract) > 0 {
-				mgo.UpSertBulk(extract, groupUpdateExtract...)
-			}
-		}(k, v)
-
-	}
-
-	wg.Wait()
-
-
-	time.Sleep(30 * time.Second)
-	log.Println("this repair over.", n, "repeateN:", repeateN,gtid,lteid)
-	log.Println("修复结束")
 
 }
 
@@ -900,3 +681,4 @@ func repairHistory() {
 
 
 
+