Explorar el Código

判重 全国情况

apple hace 5 años
padre
commit
391ddad7b1
Se han modificado 3 ficheros con 122 adiciones y 19 borrados
  1. 7 7
      udpfilterdup/src/config.json
  2. 107 9
      udpfilterdup/src/datamap.go
  3. 8 3
      udpfilterdup/src/main.go

+ 7 - 7
udpfilterdup/src/config.json

@@ -2,13 +2,13 @@
     "udpport": ":1485",
     "dupdays": 5,
     "mongodb": {
-        "addr": "172.17.4.187:27083",
+        "addr": "192.168.3.207:27092",
         "pool": 10,
-        "db": "qfw",
-        "extract": "result_20200116",
-        "extract_back": "result_20200116_new",
+        "db": "extract_kf",
+        "extract": "zk_test_2019_new",
+        "extract_back": "zk_test_2019_new",
         "site": {
-            "dbname": "qfw",
+            "dbname": "extract_kf",
             "coll": "site"
         }
     },
@@ -19,9 +19,9 @@
     "nextNode": [],
     "threads": 1,
     "isMerger": false,
-    "isSort":false,
+    "isSort":true,
     "lowHeavy":false,
-    "timingTask":true,
+    "timingTask":false,
     "timingSpanDay": 2,
     "timingPubScope": 180,
     "specialwords": "(重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研)",

+ 107 - 9
udpfilterdup/src/datamap.go

@@ -51,6 +51,7 @@ type datamap struct {
 	days   int        //保留几天数据
 	data   map[string][]*Info
 	keymap []string
+	areakeys []string
 	keys   map[string]bool
 }
 
@@ -60,13 +61,14 @@ type historymap struct {
 	days   int        //保留几天数据
 	data   map[string][]*Info
 	keymap []string
+	areakeys []string
 	keys   map[string]bool
 }
 
 func TimedTaskDatamap(days int,lasttime int64) *datamap {
 	log.Println("数据池开始重新构建")
 	datelimit = qutil.Float64All(1 * 86400)
-	dm := &datamap{sync.Mutex{}, days, map[string][]*Info{}, []string{}, map[string]bool{}}
+	dm := &datamap{sync.Mutex{}, days, map[string][]*Info{}, []string{}, []string{},map[string]bool{}}
 	if lasttime <0 {
 		log.Println("数据池空数据")
 		return dm
@@ -100,6 +102,18 @@ func TimedTaskDatamap(days int,lasttime int64) *datamap {
 				data = append(data, info)
 				dm.data[k] = data
 				dm.keys[dkey] = true
+				//添加省
+				isAreaExist :=false
+				for _,v:= range dm.areakeys {
+					if v==info.area {
+						isAreaExist = true
+					}
+				}
+				if !isAreaExist {
+					areaArr := dm.areakeys
+					areaArr = append(areaArr,info.area)
+					dm.areakeys = areaArr
+				}
 			} else {
 				break
 			}
@@ -120,7 +134,7 @@ func TimedTaskDatamap(days int,lasttime int64) *datamap {
 
 func NewDatamap(days int, lastid string) *datamap {
 	datelimit = qutil.Float64All(days * 86400)
-	dm := &datamap{sync.Mutex{}, days, map[string][]*Info{}, []string{}, map[string]bool{}}
+	dm := &datamap{sync.Mutex{}, days, map[string][]*Info{}, []string{},[]string{}, map[string]bool{}}
 	if lastid == "" {
 		return dm
 	}
@@ -160,6 +174,18 @@ func NewDatamap(days int, lastid string) *datamap {
 				data = append(data, info)
 				dm.data[k] = data
 				dm.keys[dkey] = true
+				//添加省
+				isAreaExist :=false
+				for _,v:= range dm.areakeys {
+					if v==info.area {
+						isAreaExist = true
+					}
+				}
+				if !isAreaExist {
+					areaArr := dm.areakeys
+					areaArr = append(areaArr,info.area)
+					dm.areakeys = areaArr
+				}
 			} else {
 				break
 			}
@@ -176,7 +202,7 @@ func NewDatamap(days int, lastid string) *datamap {
 //构建新历史数据池
 func NewHistorymap(startid string, lastid string, startTime int64, lastTime int64) *historymap {
 	datelimit = qutil.Float64All(5 * 86400)
-	hm := &historymap{sync.Mutex{}, 5, map[string][]*Info{}, []string{}, map[string]bool{}}
+	hm := &historymap{sync.Mutex{}, 5, map[string][]*Info{}, []string{},[]string{}, map[string]bool{}}
 	if lastid == "" || startid == "" {
 		return hm
 	}
@@ -212,6 +238,19 @@ func NewHistorymap(startid string, lastid string, startTime int64, lastTime int6
 			data = append(data, info)
 			hm.data[k] = data
 			hm.keys[dkey] = true
+
+			//添加省
+			isAreaExist :=false
+			for _,v:= range hm.areakeys {
+				if v==info.area {
+					isAreaExist = true
+				}
+			}
+			if !isAreaExist {
+				areaArr := hm.areakeys
+				areaArr = append(areaArr,info.area)
+				hm.areakeys = areaArr
+			}
 		} else {
 			break
 		}
@@ -251,6 +290,19 @@ func NewHistorymap(startid string, lastid string, startTime int64, lastTime int6
 			data = append(data, info)
 			hm.data[k] = data
 			hm.keys[dkey] = true
+
+			//添加省
+			isAreaExist :=false
+			for _,v:= range hm.areakeys {
+				if v==info.area {
+					isAreaExist = true
+				}
+			}
+			if !isAreaExist {
+				areaArr := hm.areakeys
+				areaArr = append(areaArr,info.area)
+				hm.areakeys = areaArr
+			}
 		} else {
 			break
 		}
@@ -308,10 +360,17 @@ func (d *datamap) check(info *Info) (b bool, source *Info, reasons string) {
 	keys := []string{}
 	d.lock.Lock()
 	for k, _ := range d.keys { //不同时间段
-		keys = append(keys, fmt.Sprintf("%s_%s_%s", k, info.subtype, info.area))
-		if info.area != "全国" { //这个后续可以不要
-			keys = append(keys, fmt.Sprintf("%s_%s_%s", k, info.subtype, "全国"))
+		if info.area=="全国" {
+			//匹配所有省
+			for _,v := range d.areakeys{
+				keys = append(keys, fmt.Sprintf("%s_%s_%s", k, info.subtype, v))
+			}
+		}else {
+			//匹配指定省
+			keys = append(keys, fmt.Sprintf("%s_%s_%s", k, info.subtype, info.area))
 		}
+		keys = append(keys, fmt.Sprintf("%s_%s_%s", k, info.subtype, "全国"))
+
 	}
 	d.lock.Unlock()
 L:
@@ -327,6 +386,11 @@ L:
 					//log.Println("相同id",info.id)
 					return false, v, ""
 				}
+
+				if v.id == "5c761a4fa5cb26b9b73d9512" &&info.id=="5c767bd1a5cb26b9b7a61597" {
+					log.Println("测试数据")
+				}
+
 				if info.subtype == v.subtype {
 					if info.site != "" {
 						sitelock.Lock()
@@ -456,6 +520,20 @@ L:
 			data = append(data, info)
 			d.data[k] = data
 		}
+
+		//添加省
+		isAreaExist :=false
+		for _,v:= range d.areakeys {
+			if v==info.area {
+				isAreaExist = true
+			}
+		}
+		if !isAreaExist {
+			areaArr := d.areakeys
+			areaArr = append(areaArr,info.area)
+			d.areakeys = areaArr
+		}
+
 		d.lock.Unlock()
 	}
 
@@ -467,10 +545,17 @@ func (h *historymap) checkHistory(info *Info) (b bool, source *Info, reasons str
 	keys := []string{}
 	h.lock.Lock()
 	for k, _ := range h.keys { //不同时间段
-		keys = append(keys, fmt.Sprintf("%s_%s_%s", k, info.subtype, info.area))
-		if info.area != "全国" { //这个后续可以不要
-			keys = append(keys, fmt.Sprintf("%s_%s_%s", k, info.subtype, "全国"))
+		if info.area=="全国" {
+			//匹配所有省
+			for _,v := range h.areakeys{
+				keys = append(keys, fmt.Sprintf("%s_%s_%s", k, info.subtype, v))
+			}
+		}else {
+			//匹配指定省
+			keys = append(keys, fmt.Sprintf("%s_%s_%s", k, info.subtype, info.area))
 		}
+		keys = append(keys, fmt.Sprintf("%s_%s_%s", k, info.subtype, "全国"))
+
 	}
 	h.lock.Unlock()
 
@@ -623,6 +708,19 @@ L:
 			data = append(data, info)
 			h.data[k] = data
 		}
+
+		//添加省
+		isAreaExist :=false
+		for _,v:= range h.areakeys {
+			if v==info.area {
+				isAreaExist = true
+			}
+		}
+		if !isAreaExist {
+			areaArr := h.areakeys
+			areaArr = append(areaArr,info.area)
+			h.areakeys = areaArr
+		}
 	}
 	return
 }

+ 8 - 3
udpfilterdup/src/main.go

@@ -128,13 +128,14 @@ func mainT() {
 		ObjectId("5da3f2c5a5cb26b9b79847fc")
 		ObjectId("5db2735ba5cb26b9b7c99c6f")
 	*/
-	log.Println("测试开始")
 	if TimingTask {
+		log.Println("定时任务测试开始")
 		go timedTaskDay()
 		time.Sleep(99999 * time.Hour)
 	}else {
-		//sid = "5da3f2c5a5cb26b9b79847fc"
-		//eid = "5db2735ba5cb26b9b7c99c6f"
+		sid = "5c2c10fda5cb26b9b75e6f7f"
+		eid = "5e976e4a50b5ea296ef376b9"
+		log.Println("正常判重测试开始")
 		log.Println(sid,"---",eid)
 		mapinfo := map[string]interface{}{}
 		if sid == "" || eid == "" {
@@ -253,7 +254,9 @@ func task(data []byte, mapInfo map[string]interface{}) {
 
 			b, source, reason := DM.check(info)
 			if b { //有重复,生成更新语句,更新抽取和更新招标
+
 				repeateN++
+
 				var is_replace = false
 				var mergeArr = []int64{}                    //更改合并数组记录
 				var newData = &Info{}                       //更换新的数据池数据
@@ -663,6 +666,8 @@ func timedTaskOnce() {
 	task_sid := util.BsonIdToSId(bson.NewObjectIdWithTime(preTime))
 	task_eid := util.BsonIdToSId(bson.NewObjectIdWithTime(curTime))
 
+
+
 	//发布时间间隔时间 半年
 	//测试数据 6点每个间隔6个月
 	//task_sid = "5e20965785a9271abf0ad6bd"