Jelajahi Sumber

Merge branch 'dev3.4' of http://192.168.3.207:10080/qmx/jy-data-extract into dev3.4

maxiaoshan 5 tahun lalu
induk
melakukan
fb37b7235f
4 mengubah file dengan 83 tambahan dan 104 penghapusan
  1. 3 0
      udp_winner/main.go
  2. 5 5
      udpfilterdup/src/config.json
  3. 4 4
      udpfilterdup/src/datamap.go
  4. 71 95
      udpfilterdup/src/main.go

+ 3 - 0
udp_winner/main.go

@@ -208,6 +208,9 @@ func initMongo() {
 	FClient.InitPool()
 	FClientmgoConn := FClient.GetMgoConn()
 	defer FClient.DestoryMongoConn(FClientmgoConn)
+
+
+
 	//加载省市县代码
 	cursor2 := FClientmgoConn.DB(Config["mgodb_extract_kf"]).C("address").Find(bson.M{}).Select(bson.M{"province": 1, "code": 1, "city": 1, "district": 1}).Iter()
 	//defer FClient.Connect(cc)

+ 5 - 5
udpfilterdup/src/config.json

@@ -5,8 +5,8 @@
         "addr": "192.168.3.207:27092",
         "pool": 10,
         "db": "extract_kf",
-        "extract": "zk_test_2019_new",
-        "extract_back": "zk_test_2019_new",
+        "extract": "zk_task_test",
+        "extract_back": "zk_task_test",
         "site": {
             "dbname": "extract_kf",
             "coll": "site"
@@ -19,11 +19,11 @@
     "nextNode": [],
     "threads": 1,
     "isMerger": false,
-    "isSort":true,
+    "isSort":false,
     "lowHeavy":false,
-    "timingTask":false,
+    "timingTask":true,
     "timingSpanDay": 2,
-    "timingPubScope": 180,
+    "timingPubScope": 360,
     "specialwords": "(重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研)",
     "specialtitle_0": "(包|标段|标包)[((]?[0-9a-zA-Z一二三四五六七八九十零123456789][))]?",
     "specialtitle_1": "[0-9a-zA-Z一二三四五六七八九十零123456789](次|包|标段|标包|批)",

+ 4 - 4
udpfilterdup/src/datamap.go

@@ -67,7 +67,7 @@ type historymap struct {
 
 func TimedTaskDatamap(days int,lasttime int64) *datamap {
 	log.Println("数据池开始重新构建")
-	datelimit = qutil.Float64All(1 * 86400)
+	datelimit = qutil.Float64All(days * 86400)
 	dm := &datamap{sync.Mutex{}, days, map[string][]*Info{}, []string{}, []string{},map[string]bool{}}
 	if lasttime <0 {
 		log.Println("数据池空数据")
@@ -387,9 +387,9 @@ L:
 					return false, v, ""
 				}
 
-				if v.id == "5c761a4fa5cb26b9b73d9512" &&info.id=="5c767bd1a5cb26b9b7a61597" {
-					log.Println("测试数据")
-				}
+				//if v.id == "5c761a4fa5cb26b9b73d9512" &&info.id=="5c767bd1a5cb26b9b7a61597" {
+				//	log.Println("测试数据")
+				//}
 
 				if info.subtype == v.subtype {
 					if info.site != "" {

+ 71 - 95
udpfilterdup/src/main.go

@@ -8,6 +8,7 @@ import (
 	"encoding/json"
 	"flag"
 	"fmt"
+	"github.com/cron"
 	"log"
 	mu "mfw/util"
 	"net"
@@ -17,7 +18,6 @@ import (
 	"sync"
 	"time"
 
-	"github.com/cron"
 	"gopkg.in/mgo.v2/bson"
 )
 
@@ -106,6 +106,11 @@ func init() {
 }
 
 func main() {
+
+	//TestXiuFu()
+	//
+	//return
+
 	go checkMapJob()
 	updport := Sysconfig["udpport"].(string)
 	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
@@ -587,8 +592,10 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
 							})
 						}
 					} else { //高质量数据
+
 						basic_bool := basicDataScore(source, info)
 						if !basic_bool {
+
 							HM.replaceSourceData(info, source.id) //替换
 							repeat_idMap["_id"] = StringTOBsonId(source.id)
 							repeat_id = info.id
@@ -660,24 +667,12 @@ func timedTaskOnce() {
 	defer util.Catch()
 
 	now := time.Now()
-	preTime := time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, time.Local)
-	curTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local)
+	preTime := time.Date(now.Year(), now.Month(), now.Day()-2, 0, 0, 0, 0, time.Local)
+	curTime := time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, time.Local)
 	task_sid := util.BsonIdToSId(bson.NewObjectIdWithTime(preTime))
 	task_eid := util.BsonIdToSId(bson.NewObjectIdWithTime(curTime))
 
-	//发布时间间隔时间 半年
-	//测试数据 6点每个间隔6个月
-	//task_sid = "5e20965785a9271abf0ad6bd"
-	//task_eid = "5e20968d85a9271abf0ad6c2"
-	//between_time := int64(1565801997)
-
-	//测试数据 180个点 每个隔1天
-	//task_sid = "5e208f9b50b5ea296eccbb8a"
-	//task_eid = "5e20968d85a9271abf0ad6c2"
-	//between_time := int64(1563641997)
-
 	between_time := curTime.Unix() - (86400 * timingPubScope)
-	lasttime := int64(0)
 	log.Println(task_sid, task_eid, curTime.Unix(), between_time)
 	//区间id
 	q_start := map[string]interface{}{
@@ -686,93 +681,65 @@ func timedTaskOnce() {
 			"$lte": StringTOBsonId(task_eid),
 		},
 	}
-	sess_start := mgo.GetMgoConn()
-	defer mgo.DestoryMongoConn(sess_start)
-	it_start := sess_start.DB(mgo.DbName).C(extract).Find(&q_start).Sort("publishtime").Iter()
-	startNum := 0
-	for tmp_start := make(map[string]interface{}); it_start.Next(&tmp_start); startNum++ {
-
-		if startNum%10000 == 0 {
-			log.Println("正序遍历:", startNum)
+	sess := mgo.GetMgoConn()
+	defer mgo.DestoryMongoConn(sess)
+	it_start := sess.DB(mgo.DbName).C(extract).Find(&q_start).Sort("publishtime").Iter()
+	num, deterTime:= int64(0),int64(0) //计数
+	pendAllArr:=[][]map[string]interface{}{}//待处理数组
+	dayArr := []map[string]interface{}{}
+	for tmp := make(map[string]interface{}); it_start.Next(&tmp); num++ {
+		if num%10000 == 0 {
+			log.Println("正序遍历:", num)
+		}
+		if util.IntAll(tmp["repeat"]) == 1 || util.IntAll(tmp["repeat"]) == -1 {
+			tmp = make(map[string]interface{})
+			continue
 		}
 		//取-符合-发布时间半年内的数据
-		if util.IntAll(tmp_start["dataging"]) == 1 {
-			pubtime := util.Int64All(tmp_start["publishtime"])
-			//log.Println(startNum,"--",pubtime,"--",between_time)
+		if util.IntAll(tmp["dataging"]) == 1 {
+			pubtime := util.Int64All(tmp["publishtime"])
 			if pubtime > 0 && pubtime >= between_time {
-				lasttime = pubtime
-				log.Println("找到第一条符合条件的数据")
-				break
+				if deterTime==0 {
+					log.Println("找到第一条符合条件的数据")
+					deterTime = util.Int64All(tmp["publishtime"])
+					dayArr = append(dayArr,tmp)
+				}else {
+					if util.Int64All(tmp["publishtime"])-deterTime >timingSpanDay*86400 {
+						//新数组重新构建,当前组数据加到全部组数据
+						pendAllArr = append(pendAllArr,dayArr)
+						dayArr = []map[string]interface{}{}
+						deterTime = util.Int64All(tmp["publishtime"])
+						dayArr = append(dayArr,tmp)
+					}else {
+						dayArr = append(dayArr,tmp)
+					}
+				}
 			}
 		}
+		tmp = make(map[string]interface{})
 	}
 
-	log.Println("... ...", lasttime)
-	if lasttime <= 0 {
+	if len(pendAllArr) <= 0 {
 		log.Println("没找到dataging==1的数据")
 		return
 	}
 
-	//构建第一条需要判重的数据   (数据池)
-	log.Println("开始构建第一条需要判重的数据 ---(数据池)")
-	DM = TimedTaskDatamap(dupdays, lasttime)
+	log.Println("本地构建分组完成:",len(pendAllArr),"组")
 
-	sess := mgo.GetMgoConn()
-	defer mgo.DestoryMongoConn(sess)
-
-	q := map[string]interface{}{
-		"_id": map[string]interface{}{
-			"$gte": StringTOBsonId(task_sid),
-			"$lte": StringTOBsonId(task_eid),
-		},
-	}
-	it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
 	updateExtract := [][]map[string]interface{}{}
-	log.Println("线程数只能为1")
-	pool := make(chan bool, threadNum)
-	wg := &sync.WaitGroup{}
 	n, repeateN := 0, 0
-	pre_publishtime := int64(0)
-	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
-		if n%10000 == 0 {
-			log.Println("current:", n, tmp["_id"], "repeateN:", repeateN)
-		}
-
-		//log.Println("当前测试重复数量:",repeateN)
-		if util.IntAll(tmp["repeat"]) == 1 || util.IntAll(tmp["repeat"]) == -1 {
-			tmp = make(map[string]interface{})
-			continue
-		}
-		if util.IntAll(tmp["dataging"]) != 1 {
-			tmp = make(map[string]interface{})
-			continue
-		}
-		pool <- true
-		wg.Add(1)
-		go func(tmp map[string]interface{}) {
-			defer func() {
-				<-pool
-				wg.Done()
-			}()
-
-			//log.Println("上个时间:",pre_publishtime,"当前时间--",util.Int64All(tmp["publishtime"]))
-
-			if pre_publishtime == 0 {
-				pre_publishtime = util.Int64All(tmp["publishtime"])
-			} else {
-				//时间跨度是否大于X天
-				if (util.Int64All(tmp["publishtime"]) - pre_publishtime) >= (86400 * timingSpanDay) {
-					//重新构建数据池
-					//log.Println("超过跨度-重新构建:",util.Int64All(tmp["publishtime"]),"---",pre_publishtime)
-					pre_publishtime = util.Int64All(tmp["publishtime"])
-					DM = TimedTaskDatamap(dupdays, pre_publishtime)
-				}
-			}
-
+	for k,v:=range pendAllArr {
+		//构建当前组的数据池
+		log.Println("构建第",k,"组---(数据池)")
+		DM = TimedTaskDatamap(dupdays, util.Int64All(v[0]["publishtime"]))
+		log.Println("开始遍历判重第",k,"组  共计数量:",len(v))
+		log.Println("统计目前总数量:",n,"重复数量:",repeateN)
+		n = n+len(v)
+		for _,tmp:=range v {
 			info := NewInfo(tmp)
 			if !LowHeavy { //是否进行低质量数据判重
 				if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
-					log.Println("测试-无效数据")
+					log.Println("无效数据")
 					updateExtract = append(updateExtract, []map[string]interface{}{
 						map[string]interface{}{
 							"_id": tmp["_id"],
@@ -788,13 +755,12 @@ func timedTaskOnce() {
 						mgo.UpSertBulk(extract, updateExtract...)
 						updateExtract = [][]map[string]interface{}{}
 					}
-					return
+					continue
 				}
 			}
-
 			b, source, reason := DM.check(info)
-			log.Println("判重结果", b, reason)
 			if b { //有重复,生成更新语句,更新抽取和更新招标
+				log.Println("判重结果", b, reason)
 				repeateN++
 				var is_replace = false
 				var mergeArr = []int64{}                    //更改合并数组记录
@@ -873,6 +839,7 @@ func timedTaskOnce() {
 				} else { //高质量数据
 					basic_bool := basicDataScore(source, info)
 					if !basic_bool {
+						log.Println("高质量数据替换:",source.id,info.id)
 						DM.replaceSourceData(info, source.id) //替换
 						repeat_idMap["_id"] = StringTOBsonId(source.id)
 						repeat_id = info.id
@@ -891,16 +858,25 @@ func timedTaskOnce() {
 						},
 					},
 				})
-
+			}else {
+				updateExtract = append(updateExtract, []map[string]interface{}{
+					map[string]interface{}{
+						"_id": tmp["_id"],
+					},
+					map[string]interface{}{
+						"$set": map[string]interface{}{
+							"dataging": 0,//符合条件的都为dataging==0
+						},
+					},
+				})
+			}
+			if len(updateExtract) > 500 {
+				mgo.UpSertBulk(extract, updateExtract...)
+				updateExtract = [][]map[string]interface{}{}
 			}
-		}(tmp)
-		if len(updateExtract) > 500 {
-			mgo.UpSertBulk(extract, updateExtract...)
-			updateExtract = [][]map[string]interface{}{}
 		}
-		tmp = make(map[string]interface{})
 	}
-	wg.Wait()
+
 	if len(updateExtract) > 0 {
 		mgo.UpSertBulk(extract, updateExtract...)
 	}