apple 5 years ago
parent
commit
4627cc9c24

+ 224 - 0
udpfilterdup/src1/README.md

@@ -0,0 +1,224 @@
+基于内存的信息重复过滤
+"extract": "result_file_20200410",
+"extract_back": "result_file_20200409",
+
+{
+    "udpport": ":11485",
+    "dupdays": 7,
+    "mongodb": {
+        "addr": "172.17.4.187:27083",
+        "pool": 10,
+        "db": "qfw",
+        "extract": "result_file_20200410",
+        "extract_back": "result_file_20200409",
+        "site": {
+            "dbname": "qfw",
+            "coll": "site"
+        }
+    },
+    "jkmail": {
+        "to": "zhangjinkun@topnet.net.cn",
+        "api": "http://10.171.112.160:19281/_send/_mail"
+    },
+    "nextNode": [
+        {
+            "addr": "172.17.145.179",
+            "port": 1782,
+            "stype": "project",
+            "memo": "合并项目"
+        },
+        {
+            "addr": "127.0.0.1",
+            "port": 1783,
+            "stype": "bidding",
+            "memo": "创建招标数据索引new"
+        }
+    ],
+
+    "threads": 1,
+    "isMerger": false,
+    "isSort":false,
+    "lowHeavy":false,
+    "timingTask":true,
+    "timingSpanDay": 3,
+    "timingPubScope": 720,
+    "specialwords": "(重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研)",
+    "specialtitle_0": "(包|标段|标包)[((]?[0-9a-zA-Z一二三四五六七八九十零123456789][))]?",
+    "specialtitle_1": "[0-9a-zA-Z一二三四五六七八九十零123456789](次|包|标段|标包|批)",
+    "specialtitle_2": "项目[((][0-9a-zA-Z一二三四五六七八九十零123456789][))]",
+    "beifen": "[((]?[0-9一二三四五六七八九十零123456789再][))]?[子分]?[次批标包]|重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研"
+}
+
+
+
+
+
+
+{
+    "udpport": ":1785",
+    "dupdays": 5,
+    "mongodb": {
+        "addr": "172.17.4.187:27083",
+        "pool": 5,
+        "db": "qfw",
+        "extract": "result_file_20200410",
+        "extract_back": "result_file_20200409",
+        "site": {
+            "dbname": "qfw",
+            "coll": "site"
+        }
+    },
+    "jkmail": {
+        "to": "zhengkun@topnet.net.cn,zhangjinkun@topnet.net.cn",
+        "api": "http://10.171.112.160:19281/_send/_mail"
+    },
+    "nextNode": [
+        {
+            "addr": "172.17.145.179",
+            "port": 1782,
+            "stype": "project",
+            "memo": "合并项目"
+        },
+        {
+            "addr": "127.0.0.1",
+            "port": 1783,
+            "stype": "bidding",
+            "memo": "创建招标数据索引new"
+        }
+    ],
+    "threads": 1,
+    "isMerger": false,
+    "isSort":true,
+    "lowHeavy":false,
+    "timingTask":false,
+    "timingSpanDay": 3,
+    "timingPubScope": 720,
+    "specialwords": "(重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研)",
+    "specialtitle_0": "(包|标段|标包)[((]?[0-9a-zA-Z一二三四五六七八九十零123456789][))]?",
+    "specialtitle_1": "[0-9a-zA-Z一二三四五六七八九十零123456789](次|包|标段|标包|批)",
+    "specialtitle_2": "项目[((][0-9a-zA-Z一二三四五六七八九十零123456789][))]",
+    "beifen": "[((]?[0-9一二三四五六七八九十零123456789再][))]?[子分]?[次批标包]|重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研"
+}
+
+
+
+
+//basic_bool := basicDataScore(source, info)
+					//if basic_bool {
+					//	//已原始数据为标准 - 对比数据打判重标签-
+					//	newData, mergeArr, is_replace = mergeDataFields(source, info)
+					//	//对比数据打重复标签的id,原始数据id的记录
+					//	repeat_idMap["_id"] = StringTOBsonId(info.id)
+					//	merge_idMap["_id"] = StringTOBsonId(source.id)
+					//
+					//	if IdType {
+					//		repeat_idMap["_id"] = info.id
+					//		merge_idMap["_id"] = source.id
+					//	}
+					//	repeat_id = source.id
+					//} else {
+					//	//已对比数据为标准 ,数据池的数据打判重标签
+					//	newData, mergeArr, is_replace = mergeDataFields(info, source)
+					//	DM.replaceSourceData(newData, source) //替换
+					//	//原始数据打重复标签的id,   对比数据id的记录
+					//	repeat_idMap["_id"] = StringTOBsonId(source.id)
+					//	merge_idMap["_id"] = StringTOBsonId(info.id)
+					//	if IdType {
+					//		repeat_idMap["_id"] = source.id
+					//		merge_idMap["_id"] = info.id
+					//	}
+					//	repeat_id = info.id
+					//}
+
+
+
+
+//basic_bool := basicDataScore(source, info)
+					//if !basic_bool {
+					//	DM.replaceSourceData(info, source) //替换
+					//	repeat_idMap["_id"] = StringTOBsonId(source.id)
+					//	if IdType {
+					//		repeat_idMap["_id"] = source.id
+					//	}
+					//	repeat_id = info.id
+					//	if len(ids)>=9 {
+					//		ids=append(ids,source.id)
+					//
+					//
+					//		for _, to := range nextNode {
+					//
+					//			key := source.id + "-" + source.id + "-" + util.ObjToString(to["stype"])
+					//			by, _ := json.Marshal(map[string]interface{}{
+					//				"gtid":  source.id,
+					//				"lteid": source.id,
+					//				"stype": util.ObjToString(to["stype"]),
+					//				"key":   key,
+					//				"ids":   strings.Join(ids, ","),
+					//			})
+					//			addr := &net.UDPAddr{
+					//				IP:   net.ParseIP(to["addr"].(string)),
+					//				Port: util.IntAll(to["port"]),
+					//			}
+					//			node := &udpNode{by, addr, time.Now().Unix(), 0}
+					//			udptaskmap.Store(key, node)
+					//			udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
+					//		}
+					//
+					//		//
+					//		ids = []string{}
+					//	}else {
+					//		ids=append(ids,source.id)
+					//	}
+					//
+					//}
+		
+		
+		
+					
+if isMerger { //合并相关
+					newData, mergeArr, is_replace := mergeDataFields(source, info)
+					merge_map := make(map[string]interface{}, 0)
+					if is_replace { //支持合并-更新数据
+						merge_map = map[string]interface{}{
+							"$set": map[string]interface{}{
+								"merge": newData.mergemap,
+							},
+						}
+						//更新合并后的数据
+						for _, value := range mergeArr {
+							if value == 0 {
+								merge_map["$set"].(map[string]interface{})["area"] = newData.area
+								merge_map["$set"].(map[string]interface{})["city"] = newData.city
+							} else if value == 1 {
+								merge_map["$set"].(map[string]interface{})["area"] = newData.area
+								merge_map["$set"].(map[string]interface{})["city"] = newData.city
+							} else if value == 2 {
+								merge_map["$set"].(map[string]interface{})["projectname"] = newData.projectname
+							} else if value == 3 {
+								merge_map["$set"].(map[string]interface{})["projectcode"] = newData.projectcode
+							} else if value == 4 {
+								merge_map["$set"].(map[string]interface{})["buyer"] = newData.buyer
+							} else if value == 5 {
+								merge_map["$set"].(map[string]interface{})["budget"] = newData.budget
+							} else if value == 6 {
+								merge_map["$set"].(map[string]interface{})["winner"] = newData.winner
+							} else if value == 7 {
+								merge_map["$set"].(map[string]interface{})["bidamount"] = newData.bidamount
+							} else if value == 8 {
+								merge_map["$set"].(map[string]interface{})["bidopentime"] = newData.bidopentime
+							} else if value == 9 {
+								merge_map["$set"].(map[string]interface{})["contractnumber"] = newData.contractnumber
+							} else if value == 10 {
+								merge_map["$set"].(map[string]interface{})["publishtime"] = newData.publishtime
+							} else if value == 11 {
+								merge_map["$set"].(map[string]interface{})["agency"] = newData.agency
+							} else {
+							}
+						}
+						//模板数据更新
+						updateExtract = append(updateExtract, []map[string]interface{}{
+							merge_idMap,
+							merge_map,
+						})
+					}
+				}					

+ 32 - 0
udpfilterdup/src1/config.json

@@ -0,0 +1,32 @@
+{
+    "udpport": ":11888",
+    "dupdays": 5,
+    "mongodb": {
+        "addr": "192.168.3.207:27092",
+        "pool": 5,
+        "db": "extract_kf",
+        "extract": "zk_zk_newTest",
+        "extract_back": "zk_zk_newTest",
+        "site": {
+            "dbname": "zhaolongyue",
+            "coll": "site"
+        }
+    },
+    "jkmail": {
+        "to": "zhengkun@topnet.net.cn,zhangjinkun@topnet.net.cn",
+        "api": "http://10.171.112.160:19281/_send/_mail"
+    },
+    "nextNode": [
+    ],
+    "threads": 1,
+    "isMerger": false,
+    "lowHeavy":true,
+    "timingTask":false,
+    "timingSpanDay": 3,
+    "timingPubScope": 720,
+    "specialwords": "(重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研)",
+    "specialtitle_0": "(包|标段|标包)[((]?[0-9a-zA-Z一二三四五六七八九十零123456789][))]?",
+    "specialtitle_1": "[0-9a-zA-Z一二三四五六七八九十零123456789](次|包|标段|标包|批|期)",
+    "specialtitle_2": "项目[((][0-9a-zA-Z一二三四五六七八九十零123456789][))]",
+    "beifen": "[((]?[0-9一二三四五六七八九十零123456789再][))]?[子分]?[次批标包]|重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研"
+}

+ 232 - 0
udpfilterdup/src1/dataMethod.go

@@ -0,0 +1,232 @@
+package src1
+
+import (
+	"math"
+	"regexp"
+	"strings"
+	qutil "qfw/util"
+)
+
+
+
+//完善判重数据检测-前置条件
+func convertArabicNumeralsAndLetters(data string) string {
+	newData :=data
+	res1, _ := regexp.Compile("[a-zA-Z]+");
+	if res1.MatchString(data) {
+		newData = res1.ReplaceAllStringFunc(data, strings.ToUpper);
+	}
+	res2, _ := regexp.Compile("[0-9]+");
+	if res2.MatchString(newData) {
+		arr1:=[]string {"0","1","2","3","4","5","6","7","8","9"}
+		arr2:=[]string {"零","一","二","三","四","五","六","七","八","九"}
+		for i:=0 ;i<len(arr1) ;i++  {
+			resTemp ,_:=regexp.Compile(arr1[i])
+			newData= resTemp.ReplaceAllString(newData, arr2[i]);
+		}
+	}
+	return newData
+}
+
+func dealWithSpecialPhrases(str1 string,str2 string) (string,string) {
+	newStr1:=str1
+	newStr2:=str2
+	res, _ := regexp.Compile("重新招标");
+	if res.MatchString(newStr1) {
+		newStr1 = res.ReplaceAllString(newStr1,"重招");
+	}
+	if res.MatchString(newStr2) {
+		newStr2 = res.ReplaceAllString(newStr2,"重招");
+	}
+	return newStr1,newStr2
+}
+//关键词数量v
+func dealWithSpecialWordNumber(info*Info,v*Info) int {
+	okNum:=0
+	if  info.titleSpecialWord || info.specialWord {
+		okNum++
+	}
+	if  v.titleSpecialWord || v.specialWord {
+		okNum++
+	}
+	return okNum
+}
+
+//关键词再次判断
+func againRepeat(v *Info, info *Info) bool {
+	if isBidopentimeInterval(info.bidopentime,v.bidopentime) {
+		return true
+	}
+	if v.budget != info.budget && v.budget != 0 && info.budget != 0 {
+		return true
+	}
+	if isBidWinningAmount(v.bidamount,info.bidamount) && v.bidamount != 0 && info.bidamount != 0{
+		return true
+	}
+	if deleteExtraSpace(v.winner) != deleteExtraSpace(info.winner) && v.winner != "" && info.winner != "" {
+		return true
+	}
+	if v.contractnumber != "" && info.contractnumber != "" && v.contractnumber != info.contractnumber {
+		return true
+	}
+	if v.projectcode != "" && info.projectcode != "" && v.projectcode != info.projectcode {
+		return true
+	}
+
+	return false
+}
+
+//删除中标单位字符串中多余的空格(含tab)
+func deleteExtraSpace(s string) string {
+	//删除字符串中的多余空格,有多个空格时,仅保留一个空格
+	s1 := strings.Replace(s, "  ", " ", -1)      //替换tab为空格
+	regstr := "\\s{2,}"                          //两个及两个以上空格的正则表达式
+	reg, _ := regexp.Compile(regstr)             //编译正则表达式
+	s2 := make([]byte, len(s1))                  //定义字符数组切片
+	copy(s2, s1)                                 //将字符串复制到切片
+	spc_index := reg.FindStringIndex(string(s2)) //在字符串中搜索
+	for len(spc_index) > 0 {                     //找到适配项
+		s2 = append(s2[:spc_index[0]+1], s2[spc_index[1]:]...) //删除多余空格
+		spc_index = reg.FindStringIndex(string(s2))            //继续在字符串中搜索
+	}
+	return string(s2)
+}
+
+//中标金额倍率:10000
+func isBidWinningAmount(f1 float64 ,f2 float64) bool {
+
+	if f1==f2||f1*10000==f2||f2*10000==f1 {
+		return false
+	}
+	return true
+}
+
+
+//开标时间区间为一天
+func isBidopentimeInterval(i1 int64 ,i2 int64) bool {
+	if i1==0||i2==0 {
+		return false
+	}
+	//不在同一天-或者同一天间隔超过六小时,属于不相等返回true
+	timeOne,timeTwo:=i1,i2
+	day1 := qutil.FormatDateByInt64(&timeOne, qutil.Date_yyyyMMdd)
+	day2 := qutil.FormatDateByInt64(&timeTwo, qutil.Date_yyyyMMdd)
+	if day1==day2 {
+		//是否间隔超过六小时
+		if math.Abs(float64(i1-i2)) >43200.0 {
+			return true
+		}else {
+			return false
+		}
+	}else {
+		return true
+	}
+}
+
+//开标时间区间为一天
+func isTheSameDay(i1 int64 ,i2 int64) bool {
+	if i1==0||i2==0 {
+		return false
+	}
+	timeOne,timeTwo:=i1,i2
+	day1 := qutil.FormatDateByInt64(&timeOne, qutil.Date_yyyyMMdd)
+	day2 := qutil.FormatDateByInt64(&timeTwo, qutil.Date_yyyyMMdd)
+	if day1==day2 {
+		return true
+	}
+	return false
+}
+
+
+
+//前置0 五要素均相等认为重复
+func leadingElementSame(v *Info, info *Info) bool {
+
+	isok:= 0
+	if info.projectname != "" && v.projectname == info.projectname {
+		isok++
+	}
+	if info.buyer != "" && v.buyer == info.buyer {
+		isok++
+	}
+	if info.subtype == "合同" || info.subtype == "验收" || info.subtype == "违规" {
+		if info.contractnumber != "" && v.contractnumber == info.contractnumber {
+			isok++
+		}
+	}else {
+		if info.projectcode != "" && v.projectcode == info.projectcode {
+			isok++
+		}
+	}
+	if info.title != "" && v.title == info.title {
+		isok++
+	}
+	if v.agency == info.agency {
+		isok++
+	}
+
+	if v.winner == info.winner&&info.winner != "" {
+		isok++
+	}
+
+	if isok>=5 {
+		return true
+	}
+
+
+
+	return false
+}
+
+//buyer的优先级
+func buyerIsContinue(v *Info, info *Info) bool {
+	if !isTheSameDay(info.publishtime,v.publishtime) {
+		return true
+	}
+	if v.title != info.title && v.title != "" && info.title != ""{
+		return true
+	}
+	if v.projectname != info.projectname && v.projectname != "" && info.projectname != ""{
+		return true
+	}
+	//if v.budget != info.budget && v.budget != 0 && info.budget != 0 {
+	//	return true
+	//}
+	//if isBidWinningAmount(v.bidamount,info.bidamount) && v.bidamount != 0 && info.bidamount != 0{
+	//	return true
+	//}
+	//if deleteExtraSpace(v.winner) != deleteExtraSpace(info.winner) && v.winner != "" && info.winner != "" {
+	//	return true
+	//}
+	if v.contractnumber != "" && info.contractnumber != "" && v.contractnumber != info.contractnumber {
+		return true
+	}
+	if v.projectcode != "" && info.projectcode != "" && v.projectcode != info.projectcode {
+		return true
+	}
+
+	return false
+}
+
+
+
+//无效数据
+func invalidData(d1 string, d2 string, d3 string, d4 string) bool {
+	var n int
+	if d1 != "" {
+		n++
+	}
+	if d2 != "" {
+		n++
+	}
+	if d3 != "" {
+		n++
+	}
+	if d4 != "" {
+		n++
+	}
+	if n == 0 {
+		return true
+	}
+	return false
+}

+ 476 - 0
udpfilterdup/src1/dataMethodHeavy.go

@@ -0,0 +1,476 @@
+package src1
+
+import "strings"
+
+//判重方法1
+func quickHeavyMethodOne(v *Info, info *Info, reason string) (bool, string) {
+
+	isMeet := false
+	if info.subtype == "招标" || info.subtype == "邀标" || info.subtype == "询价" ||
+		info.subtype == "竞谈" || info.subtype == "单一" || info.subtype == "竞价" ||
+		info.subtype == "变更" || info.subtype == "其他" {
+		//招标结果
+		if isMeet, reason = tenderRepeat_A(v, info, reason); isMeet {
+			if tenderRepeat_C(v, info) {
+				return false, reason
+			} else {
+				reason = reason + "---招标类"
+				return true, reason
+			}
+		} else {
+			return false, reason
+		}
+
+	} else if info.subtype == "中标" || info.subtype == "成交" || info.subtype == "废标" || info.subtype == "流标" {
+		//中标结果
+		if isMeet, reason = winningRepeat_A(v, info, reason); isMeet {
+			if winningRepeat_C(v, info) {
+				return false, reason
+			} else {
+				reason = reason + "---中标类"
+				return true, reason
+			}
+		} else {
+			return false, reason
+		}
+
+	} else if info.subtype == "合同" || info.subtype == "验收" || info.subtype == "违规" {
+		//合同
+		if isMeet, reason = contractRepeat_A(v, info, reason); isMeet {
+			if contractRepeat_C(v, info) {
+				return false, reason
+			} else {
+				reason = reason + "---合同类"
+				return true, reason
+			}
+		} else {
+			return false, reason
+		}
+	} else {
+		//招标结果
+		if isMeet, reason = tenderRepeat_A(v, info, reason); isMeet {
+			if tenderRepeat_C(v, info) {
+				return false, reason
+			} else {
+				reason = reason + "---类别空-招标类"
+				return true, reason
+			}
+		} else {
+			return false, reason
+		}
+	}
+
+	return false, reason
+}
+
+//判重方法2
+func quickHeavyMethodTwo(v *Info, info *Info, reason string) (bool, string) {
+	isMeet := false
+	if v.agency == info.agency && v.agency != "" && info.agency != "" {
+		if info.subtype == "招标" || info.subtype == "邀标" || info.subtype == "询价" ||
+			info.subtype == "竞谈" || info.subtype == "单一" || info.subtype == "竞价" ||
+			info.subtype == "变更" || info.subtype == "其他" {
+			//招标结果
+			if isMeet, reason = tenderRepeat_B(v, info, reason); isMeet {
+				if tenderRepeat_C(v, info) { //有不同
+					return false, reason
+				} else {
+					reason = reason + "---招标类"
+					return true, reason
+				}
+			} else {
+				return false, reason
+			}
+
+		} else if info.subtype == "中标" || info.subtype == "成交" || info.subtype == "废标" || info.subtype == "流标" {
+			//中标结果
+			if isMeet, reason = winningRepeat_B(v, info, reason); isMeet {
+				if winningRepeat_C(v, info) { //有不同
+					return false, reason
+				} else {
+					reason = reason + "---中标类"
+					return true, reason
+				}
+			} else {
+				return false, reason
+			}
+
+		} else if info.subtype == "合同" || info.subtype == "验收" || info.subtype == "违规" {
+			//合同
+			if isMeet, reason = contractRepeat_B(v, info, reason); isMeet {
+				if contractRepeat_C(v, info) { //有不同
+					return false, reason
+				} else {
+					reason = reason + "---合同类"
+					return true, reason
+				}
+			} else {
+				return false, reason
+			}
+		} else {
+			//招标结果
+			if isMeet, reason = tenderRepeat_B(v, info, reason); isMeet {
+				if tenderRepeat_C(v, info) { //有不同
+					return false, reason
+				} else {
+					reason = reason + "---类别空-招标类"
+					return true, reason
+				}
+			} else {
+				return false, reason
+			}
+		}
+	}
+
+	//不同
+	if v.agency != info.agency && v.agency != "" && info.agency != "" {
+		return false, reason
+	}
+	//机构最少一个为空
+	if v.agency == "" || info.agency == "" {
+		var repeat = false
+		if repeat, reason = quickHeavyMethodOne(v, info, reason); repeat {
+			reason = reason + "---机构最少一个空"
+			return true, reason
+		} else {
+			return false, reason
+		}
+	}
+
+	return false, reason
+}
+
+//招标_A
+func tenderRepeat_A(v *Info, info *Info, reason string) (bool, string) {
+
+	var ss string
+	p1, p2, p3, p4, p9, p10, p11 := false, false, false, false, false, false, false
+	if v.projectname != "" && v.projectname == info.projectname {
+		ss = ss + "p1-名称-"
+		p1 = true
+	}
+	if v.buyer != "" && v.buyer == info.buyer {
+		ss = ss + "p2-单位-"
+		p2 = true
+	}
+	if (v.projectcode != "" && v.projectcode == info.projectcode && len(v.projectcode) >= 5) ||
+		(v.contractnumber != "" && v.contractnumber == info.contractnumber && len(v.contractnumber) >= 5) {
+		ss = ss + "p3-编号组-"
+		p3 = true
+	}
+	if v.budget != 0 && v.budget == info.budget {
+		ss = ss + "p4-预算-"
+		p4 = true
+	}
+	if v.bidopentime != 0 && v.bidopentime == info.bidopentime {
+		ss = ss + "p9-开标时间相同-"
+		p9 = true
+	}
+	if v.bidopenaddress != "" && v.bidopenaddress == info.bidopenaddress {
+		ss = ss + "p10-开标地点-"
+		p10 = true
+	}
+	if len([]rune(v.title)) > 10 && len([]rune(info.title)) > 10 &&
+		(strings.Contains(v.title, info.title) || strings.Contains(info.title, v.title)) {
+		ss = ss + "p11-标题-"
+		p11 = true
+	}
+
+	if info.subtype !=""&&(p1 && p3 && p11)  {
+		reason = reason + "满足招标A,3要素组合-" + ss + ","
+		return true, reason
+	}
+
+	if  (p1 && p2 && p3) || (p1 && p2 && p4) || (p1 && p2 && p9) ||
+		(p1 && p2 && p10) || (p1 && p2 && p11) || (p1 && p3 && p9) || (p1 && p3 && p10) ||
+		(p1 && p4 && p9) || (p1 && p4 && p10) || (p2 && p3 && p4) ||
+		(p2 && p3 && p9) || (p2 && p3 && p10) || (p2 && p3 && p11) ||
+		(p2 && p4 && p9) || (p2 && p4 && p10) || (p2 && p4 && p11) ||
+		(p3 && p4 && p9) || (p3 && p4 && p10) || (p3 && p4 && p11) ||
+		(p4 && p9 && p10) || (p4 && p9 && p11) || (p9 && p10 && p11) {
+		reason = reason + "满足招标A,3要素组合-" + ss + ","
+		return true, reason
+	}
+	return false, reason
+}
+
+//招标_B
+func tenderRepeat_B(v *Info, info *Info, reason string) (bool, string) {
+
+	m, n := 0, 0
+	if v.projectname != "" && v.projectname == info.projectname {
+		m++
+		n++
+	}
+	if v.buyer != "" && v.buyer == info.buyer {
+		m++
+	}
+	if (v.projectcode != "" && v.projectcode == info.projectcode && len(v.projectcode) >= 5) ||
+		(v.contractnumber != "" && v.contractnumber == info.contractnumber && len(v.contractnumber) >= 5) {
+		m++
+	}
+	if v.budget != 0 && v.budget == info.budget {
+		m++
+	}
+	if v.bidopentime != 0 && v.bidopentime == info.bidopentime {
+		m++
+	}
+	//if v.bidopenaddress != "" && v.bidopenaddress == info.bidopenaddress {
+	//	m++
+	//}
+	if len([]rune(v.title)) > 10 && len([]rune(info.title)) > 10 &&
+		(strings.Contains(v.title, info.title) || strings.Contains(info.title, v.title)) {
+		m++
+		n++
+	}
+	if m >= 2 {
+		if n == 2 && m == 2 {
+			return false, reason
+		} else {
+			reason = reason + "满足招标B,六选二,"
+			return true, reason
+		}
+	}
+	return false, reason
+}
+
+//招标_C
+func tenderRepeat_C(v *Info, info *Info) bool {
+
+	if v.budget != 0 && info.budget != 0 && v.budget != info.budget {
+		return true
+	}
+	//原始地址...
+	//if v.buyer != "" && info.buyer != "" && v.buyer != info.buyer {
+	//	return true
+	//}
+
+	if v.bidopentime != 0 && info.bidopentime != 0 && isBidopentimeInterval(info.bidopentime,v.bidopentime) {
+		return true
+	}
+	//if v.bidopenaddress != "" && info.bidopenaddress != "" && v.bidopenaddress != info.bidopenaddress {
+	//	return true
+	//}
+
+	return false
+}
+
+//中标_A
+func winningRepeat_A(v *Info, info *Info, reason string) (bool, string) {
+
+	var ss string
+	p1, p2, p3, p5, p6, p11 := false, false, false, false, false, false
+	if v.projectname != "" && v.projectname == info.projectname {
+		ss = ss + "p1-项目名称-"
+		p1 = true
+	}
+	if v.buyer != "" && v.buyer == info.buyer {
+		ss = ss + "p2-单位-"
+		p2 = true
+	}
+	if (v.projectcode != "" && v.projectcode == info.projectcode && len(v.projectcode) >= 5) ||
+		(v.contractnumber != "" && v.contractnumber == info.contractnumber && len(v.contractnumber) >= 5) {
+		ss = ss + "p3-编号组--"
+		p3 = true
+	}
+	if v.bidamount != 0 && !isBidWinningAmount(v.bidamount,info.bidamount) {
+		ss = ss + "p5-中标金-"
+		p5 = true
+	}
+	if v.winner != "" && deleteExtraSpace(v.winner) == deleteExtraSpace(info.winner) {
+		ss = ss + "p6-中标人-"
+		p6 = true
+	}
+
+
+	if len([]rune(v.title)) > 10 && len([]rune(info.title)) > 10 &&
+		(strings.Contains(v.title, info.title) || strings.Contains(info.title, v.title)) {
+		ss = ss + "p11-标题-"
+		p11 = true
+	}
+
+	if 	(p1 && p2 && p3) || (p1 && p2 && p5) || (p1 && p2 && p6) ||
+		(p1 && p2 && p11)|| (p1 && p3 && p11)||
+		(p1 && p3 && p5) || (p1 && p3 && p6) || (p1 && p5 && p6) ||
+		(p2 && p3 && p5) || (p2 && p3 && p6) || (p2 && p3 && p11) ||
+		(p2 && p5 && p6) || (p2 && p5 && p11) || (p2 && p6 && p11) ||
+		(p3 && p5 && p6) || (p3 && p5 && p11) || (p3 && p6 && p11) ||
+		(p5 && p6 && p11) {
+		reason = reason + "满足中标A,3要素组合-" + ss + ","
+		return true, reason
+	}
+
+	return false, reason
+}
+
+//中标_B
+func winningRepeat_B(v *Info, info *Info, reason string) (bool, string) {
+
+	m, n := 0, 0
+	if v.projectname != "" && v.projectname == info.projectname {
+		m++
+		n++
+	}
+	if v.buyer != "" && v.buyer == info.buyer {
+		m++
+	}
+	if (v.projectcode != "" && v.projectcode == info.projectcode && len(v.projectcode) >= 5) ||
+		(v.contractnumber != "" && v.contractnumber == info.contractnumber && len(v.contractnumber) >= 5) {
+		m++
+	}
+	if v.bidamount != 0 && !isBidWinningAmount(v.bidamount,info.bidamount) {
+		m++
+	}
+	if v.winner != "" && deleteExtraSpace(v.winner) == deleteExtraSpace(info.winner) {
+		m++
+	}
+	if len([]rune(v.title)) > 10 && len([]rune(info.title)) > 10 &&
+		(strings.Contains(v.title, info.title) || strings.Contains(info.title, v.title)) {
+		m++
+		n++
+	}
+	if m >= 2 {
+		if n == 2 && m == 2 {
+			return false, reason
+		} else {
+			reason = reason + "满足中标B.六选二,"
+			return true, reason
+		}
+	}
+	return false, reason
+}
+
+//中标_C
+func winningRepeat_C(v *Info, info *Info) bool {
+
+	if v.bidamount != 0 && info.bidamount != 0 && isBidWinningAmount(v.bidamount,info.bidamount) {
+		//避免抽错金额- title+name+winner
+		if ((v.projectcode!=""&&info.projectcode!=""&&v.projectcode==info.projectcode)||
+			(v.contractnumber!=""&&info.contractnumber!=""&&v.contractnumber==info.contractnumber)) &&
+			(v.winner!=""&&info.winner!=""&&v.winner==info.winner) {
+			return false
+		}
+		return true
+	}
+	if v.winner != "" && info.winner != "" && deleteExtraSpace(v.winner) != deleteExtraSpace(info.winner) {
+		return true
+	}
+	return false
+}
+
+//合同_A
+func contractRepeat_A(v *Info, info *Info, reason string) (bool, string) {
+
+	isMeet_1 := false
+	if isMeet_1, reason = tenderRepeat_A(v, info, reason); isMeet_1 {
+		return true, reason
+	}
+
+	isMeet_2 := false
+	if isMeet_2, reason = winningRepeat_A(v, info, reason); isMeet_2 {
+		return true, reason
+	}
+	return false, reason
+}
+
+//合同_B
+func contractRepeat_B(v *Info, info *Info, reason string) (bool, string) {
+
+	isMeet_1 := false
+	if isMeet_1, reason = tenderRepeat_B(v, info, reason); isMeet_1 {
+		return true, reason
+	}
+	isMeet_2 := false
+	if isMeet_2, reason = winningRepeat_B(v, info, reason); isMeet_2 {
+		return true, reason
+	}
+	return false, reason
+}
+
+//合同_C
+func contractRepeat_C(v *Info, info *Info) bool {
+
+	if tenderRepeat_C(v, info) {
+		return true
+	}
+	if winningRepeat_C(v, info) {
+		return true
+	}
+
+	//合同类 - 新增编号
+	if v.contractnumber != "" && info.contractnumber != "" && v.contractnumber != info.contractnumber {
+		return true
+	}
+	if v.projectcode != "" && info.projectcode != "" && v.projectcode != info.projectcode {
+		return true
+	}
+
+	return false
+}
+
+
+
+
+
+
+
+
+
+//快速低质量数据判重
+func fastLowQualityHeavy(v *Info, info *Info, reason string) (bool, string) {
+	if !isTheSameDay(v.publishtime,info.publishtime) {
+		return false,reason
+	}
+	//首先判定是否为低质量数据    info目标数据
+	if info.title!=""&&(info.agency==""||v.agency=="")&&
+		info.title==v.title&&info.projectcode==""&&info.contractnumber==""&&info.buyer=="" {
+		isValue:=0//五要素判断
+		if info.projectname != "" {//项目名称
+			isValue++
+		}
+		if info.budget != 0 {//预算
+			isValue++
+		}
+		if info.winner != ""{//中标单位
+			isValue++
+		}
+		if info.bidamount != 0 {//中标金额
+			isValue++
+		}
+		if isValue==0 {
+			reason = reason + "---低质量-要素均为空-标题满足"
+			return true, reason
+		}else if isValue==1 {
+			isMeet := false
+			if isMeet, reason = judgeLowQualityData(v, info, reason); isMeet {
+				reason = reason + "---低质量-有且一个要素组合"
+				return true, reason
+			}
+		}else {
+
+		}
+	}
+	return false,reason
+}
+
+
+//类别细节原因记录
+func judgeLowQualityData(v *Info, info *Info, reason string) (bool, string) {
+	if info.projectname!="" && info.projectname == v.projectname{//项目名称
+		reason = reason + "---项目名称"
+		return true,reason
+	}
+	if info.budget != 0 && info.budget == v.budget{//预算
+		reason = reason + "---预算"
+		return true,reason
+	}
+	if v.winner != "" && info.winner == v.winner{//中标单位
+		reason = reason + "---中标单位"
+		return true,reason
+	}
+	if v.bidamount != 0 && info.bidamount == v.bidamount{//中标金额
+		reason = reason + "---中标金额"
+		return true,reason
+	}
+	return false,reason
+}

+ 302 - 0
udpfilterdup/src1/dataMethodMerge.go

@@ -0,0 +1,302 @@
+package src1
+
+import "qfw/util"
+
+//合并字段-并更新merge字段的值
+func mergeDataFields(source *Info, info *Info) (*Info, []int64, bool) {
+
+	merge_recordMap := make(map[string]interface{}, 0)
+	mergeArr := make([]int64, 0)
+	//是否替换数据了-记录原始的数据
+	is_replace := false
+	//1、城市
+	if source.area == "" || source.area == "全国" {
+		//为空
+		if info.area != "全国" && info.area != "" {
+			merge_recordMap["area"] = source.area
+			merge_recordMap["city"] = source.city
+			source.area = info.area
+			source.city = info.city
+			mergeArr = append(mergeArr, 1)
+			is_replace = true
+		}
+	} else {
+		//不为空-查看站点相关-有值必替换
+		if source.is_site {
+			//是站点替换的城市
+			merge_recordMap["site_area"] = source.area
+			merge_recordMap["site_city"] = source.city
+			mergeArr = append(mergeArr, 0)
+			is_replace = true
+			source.is_site = false
+
+		}
+	}
+	//2、项目名称
+	if source.projectname == "" && info.projectname != "" {
+		merge_recordMap["projectname"] = source.projectname
+		source.projectname = info.projectname
+		mergeArr = append(mergeArr, 2)
+		is_replace = true
+	}
+	//3、项目编号
+	if source.projectcode == "" && info.projectcode != "" {
+		merge_recordMap["projectcode"] = source.projectcode
+		source.projectcode = info.projectcode
+		mergeArr = append(mergeArr, 3)
+		is_replace = true
+	}
+	//4、采购单位
+	if source.buyer == "" && info.buyer != "" {
+		merge_recordMap["buyer"] = source.buyer
+		source.buyer = info.buyer
+		mergeArr = append(mergeArr, 4)
+		is_replace = true
+	}
+	//5、预算
+	if source.budget == 0 && info.budget != 0 {
+		merge_recordMap["budget"] = source.budget
+		source.budget = info.budget
+		mergeArr = append(mergeArr, 5)
+		is_replace = true
+	}
+	//6、中标单位
+	if source.winner == "" && info.winner != "" {
+		merge_recordMap["winner"] = source.winner
+		source.winner = info.winner
+		mergeArr = append(mergeArr, 6)
+		is_replace = true
+	}
+	//7、中标金额
+	if source.bidamount == 0 && info.bidamount != 0 {
+		merge_recordMap["bidamount"] = source.bidamount
+		source.bidamount = info.bidamount
+		mergeArr = append(mergeArr, 7)
+		is_replace = true
+	}
+	//8、开标时间-地点
+	if source.bidopentime == 0 && info.bidopentime != 0 {
+		merge_recordMap["bidopentime"] = source.bidopentime
+		source.bidopentime = info.bidopentime
+		mergeArr = append(mergeArr, 8)
+		is_replace = true
+	}
+
+	//9、合同编号
+	if source.contractnumber == "" && info.contractnumber != "" {
+		merge_recordMap["contractnumber"] = source.contractnumber
+		source.contractnumber = info.contractnumber
+		mergeArr = append(mergeArr, 9)
+		is_replace = true
+	}
+
+	//10、发布时间
+	if source.publishtime == 0 && info.publishtime != 0 {
+		merge_recordMap["publishtime"] = source.publishtime
+		source.publishtime = info.publishtime
+		mergeArr = append(mergeArr, 10)
+		is_replace = true
+	}
+	//11、代理机构
+	if source.agency == "" && info.agency != "" {
+		merge_recordMap["agency"] = source.agency
+		source.agency = info.agency
+		mergeArr = append(mergeArr, 11)
+		is_replace = true
+	}
+
+	if is_replace { //有过替换更新
+		//总次数+1
+		source.mergemap["total_num"] = util.Int64All(source.mergemap["total_num"]) + 1
+		merge_recordMap["num"] = util.Int64All(source.mergemap["total_num"])
+		//和哪一个数据id进行非空替换的-记录
+		key := info.id
+		source.mergemap[key] = merge_recordMap
+	}
+
+	//待进一步优化
+	return source, mergeArr, is_replace
+}
+
+//权重评估
+func basicDataScore(v *Info, info *Info) bool {
+
+	/*
+	  权重评估
+	  网站优先级判定规则:
+	  1、国家>省级>市级>县区
+	  2、政府采购>公共资源>官方网站|政府门户>社会公共招标平台|企业招标平台
+	  3、同sitetype-分析weight
+	  4、要素打分-分析
+	*/
+	v_score, info_score := -1, -1
+	dict_v := SiteMap[v.site]
+	dict_info := SiteMap[info.site]
+	//先判断level
+	if dict_v != nil {
+		v_level := util.ObjToString(dict_v["level"])
+		if v_level == "国家" {
+			v_score = 4
+		} else if v_level == "省级" {
+			v_score = 3
+		} else if v_level == "市级" {
+			v_score = 2
+		} else if v_level == "县区" {
+			v_score = 1
+		} else if v_level == "" {
+		} else {
+			v_score = 0
+		}
+	}
+
+	if dict_info != nil {
+		info_level := util.ObjToString(dict_info["level"])
+		if info_level == "国家" {
+			info_score = 4
+		} else if info_level == "省级" {
+			info_score = 3
+		} else if info_level == "市级" {
+			info_score = 2
+		} else if info_level == "县区" {
+			info_score = 1
+		} else if info_level == "" {
+
+		} else {
+			v_score = 0
+		}
+	}
+
+	if v_score > info_score {
+		return true
+	}
+	if v_score < info_score {
+		return false
+	}
+
+	//判断sitetype
+	if dict_v != nil {
+		v_sitetype := util.ObjToString(dict_v["sitetype"])
+		if v_sitetype == "政府采购" {
+			v_score = 4
+		} else if v_sitetype == "公共资源" {
+			v_score = 3
+		} else if v_sitetype == "官方网站"|| v_sitetype == "政府门户" {
+			v_score = 2
+		} else if v_sitetype == "社会公共招标平台" || v_sitetype == "企业招标平台" {
+			v_score = 1
+		} else if v_sitetype == "" {
+		} else {
+			v_score = 0
+		}
+	}
+
+	if dict_info != nil {
+		info_sitetype := util.ObjToString(dict_info["sitetype"])
+		if info_sitetype == "政府采购" {
+			info_score = 4
+		} else if info_sitetype == "公共资源" {
+			info_score = 3
+		} else if info_sitetype == "官方网站"|| info_sitetype == "政府门户" {
+			info_score = 2
+		} else if info_sitetype == "社会公共招标平台" || info_sitetype == "企业招标平台" {
+			info_score = 1
+		} else if info_sitetype == "" {
+		} else {
+			info_score = 0
+		}
+	}
+
+	if v_score > info_score {
+		return true
+	}
+	if v_score < info_score {
+		return false
+	}
+
+	if v_score == info_score {//同sitetype 情况下   分析weight
+		v_weight := util.IntAll(dict_v["weight"])
+		info_weight := util.IntAll(dict_info["weight"])
+		if v_weight>info_weight {
+			return true
+		}
+		if info_weight>v_weight {
+			return false
+		}
+	}
+
+	//网站评估
+	m, n := 0, 0
+	if v.projectname != "" {
+		m++
+	}
+	if v.buyer != "" {
+		m++
+	}
+	if v.projectcode != "" || v.contractnumber != "" {
+		m++
+	}
+	if v.budget != 0 {
+		m++
+	}
+	if v.bidamount != 0 {
+		m++
+	}
+	if v.winner != "" {
+		m++
+	}
+	if v.bidopentime != 0 {
+		m++
+	}
+	if v.bidopenaddress != "" {
+		m++
+	}
+	if v.agency != "" {
+		m = m + 2
+	}
+	if v.city != "" {
+		m = m + 2
+	}
+
+	if info.projectname != "" {
+		n++
+	}
+	if info.buyer != "" {
+		n++
+	}
+	if info.projectcode != "" || info.contractnumber != "" {
+		n++
+	}
+	if info.budget != 0 {
+		n++
+	}
+	if info.bidamount != 0 {
+		n++
+	}
+	if info.winner != "" {
+		n++
+	}
+	if info.bidopentime != 0 {
+		n++
+	}
+	if info.bidopenaddress != "" {
+		n++
+	}
+	if info.agency != "" {
+		n = n + 2
+	}
+	if info.city != "" {
+		n = n + 2
+	}
+
+	if m > n {
+		return true
+	} else if m == n {
+		if v.publishtime >= info.publishtime {
+			return true
+		} else {
+			return false
+		}
+	} else {
+		return false
+	}
+}

+ 563 - 0
udpfilterdup/src1/datamap.go

@@ -0,0 +1,563 @@
+package src1
+
+import (
+	"fmt"
+	"log"
+	qutil "qfw/util"
+	"regexp"
+	"strings"
+	"sync"
+	"time"
+)
+
+type Info struct {
+	id    string //id
+	title string //标题
+
+	area           string  //省份
+	city           string  //城市
+	subtype        string  //信息类型
+	buyer          string  //采购单位
+	agency         string  //代理机构
+	winner         string  //中标单位
+	budget         float64 //预算金额
+	bidamount      float64 //中标金额
+	projectname    string  //项目名称
+	projectcode    string  //项目编号
+	contractnumber string  //合同编号
+	publishtime    int64   //发布时间
+	comeintime     int64   //入库时间
+	bidopentime    int64   //开标时间
+	bidopenaddress string  //开标地点
+	site 		   string //站点
+	href 		     string //正文的url
+	repeatid         string                 //重复id
+	titleSpecialWord bool                   //标题特殊词
+	specialWord      bool                   //再次判断的特殊词
+	mergemap         map[string]interface{} //合并记录
+	is_site          bool                   //是否站点城市
+
+}
+
+var datelimit = float64(432000) //五天
+var sitelock sync.Mutex         //锁
+
+//一般数据判重
+type datamap struct {
+	lock   sync.Mutex //锁
+	days   int        //保留几天数据
+	data   map[string][]*Info
+	keymap []string
+	areakeys []string
+	keys   map[string]bool
+}
+
+//历史
+func TimedTaskDatamap(days int,lasttime int64) *datamap {
+	log.Println("数据池开始重新构建")
+	datelimit = qutil.Float64All(days * 86400)
+	dm := &datamap{sync.Mutex{}, days, map[string][]*Info{}, []string{}, []string{},map[string]bool{}}
+	if lasttime <0 {
+		log.Println("数据池空数据")
+		return dm
+	}
+	start := int(time.Now().Unix())
+	sess := mgo.GetMgoConn()
+	defer mgo.DestoryMongoConn(sess)
+	query := map[string]interface{}{"publishtime": map[string]interface{}{
+		"$lt": lasttime,
+	}}
+	log.Println("query", query)
+	it := sess.DB(mgo.DbName).C(extract_back).Find(query).Sort("-publishtime").Iter()
+	n, continuSum := 0, 0
+	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
+		//qutil.IntAll(tmp["dataging"]) == 1
+		if qutil.IntAll(tmp["repeat"]) == 1 || qutil.IntAll(tmp["repeat"]) == -1 ||
+			qutil.IntAll(tmp["dataging"]) == 1 {
+
+		} else {
+			pt := tmp["publishtime"]
+			pt_time := qutil.Int64All(pt)
+			if pt_time <= 0 {
+				break
+			}
+			if qutil.Float64All(lasttime-pt_time) < datelimit {
+				continuSum++
+				info := NewInfo(tmp)
+				dkey := qutil.FormatDateWithObj(&pt, qutil.Date_yyyyMMdd)
+				k := fmt.Sprintf("%s_%s_%s", dkey, info.subtype, info.area)
+				data := dm.data[k]
+				if data == nil {
+					data = []*Info{}
+				}
+				data = append(data, info)
+				dm.data[k] = data
+				dm.keys[dkey] = true
+				//添加省
+				isAreaExist :=false
+				for _,v:= range dm.areakeys {
+					if v==info.area {
+						isAreaExist = true
+					}
+				}
+				if !isAreaExist {
+					areaArr := dm.areakeys
+					areaArr = append(areaArr,info.area)
+					dm.areakeys = areaArr
+				}
+			} else {
+				break
+			}
+		}
+		if n%50000 == 0 {
+			log.Println("当前数据池:", n, continuSum)
+		}
+		tmp = make(map[string]interface{})
+	}
+
+	log.Printf("数据池构建完成:%d秒,%d个\n", int(time.Now().Unix())-start, n)
+
+	return dm
+}
+
+
+//增量
+func NewDatamap(days int, lastid string) *datamap {
+	datelimit = qutil.Float64All(days * 86400 * 2)
+	dm := &datamap{sync.Mutex{}, days, map[string][]*Info{}, []string{},[]string{}, map[string]bool{}}
+	if lastid == "" {
+		return dm
+	}
+	//初始化加载数据
+	sess := mgo.GetMgoConn()
+	defer mgo.DestoryMongoConn(sess)
+	query := map[string]interface{}{"_id": map[string]interface{}{
+		"$lte": StringTOBsonId(lastid),
+	}}
+	log.Println("query", query)
+	it := sess.DB(mgo.DbName).C(extract).Find(query).Sort("-publishtime").Iter()
+	now1 := int64(0)
+	n, continuSum := 0, 0
+	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
+		if qutil.IntAll(tmp["repeat"]) == 1 || qutil.IntAll(tmp["repeat"]) == -1{
+
+		} else {
+			pt := tmp["publishtime"]
+			pt_time := qutil.Int64All(pt)
+			if pt_time <= 0 {
+				break
+			}
+			if now1 == 0 {
+				now1 = pt_time
+			}
+			if qutil.Float64All(now1-pt_time) < datelimit {
+				continuSum++
+				info := NewInfo(tmp)
+				dkey := qutil.FormatDateWithObj(&pt, qutil.Date_yyyyMMdd)
+				k := fmt.Sprintf("%s_%s_%s", dkey, info.subtype, info.area)
+				data := dm.data[k]
+				if data == nil {
+					data = []*Info{}
+				}
+				data = append(data, info)
+				dm.data[k] = data
+				dm.keys[dkey] = true
+				//添加省
+				isAreaExist :=false
+				for _,v:= range dm.areakeys {
+					if v==info.area {
+						isAreaExist = true
+					}
+				}
+				if !isAreaExist {
+					areaArr := dm.areakeys
+					areaArr = append(areaArr,info.area)
+					dm.areakeys = areaArr
+				}
+			} else {
+				break
+			}
+		}
+		if n%10000 == 0 {
+			log.Println("当前 n:", n,"数量:" ,continuSum)
+		}
+		tmp = make(map[string]interface{})
+	}
+	log.Println("load data:", n,"总数:",continuSum)
+	return dm
+}
+
+//数据构建
+func NewInfo(tmp map[string]interface{}) *Info {
+	subtype := qutil.ObjToString(tmp["subtype"])
+	area := qutil.ObjToString(tmp["area"])
+	if area == "A" {
+		area = "全国"
+	}
+	info := &Info{}
+	if IdType {
+		info.id = qutil.ObjToString(tmp["_id"])
+	}else  {
+		info.id = BsonTOStringId(tmp["_id"])
+	}
+
+	info.title = qutil.ObjToString(tmp["title"])
+	info.area = area
+	info.subtype = subtype
+	info.buyer = qutil.ObjToString(tmp["buyer"])
+	info.projectname = qutil.ObjToString(tmp["projectname"])
+	info.contractnumber = qutil.ObjToString(tmp["contractnumber"])
+	info.projectcode = qutil.ObjToString(tmp["projectcode"])
+	info.city = qutil.ObjToString(tmp["city"])
+	info.agency = qutil.ObjToString(tmp["agency"])
+	info.winner = qutil.ObjToString(tmp["winner"])
+	info.budget = qutil.Float64All(tmp["budget"])
+	info.bidamount = qutil.Float64All(tmp["bidamount"])
+	info.publishtime = qutil.Int64All(tmp["publishtime"])
+	info.comeintime = qutil.Int64All(tmp["comeintime"])
+	info.bidopentime = qutil.Int64All(tmp["bidopentime"])
+	info.bidopenaddress = qutil.ObjToString(tmp["bidopenaddress"])
+	info.site = qutil.ObjToString(tmp["site"])
+	info.href = qutil.ObjToString(tmp["href"])
+	info.repeatid = qutil.ObjToString(tmp["repeatid"])
+
+	info.specialWord = FilterRegTitle.MatchString(info.title)
+	info.titleSpecialWord = FilterRegTitle_0.MatchString(info.title) ||FilterRegTitle_1.MatchString(info.title) || FilterRegTitle_2.MatchString(info.title)
+	info.mergemap = *qutil.ObjToMap(tmp["merge_map"])
+	if info.mergemap == nil {
+		info.mergemap = make(map[string]interface{}, 0)
+	}
+
+	info.is_site = false
+
+	return info
+}
+
+//判重方法
+//判重方法
+//判重方法
+func (d *datamap) check(info *Info) (b bool, source *Info, reasons string) {
+	reason := ""
+	keys := []string{}
+	d.lock.Lock()
+	for k, _ := range d.keys { //不同时间段
+		if info.area=="全国" {
+			//匹配所有省
+			for _,v := range d.areakeys{
+				keys = append(keys, fmt.Sprintf("%s_%s_%s", k, info.subtype, v))
+			}
+		}else {
+			//匹配指定省
+			keys = append(keys, fmt.Sprintf("%s_%s_%s", k, info.subtype, info.area))
+		}
+		keys = append(keys, fmt.Sprintf("%s_%s_%s", k, info.subtype, "全国"))
+
+	}
+
+
+	d.lock.Unlock()
+L:
+	for _, k := range keys {
+		d.lock.Lock()
+		data := d.data[k]
+		d.lock.Unlock()
+		if len(data) > 0 { //对比v   找到同类型,同省或全国的数据作对比
+			for _, v := range data {
+				reason = ""
+				if v.id == info.id { //正常重复
+					return false, v, ""
+				}
+
+
+				//buyer 优先级高,有值且不相等过滤
+				if info.buyer!=""&&v.buyer!=""&&info.buyer!=v.buyer {
+					if buyerIsContinue(v,info) {
+						continue
+					}
+				}
+
+				if info.site != "" {//站点临时赋值
+					sitelock.Lock()
+					dict := SiteMap[info.site]
+					sitelock.Unlock()
+					if dict != nil {
+						if (info.area == "全国" && dict["area"] != "")||
+							(info.city == "" && dict["city"] != ""){
+							info.is_site = true
+							info.area = qutil.ObjToString(dict["area"])
+							info.city = qutil.ObjToString(dict["city"])
+						}
+					}
+				}
+
+
+				//前置条件 - 站点相关
+				if info.site != "" && info.site == v.site {
+					if info.href != "" && info.href == v.href {
+						reason = "同站点-href相同"
+						b = true
+						source = v
+						reasons = reason
+						break L
+					}
+					if info.href != "" && info.href != v.href {
+						if v.title==info.title&&len([]rune(info.title)) >10 && isTheSameDay(info.publishtime,v.publishtime){
+							if !againRepeat(v, info) {//进行同站点二次判断
+								reason = "同站点-href不同-标题相同等"
+								b = true
+								source = v
+								reasons = reason
+								break L
+							}else {
+								continue
+							}
+						}else {
+							continue
+						}
+					}
+				}
+
+				specialNum:= dealWithSpecialWordNumber(info,v)
+				//前置条件 - 标题相关,有且一个关键词
+				if specialNum==1 {
+					if info.title != v.title && v.title != "" && info.title != "" {
+						continue
+					}
+				}
+				//前置条件3 - 标题相关,均含有关键词
+				if specialNum==2 {
+					if len([]rune(v.title)) > 10 && len([]rune(info.title)) > 10 &&
+						v.title != "" && info.title != "" {
+						letter1,letter2:=v.title,info.title
+						res, _ := regexp.Compile("[0-9a-zA-Z]+");
+						if res.MatchString(letter1)||res.MatchString(letter2) {
+							letter1=convertArabicNumeralsAndLetters(letter1)
+							letter2=convertArabicNumeralsAndLetters(letter2)
+						}
+						if strings.Contains(letter1,"重新招标")|| strings.Contains(letter2,"重新招标"){
+							letter1,letter2=dealWithSpecialPhrases(letter1,letter2)
+						}
+						if letter1==letter2 {
+							reason = reason + "标题关键词相等关系"
+							if !againRepeat(v, info) {//进行二级金额判断
+								b = true
+								source = v
+								reasons = reason
+								break L
+							}
+						}else {
+							if !(strings.Contains(letter1, letter2) || strings.Contains(letter2, letter1)) {
+								//无包含关系-即不相等
+								continue
+							}
+						}
+					}
+				}
+
+
+				//前置条件-五要素均相等
+				if leadingElementSame(v,info) {
+					reason = "五要素-相同-满足"
+					b = true
+					source = v
+					reasons = reason
+					break L
+				}
+
+
+
+				//新增快速数据过少判重
+				if LowHeavy {
+					repeat := false
+					if repeat, reason = fastLowQualityHeavy(v, info, reason); repeat {
+						b = true
+						source = v
+						reasons = reason
+						break L
+					}
+				}
+
+				//代理机构相同-非空相等
+				if v.agency != "" && info.agency != "" && v.agency == info.agency {
+					reason = reason + "同机构-"
+					repeat := false
+					if repeat, reason = quickHeavyMethodTwo(v, info, reason); repeat {
+						b = true
+						source = v
+						reasons = reason
+						break L
+					}
+				} else {
+					reason = reason + "非同机构-"
+					if info.city != "" && info.city == v.city {
+						reason = reason + "同城-"
+						repeat := false
+						if repeat, reason = quickHeavyMethodTwo(v, info, reason); repeat {
+							b = true
+							source = v
+							reasons = reason
+							break L
+						}
+					} else {
+						reason = reason + "不同城-"
+						repeat := false
+						if repeat, reason = quickHeavyMethodOne(v, info, reason); repeat {
+							b = true
+							source = v
+							reasons = reason
+							break L
+						}
+					}
+				}
+			}
+
+		}
+	}
+
+	//往预存数据 d 添加
+	if !b {
+		ct := info.publishtime
+		dkey := qutil.FormatDateByInt64(&ct, qutil.Date_yyyyMMdd)
+		k := fmt.Sprintf("%s_%s_%s", dkey, info.subtype, info.area)
+		d.lock.Lock()
+		data := d.data[k]
+		if data == nil {
+			data = []*Info{info}
+			d.data[k] = data
+			if !d.keys[dkey] {
+				d.keys[dkey] = true
+				d.update(ct)
+			}
+		} else {
+			data = append(data, info)
+			d.data[k] = data
+		}
+
+		//添加省
+		isAreaExist :=false
+		for _,v:= range d.areakeys {
+			if v==info.area {
+				isAreaExist = true
+			}
+		}
+		if !isAreaExist {
+			areaArr := d.areakeys
+			areaArr = append(areaArr,info.area)
+			d.areakeys = areaArr
+		}
+
+		d.lock.Unlock()
+	}
+
+	return
+}
+
+func (d *datamap) update(t int64) {
+
+	if TimingTask {
+		d.keymap = d.GetLatelyFiveDay(t)
+	}else {
+		//d.keymap = d.GetLatelyFiveDay(t)//测试数据采用
+		d.keymap = d.GetLatelyFiveDayDouble(t)
+	}
+	m := map[string]bool{}
+	for _, v := range d.keymap {
+		m[v] = true
+	}
+	all, all1 := 0, 0
+
+	for k, v := range d.data {
+		all += len(v)
+		if !m[k[:8]] {
+			delete(d.data, k)
+		}
+	}
+	for k, _ := range d.keys {
+		if !m[k] {
+			delete(d.keys, k)
+		}
+	}
+	for _, v := range d.data {
+		all1 += len(v)
+	}
+	//log.Println("更新前后数据:", all, all1)
+}
+
+func (d *datamap) GetLatelyFiveDay(t int64) []string  {
+	array := make([]string, d.days)
+	now := time.Unix(t, 0)
+	for i := 0; i < d.days; i++ {
+		array[i] = now.Format(qutil.Date_yyyyMMdd)
+		now = now.AddDate(0, 0, -1)
+	}
+	return array
+}
+
+func (d *datamap) GetLatelyFiveDayDouble(t int64) []string  {//增量-两倍
+	array := make([]string, d.days*2)
+	now := time.Now()
+	for i := 0; i < d.days*2; i++ {
+		array[i] = now.Format(qutil.Date_yyyyMMdd)
+		now = now.AddDate(0, 0, -1)
+	}
+	return array
+}
+
+
+
+//替换原始数据池
+func (d *datamap) replaceSourceData(newData *Info, oldData *Info) {
+	//删除数据池的老数据
+	ct_old := oldData.publishtime
+	dkey_old := qutil.FormatDateByInt64(&ct_old, qutil.Date_yyyyMMdd)
+	k_old := fmt.Sprintf("%s_%s_%s", dkey_old, oldData.subtype, oldData.area)
+	data_old := d.data[k_old]
+	for k, v := range data_old {
+		if v.id == oldData.id {//删除对应当前的老数据
+			data_old = append(data_old[:k], data_old[k+1:]...)
+			break
+		}
+	}
+	d.data[k_old] = data_old
+
+	//添加新的
+	ct := newData.publishtime
+	dkey := qutil.FormatDateByInt64(&ct, qutil.Date_yyyyMMdd)
+	k := fmt.Sprintf("%s_%s_%s", dkey, newData.subtype, newData.area)
+	d.lock.Lock()
+	data := d.data[k]
+	if data == nil {
+		data = []*Info{newData}
+		d.data[k] = data
+		if !d.keys[dkey] {
+			d.keys[dkey] = true
+			d.update(ct)
+		}
+	} else {
+		data = append(data, newData)
+		d.data[k] = data
+	}
+	//添加省
+	isAreaExist :=false
+	for _,v:= range d.areakeys {
+		if v==newData.area {
+			isAreaExist = true
+		}
+	}
+	if !isAreaExist {
+		areaArr := d.areakeys
+		areaArr = append(areaArr,newData.area)
+		d.areakeys = areaArr
+	}
+
+
+	d.lock.Unlock()
+}
+
+
+
+
+
+
+
+

+ 609 - 0
udpfilterdup/src1/main.go

@@ -0,0 +1,609 @@
+package src1
+
+/**
+招标信息判重
+**/
+
+import (
+	"encoding/json"
+	"flag"
+	"fmt"
+	"github.com/cron"
+	"log"
+	mu "mfw/util"
+	"net"
+	"os"
+	"qfw/util"
+	"regexp"
+	"strings"
+	"sync"
+	"time"
+
+	"gopkg.in/mgo.v2/bson"
+)
+
+var (
+	Sysconfig    map[string]interface{} //配置文件
+	mconf        map[string]interface{} //mongodb配置信息
+	mgo          *MongodbSim            //mongodb操作对象
+	extract      string
+	extract_back string
+	udpclient    mu.UdpClient             //udp对象
+	nextNode     []map[string]interface{} //下节点数组
+	dupdays      = 5                      //初始化判重范围
+	DM           *datamap                 //
+
+	//正则筛选相关
+	FilterRegTitle   = regexp.MustCompile("^_$")
+	FilterRegTitle_0 = regexp.MustCompile("^_$")
+	FilterRegTitle_1 = regexp.MustCompile("^_$")
+	FilterRegTitle_2 = regexp.MustCompile("^_$")
+
+	isMerger       bool                              //是否合并
+	threadNum      int                               //线程数量
+	SiteMap        map[string]map[string]interface{} //站点map
+	LowHeavy       bool                              //低质量数据判重
+	TimingTask     bool                              //是否定时任务
+	timingSpanDay  int64                             //时间跨度
+	timingPubScope int64                             //发布时间周期
+	sid,eid,lastid string                     		 //测试人员判重使用
+	IdType         bool   							 //默认object类型
+)
+
+func init() {
+
+	flag.StringVar(&lastid, "id", "", "最后加载id") //以小于等于此id开始加载最近几天的数据
+	flag.StringVar(&sid, "sid", "", "开始id")
+	flag.StringVar(&eid, "eid", "", "结束id")
+	flag.Parse()
+	//172.17.145.163:27080
+	util.ReadConfig(&Sysconfig)
+	nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
+	mconf = Sysconfig["mongodb"].(map[string]interface{})
+	mgo = &MongodbSim{
+		MongodbAddr: mconf["addr"].(string),
+		DbName:      mconf["db"].(string),
+		Size:        util.IntAllDef(mconf["pool"], 10),
+	}
+	mgo.InitPool()
+	extract = mconf["extract"].(string)
+	extract_back = mconf["extract_back"].(string)
+
+	dupdays = util.IntAllDef(Sysconfig["dupdays"], 3)
+	//加载数据
+	DM = NewDatamap(dupdays, lastid)
+	FilterRegTitle = regexp.MustCompile(util.ObjToString(Sysconfig["specialwords"]))
+	FilterRegTitle_0 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_0"]))
+	FilterRegTitle_1 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_1"]))
+	FilterRegTitle_2 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_2"]))
+	isMerger = Sysconfig["isMerger"].(bool)
+	threadNum = util.IntAllDef(Sysconfig["threads"], 1)
+	LowHeavy = Sysconfig["lowHeavy"].(bool)
+	TimingTask = Sysconfig["timingTask"].(bool)
+	timingSpanDay = util.Int64All(Sysconfig["timingSpanDay"])
+	timingPubScope = util.Int64All(Sysconfig["timingPubScope"])
+
+	//站点配置
+	site := mconf["site"].(map[string]interface{})
+	SiteMap = make(map[string]map[string]interface{}, 0)
+	start := int(time.Now().Unix())
+	sess_site := mgo.GetMgoConn()
+	defer mgo.DestoryMongoConn(sess_site)
+	res_site := sess_site.DB(site["dbname"].(string)).C(site["coll"].(string)).Find(map[string]interface{}{}).Sort("_id").Iter()
+	for site_dict := make(map[string]interface{}); res_site.Next(&site_dict); {
+		data_map := map[string]interface{}{
+			"area":     util.ObjToString(site_dict["area"]),
+			"city":     util.ObjToString(site_dict["city"]),
+			"district": util.ObjToString(site_dict["district"]),
+			"sitetype": util.ObjToString(site_dict["sitetype"]),
+			"level":    util.ObjToString(site_dict["level"]),
+			"weight":   util.ObjToString(site_dict["weight"]),
+		}
+		SiteMap[util.ObjToString(site_dict["site"])] = data_map
+	}
+	log.Printf("new站点加载用时:%d秒,%d个\n", int(time.Now().Unix())-start, len(SiteMap))
+}
+
+
+func main() {
+
+	go checkMapJob()
+	updport := Sysconfig["udpport"].(string)
+	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
+	udpclient.Listen(processUdpMsg)
+	log.Println("Udp服务监听", updport)
+	if TimingTask {
+		go timedTaskDay()
+	}
+	time.Sleep(99999 * time.Hour)
+}
+
+
+
+//测试组人员使用
+func mainT() {
+	if TimingTask {
+		log.Println("定时任务测试开始")
+		go timedTaskDay()
+		time.Sleep(99999 * time.Hour)
+	} else {
+		//IdType = true  //打开id字符串模式
+		sid = "5ef01220801f744d045f51f1"
+		eid = "5ef61eb3801f744d046402dd"
+		log.Println("正常判重测试开始")
+		log.Println(sid, "---", eid)
+		mapinfo := map[string]interface{}{}
+		if sid == "" || eid == "" {
+			log.Println("sid,eid参数不能为空")
+			os.Exit(0)
+		}
+		mapinfo["gtid"] = sid
+		mapinfo["lteid"] = eid
+		mapinfo["stop"] = "true"
+		task([]byte{}, mapinfo)
+		time.Sleep(99999 * time.Hour)
+	}
+}
+
+func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
+	fmt.Println("接受的段数据")
+	switch act {
+	case mu.OP_TYPE_DATA: //上个节点的数据
+		//从表中开始处理
+		var mapInfo map[string]interface{}
+		err := json.Unmarshal(data, &mapInfo)
+		log.Println("err:", err, "mapInfo:", mapInfo)
+		if err != nil {
+			udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
+		} else if mapInfo != nil {
+			taskType := util.ObjToString(mapInfo["stype"])
+			if taskType == "normalTask" {
+				//判重流程
+				go task(data, mapInfo)
+			} else {
+				//其他
+				go task(data, mapInfo)
+			}
+			key, _ := mapInfo["key"].(string)
+			if key == "" {
+				key = "udpok"
+			}
+			udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
+		}
+	case mu.OP_NOOP: //下个节点回应
+		ok := string(data)
+		if ok != "" {
+			log.Println("ok:", ok)
+			udptaskmap.Delete(ok)
+		}
+	}
+}
+
+//开始判重程序
+func task(data []byte, mapInfo map[string]interface{}) {
+	log.Println("开始数据判重")
+	defer util.Catch()
+	//区间id
+	q := map[string]interface{}{
+		"_id": map[string]interface{}{
+			"$gt":  StringTOBsonId(mapInfo["gtid"].(string)),
+			"$lte": StringTOBsonId(mapInfo["lteid"].(string)),
+		},
+	}
+	if IdType {
+		q = map[string]interface{}{
+			"_id": map[string]interface{}{
+				"$gt":  mapInfo["gtid"].(string),
+				"$lte": mapInfo["lteid"].(string),
+			},
+		}
+	}
+	log.Println(mgo.DbName, extract, q)
+	sess := mgo.GetMgoConn()
+	defer mgo.DestoryMongoConn(sess)
+	it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
+	updateExtract := [][]map[string]interface{}{}
+	ids:=[]string{}
+	pool := make(chan bool, threadNum)
+	wg := &sync.WaitGroup{}
+	n, repeateN := 0, 0
+	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
+		if n%10000 == 0 {
+			log.Println("current:", n, tmp["_id"], "repeateN:", repeateN)
+		}
+		source := util.ObjToMap(tmp["jsondata"]) //前置-jsondata判重
+		if util.IntAll((*source)["sourcewebsite"]) == 1 {
+			repeateN++
+			updateExtract = append(updateExtract, []map[string]interface{}{
+				map[string]interface{}{
+					"_id": tmp["_id"],
+				},
+				map[string]interface{}{
+					"$set": map[string]interface{}{
+						"repeat": 1,
+						"dataging":0,
+						"repeat_reason": "sourcewebsite为1,重复",
+					},
+				},
+			})
+			if len(updateExtract) >= 200 {
+				mgo.UpSertBulk(extract, updateExtract...)
+				updateExtract = [][]map[string]interface{}{}
+			}
+			tmp = make(map[string]interface{})
+			continue
+		}
+
+		if util.IntAll(tmp["repeat"]) == 1 || util.IntAll(tmp["repeat"]) == -1||
+			util.IntAll(tmp["dataging"]) == 1 {
+			if util.IntAll(tmp["repeat"]) == 1 {
+				repeateN++
+			}
+			tmp = make(map[string]interface{})
+			continue
+		}
+		pool <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-pool
+				wg.Done()
+			}()
+			info := NewInfo(tmp)
+			if !LowHeavy { //是否进行低质量数据判重
+				if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
+					updateExtract = append(updateExtract, []map[string]interface{}{
+						map[string]interface{}{
+							"_id": tmp["_id"],
+						},
+						map[string]interface{}{
+							"$set": map[string]interface{}{
+								"repeat": -1, //无效数据标签
+							},
+						},
+					})
+					if len(updateExtract) >= 200 {
+						mgo.UpSertBulk(extract, updateExtract...)
+						updateExtract = [][]map[string]interface{}{}
+					}
+					return
+				}
+			}
+			//正常判重
+			b, source, reason := DM.check(info)
+			if b { //有重复,生成更新语句,更新抽取和更新招标
+				repeateN++
+				var updateID = map[string]interface{}{} //记录更新判重的
+				updateID["_id"] = StringTOBsonId(info.id)
+				if IdType {
+					updateID["_id"] = info.id
+				}
+				updateExtract = append(updateExtract, []map[string]interface{}{//重复数据打标签
+					updateID,
+					map[string]interface{}{
+						"$set": map[string]interface{}{
+							"repeat":        1,
+							"repeat_reason": reason,
+							"repeat_id":     source.id,
+						},
+					},
+				})
+			}
+		}(tmp)
+		if len(updateExtract) >= 200 {
+			mgo.UpSertBulk(extract, updateExtract...)
+			updateExtract = [][]map[string]interface{}{}
+		}
+		tmp = make(map[string]interface{})
+	}
+	wg.Wait()
+	if len(updateExtract) > 0 {
+		mgo.UpSertBulk(extract, updateExtract...)
+	}
+	log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
+
+	//任务完成,开始发送广播通知下面节点
+	if n > repeateN && mapInfo["stop"] == nil {
+		log.Println("判重任务完成发送udp")
+		for _, to := range nextNode {
+			sid, _ := mapInfo["gtid"].(string)
+			eid, _ := mapInfo["lteid"].(string)
+			key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
+			by, _ := json.Marshal(map[string]interface{}{
+				"gtid":  sid,
+				"lteid": eid,
+				"stype": util.ObjToString(to["stype"]),
+				"key":   key,
+				"ids":   strings.Join(ids, ","),
+			})
+			addr := &net.UDPAddr{
+				IP:   net.ParseIP(to["addr"].(string)),
+				Port: util.IntAll(to["port"]),
+			}
+			node := &udpNode{by, addr, time.Now().Unix(), 0}
+			udptaskmap.Store(key, node)
+			udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
+		}
+	}
+}
+
+
+
+
+//定时任务--定时任务--定时任务
+func timedTaskDay() {
+	log.Println("部署定时任务")
+	c := cron.New()
+	c.AddFunc("0 0 */4 * * ?", func() { timedTaskOnce() })
+	c.Start()
+}
+func timedTaskOnce() {
+	defer util.Catch()
+	log.Println("开始一次迁移任务")
+	movedata()
+	log.Println("开始一次任务判重")
+	//当前时间-8   -4 小时
+	now := time.Now()
+	log.Println(now)
+	preTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour()-8, 0, 0, 0, time.Local)
+	curTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour()-4, 0, 0, 0, time.Local)
+	log.Println(preTime,curTime)
+	task_sid := util.BsonIdToSId(bson.NewObjectIdWithTime(preTime))
+	task_eid := util.BsonIdToSId(bson.NewObjectIdWithTime(curTime))
+	between_time := curTime.Unix() - (86400 * timingPubScope)
+	log.Println("id区间:",task_sid, task_eid,"时间:", between_time)
+	//区间id
+	q_start := map[string]interface{}{
+		"_id": map[string]interface{}{
+			"$gte": StringTOBsonId(task_sid),
+			"$lte": StringTOBsonId(task_eid),
+		},
+	}
+	sess := mgo.GetMgoConn()
+	defer mgo.DestoryMongoConn(sess)
+	it_start := sess.DB(mgo.DbName).C(extract).Find(&q_start).Sort("publishtime").Iter()
+	num,oknum, deterTime:= int64(0),int64(0),int64(0) //计数
+	updateExtract := [][]map[string]interface{}{}//批量更新mongo数组
+	pendAllArr:=[][]map[string]interface{}{}//待处理数组
+	dayArr := []map[string]interface{}{}
+	for tmp := make(map[string]interface{}); it_start.Next(&tmp); num++ {
+		if num%10000 == 0 {
+			log.Println("正序遍历:", num)
+		}
+		source := util.ObjToMap(tmp["jsondata"])
+		if util.IntAll((*source)["sourcewebsite"]) == 1 {
+			updateExtract = append(updateExtract, []map[string]interface{}{
+				map[string]interface{}{
+					"_id": tmp["_id"],
+				},
+				map[string]interface{}{
+					"$set": map[string]interface{}{
+						"repeat": 1,
+						"dataging": 0,
+						"repeat_reason": "sourcewebsite为1 重复",
+					},
+				},
+			})
+			if len(updateExtract) > 50 {
+				mgo.UpSertBulk(extract, updateExtract...)
+				updateExtract = [][]map[string]interface{}{}
+			}
+
+
+			tmp = make(map[string]interface{})
+			continue
+		}
+
+
+
+		//取-符合-发布时间X年内的数据
+		if util.IntAll(tmp["dataging"]) == 1 {
+			pubtime := util.Int64All(tmp["publishtime"])
+			if pubtime > 0 && pubtime >= between_time {
+				oknum++
+				if deterTime==0 {
+					log.Println("找到第一条符合条件的数据")
+					deterTime = util.Int64All(tmp["publishtime"])
+					dayArr = append(dayArr,tmp)
+				}else {
+					if pubtime-deterTime >timingSpanDay*86400 {
+						//新数组重新构建,当前组数据加到全部组数据
+						pendAllArr = append(pendAllArr,dayArr)
+						dayArr = []map[string]interface{}{}
+						deterTime = util.Int64All(tmp["publishtime"])
+						dayArr = append(dayArr,tmp)
+					}else {
+						dayArr = append(dayArr,tmp)
+					}
+				}
+			}else {
+				//不在两年内的也清标记
+				updateExtract = append(updateExtract, []map[string]interface{}{
+					map[string]interface{}{
+						"_id": tmp["_id"],
+					},
+					map[string]interface{}{
+						"$set": map[string]interface{}{
+							"dataging": 0,
+						},
+					},
+				})
+				if len(updateExtract) > 50 {
+					mgo.UpSertBulk(extract, updateExtract...)
+					updateExtract = [][]map[string]interface{}{}
+				}
+
+			}
+		}
+		tmp = make(map[string]interface{})
+	}
+
+
+	//批量更新标记
+	if len(updateExtract) > 0 {
+		mgo.UpSertBulk(extract, updateExtract...)
+		updateExtract = [][]map[string]interface{}{}
+	}
+
+	if len(dayArr)>0 {
+		pendAllArr = append(pendAllArr,dayArr)
+		dayArr = []map[string]interface{}{}
+	}
+
+	log.Println("查询数量:",num,"符合条件:",oknum)
+
+	if len(pendAllArr) <= 0 {
+		log.Println("没找到dataging==1的数据")
+		return
+	}
+
+	//测试分组数量是否正确
+	testNum:=0
+	for k,v:=range pendAllArr {
+		log.Println("第",k,"组--","数量:",len(v))
+		testNum = testNum+len(v)
+	}
+	log.Println("本地构建分组完成:",len(pendAllArr),"组","测试-总计数量:",testNum)
+
+	n, repeateN := 0, 0
+	for k,v:=range pendAllArr { //每组结束更新一波数据
+		//构建当前组的数据池
+		log.Println("构建第",k,"组---(数据池)")
+		//当前组的第一个发布时间
+		first_pt :=util.Int64All(v[0]["publishtime"])
+		DM = TimedTaskDatamap(dupdays, first_pt)
+		log.Println("开始遍历判重第",k,"组  共计数量:",len(v))
+		n = n+len(v)
+		log.Println("统计目前总数量:",n,"重复数量:",repeateN)
+		for _,tmp:=range v {
+			info := NewInfo(tmp)
+			if !LowHeavy { //是否进行低质量数据判重
+				if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
+					log.Println("无效数据")
+					updateExtract = append(updateExtract, []map[string]interface{}{
+						map[string]interface{}{
+							"_id": tmp["_id"],
+						},
+						map[string]interface{}{
+							"$set": map[string]interface{}{
+								"repeat":   -1, //无效数据标签
+								"dataging": 0,
+							},
+						},
+					})
+					if len(updateExtract) > 50 {
+						mgo.UpSertBulk(extract, updateExtract...)
+						updateExtract = [][]map[string]interface{}{}
+					}
+					continue
+				}
+			}
+			b, source, reason := DM.check(info)
+			if b { //有重复,生成更新语句,更新抽取和更新招标
+				log.Println("判重结果", b, reason,"目标id",info.id)
+				repeateN++
+				//重复数据打标签
+				updateExtract = append(updateExtract, []map[string]interface{}{
+					map[string]interface{}{
+						"_id": tmp["_id"],
+					},
+					map[string]interface{}{
+						"$set": map[string]interface{}{
+							"repeat":        1,
+							"repeat_reason": reason,
+							"repeat_id":     source.id,
+							"dataging":      0,
+						},
+					},
+				})
+			}else {
+				updateExtract = append(updateExtract, []map[string]interface{}{
+					map[string]interface{}{
+						"_id": tmp["_id"],
+					},
+					map[string]interface{}{
+						"$set": map[string]interface{}{
+							"dataging": 0,//符合条件的都为dataging==0
+						},
+					},
+				})
+			}
+			if len(updateExtract) > 50 {
+				mgo.UpSertBulk(extract, updateExtract...)
+				updateExtract = [][]map[string]interface{}{}
+			}
+		}
+
+		//每组数据结束-更新数据
+		if len(updateExtract) > 0 {
+			mgo.UpSertBulk(extract, updateExtract...)
+			updateExtract = [][]map[string]interface{}{}
+		}
+
+	}
+
+
+
+	if len(updateExtract) > 0 {
+		mgo.UpSertBulk(extract, updateExtract...)
+		updateExtract = [][]map[string]interface{}{}
+	}
+	log.Println("this timeTask over.", n, "repeateN:", repeateN)
+
+	//任务完成,开始发送广播通知下面节点 发udp 去升索引待定 + 合并
+	if n > repeateN {
+		for _, to := range nextNode {
+			next_sid := util.BsonIdToSId(task_sid)
+			next_eid := util.BsonIdToSId(task_eid)
+			key := next_sid + "-" + next_eid + "-" + util.ObjToString(to["stype"])
+			by, _ := json.Marshal(map[string]interface{}{
+				"gtid":  next_sid,
+				"lteid": next_eid,
+				"stype": util.ObjToString(to["stype"]),
+				"key":   key,
+			})
+			addr := &net.UDPAddr{
+				IP:   net.ParseIP(to["addr"].(string)),
+				Port: util.IntAll(to["port"]),
+			}
+			node := &udpNode{by, addr, time.Now().Unix(), 0}
+			udptaskmap.Store(key, node)
+			udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
+		}
+	}
+}
+
+
+//迁移数据dupdays+5之前的数据
+func movedata() {
+	sess := mgo.GetMgoConn()
+	defer mgo.DestoryMongoConn(sess)
+	year, month, day := time.Now().Date()
+	now:=time.Now()
+	move_time := time.Date(now.Year(), now.Month(), now.Day(), now.Hour()-8, 0, 0, 0, time.Local).Unix()
+	q := map[string]interface{}{
+		"comeintime": map[string]interface{}{
+			"$lt": move_time,
+		},
+	}
+	log.Println(q)
+	it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
+	index := 0
+	for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
+		mgo.Save(extract_back, tmp)
+		tmp = map[string]interface{}{}
+		if index%1000 == 0 {
+			log.Println("index", index)
+		}
+	}
+	log.Println("save to", extract_back, " ok index", index)
+	qv := map[string]interface{}{
+		"comeintime": map[string]interface{}{
+			"$lt": time.Date(year, month, day, 0, 0, 0, 0, time.Local).Add(-time.Duration(dupdays) * 24 * time.Hour*2).Unix(),
+		},
+	}
+	delnum := mgo.Delete(extract, qv)
+	log.Println("remove from ", extract, delnum)
+}
+
+

+ 315 - 0
udpfilterdup/src1/mgo.go

@@ -0,0 +1,315 @@
+package src1
+
+import (
+	"context"
+	"log"
+	"time"
+
+	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+	"go.mongodb.org/mongo-driver/mongo"
+	"go.mongodb.org/mongo-driver/mongo/options"
+)
+
+type MgoSess struct {
+	Db     string
+	Coll   string
+	Query  interface{}
+	Sorts  []string
+	fields interface{}
+	limit  int64
+	skip   int64
+	M      *MongodbSim
+}
+
+type MgoIter struct {
+	Cursor *mongo.Cursor
+}
+
+func (mt *MgoIter) Next(result interface{}) bool {
+	if mt.Cursor != nil {
+		if mt.Cursor.Next(nil) {
+			err := mt.Cursor.Decode(result)
+			if err != nil {
+				log.Println("mgo cur err", err.Error())
+				mt.Cursor.Close(nil)
+				return false
+			}
+			return true
+		} else {
+			mt.Cursor.Close(nil)
+			return false
+		}
+	} else {
+		return false
+	}
+
+}
+
+func (ms *MgoSess) DB(name string) *MgoSess {
+	ms.Db = name
+	return ms
+}
+
+func (ms *MgoSess) C(name string) *MgoSess {
+	ms.Coll = name
+	return ms
+}
+
+func (ms *MgoSess) Find(q interface{}) *MgoSess {
+	ms.Query = q
+	return ms
+}
+
+func (ms *MgoSess) Select(fields interface{}) *MgoSess {
+	ms.fields = fields
+	return ms
+}
+
+func (ms *MgoSess) Limit(limit int64) *MgoSess {
+	ms.limit = limit
+	return ms
+}
+func (ms *MgoSess) Skip(skip int64) *MgoSess {
+	ms.skip = skip
+	return ms
+}
+
+func (ms *MgoSess) Sort(sorts ...string) *MgoSess {
+	ms.Sorts = sorts
+	return ms
+}
+
+func (ms *MgoSess) Iter() *MgoIter {
+	it := &MgoIter{}
+	find := options.Find()
+	if ms.skip > 0 {
+		find.SetSkip(ms.skip)
+	}
+	if ms.limit > 0 {
+		find.SetLimit(ms.limit)
+	}
+	find.SetBatchSize(100)
+	if len(ms.Sorts) > 0 {
+		sort := bson.M{}
+		for _, k := range ms.Sorts {
+			switch k[:1] {
+			case "-":
+				sort[k[1:]] = -1
+			case "+":
+				sort[k[1:]] = 1
+			default:
+				sort[k] = 1
+			}
+		}
+		find.SetSort(sort)
+	}
+	if ms.fields != nil {
+		find.SetProjection(ms.fields)
+	}
+	cur, err := ms.M.C.Database(ms.Db).Collection(ms.Coll).Find(ms.M.Ctx, ms.Query, find)
+	if err != nil {
+		log.Println("mgo find err", err.Error())
+	} else {
+		it.Cursor = cur
+	}
+	return it
+}
+
+type MongodbSim struct {
+	MongodbAddr string
+	Size        int
+	//	MinSize     int
+	DbName   string
+	C        *mongo.Client
+	Ctx      context.Context
+	ShortCtx context.Context
+	pool     chan bool
+}
+
+func (m *MongodbSim) GetMgoConn() *MgoSess {
+	//m.Open()
+	ms := &MgoSess{}
+	ms.M = m
+	return ms
+}
+
+func (m *MongodbSim) DestoryMongoConn(ms *MgoSess) {
+	//m.Close()
+	ms.M = nil
+	ms = nil
+}
+
+func (m *MongodbSim) InitPool() {
+	opts := options.Client()
+	opts.SetConnectTimeout(3 * time.Second)
+	opts.ApplyURI("mongodb://" + m.MongodbAddr)
+	opts.SetMaxPoolSize(uint64(m.Size))
+	m.pool = make(chan bool, m.Size)
+	opts.SetMaxConnIdleTime(2 * time.Hour)
+	m.Ctx, _ = context.WithTimeout(context.Background(), 99999*time.Hour)
+	m.ShortCtx, _ = context.WithTimeout(context.Background(), 1*time.Minute)
+	client, err := mongo.Connect(m.ShortCtx, opts)
+	if err != nil {
+		log.Println("mgo init error:", err.Error())
+	} else {
+		m.C = client
+		log.Println("init success")
+	}
+}
+
+func (m *MongodbSim) Open() {
+	m.pool <- true
+}
+func (m *MongodbSim) Close() {
+	<-m.pool
+}
+
+//批量插入
+func (m *MongodbSim) UpSertBulk(c string, doc ...[]map[string]interface{}) (map[int64]interface{}, bool) {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	var writes []mongo.WriteModel
+	for _, d := range doc {
+		write := mongo.NewUpdateOneModel()
+		write.SetFilter(d[0])
+		write.SetUpdate(d[1])
+		write.SetUpsert(true)
+		writes = append(writes, write)
+	}
+	r, e := coll.BulkWrite(m.Ctx, writes)
+	if e != nil {
+		log.Println("mgo upsert error:", e.Error())
+		return nil, false
+	}
+	//	else {
+	//		if r.UpsertedCount != int64(len(doc)) {
+	//			log.Println("mgo upsert uncomplete:uc/dc", r.UpsertedCount, len(doc))
+	//		}
+	//		return true
+	//	}
+	return r.UpsertedIDs, true
+}
+
+//批量插入
+func (m *MongodbSim) SaveBulk(c string, doc ...map[string]interface{}) bool {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	var writes []mongo.WriteModel
+	for _, d := range doc {
+		write := mongo.NewInsertOneModel()
+		write.SetDocument(d)
+		writes = append(writes, write)
+	}
+	_, e := coll.BulkWrite(m.Ctx, writes)
+	if e != nil {
+		log.Println("mgo savebulk error:", e.Error())
+		return false
+	}
+	return true
+}
+
+//保存
+func (m *MongodbSim) Save(c string, doc map[string]interface{}) interface{} {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r, err := coll.InsertOne(m.Ctx, doc)
+	if err != nil {
+		return nil
+	}
+	return r.InsertedID
+}
+
+//更新by Id
+func (m *MongodbSim) UpdateById(c, id string, doc map[string]interface{}) bool {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	_, err := coll.UpdateOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)}, doc)
+	if err != nil {
+		return false
+	}
+	return true
+}
+
+//删除by id
+func (m *MongodbSim) DeleteById(c, id string) int64 {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r, err := coll.DeleteOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)})
+	if err != nil {
+		return 0
+	}
+	return r.DeletedCount
+}
+
+//通过条件删除
+func (m *MongodbSim) Delete(c string, query map[string]interface{}) int64 {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r, err := coll.DeleteMany(m.Ctx, query)
+	if err != nil {
+		return 0
+	}
+	return r.DeletedCount
+}
+
+//findbyid
+func (m *MongodbSim) FindById(c, id string) map[string]interface{} {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r := coll.FindOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)})
+	v := map[string]interface{}{}
+	r.Decode(&v)
+	return v
+}
+
+//findone
+func (m *MongodbSim) FindOne(c string, query map[string]interface{}) map[string]interface{} {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r := coll.FindOne(m.Ctx, query)
+	v := map[string]interface{}{}
+	r.Decode(&v)
+	return v
+}
+
+//find
+func (m *MongodbSim) Find(c string, query map[string]interface{}, sort, fields interface{}) ([]map[string]interface{}, error) {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	op := options.Find()
+	r, err := coll.Find(m.Ctx, query, op.SetSort(sort), op.SetProjection(fields))
+	if err != nil {
+		log.Fatal(err)
+		return nil, err
+	}
+	var results []map[string]interface{}
+	if err = r.All(m.Ctx, &results); err != nil {
+		log.Fatal(err)
+		return nil, err
+	}
+	return results, nil
+}
+
+//创建_id
+func NewObjectId() primitive.ObjectID {
+	return primitive.NewObjectID()
+}
+
+func StringTOBsonId(id string) primitive.ObjectID {
+	objectId, _ := primitive.ObjectIDFromHex(id)
+	return objectId
+}
+
+func BsonTOStringId(id interface{}) string {
+	return id.(primitive.ObjectID).Hex()
+}

+ 59 - 0
udpfilterdup/src1/udptaskmap.go

@@ -0,0 +1,59 @@
+package src1
+
+import (
+	"fmt"
+	"io/ioutil"
+	"log"
+	mu "mfw/util"
+	"net"
+	"net/http"
+	"sync"
+	"time"
+)
+
+var udptaskmap = &sync.Map{}
+var tomail string
+var api string
+
+type udpNode struct {
+	data      []byte
+	addr      *net.UDPAddr
+	timestamp int64
+	retry     int
+}
+
+func checkMapJob() {
+	//阿里云内网无法发送邮件
+	jkmail, _ := Sysconfig["jkmail"].(map[string]interface{})
+	if jkmail != nil {
+		tomail, _ = jkmail["to"].(string)
+		api, _ = jkmail["api"].(string)
+	}
+	log.Println("start checkMapJob", tomail, Sysconfig["jkmail"])
+	for {
+		udptaskmap.Range(func(k, v interface{}) bool {
+			now := time.Now().Unix()
+			node, _ := v.(*udpNode)
+			if now-node.timestamp > 120 {
+				node.retry++
+				if node.retry > 5 {
+					log.Println("udp重试失败", k)
+					udptaskmap.Delete(k)
+					res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", api, tomail, "extract-send-fail", k.(string)))
+					if err == nil {
+						defer res.Body.Close()
+						read, err := ioutil.ReadAll(res.Body)
+						log.Println("邮件发发送:", string(read), err)
+					}
+				} else {
+					log.Println("udp重发", k)
+					udpclient.WriteUdp(node.data, mu.OP_TYPE_DATA, node.addr)
+				}
+			} else if now-node.timestamp > 10 {
+				log.Println("udp任务超时中..", k)
+			}
+			return true
+		})
+		time.Sleep(60 * time.Second)
+	}
+}