Просмотр исходного кода

Merge branch 'dev3.4' of http://192.168.3.207:10080/qmx/jy-data-extract into dev3.4

maxiaoshan 5 лет назад
Родитель
Коммит
5806cb74c1

+ 1 - 1
src/jy/extract/newextractcity.go

@@ -314,7 +314,7 @@ func GetByACDSimJb(pbrief, city, district, a_c_d string, e *ExtractTask, j *ju.J
 				PCDScore(j, "district", sim, 5, true)
 				for _, dfullAndCity := range dfullarr { //district简称对应的所有全称
 					for _, c := range dfullAndCity {
-						if c == nil{
+						if c == nil {
 							continue
 						}
 						tmpcity := c.Name      //城市全称

+ 1 - 1
src/res/fieldscore.json

@@ -202,7 +202,7 @@
             },
             {
                 "describe": "包含负分不再展示",
-                "regstr": "(详见|提出|面向|施工|获得|test|认定|一批|项目|系统)",
+                "regstr": "(详见|提出|面向|施工|获得|test|认定|一批|项目$|系统)",
                 "score": -50
             },
             {

+ 9 - 9
udp_city/src/main.go

@@ -15,7 +15,7 @@ var Udpclient mu.UdpClient //udp对象
 var nextNodes []map[string]interface{}
 var Config map[string]interface{}
 var PageSize = 5000 //查询分页
-var biddingFields = `{"buyer":1,"modifyinfo":1}`
+var biddingFields = `{"buyer":1,"modifyinfo":1,"area":1,"province":1,"city":1,"district":1}`
 var qyxyFields = `{"company_code":1,"province":1,"city":1,"district":1}`
 var findDb string
 var cc chan bool = make(chan bool, 5)
@@ -95,7 +95,7 @@ func getCity(sid, eid, rep string) {
 		log.Printf("page=%d,query=%v,db=%v\n", i+1, query, table)
 		list, _ := mgo.Mgo_Bidding.Find(table, query, nil, biddingFields, false, 0, limit)
 		for _, v := range *list {
-			if qu.ObjToString(v["district"]) != "" && qu.ObjToString(v["city"]) != "" && qu.ObjToString(v["area"]) != "" {
+			if qu.ObjToString(v["district"]) != "" && qu.ObjToString(v["city"]) != "" && qu.ObjToString(v["area"]) != "" && qu.ObjToString(v["area"]) != "全国" {
 				index++
 				continue
 			}
@@ -149,15 +149,15 @@ func cityMarshal(data map[string]interface{}) map[string]string {
 	company_code := qu.ObjToString((*tmp)["company_code"])
 	if len(company_code) > 5 {
 		province_city_district, _ := mgo.Mgo.FindOne("address", `{"code":"`+company_code[:6]+`"}`)
-		if province_city_district != nil && (*province_city_district) != nil {
-			if qu.ObjToString(data["area"]) == "" {
+		if province_city_district != nil && (*province_city_district) != nil && (*province_city_district)["Remarks"] != "废除" {
+			if qu.ObjToString(data["area"]) == "" || qu.ObjToString(data["area"]) == "全国" {
 				if province := qu.ObjToString((*province_city_district)["province"]); province != "" {
 					rdata["area"] = province
 				}
 				if city := qu.ObjToString((*province_city_district)["city"]); city != "" && !strings.Contains(city, rdata["area"]) {
 					rdata["city"] = city
 				}
-				if district := qu.ObjToString((*province_city_district)["district"]); district != "" && !strings.Contains(district, rdata["area"])&& !strings.Contains(district, rdata["city"]) {
+				if district := qu.ObjToString((*province_city_district)["district"]); district != "" && !strings.Contains(district, rdata["area"]) && !strings.Contains(district, rdata["city"]) {
 					rdata["district"] = district
 				}
 			} else if qu.ObjToString(data["city"]) == "" && qu.ObjToString((*province_city_district)["province"]) != "" && qu.ObjToString((*province_city_district)["province"]) == qu.ObjToString(data["area"]) {
@@ -176,7 +176,7 @@ func cityMarshal(data map[string]interface{}) map[string]string {
 		}
 	}
 	var province string
-	if qu.ObjToString(data["area"]) == "" {
+	if qu.ObjToString(data["area"]) == "" || qu.ObjToString(data["area"]) == "全国" {
 		if province = qu.ObjToString((*tmp)["province"]); province != "" {
 			province = strings.TrimRight(province, "省")
 			province = strings.TrimRight(province, "市")
@@ -189,14 +189,14 @@ func cityMarshal(data map[string]interface{}) map[string]string {
 			rdata["district"] = district
 		}
 	} else if qu.ObjToString(data["city"]) == "" && province != "" && qu.ObjToString(data["area"]) == province {
-		if city := qu.ObjToString((*tmp)["city"]); city != "" && province !=city{
+		if city := qu.ObjToString((*tmp)["city"]); city != "" && province != city {
 			rdata["city"] = city
 		}
-		if district := qu.ObjToString((*tmp)["district"]); district != "" && qu.ObjToString((*tmp)["city"])!= district{
+		if district := qu.ObjToString((*tmp)["district"]); district != "" && qu.ObjToString((*tmp)["city"]) != district {
 			rdata["district"] = district
 		}
 	} else if qu.ObjToString(data["district"]) == "" && qu.ObjToString((*tmp)["city"]) != "" && qu.ObjToString((*tmp)["city"]) == qu.ObjToString(data["city"]) {
-		if district := qu.ObjToString((*tmp)["district"]); district != "" &&qu.ObjToString(data["city"]) != district {
+		if district := qu.ObjToString((*tmp)["district"]); district != "" && qu.ObjToString(data["city"]) != district {
 			if qu.ObjToString(data["district"]) != rdata["city"] {
 				rdata["district"] = district
 			}

+ 3 - 2
udpcreateindex/src/projectindex.go

@@ -23,8 +23,9 @@ func projectTask(data []byte, project, mapInfo map[string]interface{}) {
 				"$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
 			},
 		}
-	} else {
-		idMap := q["_id"].(map[string]interface{})
+	}
+	idMap, _ := q["_id"].(map[string]interface{})
+	if idMap != nil {
 		tmpQ := map[string]interface{}{}
 		for c, id := range idMap {
 			if idStr, ok := id.(string); ok && id != "" {

+ 9 - 0
udpfilterdup/src/README.md

@@ -1,3 +1,12 @@
+
+mgo = &MongodbSim{
+		MongodbAddr: "172.17.4.187:27083",
+		DbName:      "qfw",
+		Size:        10,
+	}
+mgo.InitPool()
+	return
+	
 func moveTimeoutData()  {
 	log.Println("部署迁移定时任务")
 	c := cron.New()

+ 3 - 3
udpfilterdup/src/config.json

@@ -5,8 +5,8 @@
         "addr": "192.168.3.207:27092",
         "pool": 5,
         "db": "extract_kf",
-        "extract": "zk_zk_test",
-        "extract_back": "zk_zk_test",
+        "extract": "zk_move",
+        "extract_back": "zk_move",
         "site": {
             "dbname": "extract_kf",
             "coll": "site"
@@ -24,7 +24,7 @@
     "timingTask":false,
     "timingSpanDay": 3,
     "timingPubScope": 720,
-    "specialwords": "(重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研)",
+    "specialwords": "(重招|重新招标|勘察|施工|监理|总承包|土石方|可研)",
     "specialtitle_0": "(包|标段|标包)[((]?[0-9a-zA-Z一二三四五六七八九十零123456789][))]?",
     "specialtitle_1": "[0-9a-zA-Z一二三四五六七八九十零123456789](次|包|标段|标包|批|期)",
     "specialtitle_2": "项目[((][0-9a-zA-Z一二三四五六七八九十零123456789][))]",

+ 28 - 1
udpfilterdup/src/dataMethod.go

@@ -76,6 +76,33 @@ func againRepeat(v *Info, info *Info) bool {
 	return false
 }
 
+////站点再次判断
+//func againSite(v *Info, info *Info) bool {
+//
+//	if v.budget != info.budget && v.budget != 0 && info.budget != 0 {
+//		return true
+//	}
+//	if isBidWinningAmount(v.bidamount,info.bidamount) && v.bidamount != 0 && info.bidamount != 0{
+//		return true
+//	}
+//	if deleteExtraSpace(v.winner) != deleteExtraSpace(info.winner) && v.winner != "" && info.winner != "" {
+//		return true
+//	}
+//	if v.contractnumber != "" && info.contractnumber != "" && v.contractnumber != info.contractnumber {
+//		return true
+//	}
+//	if v.projectcode != "" && info.projectcode != "" && v.projectcode != info.projectcode {
+//		return true
+//	}
+//
+//	return false
+//}
+
+
+
+
+
+
 //删除中标单位字符串中多余的空格(含tab)
 func deleteExtraSpace(s string) string {
 	//删除字符串中的多余空格,有多个空格时,仅保留一个空格
@@ -112,7 +139,7 @@ func isBidopentimeInterval(i1 int64 ,i2 int64) bool {
 	day1 := qutil.FormatDateByInt64(&timeOne, qutil.Date_yyyyMMdd)
 	day2 := qutil.FormatDateByInt64(&timeTwo, qutil.Date_yyyyMMdd)
 	if day1==day2 {
-		//是否间隔超过小时
+		//是否间隔超过十二小时
 		if math.Abs(float64(i1-i2)) >43200.0 {
 			return true
 		}else {

+ 16 - 5
udpfilterdup/src/datamap.go

@@ -37,6 +37,7 @@ type Info struct {
 	specialWord      bool                   //再次判断的特殊词
 	mergemap         map[string]interface{} //合并记录
 	is_site          bool                   //是否站点城市
+	repeat_ids        []string               //记录所有重复id
 
 }
 
@@ -231,6 +232,11 @@ func NewInfo(tmp map[string]interface{}) *Info {
 	if info.mergemap == nil {
 		info.mergemap = make(map[string]interface{}, 0)
 	}
+	if info.repeat_ids == nil {
+		info.repeat_ids = make([]string, 0)
+	}
+
+
 
 	info.is_site = false
 
@@ -304,8 +310,8 @@ L:
 						reasons = reason
 						break L
 					}
-					if info.href != "" && info.href != v.href {
-						if v.title==info.title&&len([]rune(info.title)) >10 && isTheSameDay(info.publishtime,v.publishtime){
+					if info.href != "" && info.href != v.href { //待优化
+						if v.title==info.title{
 							if !againRepeat(v, info) {//进行同站点二次判断
 								reason = "同站点-href不同-标题相同等"
 								b = true
@@ -316,7 +322,9 @@ L:
 								continue
 							}
 						}else {
-							continue
+							if againRepeat(v, info) {//进行同站点二次判断
+								continue
+							}
 						}
 					}
 				}
@@ -324,7 +332,7 @@ L:
 				specialNum:= dealWithSpecialWordNumber(info,v)
 				//前置条件 - 标题相关,有且一个关键词
 				if specialNum==1 {
-					if info.title != v.title && v.title != "" && info.title != "" {
+					if againRepeat(v, info) {
 						continue
 					}
 				}
@@ -352,7 +360,10 @@ L:
 						}else {
 							if !(strings.Contains(letter1, letter2) || strings.Contains(letter2, letter1)) {
 								//无包含关系-即不相等
-								continue
+								if againRepeat(v, info) {
+									continue
+								}
+
 							}
 						}
 					}

+ 82 - 5
udpfilterdup/src/main.go

@@ -8,6 +8,8 @@ import (
 	"encoding/json"
 	"flag"
 	"fmt"
+	"github.com/cron"
+	"gopkg.in/mgo.v2/bson"
 	"log"
 	mu "mfw/util"
 	"net"
@@ -103,6 +105,7 @@ func init() {
 
 
 func main() {
+
 	go checkMapJob()
 	updport := Sysconfig["udpport"].(string)
 	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
@@ -116,13 +119,13 @@ func main() {
 
 //测试组人员使用
 func mainT() {
-	testRepairData11()
-	return
+
+	//testRepairData11()
+	//return
 
 	if TimingTask {
 		log.Println("新历史任务测试开始")
 		go historyTaskDay()
-		//go timedTaskDay()
 		time.Sleep(99999 * time.Hour)
 	} else {
 		//IdType = true  //打开id字符串模式
@@ -289,6 +292,23 @@ func task(data []byte, mapInfo map[string]interface{}) {
 					updateID["_id"] = info.id
 				}
 
+				repeat_ids:=source.repeat_ids
+				repeat_ids =  append(repeat_ids,info.id)
+				source.repeat_ids = repeat_ids
+				//替换数据池-更新
+				DM.replacePoolData(source)
+				updateExtract = append(updateExtract, []map[string]interface{}{//重复数据打标签
+					map[string]interface{}{
+						"_id": StringTOBsonId(source.id),
+					},
+					map[string]interface{}{
+						"$set": map[string]interface{}{
+							"repeat_ids": repeat_ids,
+						},
+					},
+				})
+
+
 				updateExtract = append(updateExtract, []map[string]interface{}{//重复数据打标签
 					updateID,
 					map[string]interface{}{
@@ -473,6 +493,7 @@ func historyTaskDay() {
 					})
 					if len(updateExtract) > 50 {
 						mgo.UpSertBulk(extract, updateExtract...)
+
 						updateExtract = [][]map[string]interface{}{}
 					}
 
@@ -485,6 +506,7 @@ func historyTaskDay() {
 		//批量更新标记
 		if len(updateExtract) > 0 {
 			mgo.UpSertBulk(extract, updateExtract...)
+
 			updateExtract = [][]map[string]interface{}{}
 		}
 
@@ -556,6 +578,23 @@ func historyTaskDay() {
 					if b { //有重复,生成更新语句,更新抽取和更新招标
 						repeateN++
 						//重复数据打标签
+						repeat_ids:=source.repeat_ids
+						repeat_ids =  append(repeat_ids,info.id)
+						source.repeat_ids = repeat_ids
+						//替换数据池-更新
+						DM.replacePoolData(source)
+						groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{//重复数据打标签
+							map[string]interface{}{
+								"_id": StringTOBsonId(source.id),
+							},
+							map[string]interface{}{
+								"$set": map[string]interface{}{
+									"repeat_ids": repeat_ids,
+								},
+							},
+						})
+
+
 						groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
 							map[string]interface{}{
 								"_id": tmp["_id"],
@@ -629,8 +668,6 @@ func historyTaskDay() {
 		}
 		log.Println("继续下一段的历史判重")
 	}
-
-
 }
 
 
@@ -678,6 +715,46 @@ func moveHistoryData(startid string,endid string) {
 
 
 
+func moveTimeoutData()  { // moveTimeoutData registers moveOnceTimeOut on a cron scheduler and starts that scheduler.
+	log.Println("部署迁移定时任务")
+	c := cron.New()
+	c.AddFunc("0 0 0 * * ?", func() { moveOnceTimeOut() }) // presumably a seconds-first spec, i.e. once a day at 00:00:00 — confirm against the vendored cron package; NOTE(review): any value returned by AddFunc is not inspected here
+	c.Start() // the scheduler is a local variable and is never stopped in this function
+}
+
+func moveOnceTimeOut()  { // moveOnceTimeOut copies every "result_20200714" document whose _id is older than the two-year cutoff into "result_20200713" and then deletes it from "result_20200714".
+	log.Println("执行一次迁移超时数据")
+	sess := mgo.GetMgoConn() // project wrapper; presumably borrows a session from the pool — verify
+	defer mgo.DestoryMongoConn(sess) // hands the session back to the wrapper when the function returns
+	now:=time.Now()
+	move_time := time.Date(now.Year()-2, now.Month(), now.Day(), 0, 0, 0, 0, time.Local) // cutoff: local midnight of today's calendar date, two years back
+	task_id := util.BsonIdToSId(bson.NewObjectIdWithTime(move_time)) // ObjectId carrying the cutoff timestamp, converted to its string form
+	q := map[string]interface{}{ // selects documents with _id strictly below the cutoff id
+		"_id": map[string]interface{}{
+			"$lt": StringTOBsonId(task_id), // converted back from the string form for the query
+		},
+	}
+
+	it := sess.DB(mgo.DbName).C("result_20200714").Find(&q).Iter() // NOTE(review): the iterator is never closed and its error is never checked in this function
+	index := 0 // number of documents processed so far
+	for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
+		if index%10000 == 0 { // progress log every 10000 documents (also fires for index 0)
+			log.Println("index", index)
+		}
+		del_id:=BsonTOStringId(tmp["_id"])
+		mgo.Save("result_20200713", tmp) // NOTE(review): any result of Save is ignored, so a failed save would still be followed by the delete below — verify
+		mgo.DeleteById("result_20200714",del_id) // removes the source document by its string id
+		tmp = map[string]interface{}{} // fresh map for the next it.Next; presumably so fields do not carry over between documents
+	}
+	log.Println("save and delete", " ok index", index)
+
+
+
+
+
+}
+
+