소스 검색

Merge branch 'dev3.4' of http://192.168.3.207:10080/qmx/jy-data-extract into dev3.4

maxiaoshan 5 년 전
부모
커밋
3e5f51e478
3개의 변경된 파일157개의 추가작업 그리고 57개의 파일을 삭제
  1. 7 7
      udpfilterdup/src/config.json
  2. 107 9
      udpfilterdup/src/datamap.go
  3. 43 41
      udpfilterdup/src/main.go

+ 7 - 7
udpfilterdup/src/config.json

@@ -2,13 +2,13 @@
     "udpport": ":1485",
     "dupdays": 5,
     "mongodb": {
-        "addr": "172.17.4.187:27083",
+        "addr": "192.168.3.207:27092",
         "pool": 10,
-        "db": "qfw",
-        "extract": "result_20200116",
-        "extract_back": "result_20200116_new",
+        "db": "extract_kf",
+        "extract": "zk_test_2019_new",
+        "extract_back": "zk_test_2019_new",
         "site": {
-            "dbname": "qfw",
+            "dbname": "extract_kf",
             "coll": "site"
         }
     },
@@ -19,9 +19,9 @@
     "nextNode": [],
     "threads": 1,
     "isMerger": false,
-    "isSort":false,
+    "isSort":true,
     "lowHeavy":false,
-    "timingTask":true,
+    "timingTask":false,
     "timingSpanDay": 2,
     "timingPubScope": 180,
     "specialwords": "(重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研)",

+ 107 - 9
udpfilterdup/src/datamap.go

@@ -51,6 +51,7 @@ type datamap struct {
 	days   int        //保留几天数据
 	data   map[string][]*Info
 	keymap []string
+	areakeys []string
 	keys   map[string]bool
 }
 
@@ -60,13 +61,14 @@ type historymap struct {
 	days   int        //保留几天数据
 	data   map[string][]*Info
 	keymap []string
+	areakeys []string
 	keys   map[string]bool
 }
 
 func TimedTaskDatamap(days int,lasttime int64) *datamap {
 	log.Println("数据池开始重新构建")
 	datelimit = qutil.Float64All(1 * 86400)
-	dm := &datamap{sync.Mutex{}, days, map[string][]*Info{}, []string{}, map[string]bool{}}
+	dm := &datamap{sync.Mutex{}, days, map[string][]*Info{}, []string{}, []string{},map[string]bool{}}
 	if lasttime <0 {
 		log.Println("数据池空数据")
 		return dm
@@ -100,6 +102,18 @@ func TimedTaskDatamap(days int,lasttime int64) *datamap {
 				data = append(data, info)
 				dm.data[k] = data
 				dm.keys[dkey] = true
+				//添加省
+				isAreaExist :=false
+				for _,v:= range dm.areakeys {
+					if v==info.area {
+						isAreaExist = true
+					}
+				}
+				if !isAreaExist {
+					areaArr := dm.areakeys
+					areaArr = append(areaArr,info.area)
+					dm.areakeys = areaArr
+				}
 			} else {
 				break
 			}
@@ -120,7 +134,7 @@ func TimedTaskDatamap(days int,lasttime int64) *datamap {
 
 func NewDatamap(days int, lastid string) *datamap {
 	datelimit = qutil.Float64All(days * 86400)
-	dm := &datamap{sync.Mutex{}, days, map[string][]*Info{}, []string{}, map[string]bool{}}
+	dm := &datamap{sync.Mutex{}, days, map[string][]*Info{}, []string{},[]string{}, map[string]bool{}}
 	if lastid == "" {
 		return dm
 	}
@@ -160,6 +174,18 @@ func NewDatamap(days int, lastid string) *datamap {
 				data = append(data, info)
 				dm.data[k] = data
 				dm.keys[dkey] = true
+				//添加省
+				isAreaExist :=false
+				for _,v:= range dm.areakeys {
+					if v==info.area {
+						isAreaExist = true
+					}
+				}
+				if !isAreaExist {
+					areaArr := dm.areakeys
+					areaArr = append(areaArr,info.area)
+					dm.areakeys = areaArr
+				}
 			} else {
 				break
 			}
@@ -176,7 +202,7 @@ func NewDatamap(days int, lastid string) *datamap {
 //构建新历史数据池
 func NewHistorymap(startid string, lastid string, startTime int64, lastTime int64) *historymap {
 	datelimit = qutil.Float64All(5 * 86400)
-	hm := &historymap{sync.Mutex{}, 5, map[string][]*Info{}, []string{}, map[string]bool{}}
+	hm := &historymap{sync.Mutex{}, 5, map[string][]*Info{}, []string{},[]string{}, map[string]bool{}}
 	if lastid == "" || startid == "" {
 		return hm
 	}
@@ -212,6 +238,19 @@ func NewHistorymap(startid string, lastid string, startTime int64, lastTime int6
 			data = append(data, info)
 			hm.data[k] = data
 			hm.keys[dkey] = true
+
+			//添加省
+			isAreaExist :=false
+			for _,v:= range hm.areakeys {
+				if v==info.area {
+					isAreaExist = true
+				}
+			}
+			if !isAreaExist {
+				areaArr := hm.areakeys
+				areaArr = append(areaArr,info.area)
+				hm.areakeys = areaArr
+			}
 		} else {
 			break
 		}
@@ -251,6 +290,19 @@ func NewHistorymap(startid string, lastid string, startTime int64, lastTime int6
 			data = append(data, info)
 			hm.data[k] = data
 			hm.keys[dkey] = true
+
+			//添加省
+			isAreaExist :=false
+			for _,v:= range hm.areakeys {
+				if v==info.area {
+					isAreaExist = true
+				}
+			}
+			if !isAreaExist {
+				areaArr := hm.areakeys
+				areaArr = append(areaArr,info.area)
+				hm.areakeys = areaArr
+			}
 		} else {
 			break
 		}
@@ -308,10 +360,17 @@ func (d *datamap) check(info *Info) (b bool, source *Info, reasons string) {
 	keys := []string{}
 	d.lock.Lock()
 	for k, _ := range d.keys { //不同时间段
-		keys = append(keys, fmt.Sprintf("%s_%s_%s", k, info.subtype, info.area))
-		if info.area != "全国" { //这个后续可以不要
-			keys = append(keys, fmt.Sprintf("%s_%s_%s", k, info.subtype, "全国"))
+		if info.area=="全国" {
+			//匹配所有省
+			for _,v := range d.areakeys{
+				keys = append(keys, fmt.Sprintf("%s_%s_%s", k, info.subtype, v))
+			}
+		}else {
+			//匹配指定省
+			keys = append(keys, fmt.Sprintf("%s_%s_%s", k, info.subtype, info.area))
 		}
+		keys = append(keys, fmt.Sprintf("%s_%s_%s", k, info.subtype, "全国"))
+
 	}
 	d.lock.Unlock()
 L:
@@ -327,6 +386,11 @@ L:
 					//log.Println("相同id",info.id)
 					return false, v, ""
 				}
+
+				if v.id == "5c761a4fa5cb26b9b73d9512" &&info.id=="5c767bd1a5cb26b9b7a61597" {
+					log.Println("测试数据")
+				}
+
 				if info.subtype == v.subtype {
 					if info.site != "" {
 						sitelock.Lock()
@@ -456,6 +520,20 @@ L:
 			data = append(data, info)
 			d.data[k] = data
 		}
+
+		//添加省
+		isAreaExist :=false
+		for _,v:= range d.areakeys {
+			if v==info.area {
+				isAreaExist = true
+			}
+		}
+		if !isAreaExist {
+			areaArr := d.areakeys
+			areaArr = append(areaArr,info.area)
+			d.areakeys = areaArr
+		}
+
 		d.lock.Unlock()
 	}
 
@@ -467,10 +545,17 @@ func (h *historymap) checkHistory(info *Info) (b bool, source *Info, reasons str
 	keys := []string{}
 	h.lock.Lock()
 	for k, _ := range h.keys { //不同时间段
-		keys = append(keys, fmt.Sprintf("%s_%s_%s", k, info.subtype, info.area))
-		if info.area != "全国" { //这个后续可以不要
-			keys = append(keys, fmt.Sprintf("%s_%s_%s", k, info.subtype, "全国"))
+		if info.area=="全国" {
+			//匹配所有省
+			for _,v := range h.areakeys{
+				keys = append(keys, fmt.Sprintf("%s_%s_%s", k, info.subtype, v))
+			}
+		}else {
+			//匹配指定省
+			keys = append(keys, fmt.Sprintf("%s_%s_%s", k, info.subtype, info.area))
 		}
+		keys = append(keys, fmt.Sprintf("%s_%s_%s", k, info.subtype, "全国"))
+
 	}
 	h.lock.Unlock()
 
@@ -623,6 +708,19 @@ L:
 			data = append(data, info)
 			h.data[k] = data
 		}
+
+		//添加省
+		isAreaExist :=false
+		for _,v:= range h.areakeys {
+			if v==info.area {
+				isAreaExist = true
+			}
+		}
+		if !isAreaExist {
+			areaArr := h.areakeys
+			areaArr = append(areaArr,info.area)
+			h.areakeys = areaArr
+		}
 	}
 	return
 }

+ 43 - 41
udpfilterdup/src/main.go

@@ -22,16 +22,16 @@ import (
 )
 
 var (
-	Sysconfig map[string]interface{} //配置文件
-	mconf     map[string]interface{} //mongodb配置信息
-	mgo       *MongodbSim            //mongodb操作对象
-	extract   string
+	Sysconfig    map[string]interface{} //配置文件
+	mconf        map[string]interface{} //mongodb配置信息
+	mgo          *MongodbSim            //mongodb操作对象
+	extract      string
 	extract_back string
-	udpclient mu.UdpClient             //udp对象
-	nextNode  []map[string]interface{} //下节点数组
-	dupdays   = 5                      //初始化判重范围
-	DM        *datamap                 //
-	HM        *historymap              //判重数据
+	udpclient    mu.UdpClient             //udp对象
+	nextNode     []map[string]interface{} //下节点数组
+	dupdays      = 5                      //初始化判重范围
+	DM           *datamap                 //
+	HM           *historymap              //判重数据
 
 	lastid = ""
 
@@ -41,15 +41,15 @@ var (
 	FilterRegTitle_1 = regexp.MustCompile("^_$")
 	FilterRegTitle_2 = regexp.MustCompile("^_$")
 
-	isMerger   bool                              //是否合并
-	Is_Sort    bool                              //是否排序
-	threadNum  int                               //线程数量
-	SiteMap    map[string]map[string]interface{} //站点map
-	LowHeavy   bool                              //低质量数据判重
-	TimingTask bool                              //是否定时任务
-	timingSpanDay int64							//时间跨度
-	timingPubScope int64						//发布时间周期
-	sid, eid   string                            //测试人员判重使用
+	isMerger       bool                              //是否合并
+	Is_Sort        bool                              //是否排序
+	threadNum      int                               //线程数量
+	SiteMap        map[string]map[string]interface{} //站点map
+	LowHeavy       bool                              //低质量数据判重
+	TimingTask     bool                              //是否定时任务
+	timingSpanDay  int64                             //时间跨度
+	timingPubScope int64                             //发布时间周期
+	sid, eid       string                            //测试人员判重使用
 )
 
 func init() {
@@ -128,14 +128,15 @@ func mainT() {
 		ObjectId("5da3f2c5a5cb26b9b79847fc")
 		ObjectId("5db2735ba5cb26b9b7c99c6f")
 	*/
-	log.Println("测试开始")
 	if TimingTask {
+		log.Println("定时任务测试开始")
 		go timedTaskDay()
 		time.Sleep(99999 * time.Hour)
-	}else {
-		//sid = "5da3f2c5a5cb26b9b79847fc"
-		//eid = "5db2735ba5cb26b9b7c99c6f"
-		log.Println(sid,"---",eid)
+	} else {
+		sid = "5c2c10fda5cb26b9b75e6f7f"
+		eid = "5e976e4a50b5ea296ef376b9"
+		log.Println("正常判重测试开始")
+		log.Println(sid, "---", eid)
 		mapinfo := map[string]interface{}{}
 		if sid == "" || eid == "" {
 			log.Println("sid,eid参数不能为空")
@@ -253,7 +254,9 @@ func task(data []byte, mapInfo map[string]interface{}) {
 
 			b, source, reason := DM.check(info)
 			if b { //有重复,生成更新语句,更新抽取和更新招标
+
 				repeateN++
+
 				var is_replace = false
 				var mergeArr = []int64{}                    //更改合并数组记录
 				var newData = &Info{}                       //更换新的数据池数据
@@ -646,8 +649,8 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
 //定时任务
 func timedTaskDay() {
 	c := cron.New()
-	c.AddFunc("0 0 0 * * ?", func() { timedTaskOnce() }) //每天凌晨执行一次
-	c.AddFunc("0 0 2 * * ?", func() { movedata() })      //每天凌晨1点执行一次
+	c.AddFunc("0 0 1 * * ?", func() { movedata() })      //每天凌晨1点执行一次
+	c.AddFunc("0 0 2 * * ?", func() { timedTaskOnce() }) //每天凌晨2点执行一次
 	c.Start()
 	timedTaskOnce()
 }
@@ -656,7 +659,6 @@ func timedTaskOnce() {
 	log.Println("开始一次定时任务")
 	defer util.Catch()
 
-
 	now := time.Now()
 	preTime := time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, time.Local)
 	curTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local)
@@ -674,9 +676,9 @@ func timedTaskOnce() {
 	//task_eid = "5e20968d85a9271abf0ad6c2"
 	//between_time := int64(1563641997)
 
-	between_time := curTime.Unix()-(86400*timingPubScope)
+	between_time := curTime.Unix() - (86400 * timingPubScope)
 	lasttime := int64(0)
-	log.Println(task_sid, task_eid,curTime.Unix(),between_time)
+	log.Println(task_sid, task_eid, curTime.Unix(), between_time)
 	//区间id
 	q_start := map[string]interface{}{
 		"_id": map[string]interface{}{
@@ -697,7 +699,7 @@ func timedTaskOnce() {
 		if util.IntAll(tmp_start["dataging"]) == 1 {
 			pubtime := util.Int64All(tmp_start["publishtime"])
 			//log.Println(startNum,"--",pubtime,"--",between_time)
-			if pubtime>0 && pubtime>=between_time {
+			if pubtime > 0 && pubtime >= between_time {
 				lasttime = pubtime
 				log.Println("找到第一条符合条件的数据")
 				break
@@ -705,15 +707,15 @@ func timedTaskOnce() {
 		}
 	}
 
-	log.Println("... ...",lasttime,)
-	if lasttime <=0 {
+	log.Println("... ...", lasttime)
+	if lasttime <= 0 {
 		log.Println("没找到dataging==1的数据")
 		return
 	}
 
 	//构建第一条需要判重的数据   (数据池)
 	log.Println("开始构建第一条需要判重的数据 ---(数据池)")
-	DM = TimedTaskDatamap(dupdays,lasttime)
+	DM = TimedTaskDatamap(dupdays, lasttime)
 
 	sess := mgo.GetMgoConn()
 	defer mgo.DestoryMongoConn(sess)
@@ -730,7 +732,7 @@ func timedTaskOnce() {
 	pool := make(chan bool, threadNum)
 	wg := &sync.WaitGroup{}
 	n, repeateN := 0, 0
-	pre_publishtime :=int64(0)
+	pre_publishtime := int64(0)
 	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
 		if n%10000 == 0 {
 			log.Println("current:", n, tmp["_id"], "repeateN:", repeateN)
@@ -755,15 +757,15 @@ func timedTaskOnce() {
 
 			//log.Println("上个时间:",pre_publishtime,"当前时间--",util.Int64All(tmp["publishtime"]))
 
-			if pre_publishtime==0 {
+			if pre_publishtime == 0 {
 				pre_publishtime = util.Int64All(tmp["publishtime"])
-			}else {
+			} else {
 				//时间跨度是否大于X天
-				if (util.Int64All(tmp["publishtime"])-pre_publishtime) >=(86400*timingSpanDay) {
+				if (util.Int64All(tmp["publishtime"]) - pre_publishtime) >= (86400 * timingSpanDay) {
 					//重新构建数据池
 					//log.Println("超过跨度-重新构建:",util.Int64All(tmp["publishtime"]),"---",pre_publishtime)
 					pre_publishtime = util.Int64All(tmp["publishtime"])
-					DM = TimedTaskDatamap(dupdays,pre_publishtime)
+					DM = TimedTaskDatamap(dupdays, pre_publishtime)
 				}
 			}
 
@@ -791,7 +793,7 @@ func timedTaskOnce() {
 			}
 
 			b, source, reason := DM.check(info)
-			log.Println("判重结果",b,reason)
+			log.Println("判重结果", b, reason)
 			if b { //有重复,生成更新语句,更新抽取和更新招标
 				repeateN++
 				var is_replace = false
@@ -1241,20 +1243,20 @@ func movedata() {
 	year, month, day := time.Now().Date()
 	q := map[string]interface{}{
 		"comeintime": map[string]interface{}{
-			"$lt": time.Date(year, month, day, 0, 0, 0, 0, time.Local).Add(-time.Duration(dupdays+5) * 24 * time.Hour).Unix(),
+			"$lt": time.Date(year, month, day, 0, 0, 0, 0, time.Local).Add(-time.Duration(dupdays) * 24 * time.Hour).Unix(),
 		},
 	}
 	log.Println(q)
 	it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
 	index := 0
 	for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
-		mgo.Save(extract+"_back", tmp)
+		mgo.Save(extract_back, tmp)
 		tmp = map[string]interface{}{}
 		if index%1000 == 0 {
 			log.Println("index", index)
 		}
 	}
-	log.Println("save to", extract+"_back", " ok index", index)
+	log.Println("save to", extract_back, " ok index", index)
 	delnum := mgo.Delete(extract, q)
 	log.Println("remove from ", extract, delnum)
 }