Преглед на файлове

Merge branch 'dev3.4' of http://192.168.3.207:10080/qmx/jy-data-extract into dev3.4

fengweiqiang преди 5 години
родител
ревизия
961f65aa47
променени са 4 файла, в които са добавени 659 реда и са изтрити 103 реда
  1. 1 9
      udpfilterdup/src/config.json
  2. 312 37
      udpfilterdup/src/datamap.go
  3. 338 55
      udpfilterdup/src/main.go
  4. 8 2
      udps/main.go

+ 1 - 9
udpfilterdup/src/config.json

@@ -5,7 +5,7 @@
         "addr": "192.168.3.207:27082",
         "pool": 15,
         "db": "extract_kf",
-        "extract": "a_testbidding",
+        "extract": "bidding_20190910_01",
         "extract_copy": "a_testbidding_copy",
         "bidding": "bidding_126"
     },
@@ -27,14 +27,6 @@
             "memo": "创建招标数据索引"
         }
     ],
-    "site": [
-        {
-            "addr": "信息网"
-        },
-        {
-            "addr": "招标网"
-        }
-    ],
     "specialwords": "(重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研)",
     "specialtitle_1": "[0-9a-zA-Z一二三四五六七八九十零123456789](次|包|标段|标包)",
     "specialtitle_2": "项目([0-9a-zA-Z一二三四五六七八九十零123456789])",

+ 312 - 37
udpfilterdup/src/datamap.go

@@ -32,24 +32,42 @@ type Info struct {
 	detail		   	   string		//招标内容
 	site			   string		//站点
 	href			   string		//正文的url
-	titleSpecialWord bool 			//标题特殊词
-	specialWord bool	 			//再次判断的特殊词
+	repeatid   		   string  		//重复id
+
+
+
+	titleSpecialWord   bool 		//标题特殊词
+	specialWord        bool	 		//再次判断的特殊词
 	mergemap           map[string]interface{}   //合并记录
+	accurateTime       int64		//最终准确的时间
+
+
 }
 
-var datelimit = float64(432000)
-var reason string //判重记录
 
+
+// datelimit is the largest time gap between two records that are still compared
+// as possible duplicates. The initial value 432000 equals five days in seconds;
+// NewDatamap and NewHistorymap both overwrite it.
+var datelimit = float64(432000)  // five days
+// reason holds the textual explanation built up while a check method runs.
+// NOTE(review): this is package-level state written by both check methods, each
+// under its own lock — confirm that concurrent use is safe.
+var reason string // record of why a record was judged a duplicate
+
+// datamap is the in-memory pool of records used for duplicate detection.
 type datamap struct {
 	lock   sync.Mutex // mutex taken by check before the pool is read or extended
 	days   int        // how many days of data to retain
 	data   map[string][]*Info // records grouped under "<date>_<subtype>_<area>" keys
 	keymap []string // NOTE(review): purpose not visible in this diff — confirm
 	keys   map[string]bool // set of date keys that currently have data
+}
 
-
+// historymap is the in-memory pool of records used when re-checking a
+// historical id range; it has the same layout as datamap.
+type historymap struct {
+	lock   sync.Mutex // mutex taken by check before the pool is read or extended
+	days   int        // how many days of data to retain
+	data   map[string][]*Info // records grouped under "<date>_<subtype>_<area>" keys
+	keymap []string // NOTE(review): purpose not visible in this diff — confirm
+	keys   map[string]bool // set of date keys that currently have data
 }
 
+
 func NewDatamap(days int, lastid string) *datamap {
 	datelimit = qutil.Float64All(days * 86400)
 	dm := &datamap{sync.Mutex{}, days, map[string][]*Info{}, []string{}, map[string]bool{}}
@@ -101,6 +119,88 @@ func NewDatamap(days int, lastid string) *datamap {
 	log.Println("load data:", n)
 	return dm
 }
+
+// NewHistorymap builds the comparison pool for a historical re-check.
+//
+// It loads the records that surround the id range being re-checked:
+//   - records with _id <= startid, walking backwards, while they are less than
+//     five days older than startTime;
+//   - records with _id >= lastid, walking forwards, while they are less than
+//     five days newer than lastTime.
+//
+// startTime and lastTime are compared with each record's "comeintime" (or, when
+// that is missing, with the timestamp encoded in the first 8 hex digits of its
+// _id), so they are presumably Unix seconds — confirm against the caller.
+// An empty startid or lastid yields an empty pool. The function also resets the
+// package-level datelimit to five days.
+func NewHistorymap(startid string,lastid string,startTime int64,lastTime int64) *historymap {
+	datelimit = qutil.Float64All(5 * 86400)
+	hm := &historymap{sync.Mutex{}, 5, map[string][]*Info{}, []string{}, map[string]bool{}}
+	if lastid == "" || startid == "" {
+		return hm
+	}
+
+	// comeTime returns the record's arrival time, falling back to the
+	// timestamp prefix of its ObjectId when "comeintime" is absent or zero.
+	comeTime := func(tmp map[string]interface{}) int64 {
+		t := qutil.Int64All(tmp["comeintime"])
+		if t == 0 {
+			id := qutil.BsonIdToSId(tmp["_id"])[0:8]
+			t, _ = strconv.ParseInt(id, 16, 64)
+		}
+		return t
+	}
+	// add files one record in the pool under its "<date>_<subtype>_<area>" key.
+	add := func(tmp map[string]interface{}) {
+		cm := tmp["comeintime"]
+		info := NewInfo(tmp)
+		dkey := qutil.FormatDateWithObj(&cm, qutil.Date_yyyyMMdd)
+		k := fmt.Sprintf("%s_%s_%s", dkey, info.subtype, info.area)
+		hm.data[k] = append(hm.data[k], info)
+		hm.keys[dkey] = true
+	}
+
+	n := 0
+
+	// Five days before startid: newest first, stop at the first record outside the window.
+	sess_start := mgo.GetMgoConn()
+	defer mgo.DestoryMongoConn(sess_start)
+	it_start := sess_start.DB(mgo.DbName).C(extract).Find(mongodb.ObjToMQ(`{"_id":{"$lte":"`+startid+`"}}`,
+		true)).Sort("-_id").Iter()
+	for tmp := make(map[string]interface{}); it_start.Next(&tmp); n++ {
+		if qutil.Float64All(startTime-comeTime(tmp)) >= datelimit {
+			break
+		}
+		add(tmp)
+		tmp = make(map[string]interface{})
+	}
+
+	// Five days after lastid: oldest first, stop at the first record outside the window.
+	// The query runs on sess_last (the original reused sess_start and left
+	// sess_last idle), and the window test is "< datelimit": the original
+	// "> datelimit" failed on the very first record, so nothing was loaded.
+	sess_last := mgo.GetMgoConn()
+	defer mgo.DestoryMongoConn(sess_last)
+	it_last := sess_last.DB(mgo.DbName).C(extract).Find(mongodb.ObjToMQ(`{"_id":{"$gte":"`+lastid+`"}}`,
+		true)).Sort("_id").Iter()
+	for tmp := make(map[string]interface{}); it_last.Next(&tmp); n++ {
+		if qutil.Float64All(comeTime(tmp)-lastTime) >= datelimit {
+			break
+		}
+		add(tmp)
+		tmp = make(map[string]interface{})
+	}
+
+	log.Println("load history:", n)
+	return hm
+}
+
+
+
+
+
 func NewInfo(tmp map[string]interface{}) *Info {
 	subtype := qutil.ObjToString(tmp["subtype"])
 	area := qutil.ObjToString(tmp["area"])
@@ -114,31 +214,40 @@ func NewInfo(tmp map[string]interface{}) *Info {
 	info.subtype = subtype
 	info.buyer = qutil.ObjToString(tmp["buyer"])
 	info.projectname = qutil.ObjToString(tmp["projectname"])
-
-	info.specialWord = FilterRegTitle.MatchString(info.title)
-	info.titleSpecialWord = FilterRegTitle_1.MatchString(info.title)||FilterRegTitle_2.MatchString(info.title)
-
 	info.projectcode = qutil.ObjToString(tmp["projectcode"])
 	info.city = qutil.ObjToString(tmp["city"])
 	info.agency = qutil.ObjToString(tmp["agency"])
 	info.winner = qutil.ObjToString(tmp["winner"])
 	info.budget = qutil.Float64All(tmp["budget"])
 	info.bidamount = qutil.Float64All(tmp["bidamount"])
-
-
 	info.publishtime = qutil.Int64All(tmp["publishtime"])
-
 	info.bidopentime  = qutil.Int64All(tmp["bidopentime"])
 	info.agencyaddr	 = qutil.ObjToString(tmp["agencyaddr"])
 	info.detail		= qutil.ObjToString(tmp["detail"])
 	info.site	 = qutil.ObjToString(tmp["site"])
 	info.href	 = qutil.ObjToString(tmp["href"])
+	info.repeatid = qutil.ObjToString(tmp["repeatid"])
+
+
+	info.specialWord = FilterRegTitle.MatchString(info.title)
+	info.titleSpecialWord = FilterRegTitle_1.MatchString(info.title)||FilterRegTitle_2.MatchString(info.title)
 	info.mergemap = *qutil.ObjToMap(tmp["merge_map"])
+	if info.mergemap==nil {
+		info.mergemap = make(map[string]interface{},0)
+	}
+
 
+	info.accurateTime = qutil.Int64All(tmp["publishtime"])
+	if info.accurateTime ==0 {
+		info.accurateTime = qutil.Int64All(tmp["comeintime"])
+		if info.accurateTime ==0{
+			info.accurateTime, _ = strconv.ParseInt(qutil.BsonIdToSId(tmp["_id"]), 16, 64)
+		}
+	}
 
 	return info
 }
-// 486 396 315
+//判重方法
 func (d *datamap) check(info *Info) (b bool,  source *Info,reasons string) {
 	reason = ""
 	d.lock.Lock()
@@ -157,41 +266,55 @@ L:
 		data := d.data[k]
 		if len(data) > 0 { //对比v   找到同类型,同省或全国的数据作对比
 			for _, v := range data {
+				reason = ""
 				if v.id == info.id {//正常重复
 					return false, v,""
 				}
-				if math.Abs(qutil.Float64All(v.publishtime-info.publishtime)) > datelimit {
+				//备份  新增发布时间为空-取入库时间-在为空取id
+				if math.Abs(qutil.Float64All(v.accurateTime-info.accurateTime)) > datelimit {
 					continue   //是否为5天内数据
 				}
+				//类型分组
+				if info.subtype==v.subtype {
+					//站点配置--
+					if info.site!="" {
+						dict := SiteMap[info.site].(map[string]string)
+						if dict!=nil{
+							//临时改变--具体值
+							if info.area=="全国" &&dict["area"]!="" {
+								info.area = dict["area"]
+								info.city = dict["city"]
+							}else {
+								if info.city=="" &&dict["city"]!="" {
+									info.area = dict["area"]
+									info.city = dict["city"]
+								}
+							}
+						}
+					}
 
-				//if v.agency != "" && info.agency != "" && v.agency != info.agency {
-				//	continue
-				//}
-
-				//指定该范围内数据判重  jsondata
-				if info.agency=="" {
-					if v.agency=="" {
+					//前置条件
+					if info.titleSpecialWord&&info.title!=v.title&&v.title!="" {
 						continue
 					}
-				}
 
-				//是否走站内判重
-				if info.site != "" && v.site == info.site {
-					//独有判重... 待定  jsondata配置
-				}
-				//类型分组-相同类型继续
-				if info.subtype==v.subtype {
+					if info.buyer != "" &&v.buyer == info.buyer {
+						//满足标题
+						if len([]rune(v.title)) >= 10 && len([]rune(info.title)) >= 10 && v.title != info.title && (info.specialWord || v.specialWord) {
+							continue
+						}
+					}
 					//代理机构相同-非空相等
 					if v.agency != "" && info.agency != "" && v.agency == info.agency {
-						reason = fmt.Sprintf(reason,"同机构,")
+						reason = reason + "同机构,"
 						if info.agency=="" {
-							reason = fmt.Sprintf(reason,"指定范围,")
+							reason = reason + "指定范围,"
 							//指定该范围内数据判重  jsondata
 							if v.agency=="" {
 								continue
 							}
 						}else {
-							reason = fmt.Sprintf(reason,"非指定范围,")
+							reason = reason + "非指定范围,"
 							if quickHeavyMethodTwo(v,info) {
 								b = true
 								source = v
@@ -200,15 +323,15 @@ L:
 							}
 						}
 					}else {
-						reason = fmt.Sprintf(reason,"非同机构,")
+						reason = reason + "非同机构,"
 						if info.agency=="" {
-							reason = fmt.Sprintf(reason,"指定范围,")
+							reason = reason + "指定范围,"
 							//指定该范围内数据判重  jsondata
 							if v.agency=="" {
 								continue
 							}
 						}else {
-							reason = fmt.Sprintf(reason,"非指定范围,")
+							reason = reason + "非指定范围,"
 							if quickHeavyMethodOne(v,info) {
 								b = true
 								source = v
@@ -244,7 +367,118 @@ L:
 	return
 }
 
+// check compares info with the records held in the history pool.
+//
+// Returns:
+//   - b:       true when the caller has something to write back — either info
+//              is a duplicate of a different record than the one it was
+//              previously tagged with, or info carries an old duplicate tag but
+//              no longer matches anything (reasons is then "未判重记录").
+//   - source:  the pool record info matched; nil when nothing matched.
+//   - reasons: the rule trace for the match, or the marker described above.
+//
+// When b ends up false, info is appended to the pool so that later records can
+// be compared with it. A pool record that has the same id as info ends the
+// check immediately with b == false.
+func (h *historymap) check(info *Info) (b bool, source *Info, reasons string) {
+	reason = ""
+	h.lock.Lock()
+	defer h.lock.Unlock()
+
+	// Candidate buckets: every loaded date, for info's own area and, unless
+	// info is already nationwide, for the nationwide ("全国") area as well.
+	keys := []string{}
+	for k := range h.keys {
+		keys = append(keys, fmt.Sprintf("%s_%s_%s", k, info.subtype, info.area))
+		if info.area != "全国" {
+			keys = append(keys, fmt.Sprintf("%s_%s_%s", k, info.subtype, "全国"))
+		}
+	}
+L:
+	for _, k := range keys {
+		for _, v := range h.data[k] {
+			// Start a fresh trace for each candidate, as datamap.check does;
+			// otherwise rejected candidates leak into the reported reason.
+			reason = ""
+			if v.id == info.id { // the record itself is already in the pool
+				return false, v, ""
+			}
+			if math.Abs(qutil.Float64All(v.accurateTime-info.accurateTime)) > datelimit {
+				continue // outside the comparison window
+			}
+			if info.subtype != v.subtype {
+				continue
+			}
+			if v.agency != "" && info.agency != "" && v.agency == info.agency {
+				// Same non-empty agency on both sides.
+				reason = reason + "同机构," + "非指定范围,"
+				if quickHeavyMethodTwo(v, info) {
+					b = true
+					source = v
+					reasons = reason
+					break L
+				}
+				continue
+			}
+			reason = reason + "非同机构,"
+			if info.agency == "" {
+				// No rule is applied when the incoming record has no agency.
+				reason = reason + "指定范围,"
+				continue
+			}
+			reason = reason + "非指定范围,"
+			if quickHeavyMethodOne(v, info) {
+				b = true
+				source = v
+				reasons = reason
+				break L
+			}
+		}
+	}
+
+	if b {
+		if info.repeatid == source.id {
+			// Duplicate of the same record as before: nothing to update.
+			b = false
+		}
+		// Otherwise the duplicate target changed; the caller overwrites the tag.
+	} else {
+		reason = ""
+		// source is nil here, so the previous tag must be read from info.
+		// (The original dereferenced source and panicked.)
+		if info.repeatid != "" {
+			// Previously tagged as a duplicate but no longer matches anything.
+			b = true
+			reason = "未判重记录"
+			// Must also go into the return value: the caller tests the
+			// returned string, not the package-level variable.
+			reasons = reason
+		}
+	}
+
+	// Records that need no write-back join the pool for later comparisons.
+	if !b {
+		ct, _ := strconv.ParseInt(info.id[:8], 16, 64)
+		dkey := qutil.FormatDateByInt64(&ct, qutil.Date_yyyyMMdd)
+		k := fmt.Sprintf("%s_%s_%s", dkey, info.subtype, info.area)
+		data := h.data[k]
+		if data == nil {
+			h.data[k] = []*Info{info}
+			h.keys[dkey] = true
+			//h.update(ct)
+		} else {
+			h.data[k] = append(data, info)
+		}
+	}
+	return
+}
+
+
+
+//替换原始数据池
 func (d *datamap) replaceSourceData(replaceData *Info , replaceId string) {
 	ct, _ := strconv.ParseInt(replaceId[:8], 16, 64)
 	dkey := qutil.FormatDateByInt64(&ct, qutil.Date_yyyyMMdd)
@@ -265,15 +499,40 @@ func (d *datamap) replaceSourceData(replaceData *Info , replaceId string) {
 				break
 			}
 		}
-
-
-
 		d.data[k] = data
 	}
+}
+
+// replaceSourceData swaps the pool record whose id is replaceId for replaceData.
+//
+// The bucket is derived from the timestamp in the first 8 hex digits of
+// replaceId together with replaceData's subtype and area. If that bucket does
+// not exist yet it is created holding replaceData; if it exists but contains no
+// record with replaceId, the pool is left as it was.
+// The method does not take h.lock itself.
+func (h *historymap) replaceSourceData(replaceData *Info , replaceId string) {
+	stamp, _ := strconv.ParseInt(replaceId[:8], 16, 64)
+	day := qutil.FormatDateByInt64(&stamp, qutil.Date_yyyyMMdd)
+	bucket := fmt.Sprintf("%s_%s_%s", day, replaceData.subtype, replaceData.area)
+
+	records := h.data[bucket]
+	if records == nil {
+		// First record for this bucket: create it and register the day.
+		h.data[bucket] = []*Info{replaceData}
+		h.keys[day] = true
+		//h.update(ct)
+		return
+	}
+
+	// Replace the first record carrying the requested id, if any.
+	for i := range records {
+		if records[i].id == replaceId {
+			records[i] = replaceData
+			break
+		}
+	}
+	h.data[bucket] = records
+}
 
 
+
+
+
+//以下为判重   -   一揽子的方法
 //判重方法1
 func quickHeavyMethodOne(v *Info ,info *Info) bool {
 
@@ -493,6 +752,12 @@ func tenderRepeat_C(v *Info ,info *Info) bool {
 	if v.agencyaddr!=""&&info.agencyaddr!=""&&v.agencyaddr!=info.agencyaddr {
 		return true
 	}
+	if info.specialWord||v.specialWord||info.titleSpecialWord||v.titleSpecialWord{
+		return true
+	}
+
+
+
 	return false
 }
 
@@ -586,6 +851,9 @@ func winningRepeat_C(v *Info ,info *Info) bool {
 	}
 	//原始地址...
 
+	if info.specialWord||v.specialWord||info.titleSpecialWord||v.titleSpecialWord{
+		return true
+	}
 
 	return false
 }
@@ -656,6 +924,13 @@ func (d *datamap) update(t int64) {
 	//log.Println("更新前后数据:", all, all1)
 }
 
+
+
+
+
+
+
+
 func (d *datamap) GetLatelyFiveDay(t int64) []string {
 	array := make([]string, d.days)
 	now := time.Unix(t, 0)

+ 338 - 55
udpfilterdup/src/main.go

@@ -18,11 +18,14 @@ import (
 	"time"
 )
 
+
+
+
 var (
 	Sysconfig    map[string]interface{} //配置文件
 	mconf        map[string]interface{} //mongodb配置信息
 	mgo          *mongodb.MongodbSim    //mongodb操作对象
-
+	siteMgo             *mongodb.MongodbSim
 	//mgoTest          *mongodb.MongodbSim    //mongodb操作对象
 
 	extract      string
@@ -31,18 +34,21 @@ var (
 	udpclient    mu.UdpClient             //udp对象
 	nextNode     []map[string]interface{} //下节点数组
 	dupdays      = 5                      //初始化判重范围
-	DM           *datamap                 //判重数据
-	lastid       = "5da3f2c5a5cb26b9b79847fe"
+	DM           *datamap                 //
+	HM           *historymap                 //判重数据
+	lastid       = "5d767728a5cb26b9b7748868"
 	//5da3f2c5a5cb26b9b79847fc
-
+	//ObjectId("5d767728a5cb26b9b7748868")
+	//5da3f2c5a5cb26b9b79847fe
 	//正则筛选相关
 	FilterRegTitle = regexp.MustCompile("^_$")
 	FilterRegTitle_1 = regexp.MustCompile("^_$")
 	FilterRegTitle_2 = regexp.MustCompile("^_$")
 
 
-	siteArr     []map[string]interface{} //站点
-	inV_n int   //无效数据数量
+
+
+	SiteMap  map[string]interface{} //站点map
 )
 
 func init() {
@@ -51,7 +57,6 @@ func init() {
 	//172.17.145.163:27080
 	util.ReadConfig(&Sysconfig)
 	nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
-	siteArr = util.ObjArrToMapArr(Sysconfig["site"].([]interface{}))
 	mconf = Sysconfig["mongodb"].(map[string]interface{})
 
 	mgo = &mongodb.MongodbSim{
@@ -65,12 +70,12 @@ func init() {
 	mgo.InitPool()
 
 
-	//测试临时注释
+	//测试临时注释
 	dupdays = util.IntAllDef(Sysconfig["dupdays"], 3)
 	//加载数据
 	DM = NewDatamap(dupdays, lastid)
-	fmt.Println(DM.keys)
-	fmt.Println(DM.data)
+	//fmt.Println(DM.keys)
+	//fmt.Println(DM.data)
 	FilterRegTitle = regexp.MustCompile(util.ObjToString(Sysconfig["specialwords"]))
 	FilterRegTitle_1 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_1"]))
 	FilterRegTitle_2 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_2"]))
@@ -78,15 +83,34 @@ func init() {
 
 
 
-	//数据库
-	//mongodb.InitMongodbPool(5, "192.168.3.207:27081", "")
+	//站点相关数据库
+	mongodb.InitMongodbPool(5, "192.168.3.207:27082", "")
 
-	//mgoTest = &mongodb.MongodbSim{
-	//	MongodbAddr: "192.168.3.207:27081",
-	//	Size:        5,
-	//	DbName:      "qfw",
-	//}
-	//mgoTest.InitPool()
+	siteMgo = &mongodb.MongodbSim{
+		MongodbAddr: "192.168.3.207:27082",
+		Size:        5,
+		DbName:      "zhaolongyue",
+	}
+	siteMgo.InitPool()
+
+
+	SiteMap = make(map[string]interface{},0)
+
+	start := int(time.Now().Unix())
+	//站点配置
+	sess_site := siteMgo.GetMgoConn()
+	defer sess_site.Close()
+	res_site := sess_site.DB("zhaolongyue").C("site").Find(nil).Sort("_id").Iter()
+	for site_dict := make(map[string]interface{}); res_site.Next(&site_dict); {
+			data_map := map[string]string{
+				"area":util.ObjToString(site_dict["area"]),
+				"city":util.ObjToString(site_dict["city"]),
+				"district":util.ObjToString(site_dict["district"]),
+			}
+		SiteMap[site_dict["site"].(string)]= data_map
+	}
+	
+	fmt.Printf("用时:%d秒,%d个",int(time.Now().Unix())-start,len(SiteMap))
 
 
 }
@@ -190,7 +214,7 @@ func mainTest()  {
 		}
 
 	}
-	//打印 1:0情况    66989
+	//打印 1:0情况    66989
 	mm:=0
 	for _,v:=range arr1 {
 		mm++
@@ -224,6 +248,7 @@ func mainTest()  {
 
 
 func main() {
+
 	go checkMapJob()
 
 	updport := Sysconfig["udpport"].(string)
@@ -247,9 +272,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 		} else if mapInfo != nil {
 
 			//更新流程
-
-
-
+			//go historyTask(data,mapInfo)
 
 
 			//判重流程
@@ -272,7 +295,6 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 //开始判重程序
 func task(data []byte, mapInfo map[string]interface{}) {
 
-
 	//
 	fmt.Println("开始判重")
 	defer util.Catch()
@@ -308,7 +330,6 @@ func task(data []byte, mapInfo map[string]interface{}) {
 
 			//是否为无效数据
 			if invalidData(info.buyer,info.projectname,info.projectcode) {
-				inV_n++
 				mapLock.Lock()
 				updateExtract = append(updateExtract, []map[string]interface{}{
 					map[string]interface{}{
@@ -329,15 +350,15 @@ func task(data []byte, mapInfo map[string]interface{}) {
 			}else  {
 				//判重原因 reason
 				// tmp["_id"] 对比id   id原始id
+				mapLock.Lock()
 				b, source,reason := DM.check(info)
 				if b { //有重复,生成更新语句,更新抽取和更新招标
 					repeateN++
-					mapLock.Lock()
-
 					var mergeArr []int64 	//更改合并数组记录
 					var newData *Info		//更换新的数据池数据
 
 					var id_map  = map[string]interface{}{}
+					repeat_id := ""
 					//合并操作--评功权重打分-合并完替换原始数据池
 					basic_bool := basicDataScore(source,info)
 					if basic_bool {
@@ -345,48 +366,55 @@ func task(data []byte, mapInfo map[string]interface{}) {
 						newData,mergeArr= mergeDataFields(source,info)
 						DM.replaceSourceData(newData,source.id) //替换
 						id_map["_id"]= util.StringTOBsonId(source.id)
-
+						repeat_id = source.id
 						//对比的数据打判重标签
-						updateExtract = append(updateExtract, []map[string]interface{}{
-							map[string]interface{}{
-								"_id": tmp["_id"],
-							},
-							map[string]interface{}{
-								"$set": map[string]interface{}{
-									"repeat":   1,
-									"repeatid": source.id,
-								},
-							},
-						})
-
-
+						//updateExtract = append(updateExtract, []map[string]interface{}{
+						//	map[string]interface{}{
+						//		"_id": tmp["_id"],
+						//	},
+						//	map[string]interface{}{
+						//		"$set": map[string]interface{}{
+						//			"repeat":   1,
+						//			"repeatid": source.id,
+						//		},
+						//	},
+						//})
+						//if len(updateExtract) > 500 {
+						//	mgo.UpdateBulk(extract, updateExtract...)
+						//	updateExtract = [][]map[string]interface{}{}
+						//}
 
 					}else {
 						//已对比数据为标准 ,数据池的数据打判重标签
 						newData,mergeArr= mergeDataFields(info,source)
 						DM.replaceSourceData(newData,source.id)//替换
 						id_map["_id"]= util.StringTOBsonId(info.id)
-
+						repeat_id = info.id
 						//数据池的数据打判重标签
-						updateExtract = append(updateExtract, []map[string]interface{}{
-							map[string]interface{}{
-								"_id": util.StringTOBsonId(source.id),
-							},
-							map[string]interface{}{
-								"$set": map[string]interface{}{
-									"repeat":   1,
-									"repeatid": info.id,
-								},
-							},
-						})
+						//updateExtract = append(updateExtract, []map[string]interface{}{
+						//	map[string]interface{}{
+						//		"_id": util.StringTOBsonId(source.id),
+						//	},
+						//	map[string]interface{}{
+						//		"$set": map[string]interface{}{
+						//			"repeat":   1,
+						//			"repeatid": info.id,
+						//		},
+						//	},
+						//})
+						//
+						//if len(updateExtract) > 500 {
+						//	mgo.UpdateBulk(extract, updateExtract...)
+						//	updateExtract = [][]map[string]interface{}{}
+						//}
 
 					}
-
-
 					//
 					var update_map  = map[string]interface{}{
 						"$set": map[string]interface{}{
 							"reason":reason,
+							"repeat":"1",
+							"repeatid":repeat_id,
 							"merge":newData.mergemap,
 						},
 					}
@@ -451,6 +479,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
 
 				} else {
 					//IS.Add("new")
+					mapLock.Unlock()
 				}
 			}
 		}(tmp)
@@ -461,7 +490,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
 		mgo.UpdateBulk(extract, updateExtract...)
 		//mgo.UpdateBulk(bidding, updateBidding...)
 	}
-	log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"],"无效数据:",inV_n)
+	log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
 
 	//任务完成,开始发送广播通知下面节点
 	if n > repeateN && mapInfo["stop"] == nil {
@@ -486,6 +515,260 @@ func task(data []byte, mapInfo map[string]interface{}) {
 	}
 }
 
+// historyTask re-runs duplicate detection over the id range (gtid, lteid]
+// given in mapInfo.
+//
+// Steps:
+//  1. scan the range once to find the smallest and largest "comeintime";
+//  2. build the history pool HM around the range with NewHistorymap;
+//  3. scan the range again with up to 16 concurrent workers, checking every
+//     record against HM and queueing bulk updates for the extract collection;
+//  4. flush the remaining updates and, unless mapInfo["stop"] is set or every
+//     record was a duplicate, notify the downstream nodes over UDP.
+//
+// data is unused. mapInfo must carry string "gtid" and "lteid" entries; a
+// missing entry panics and is handled by the deferred util.Catch.
+//
+// NOTE(review): pool replacements go to DM rather than HM, although HM is the
+// pool consulted by HM.check — confirm this is intended.
+func historyTask(data []byte, mapInfo map[string]interface{}) {
+
+	fmt.Println("开始取历史时间段")
+	defer util.Catch()
+	sess := mgo.GetMgoConn()
+	defer mgo.DestoryMongoConn(sess)
+	q := map[string]interface{}{
+		"_id": map[string]interface{}{
+			"$gt":  util.StringTOBsonId(mapInfo["gtid"].(string)),
+			"$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
+		},
+	}
+
+	// Pass 1: smallest and largest non-zero comeintime in the range.
+	it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
+	minTime, maxTime := int64(0), int64(0)
+	for tmp := make(map[string]interface{}); it.Next(&tmp); {
+		t := util.Int64All(tmp["comeintime"])
+		if minTime == 0 || maxTime == 0 {
+			minTime = t
+			maxTime = t
+			continue
+		}
+		if t < minTime && t != 0 {
+			minTime = t
+		}
+		if t > maxTime && t != 0 {
+			maxTime = t
+		}
+	}
+	fmt.Println(minTime,maxTime)
+
+	HM = NewHistorymap(util.ObjToString(mapInfo["gtid"]),
+		util.ObjToString(mapInfo["lteid"]),minTime,maxTime)
+
+	// Pass 2: duplicate detection. The iterator is built on sess_task; the
+	// original acquired sess_task but kept querying through sess.
+	sess_task := mgo.GetMgoConn()
+	defer mgo.DestoryMongoConn(sess_task)
+	q_task := map[string]interface{}{
+		"_id": map[string]interface{}{
+			"$gt":  util.StringTOBsonId(mapInfo["gtid"].(string)),
+			"$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
+		},
+	}
+	it_task := sess_task.DB(mgo.DbName).C(extract).Find(&q_task).Iter()
+	updateExtract := [][]map[string]interface{}{}
+	pool := make(chan bool, 16) // bounds the number of concurrent workers
+	wg := &sync.WaitGroup{}
+	mapLock := &sync.Mutex{} // guards updateExtract and repeateN
+	n, repeateN := 0, 0
+
+	for tmp := make(map[string]interface{}); it_task.Next(&tmp); n++ {
+
+		if n%10000 == 0 {
+			// Workers update repeateN, so read it under the same lock.
+			mapLock.Lock()
+			repeated := repeateN
+			mapLock.Unlock()
+			log.Println("current:", n, tmp["_id"],"repeateN:",repeated)
+		}
+		pool <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-pool
+				wg.Done()
+			}()
+			info := NewInfo(tmp)
+
+			// Invalid records are only tagged with repeat = -1.
+			if invalidData(info.buyer,info.projectname,info.projectcode) {
+				mapLock.Lock()
+				updateExtract = append(updateExtract, []map[string]interface{}{
+					map[string]interface{}{
+						"_id": tmp["_id"],
+					},
+					map[string]interface{}{
+						"$set": map[string]interface{}{
+							"repeat":   -1,
+						},
+					},
+				})
+
+				if len(updateExtract) > 500 {
+					mgo.UpdateBulk(extract, updateExtract...)
+					updateExtract = [][]map[string]interface{}{}
+				}
+				mapLock.Unlock()
+				return
+			}
+
+			b, source,reason := HM.check(info)
+			if !b {
+				//IS.Add("new")
+				return
+			}
+
+			if reason == "未判重记录" {
+				// Previously tagged as a duplicate but no longer matches:
+				// clear the tag and put the record into the pool.
+				mapLock.Lock()
+				DM.replaceSourceData(info,info.id) // replace, i.e. add
+				updateExtract = append(updateExtract, []map[string]interface{}{
+					map[string]interface{}{
+						"_id": tmp["_id"],
+					},
+					map[string]interface{}{
+						"$set": map[string]interface{}{
+							"repeat":   0,
+							"repeatid": "-1",
+						},
+					},
+				})
+				if len(updateExtract) > 500 {
+					mgo.UpdateBulk(extract, updateExtract...)
+					updateExtract = [][]map[string]interface{}{}
+				}
+				mapLock.Unlock()
+				return
+			}
+
+			// Duplicate found: build the updates for extract.
+			mapLock.Lock()
+			repeateN++ // counted under the lock; the original raced here
+
+			var mergeArr []int64 	// codes of the fields changed by the merge
+			var newData *Info		// merged record that replaces the pool entry
+
+			var id_map  = map[string]interface{}{}
+			// Score both records to decide which one is kept as the master.
+			basic_bool := basicDataScore(source,info)
+			if basic_bool {
+				// The pool record wins: tag the incoming record as duplicate.
+				newData,mergeArr= mergeDataFields(source,info)
+				DM.replaceSourceData(newData,source.id)
+				id_map["_id"]= util.StringTOBsonId(source.id)
+
+				updateExtract = append(updateExtract, []map[string]interface{}{
+					map[string]interface{}{
+						"_id": tmp["_id"],
+					},
+					map[string]interface{}{
+						"$set": map[string]interface{}{
+							"repeat":   1,
+							"repeatid": source.id,
+						},
+					},
+				})
+			}else {
+				// The incoming record wins: tag the pool record as duplicate.
+				newData,mergeArr= mergeDataFields(info,source)
+				DM.replaceSourceData(newData,source.id)
+				id_map["_id"]= util.StringTOBsonId(info.id)
+
+				updateExtract = append(updateExtract, []map[string]interface{}{
+					map[string]interface{}{
+						"_id": util.StringTOBsonId(source.id),
+					},
+					map[string]interface{}{
+						"$set": map[string]interface{}{
+							"repeat":   1,
+							"repeatid": info.id,
+						},
+					},
+				})
+			}
+
+			// Fields to write onto the surviving record.
+			set := map[string]interface{}{
+				"reason":reason,
+				"merge":newData.mergemap,
+			}
+			var update_map  = map[string]interface{}{
+				"$set": set,
+			}
+			// Each code in mergeArr names one field that the merge changed.
+			for _,value :=range mergeArr {
+				switch value {
+				case 1:
+					set["area"] = newData.area
+					set["city"] = newData.city
+				case 2:
+					set["projectname"] = newData.projectname
+				case 3:
+					set["projectcode"] = newData.projectcode
+				case 4:
+					set["buyer"] = newData.buyer
+				case 5:
+					set["budget"] = newData.budget
+				case 6:
+					set["winner"] = newData.winner
+				case 7:
+					set["bidamount"] = newData.bidamount
+				case 8:
+					set["bidopentime"] = newData.bidopentime
+				}
+			}
+
+			updateExtract = append(updateExtract, []map[string]interface{}{
+				id_map,
+				update_map,
+			})
+			if len(updateExtract) > 500 {
+				mgo.UpdateBulk(extract, updateExtract...)
+				updateExtract = [][]map[string]interface{}{}
+			}
+			mapLock.Unlock()
+		}(tmp)
+		tmp = make(map[string]interface{})
+	}
+	wg.Wait()
+	if len(updateExtract) > 0 {
+		mgo.UpdateBulk(extract, updateExtract...)
+		//mgo.UpdateBulk(bidding, updateBidding...)
+	}
+	log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
+
+	// Task finished: broadcast the processed range to the downstream nodes.
+	if n > repeateN &&mapInfo["stop"] == nil {
+		for _, to := range nextNode {
+			sid, _ := mapInfo["gtid"].(string)
+			eid, _ := mapInfo["lteid"].(string)
+			key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
+			by, _ := json.Marshal(map[string]interface{}{
+				"gtid":  sid,
+				"lteid": eid,
+				"stype": util.ObjToString(to["stype"]),
+				"key":   key,
+			})
+			addr := &net.UDPAddr{
+				IP:   net.ParseIP(to["addr"].(string)),
+				Port: util.IntAll(to["port"]),
+			}
+			node := &udpNode{by, addr, time.Now().Unix(), 0}
+			udptaskmap.Store(key, node)
+			udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
+		}
+	}
+}
+
+
+
+
+
+
+
+
 
 //合并字段
 func mergeDataFields(source *Info, info *Info) (*Info,[]int64){

+ 8 - 2
udps/main.go

@@ -27,8 +27,14 @@ func main() {
 	5da3f2c5a5cb26b9b79847fc
 	5db2735ba5cb26b9b7c99c6f   76万
 	*/
-	flag.StringVar(&sid, "sid", "", "开始id")
-	flag.StringVar(&eid, "eid", "", "结束id")
+
+	/*
+		9W
+	5d767728a5cb26b9b7748868
+	ObjectId("5d77c881a5cb26b9b7de209d")
+	*/
+	flag.StringVar(&sid, "sid", "5d767728a5cb26b9b7748868", "开始id")
+	flag.StringVar(&eid, "eid", "5d77c881a5cb26b9b7de209d", "结束id")
 	flag.StringVar(&startDate, "start", "", "开始日期2006-01-02")
 	flag.StringVar(&endDate, "end", "", "结束日期2006-01-02")
 	flag.StringVar(&ip, "ip", "127.0.0.1", "ip")