apple vor 5 Jahren
Ursprung
Commit
a2e6a1a622
3 geänderte Dateien mit 62 neuen und 27 gelöschten Zeilen
  1. 7 4
      udpfilterdup/src/config.json
  2. 9 7
      udpfilterdup/src/datamap.go
  3. 46 16
      udpfilterdup/src/main.go

+ 7 - 4
udpfilterdup/src/config.json

@@ -2,10 +2,11 @@
     "udpport": ":1485",
     "dupdays": 5,
     "mongodb": {
-        "addr": "192.168.3.207:27092",
+        "addr": "172.17.4.187:27083",
         "pool": 10,
-        "db": "extract_kf",
-        "extract": "zk",
+        "db": "qfw",
+        "extract": "result_20200116",
+        "extract_back": "result_20200116",
         "site": {
             "dbname": "extract_kf",
             "coll": "site"
@@ -16,11 +17,13 @@
         "api": "http://10.171.112.160:19281/_send/_mail"
     },
     "nextNode": [],
-    "isMerger": false,
     "threads": 1,
+    "isMerger": false,
     "isSort":false,
     "lowHeavy":false,
     "timingTask":true,
+    "timingSpanDay": 5,
+    "timingPubScope": 180,
     "specialwords": "(重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研)",
     "specialtitle_0": "(包|标段|标包)[((]?[0-9a-zA-Z一二三四五六七八九十零123456789][))]?",
     "specialtitle_1": "[0-9a-zA-Z一二三四五六七八九十零123456789](次|包|标段|标包|批)",

+ 9 - 7
udpfilterdup/src/datamap.go

@@ -64,15 +64,19 @@ type historymap struct {
 }
 
 func TimedTaskDatamap(days int,lasttime int64) *datamap {
+	log.Println("数据池开始重新构建")
 	datelimit = qutil.Float64All(days * 86400)
 	dm := &datamap{sync.Mutex{}, days, map[string][]*Info{}, []string{}, map[string]bool{}}
-	if lastid == "" {
+	if lasttime <0 {
+		log.Println("数据池空数据")
 		return dm
 	}
+
+	start := int(time.Now().Unix())
 	sess := mgo.GetMgoConn()
 	defer mgo.DestoryMongoConn(sess)
 	query := map[string]interface{}{"publishtime": map[string]interface{}{
-		"$lte": lasttime,
+		"$lt": lasttime,
 	}}
 	log.Println("query", query)
 	it := sess.DB(mgo.DbName).C(extract).Find(query).Sort("-publishtime").Iter()
@@ -83,9 +87,6 @@ func TimedTaskDatamap(days int,lasttime int64) *datamap {
 		} else {
 			pt := tmp["publishtime"]
 			pt_time := qutil.Int64All(pt)
-			if pt_time <= 0 {
-				continue
-			}
 			if qutil.Float64All(lasttime-pt_time) < datelimit {
 				info := NewInfo(tmp)
 				dkey := qutil.FormatDateWithObj(&pt, qutil.Date_yyyyMMdd)
@@ -102,11 +103,12 @@ func TimedTaskDatamap(days int,lasttime int64) *datamap {
 			}
 		}
 		if n%5000 == 0 {
-			log.Println("current n:", n, continuSum)
+			log.Println("current 数据池 n:", n, continuSum)
 		}
 		tmp = make(map[string]interface{})
 	}
-	log.Println("load data:", n)
+
+	log.Printf("数据池构建完成::%d秒,%d个\n", int(time.Now().Unix())-start, n)
 	return dm
 }
 

+ 46 - 16
udpfilterdup/src/main.go

@@ -26,6 +26,7 @@ var (
 	mconf     map[string]interface{} //mongodb配置信息
 	mgo       *MongodbSim            //mongodb操作对象
 	extract   string
+	extract_back string
 	udpclient mu.UdpClient             //udp对象
 	nextNode  []map[string]interface{} //下节点数组
 	dupdays   = 5                      //初始化判重范围
@@ -46,6 +47,8 @@ var (
 	SiteMap    map[string]map[string]interface{} //站点map
 	LowHeavy   bool                              //低质量数据判重
 	TimingTask bool                              //是否定时任务
+	timingSpanDay int64							//时间跨度
+	timingPubScope int64						//发布时间周期
 	sid, eid   string                            //测试人员判重使用
 )
 
@@ -65,6 +68,8 @@ func init() {
 	}
 	mgo.InitPool()
 	extract = mconf["extract"].(string)
+	extract_back = mconf["extract_back"].(string)
+
 	dupdays = util.IntAllDef(Sysconfig["dupdays"], 3)
 	//加载数据
 	DM = NewDatamap(dupdays, lastid)
@@ -77,6 +82,9 @@ func init() {
 	threadNum = util.IntAllDef(Sysconfig["threads"], 1)
 	LowHeavy = Sysconfig["lowHeavy"].(bool)
 	TimingTask = Sysconfig["timingTask"].(bool)
+	timingSpanDay = util.Int64All(Sysconfig["timingSpanDay"])
+	timingPubScope = util.Int64All(Sysconfig["timingPubScope"])
+
 	//站点配置
 	site := mconf["site"].(map[string]interface{})
 	SiteMap = make(map[string]map[string]interface{}, 0)
@@ -644,26 +652,34 @@ func timedTaskDay() {
 	timedTaskOnce()
 }
 func timedTaskOnce() {
+
 	log.Println("开始一次定时任务")
+	defer util.Catch()
+
+
 	now := time.Now()
 	preTime := time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, time.Local)
 	curTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local)
 	task_sid := util.BsonIdToSId(bson.NewObjectIdWithTime(preTime))
 	task_eid := util.BsonIdToSId(bson.NewObjectIdWithTime(curTime))
-	//间隔时间 半年
-	between_time := curTime.Unix()-(86400*180)
-	lastid := task_sid
-	lasttime := int64(0)
-	//
 
-	log.Println(task_sid, task_eid,curTime.Unix(),between_time)
 
-	//ObjectId("5da3f31aa5cb26b9b798d3aa")
-	//ObjectId("5da418c4a5cb26b9b7e3e9a6")
-	//task_sid = "5da3f31aa5cb26b9b798d3aa"
-	//task_eid = "5da418c4a5cb26b9b7e3e9a6"
+	/*ObjectId("5da3f2c5a5cb26b9b79847fc")
+	ObjectId("5db2735ba5cb26b9b7c99c6f")
+	task_sid = "5da3f31aa5cb26b9b798d3aa"
+	task_eid = "5da418c4a5cb26b9b7e3e9a6"
 
-	defer util.Catch()
+	*/
+	//task_sid = "5da3f2c5a5cb26b9b79847fc"
+	//task_eid = "5db2735ba5cb26b9b7c99c6f"
+	//between_time := int64(1331898261)
+
+
+	//发布时间间隔时间 半年
+	between_time := curTime.Unix()-(86400*timingPubScope)
+	lastid := task_sid
+	lasttime := int64(0)
+	log.Println(task_sid, task_eid,curTime.Unix(),between_time)
 	//区间id
 	q_start := map[string]interface{}{
 		"_id": map[string]interface{}{
@@ -673,26 +689,28 @@ func timedTaskOnce() {
 	}
 	sess_start := mgo.GetMgoConn()
 	defer mgo.DestoryMongoConn(sess_start)
-	it_start := sess_start.DB(mgo.DbName).C(extract).Find(&q_start).Sort("publishtime").Iter()
+	it_start := sess_start.DB(mgo.DbName).C(extract_back).Find(&q_start).Sort("publishtime").Iter()
 	startNum := 0
 	for tmp_start := make(map[string]interface{}); it_start.Next(&tmp_start); startNum++ {
 		if startNum%10000 == 0 {
 			log.Println("正序遍历:", startNum)
 		}
+		//log.Println(util.Int64All(tmp_start["publishtime"]))
 		//取-符合-发布时间半年内的数据
-		if util.IntAll(tmp_start["dataging"]) == 1 {
+		if util.IntAll(tmp_start["dataging"]) != 1 {
 			pubtime := util.Int64All(tmp_start["publishtime"])
 			if pubtime>0 && pubtime>between_time {
 				lastid = BsonTOStringId(tmp_start["_id"])
 				lasttime = pubtime
+				log.Println("找到第一条符合条件的数据")
 				break
 			}
 		}
 	}
 
-
+	log.Println("... ...",lasttime)
+	//构建第一条需要判重的数据   (数据池)
 	DM = TimedTaskDatamap(dupdays,lasttime)
-	log.Println("本地数据加载完成,定时任务数据判重开始")
 
 
 	sess := mgo.GetMgoConn()
@@ -711,6 +729,7 @@ func timedTaskOnce() {
 	pool := make(chan bool, threadNum)
 	wg := &sync.WaitGroup{}
 	n, repeateN := 0, 0
+	pre_publishtime :=int64(0)
 	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
 		if n%10000 == 0 {
 			log.Println("current:", n, tmp["_id"], "repeateN:", repeateN)
@@ -723,7 +742,6 @@ func timedTaskOnce() {
 			tmp = make(map[string]interface{})
 			continue
 		}
-
 		pool <- true
 		wg.Add(1)
 		go func(tmp map[string]interface{}) {
@@ -731,6 +749,18 @@ func timedTaskOnce() {
 				<-pool
 				wg.Done()
 			}()
+
+			if pre_publishtime==0 {
+				pre_publishtime = util.Int64All(tmp["publishtime"])
+			}else {
+				//时间跨度是否大于X天
+				if util.Int64All(tmp["publishtime"])-pre_publishtime >86400*timingSpanDay {
+					//重新构建数据池
+					log.Println("超过跨度-重新构建:",util.Int64All(tmp["publishtime"]),"---",pre_publishtime)
+					pre_publishtime = util.Int64All(tmp["publishtime"])
+					DM = TimedTaskDatamap(dupdays,pre_publishtime)
+				}
+			}
 			info := NewInfo(tmp)
 			if !LowHeavy { //是否进行低质量数据判重
 				if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {