apple 5 年 前
コミット
409afa82f1
3 ファイル変更36 行追加17 行削除
  1. 3 3
      udpfilterdup/src/config.json
  2. 3 2
      udpfilterdup/src/datamap.go
  3. 30 12
      udpfilterdup/src/main.go

+ 3 - 3
udpfilterdup/src/config.json

@@ -6,9 +6,9 @@
         "pool": 10,
         "db": "qfw",
         "extract": "result_20200116",
-        "extract_back": "result_20200116",
+        "extract_back": "result_20200116_back",
         "site": {
-            "dbname": "extract_kf",
+            "dbname": "qfw",
             "coll": "site"
         }
     },
@@ -22,7 +22,7 @@
     "isSort":false,
     "lowHeavy":false,
     "timingTask":true,
-    "timingSpanDay": 5,
+    "timingSpanDay": 2,
     "timingPubScope": 180,
     "specialwords": "(重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研)",
     "specialtitle_0": "(包|标段|标包)[((]?[0-9a-zA-Z一二三四五六七八九十零123456789][))]?",

+ 3 - 2
udpfilterdup/src/datamap.go

@@ -82,7 +82,8 @@ func TimedTaskDatamap(days int,lasttime int64) *datamap {
 	it := sess.DB(mgo.DbName).C(extract).Find(query).Sort("-publishtime").Iter()
 	n, continuSum := 0, 0
 	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
-		if qutil.IntAll(tmp["repeat"]) == 1 || qutil.IntAll(tmp["repeat"]) == -1 ||qutil.IntAll(tmp["dataging"]) == 1 {
+		//qutil.IntAll(tmp["dataging"]) == 1
+		if qutil.IntAll(tmp["repeat"]) == 1 || qutil.IntAll(tmp["repeat"]) == -1||qutil.IntAll(tmp["dataging"]) == 1  {
 			continuSum++
 		} else {
 			pt := tmp["publishtime"]
@@ -102,7 +103,7 @@ func TimedTaskDatamap(days int,lasttime int64) *datamap {
 				break
 			}
 		}
-		if n%5000 == 0 {
+		if n%10000 == 0 {
 			log.Println("current 数据池 n:", n, continuSum)
 		}
 		tmp = make(map[string]interface{})

+ 30 - 12
udpfilterdup/src/main.go

@@ -674,10 +674,19 @@ func timedTaskOnce() {
 	//task_eid = "5db2735ba5cb26b9b7c99c6f"
 	//between_time := int64(1331898261)
 
+	/*
+	ObjectId("5e20965785a9271abf0ad6bd")
+	ObjectId("5e20968d85a9271abf0ad6c2")
+	ObjectId("5e20965785a9271abf0ad6bd")
+	*/
+
+
+	task_sid = "5e20965785a9271abf0ad6bd"
+	task_eid = "5e20968d85a9271abf0ad6c2"
+	between_time := int64(1565801997)
 
 	//发布时间间隔时间 半年
-	between_time := curTime.Unix()-(86400*timingPubScope)
-	lastid := task_sid
+	//between_time := curTime.Unix()-(86400*timingPubScope)
 	lasttime := int64(0)
 	log.Println(task_sid, task_eid,curTime.Unix(),between_time)
 	//区间id
@@ -697,10 +706,9 @@ func timedTaskOnce() {
 		}
 		//log.Println(util.Int64All(tmp_start["publishtime"]))
 		//取-符合-发布时间半年内的数据
-		if util.IntAll(tmp_start["dataging"]) != 1 {
+		if util.IntAll(tmp_start["dataging"]) == 1 {
 			pubtime := util.Int64All(tmp_start["publishtime"])
 			if pubtime>0 && pubtime>between_time {
-				lastid = BsonTOStringId(tmp_start["_id"])
 				lasttime = pubtime
 				log.Println("找到第一条符合条件的数据")
 				break
@@ -708,8 +716,14 @@ func timedTaskOnce() {
 		}
 	}
 
-	log.Println("... ...",lasttime)
+	log.Println("... ...",lasttime,)
+	if lasttime <=0 {
+		log.Println("没找到dataging==1的数据")
+		return
+	}
+
 	//构建第一条需要判重的数据   (数据池)
+	log.Println("开始构建第一条需要判重的数据 ---(数据池)")
 	DM = TimedTaskDatamap(dupdays,lasttime)
 
 
@@ -718,12 +732,15 @@ func timedTaskOnce() {
 
 	q := map[string]interface{}{
 		"_id": map[string]interface{}{
-			"$gte": StringTOBsonId(lastid),
+			"$gte": StringTOBsonId(task_sid),
 			"$lte": StringTOBsonId(task_eid),
 		},
 	}
 
-	it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
+	log.Println("正式判重:",q)
+	it := sess.DB(mgo.DbName).C(extract_back).Find(&q).Sort("publishtime").Iter()
+
+
 	updateExtract := [][]map[string]interface{}{}
 	log.Println("线程数:", threadNum)
 	pool := make(chan bool, threadNum)
@@ -734,6 +751,7 @@ func timedTaskOnce() {
 		if n%10000 == 0 {
 			log.Println("current:", n, tmp["_id"], "repeateN:", repeateN)
 		}
+
 		if util.IntAll(tmp["repeat"]) == 1 || util.IntAll(tmp["repeat"]) == -1 {
 			tmp = make(map[string]interface{})
 			continue
@@ -749,12 +767,12 @@ func timedTaskOnce() {
 				<-pool
 				wg.Done()
 			}()
-
 			if pre_publishtime==0 {
 				pre_publishtime = util.Int64All(tmp["publishtime"])
 			}else {
+				log.Println("现在时间差:",util.Int64All(tmp["publishtime"])-pre_publishtime,"跨度--",86400*timingSpanDay)
 				//时间跨度是否大于X天
-				if util.Int64All(tmp["publishtime"])-pre_publishtime >86400*timingSpanDay {
+				if (util.Int64All(tmp["publishtime"])-pre_publishtime) >(86400*timingSpanDay) {
 					//重新构建数据池
 					log.Println("超过跨度-重新构建:",util.Int64All(tmp["publishtime"]),"---",pre_publishtime)
 					pre_publishtime = util.Int64All(tmp["publishtime"])
@@ -776,7 +794,7 @@ func timedTaskOnce() {
 						},
 					})
 					if len(updateExtract) > 500 {
-						mgo.UpSertBulk(extract, updateExtract...)
+						//mgo.UpSertBulk(extract, updateExtract...)
 						updateExtract = [][]map[string]interface{}{}
 					}
 					return
@@ -885,14 +903,14 @@ func timedTaskOnce() {
 			}
 		}(tmp)
 		if len(updateExtract) > 500 {
-			mgo.UpSertBulk(extract, updateExtract...)
+			//mgo.UpSertBulk(extract, updateExtract...)
 			updateExtract = [][]map[string]interface{}{}
 		}
 		tmp = make(map[string]interface{})
 	}
 	wg.Wait()
 	if len(updateExtract) > 0 {
-		mgo.UpSertBulk(extract, updateExtract...)
+		//mgo.UpSertBulk(extract, updateExtract...)
 	}
 	log.Println("this timeTask over.", n, "repeateN:", repeateN)