apple 5 років тому
батько
коміт
7c1103fd31
1 змінений файл, 55 додано та 62 видалено
  1. 55 62
      udpfilterdup/src/main.go

+ 55 - 62
udpfilterdup/src/main.go

@@ -118,7 +118,7 @@ func main() {
 //测试组人员使用
 func mainT() {
 
-	testRepairData11()
+	repairHistory()
 	return
 
 	if TimingTask {
@@ -513,25 +513,27 @@ func historyTaskDay() {
 		for k,v:=range pendAllArr { //每组结束更新一波数据
 			pool <- true
 			wg.Add(1)
-			go func(k int,v []map[string]interface{}) {
+			go func(k int, v []map[string]interface{}) {
 				defer func() {
 					<-pool
 					wg.Done()
 				}()
+				//每组临时数组 -  互不干扰
+				groupUpdateExtract := [][]map[string]interface{}{}
 				//构建当前组的数据池
-				log.Println("构建第",k,"组---(数据池)")
+				log.Println("构建第", k, "组---(数据池)")
 				//当前组的第一个发布时间
-				first_pt :=util.Int64All(v[len(v)-1]["publishtime"])
-				curTM := TimedTaskDatamap(dupdays+int(timingSpanDay)+1, first_pt+86400,int(k))
-				log.Println("开始遍历判重第",k,"组  共计数量:",len(v))
-				n = n+len(v)
-				log.Println("统计目前总数量:",n,"重复数量:",repeateN)
-				for _,tmp:=range v {
+				first_pt := util.Int64All(v[len(v)-1]["publishtime"])
+				curTM := TimedTaskDatamap(dupdays+int(timingSpanDay)+1, first_pt+86400, int(k))
+				log.Println("开始遍历判重第", k, "组  共计数量:", len(v))
+				n = n + len(v)
+				log.Println("统计目前总数量:", n, "重复数量:", repeateN)
+				for _, tmp := range v {
 					info := NewInfo(tmp)
 					if !LowHeavy { //是否进行低质量数据判重
 						if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
 							log.Println("无效数据")
-							updateExtract = append(updateExtract, []map[string]interface{}{
+							groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
 								map[string]interface{}{
 									"_id": tmp["_id"],
 								},
@@ -542,9 +544,9 @@ func historyTaskDay() {
 									},
 								},
 							})
-							if len(updateExtract) > 50 {
-								mgo.UpSertBulk(extract, updateExtract...)
-								updateExtract = [][]map[string]interface{}{}
+							if len(groupUpdateExtract) > 50 {
+								mgo.UpSertBulk(extract, groupUpdateExtract...)
+								groupUpdateExtract = [][]map[string]interface{}{}
 							}
 							return
 						}
@@ -553,7 +555,7 @@ func historyTaskDay() {
 					if b { //有重复,生成更新语句,更新抽取和更新招标
 						repeateN++
 						//重复数据打标签
-						updateExtract = append(updateExtract, []map[string]interface{}{
+						groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
 							map[string]interface{}{
 								"_id": tmp["_id"],
 							},
@@ -566,39 +568,34 @@ func historyTaskDay() {
 								},
 							},
 						})
-					}else {
-						updateExtract = append(updateExtract, []map[string]interface{}{
+					} else {
+						groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
 							map[string]interface{}{
 								"_id": tmp["_id"],
 							},
 							map[string]interface{}{
 								"$set": map[string]interface{}{
-									"dataging": 0,//符合条件的都为dataging==0
+									"dataging": 0, //符合条件的都为dataging==0
 								},
 							},
 						})
 					}
-					if len(updateExtract) > 50 {
-						mgo.UpSertBulk(extract, updateExtract...)
-						updateExtract = [][]map[string]interface{}{}
+					if len(groupUpdateExtract) > 50 {
+						mgo.UpSertBulk(extract, groupUpdateExtract...)
+						groupUpdateExtract = [][]map[string]interface{}{}
 					}
 				}
-			}(k,v)
+				//每组数据结束-更新数据
+				if len(groupUpdateExtract) > 0 {
+					mgo.UpSertBulk(extract, groupUpdateExtract...)
+				}
+			}(k, v)
 
-			//每组数据结束-更新数据
-			if len(updateExtract) > 0 {
-				mgo.UpSertBulk(extract, updateExtract...)
-				updateExtract = [][]map[string]interface{}{}
-			}
 		}
 
 		wg.Wait()
 
 
-		if len(updateExtract) > 0 {
-			mgo.UpSertBulk(extract, updateExtract...)
-			updateExtract = [][]map[string]interface{}{}
-		}
 		//任务完成,开始发送广播通知下面节点 发udp 去升索引待定 + 合并
 		if n > repeateN {
 			for _, to := range nextNode {
@@ -693,7 +690,7 @@ func repairHistory() {
 	q = map[string]interface{}{
 		"_id": map[string]interface{}{
 			"$gt": StringTOBsonId("5f15bf800000000000000000"),
-			"$lte": StringTOBsonId("5f219cff0000000000000000"),
+			"$lte": StringTOBsonId("5f2375b2a120e23754be1039"),
 		},
 	}
 	log.Println("历史判重查询条件:",q,"时间:", between_time)
@@ -798,30 +795,32 @@ func repairHistory() {
 	log.Println("本地构建分组完成:",len(pendAllArr),"组","测试-总计数量:",testNum)
 
 	n, repeateN := 0, 0
-	pool := make(chan bool, 3)
+	pool := make(chan bool, 2)
 	wg := &sync.WaitGroup{}
 	for k,v:=range pendAllArr { //每组结束更新一波数据
 		pool <- true
 		wg.Add(1)
-		go func(k int,v []map[string]interface{}) {
+		go func(k int, v []map[string]interface{}) {
 			defer func() {
 				<-pool
 				wg.Done()
 			}()
+			//每组临时数组 -  互不干扰
+			groupUpdateExtract := [][]map[string]interface{}{}
 			//构建当前组的数据池
-			log.Println("构建第",k,"组---(数据池)")
+			log.Println("构建第", k, "组---(数据池)")
 			//当前组的第一个发布时间
-			first_pt :=util.Int64All(v[len(v)-1]["publishtime"])
-			curTM := TimedTaskDatamap(dupdays+int(timingSpanDay)+1, first_pt+86400,int(k))
-			log.Println("开始遍历判重第",k,"组  共计数量:",len(v))
-			n = n+len(v)
-			log.Println("统计目前总数量:",n,"重复数量:",repeateN)
-			for _,tmp:=range v {
+			first_pt := util.Int64All(v[len(v)-1]["publishtime"])
+			curTM := TimedTaskDatamap(dupdays+int(timingSpanDay)+1, first_pt+86400, int(k))
+			log.Println("开始遍历判重第", k, "组  共计数量:", len(v))
+			n = n + len(v)
+			log.Println("统计目前总数量:", n, "重复数量:", repeateN)
+			for _, tmp := range v {
 				info := NewInfo(tmp)
 				if !LowHeavy { //是否进行低质量数据判重
 					if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
 						log.Println("无效数据")
-						updateExtract = append(updateExtract, []map[string]interface{}{
+						groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
 							map[string]interface{}{
 								"_id": tmp["_id"],
 							},
@@ -832,19 +831,18 @@ func repairHistory() {
 								},
 							},
 						})
-						if len(updateExtract) > 50 {
-							mgo.UpSertBulk(extract, updateExtract...)
-							updateExtract = [][]map[string]interface{}{}
+						if len(groupUpdateExtract) > 50 {
+							mgo.UpSertBulk(extract, groupUpdateExtract...)
+							groupUpdateExtract = [][]map[string]interface{}{}
 						}
 						return
 					}
 				}
 				b, source, reason := curTM.check(info)
 				if b { //有重复,生成更新语句,更新抽取和更新招标
-					log.Println("判重结果", b, reason,"目标id",info.id)
 					repeateN++
 					//重复数据打标签
-					updateExtract = append(updateExtract, []map[string]interface{}{
+					groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
 						map[string]interface{}{
 							"_id": tmp["_id"],
 						},
@@ -857,39 +855,34 @@ func repairHistory() {
 							},
 						},
 					})
-				}else {
-					updateExtract = append(updateExtract, []map[string]interface{}{
+				} else {
+					groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
 						map[string]interface{}{
 							"_id": tmp["_id"],
 						},
 						map[string]interface{}{
 							"$set": map[string]interface{}{
-								"dataging": 0,//符合条件的都为dataging==0
+								"dataging": 0, //符合条件的都为dataging==0
 							},
 						},
 					})
 				}
-				if len(updateExtract) > 50 {
-					mgo.UpSertBulk(extract, updateExtract...)
-					updateExtract = [][]map[string]interface{}{}
+				if len(groupUpdateExtract) > 50 {
+					mgo.UpSertBulk(extract, groupUpdateExtract...)
+					groupUpdateExtract = [][]map[string]interface{}{}
 				}
 			}
-		}(k,v)
+			//每组数据结束-更新数据
+			if len(groupUpdateExtract) > 0 {
+				mgo.UpSertBulk(extract, groupUpdateExtract...)
+			}
+		}(k, v)
 
-		//每组数据结束-更新数据
-		if len(updateExtract) > 0 {
-			mgo.UpSertBulk(extract, updateExtract...)
-			updateExtract = [][]map[string]interface{}{}
-		}
 	}
 
 	wg.Wait()
 
 
-	if len(updateExtract) > 0 {
-		mgo.UpSertBulk(extract, updateExtract...)
-	}
-
 	time.Sleep(30 * time.Second)
 	log.Println("this repair over.", n, "repeateN:", repeateN,gtid,lteid)
 	log.Println("修复结束")