apple 4 年之前
父节点
当前提交
53b8e47b6f
共有 4 个文件被更改,包括 263 次插入6 次删除
  1. 4 0
      data_quality/src/main.go
  2. 0 2
      udpfilterdup/src/datamap.go
  3. 256 4
      udpfilterdup/src/main.go
  4. 3 0
      udpfilterdup/src/updateMethod.go

+ 4 - 0
data_quality/src/main.go

@@ -105,6 +105,10 @@ func mainT() {
 //快速测试使用
 func main() {
 
+
+
+
+
 	sid := "1f0000000000000000000000"
 	eid := "9f0000000000000000000000"
 	log.Println(sid, "---", eid)

+ 0 - 2
udpfilterdup/src/datamap.go

@@ -468,8 +468,6 @@ func (d *datamap) update(t int64) {
 		}else {
 			d.keymap = d.GetLatelyFiveDayDouble(t) //增量
 		}
-		d.keymap = d.GetLatelyFiveDay(t)//全量
-
 		m := map[string]bool{}
 		for _, v := range d.keymap {
 			m[v] = true

+ 256 - 4
udpfilterdup/src/main.go

@@ -51,6 +51,7 @@ var (
 	gtid,lteid,lastid,gtept,ltept string			//命令输入
 	IsFull		   bool
 	updatelock 		sync.Mutex         //锁
+	testgteid	string	//测试使用
 )
 
 
@@ -61,6 +62,8 @@ func init() {
 	flag.StringVar(&gtid, "gtid", "", "历史增量的起始id")
 	flag.StringVar(&gtept, "gtept", "", "全量gte发布时间")
 	flag.StringVar(&gtept, "ltept", "", "全量lte发布时间")
+	flag.StringVar(&testgteid, "testgteid", "", "测试使用testgteid")
+
 	flag.Parse()
 
 	util.ReadConfig(&Sysconfig)
@@ -128,6 +131,8 @@ func init() {
 
 
 func main() {
+
+
 	go checkMapJob()
 	updport := Sysconfig["udpport"].(string)
 	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
@@ -145,7 +150,6 @@ func mainT() {
 		go historyTaskDay()
 		time.Sleep(99999 * time.Hour)
 	} else {
-		//IdType = true  //打开id字符串模式
 		IsFull = true	//全量判重
 		sid := "1fffffffffffffffffffffff"
 		eid := "9fffffffffffffffffffffff"
@@ -215,7 +219,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
 		}
 	}
 
-	log.Println("增量查询条件:",mgo.DbName, extract, q)
+	log.Println("查询条件:",mgo.DbName, extract, q)
 
 	sess := mgo.GetMgoConn()
 	defer mgo.DestoryMongoConn(sess)
@@ -263,7 +267,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
 			info := NewInfo(tmp)
 			//正常判重
 			b, source, reason := DM.check(info)
-			if b { //有重复,生成更新语句,更新抽取和更新招标
+			if b {
 				repeateN++
 				var updateID = map[string]interface{}{} //记录更新判重的
 				updateID["_id"] = StringTOBsonId(info.id)
@@ -328,6 +332,254 @@ func task(data []byte, mapInfo map[string]interface{}) {
 }
 
 
+
+// testHistory is a repair/test job that re-runs duplicate detection over the
+// "test_dataging_0929" collection.
+//
+// Pass 1 walks every document in ascending publishtime order:
+//   - documents whose jsondata.sourcewebsite is 1 are queued to be marked as
+//     repeats (repeat=1, dataging=0);
+//   - documents with dataging == 1 and a publishtime inside the last
+//     86400*timingPubScope seconds are bucketed into groups; a new group is
+//     started once a publishtime lies more than timingSpanDay days after the
+//     first publishtime of the current group;
+//   - remaining dataging == 1 documents only get dataging reset to 0.
+//
+// Pass 2 checks each group against its own pool built by TimedTaskDatamap,
+// running at most threadNum groups at a time, and writes the verdicts with
+// mgo.UpSertBulk(extract, ...) in batches of 500.
+func testHistory() {
+	defer util.Catch()
+	log.Println("修复程序开始")
+
+	// Oldest publishtime (unix seconds) that is still checked for duplicates.
+	betweenTime := time.Now().Unix() - (86400 * timingPubScope)
+
+	sess := mgo.GetMgoConn()
+	defer mgo.DestoryMongoConn(sess)
+
+	// The whole collection is scanned; the dataging filter is applied per
+	// document below because sourcewebsite documents are handled regardless
+	// of their dataging value.
+	q := map[string]interface{}{}
+	log.Println("历史判重查询条件:", q, "时间:", betweenTime)
+
+	// bulkWrite sends one batch of updates while holding updatelock. The
+	// deferred unlock releases the mutex even if the bulk write panics, so a
+	// panic recovered by util.Catch cannot leave updatelock locked.
+	bulkWrite := func(batch [][]map[string]interface{}) {
+		updatelock.Lock()
+		defer updatelock.Unlock()
+		mgo.UpSertBulk(extract, batch...)
+	}
+
+	it := sess.DB(mgo.DbName).C("test_dataging_0929").Find(&q).Sort("publishtime").Iter()
+	num, oknum, outnum, deterTime := int64(0), int64(0), int64(0), int64(0) // counters; deterTime is the current group's first publishtime
+	updateExtract := [][]map[string]interface{}{}                           // pending bulk updates for documents that are not checked
+	pendAllArr := [][]map[string]interface{}{}                              // finished groups waiting for duplicate checking
+	dayArr := []map[string]interface{}{}                                    // group currently being filled
+	for tmp := make(map[string]interface{}); it.Next(&tmp); num++ {
+		if num%10000 == 0 {
+			log.Println("正序遍历:", num, tmp["_id"])
+		}
+
+		// doc may be retained in dayArr, so hand the iterator a fresh map
+		// for the next document.
+		doc := tmp
+		tmp = make(map[string]interface{})
+
+		// Guard against documents without a usable jsondata field.
+		if source := util.ObjToMap(doc["jsondata"]); source != nil && util.IntAll((*source)["sourcewebsite"]) == 1 {
+			outnum++
+			updateExtract = append(updateExtract, []map[string]interface{}{
+				{"_id": doc["_id"]},
+				{"$set": map[string]interface{}{
+					"repeat":        1,
+					"dataging":      0,
+					"repeat_reason": "sourcewebsite为1 重复",
+				}},
+			})
+			if len(updateExtract) >= 500 {
+				log.Println("sourcewebsite,批量更新")
+				bulkWrite(updateExtract)
+				updateExtract = [][]map[string]interface{}{}
+			}
+			continue
+		}
+
+		if util.IntAll(doc["dataging"]) != 1 {
+			continue
+		}
+
+		pubtime := util.Int64All(doc["publishtime"])
+		if pubtime <= 0 || pubtime < betweenTime {
+			// Outside the checking window: only clear the dataging flag.
+			outnum++
+			updateExtract = append(updateExtract, []map[string]interface{}{
+				{"_id": doc["_id"]},
+				{"$set": map[string]interface{}{
+					"dataging": 0,
+				}},
+			})
+			if len(updateExtract) >= 500 {
+				log.Println("不在周期内符合dataging==1,批量更新")
+				bulkWrite(updateExtract)
+				updateExtract = [][]map[string]interface{}{}
+			}
+			continue
+		}
+
+		oknum++
+		switch {
+		case deterTime == 0:
+			log.Println("找到第一条符合条件的数据")
+			deterTime = pubtime
+		case pubtime-deterTime > timingSpanDay*86400:
+			// The span is exceeded: close the current group and start a
+			// new one anchored at this document's publishtime.
+			pendAllArr = append(pendAllArr, dayArr)
+			dayArr = []map[string]interface{}{}
+			deterTime = pubtime
+		}
+		dayArr = append(dayArr, doc)
+	}
+	// Close releases the cursor and reports any error that ended the
+	// iteration early.
+	if err := it.Close(); err != nil {
+		log.Println("遍历test_dataging_0929出错:", err)
+	}
+
+	// Flush the remaining updates for documents that are not checked.
+	if len(updateExtract) > 0 {
+		log.Println("分组后,最后更新不进行判重的数据:", len(updateExtract), oknum+outnum)
+		bulkWrite(updateExtract)
+	}
+
+	if len(dayArr) > 0 {
+		pendAllArr = append(pendAllArr, dayArr)
+	}
+
+	log.Println("查询数量:", num, "符合条件:", oknum)
+
+	if len(pendAllArr) <= 0 {
+		log.Println("没找到dataging==1的数据")
+	}
+
+	// Sanity check: the group sizes should add up to oknum.
+	testNum := 0
+	for k, v := range pendAllArr {
+		log.Println("第", k, "组--", "数量:", len(v))
+		testNum = testNum + len(v)
+	}
+	log.Println("本地构建分组完成:", len(pendAllArr), "组", "测试-总计数量:", testNum)
+
+	n, repeateN := 0, 0
+	var statLock sync.Mutex // guards n and repeateN, which every worker goroutine updates
+	log.Println("线程数:", threadNum)
+	pool := make(chan bool, threadNum) // limits the number of groups processed concurrently
+	wg := &sync.WaitGroup{}
+	for k, v := range pendAllArr {
+		pool <- true
+		wg.Add(1)
+		go func(k int, v []map[string]interface{}) {
+			defer func() {
+				<-pool
+				wg.Done()
+			}()
+			// Per-group batch; not shared between goroutines.
+			groupUpdateExtract := [][]map[string]interface{}{}
+
+			log.Println("构建第", k, "组---(数据池)")
+			// The cursor is sorted by publishtime, so the last element holds
+			// the latest publishtime of the group.
+			lastPubtime := util.Int64All(v[len(v)-1]["publishtime"])
+			curTM := TimedTaskDatamap(dupdays+int(timingSpanDay)+1, lastPubtime+86400, int(k))
+			log.Println("开始遍历判重第", k, "组  共计数量:", len(v))
+
+			statLock.Lock()
+			n += len(v)
+			total, repeats := n, repeateN
+			statLock.Unlock()
+			log.Println("统计目前总数量:", total, "重复数量:", repeats)
+
+			for _, tmp := range v {
+				info := NewInfo(tmp)
+				b, source, reason := curTM.check(info)
+
+				// Every checked document leaves the dataging state.
+				set := map[string]interface{}{"dataging": 0}
+				if b {
+					statLock.Lock()
+					repeateN++
+					statLock.Unlock()
+
+					// Record this document on the entry it duplicates.
+					source.repeat_ids = append(source.repeat_ids, info.id)
+					// NOTE(review): the check runs against curTM but the
+					// replacement goes to the global DM — confirm this is
+					// intended rather than curTM.replacePoolData.
+					DM.replacePoolData(source)
+
+					set["repeat"] = 1
+					set["repeat_reason"] = reason
+					set["repeat_id"] = source.id
+				}
+
+				groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
+					{"_id": tmp["_id"]},
+					{"$set": set},
+				})
+				if len(groupUpdateExtract) >= 500 {
+					bulkWrite(groupUpdateExtract)
+					groupUpdateExtract = [][]map[string]interface{}{}
+				}
+			}
+
+			// Flush whatever is left for this group.
+			if len(groupUpdateExtract) > 0 {
+				bulkWrite(groupUpdateExtract)
+			}
+		}(k, v)
+	}
+
+	wg.Wait()
+
+	log.Println("结束结束-结束结束")
+}
+
+
+
+
+
+
+
+
+
+
 func historyTaskDay() {
 	defer util.Catch()
 
@@ -359,7 +611,7 @@ func historyTaskDay() {
 			log.Println("查询的最后一个任务Id:",lteid)
 			break
 		}
-		
+
 		log.Println("查询完毕-先睡眠5分钟",gtid,lteid)
 		time.Sleep(5 * time.Minute)
 

+ 3 - 0
udpfilterdup/src/updateMethod.go

@@ -14,6 +14,9 @@ type updateInfo struct {
 
 }
 
+
+
+
 var sp = make(chan bool, 5)
 
 func newUpdatePool() *updateInfo {