apple 5 жил өмнө
parent
commit
e58df5fb9f

+ 233 - 0
udpfilterdup/src/main.go

@@ -118,6 +118,11 @@ func main() {
 //测试组人员使用
 //测试组人员使用
 func mainT() {
 func mainT() {
 
 
+
+	testRepairData11()
+
+	return
+
 	if TimingTask {
 	if TimingTask {
 		log.Println("新历史任务测试开始")
 		log.Println("新历史任务测试开始")
 		go historyTaskDay()
 		go historyTaskDay()
@@ -176,6 +181,17 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 	}
 	}
 }
 }
 
 
+
+
+
+
+
+
+
+
+
+
+
 //开始判重程序
 //开始判重程序
 func task(data []byte, mapInfo map[string]interface{}) {
 func task(data []byte, mapInfo map[string]interface{}) {
 	log.Println("开始数据判重")
 	log.Println("开始数据判重")
@@ -669,6 +685,223 @@ func moveHistoryData(startid string,endid string) {
 
 
 
 
 
 
+func repairHistory() {
+	defer util.Catch()
+	log.Println("执行修复程序")
+	sess := mgo.GetMgoConn()
+	defer mgo.DestoryMongoConn(sess)
+	q:=map[string]interface{}{}
+	between_time := time.Now().Unix() - (86400 * timingPubScope)//两年周期
+	//开始判重
+	q = map[string]interface{}{
+		"_id": map[string]interface{}{
+			"$gt": StringTOBsonId("5f15bf800000000000000000"),
+			"$lte": StringTOBsonId("5f219cff0000000000000000"),
+		},
+	}
+	log.Println("历史判重查询条件:",q,"时间:", between_time)
+
+	it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
+	num,oknum, deterTime:= int64(0),int64(0),int64(0) //计数
+	updateExtract := [][]map[string]interface{}{}//批量更新mongo数组
+	pendAllArr:=[][]map[string]interface{}{}//待处理数组
+	dayArr := []map[string]interface{}{}
+	for tmp := make(map[string]interface{}); it.Next(&tmp); num++ {
+		if num%10000 == 0 {
+			log.Println("正序遍历:", num)
+		}
+		source := util.ObjToMap(tmp["jsondata"])
+		if util.IntAll((*source)["sourcewebsite"]) == 1 {
+			updateExtract = append(updateExtract, []map[string]interface{}{
+				map[string]interface{}{
+					"_id": tmp["_id"],
+				},
+				map[string]interface{}{
+					"$set": map[string]interface{}{
+						"repeat": 1,
+						"dataging": 0,
+						"repeat_reason": "sourcewebsite为1 重复",
+					},
+				},
+			})
+			if len(updateExtract) > 50 {
+				mgo.UpSertBulk(extract, updateExtract...)
+				updateExtract = [][]map[string]interface{}{}
+			}
+			tmp = make(map[string]interface{})
+			continue
+		}
+
+		//取-符合-发布时间X年内的数据
+		if util.IntAll(tmp["dataging"]) == 1 {
+			pubtime := util.Int64All(tmp["publishtime"])
+			if pubtime > 0 && pubtime >= between_time {
+				oknum++
+				if deterTime==0 {
+					log.Println("找到第一条符合条件的数据")
+					deterTime = util.Int64All(tmp["publishtime"])
+					dayArr = append(dayArr,tmp)
+				}else {
+					if pubtime-deterTime >timingSpanDay*86400 {
+						//新数组重新构建,当前组数据加到全部组数据
+						pendAllArr = append(pendAllArr,dayArr)
+						dayArr = []map[string]interface{}{}
+						deterTime = util.Int64All(tmp["publishtime"])
+						dayArr = append(dayArr,tmp)
+					}else {
+						dayArr = append(dayArr,tmp)
+					}
+				}
+			}else {
+				//不在两年内的也清标记
+				updateExtract = append(updateExtract, []map[string]interface{}{
+					map[string]interface{}{
+						"_id": tmp["_id"],
+					},
+					map[string]interface{}{
+						"$set": map[string]interface{}{
+							"dataging": 0,
+						},
+					},
+				})
+				if len(updateExtract) > 50 {
+					mgo.UpSertBulk(extract, updateExtract...)
+					updateExtract = [][]map[string]interface{}{}
+				}
+
+			}
+		}
+		tmp = make(map[string]interface{})
+	}
+
+
+	//批量更新标记
+	if len(updateExtract) > 0 {
+		mgo.UpSertBulk(extract, updateExtract...)
+		updateExtract = [][]map[string]interface{}{}
+	}
+
+	if len(dayArr)>0 {
+		pendAllArr = append(pendAllArr,dayArr)
+		dayArr = []map[string]interface{}{}
+	}
+
+	log.Println("查询数量:",num,"符合条件:",oknum)
+
+	if len(pendAllArr) <= 0 {
+		log.Println("没找到dataging==1的数据")
+	}
+
+	//测试分组数量是否正确
+	testNum:=0
+	for k,v:=range pendAllArr {
+		log.Println("第",k,"组--","数量:",len(v))
+		testNum = testNum+len(v)
+	}
+	log.Println("本地构建分组完成:",len(pendAllArr),"组","测试-总计数量:",testNum)
+
+	n, repeateN := 0, 0
+	pool := make(chan bool, 3)
+	wg := &sync.WaitGroup{}
+	for k,v:=range pendAllArr { //每组结束更新一波数据
+		pool <- true
+		wg.Add(1)
+		go func(k int,v []map[string]interface{}) {
+			defer func() {
+				<-pool
+				wg.Done()
+			}()
+			//构建当前组的数据池
+			log.Println("构建第",k,"组---(数据池)")
+			//当前组的第一个发布时间
+			first_pt :=util.Int64All(v[len(v)-1]["publishtime"])
+			curTM := TimedTaskDatamap(dupdays+int(timingSpanDay)+1, first_pt+86400,int(k))
+			log.Println("开始遍历判重第",k,"组  共计数量:",len(v))
+			n = n+len(v)
+			log.Println("统计目前总数量:",n,"重复数量:",repeateN)
+			for _,tmp:=range v {
+				info := NewInfo(tmp)
+				if !LowHeavy { //是否进行低质量数据判重
+					if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
+						log.Println("无效数据")
+						updateExtract = append(updateExtract, []map[string]interface{}{
+							map[string]interface{}{
+								"_id": tmp["_id"],
+							},
+							map[string]interface{}{
+								"$set": map[string]interface{}{
+									"repeat":   -1, //无效数据标签
+									"dataging": 0,
+								},
+							},
+						})
+						if len(updateExtract) > 50 {
+							mgo.UpSertBulk(extract, updateExtract...)
+							updateExtract = [][]map[string]interface{}{}
+						}
+						return
+					}
+				}
+				b, source, reason := curTM.check(info)
+				if b { //有重复,生成更新语句,更新抽取和更新招标
+					log.Println("判重结果", b, reason,"目标id",info.id)
+					repeateN++
+					//重复数据打标签
+					updateExtract = append(updateExtract, []map[string]interface{}{
+						map[string]interface{}{
+							"_id": tmp["_id"],
+						},
+						map[string]interface{}{
+							"$set": map[string]interface{}{
+								"repeat":        1,
+								"repeat_reason": reason,
+								"repeat_id":     source.id,
+								"dataging":      0,
+							},
+						},
+					})
+				}else {
+					updateExtract = append(updateExtract, []map[string]interface{}{
+						map[string]interface{}{
+							"_id": tmp["_id"],
+						},
+						map[string]interface{}{
+							"$set": map[string]interface{}{
+								"dataging": 0,//符合条件的都为dataging==0
+							},
+						},
+					})
+				}
+				if len(updateExtract) > 50 {
+					mgo.UpSertBulk(extract, updateExtract...)
+					updateExtract = [][]map[string]interface{}{}
+				}
+			}
+		}(k,v)
+
+		//每组数据结束-更新数据
+		if len(updateExtract) > 0 {
+			mgo.UpSertBulk(extract, updateExtract...)
+			updateExtract = [][]map[string]interface{}{}
+		}
+	}
+
+	wg.Wait()
+
+
+	if len(updateExtract) > 0 {
+		mgo.UpSertBulk(extract, updateExtract...)
+	}
+
+	time.Sleep(30 * time.Second)
+	log.Println("this repair over.", n, "repeateN:", repeateN,gtid,lteid)
+	log.Println("修复结束")
+
+}
+
+
+
+