Browse Source

定时任务判重

apple 5 years ago
parent
commit
0335edfa54
3 changed files with 37 additions and 17 deletions
  1. 1 1
      udpfilterdup/src/config.json
  2. 11 4
      udpfilterdup/src/datamap.go
  3. 25 12
      udpfilterdup/src/main.go

+ 1 - 1
udpfilterdup/src/config.json

@@ -6,7 +6,7 @@
         "pool": 10,
         "db": "qfw",
         "extract": "result_20200116",
-        "extract_back": "result_20200116_back",
+        "extract_back": "result_20200116_new",
         "site": {
             "dbname": "qfw",
             "coll": "site"

+ 11 - 4
udpfilterdup/src/datamap.go

@@ -65,7 +65,7 @@ type historymap struct {
 
 func TimedTaskDatamap(days int,lasttime int64) *datamap {
 	log.Println("数据池开始重新构建")
-	datelimit = qutil.Float64All(days * 86400)
+	datelimit = qutil.Float64All(1 * 86400)
 	dm := &datamap{sync.Mutex{}, days, map[string][]*Info{}, []string{}, map[string]bool{}}
 	if lasttime <0 {
 		log.Println("数据池空数据")
@@ -84,11 +84,12 @@ func TimedTaskDatamap(days int,lasttime int64) *datamap {
 	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
 		//qutil.IntAll(tmp["dataging"]) == 1
 		if qutil.IntAll(tmp["repeat"]) == 1 || qutil.IntAll(tmp["repeat"]) == -1||qutil.IntAll(tmp["dataging"]) == 1  {
-			continuSum++
+
 		} else {
 			pt := tmp["publishtime"]
 			pt_time := qutil.Int64All(pt)
 			if qutil.Float64All(lasttime-pt_time) < datelimit {
+				continuSum++
 				info := NewInfo(tmp)
 				dkey := qutil.FormatDateWithObj(&pt, qutil.Date_yyyyMMdd)
 				k := fmt.Sprintf("%s_%s_%s", dkey, info.subtype, info.area)
@@ -103,13 +104,16 @@ func TimedTaskDatamap(days int,lasttime int64) *datamap {
 				break
 			}
 		}
-		if n%10000 == 0 {
-			log.Println("current 数据池 n:", n, continuSum)
+		if n%50000 == 0 {
+			log.Println("current 数据池:", n, continuSum)
 		}
 		tmp = make(map[string]interface{})
 	}
 
+
 	log.Printf("数据池构建完成::%d秒,%d个\n", int(time.Now().Unix())-start, n)
+
+
 	return dm
 }
 
@@ -316,9 +320,11 @@ L:
 		data := d.data[k]
 		d.lock.Unlock()
 		if len(data) > 0 { //对比v   找到同类型,同省或全国的数据作对比
+			//log.Println(info.area,info.subtype,k)
 			for _, v := range data {
 				reason = ""
 				if v.id == info.id { //正常重复
+					//log.Println("相同id",info.id)
 					return false, v, ""
 				}
 				if info.subtype == v.subtype {
@@ -425,6 +431,7 @@ L:
 					}
 				}
 			}
+
 		}
 	}
 

+ 25 - 12
udpfilterdup/src/main.go

@@ -664,13 +664,17 @@ func timedTaskOnce() {
 	task_eid := util.BsonIdToSId(bson.NewObjectIdWithTime(curTime))
 
 	//发布时间间隔时间 半年
-
-
-	//测试数据
+	//测试数据 6点每个间隔6个月
 	//task_sid = "5e20965785a9271abf0ad6bd"
 	//task_eid = "5e20968d85a9271abf0ad6c2"
 	//between_time := int64(1565801997)
-	between_time := curTime.Unix()-(86400*timingPubScope)
+
+	//测试数据 180个点 每个隔1天
+	task_sid = "5e208f9b50b5ea296eccbb8a"
+	task_eid = "5e20968d85a9271abf0ad6c2"
+	between_time := int64(1563641997)
+
+	//between_time := curTime.Unix()-(86400*timingPubScope)
 	lasttime := int64(0)
 	log.Println(task_sid, task_eid,curTime.Unix(),between_time)
 	//区间id
@@ -685,14 +689,15 @@ func timedTaskOnce() {
 	it_start := sess_start.DB(mgo.DbName).C(extract_back).Find(&q_start).Sort("publishtime").Iter()
 	startNum := 0
 	for tmp_start := make(map[string]interface{}); it_start.Next(&tmp_start); startNum++ {
+
 		if startNum%10000 == 0 {
 			log.Println("正序遍历:", startNum)
 		}
-		//log.Println(util.Int64All(tmp_start["publishtime"]))
 		//取-符合-发布时间半年内的数据
 		if util.IntAll(tmp_start["dataging"]) == 1 {
 			pubtime := util.Int64All(tmp_start["publishtime"])
-			if pubtime>0 && pubtime>between_time {
+			//log.Println(startNum,"--",pubtime,"--",between_time)
+			if pubtime>0 && pubtime>=between_time {
 				lasttime = pubtime
 				log.Println("找到第一条符合条件的数据")
 				break
@@ -709,6 +714,7 @@ func timedTaskOnce() {
 	//构建第一条需要判重的数据   (数据池)
 	log.Println("开始构建第一条需要判重的数据 ---(数据池)")
 	DM = TimedTaskDatamap(dupdays,lasttime)
+
 	sess := mgo.GetMgoConn()
 	defer mgo.DestoryMongoConn(sess)
 
@@ -729,6 +735,8 @@ func timedTaskOnce() {
 		if n%10000 == 0 {
 			log.Println("current:", n, tmp["_id"], "repeateN:", repeateN)
 		}
+
+		//log.Println("当前测试重复数量:",repeateN)
 		if util.IntAll(tmp["repeat"]) == 1 || util.IntAll(tmp["repeat"]) == -1 {
 			tmp = make(map[string]interface{})
 			continue
@@ -744,21 +752,25 @@ func timedTaskOnce() {
 				<-pool
 				wg.Done()
 			}()
+
+			//log.Println("上个时间:",pre_publishtime,"当前时间--",util.Int64All(tmp["publishtime"]))
+
 			if pre_publishtime==0 {
 				pre_publishtime = util.Int64All(tmp["publishtime"])
 			}else {
-				log.Println("现在时间差:",util.Int64All(tmp["publishtime"])-pre_publishtime,"跨度--",86400*timingSpanDay)
 				//时间跨度是否大于X天
-				if (util.Int64All(tmp["publishtime"])-pre_publishtime) >(86400*timingSpanDay) {
+				if (util.Int64All(tmp["publishtime"])-pre_publishtime) >=(86400*timingSpanDay) {
 					//重新构建数据池
-					log.Println("超过跨度-重新构建:",util.Int64All(tmp["publishtime"]),"---",pre_publishtime)
+					//log.Println("超过跨度-重新构建:",util.Int64All(tmp["publishtime"]),"---",pre_publishtime)
 					pre_publishtime = util.Int64All(tmp["publishtime"])
 					DM = TimedTaskDatamap(dupdays,pre_publishtime)
 				}
 			}
+
 			info := NewInfo(tmp)
 			if !LowHeavy { //是否进行低质量数据判重
 				if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
+					log.Println("测试-无效数据")
 					updateExtract = append(updateExtract, []map[string]interface{}{
 						map[string]interface{}{
 							"_id": tmp["_id"],
@@ -771,7 +783,7 @@ func timedTaskOnce() {
 						},
 					})
 					if len(updateExtract) > 500 {
-						//mgo.UpSertBulk(extract, updateExtract...)
+						mgo.UpSertBulk(extract, updateExtract...)
 						updateExtract = [][]map[string]interface{}{}
 					}
 					return
@@ -779,6 +791,7 @@ func timedTaskOnce() {
 			}
 
 			b, source, reason := DM.check(info)
+			log.Println("判重结果",b,reason)
 			if b { //有重复,生成更新语句,更新抽取和更新招标
 				repeateN++
 				var is_replace = false
@@ -880,14 +893,14 @@ func timedTaskOnce() {
 			}
 		}(tmp)
 		if len(updateExtract) > 500 {
-			//mgo.UpSertBulk(extract, updateExtract...)
+			mgo.UpSertBulk(extract, updateExtract...)
 			updateExtract = [][]map[string]interface{}{}
 		}
 		tmp = make(map[string]interface{})
 	}
 	wg.Wait()
 	if len(updateExtract) > 0 {
-		//mgo.UpSertBulk(extract, updateExtract...)
+		mgo.UpSertBulk(extract, updateExtract...)
 	}
 	log.Println("this timeTask over.", n, "repeateN:", repeateN)