apple 4 年之前
父节点
当前提交
99adf63122
共有 5 个文件被更改,包括 653 次插入、595 次删除
  1. 598 249
      udpfilterdup/src/README.md
  2. 3 3
      udpfilterdup/src/config.json
  3. 51 339
      udpfilterdup/src/main.go
  4. 0 3
      udpfilterdup/src/updateMethod.go
  5. 1 1
      udps/main.go

+ 598 - 249
udpfilterdup/src/README.md

@@ -141,305 +141,654 @@ func moveOnceTimeOut()  {
     "beifen": "[((]?[0-9一二三四五六七八九十零123456789再][))]?[子分]?[次批标包]|重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研"
 }
 
+func historyTaskDay() {
+	defer util.Catch()
 
+	for {
+		start:=time.Now().Unix()
 
+		if gtid=="" {
+			log.Println("请传gtid,否则无法运行")
+			os.Exit(0)
+			return
+		}
+		if lteid!="" {
+			//先进行数据迁移
+			log.Println("开启一次迁移任务",gtid,lteid)
+			moveHistoryData(gtid,lteid)
+			gtid = lteid //替换数据
+		}
 
-
-
-
-
-
-
-
-
-func repairHistory() {
-	defer util.Catch()
-	log.Println("执行修复程序")
-	sess := mgo.GetMgoConn()
-	defer mgo.DestoryMongoConn(sess)
-	q:=map[string]interface{}{}
-	between_time := time.Now().Unix() - (86400 * timingPubScope)//两年周期
-	//开始判重
-	q = map[string]interface{}{
-		"_id": map[string]interface{}{
-			"$gt": StringTOBsonId("5f15bf800000000000000000"),
-			"$lte": StringTOBsonId("5f2375b2a120e23754be1039"),
-		},
-	}
-	log.Println("历史判重查询条件:",q,"时间:", between_time)
-
-	it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
-	num,oknum, deterTime:= int64(0),int64(0),int64(0) //计数
-	updateExtract := [][]map[string]interface{}{}//批量更新mongo数组
-	pendAllArr:=[][]map[string]interface{}{}//待处理数组
-	dayArr := []map[string]interface{}{}
-	for tmp := make(map[string]interface{}); it.Next(&tmp); num++ {
-		if num%10000 == 0 {
-			log.Println("正序遍历:", num)
+		//查询表最后一个id
+		task_sess := task_mgo.GetMgoConn()
+		defer task_mgo.DestoryMongoConn(task_sess)
+		q:=map[string]interface{}{
+			"isused":true,
 		}
-		source := util.ObjToMap(tmp["jsondata"])
-		if util.IntAll((*source)["sourcewebsite"]) == 1 {
-			updateExtract = append(updateExtract, []map[string]interface{}{
-				map[string]interface{}{
-					"_id": tmp["_id"],
-				},
-				map[string]interface{}{
-					"$set": map[string]interface{}{
-						"repeat": 1,
-						"dataging": 0,
-						"repeat_reason": "sourcewebsite为1 重复",
-					},
-				},
-			})
-			if len(updateExtract) > 50 {
-				mgo.UpSertBulk(extract, updateExtract...)
-				updateExtract = [][]map[string]interface{}{}
-			}
-			tmp = make(map[string]interface{})
-			continue
+		between_time := time.Now().Unix() - (86400 * timingPubScope)//两年周期
+		it_last := task_sess.DB(task_mgo.DbName).C(task_collName).Find(&q).Sort("-_id").Iter()
+		for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
+			lteid = util.ObjToString(tmp["gtid"])
+			log.Println("查询的最后一个任务Id:",lteid)
+			break
 		}
 
-		//取-符合-发布时间X年内的数据
-		if util.IntAll(tmp["dataging"]) == 1 {
-			pubtime := util.Int64All(tmp["publishtime"])
-			if pubtime > 0 && pubtime >= between_time {
-				oknum++
-				if deterTime==0 {
-					log.Println("找到第一条符合条件的数据")
-					deterTime = util.Int64All(tmp["publishtime"])
-					dayArr = append(dayArr,tmp)
-				}else {
-					if pubtime-deterTime >timingSpanDay*86400 {
-						//新数组重新构建,当前组数据加到全部组数据
-						pendAllArr = append(pendAllArr,dayArr)
-						dayArr = []map[string]interface{}{}
-						deterTime = util.Int64All(tmp["publishtime"])
-						dayArr = append(dayArr,tmp)
-					}else {
-						dayArr = append(dayArr,tmp)
-					}
-				}
-			}else {
-				//不在两年内的也清标记
+		log.Println("查询完毕-先睡眠5分钟",gtid,lteid)
+		time.Sleep(5 * time.Minute)
+
+		sess := mgo.GetMgoConn()//连接器
+		defer mgo.DestoryMongoConn(sess)
+		//开始判重
+		q = map[string]interface{}{
+			"_id": map[string]interface{}{
+				"$gt": StringTOBsonId(gtid),
+				"$lte": StringTOBsonId(lteid),
+			},
+		}
+		log.Println("历史判重查询条件:",q,"时间:", between_time)
+		it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
+		num,oknum,outnum, deterTime:= int64(0),int64(0),int64(0),int64(0) //计数
+		updateExtract := [][]map[string]interface{}{}//批量更新mongo数组
+		pendAllArr:=[][]map[string]interface{}{}//待处理数组
+		dayArr := []map[string]interface{}{}
+		for tmp := make(map[string]interface{}); it.Next(&tmp); num++ {
+			if num%10000 == 0 {
+				log.Println("正序遍历:", num)
+			}
+			source := util.ObjToMap(tmp["jsondata"])
+			if util.IntAll((*source)["sourcewebsite"]) == 1 {
+				outnum++
+				updatelock.Lock()
 				updateExtract = append(updateExtract, []map[string]interface{}{
 					map[string]interface{}{
 						"_id": tmp["_id"],
 					},
 					map[string]interface{}{
 						"$set": map[string]interface{}{
+							"repeat": 1,
 							"dataging": 0,
+							"repeat_reason": "sourcewebsite为1 重复",
 						},
 					},
 				})
-				if len(updateExtract) > 50 {
+				if len(updateExtract) >= 200 {
+					log.Println("sourcewebsite,批量更新")
 					mgo.UpSertBulk(extract, updateExtract...)
 					updateExtract = [][]map[string]interface{}{}
 				}
 
+				updatelock.Unlock()
+
+
+				tmp = make(map[string]interface{})
+				continue
 			}
+
+			//取-符合-发布时间X年内的数据
+			updatelock.Lock()
+			if util.IntAll(tmp["dataging"]) == 1 {
+				pubtime := util.Int64All(tmp["publishtime"])
+				if pubtime > 0 && pubtime >= between_time {
+					oknum++
+					if deterTime==0 {
+						log.Println("找到第一条符合条件的数据")
+						deterTime = util.Int64All(tmp["publishtime"])
+						dayArr = append(dayArr,tmp)
+					}else {
+						if pubtime-deterTime >timingSpanDay*86400 {
+							//新数组重新构建,当前组数据加到全部组数据
+							pendAllArr = append(pendAllArr,dayArr)
+							dayArr = []map[string]interface{}{}
+							deterTime = util.Int64All(tmp["publishtime"])
+							dayArr = append(dayArr,tmp)
+						}else {
+							dayArr = append(dayArr,tmp)
+						}
+					}
+				}else {
+					outnum++
+					//不在两年内的也清标记
+					updateExtract = append(updateExtract, []map[string]interface{}{
+						map[string]interface{}{
+							"_id": tmp["_id"],
+						},
+						map[string]interface{}{
+							"$set": map[string]interface{}{
+								"dataging": 0,
+							},
+						},
+					})
+					if len(updateExtract) >= 200 {
+						log.Println("不在周期内符合dataging==1,批量更新")
+						mgo.UpSertBulk(extract, updateExtract...)
+						updateExtract = [][]map[string]interface{}{}
+					}
+
+				}
+			}
+
+			updatelock.Unlock()
+
+			tmp = make(map[string]interface{})
 		}
-		tmp = make(map[string]interface{})
-	}
 
 
-	//批量更新标记
-	if len(updateExtract) > 0 {
-		mgo.UpSertBulk(extract, updateExtract...)
-		updateExtract = [][]map[string]interface{}{}
-	}
+		//批量更新标记
+		updatelock.Lock()
 
-	if len(dayArr)>0 {
-		pendAllArr = append(pendAllArr,dayArr)
-		dayArr = []map[string]interface{}{}
-	}
+		if len(updateExtract) > 0 {
+			log.Println("分组后,最后更新不进行判重的数据:",len(updateExtract),oknum+outnum)
+			mgo.UpSertBulk(extract, updateExtract...)
+			updateExtract = [][]map[string]interface{}{}
+		}
 
-	log.Println("查询数量:",num,"符合条件:",oknum)
+		updatelock.Unlock()
 
-	if len(pendAllArr) <= 0 {
-		log.Println("没找到dataging==1的数据")
-	}
 
-	//测试分组数量是否正确
-	testNum:=0
-	for k,v:=range pendAllArr {
-		log.Println("第",k,"组--","数量:",len(v))
-		testNum = testNum+len(v)
-	}
-	log.Println("本地构建分组完成:",len(pendAllArr),"组","测试-总计数量:",testNum)
-
-	n, repeateN := 0, 0
-	pool := make(chan bool, 2)
-	wg := &sync.WaitGroup{}
-	for k,v:=range pendAllArr { //每组结束更新一波数据
-		pool <- true
-		wg.Add(1)
-		go func(k int, v []map[string]interface{}) {
-			defer func() {
-				<-pool
-				wg.Done()
-			}()
-			//每组临时数组 -  互不干扰
-			groupUpdateExtract := [][]map[string]interface{}{}
-			//构建当前组的数据池
-			log.Println("构建第", k, "组---(数据池)")
-			//当前组的第一个发布时间
-			first_pt := util.Int64All(v[len(v)-1]["publishtime"])
-			curTM := TimedTaskDatamap(dupdays+int(timingSpanDay)+1, first_pt+86400, int(k))
-			log.Println("开始遍历判重第", k, "组  共计数量:", len(v))
-			n = n + len(v)
-			log.Println("统计目前总数量:", n, "重复数量:", repeateN)
-			for _, tmp := range v {
-				info := NewInfo(tmp)
-				if !LowHeavy { //是否进行低质量数据判重
-					if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
-						log.Println("无效数据")
+		if len(dayArr)>0 {
+			pendAllArr = append(pendAllArr,dayArr)
+			dayArr = []map[string]interface{}{}
+		}
+
+		log.Println("查询数量:",num,"符合条件:",oknum)
+
+		if len(pendAllArr) <= 0 {
+			log.Println("没找到dataging==1的数据")
+		}
+
+		//测试分组数量是否正确
+		testNum:=0
+		for k,v:=range pendAllArr {
+			log.Println("第",k,"组--","数量:",len(v))
+			testNum = testNum+len(v)
+		}
+		log.Println("本地构建分组完成:",len(pendAllArr),"组","测试-总计数量:",testNum)
+
+		n, repeateN := 0, 0
+		log.Println("线程数:",threadNum)
+		pool := make(chan bool, threadNum)
+		wg := &sync.WaitGroup{}
+		for k,v:=range pendAllArr { //每组结束更新一波数据
+			pool <- true
+			wg.Add(1)
+			go func(k int, v []map[string]interface{}) {
+				defer func() {
+					<-pool
+					wg.Done()
+				}()
+				//每组临时数组 -  互不干扰
+				groupUpdateExtract := [][]map[string]interface{}{}
+				//
+				groupOtherExtract := [][]map[string]interface{}{}
+
+				//构建当前组的数据池
+				log.Println("构建第", k, "组---(数据池)")
+				//当前组的第一个发布时间
+				first_pt := util.Int64All(v[len(v)-1]["publishtime"])
+				curTM := TimedTaskDatamap(dupdays+int(timingSpanDay)+1, first_pt+86400, int(k))
+				log.Println("开始遍历判重第", k, "组  共计数量:", len(v))
+				n = n + len(v)
+				log.Println("统计目前总数量:", n, "重复数量:", repeateN)
+				for _, tmp := range v {
+					info := NewInfo(tmp)
+					b, source, reason := curTM.check(info)
+					if b { //有重复,生成更新语句,更新抽取和更新招标
+						repeateN++
+						//重复数据打标签
+						repeat_ids:=source.repeat_ids
+						repeat_ids =  append(repeat_ids,info.id)
+						source.repeat_ids = repeat_ids
+						//替换数据池-更新
+						DM.replacePoolData(source)
+						updatelock.Lock()
+
+
+						//更新数据源-   14 或者 15
+						//判断是否在当前段落
+						if judgeIsCurIds(gtid,lteid,source.id) {
+							groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{//重复数据打标签
+								map[string]interface{}{
+									"_id": StringTOBsonId(source.id),
+								},
+								map[string]interface{}{
+									"$set": map[string]interface{}{
+										"repeat_ids": repeat_ids,
+									},
+								},
+							})
+						}else {
+							groupOtherExtract = append(groupOtherExtract, []map[string]interface{}{//重复数据打标签
+								map[string]interface{}{
+									"_id": StringTOBsonId(source.id),
+								},
+								map[string]interface{}{
+									"$set": map[string]interface{}{
+										"repeat_ids": repeat_ids,
+									},
+								},
+							})
+						}
 						groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
 							map[string]interface{}{
 								"_id": tmp["_id"],
 							},
 							map[string]interface{}{
 								"$set": map[string]interface{}{
-									"repeat":   -1, //无效数据标签
-									"dataging": 0,
+									"repeat":        1,
+									"repeat_reason": reason,
+									"repeat_id":     source.id,
+									"dataging":      0,
 								},
 							},
 						})
-						if len(groupUpdateExtract) > 50 {
+
+						if len(groupUpdateExtract) >= 500 {
 							mgo.UpSertBulk(extract, groupUpdateExtract...)
 							groupUpdateExtract = [][]map[string]interface{}{}
 						}
-						return
-					}
-				}
-				b, source, reason := curTM.check(info)
-				if b { //有重复,生成更新语句,更新抽取和更新招标
-					repeateN++
-					//重复数据打标签
-					groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
-						map[string]interface{}{
-							"_id": tmp["_id"],
-						},
-						map[string]interface{}{
-							"$set": map[string]interface{}{
-								"repeat":        1,
-								"repeat_reason": reason,
-								"repeat_id":     source.id,
-								"dataging":      0,
+
+						if len(groupOtherExtract) >= 500 {
+							mgo.UpSertBulk(extract_back, groupOtherExtract...)
+							groupOtherExtract = [][]map[string]interface{}{}
+						}
+
+						updatelock.Unlock()
+
+
+					} else {
+						updatelock.Lock()
+
+						groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
+							map[string]interface{}{
+								"_id": tmp["_id"],
 							},
-						},
-					})
-				} else {
-					groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
-						map[string]interface{}{
-							"_id": tmp["_id"],
-						},
-						map[string]interface{}{
-							"$set": map[string]interface{}{
-								"dataging": 0, //符合条件的都为dataging==0
+							map[string]interface{}{
+								"$set": map[string]interface{}{
+									"dataging": 0, //符合条件的都为dataging==0
+								},
 							},
-						},
-					})
+						})
+
+						if len(groupUpdateExtract) >= 500 {
+							mgo.UpSertBulk(extract, groupUpdateExtract...)
+							groupUpdateExtract = [][]map[string]interface{}{}
+						}
+						updatelock.Unlock()
+					}
 				}
-				if len(groupUpdateExtract) > 50 {
+				//每组数据结束-更新数据
+				updatelock.Lock()
+				if len(groupUpdateExtract) > 0 {
 					mgo.UpSertBulk(extract, groupUpdateExtract...)
-					groupUpdateExtract = [][]map[string]interface{}{}
 				}
-			}
-			//每组数据结束-更新数据
-			if len(groupUpdateExtract) > 0 {
-				mgo.UpSertBulk(extract, groupUpdateExtract...)
-			}
-		}(k, v)
 
-	}
-
-	wg.Wait()
+				if len(groupOtherExtract) > 0 {
+					mgo.UpSertBulk(extract_back, groupOtherExtract...)
+				}
+				updatelock.Unlock()
 
+			}(k, v)
 
-	time.Sleep(30 * time.Second)
-	log.Println("this repair over.", n, "repeateN:", repeateN,gtid,lteid)
-	log.Println("修复结束")
+		}
 
-}
+		wg.Wait()
 
 
+		//任务完成,开始发送广播通知下面节点 发udp 去升索引待定 + 合并
+		if n >= repeateN && gtid!=lteid{
+			for _, to := range nextNode {
+				next_sid := util.BsonIdToSId(gtid)
+				next_eid := util.BsonIdToSId(lteid)
+				key := next_sid + "-" + next_eid + "-" + util.ObjToString(to["stype"])
+				by, _ := json.Marshal(map[string]interface{}{
+					"gtid":  next_sid,
+					"lteid": next_eid,
+					"stype": util.ObjToString(to["stype"]),
+					"key":   key,
+				})
+				addr := &net.UDPAddr{
+					IP:   net.ParseIP(to["addr"].(string)),
+					Port: util.IntAll(to["port"]),
+				}
+				node := &udpNode{by, addr, time.Now().Unix(), 0}
+				udptaskmap.Store(key, node)
+				udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
+			}
+		}
 
+		end:=time.Now().Unix()
 
-//if !LowHeavy { //是否进行低质量数据判重
-			//	if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
-			//		updateExtract = append(updateExtract, []map[string]interface{}{
-			//			map[string]interface{}{
-			//				"_id": tmp["_id"],
-			//			},
-			//			map[string]interface{}{
-			//				"$set": map[string]interface{}{
-			//					"repeat": -1, //无效数据标签
-			//				},
-			//			},
-			//		})
-			//		if len(updateExtract) >= 200 {
-			//			mgo.UpSertBulk(extract, updateExtract...)
-			//			updateExtract = [][]map[string]interface{}{}
-			//		}
-			//		return
-			//	}
-			//}
-			
-			
-			
-			
-			if !LowHeavy { //是否进行低质量数据判重
-            						if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
-            							log.Println("无效数据")
-            							updatelock.Lock()
-            
-            							groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
-            								map[string]interface{}{
-            									"_id": tmp["_id"],
-            								},
-            								map[string]interface{}{
-            									"$set": map[string]interface{}{
-            										"repeat":   -1, //无效数据标签
-            										"dataging": 0,
-            									},
-            								},
-            							})
-            							if len(groupUpdateExtract) > 200 {
-            								mgo.UpSertBulk(extract, groupUpdateExtract...)
-            								groupUpdateExtract = [][]map[string]interface{}{}
-            							}
-            
-            							updatelock.Unlock()
-            
-            							return
-            						}
-            					}
-            					
-     //是否合并-低质量数据不合并
-     				if isMerger && !strings.Contains(reason,"低质量"){
-     					newData, update_map ,isReplace := mergeDataFields(source, info)
-     					if isReplace {//替换-数据池
-     						fmt.Println("合并更新的id:",source.id)
-     						//数据池 - 替换
-     						DM.replacePoolData(newData)
-     						//mongo更新 - 具体字段 - merge
-     						mgo.UpdateById(extract,source.id,update_map)
-     						//发udp  更新索引
-     						//for _, to := range nextNode {
-     						//	key := source.id + "-" + source.id + "-" + util.ObjToString(to["stype"])
-     						//	by, _ := json.Marshal(map[string]interface{}{
-     						//		"gtid":  source.id,
-     						//		"lteid": source.id,
-     						//		"stype": "biddingall",
-     						//		"key":   key,
-     						//	})
-     						//	addr := &net.UDPAddr{
-     						//		IP:   net.ParseIP(to["addr"].(string)),
-     						//		Port: util.IntAll(to["port"]),
-     						//	}
-     						//	node := &udpNode{by, addr, time.Now().Unix(), 0}
-     						//	udptaskmap.Store(key, node)
-     						//	udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
-     						//}
-     					}
-     				}       					
+		log.Println("this timeTask over.", n, "repeateN:", repeateN,gtid,lteid)
+		log.Println(gtid,lteid)
+		if end-start<60*5 {
+			log.Println("睡眠.............")
+			time.Sleep(5 * time.Minute)
+		}
+		log.Println("继续下一段的历史判重")
+	}
+}func historyTaskDay() {
+ 	defer util.Catch()
+ 
+ 	for {
+ 		start:=time.Now().Unix()
+ 
+ 		if gtid=="" {
+ 			log.Println("请传gtid,否则无法运行")
+ 			os.Exit(0)
+ 			return
+ 		}
+ 		if lteid!="" {
+ 			//先进行数据迁移
+ 			log.Println("开启一次迁移任务",gtid,lteid)
+ 			moveHistoryData(gtid,lteid)
+ 			gtid = lteid //替换数据
+ 		}
+ 
+ 		//查询表最后一个id
+ 		task_sess := task_mgo.GetMgoConn()
+ 		defer task_mgo.DestoryMongoConn(task_sess)
+ 		q:=map[string]interface{}{
+ 			"isused":true,
+ 		}
+ 		between_time := time.Now().Unix() - (86400 * timingPubScope)//两年周期
+ 		it_last := task_sess.DB(task_mgo.DbName).C(task_collName).Find(&q).Sort("-_id").Iter()
+ 		for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
+ 			lteid = util.ObjToString(tmp["gtid"])
+ 			log.Println("查询的最后一个任务Id:",lteid)
+ 			break
+ 		}
+ 
+ 		log.Println("查询完毕-先睡眠5分钟",gtid,lteid)
+ 		time.Sleep(5 * time.Minute)
+ 
+ 		sess := mgo.GetMgoConn()//连接器
+ 		defer mgo.DestoryMongoConn(sess)
+ 		//开始判重
+ 		q = map[string]interface{}{
+ 			"_id": map[string]interface{}{
+ 				"$gt": StringTOBsonId(gtid),
+ 				"$lte": StringTOBsonId(lteid),
+ 			},
+ 		}
+ 		log.Println("历史判重查询条件:",q,"时间:", between_time)
+ 		it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
+ 		num,oknum,outnum, deterTime:= int64(0),int64(0),int64(0),int64(0) //计数
+ 		updateExtract := [][]map[string]interface{}{}//批量更新mongo数组
+ 		pendAllArr:=[][]map[string]interface{}{}//待处理数组
+ 		dayArr := []map[string]interface{}{}
+ 		for tmp := make(map[string]interface{}); it.Next(&tmp); num++ {
+ 			if num%10000 == 0 {
+ 				log.Println("正序遍历:", num)
+ 			}
+ 			source := util.ObjToMap(tmp["jsondata"])
+ 			if util.IntAll((*source)["sourcewebsite"]) == 1 {
+ 				outnum++
+ 				updatelock.Lock()
+ 				updateExtract = append(updateExtract, []map[string]interface{}{
+ 					map[string]interface{}{
+ 						"_id": tmp["_id"],
+ 					},
+ 					map[string]interface{}{
+ 						"$set": map[string]interface{}{
+ 							"repeat": 1,
+ 							"dataging": 0,
+ 							"repeat_reason": "sourcewebsite为1 重复",
+ 						},
+ 					},
+ 				})
+ 				if len(updateExtract) >= 200 {
+ 					log.Println("sourcewebsite,批量更新")
+ 					mgo.UpSertBulk(extract, updateExtract...)
+ 					updateExtract = [][]map[string]interface{}{}
+ 				}
+ 
+ 				updatelock.Unlock()
+ 
+ 
+ 				tmp = make(map[string]interface{})
+ 				continue
+ 			}
+ 
+ 			//取-符合-发布时间X年内的数据
+ 			updatelock.Lock()
+ 			if util.IntAll(tmp["dataging"]) == 1 {
+ 				pubtime := util.Int64All(tmp["publishtime"])
+ 				if pubtime > 0 && pubtime >= between_time {
+ 					oknum++
+ 					if deterTime==0 {
+ 						log.Println("找到第一条符合条件的数据")
+ 						deterTime = util.Int64All(tmp["publishtime"])
+ 						dayArr = append(dayArr,tmp)
+ 					}else {
+ 						if pubtime-deterTime >timingSpanDay*86400 {
+ 							//新数组重新构建,当前组数据加到全部组数据
+ 							pendAllArr = append(pendAllArr,dayArr)
+ 							dayArr = []map[string]interface{}{}
+ 							deterTime = util.Int64All(tmp["publishtime"])
+ 							dayArr = append(dayArr,tmp)
+ 						}else {
+ 							dayArr = append(dayArr,tmp)
+ 						}
+ 					}
+ 				}else {
+ 					outnum++
+ 					//不在两年内的也清标记
+ 					updateExtract = append(updateExtract, []map[string]interface{}{
+ 						map[string]interface{}{
+ 							"_id": tmp["_id"],
+ 						},
+ 						map[string]interface{}{
+ 							"$set": map[string]interface{}{
+ 								"dataging": 0,
+ 							},
+ 						},
+ 					})
+ 					if len(updateExtract) >= 200 {
+ 						log.Println("不在周期内符合dataging==1,批量更新")
+ 						mgo.UpSertBulk(extract, updateExtract...)
+ 						updateExtract = [][]map[string]interface{}{}
+ 					}
+ 
+ 				}
+ 			}
+ 
+ 			updatelock.Unlock()
+ 
+ 			tmp = make(map[string]interface{})
+ 		}
+ 
+ 
+ 		//批量更新标记
+ 		updatelock.Lock()
+ 
+ 		if len(updateExtract) > 0 {
+ 			log.Println("分组后,最后更新不进行判重的数据:",len(updateExtract),oknum+outnum)
+ 			mgo.UpSertBulk(extract, updateExtract...)
+ 			updateExtract = [][]map[string]interface{}{}
+ 		}
+ 
+ 		updatelock.Unlock()
+ 
+ 
+ 		if len(dayArr)>0 {
+ 			pendAllArr = append(pendAllArr,dayArr)
+ 			dayArr = []map[string]interface{}{}
+ 		}
+ 
+ 		log.Println("查询数量:",num,"符合条件:",oknum)
+ 
+ 		if len(pendAllArr) <= 0 {
+ 			log.Println("没找到dataging==1的数据")
+ 		}
+ 
+ 		//测试分组数量是否正确
+ 		testNum:=0
+ 		for k,v:=range pendAllArr {
+ 			log.Println("第",k,"组--","数量:",len(v))
+ 			testNum = testNum+len(v)
+ 		}
+ 		log.Println("本地构建分组完成:",len(pendAllArr),"组","测试-总计数量:",testNum)
+ 
+ 		n, repeateN := 0, 0
+ 		log.Println("线程数:",threadNum)
+ 		pool := make(chan bool, threadNum)
+ 		wg := &sync.WaitGroup{}
+ 		for k,v:=range pendAllArr { //每组结束更新一波数据
+ 			pool <- true
+ 			wg.Add(1)
+ 			go func(k int, v []map[string]interface{}) {
+ 				defer func() {
+ 					<-pool
+ 					wg.Done()
+ 				}()
+ 				//每组临时数组 -  互不干扰
+ 				groupUpdateExtract := [][]map[string]interface{}{}
+ 				//
+ 				groupOtherExtract := [][]map[string]interface{}{}
+ 
+ 				//构建当前组的数据池
+ 				log.Println("构建第", k, "组---(数据池)")
+ 				//当前组的第一个发布时间
+ 				first_pt := util.Int64All(v[len(v)-1]["publishtime"])
+ 				curTM := TimedTaskDatamap(dupdays+int(timingSpanDay)+1, first_pt+86400, int(k))
+ 				log.Println("开始遍历判重第", k, "组  共计数量:", len(v))
+ 				n = n + len(v)
+ 				log.Println("统计目前总数量:", n, "重复数量:", repeateN)
+ 				for _, tmp := range v {
+ 					info := NewInfo(tmp)
+ 					b, source, reason := curTM.check(info)
+ 					if b { //有重复,生成更新语句,更新抽取和更新招标
+ 						repeateN++
+ 						//重复数据打标签
+ 						repeat_ids:=source.repeat_ids
+ 						repeat_ids =  append(repeat_ids,info.id)
+ 						source.repeat_ids = repeat_ids
+ 						//替换数据池-更新
+ 						DM.replacePoolData(source)
+ 						updatelock.Lock()
+ 
+ 
+ 						//更新数据源-   14 或者 15
+ 						//判断是否在当前段落
+ 						if judgeIsCurIds(gtid,lteid,source.id) {
+ 							groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{//重复数据打标签
+ 								map[string]interface{}{
+ 									"_id": StringTOBsonId(source.id),
+ 								},
+ 								map[string]interface{}{
+ 									"$set": map[string]interface{}{
+ 										"repeat_ids": repeat_ids,
+ 									},
+ 								},
+ 							})
+ 						}else {
+ 							groupOtherExtract = append(groupOtherExtract, []map[string]interface{}{//重复数据打标签
+ 								map[string]interface{}{
+ 									"_id": StringTOBsonId(source.id),
+ 								},
+ 								map[string]interface{}{
+ 									"$set": map[string]interface{}{
+ 										"repeat_ids": repeat_ids,
+ 									},
+ 								},
+ 							})
+ 						}
+ 						groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
+ 							map[string]interface{}{
+ 								"_id": tmp["_id"],
+ 							},
+ 							map[string]interface{}{
+ 								"$set": map[string]interface{}{
+ 									"repeat":        1,
+ 									"repeat_reason": reason,
+ 									"repeat_id":     source.id,
+ 									"dataging":      0,
+ 								},
+ 							},
+ 						})
+ 
+ 						if len(groupUpdateExtract) >= 500 {
+ 							mgo.UpSertBulk(extract, groupUpdateExtract...)
+ 							groupUpdateExtract = [][]map[string]interface{}{}
+ 						}
+ 
+ 						if len(groupOtherExtract) >= 500 {
+ 							mgo.UpSertBulk(extract_back, groupOtherExtract...)
+ 							groupOtherExtract = [][]map[string]interface{}{}
+ 						}
+ 
+ 						updatelock.Unlock()
+ 
+ 
+ 					} else {
+ 						updatelock.Lock()
+ 
+ 						groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
+ 							map[string]interface{}{
+ 								"_id": tmp["_id"],
+ 							},
+ 							map[string]interface{}{
+ 								"$set": map[string]interface{}{
+ 									"dataging": 0, //符合条件的都为dataging==0
+ 								},
+ 							},
+ 						})
+ 
+ 						if len(groupUpdateExtract) >= 500 {
+ 							mgo.UpSertBulk(extract, groupUpdateExtract...)
+ 							groupUpdateExtract = [][]map[string]interface{}{}
+ 						}
+ 						updatelock.Unlock()
+ 					}
+ 				}
+ 				//每组数据结束-更新数据
+ 				updatelock.Lock()
+ 				if len(groupUpdateExtract) > 0 {
+ 					mgo.UpSertBulk(extract, groupUpdateExtract...)
+ 				}
+ 
+ 				if len(groupOtherExtract) > 0 {
+ 					mgo.UpSertBulk(extract_back, groupOtherExtract...)
+ 				}
+ 				updatelock.Unlock()
+ 
+ 			}(k, v)
+ 
+ 		}
+ 
+ 		wg.Wait()
+ 
+ 
+ 		//任务完成,开始发送广播通知下面节点 发udp 去升索引待定 + 合并
+ 		if n >= repeateN && gtid!=lteid{
+ 			for _, to := range nextNode {
+ 				next_sid := util.BsonIdToSId(gtid)
+ 				next_eid := util.BsonIdToSId(lteid)
+ 				key := next_sid + "-" + next_eid + "-" + util.ObjToString(to["stype"])
+ 				by, _ := json.Marshal(map[string]interface{}{
+ 					"gtid":  next_sid,
+ 					"lteid": next_eid,
+ 					"stype": util.ObjToString(to["stype"]),
+ 					"key":   key,
+ 				})
+ 				addr := &net.UDPAddr{
+ 					IP:   net.ParseIP(to["addr"].(string)),
+ 					Port: util.IntAll(to["port"]),
+ 				}
+ 				node := &udpNode{by, addr, time.Now().Unix(), 0}
+ 				udptaskmap.Store(key, node)
+ 				udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
+ 			}
+ 		}
+ 
+ 		end:=time.Now().Unix()
+ 
+ 		log.Println("this timeTask over.", n, "repeateN:", repeateN,gtid,lteid)
+ 		log.Println(gtid,lteid)
+ 		if end-start<60*5 {
+ 			log.Println("睡眠.............")
+ 			time.Sleep(5 * time.Minute)
+ 		}
+ 		log.Println("继续下一段的历史判重")
+ 	}
+ }	       					

+ 3 - 3
udpfilterdup/src/config.json

@@ -5,8 +5,8 @@
         "addr": "192.168.3.207:27092",
         "pool": 10,
         "db": "extract_kf",
-        "extract": "zk_zk_test",
-        "extract_back": "zk_zk_test",
+        "extract": "zk_test",
+        "extract_back": "zk_test",
         "site": {
             "dbname": "extract_kf",
             "coll": "site"
@@ -28,7 +28,7 @@
     "isMerger": false,
     "lowHeavy":true,
     "timingTask":false,
-    "timingSpanDay": 3,
+    "timingSpanDay": 4,
     "timingPubScope": 720,
     "specialwords": "(重招|重新招标|勘察|施工|监理|总承包|土石方|可研)",
     "specialtitle_0": "(包|标段|标包)[((]?[0-9a-zA-Z一二三四五六七八九十零123456789][))]?",

+ 51 - 339
udpfilterdup/src/main.go

@@ -48,21 +48,20 @@ var (
 	TimingTask     bool                              //是否定时任务
 	timingSpanDay  int64                             //时间跨度
 	timingPubScope int64                             //发布时间周期
-	gtid,lteid,lastid,gtept,ltept string			//命令输入
-	IsFull		   bool
+	gtid,lastid,gtept,ltept string			//命令输入
+	lteid	string							//历史增量属性
+	IsFull		   bool								//是否全量
 	updatelock 		sync.Mutex         //锁
-	testgteid	string	//测试使用
 )
 
 
 
 func init() {
 
-	flag.StringVar(&lastid, "id", "", "增量加载的lastid") //以小于等于此id开始加载最近几天的数据
-	flag.StringVar(&gtid, "gtid", "", "历史增量的起始id")
-	flag.StringVar(&gtept, "gtept", "", "全量gte发布时间")
-	flag.StringVar(&gtept, "ltept", "", "全量lte发布时间")
-	flag.StringVar(&testgteid, "testgteid", "", "测试使用testgteid")
+	flag.StringVar(&lastid, "id", "", "增量加载的lastid") //增量
+	flag.StringVar(&gtid, "gtid", "", "历史增量的起始id")	//历史
+	flag.StringVar(&gtept, "gtept", "", "全量gte发布时间")//全量区间pt
+	flag.StringVar(&ltept, "ltept", "", "全量lte发布时间") //全量区间pt
 
 	flag.Parse()
 
@@ -131,8 +130,6 @@ func init() {
 
 
 func main() {
-
-
 	go checkMapJob()
 	updport := Sysconfig["udpport"].(string)
 	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
@@ -140,12 +137,35 @@ func main() {
 	log.Println("Udp服务监听", updport)
 	if TimingTask {
 		go historyTaskDay()
+	}else {
+		if gtept!=""&&ltept!="" {
+			log.Println("全量判重-准备开始")
+			IsFull = true	//全量判重
+			sid := "1fffffffffffffffffffffff"
+			eid := "9fffffffffffffffffffffff"
+			mapinfo := map[string]interface{}{}
+			if sid == "" || eid == "" {
+				log.Println("sid,eid参数不能为空")
+				os.Exit(0)
+			}
+			mapinfo["gtid"] = sid
+			mapinfo["lteid"] = eid
+			mapinfo["stop"] = "true"
+			task([]byte{}, mapinfo)
+			time.Sleep(99999 * time.Hour)
+		}else {
+			//正常增量
+			log.Println("正常增量部署")
+		}
 	}
 	time.Sleep(99999 * time.Hour)
 }
 
 //测试组人员使用
 func mainT() {
+
+	testXiufu24()
+	time.Sleep(99999 * time.Hour)
 	if TimingTask {
 		go historyTaskDay()
 		time.Sleep(99999 * time.Hour)
@@ -153,8 +173,6 @@ func mainT() {
 		IsFull = true	//全量判重
 		sid := "1fffffffffffffffffffffff"
 		eid := "9fffffffffffffffffffffff"
-		log.Println("正常判重测试开始")
-		log.Println(sid, "---", eid)
 		mapinfo := map[string]interface{}{}
 		if sid == "" || eid == "" {
 			log.Println("sid,eid参数不能为空")
@@ -210,7 +228,8 @@ func task(data []byte, mapInfo map[string]interface{}) {
 	}
 	//全量
 	if IsFull && gtept!="" && ltept!=""{
-		log.Println("执行分段模式")
+		log.Println("执行全量分段模式")
+		log.Println(gtept,"---",ltept)
 		q = map[string]interface{}{
 			"publishtime": map[string]interface{}{
 				"$gte": util.Int64All(gtept),
@@ -249,14 +268,17 @@ func task(data []byte, mapInfo map[string]interface{}) {
 			tmp = make(map[string]interface{})
 			continue
 		}
-		if util.IntAll(tmp["repeat"]) == 1 || util.IntAll(tmp["repeat"]) == -1||
-			util.IntAll(tmp["dataging"]) == 1 {
-			if util.IntAll(tmp["repeat"]) == 1 {
-				repeateN++
-			}
+		if util.IntAll(tmp["repeat"]) == 1 {
+			repeateN++
+			tmp = make(map[string]interface{})
+			continue
+		}
+
+		if util.IntAll(tmp["dataging"]) == 1 && !IsFull{
 			tmp = make(map[string]interface{})
 			continue
 		}
+
 		pool <- true
 		wg.Add(1)
 		go func(tmp map[string]interface{}) {
@@ -294,6 +316,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
 							"repeat":        1,
 							"repeat_reason": reason,
 							"repeat_id":     source.id,
+							"dataging":		 "0",
 						},
 					},
 				}
@@ -333,253 +356,6 @@ func task(data []byte, mapInfo map[string]interface{}) {
 
 
 
-func testHistory() {
-	defer util.Catch()
-	log.Println("修复程序开始")
-
-
-
-	//查询表最后一个id
-
-	between_time := time.Now().Unix() - (86400 * timingPubScope)//两年周期
-
-	sess := mgo.GetMgoConn()//连接器
-	defer mgo.DestoryMongoConn(sess)
-	//开始判重
-	q := map[string]interface{}{
-		"dataging": 1,
-	}
-	q= map[string]interface{}{}
-	log.Println("历史判重查询条件:",q,"时间:", between_time)
-
-	it := sess.DB(mgo.DbName).C("test_dataging_0929").Find(&q).Sort("publishtime").Iter()
-	num,oknum,outnum, deterTime:= int64(0),int64(0),int64(0),int64(0) //计数
-	updateExtract := [][]map[string]interface{}{}//批量更新mongo数组
-	pendAllArr:=[][]map[string]interface{}{}//待处理数组
-	dayArr := []map[string]interface{}{}
-	for tmp := make(map[string]interface{}); it.Next(&tmp); num++ {
-		if num%10000 == 0 {
-			log.Println("正序遍历:", num,tmp["_id"])
-		}
-
-		source := util.ObjToMap(tmp["jsondata"])
-		if util.IntAll((*source)["sourcewebsite"]) == 1 {
-			outnum++
-			updatelock.Lock()
-			updateExtract = append(updateExtract, []map[string]interface{}{
-				map[string]interface{}{
-					"_id": tmp["_id"],
-				},
-				map[string]interface{}{
-					"$set": map[string]interface{}{
-						"repeat": 1,
-						"dataging": 0,
-						"repeat_reason": "sourcewebsite为1 重复",
-					},
-				},
-			})
-			if len(updateExtract) >= 500 {
-				log.Println("sourcewebsite,批量更新")
-				mgo.UpSertBulk(extract, updateExtract...)
-				updateExtract = [][]map[string]interface{}{}
-			}
-
-			updatelock.Unlock()
-
-
-			tmp = make(map[string]interface{})
-			continue
-		}
-
-		//取-符合-发布时间X年内的数据
-		updatelock.Lock()
-		if util.IntAll(tmp["dataging"]) == 1 {
-			pubtime := util.Int64All(tmp["publishtime"])
-			if pubtime > 0 && pubtime >= between_time {
-				oknum++
-				if deterTime==0 {
-					log.Println("找到第一条符合条件的数据")
-					deterTime = util.Int64All(tmp["publishtime"])
-					dayArr = append(dayArr,tmp)
-				}else {
-					if pubtime-deterTime >timingSpanDay*86400 {
-						//新数组重新构建,当前组数据加到全部组数据
-						pendAllArr = append(pendAllArr,dayArr)
-						dayArr = []map[string]interface{}{}
-						deterTime = util.Int64All(tmp["publishtime"])
-						dayArr = append(dayArr,tmp)
-					}else {
-						dayArr = append(dayArr,tmp)
-					}
-				}
-			}else {
-				outnum++
-				//不在两年内的也清标记
-				updateExtract = append(updateExtract, []map[string]interface{}{
-					map[string]interface{}{
-						"_id": tmp["_id"],
-					},
-					map[string]interface{}{
-						"$set": map[string]interface{}{
-							"dataging": 0,
-						},
-					},
-				})
-				if len(updateExtract) >= 500 {
-					log.Println("不在周期内符合dataging==1,批量更新")
-					mgo.UpSertBulk(extract, updateExtract...)
-					updateExtract = [][]map[string]interface{}{}
-				}
-
-			}
-		}
-
-		updatelock.Unlock()
-
-		tmp = make(map[string]interface{})
-	}
-
-
-	//批量更新标记
-	updatelock.Lock()
-
-	if len(updateExtract) > 0 {
-		log.Println("分组后,最后更新不进行判重的数据:",len(updateExtract),oknum+outnum)
-		mgo.UpSertBulk(extract, updateExtract...)
-		updateExtract = [][]map[string]interface{}{}
-	}
-
-	updatelock.Unlock()
-
-
-	if len(dayArr)>0 {
-		pendAllArr = append(pendAllArr,dayArr)
-		dayArr = []map[string]interface{}{}
-	}
-
-	log.Println("查询数量:",num,"符合条件:",oknum)
-
-	if len(pendAllArr) <= 0 {
-		log.Println("没找到dataging==1的数据")
-	}
-
-	//测试分组数量是否正确
-	testNum:=0
-	for k,v:=range pendAllArr {
-		log.Println("第",k,"组--","数量:",len(v))
-		testNum = testNum+len(v)
-	}
-	log.Println("本地构建分组完成:",len(pendAllArr),"组","测试-总计数量:",testNum)
-
-	n, repeateN := 0, 0
-	log.Println("线程数:",threadNum)
-	pool := make(chan bool, threadNum)
-	wg := &sync.WaitGroup{}
-	for k,v:=range pendAllArr { //每组结束更新一波数据
-		pool <- true
-		wg.Add(1)
-		go func(k int, v []map[string]interface{}) {
-			defer func() {
-				<-pool
-				wg.Done()
-			}()
-			//每组临时数组 -  互不干扰
-			groupUpdateExtract := [][]map[string]interface{}{}
-
-			//构建当前组的数据池
-			log.Println("构建第", k, "组---(数据池)")
-			//当前组的第一个发布时间
-			first_pt := util.Int64All(v[len(v)-1]["publishtime"])
-			curTM := TimedTaskDatamap(dupdays+int(timingSpanDay)+1, first_pt+86400, int(k))
-			log.Println("开始遍历判重第", k, "组  共计数量:", len(v))
-			n = n + len(v)
-			log.Println("统计目前总数量:", n, "重复数量:", repeateN)
-			for _, tmp := range v {
-				info := NewInfo(tmp)
-				b, source, reason := curTM.check(info)
-				if b { //有重复,生成更新语句,更新抽取和更新招标
-					repeateN++
-					//重复数据打标签
-					repeat_ids:=source.repeat_ids
-					repeat_ids =  append(repeat_ids,info.id)
-					source.repeat_ids = repeat_ids
-					//替换数据池-更新
-					DM.replacePoolData(source)
-					updatelock.Lock()
-
-
-
-					groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
-						map[string]interface{}{
-							"_id": tmp["_id"],
-						},
-						map[string]interface{}{
-							"$set": map[string]interface{}{
-								"repeat":        1,
-								"repeat_reason": reason,
-								"repeat_id":     source.id,
-								"dataging":      0,
-							},
-						},
-					})
-
-					if len(groupUpdateExtract) >= 500 {
-						mgo.UpSertBulk(extract, groupUpdateExtract...)
-						groupUpdateExtract = [][]map[string]interface{}{}
-					}
-
-
-					updatelock.Unlock()
-
-
-				} else {
-					updatelock.Lock()
-
-					groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
-						map[string]interface{}{
-							"_id": tmp["_id"],
-						},
-						map[string]interface{}{
-							"$set": map[string]interface{}{
-								"dataging": 0, //符合条件的都为dataging==0
-							},
-						},
-					})
-
-					if len(groupUpdateExtract) >= 500 {
-						mgo.UpSertBulk(extract, groupUpdateExtract...)
-						groupUpdateExtract = [][]map[string]interface{}{}
-					}
-					updatelock.Unlock()
-				}
-			}
-			//每组数据结束-更新数据
-			updatelock.Lock()
-			if len(groupUpdateExtract) > 0 {
-				mgo.UpSertBulk(extract, groupUpdateExtract...)
-			}
-
-			updatelock.Unlock()
-
-		}(k, v)
-
-	}
-
-	wg.Wait()
-
-
-	log.Println("结束结束-结束结束")
-}
-
-
-
-
-
-
-
-
-
-
 func historyTaskDay() {
 	defer util.Catch()
 
@@ -627,7 +403,6 @@ func historyTaskDay() {
 		log.Println("历史判重查询条件:",q,"时间:", between_time)
 		it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
 		num,oknum,outnum, deterTime:= int64(0),int64(0),int64(0),int64(0) //计数
-		updateExtract := [][]map[string]interface{}{}//批量更新mongo数组
 		pendAllArr:=[][]map[string]interface{}{}//待处理数组
 		dayArr := []map[string]interface{}{}
 		for tmp := make(map[string]interface{}); it.Next(&tmp); num++ {
@@ -637,8 +412,7 @@ func historyTaskDay() {
 			source := util.ObjToMap(tmp["jsondata"])
 			if util.IntAll((*source)["sourcewebsite"]) == 1 {
 				outnum++
-				updatelock.Lock()
-				updateExtract = append(updateExtract, []map[string]interface{}{
+				Update.updatePool <- []map[string]interface{}{//重复数据打标签
 					map[string]interface{}{
 						"_id": tmp["_id"],
 					},
@@ -649,22 +423,12 @@ func historyTaskDay() {
 							"repeat_reason": "sourcewebsite为1 重复",
 						},
 					},
-				})
-				if len(updateExtract) >= 200 {
-					log.Println("sourcewebsite,批量更新")
-					mgo.UpSertBulk(extract, updateExtract...)
-					updateExtract = [][]map[string]interface{}{}
 				}
-
-				updatelock.Unlock()
-
-
 				tmp = make(map[string]interface{})
 				continue
 			}
 
 			//取-符合-发布时间X年内的数据
-			updatelock.Lock()
 			if util.IntAll(tmp["dataging"]) == 1 {
 				pubtime := util.Int64All(tmp["publishtime"])
 				if pubtime > 0 && pubtime >= between_time {
@@ -687,7 +451,7 @@ func historyTaskDay() {
 				}else {
 					outnum++
 					//不在两年内的也清标记
-					updateExtract = append(updateExtract, []map[string]interface{}{
+					Update.updatePool <- []map[string]interface{}{//重复数据打标签
 						map[string]interface{}{
 							"_id": tmp["_id"],
 						},
@@ -696,34 +460,12 @@ func historyTaskDay() {
 								"dataging": 0,
 							},
 						},
-					})
-					if len(updateExtract) >= 200 {
-						log.Println("不在周期内符合dataging==1,批量更新")
-						mgo.UpSertBulk(extract, updateExtract...)
-						updateExtract = [][]map[string]interface{}{}
 					}
-
 				}
 			}
-
-			updatelock.Unlock()
-
 			tmp = make(map[string]interface{})
 		}
 
-
-		//批量更新标记
-		updatelock.Lock()
-
-		if len(updateExtract) > 0 {
-			log.Println("分组后,最后更新不进行判重的数据:",len(updateExtract),oknum+outnum)
-			mgo.UpSertBulk(extract, updateExtract...)
-			updateExtract = [][]map[string]interface{}{}
-		}
-
-		updatelock.Unlock()
-
-
 		if len(dayArr)>0 {
 			pendAllArr = append(pendAllArr,dayArr)
 			dayArr = []map[string]interface{}{}
@@ -755,9 +497,7 @@ func historyTaskDay() {
 					<-pool
 					wg.Done()
 				}()
-				//每组临时数组 -  互不干扰
-				groupUpdateExtract := [][]map[string]interface{}{}
-				//
+				//相关ids 跨表
 				groupOtherExtract := [][]map[string]interface{}{}
 
 				//构建当前组的数据池
@@ -777,15 +517,14 @@ func historyTaskDay() {
 						repeat_ids:=source.repeat_ids
 						repeat_ids =  append(repeat_ids,info.id)
 						source.repeat_ids = repeat_ids
+
+						updatelock.Lock()
 						//替换数据池-更新
 						DM.replacePoolData(source)
-						updatelock.Lock()
-
-
-						//更新数据源-   14 或者 15
+						//更新数据源
 						//判断是否在当前段落
 						if judgeIsCurIds(gtid,lteid,source.id) {
-							groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{//重复数据打标签
+							Update.updatePool <- []map[string]interface{}{//重复数据打标签
 								map[string]interface{}{
 									"_id": StringTOBsonId(source.id),
 								},
@@ -794,7 +533,7 @@ func historyTaskDay() {
 										"repeat_ids": repeat_ids,
 									},
 								},
-							})
+							}
 						}else {
 							groupOtherExtract = append(groupOtherExtract, []map[string]interface{}{//重复数据打标签
 								map[string]interface{}{
@@ -807,7 +546,7 @@ func historyTaskDay() {
 								},
 							})
 						}
-						groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
+						Update.updatePool <- []map[string]interface{}{//重复数据打标签
 							map[string]interface{}{
 								"_id": tmp["_id"],
 							},
@@ -819,13 +558,7 @@ func historyTaskDay() {
 									"dataging":      0,
 								},
 							},
-						})
-
-						if len(groupUpdateExtract) >= 500 {
-							mgo.UpSertBulk(extract, groupUpdateExtract...)
-							groupUpdateExtract = [][]map[string]interface{}{}
 						}
-
 						if len(groupOtherExtract) >= 500 {
 							mgo.UpSertBulk(extract_back, groupOtherExtract...)
 							groupOtherExtract = [][]map[string]interface{}{}
@@ -835,9 +568,7 @@ func historyTaskDay() {
 
 
 					} else {
-						updatelock.Lock()
-
-						groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
+						Update.updatePool <- []map[string]interface{}{//重复数据打标签
 							map[string]interface{}{
 								"_id": tmp["_id"],
 							},
@@ -846,21 +577,11 @@ func historyTaskDay() {
 									"dataging": 0, //符合条件的都为dataging==0
 								},
 							},
-						})
-
-						if len(groupUpdateExtract) >= 500 {
-							mgo.UpSertBulk(extract, groupUpdateExtract...)
-							groupUpdateExtract = [][]map[string]interface{}{}
 						}
-						updatelock.Unlock()
 					}
 				}
 				//每组数据结束-更新数据
 				updatelock.Lock()
-				if len(groupUpdateExtract) > 0 {
-					mgo.UpSertBulk(extract, groupUpdateExtract...)
-				}
-
 				if len(groupOtherExtract) > 0 {
 					mgo.UpSertBulk(extract_back, groupOtherExtract...)
 				}
@@ -920,7 +641,6 @@ func judgeIsCurIds (gtid string,lteid string,curid string) bool {
 }
 
 
-
 //迁移上一段数据
 func moveHistoryData(startid string,endid string) {
 	sess := mgo.GetMgoConn()
@@ -954,14 +674,6 @@ func moveHistoryData(startid string,endid string) {
 
 }
 
-
-
-
-
-
-
-
-
 func moveTimeoutData()  {
 	log.Println("部署迁移定时任务")
 	c := cron.New()

+ 0 - 3
udpfilterdup/src/updateMethod.go

@@ -34,7 +34,6 @@ func (update *updateInfo) updateData() {
 		case value := <-update.updatePool:
 			tmpArr[tmpIndex] = value
 			tmpIndex++
-			log.Println(value,tmpIndex)
 			if tmpIndex == update.saveSize {
 				sp <- true
 				go func(dataArr [][]map[string]interface{}) {
@@ -42,7 +41,6 @@ func (update *updateInfo) updateData() {
 						<-sp
 					}()
 					mgo.UpSertBulk(extract, dataArr...)
-
 				}(tmpArr)
 				tmpArr = make([][]map[string]interface{}, update.saveSize)
 				tmpIndex = 0
@@ -55,7 +53,6 @@ func (update *updateInfo) updateData() {
 						<-sp
 					}()
 					mgo.UpSertBulk(extract, dataArr...)
-
 				}(tmpArr[:tmpIndex])
 				tmpArr = make([][]map[string]interface{}, update.saveSize)
 				tmpIndex = 0

+ 1 - 1
udps/main.go

@@ -18,7 +18,7 @@ var startDate, endDate string
 
 func main() {
 	ip, p, tmptime, tmpkey, id1, id2, stype, q, bkey, param, ids := "", 0, 0, "", "", "", "", "", "", "", ""
-	flag.StringVar(&startDate, "start", "", "开始日期2006-01-02")
+	flag.StringVar(&startDate, "start", "2020-10-10", "开始日期2006-01-02")
 	flag.StringVar(&endDate, "end", "", "结束日期2006-01-02")
 	flag.StringVar(&ip, "ip", "127.0.0.1", "ip")
 	flag.IntVar(&p, "p", 1483, "端口")