소스 검색

全量-判重控制

apple 4 년 전
부모
커밋
06ad8101c9
3개의 파일이 변경되었으며, 127줄이 추가되고 104줄이 삭제되었습니다
  1. 82 0
      udpfilterdup/src/README.md
  2. 21 20
      udpfilterdup/src/datamap.go
  3. 24 84
      udpfilterdup/src/main.go

+ 82 - 0
udpfilterdup/src/README.md

@@ -361,3 +361,85 @@ func repairHistory() {
 	log.Println("修复结束")
 
 }
+
+
+
+
+//if !LowHeavy { //是否进行低质量数据判重
+			//	if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
+			//		updateExtract = append(updateExtract, []map[string]interface{}{
+			//			map[string]interface{}{
+			//				"_id": tmp["_id"],
+			//			},
+			//			map[string]interface{}{
+			//				"$set": map[string]interface{}{
+			//					"repeat": -1, //无效数据标签
+			//				},
+			//			},
+			//		})
+			//		if len(updateExtract) >= 200 {
+			//			mgo.UpSertBulk(extract, updateExtract...)
+			//			updateExtract = [][]map[string]interface{}{}
+			//		}
+			//		return
+			//	}
+			//}
+			
+			
+			
+			
+			if !LowHeavy { //是否进行低质量数据判重
+            						if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
+            							log.Println("无效数据")
+            							updatelock.Lock()
+            
+            							groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
+            								map[string]interface{}{
+            									"_id": tmp["_id"],
+            								},
+            								map[string]interface{}{
+            									"$set": map[string]interface{}{
+            										"repeat":   -1, //无效数据标签
+            										"dataging": 0,
+            									},
+            								},
+            							})
+            							if len(groupUpdateExtract) > 200 {
+            								mgo.UpSertBulk(extract, groupUpdateExtract...)
+            								groupUpdateExtract = [][]map[string]interface{}{}
+            							}
+            
+            							updatelock.Unlock()
+            
+            							return
+            						}
+            					}
+            					
+     //是否合并-低质量数据不合并
+     				if isMerger && !strings.Contains(reason,"低质量"){
+     					newData, update_map ,isReplace := mergeDataFields(source, info)
+     					if isReplace {//替换-数据池
+     						fmt.Println("合并更新的id:",source.id)
+     						//数据池 - 替换
+     						DM.replacePoolData(newData)
+     						//mongo更新 - 具体字段 - merge
+     						mgo.UpdateById(extract,source.id,update_map)
+     						//发udp  更新索引
+     						//for _, to := range nextNode {
+     						//	key := source.id + "-" + source.id + "-" + util.ObjToString(to["stype"])
+     						//	by, _ := json.Marshal(map[string]interface{}{
+     						//		"gtid":  source.id,
+     						//		"lteid": source.id,
+     						//		"stype": "biddingall",
+     						//		"key":   key,
+     						//	})
+     						//	addr := &net.UDPAddr{
+     						//		IP:   net.ParseIP(to["addr"].(string)),
+     						//		Port: util.IntAll(to["port"]),
+     						//	}
+     						//	node := &udpNode{by, addr, time.Now().Unix(), 0}
+     						//	udptaskmap.Store(key, node)
+     						//	udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
+     						//}
+     					}
+     				}       					

+ 21 - 20
udpfilterdup/src/datamap.go

@@ -130,6 +130,7 @@ func NewDatamap(days int, lastid string) *datamap {
 	datelimit = qutil.Float64All(days * 86400 * 2)
 	dm := &datamap{sync.Mutex{}, days, map[string][]*Info{}, []string{},[]string{}, map[string]bool{}}
 	if lastid == "" {
+		log.Println("不构建数据池")
 		return dm
 	}
 	//初始化加载数据
@@ -471,30 +472,30 @@ func (d *datamap) update(t int64) {
 	if TimingTask {
 
 	}else {
-		//d.keymap = d.GetLatelyFiveDayDouble(t)
-		d.keymap = d.GetLatelyFiveDay(t)
-	}
-	m := map[string]bool{}
-	for _, v := range d.keymap {
-		m[v] = true
-	}
-	all, all1 := 0, 0
 
-	for k, v := range d.data {
-		all += len(v)
-		if !m[k[:8]] {
-			delete(d.data, k)
+		if IsFull {
+			d.keymap = d.GetLatelyFiveDay(t)//全量
+		}else {
+			d.keymap = d.GetLatelyFiveDayDouble(t) //增量
 		}
-	}
-	for k, _ := range d.keys {
-		if !m[k] {
-			delete(d.keys, k)
+		d.keymap = d.GetLatelyFiveDay(t)//全量
+
+		m := map[string]bool{}
+		for _, v := range d.keymap {
+			m[v] = true
+		}
+		for k, _ := range d.data {
+			if !m[k[:8]] {
+				delete(d.data, k)
+			}
+		}
+		for k, _ := range d.keys {
+			if !m[k] {
+				delete(d.keys, k)
+			}
 		}
 	}
-	for _, v := range d.data {
-		all1 += len(v)
-	}
-	//log.Println("更新前后数据:", all, all1)
+
 }
 
 func (d *datamap) GetLatelyFiveDay(t int64) []string  {

+ 24 - 84
udpfilterdup/src/main.go

@@ -17,7 +17,6 @@ import (
 	"qfw/util"
 	"regexp"
 	"strconv"
-	"strings"
 	"sync"
 	"time"
 )
@@ -48,19 +47,22 @@ var (
 	TimingTask     bool                              //是否定时任务
 	timingSpanDay  int64                             //时间跨度
 	timingPubScope int64                             //发布时间周期
-	gtid,lteid,lastid string
+	gtid,lteid,lastid,gtept string
 	IdType         bool   							 //默认object类型
-
+	IsFull		   bool
 	updatelock 		sync.Mutex         //锁
 )
 
 
 
 func init() {
+
 	flag.StringVar(&lastid, "id", "", "最后加载id") //以小于等于此id开始加载最近几天的数据
 	flag.StringVar(&gtid, "gtid", "", "历史的起始id")
+	flag.StringVar(&gtept, "pt", "", "全量发布时间")
 
 	flag.Parse()
+
 	//172.17.145.163:27080
 	util.ReadConfig(&Sysconfig)
 
@@ -143,8 +145,9 @@ func mainT() {
 		time.Sleep(99999 * time.Hour)
 	} else {
 		//IdType = true  //打开id字符串模式
-		sid := "1f16936d52c1d9fbf843c60e"
-		eid := "9f16936d52c1d9fbf843c60e"
+		IsFull = true	//全量判重
+		sid := "1fffffffffffffffffffffff"
+		eid := "9fffffffffffffffffffffff"
 		log.Println("正常判重测试开始")
 		log.Println(sid, "---", eid)
 		mapinfo := map[string]interface{}{}
@@ -216,6 +219,13 @@ func task(data []byte, mapInfo map[string]interface{}) {
 		}
 	}
 
+	if IsFull && gtept!="" {
+		q = map[string]interface{}{
+			"publishtime": map[string]interface{}{
+				"$gte": util.Int64All(gtept)-864000,
+			},
+		}
+	}
 
 	log.Println(mgo.DbName, extract, q)
 	sess := mgo.GetMgoConn()
@@ -227,11 +237,12 @@ func task(data []byte, mapInfo map[string]interface{}) {
 	n, repeateN := 0, 0
 	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
 		if n%10000 == 0 {
-			log.Println("current:", n, tmp["_id"], "repeateN:", repeateN)
+			log.Println("current:", n, tmp["_id"],tmp["publishtime"], "repeateN:", repeateN)
 		}
 		source := util.ObjToMap(tmp["jsondata"]) //前置-jsondata判重
 		if util.IntAll((*source)["sourcewebsite"]) == 1 {
 			repeateN++
+			updatelock.Lock()
 			updateExtract = append(updateExtract, []map[string]interface{}{
 				map[string]interface{}{
 					"_id": tmp["_id"],
@@ -244,14 +255,15 @@ func task(data []byte, mapInfo map[string]interface{}) {
 					},
 				},
 			})
-			if len(updateExtract) >= 200 {
+			if len(updateExtract) >= 500 {
+				log.Println("批量-更新-sourcewebsite")
 				mgo.UpSertBulk(extract, updateExtract...)
 				updateExtract = [][]map[string]interface{}{}
 			}
+			updatelock.Unlock()
 			tmp = make(map[string]interface{})
 			continue
 		}
-
 		if util.IntAll(tmp["repeat"]) == 1 || util.IntAll(tmp["repeat"]) == -1||
 			util.IntAll(tmp["dataging"]) == 1 {
 			if util.IntAll(tmp["repeat"]) == 1 {
@@ -268,25 +280,6 @@ func task(data []byte, mapInfo map[string]interface{}) {
 				wg.Done()
 			}()
 			info := NewInfo(tmp)
-			if !LowHeavy { //是否进行低质量数据判重
-				if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
-					updateExtract = append(updateExtract, []map[string]interface{}{
-						map[string]interface{}{
-							"_id": tmp["_id"],
-						},
-						map[string]interface{}{
-							"$set": map[string]interface{}{
-								"repeat": -1, //无效数据标签
-							},
-						},
-					})
-					if len(updateExtract) >= 200 {
-						mgo.UpSertBulk(extract, updateExtract...)
-						updateExtract = [][]map[string]interface{}{}
-					}
-					return
-				}
-			}
 			//正常判重
 			b, source, reason := DM.check(info)
 			if b { //有重复,生成更新语句,更新抽取和更新招标
@@ -324,42 +317,15 @@ func task(data []byte, mapInfo map[string]interface{}) {
 						},
 					},
 				})
-
-				//是否合并-低质量数据不合并
-				if isMerger && !strings.Contains(reason,"低质量"){
-					newData, update_map ,isReplace := mergeDataFields(source, info)
-					if isReplace {//替换-数据池
-						fmt.Println("合并更新的id:",source.id)
-						//数据池 - 替换
-						DM.replacePoolData(newData)
-						//mongo更新 - 具体字段 - merge
-						mgo.UpdateById(extract,source.id,update_map)
-						//发udp  更新索引
-						//for _, to := range nextNode {
-						//	key := source.id + "-" + source.id + "-" + util.ObjToString(to["stype"])
-						//	by, _ := json.Marshal(map[string]interface{}{
-						//		"gtid":  source.id,
-						//		"lteid": source.id,
-						//		"stype": "biddingall",
-						//		"key":   key,
-						//	})
-						//	addr := &net.UDPAddr{
-						//		IP:   net.ParseIP(to["addr"].(string)),
-						//		Port: util.IntAll(to["port"]),
-						//	}
-						//	node := &udpNode{by, addr, time.Now().Unix(), 0}
-						//	udptaskmap.Store(key, node)
-						//	udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
-						//}
-					}
-				}
 			}
 		}(tmp)
-
-		if len(updateExtract) >= 200 {
+		updatelock.Lock()
+		if len(updateExtract) >=500 {
+			log.Println("批量-更新")
 			mgo.UpSertBulk(extract, updateExtract...)
 			updateExtract = [][]map[string]interface{}{}
 		}
+		updatelock.Unlock()
 		tmp = make(map[string]interface{})
 	}
 	wg.Wait()
@@ -587,32 +553,6 @@ func historyTaskDay() {
 				log.Println("统计目前总数量:", n, "重复数量:", repeateN)
 				for _, tmp := range v {
 					info := NewInfo(tmp)
-					if !LowHeavy { //是否进行低质量数据判重
-						if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
-							log.Println("无效数据")
-							updatelock.Lock()
-
-							groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
-								map[string]interface{}{
-									"_id": tmp["_id"],
-								},
-								map[string]interface{}{
-									"$set": map[string]interface{}{
-										"repeat":   -1, //无效数据标签
-										"dataging": 0,
-									},
-								},
-							})
-							if len(groupUpdateExtract) > 200 {
-								mgo.UpSertBulk(extract, groupUpdateExtract...)
-								groupUpdateExtract = [][]map[string]interface{}{}
-							}
-
-							updatelock.Unlock()
-
-							return
-						}
-					}
 					b, source, reason := curTM.check(info)
 					if b { //有重复,生成更新语句,更新抽取和更新招标
 						repeateN++