apple 5 years ago
parent
commit
d02e7e6d69
1 changed file with 52 additions and 3 deletions
  1. 52 3
      udpfilterdup/src/main.go

+ 52 - 3
udpfilterdup/src/main.go

@@ -129,9 +129,14 @@ func mainT() {
 		time.Sleep(99999 * time.Hour)
 	} else {
 		//2019年8月1日-8月17日  712646
-		IdType = true
+		/*
 		sid = "5d55031fa5cb26b9b7f57570"
 		eid = "5e8c02b150b5ea296eed4509"
+		5e933b1a50b5ea296ef0e839
+		*/
+		//IdType = true
+		sid = "5eca4d52511b120337790325"
+		eid = "5eca4d55511b120337790329"
 		log.Println("正常判重测试开始")
 		log.Println(sid, "---", eid)
 		mapinfo := map[string]interface{}{}
@@ -146,6 +151,9 @@ func mainT() {
 		time.Sleep(99999 * time.Hour)
 	}
 }
+
+
+
 func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 	fmt.Println("接受的段数据")
 	switch act {
@@ -220,12 +228,32 @@ func task(data []byte, mapInfo map[string]interface{}) {
 		if n%10000 == 0 {
 			log.Println("current:", n, tmp["_id"], "repeateN:", repeateN)
 		}
+
 		if util.IntAll(tmp["repeat"]) == 1 || util.IntAll(tmp["repeat"]) == -1||
-			util.IntAll(tmp["dataging"]) == 1 ||util.IntAll(tmp["dataging"]) == -1{
+			util.IntAll(tmp["dataging"]) == 1 ||util.IntAll(tmp["sourcewebsite"]) == 1{
 			tmp = make(map[string]interface{})
 			if util.IntAll(tmp["repeat"]) == 1 {
 				repeateN++
 			}
+			if util.IntAll(tmp["sourcewebsite"]) == 1 {
+				repeateN++
+				updateExtract = append(updateExtract, []map[string]interface{}{
+					map[string]interface{}{
+						"_id": tmp["_id"],
+					},
+					map[string]interface{}{
+						"$set": map[string]interface{}{
+							"repeat": 1,
+							"repeat_reason": "sourcewebsite为1,重复",
+						},
+					},
+				})
+				if len(updateExtract) >= 200 {
+					mgo.UpSertBulk(extract, updateExtract...)
+					updateExtract = [][]map[string]interface{}{}
+				}
+
+			}
 			continue
 		}
 		pool <- true
@@ -246,6 +274,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
 						map[string]interface{}{
 							"$set": map[string]interface{}{
 								"repeat": -1, //无效数据标签
+
 							},
 						},
 					})
@@ -257,6 +286,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
 				}
 			}
 
+
 			b, source, reason := DM.check(info)
 			if b { //有重复,生成更新语句,更新抽取和更新招标
 				repeateN++
@@ -483,7 +513,26 @@ func timedTaskOnce() {
 		if num%10000 == 0 {
 			log.Println("正序遍历:", num)
 		}
-		//取-符合-发布时间半年内的数据
+		if util.IntAll(tmp["sourcewebsite"]) == 1 {
+			updateExtract = append(updateExtract, []map[string]interface{}{
+				map[string]interface{}{
+					"_id": tmp["_id"],
+				},
+				map[string]interface{}{
+					"$set": map[string]interface{}{
+						"repeat": 1,
+						"dataging": 0,
+						"repeat_reason": "sourcewebsite为1 重复",
+					},
+				},
+			})
+			if len(updateExtract) > 50 {
+				mgo.UpSertBulk(extract, updateExtract...)
+				updateExtract = [][]map[string]interface{}{}
+			}
+			continue
+		}
+		//取-符合-发布时间X年内的数据
 		if util.IntAll(tmp["dataging"]) == 1 {
 			pubtime := util.Int64All(tmp["publishtime"])
 			if pubtime > 0 && pubtime >= between_time {