Преглед на файлове

Merge branch 'dev3.4' of http://192.168.3.207:10080/qmx/jy-data-extract into dev3.4

maxiaoshan преди 5 години
родител
ревизия
5f16878fb8
променени са 3 файла, в които са добавени 59 реда и са изтрити 111 реда
  1. 1 1
      udpfilterdup/src/README.md
  2. 23 9
      udpfilterdup/src/config.json
  3. 35 101
      udpfilterdup/src/main.go

+ 1 - 1
udpfilterdup/src/README.md

@@ -7,7 +7,7 @@
         "addr": "172.17.4.187:27083",
         "pool": 10,
         "db": "qfw",
-        "extract": "result_file_20200410_test",
+        "extract": "result_file_20200410",
         "extract_back": "result_file_20200409",
         "site": {
             "dbname": "qfw",

+ 23 - 9
udpfilterdup/src/config.json

@@ -1,14 +1,14 @@
 {
-    "udpport": ":1485",
-    "dupdays": 5,
+    "udpport": ":11485",
+    "dupdays": 7,
     "mongodb": {
-        "addr": "192.168.3.207:27092",
+        "addr": "172.17.4.187:27083",
         "pool": 10,
-        "db": "extract_kf",
-        "extract": "zk_task_test",
-        "extract_back": "zk_task_test",
+        "db": "qfw",
+        "extract": "result_file_20200410",
+        "extract_back": "result_file_20200409",
         "site": {
-            "dbname": "extract_kf",
+            "dbname": "qfw",
             "coll": "site"
         }
     },
@@ -16,14 +16,28 @@
         "to": "zhangjinkun@topnet.net.cn",
         "api": "http://10.171.112.160:19281/_send/_mail"
     },
-    "nextNode": [],
+    "nextNode": [
+        {
+            "addr": "172.17.145.179",
+            "port": 1782,
+            "stype": "project",
+            "memo": "合并项目"
+        },
+        {
+            "addr": "127.0.0.1",
+            "port": 1783,
+            "stype": "bidding",
+            "memo": "创建招标数据索引new"
+        }
+    ],
+
     "threads": 1,
     "isMerger": false,
     "isSort":false,
     "lowHeavy":false,
     "timingTask":true,
     "timingSpanDay": 2,
-    "timingPubScope": 360,
+    "timingPubScope": 720,
     "specialwords": "(重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研)",
     "specialtitle_0": "(包|标段|标包)[((]?[0-9a-zA-Z一二三四五六七八九十零123456789][))]?",
     "specialtitle_1": "[0-9a-zA-Z一二三四五六七八九十零123456789](次|包|标段|标包|批)",

+ 35 - 101
udpfilterdup/src/main.go

@@ -113,11 +113,10 @@ func main() {
 	go checkMapJob()
 	updport := Sysconfig["udpport"].(string)
 	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
+	udpclient.Listen(processUdpMsg)
+	log.Println("Udp服务监听", updport)
 	if TimingTask {
 		go timedTaskDay()
-	} else {
-		udpclient.Listen(processUdpMsg)
-		log.Println("Udp服务监听", updport)
 	}
 
 	time.Sleep(99999 * time.Hour)
@@ -674,7 +673,13 @@ func timedTaskOnce() {
 	task_eid := util.BsonIdToSId(bson.NewObjectIdWithTime(curTime))
 
 	between_time := curTime.Unix() - (86400 * timingPubScope)
-	log.Println(task_sid, task_eid, curTime.Unix(), between_time)
+
+	//task_sid = "5e9f18800000000000000000"
+	//task_eid = "5ea06a000000000000000000"
+	//between_time = int64(1525363200)
+
+
+	log.Println("id区间:",task_sid, task_eid,"时间:", between_time)
 	//区间id
 	q_start := map[string]interface{}{
 		"_id": map[string]interface{}{
@@ -685,27 +690,24 @@ func timedTaskOnce() {
 	sess := mgo.GetMgoConn()
 	defer mgo.DestoryMongoConn(sess)
 	it_start := sess.DB(mgo.DbName).C(extract).Find(&q_start).Sort("publishtime").Iter()
-	num, deterTime:= int64(0),int64(0) //计数
+	num,oknum, deterTime:= int64(0),int64(0),int64(0) //计数
 	pendAllArr:=[][]map[string]interface{}{}//待处理数组
 	dayArr := []map[string]interface{}{}
 	for tmp := make(map[string]interface{}); it_start.Next(&tmp); num++ {
 		if num%10000 == 0 {
 			log.Println("正序遍历:", num)
 		}
-		if util.IntAll(tmp["repeat"]) == 1 || util.IntAll(tmp["repeat"]) == -1 {
-			tmp = make(map[string]interface{})
-			continue
-		}
 		//取-符合-发布时间半年内的数据
 		if util.IntAll(tmp["dataging"]) == 1 {
 			pubtime := util.Int64All(tmp["publishtime"])
 			if pubtime > 0 && pubtime >= between_time {
+				oknum++
 				if deterTime==0 {
 					log.Println("找到第一条符合条件的数据")
 					deterTime = util.Int64All(tmp["publishtime"])
 					dayArr = append(dayArr,tmp)
 				}else {
-					if util.Int64All(tmp["publishtime"])-deterTime >timingSpanDay*86400 {
+					if pubtime-deterTime >timingSpanDay*86400 {
 						//新数组重新构建,当前组数据加到全部组数据
 						pendAllArr = append(pendAllArr,dayArr)
 						dayArr = []map[string]interface{}{}
@@ -720,12 +722,26 @@ func timedTaskOnce() {
 		tmp = make(map[string]interface{})
 	}
 
+	if len(dayArr)>0 {
+		pendAllArr = append(pendAllArr,dayArr)
+		dayArr = []map[string]interface{}{}
+	}
+
+	log.Println("查询数量:",num,"符合条件:",oknum)
+
 	if len(pendAllArr) <= 0 {
 		log.Println("没找到dataging==1的数据")
 		return
 	}
 
-	log.Println("本地构建分组完成:",len(pendAllArr),"组")
+	//测试分组数量是否正确
+	testNum:=0
+	for k,v:=range pendAllArr {
+		log.Println("第",k,"组--","数量:",len(v))
+		testNum = testNum+len(v)
+	}
+	log.Println("本地构建分组完成:",len(pendAllArr),"组","测试-总计数量:",testNum)
+
 
 	updateExtract := [][]map[string]interface{}{}
 	n, repeateN := 0, 0
@@ -734,8 +750,8 @@ func timedTaskOnce() {
 		log.Println("构建第",k,"组---(数据池)")
 		DM = TimedTaskDatamap(dupdays, util.Int64All(v[0]["publishtime"]))
 		log.Println("开始遍历判重第",k,"组  共计数量:",len(v))
-		log.Println("统计目前总数量:",n,"重复数量:",repeateN)
 		n = n+len(v)
+		log.Println("统计目前总数量:",n,"重复数量:",repeateN)
 		for _,tmp:=range v {
 			info := NewInfo(tmp)
 			if !LowHeavy { //是否进行低质量数据判重
@@ -752,7 +768,7 @@ func timedTaskOnce() {
 							},
 						},
 					})
-					if len(updateExtract) > 500 {
+					if len(updateExtract) > 10 {
 						mgo.UpSertBulk(extract, updateExtract...)
 						updateExtract = [][]map[string]interface{}{}
 					}
@@ -761,100 +777,18 @@ func timedTaskOnce() {
 			}
 			b, source, reason := DM.check(info)
 			if b { //有重复,生成更新语句,更新抽取和更新招标
-				log.Println("判重结果", b, reason)
+				log.Println("判重结果", b, reason,"目标id",info.id)
 				repeateN++
-				var is_replace = false
-				var mergeArr = []int64{}                    //更改合并数组记录
-				var newData = &Info{}                       //更换新的数据池数据
-				var repeat_idMap = map[string]interface{}{} //记录判重的
-				var merge_idMap = map[string]interface{}{}  //记录合并的
-				repeat_idMap["_id"] = StringTOBsonId(info.id)
-				merge_idMap["_id"] = StringTOBsonId(source.id)
-				repeat_id := source.id //初始化一个数据
-
-				if isMerger { //合并相关
-					basic_bool := basicDataScore(source, info)
-					if basic_bool {
-						//已原始数据为标准 - 对比数据打判重标签-
-						newData, mergeArr, is_replace = mergeDataFields(source, info)
-						DM.replaceSourceData(newData, source.id) //替换
-						//对比数据打重复标签的id,原始数据id的记录
-						repeat_idMap["_id"] = StringTOBsonId(info.id)
-						merge_idMap["_id"] = StringTOBsonId(source.id)
-						repeat_id = source.id
-					} else {
-						//已对比数据为标准 ,数据池的数据打判重标签
-						newData, mergeArr, is_replace = mergeDataFields(info, source)
-						DM.replaceSourceData(newData, source.id) //替换
-						//原始数据打重复标签的id,   对比数据id的记录
-						repeat_idMap["_id"] = StringTOBsonId(source.id)
-						merge_idMap["_id"] = StringTOBsonId(info.id)
-						repeat_id = info.id
-					}
-
-					merge_map := make(map[string]interface{}, 0)
-					if is_replace { //有过合并-更新数据
-						merge_map = map[string]interface{}{
-							"$set": map[string]interface{}{
-								"merge":    newData.mergemap,
-								"dataging": 0,
-							},
-						}
-						//更新合并后的数据
-						for _, value := range mergeArr {
-							if value == 0 {
-								merge_map["$set"].(map[string]interface{})["area"] = newData.area
-								merge_map["$set"].(map[string]interface{})["city"] = newData.city
-							} else if value == 1 {
-								merge_map["$set"].(map[string]interface{})["area"] = newData.area
-								merge_map["$set"].(map[string]interface{})["city"] = newData.city
-							} else if value == 2 {
-								merge_map["$set"].(map[string]interface{})["projectname"] = newData.projectname
-							} else if value == 3 {
-								merge_map["$set"].(map[string]interface{})["projectcode"] = newData.projectcode
-							} else if value == 4 {
-								merge_map["$set"].(map[string]interface{})["buyer"] = newData.buyer
-							} else if value == 5 {
-								merge_map["$set"].(map[string]interface{})["budget"] = newData.budget
-							} else if value == 6 {
-								merge_map["$set"].(map[string]interface{})["winner"] = newData.winner
-							} else if value == 7 {
-								merge_map["$set"].(map[string]interface{})["bidamount"] = newData.bidamount
-							} else if value == 8 {
-								merge_map["$set"].(map[string]interface{})["bidopentime"] = newData.bidopentime
-							} else if value == 9 {
-								merge_map["$set"].(map[string]interface{})["contractnumber"] = newData.contractnumber
-							} else if value == 10 {
-								merge_map["$set"].(map[string]interface{})["publishtime"] = newData.publishtime
-							} else if value == 11 {
-								merge_map["$set"].(map[string]interface{})["agency"] = newData.agency
-							} else {
-							}
-						}
-						//模板数据更新
-						updateExtract = append(updateExtract, []map[string]interface{}{
-							merge_idMap,
-							merge_map,
-						})
-					}
-				} else { //高质量数据
-					basic_bool := basicDataScore(source, info)
-					if !basic_bool {
-						log.Println("高质量数据替换:",source.id,info.id)
-						DM.replaceSourceData(info, source.id) //替换
-						repeat_idMap["_id"] = StringTOBsonId(source.id)
-						repeat_id = info.id
-					}
-				}
-
 				//重复数据打标签
 				updateExtract = append(updateExtract, []map[string]interface{}{
-					repeat_idMap,
+					map[string]interface{}{
+						"_id": tmp["_id"],
+					},
 					map[string]interface{}{
 						"$set": map[string]interface{}{
 							"repeat":        1,
 							"repeat_reason": reason,
-							"repeat_id":     repeat_id,
+							"repeat_id":     source.id,
 							"dataging":      0,
 						},
 					},
@@ -871,7 +805,7 @@ func timedTaskOnce() {
 					},
 				})
 			}
-			if len(updateExtract) > 500 {
+			if len(updateExtract) > 10 {
 				mgo.UpSertBulk(extract, updateExtract...)
 				updateExtract = [][]map[string]interface{}{}
 			}