Browse Source

判重-修改备份

apple 5 years ago
parent
commit
751446aa2b
3 changed files with 189 additions and 234 deletions
  1. 27 11
      udpfilterdup/src/datamap.go
  2. 162 207
      udpfilterdup/src/main.go
  3. 0 16
      udps/main.go

+ 27 - 11
udpfilterdup/src/datamap.go

@@ -418,6 +418,7 @@ L:
 					//站点配置--
 					if info.site!="" {
 						dict := SiteMap[info.site]
+
 						if dict!=nil{
 							//临时改变--具体值
 							if info.area=="全国" &&dict["area"]!="" {
@@ -432,27 +433,43 @@ L:
 						}
 					}
 
-					//前置条件  一个不重复  一个重复
+					//前置条件1  	站点相关
+					if info.site!=""&&info.site==v.site{
+						if info.href!=""&&info.href==v.href {
+							reason = "href相同"
+							b = true
+							source = v
+							reasons = reason
+							break L
+						}
+						if info.href!=""&&info.href!=v.href {
+							continue
+						}
+					}
+
+					//前置条件2  标题相关 - 有且一个关键词
 					if ((info.titleSpecialWord&&!v.titleSpecialWord)||(info.specialWord&&!v.specialWord))&&
-						info.title!=v.title&&v.title!="" {
+						info.title!=v.title&&v.title!=""&&info.title!="" {
 						continue
 					}
 
-					if info.buyer != "" &&v.buyer == info.buyer {
-						//满足标题
-						if len([]rune(v.title)) >= 10 && len([]rune(info.title)) >= 10 && v.title != info.title && (info.specialWord || v.specialWord) {
-							continue
+					//前置条件3 	标题相关 - 均含有关键词
+					if ((info.titleSpecialWord&&v.titleSpecialWord)||(info.specialWord&&v.specialWord))&&
+						len([]rune(v.title))>10 && len([]rune(info.title))>10&&v.title!=""&&info.title!=""{
+						if !(strings.Contains(v.title, info.title)||strings.Contains(info.title, v.title)) {
+							continue //无包含关系
 						}
-					}
-					if info.site!=""&&info.site==v.site{
-						if info.href!=""&&info.href==v.href {
-							reason = "href相同"
+						if strings.Contains(v.title, info.title)||strings.Contains(info.title, v.title) {
+							reason = "标题关键词且包含关系"
 							b = true
 							source = v
 							reasons = reason
 							break L
 						}
 					}
+
+
+
 					//代理机构相同-非空相等
 					if v.agency != "" && info.agency != "" && v.agency == info.agency {
 						reason = reason + "同机构-"
@@ -924,7 +941,6 @@ func winningRepeat_C(v *Info ,info *Info) bool {
 	}
 	//原始地址...
 
-
 	return false
 }
 

+ 162 - 207
udpfilterdup/src/main.go

@@ -8,7 +8,6 @@ import (
 	"encoding/json"
 	"flag"
 	"fmt"
-	"gopkg.in/mgo.v2/bson"
 	"log"
 	mu "mfw/util"
 	"net"
@@ -26,9 +25,7 @@ var (
 	Sysconfig    map[string]interface{} //配置文件
 	mconf        map[string]interface{} //mongodb配置信息
 	mgo          *mongodb.MongodbSim    //mongodb操作对象
-	siteMgo             *mongodb.MongodbSim
-	//mgoTest          *mongodb.MongodbSim    //mongodb操作对象
-
+	//siteMgo      *mongodb.MongodbSim
 	extract      string
 	extract_copy string
 	bidding      string
@@ -38,16 +35,12 @@ var (
 	DM           *datamap                 //
 	HM           *historymap                 //判重数据
 	lastid       = ""
-	//ObjectId("5c2c70c8a5cb26b9b7b74f42")
-	//5da3f2c5a5cb26b9b79847fc
 	//正则筛选相关
 	FilterRegTitle = regexp.MustCompile("^_$")
 	FilterRegTitle_1 = regexp.MustCompile("^_$")
 	FilterRegTitle_2 = regexp.MustCompile("^_$")
 
-
-
-	isMerger bool
+	isMerger bool //是否合并
 	SiteMap  map[string]map[string]interface{} //站点map
 )
 
@@ -58,7 +51,6 @@ func init() {
 	util.ReadConfig(&Sysconfig)
 	nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
 	mconf = Sysconfig["mongodb"].(map[string]interface{})
-
 	mgo = &mongodb.MongodbSim{
 		MongodbAddr: mconf["addr"].(string),
 		DbName:      mconf["db"].(string),
@@ -66,36 +58,24 @@ func init() {
 	}
 	extract = mconf["extract"].(string)
 	extract_copy = mconf["extract_copy"].(string)
-	isMerger = mconf["isMerger"].(bool)
-
-	//bidding = mconf["bidding"].(string)
 	mgo.InitPool()
 
 
-	//测试临时注释
+	//测试可以临时注释
 	dupdays = util.IntAllDef(Sysconfig["dupdays"], 3)
 	//加载数据
 	DM = NewDatamap(dupdays, lastid)
 	FilterRegTitle = regexp.MustCompile(util.ObjToString(Sysconfig["specialwords"]))
 	FilterRegTitle_1 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_1"]))
 	FilterRegTitle_2 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_2"]))
-
-	//站点相关数据库
-	mongodb.InitMongodbPool(5, "192.168.3.207:27082", "")
-
-	siteMgo = &mongodb.MongodbSim{
-		MongodbAddr: "192.168.3.207:27082",
-		Size:        5,
-		DbName:      "zhaolongyue",
-	}
-	siteMgo.InitPool()
+	isMerger = Sysconfig["isMerger"].(bool)
 
 
+	//配置站点Map
 	SiteMap = make(map[string]map[string]interface{},0)
-
 	start := int(time.Now().Unix())
 	//站点配置
-	sess_site := siteMgo.GetMgoConn()
+	sess_site := mgo.GetMgoConn()
 	defer sess_site.Close()
 	res_site := sess_site.DB("zhaolongyue").C("site").Find(nil).Sort("_id").Iter()
 	for site_dict := make(map[string]interface{}); res_site.Next(&site_dict); {
@@ -103,150 +83,18 @@ func init() {
 				"area":util.ObjToString(site_dict["area"]),
 				"city":util.ObjToString(site_dict["city"]),
 				"district":util.ObjToString(site_dict["district"]),
-				"subdepttype":util.ObjToString(site_dict["subdepttype"]),
+				"sitetype":util.ObjToString(site_dict["sitetype"]),
 				"level":util.ObjToString(site_dict["level"]),
 			}
 		SiteMap[util.ObjToString(site_dict["site"])]= data_map
 	}
-	
 	fmt.Printf("用时:%d秒,%d个",int(time.Now().Unix())-start,len(SiteMap))
 
 
 }
 
-//新增一个方法 判断
-func mainTest()  {
-
-	//log.Println("1")
-	//代码copy数据
-	//sessTest :=mgoTest.GetMgoConn()
-	//defer sessTest.Close()
-	//
-	//sess := mgo.GetMgoConn()
-	//defer sess.Close()
-	//
-	////var arr []map[string]interface{}
-	//
-	//res_test := sessTest.DB("qfw").C("bidding").Find(mongodb.ObjToMQ(`{"comeintime":{"$gte": 1571025600, "$lte": 1571976000}}`, true)).Iter()
-	//res :=sess.DB("extract_kf").C("a_testbidding")
-	//5
-	//
-	//
-	//
-	//
-	//i:=0
-	//for dict := make(map[string]interface{}); res_test.Next(&dict); i++{
-	//
-	//	//插入
-	//	if i%2000==0 {
-	//		log.Println("当前:",i)
-	//	}
-	//	res.Insert(dict)
-	//	//if len(arr)>=500 {
-	//	//	arr = make([]map[string]interface{},0)
-	//	//}else {
-	//	//	arr = append(arr,dict)
-	//	//}
-	//}
-	//
-
-	sess := mgo.GetMgoConn()
-	defer mgo.DestoryMongoConn(sess)
-	res_copy := sess.DB("extract_kf").C(extract_copy).Find(nil).Iter()
-
-	m1 :=map[string]int{} //老版本
-	m2 :=map[string]int{} //新版本
-
-	i:=0
-	j:=0
-	for v1 := make(map[string]interface{}); res_copy.Next(&v1); i++{
-		if i%2000==0 {
-			log.Println("当前i:",i)
-		}
-		m1[(v1["_id"].(bson.ObjectId).Hex())]= util.IntAll(v1["repeat"])
-	}
-
-	sesss := mgo.GetMgoConn()
-	defer mgo.DestoryMongoConn(sesss)
-	res := sesss.DB("extract_kf").C(extract).Find(nil).Iter()
-
-
-	for v2 := make(map[string]interface{}); res.Next(&v2); j++{
-		if j%2000==0 {
-			log.Println("当前j:",j)
-		}
-		m2[(v2["_id"].(bson.ObjectId).Hex())]= util.IntAll(v2["repeat"])
-	}
-
-	fmt.Println(len(m1),len(m2))
-	n1:=0
-	n2:=0
-	n3:=0
-	n4:=0
-	n5:=0
-	n6:=0
-
-	var arr1 []string
-	var arr2 []string
-	for k,v:=range m1{
-
-		if m2[k]==1&&v==0{//0:1
-			n1++
-			arr2 = append(arr2,fmt.Sprintf("目标_id:%s",k))
-		}
-		if m2[k]==0&&v==1{ //1:0
-			n2++
-			arr1 = append(arr1,fmt.Sprintf("目标_id:%s",k))
-		}
-		if m2[k]==0&&v==0{ //0:0
-			n3++
-		}
-		if m2[k]==1&&v==1{//1:1
-			n4++
-		}
-		if m2[k]==-1&&v==0{ //0:-1
-			n5++
-		}
-		if m2[k]==-1&&v==1{//1:-1
-			n6++
-		}
-
-	}
-	//打印 1:0情况    ;
-	mm:=0
-	for _,v:=range arr1 {
-		mm++
-		if mm%200==0 {
-			log.Println(v)
-		}
-	}
-
-	log.Println("分割线---------------")
-	log.Println("分割线---------------")
-
-
-	//打印 0:1情况
-	nn:=0
-	for _,v:=range arr2 {
-		nn++
-		if nn%200==0 {
-			log.Println(v)
-		}
-	}
-
-	log.Println("V1 0:1---",n1)
-	log.Println("V1 1:0---",n2)
-	log.Println("V1 0:0---",n3)
-	log.Println("V1 1:1---",n4)
-	log.Println("V1 0:-1---",n5)
-	log.Println("V1 1:-1---",n6)
-
-}
-
-
 
 func main() {
-
 	go checkMapJob()
 
 	updport := Sysconfig["udpport"].(string)
@@ -269,12 +117,17 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 			udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
 		} else if mapInfo != nil {
 
-			//更新流程
-			//go historyTask(data,mapInfo)
-
-
-			//判重流程
-			go task(data, mapInfo)
+			taskType:= util.ObjToString(mapInfo["stype"])
+			if taskType == "historyTask" {
+				//更新流程
+				go historyTask(data,mapInfo)
+			}else if taskType == "normalTask" {
+				//判重流程
+				go task(data, mapInfo)
+			}else {
+				//其他
+				go task(data, mapInfo)
+			}
 
 			key, _ := mapInfo["key"].(string)
 			if key == "" {
@@ -581,24 +434,30 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
 						var mergeArr  = []int64{} 	//更改合并数组记录
 						var newData  = &Info{}		//更换新的数据池数据
 						var id_map  = map[string]interface{}{}
-						repeat_id := ""
-
-						//合并操作--评功权重打分-合并完替换原始数据池
-						basic_bool := basicDataScore(source,info)
-						if basic_bool {
-							//已原始数据为标准-对比数据打判重标签
-							newData,mergeArr= mergeDataFields(source,info)
-							DM.replaceSourceData(newData,source.id) //替换。
-							id_map["_id"]= util.StringTOBsonId(source.id)
-							repeat_id = source.id
-						}else {
-							//已对比数据为标准 ,数据池的数据打判重标签
-							newData,mergeArr= mergeDataFields(info,source)
-							DM.replaceSourceData(newData,source.id)//替换
-							id_map["_id"]= util.StringTOBsonId(info.id)
-							repeat_id = info.id
+						repeat_id := source.id
+						id_map["_id"]= util.StringTOBsonId(info.id)
+
+						if isMerger{
+							//需要合并相关操作
+							//合并操作--评功权重打分-合并完替换原始数据池
+							basic_bool := basicDataScore(source,info)
+							if basic_bool {
+								//已原始数据为标准-对比数据打判重标签
+								newData,mergeArr= mergeDataFields(source,info)
+								DM.replaceSourceData(newData,source.id) //替换
+								id_map["_id"]= util.StringTOBsonId(source.id)
+								repeat_id = source.id
+							}else {
+								//已对比数据为标准 ,数据池的数据打判重标签
+								newData,mergeArr= mergeDataFields(info,source)
+								DM.replaceSourceData(newData,source.id)//替换
+								id_map["_id"]= util.StringTOBsonId(info.id)
+								repeat_id = info.id
+							}
 						}
 
+
+
 						var update_map  = map[string]interface{}{
 							"$set": map[string]interface{}{
 								"repeat_reason":reason,
@@ -607,35 +466,39 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
 							},
 						}
 
-						//合并记录
-						if len(newData.mergemap)>0 {
-							update_map["$set"].(map[string]interface{})["merge"] = newData.mergemap
-							//fmt.Println("合并长度:",len(newData.mergemap))
-						}
-
-						//更新合并后的数据
-						for _,value :=range mergeArr {
-							if value==1 {
-								update_map["$set"].(map[string]interface{})["area"] = newData.area
-								update_map["$set"].(map[string]interface{})["city"] = newData.city
-							}else if value==2 {
-								update_map["$set"].(map[string]interface{})["projectname"] = newData.projectname
-							}else if value==3 {
-								update_map["$set"].(map[string]interface{})["projectcode"] = newData.projectcode
-							}else if value==4 {
-								update_map["$set"].(map[string]interface{})["buyer"] = newData.buyer
-							}else if value==5 {
-								update_map["$set"].(map[string]interface{})["budget"] = newData.budget
-							}else if value==6 {
-								update_map["$set"].(map[string]interface{})["winner"] = newData.winner
-							}else if value==7 {
-								update_map["$set"].(map[string]interface{})["bidamount"] = newData.bidamount
-							}else if value==8 {
-								update_map["$set"].(map[string]interface{})["bidopentime"] = newData.bidopentime
-							}else {
+						if isMerger {
+							//合并记录
+							if len(newData.mergemap)>0 {
+								update_map["$set"].(map[string]interface{})["merge"] = newData.mergemap
+								//fmt.Println("合并长度:",len(newData.mergemap))
+							}
 
+							//更新合并后的数据
+							for _,value :=range mergeArr {
+								if value==1 {
+									update_map["$set"].(map[string]interface{})["area"] = newData.area
+									update_map["$set"].(map[string]interface{})["city"] = newData.city
+								}else if value==2 {
+									update_map["$set"].(map[string]interface{})["projectname"] = newData.projectname
+								}else if value==3 {
+									update_map["$set"].(map[string]interface{})["projectcode"] = newData.projectcode
+								}else if value==4 {
+									update_map["$set"].(map[string]interface{})["buyer"] = newData.buyer
+								}else if value==5 {
+									update_map["$set"].(map[string]interface{})["budget"] = newData.budget
+								}else if value==6 {
+									update_map["$set"].(map[string]interface{})["winner"] = newData.winner
+								}else if value==7 {
+									update_map["$set"].(map[string]interface{})["bidamount"] = newData.bidamount
+								}else if value==8 {
+									update_map["$set"].(map[string]interface{})["bidopentime"] = newData.bidopentime
+								}else {
+
+								}
 							}
+
 						}
+
 						//构建数据库更新用到的
 						updateExtract = append(updateExtract, []map[string]interface{}{
 							id_map,
@@ -839,6 +702,98 @@ func mergeDataFields(source *Info, info *Info) (*Info,[]int64){
 //权重评估
 func basicDataScore(v *Info, info *Info) bool  {
 
+	//权重评估
+	/*
+	网站优先级判定规则:
+    1、中央>省>市>县区
+    2、政府采购>公共资源>采购单位官网>招标代理公司/平台
+	*/
+
+
+	v_score,info_score :=-1,-1
+	dict_v := SiteMap[v.site]
+	dict_info := SiteMap[info.site]
+	//先判断level
+	if dict_v !=nil {
+		v_level := util.ObjToString(dict_v["level"])
+		if v_level =="中央" {
+			v_score = 4
+		}else if v_level =="省级" {
+			v_score = 3
+		}else if v_level =="市级" {
+			v_score = 2
+		}else if v_level =="县区" {
+			v_score = 1
+		}else if v_level =="" {
+		}else {
+			v_score = 0
+		}
+	}
+
+	if dict_info !=nil { // score the incoming record's site level: 中央=4, 省级=3, 市级=2, 县区=1
+		info_level := util.ObjToString(dict_info["level"])
+		if info_level =="中央" {
+			info_score = 4
+		}else if info_level =="省级" {
+			info_score = 3
+		}else if info_level =="市级" {
+			info_score = 2
+		}else if info_level =="县区" {
+			info_score = 1
+		}else if info_level == ""{
+			// empty level: leave info_score at its initial -1
+		}else {
+			info_score = 0 // fix: previously assigned v_score here, clobbering the other record's score
+		}
+	}
+
+	if v_score>info_score{
+		return true
+	}
+	if v_score<info_score{
+		return false
+	}
+
+	//判断sitetype
+	if dict_v !=nil {
+		v_sitetype := util.ObjToString(dict_v["sitetype"])
+		if v_sitetype =="政府采购"||v_sitetype=="政府门户" {
+			v_score = 4
+		}else if v_sitetype =="公共资源" {
+			v_score = 3
+		}else if v_sitetype =="官方网站" {
+			v_score = 2
+		}else if v_sitetype =="社会公共招标平台"||v_sitetype =="企业招标平台" {
+			v_score = 1
+		}else if v_sitetype =="" {
+		}else {
+			v_score = 0
+		}
+	}
+
+	if dict_info !=nil {
+		info_sitetype := util.ObjToString(dict_info["sitetype"])
+		if info_sitetype =="政府采购"||info_sitetype=="政府门户" {
+			info_score = 4
+		}else if info_sitetype =="公共资源" {
+			info_score = 3
+		}else if info_sitetype =="官方网站" {
+			info_score = 2
+		}else if info_sitetype =="社会公共招标平台"||info_sitetype =="企业招标平台" {
+			info_score = 1
+		}else if info_sitetype =="" {
+		}else {
+			info_score = 0
+		}
+	}
+
+	if v_score>info_score{
+		return true
+	}
+	if v_score<info_score{
+		return false
+	}
+
 
 
 	//网站评估

+ 0 - 16
udps/main.go

@@ -23,25 +23,9 @@ func main() {
 	//2017-06-01,2018-06-01
 	//2018-06-01,2019-02-20
 
-	/*
-	5da3f2c5a5cb26b9b79847fc
-	5db2735ba5cb26b9b7c99c6f   76万
-	*/
 
 	/*
-		9W
-	5d767728a5cb26b9b7748868
-	ObjectId("5d77c881a5cb26b9b7de209d")
-
-ObjectId("5da3f2c5a5cb26b9b79847fc")
-	ObjectId("5db2735ba5cb26b9b7c99c6f")
-	//历史中间一段数据
-	ObjectId("5d771e90a5cb26b9b7be7976")
-	ObjectId("5d775be4a5cb26b9b759b5eb")
 
-	ObjectId("5dfc98f5e9d1f601e46f047c")
-	ObjectId("5a4ad8f240d2d9bbe8adfbda")
-	ObjectId("5e0bf92b0cf41612e063cc28")
 	*/
 	flag.StringVar(&sid, "sid", "", "开始id")
 	flag.StringVar(&eid, "eid", "", "结束id")