Browse Source

判重-备份-拆分

zhengkun 3 years ago
parent
commit
49b88bbfa5

+ 0 - 2
udpfilterdup/src/README.md

@@ -39,7 +39,6 @@
         }
     ],
     "threads": 1,
-    "isMerger": false,
     "lowHeavy":true,
     "timingTask":false,
     "timingSpanDay": 5,
@@ -129,7 +128,6 @@ func moveOnceTimeOut()  {
         }
     ],
     "threads": 1,
-    "isMerger": false,
     "lowHeavy":true,
     "timingTask":false,
     "timingSpanDay": 5,

+ 1 - 2
udpfilterdup/src/config.json

@@ -26,8 +26,7 @@
     ],
     "userName": "",
     "password": "",
-    "threads": 3,
-    "isMerger": false,
+    "threads": 2,
     "lowHeavy":true,
     "timingTask":false,
     "timingSpanDay": 4,

+ 6 - 6
udpfilterdup/src/datamap.go

@@ -63,13 +63,13 @@ func TimedTaskDatamap(days int,lasttime int64,numIndex int) *datamap {
 		return dm
 	}
 	start := int(time.Now().Unix())
-	sess := mgo.GetMgoConn()
-	defer mgo.DestoryMongoConn(sess)
+	sess := data_mgo.GetMgoConn()
+	defer data_mgo.DestoryMongoConn(sess)
 	query := map[string]interface{}{"publishtime": map[string]interface{}{
 		"$lt": lasttime,
 	}}
 	log.Println("query", query)
-	it := sess.DB(mgo.DbName).C(extract_back).Find(query).Sort("-publishtime").Iter()
+	it := sess.DB(data_mgo.DbName).C(extract_back).Find(query).Sort("-publishtime").Iter()
 	n, continuSum := 0, 0
 	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
 		if qutil.IntAll(tmp["repeat"]) == 1 || qutil.IntAll(tmp["repeat"]) == -1 ||
@@ -131,13 +131,13 @@ func NewDatamap(days int, lastid string) *datamap {
 		return dm
 	}
 	//初始化加载数据
-	sess := mgo.GetMgoConn()
-	defer mgo.DestoryMongoConn(sess)
+	sess := data_mgo.GetMgoConn()
+	defer data_mgo.DestoryMongoConn(sess)
 	query := map[string]interface{}{"_id": map[string]interface{}{
 		"$lte": StringTOBsonId(lastid),
 	}}
 	log.Println("query", query)
-	it := sess.DB(mgo.DbName).C(extract).Find(query).Sort("-publishtime").Iter()
+	it := sess.DB(data_mgo.DbName).C(extract).Find(query).Sort("-publishtime").Iter()
 	nowTime := time.Now().Unix()//当前时间的时间戳
 	n, continuSum := 0, 0
 	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {

+ 4 - 4
udpfilterdup/src/fullRepeat.go

@@ -22,10 +22,10 @@ func fullRepeat(sid,eid string) {
 			"$lte": StringTOBsonId(eid),
 		},
 	}
-	log.Println("开始全量数据判重~查询条件:",mgo.DbName, extract, q)
-	sess := mgo.GetMgoConn()
-	defer mgo.DestoryMongoConn(sess)
-	it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
+	log.Println("开始全量数据判重~查询条件:",data_mgo.DbName, extract, q)
+	sess := data_mgo.GetMgoConn()
+	defer data_mgo.DestoryMongoConn(sess)
+	it := sess.DB(data_mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
 	n, isok ,repeatN:= 0,0,0
 	dataAllDict := make(map[string][]map[string]interface{},0)
 	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {

+ 15 - 19
udpfilterdup/src/historyRepeat.go

@@ -17,10 +17,8 @@ import (
 //历史判重
 func historyRepeat() {
 	defer util.Catch()
-
 	for {
 		start:=time.Now().Unix()
-
 		if gtid=="" {
 			log.Println("请传gtid,否则无法运行")
 			os.Exit(0)
@@ -32,7 +30,6 @@ func historyRepeat() {
 			moveHistoryData(gtid,lteid)
 			gtid = lteid //替换数据
 		}
-
 		//查询表最后一个id
 		task_sess := task_mgo.GetMgoConn()
 		defer task_mgo.DestoryMongoConn(task_sess)
@@ -64,8 +61,8 @@ func historyRepeat() {
 		log.Println("查询完毕-找到有标记的lteid-先睡眠5分钟",gtid,lteid)
 		time.Sleep(5 * time.Minute)
 
-		sess := mgo.GetMgoConn()//连接器
-		defer mgo.DestoryMongoConn(sess)
+		sess := data_mgo.GetMgoConn()//连接器
+		defer data_mgo.DestoryMongoConn(sess)
 		//开始判重
 		q = map[string]interface{}{
 			"_id": map[string]interface{}{
@@ -74,7 +71,7 @@ func historyRepeat() {
 			},
 		}
 		log.Println("历史判重查询条件:",q,"时间:", between_time)
-		it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
+		it := sess.DB(data_mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
 		num,oknum,outnum, deterTime:= int64(0),int64(0),int64(0),int64(0) //计数
 		pendAllArr:=[][]map[string]interface{}{}//待处理数组
 		dayArr := []map[string]interface{}{}
@@ -216,13 +213,12 @@ func historyRepeat() {
 							},
 						}
 						if len(groupOtherExtract) >= 500 {
-							mgo.UpSertBulk(extract_back, groupOtherExtract...)
+							data_mgo.UpSertBulk(extract_back, groupOtherExtract...)
 							groupOtherExtract = [][]map[string]interface{}{}
 						}
 
 						updatelock.Unlock()
 
-
 					} else {
 						Update.updatePool <- []map[string]interface{}{//重复数据打标签
 							map[string]interface{}{
@@ -240,7 +236,7 @@ func historyRepeat() {
 				//每组数据结束-更新数据
 				updatelock.Lock()
 				if len(groupOtherExtract) > 0 {
-					mgo.UpSertBulk(extract_back, groupOtherExtract...)
+					data_mgo.UpSertBulk(extract_back, groupOtherExtract...)
 				}
 				updatelock.Unlock()
 
@@ -299,8 +295,8 @@ func judgeIsCurIds (gtid string,lteid string,curid string) bool {
 }
 //迁移上一段数据
 func moveHistoryData(startid string,endid string) {
-	sess := mgo.GetMgoConn()
-	defer mgo.DestoryMongoConn(sess)
+	sess := data_mgo.GetMgoConn()
+	defer data_mgo.DestoryMongoConn(sess)
 	year, month, day := time.Now().Date()
 	q := map[string]interface{}{
 		"_id": map[string]interface{}{
@@ -309,10 +305,10 @@ func moveHistoryData(startid string,endid string) {
 		},
 	}
 	log.Println(q)
-	it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
+	it := sess.DB(data_mgo.DbName).C(extract).Find(&q).Iter()
 	index := 0
 	for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
-		mgo.Save(extract_back, tmp)
+		data_mgo.Save(extract_back, tmp)
 		tmp = map[string]interface{}{}
 		if index%1000 == 0 {
 			log.Println("index", index)
@@ -325,7 +321,7 @@ func moveHistoryData(startid string,endid string) {
 			"$lt": time.Date(year, month, day, 0, 0, 0, 0, time.Local).Add(-time.Duration(dupdays+1) * 24 * time.Hour*2).Unix(),
 		},
 	}
-	delnum := mgo.Delete(extract, qv)
+	delnum := data_mgo.Delete(extract, qv)
 	log.Println("remove from ", extract, delnum)
 
 }
@@ -346,8 +342,8 @@ func moveTimeoutData()  {
 }
 func moveOnceTimeOut()  {
 	log.Println("执行一次迁移超时数据")
-	sess := mgo.GetMgoConn()
-	defer mgo.DestoryMongoConn(sess)
+	sess := data_mgo.GetMgoConn()
+	defer data_mgo.DestoryMongoConn(sess)
 	now:=time.Now()
 
 	move_time := time.Date(now.Year()-2, now.Month(), now.Day(), 0, 0, 0, 0, time.Local)
@@ -358,15 +354,15 @@ func moveOnceTimeOut()  {
 		},
 	}
 
-	it := sess.DB(mgo.DbName).C("result_20200714").Find(&q).Iter()
+	it := sess.DB(data_mgo.DbName).C("result_20200714").Find(&q).Iter()
 	index := 0
 	for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
 		if index%10000 == 0 {
 			log.Println("index", index)
 		}
 		del_id:=BsonTOStringId(tmp["_id"])
-		mgo.Save("result_20200713", tmp)
-		mgo.DeleteById("result_20200714",del_id)
+		data_mgo.Save("result_20200713", tmp)
+		data_mgo.DeleteById("result_20200714",del_id)
 		tmp = map[string]interface{}{}
 	}
 	log.Println("save and delete", " ok index", index)

+ 6 - 6
udpfilterdup/src/increaseRepeat.go

@@ -21,10 +21,10 @@ func increaseRepeat(mapInfo map[string]interface{}) {
 			"$lte": StringTOBsonId(mapInfo["lteid"].(string)),
 		},
 	}
-	log.Println("开始数据判重~查询条件:",mgo.DbName, extract, q)
-	sess := mgo.GetMgoConn()
-	defer mgo.DestoryMongoConn(sess)
-	it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
+	log.Println("开始增量数据判重~查询条件:",data_mgo.DbName, extract, q)
+	sess := data_mgo.GetMgoConn()
+	defer data_mgo.DestoryMongoConn(sess)
+	it := sess.DB(data_mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
 	n, isok ,repeatN:= 0,0,0
 	dataAllDict := make(map[string][]map[string]interface{},0)
 	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
@@ -40,8 +40,8 @@ func increaseRepeat(mapInfo map[string]interface{}) {
 			tmp = make(map[string]interface{})
 			continue
 		}
-		isok++
 		//数据分组-按照类别分组
+		isok++
 		subtype := qu.ObjToString(tmp["subtype"])
 		if subtype=="招标"||subtype=="邀标"||subtype=="询价"||
 			subtype=="竞谈"||subtype=="竞价" {
@@ -69,7 +69,7 @@ func increaseRepeat(mapInfo map[string]interface{}) {
 			num := 0
 			for _,tmp := range dataArr{
 				info := NewInfo(tmp)
-				b, source, reason := DM.check(info)
+				b,source,reason := DM.check(info)
 				if b {
 					num++
 					var updateID = map[string]interface{}{} //记录更新判重的

+ 6 - 9
udpfilterdup/src/main.go

@@ -19,7 +19,7 @@ import (
 var (
 	Sysconfig    map[string]interface{} 	//配置文件
 	mconf        map[string]interface{} 	//mongodb配置信息
-	mgo          *MongodbSim            	//mongodb操作对象
+	data_mgo          *MongodbSim            	//mongodb操作对象
 	task_mgo     *MongodbSim            	//mongodb操作对象
 	task_collName	string
 	extract      string
@@ -34,8 +34,6 @@ var (
 	FilterRegTitle_0 = regexp.MustCompile("^_$")
 	FilterRegTitle_1 = regexp.MustCompile("^_$")
 	FilterRegTitle_2 = regexp.MustCompile("^_$")
-
-	isMerger       bool                              //是否合并
 	threadNum      int                               //线程数量
 	SiteMap        map[string]map[string]interface{} //站点map
 	LowHeavy       bool                              //低质量数据判重
@@ -78,12 +76,12 @@ func init() {
 
 	nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
 	mconf = Sysconfig["mongodb"].(map[string]interface{})
-	mgo = &MongodbSim{
+	data_mgo = &MongodbSim{
 		MongodbAddr: mconf["addr"].(string),
 		DbName:      mconf["db"].(string),
 		Size:        util.IntAllDef(mconf["pool"], 10),
 	}
-	mgo.InitPool()
+	data_mgo.InitPool()
 
 	extract = mconf["extract"].(string)
 	extract_back = mconf["extract_back"].(string)
@@ -99,7 +97,6 @@ func init() {
 	FilterRegTitle_0 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_0"]))
 	FilterRegTitle_1 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_1"]))
 	FilterRegTitle_2 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_2"]))
-	isMerger = Sysconfig["isMerger"].(bool)
 	threadNum = util.IntAllDef(Sysconfig["threads"], 1)
 	LowHeavy = Sysconfig["lowHeavy"].(bool)
 	TimingTask = Sysconfig["timingTask"].(bool)
@@ -109,8 +106,8 @@ func init() {
 	site := mconf["site"].(map[string]interface{})
 	SiteMap = make(map[string]map[string]interface{}, 0)
 	start := int(time.Now().Unix())
-	sess_site := mgo.GetMgoConn()
-	defer mgo.DestoryMongoConn(sess_site)
+	sess_site := data_mgo.GetMgoConn()
+	defer data_mgo.DestoryMongoConn(sess_site)
 	res_site := sess_site.DB(site["dbname"].(string)).C(site["coll"].(string)).Find(map[string]interface{}{}).Sort("_id").Iter()
 	for site_dict := make(map[string]interface{}); res_site.Next(&site_dict); {
 		data_map := map[string]interface{}{
@@ -184,7 +181,7 @@ func main() {
 		log.Println("正常历史部署")
 		go historyRepeat()
 	}else {
-		IsFull = true
+		//IsFull = true
 		if !IsFull {//正常增量
 			log.Println("正常增量部署,监听任务")
 			go getRepeatTask()

+ 2 - 2
udpfilterdup/src/updateMethod.go

@@ -40,7 +40,7 @@ func (update *updateInfo) updateData() {
 					defer func() {
 						<-sp
 					}()
-					mgo.UpSertBulk(extract, dataArr...)
+					data_mgo.UpSertBulk(extract, dataArr...)
 				}(tmpArr)
 				tmpArr = make([][]map[string]interface{}, update.saveSize)
 				tmpIndex = 0
@@ -52,7 +52,7 @@ func (update *updateInfo) updateData() {
 					defer func() {
 						<-sp
 					}()
-					mgo.UpSertBulk(extract, dataArr...)
+					data_mgo.UpSertBulk(extract, dataArr...)
 				}(tmpArr[:tmpIndex])
 				tmpArr = make([][]map[string]interface{}, update.saveSize)
 				tmpIndex = 0