瀏覽代碼

Merge branch 'dev3.4' of http://192.168.3.207:10080/qmx/jy-data-extract into dev3.4

# Conflicts:
#	udpfilterdup/src/config.json
#	udpfilterdup/src/main.go
合并
apple 5 年之前
父節點
當前提交
404bff1b43

+ 4 - 2
fullproject/src_v1/init.go

@@ -294,14 +294,16 @@ type ProjectInfo struct {
 	Buyerclass  string                 `json:"buyerclass"`            //采购单位分类
 	Bidopentime int64                  `json:"bidopentime,omitempty"` //开标时间
 	//	Zbtime        int64                  `json:"zbtime"`        //招标时间
-	//	Jgtime        int64                  `json:"jgtime"`        //结果中标时间
+	Jgtime        int64              `json:"jgtime"`        //结果中标时间
 	Bidamount float64 `json:"bidamount,omitempty"` //中标金额
 	Budget    float64 `json:"budget,omitempty"`    //预算
 	//Winnerorder []string `json:"winnerorder"` //中标候选人
 	score         int
 	comStr        string
 	resVal, pjVal int
-	InfoFiled	map[string]InfoField	`json:"infofiled"`		//逻辑处理需要的info字段
+	InfoFiled	map[string]InfoField	`json:"infofiled"`			//逻辑处理需要的info字段
+	Budgettag		int					`json:"budgettag"`			//预算是否有效标记
+	Bidamounttag	int					`json:"bidamounttag"`		//中标金额是否有效标记
 }
 
 //存储部分招标信息字段,业务逻辑处理需要

+ 159 - 128
fullproject/src_v1/project.go

@@ -3,13 +3,13 @@ package main
 import (
 	"encoding/json"
 	"log"
+	"time"
+
 	//	"log"
 	"math"
 	qu "qfw/util"
 	"sort"
 	"strings"
-	"time"
-
 	//"gopkg.in/mgo.v2/bson"
 	"go.mongodb.org/mongo-driver/bson"
 	"go.mongodb.org/mongo-driver/bson/primitive"
@@ -456,37 +456,11 @@ func (p *ProjectTask) NewProject(tmp map[string]interface{}, thisinfo *Info) (st
 			set[f] = tmp[f]
 		}
 	}
-	if tmp["budget"] != nil {
-		set["budget"] = thisinfo.Budget
-	}
-	if tmp["bidamount"] != nil {
-		set["bidamount"] = thisinfo.Bidamount
-	}
 	bidopentime := qu.Int64All(tmp["bidopentime"])
 	if bidopentime > 0 {
 		set["bidopentime"] = bidopentime
 	}
-	if thisinfo.ProjectName != "" {
-		set["s_projectname"] = tmp["projectname"]
-	}
-	now := time.Now().Unix()
-	set["createtime"] = now
-	set["sourceinfoid"] = thisinfo.Id
-	set["sourceinfourl"] = tmp["href"]
-	set["firsttime"] = tmp["publishtime"]
-	set["lasttime"] = tmp["publishtime"]
-	set["pici"] = p.pici
-	set["ids"] = []string{thisinfo.Id}
-	if thisinfo.TopType == "招标" {
-		set["zbtime"] = tmp["publishtime"]
-	} else if thisinfo.TopType == "结果" || thisinfo.SubType == "合同" {
-		set["jgtime"] = tmp["publishtime"]
-	}
-	//废标、流标   处理时间
-	if thisinfo.SubType == "流标" || thisinfo.SubType == "废标" {
-		set["zbtime"] = int64(0)
-		set["bidopentime"] = int64(0)
-	}
+
 	//异常标记
 	if thisinfo.TopType != "招标" && thisinfo.TopType != "拟建" && thisinfo.TopType != "预告" {
 		set["exception"] = 1
@@ -508,12 +482,14 @@ func (p *ProjectTask) NewProject(tmp map[string]interface{}, thisinfo *Info) (st
 	set["bidtype"] = bidtype[bs]
 	if bt == "招标" {
 		set["projectscope"] = qu.ObjToString(tmp["projectscope"])
-		set["bidstatus"] = bs
+		set["bidstatus"] = bt
 	}else {
 		if bidstatus[bs] != "" {
 			set["bidstatus"] = thisinfo.SubType
 		} else if tmp["infoformat"] == 2 {
 			set["bidstatus"] = "拟建"
+		}else if bs == "" {
+			set["bidstatus"] =  ""
 		} else {
 			set["bidstatus"] = "其它"
 		}
@@ -522,10 +498,45 @@ func (p *ProjectTask) NewProject(tmp map[string]interface{}, thisinfo *Info) (st
 
 	pkg := PackageFormat(thisinfo, nil)
 	p1 := p.NewCachePinfo(pId, thisinfo, bt, pkg)
+
+	now := time.Now().Unix()
+	set["createtime"] = now
+	set["sourceinfoid"] = thisinfo.Id
+	set["sourceinfourl"] = tmp["href"]
+	set["firsttime"] = tmp["publishtime"]
+	set["lasttime"] = tmp["publishtime"]
+	set["pici"] = p.pici
+	set["ids"] = []string{thisinfo.Id}
+	if thisinfo.TopType == "招标" {
+		if thisinfo.SubType != "变更" && thisinfo.SubType != "其它" {
+			set["zbtime"] = tmp["publishtime"]
+		}
+	} else if thisinfo.TopType == "结果" || thisinfo.SubType == "合同" {
+		set["jgtime"] = tmp["publishtime"]
+		p1.Jgtime = thisinfo.Publishtime
+	}
+
 	if len(thisinfo.Subscopeclass) > 0 {
-		s_subscopeclass := strings.Join(thisinfo.Subscopeclass, ",")
-		set["s_subscopeclass"] = s_subscopeclass
+		set["s_subscopeclass"] = strings.Join(thisinfo.Subscopeclass, ",")
 	}
+
+	if tmp["budget"] != nil && tmp["budget"] != "" {
+		set["budget"] = thisinfo.Budget
+		p1.Budgettag = 0
+		set["budgettag"] = 0
+	}else {
+		p1.Budgettag = 1
+		set["budgettag"] = 1
+	}
+	if tmp["bidamount"] != nil && tmp["bidamount"] != "" {
+		set["bidamount"] = thisinfo.Bidamount
+		p1.Bidamounttag = 0
+		set["bidamounttag"] = 0
+	}else {
+		p1.Bidamounttag = 1
+		set["bidamounttag"] = 1
+	}
+
 	if len(thisinfo.Winners) > 0 {
 		set["s_winner"] = strings.Join(thisinfo.Winners, ",")
 		p1.Winners = thisinfo.Winners
@@ -537,6 +548,10 @@ func (p *ProjectTask) NewProject(tmp map[string]interface{}, thisinfo *Info) (st
 		set["multipackage"] = 0
 	}
 
+	if thisinfo.Buyer == "" {
+		set["buyerclass"] = ""
+	}
+
 	p1.InfoFiled = make(map[string]InfoField)
 	infofiled := InfoField{
 		Budget:       thisinfo.Budget,
@@ -596,6 +611,7 @@ var INFOFIELDS = []string{
 	"buyertel",
 	"area",
 	"city",
+	"district",
 	"spidercode",
 	"site",
 }
@@ -658,22 +674,35 @@ func (p *ProjectTask) UpdateProject(tmp map[string]interface{}, thisinfo *Info,
 	}
 	set := map[string]interface{}{}
 	pInfo.Ids = append(pInfo.Ids, thisinfo.Id)
-
-	//1--firsttime
-	if thisinfo.Publishtime < pInfo.FirstTime && thisinfo.Publishtime > 0 {
-		pInfo.FirstTime = thisinfo.Publishtime
-		set["firsttime"] = thisinfo.Publishtime
-		if thisinfo.TopType == "招标" {
-			set["zbtime"] = tmp["publishtime"]
-		}
+	if len(pInfo.Ids) > 30 {
+		//异常标记
+		set["listtag"] = 1
 	}
-	//2--lasttime
+
+	//zbtime、lasttime、jgtime
 	pInfo.LastTime = thisinfo.Publishtime
 	set["lasttime"] = thisinfo.Publishtime
 	if thisinfo.TopType == "招标" {
-		set["zbtime"] = tmp["publishtime"]
-	} else if thisinfo.TopType == "结果" || thisinfo.SubType == "合同" {
-		set["jgtime"] = tmp["publishtime"]
+		if thisinfo.SubType != "变更" && thisinfo.SubType != "其它" {
+			set["zbtime"] = tmp["publishtime"]
+		}
+		if pInfo.Jgtime > 0 {
+			pInfo.Jgtime = int64(0)
+			set["jgtime"] = int64(0)
+		}
+	}else if thisinfo.TopType == "结果" {
+		pInfo.Jgtime = thisinfo.Publishtime
+		set["jgtime"] = thisinfo.Publishtime
+	} else if thisinfo.SubType == "合同" {
+		if pInfo.Jgtime <= 0 {
+			set["jgtime"] = tmp["publishtime"]
+			pInfo.Jgtime = thisinfo.Publishtime
+		}
+	}
+
+	if thisinfo.Bidopentime > pInfo.Bidopentime {
+		pInfo.Bidopentime = thisinfo.Bidopentime
+		set["bidopentime"] = pInfo.Bidopentime
 	}
 
 	bt := qu.ObjToString(tmp["toptype"])
@@ -687,30 +716,25 @@ func (p *ProjectTask) UpdateProject(tmp map[string]interface{}, thisinfo *Info,
 		if tmp["projectscope"] != nil {
 			set["projectscope"] = qu.ObjToString(tmp["projectscope"])
 		}
-		set["bidstatus"] = bs
+		set["bidstatus"] = bt
+		pInfo.Bidstatus = bt
 	}else {
 		if bidstatus[bs] != "" {
 			set["bidstatus"] = thisinfo.SubType
+			pInfo.Bidstatus = thisinfo.SubType
 		} else if tmp["infoformat"] == 2 {
 			set["bidstatus"] = "拟建"
-		} else {
+			pInfo.Bidstatus = "拟建"
+		}else if bs == "" {
+			set["bidstatus"] =  ""
+			pInfo.Bidstatus = ""
+		}else {
 			set["bidstatus"] = "其它"
+			pInfo.Bidstatus =  "其它"
 		}
 	}
 	p.mapBidLock.Unlock()
 
-	//废标、流标   处理时间
-	if thisinfo.SubType == "流标" || thisinfo.SubType == "废标" {
-		pInfo.FirstTime = thisinfo.Publishtime
-		pInfo.Bidopentime = int64(0)
-		pInfo.LastTime = thisinfo.Publishtime
-
-		set["firsttime"] = thisinfo.Publishtime
-		set["zbtime"] = int64(0)
-		set["publishtime"] = thisinfo.Publishtime
-		set["bidopentime"] = int64(0)
-	}
-
 	//异常标记
 	if ex > 0 {
 		set["exception"] = ex
@@ -775,11 +799,6 @@ func (p *ProjectTask) UpdateProject(tmp map[string]interface{}, thisinfo *Info,
 		set["buyertel"] = pInfo.Buyertel
 	}
 
-	if thisinfo.Bidopentime > pInfo.Bidopentime {
-		pInfo.Bidopentime = thisinfo.Bidopentime
-		set["bidopentime"] = pInfo.Bidopentime
-	}
-
 	if len(thisinfo.Topscopeclass) > 0 {
 		sort.Strings(pInfo.Topscopeclass)
 		for _, k := range thisinfo.Topscopeclass {
@@ -805,7 +824,7 @@ func (p *ProjectTask) UpdateProject(tmp map[string]interface{}, thisinfo *Info,
 	//winner
 	if len(thisinfo.Winners) > 0 {
 		if len(pInfo.Winners) <= 0 {
-			set["winner"] = tmp["winner"].(string)
+			set["winner"] = qu.ObjToString(tmp["winner"])
 		}
 
 		sort.Strings(pInfo.Winners)
@@ -826,9 +845,20 @@ func (p *ProjectTask) UpdateProject(tmp map[string]interface{}, thisinfo *Info,
 		set["multipackage"] = 0
 	}
 	//处理多包后,计算预算金额、中标金额
-	CountAmount(pInfo, thisinfo)
-	set["budget"] = pInfo.Budget
-	set["bidamount"] = pInfo.Bidamount
+	CountAmount(pInfo, thisinfo, tmp)
+	if pInfo.Budget >= 0 && pInfo.Budgettag != 1 {
+		set["budget"] = pInfo.Budget
+		set["budgettag"] = 0
+	}else {
+		set["budgettag"] = 1
+	}
+	if pInfo.Bidamount >= 0 && pInfo.Bidamounttag != 1 {
+		set["bidamount"] = pInfo.Bidamount
+		set["bidamounttag"] = 0
+	}else {
+		set["bidamounttag"] = 1
+	}
+
 	infofiled := InfoField{
 		Budget:       thisinfo.Budget,
 		Bidamount:    thisinfo.Bidamount,
@@ -887,7 +917,9 @@ func (p *ProjectTask) CompareStatus(project *ProjectInfo, info *Info) (bool, int
 	if info.TopType == "拟建" || info.TopType == "预告" || info.TopType == "招标" {
 		if project.Bidstatus == "拟建" || project.Bidstatus == "预告" || project.Bidstatus == "招标" {
 			return false, 0
-		} else {
+		}else if project.Bidstatus == "废标" || project.Bidstatus == "流标" {
+			return false, 0
+		}else {
 			return true, 0
 		}
 	} else if info.TopType == "结果" {
@@ -926,10 +958,10 @@ func ComparePlace(project *ProjectInfo, info *Info) bool {
 		return false
 	}
 	if info.Area == project.Area {
-		if info.City == "" {
+		if info.City == "" || project.City == "" {
 			return false
 		} else if info.City == project.City {
-			if info.District == "" || info.District == project.District {
+			if project.District == "" || info.District == "" || info.District == project.District {
 				return false
 			} else {
 				return true
@@ -1013,28 +1045,34 @@ func PackageFormat(info *Info, project *ProjectInfo) map[string]interface{} {
 }
 
 //计算预算(budget)、中标金额(bidamount)
-func CountAmount(project *ProjectInfo, info *Info) {
+func CountAmount(project *ProjectInfo, info *Info, tmp map[string]interface{}) {
 	if info.HasPackage {
 		budget := 0.0
 		for _, v := range project.Package{
 			v1, _ := v.([]map[string]interface{})
 			for _, v2 := range v1{
-				b1 := qu.Float64All(v2["budget"])
-				if b1 > 0 {
-					budget = budget + b1
-					break
+				if v2["budget"] != nil {
+					b1 := qu.Float64All(v2["budget"])
+					if b1 > 0 {
+						budget = budget + b1
+						break
+					}
+				}else {
+					project.Budgettag = 1
 				}
+
 			}
 		}
-		if budget == 0 && info.Budget > 0 {
-			budget = info.Budget
-		}
 		if budget > 0 {
 			project.Budget = budget
+			project.Budgettag = 0
+		}else if budget == 0 && info.Budget > 0 {
+			project.Budget = info.Budget
+			project.Budgettag = 0
 		}
 	}else {
 		//招标没有多包
-		k := KeyPackage.FindStringSubmatch(project.ProjectName)
+		k := KeyPackage.FindStringSubmatch(info.ProjectName)
 		if len(k) > 0 {
 			//招标是单包
 			if len(project.Package) > 0 {
@@ -1048,32 +1086,26 @@ func CountAmount(project *ProjectInfo, info *Info) {
 				}
 				if !flag {
 					project.Budget = project.Budget + info.Budget
+					project.Budgettag = 0
 				}
 			}else {
 				//项目没有多包
 				if info.Budget > 0 {
 					project.Budget = project.Budget + info.Budget
+					project.Budgettag = 0
+				}else if info.Budget == 0 && tmp["budget"] != nil {
+					project.Budgettag = 0
+				}else {
+					project.Budgettag = 1
 				}
 			}
 		}else {
-			//招标不是单包
-			//flag := false
-			//if project.InfoFiled != nil && len(project.InfoFiled) > 0 {
-			//	for _, res := range project.InfoFiled {
-			//		if res.ProjectName == info.ProjectName {
-			//			if res.Budget < info.Budget {
-			//				project.Budget = project.Budget - res.Budget + info.Budget
-			//			}
-			//			flag = true
-			//			break
-			//		}
-			//	}
-			//	if !flag {
-			//		project.Budget = project.Budget + info.Budget
-			//	}
-			//}
 			if project.Budget < info.Budget {
 				project.Budget = info.Budget
+				project.Budgettag = 0
+			}
+			if tmp["budget"] == nil {
+				project.Budgettag = 1
 			}
 		}
 	}
@@ -1083,17 +1115,27 @@ func CountAmount(project *ProjectInfo, info *Info) {
 			for _, v := range project.Package{
 				v1, _ := v.([]map[string]interface{})
 				for _, v2 := range v1{
-					b1 := qu.Float64All(v2["bidamount"])
-					if b1 > 0 {
-						bidamount = bidamount + b1
-						break
+					if tmp["bidamount"] != nil {
+						b1 := qu.Float64All(v2["bidamount"])
+						if b1 > 0 {
+							bidamount = bidamount + b1
+							break
+						}
+					}else {
+						project.Bidamount = 1
 					}
 				}
 			}
-			project.Bidamount = bidamount
+			if bidamount > 0 {
+				project.Bidamount = bidamount
+				project.Bidamounttag = 0
+			}else if bidamount == 0 && info.Budget > 0 {
+				project.Bidamount = info.Bidamount
+				project.Bidamounttag = 0
+			}
 		}else {
 			//招标没有多包
-			k := KeyPackage.FindStringSubmatch(project.ProjectName)
+			k := KeyPackage.FindStringSubmatch(info.ProjectName)
 			if len(k) > 0 {
 				//招标是单包
 				if len(project.Package) > 0 {
@@ -1101,49 +1143,30 @@ func CountAmount(project *ProjectInfo, info *Info) {
 					flag := false
 					for _, v := range project.Package{
 						v1, _ := v.([]map[string]interface{})
-						if len(v1) > 0 {
+						if len(v1) > 0 && v1[0]["name"] == info.ProjectName {
 							flag = true
 						}
 					}
 					if !flag {
 						project.Bidamount = project.Bidamount + info.Bidamount
+						project.Bidamounttag = 0
 					}
 				}else {
 					//项目没有多包
 					if info.Bidamount > 0 {
 						project.Bidamount = project.Bidamount + info.Bidamount
+						project.Bidamounttag = 0
+					}else if info.Bidamount == 0 && tmp["bidamount"] != nil {
+						project.Bidamounttag = 0
+					}else {
+						project.Bidamounttag = 1
 					}
 				}
 			}else {
-				//招标不是单包
-				//flag := false
-				//if project.InfoFiled != nil && len(project.InfoFiled) > 0 {
-				//	for _, res := range project.InfoFiled {
-				//		if res.Bidstatus == "合同" && res.ContractCode != "" && info.SubType == "合同" && info.ContractCode != "" {
-				//			if res.ContractCode == info.ContractCode {
-				//				if res.Bidamount < info.Bidamount {
-				//					project.Bidamount = project.Bidamount - res.Bidamount + info.Bidamount
-				//				}
-				//				flag = true
-				//				break
-				//			}
-				//		}else {
-				//			if res.ProjectName == info.ProjectName {
-				//				if res.Bidamount < info.Bidamount {
-				//					project.Bidamount = project.Bidamount - res.Bidamount + info.Bidamount
-				//				}
-				//				flag = true
-				//				break
-				//			}
-				//		}
-				//	}
-				//	if !flag {
-				//		project.Bidamount = project.Bidamount + info.Bidamount
-				//	}
-				//}
 				if info.SubType == "中标" || info.SubType == "成交" {
 					if project.Bidamount < info.Bidamount {
 						project.Bidamount = info.Bidamount
+						project.Bidamounttag = 0
 					}else {
 						flag := false
 						if project.InfoFiled != nil && len(project.InfoFiled) > 0 {
@@ -1159,17 +1182,25 @@ func CountAmount(project *ProjectInfo, info *Info) {
 							}
 							if !flag {
 								project.Bidamount = project.Bidamount + info.Bidamount
+								project.Bidamounttag = 0
 							}else {
 								if project.Bidamount < info.Bidamount {
 									project.Bidamount = info.Bidamount
+									project.Bidamounttag = 0
 								}
 							}
 						}
 					}
 				}
+				if tmp["bidamount"] == nil {
+					project.Budgettag = 1
+				}
 			}
 		}
+	}else {
+		project.Bidamounttag = 1
 	}
+
 }
 
 //结构体转map

+ 2 - 0
fullproject/src_v1/task.go

@@ -718,6 +718,8 @@ func (p *ProjectTask) fillInPlace(tmp map[string]interface{}) {
 					tmp["city"] = site.City
 					tmp["district"] = site.District
 					return
+				}else if site.City == "" {
+					return
 				}
 			}
 		} else {

+ 28 - 29
fullproject/src_v1/update.go

@@ -63,7 +63,7 @@ func (p *ProjectTask) mergeAndModify(pInfoId string, index int, info *Info, tmp
 			//跟原项目list第0条信息作对比,可以合并
 			if pId == pInfoId {
 				//合并到当前项目
-				mergeProject(p, currentPro.P, info, tmpPro, index)
+				mergeProject(p, currentPro.P, info, tmpPro, tmp)
 				p.AllIdsMapLock.Lock()
 				p.AllIdsMap[currentPro.P.Id.Hex()].P = currentPro.P
 				p.AllIdsMapLock.Unlock()
@@ -263,13 +263,13 @@ func (p *ProjectTask) updateMerge(index int, info *Info, pInfoId string, tmp map
 //内部合并
 func (p *ProjectTask) innerMerge(pInfo *ProjectInfo, info *Info, tmp map[string]interface{}, tmpPro map[string]interface{}) {
 	infoList := []interface{}(tmpPro["list"].(primitive.A))
-	for index, info1 := range infoList{
+	for _, info1 := range infoList{
 		info2 := info1.(map[string]interface{})
 		if info2["infoid"] == info.Id {
 			deleteSlice1(infoList, info1)
 			continue
 		}
-		mergeProject(p, pInfo, info, tmpPro, index)
+		mergeProject(p, pInfo, info, tmpPro, tmp)
 	}
 	mapTmp, _ := tmpPro["infofiled"].(map[string]interface{})
 	delete(mapTmp, info.Id)
@@ -280,46 +280,43 @@ func (p *ProjectTask) innerMerge(pInfo *ProjectInfo, info *Info, tmp map[string]
 }
 
 //合并字段到project
-func mergeProject(p *ProjectTask, pInfo *ProjectInfo, thisinfo *Info, set map[string]interface{}, index int) map[string]interface{} {
-	//1--firsttime
-	if thisinfo.Publishtime < pInfo.FirstTime && thisinfo.Publishtime > 0 {
-		pInfo.FirstTime = thisinfo.Publishtime
-		set["firsttime"] = thisinfo.Publishtime
-		if thisinfo.TopType == "招标" {
-			set["zbtime"] = thisinfo.Publishtime
-		}
-	}
-	//2--lasttime
+func mergeProject(p *ProjectTask, pInfo *ProjectInfo, thisinfo *Info, set map[string]interface{}, tmp map[string]interface{}) map[string]interface{} {
 	pInfo.LastTime = thisinfo.Publishtime
 	set["lasttime"] = thisinfo.Publishtime
 	if thisinfo.TopType == "招标" {
-		set["zbtime"] = thisinfo.Publishtime
-	} else if thisinfo.TopType == "结果" || thisinfo.SubType == "合同" {
+		if thisinfo.SubType != "变更" && thisinfo.SubType != "其它" {
+			set["zbtime"] = thisinfo.Publishtime
+		}
+		if pInfo.Jgtime > 0 {
+			pInfo.Jgtime = int64(0)
+			set["jgtime"] = int64(0)
+		}
+	}else if thisinfo.TopType == "结果" {
+		pInfo.Jgtime = thisinfo.Publishtime
 		set["jgtime"] = thisinfo.Publishtime
+	} else if thisinfo.SubType == "合同" {
+		if pInfo.Jgtime <= 0 {
+			set["jgtime"] = thisinfo.Publishtime
+		}
 	}
-	//废标、流标   处理时间
-	if thisinfo.SubType == "流标" || thisinfo.SubType == "废标" {
-		pInfo.FirstTime = thisinfo.Publishtime
-		pInfo.Bidopentime = int64(0)
-		pInfo.LastTime = thisinfo.Publishtime
 
-		set["firsttime"] = thisinfo.Publishtime
-		set["zbtime"] = int64(0)
-		set["publishtime"] = thisinfo.Publishtime
-		set["bidopentime"] = int64(0)
+	if thisinfo.Bidopentime > pInfo.Bidopentime {
+		pInfo.Bidopentime = thisinfo.Bidopentime
+		set["bidopentime"] = pInfo.Bidopentime
 	}
 
+
 	bt := thisinfo.TopType
 	bs := thisinfo.SubType
 	p.mapBidLock.Lock()
+	set["bidtype"] = bidtype[bs]
 	if bt == "招标" {
 		//招标状态,更新projectscope
 		if thisinfo.ProjectScope != "" {
 			set["projectscope"] = thisinfo.ProjectScope
 		}
-		set["bidtype"] = bidtype[bs]
+		set["bidstatus"] = bt
 	}else {
-		set["bidtype"] = bt
 		if bidstatus[bs] != "" {
 			set["bidstatus"] = thisinfo.SubType
 		} else if thisinfo.Infoformat == 2 {
@@ -328,7 +325,6 @@ func mergeProject(p *ProjectTask, pInfo *ProjectInfo, thisinfo *Info, set map[st
 			set["bidstatus"] = "其它"
 		}
 	}
-
 	p.mapBidLock.Unlock()
 
 	//3\4\5--省、市、县
@@ -373,6 +369,10 @@ func mergeProject(p *ProjectTask, pInfo *ProjectInfo, thisinfo *Info, set map[st
 		set["buyerclass"] = ""
 	}
 
+	if thisinfo.ContractCode != "" {
+		set["contractcode"] = pInfo.ContractCode + ","+thisinfo.ContractCode
+	}
+
 	//8--代理机构
 	if (pInfo.Agency == "" && thisinfo.Agency != "") || (len([]rune(pInfo.Agency)) < 5 && len([]rune(thisinfo.Agency)) > 5) {
 		pInfo.Agency = thisinfo.Agency
@@ -437,7 +437,6 @@ func mergeProject(p *ProjectTask, pInfo *ProjectInfo, thisinfo *Info, set map[st
 				sort.Strings(pInfo.Winners)
 			}
 		}
-		//set["winners"] = pInfo.Winners
 		set["s_winner"] = strings.Join(pInfo.Winners, ",")
 	}
 
@@ -449,7 +448,7 @@ func mergeProject(p *ProjectTask, pInfo *ProjectInfo, thisinfo *Info, set map[st
 		set["multipackage"] = 0
 	}
 	//处理多包后,计算预算金额、中标金额
-	CountAmount(pInfo, thisinfo)
+	CountAmount(pInfo, thisinfo, tmp)
 	set["budget"] = pInfo.Budget
 	set["bidamount"] = pInfo.Bidamount
 

+ 50 - 38
src/jy/extract/extract.go

@@ -27,12 +27,12 @@ import (
 var (
 	lock, lockrule, lockclear, locktag, blocktag sync.RWMutex
 
-	cut           = ju.NewCut()                          //获取正文并清理
-	ExtLogs       map[*TaskInfo][]map[string]interface{} //抽取日志
-	TaskList      map[string]*ExtractTask                //任务列表
-	ClearTaskList map[string]*ClearTask                  //清理任务列表
-	saveLimit     = 100                                  //抽取日志批量保存
-	PageSize      = 5000                                 //查询分页
+	cut     = ju.NewCut()                          //获取正文并清理
+	ExtLogs map[*TaskInfo][]map[string]interface{} //抽取日志
+	TaskList      map[string]*ExtractTask          //任务列表
+	ClearTaskList map[string]*ClearTask            //清理任务列表
+	saveLimit     = 100                            //抽取日志批量保存
+	PageSize      = 5000                           //查询分页
 	Fields        = `{"title":1,"summary":1,"detail":1,"contenthtml":1,"site":1,"spidercode":1,"toptype":1,"subtype":1,"bidstatus":1,"area":1,"city":1,"comeintime":1,"publishtime":1,"sensitive":1,"projectinfo":1,"jsondata":1,"href":1}`
 	Fields2       = `{"budget":1,"bidamount":1,"title":1,"projectname":1,"winner":1}`
 )
@@ -319,8 +319,8 @@ func (e *ExtractTask) PreInfo(doc map[string]interface{}) (j, jf *ju.Job, isSite
 		BuyerAddr: qu.ObjToString(doc["buyeraddr"]),
 		RuleBlock: e.RuleBlock,
 	}
-	if (j.Jsondata != nil||(*j.Jsondata) != nil)  && (*j.Jsondata)["jsoncontent"]!= nil{
-		delete((*j.Jsondata),"jsoncontent")
+	if (j.Jsondata != nil || (*j.Jsondata) != nil) && (*j.Jsondata)["jsoncontent"] != nil {
+		delete((*j.Jsondata), "jsoncontent")
 	}
 	if isextFile {
 		jf = &ju.Job{
@@ -339,8 +339,8 @@ func (e *ExtractTask) PreInfo(doc map[string]interface{}) (j, jf *ju.Job, isSite
 			RuleBlock:  e.RuleBlock,
 			IsFile:     isextFile,
 		}
-		if (jf.Jsondata != nil||(*jf.Jsondata) != nil)  && (*jf.Jsondata)["jsoncontent"]!= nil{
-			delete((*jf.Jsondata),"jsoncontent")
+		if (jf.Jsondata != nil || (*jf.Jsondata) != nil) && (*jf.Jsondata)["jsoncontent"] != nil {
+			delete((*jf.Jsondata), "jsoncontent")
 		}
 	}
 	codeSite := j.SpiderCode
@@ -974,7 +974,7 @@ func ExtRuleCoreByPkgReg(j *ju.Job, in *RegLuaInfo, e *ExtractTask) {
 								cfn := e.ClearFn[in.Field]
 								lock.Unlock()
 								data := clear.DoClearFn(cfn, []interface{}{strings.TrimSpace(rep[in.Field+"_"+fmt.Sprint(i)]), j.Content})
-								if data[len(data)-1].(bool){
+								if data[len(data)-1].(bool) {
 									j.BlockPackage[k].Budget = qu.Float64All(data[0])
 									j.BlockPackage[k].IsTrueBudget = true
 								}
@@ -984,7 +984,7 @@ func ExtRuleCoreByPkgReg(j *ju.Job, in *RegLuaInfo, e *ExtractTask) {
 								cfn := e.ClearFn[in.Field]
 								lock.Unlock()
 								data := clear.DoClearFn(cfn, []interface{}{strings.TrimSpace(rep[in.Field+"_"+fmt.Sprint(i)]), j.Content})
-								if data[len(data)-1].(bool){
+								if data[len(data)-1].(bool) {
 									j.BlockPackage[k].Bidamount = qu.Float64All(data[0])
 									j.BlockPackage[k].IsTrueBidamount = true
 								}
@@ -1026,7 +1026,7 @@ func ExtRuleCoreByPkgReg(j *ju.Job, in *RegLuaInfo, e *ExtractTask) {
 						cfn := e.ClearFn[in.Field]
 						lock.Unlock()
 						data := clear.DoClearFn(cfn, []interface{}{val, j.Content})
-						if data[len(data)-1].(bool){
+						if data[len(data)-1].(bool) {
 							j.BlockPackage[k].Budget = qu.Float64All(data[0])
 							j.BlockPackage[k].IsTrueBudget = true
 						}
@@ -1037,7 +1037,7 @@ func ExtRuleCoreByPkgReg(j *ju.Job, in *RegLuaInfo, e *ExtractTask) {
 						cfn := e.ClearFn[in.Field]
 						lock.Unlock()
 						data := clear.DoClearFn(cfn, []interface{}{val, j.Content})
-						if data[len(data)-1].(bool){
+						if data[len(data)-1].(bool) {
 							j.BlockPackage[k].Bidamount = qu.Float64All(data[0])
 							j.BlockPackage[k].IsTrueBidamount = true
 						}
@@ -1071,7 +1071,7 @@ func getKvByLuaFields(vc *RuleCore, j *ju.Job, et *ExtractTask) (map[string][]ma
 				kvmap[vc.Field] = append(kvmap[vc.Field], map[string]interface{}{
 					"code":        "winnerorder",
 					"field":       vc.Field,
-					"ruletext":    "中标候选人_"+ v["sortstr"].(string),
+					"ruletext":    "中标候选人_" + v["sortstr"].(string),
 					"extfrom":     v["sortstr"],
 					"sourcevalue": v["price"],
 					"value":       v["price"],
@@ -1596,7 +1596,8 @@ type FieldValue struct {
 	Value interface{}
 	Count int
 }
-var clearWinnerReg =regexp.MustCompile("名称|施工|拟定供应商名称|:|:")
+
+var clearWinnerReg = regexp.MustCompile("名称|施工|拟定供应商名称|:|:")
 
 //分析抽取结果并保存
 func AnalysisSaveResult(j, jf *ju.Job, e *ExtractTask) {
@@ -1630,20 +1631,27 @@ func AnalysisSaveResult(j, jf *ju.Job, e *ExtractTask) {
 				}
 			}
 		}
+		if len(j.PackageInfo) > 10 {
+			for k, v := range j.PackageInfo {
+				j.PackageInfo = map[string]map[string]interface{}{}
+				j.PackageInfo[k] = v
+				break
+			}
+		}
 		if len(j.PackageInfo) > 0 { //分包信息
 			tmp["package"] = j.PackageInfo
 			//包预算,中标金额合并大于抽取就覆盖
 			var tmpBidamount, tmpBudget float64
 			//s_winner逗号分隔拼接,分包中标人
-			var tmpstr,savewinner []string
+			var tmpstr, savewinner []string
 			//按包排序
 			for b, v := range j.PackageInfo {
-				if v["winner"]!= nil && v["winner"]!=""{
-					tmpstr = append(tmpstr,b)
+				if v["winner"] != nil && v["winner"] != "" {
+					tmpstr = append(tmpstr, b)
 				}
 			}
 			//包预算,中标金额合并大于抽取就覆盖
-			if len(j.PackageInfo) >1{
+			if len(j.PackageInfo) > 1 {
 				//包数大于1累加
 				for _, v := range j.PackageInfo {
 					if v["budget"] != nil {
@@ -1659,10 +1667,10 @@ func AnalysisSaveResult(j, jf *ju.Job, e *ExtractTask) {
 				if qu.Float64All(tmp["bidamount"]) < tmpBidamount {
 					tmp["bidamount"] = tmpBidamount
 				}
-			}else {
+			} else {
 				//包数等于1,tmp没有值取包里的值
 				if tmp["budget"] == nil || tmp["budget"] == 0 {
-					for _,v := range j.PackageInfo {
+					for _, v := range j.PackageInfo {
 						if v["budget"] != nil {
 							tmp["budget"] = v["budget"]
 						}
@@ -1670,7 +1678,7 @@ func AnalysisSaveResult(j, jf *ju.Job, e *ExtractTask) {
 
 				}
 				if tmp["bidamount"] == nil || tmp["bidamount"] == 0 {
-					for _,v := range j.PackageInfo {
+					for _, v := range j.PackageInfo {
 						if v["bidamount"] != nil {
 							tmp["bidamount"] = v["bidamount"]
 						}
@@ -1679,21 +1687,21 @@ func AnalysisSaveResult(j, jf *ju.Job, e *ExtractTask) {
 			}
 			//s_winner逗号分隔拼接,分包中标人
 			sort.Strings(tmpstr)
-			for _,v := range tmpstr{
+			for _, v := range tmpstr {
 				svvvv := qu.ObjToString(j.PackageInfo[v]["winner"])
 				savevvv := clearWinnerReg.ReplaceAllString(svvvv, "")
-				if savevvv == ""{
+				if savevvv == "" {
 					continue
 				}
-				savewinner = append(savewinner,savevvv)
+				savewinner = append(savewinner, savevvv)
 			}
-			if (savewinner  == nil || len(savewinner)==0) && tmp["winner"]!=nil{
+			if (savewinner == nil || len(savewinner) == 0) && tmp["winner"] != nil {
 				tmp["s_winner"] = tmp["winner"]
-			}else if savewinner != nil{
-				tmp["s_winner"] = strings.Join(savewinner,",")
+			} else if savewinner != nil {
+				tmp["s_winner"] = strings.Join(savewinner, ",")
 			}
 
-		}else if tmp["winner"]!= nil && tmp["winner"]!=""{
+		} else if tmp["winner"] != nil && tmp["winner"] != "" {
 			//没有分包取winner
 			tmp["s_winner"] = tmp["winner"]
 		}
@@ -1853,8 +1861,12 @@ func AnalysisSaveResult(j, jf *ju.Job, e *ExtractTask) {
 		} else { //测试结果
 			delete(tmp, "_id")
 			if len(j.BlockPackage) > 0 { //分包详情
-				bs, _ := json.Marshal(j.BlockPackage)
-				tmp["epackage"] = string(bs)
+				if len(j.BlockPackage) >10{
+					tmp["epackage"] = "分包异常"
+				}else {
+					bs, _ := json.Marshal(j.BlockPackage)
+					tmp["epackage"] = string(bs)
+				}
 			}
 			tmp["result"] = result
 			tmp["resultf"] = resultf
@@ -2046,7 +2058,7 @@ func (e *ExtractTask) QualityAudit(resulttmp map[string]interface{}) {
 func (e *ExtractTask) RedisMatch(field, fv string, val map[string]interface{}) {
 	defer qu.Catch()
 	i := redis.GetInt(field, field+"_"+fv) //查找redis
-	if i == 0 {                            //reids未找到,执行规则匹配
+	if i == 0 { //reids未找到,执行规则匹配
 		val[field+"_isredis"] = false
 		e.RuleMatch(field, fv, val) //规则匹配
 	} else { //redis找到,打标识存库
@@ -2138,20 +2150,20 @@ func resetWinnerorder(j *ju.Job) {
 	if maxlen > 0 {
 		winners = append(winners, &ju.ExtField{Code: "winnerorder", Field: "winner", ExtFrom: "j.Winnerorder", Value: j.Winnerorder[0]["entname"], Score: 0.5})
 		if j.Winnerorder[0]["price"] != nil {
-			tmpPrice := clear.ObjToMoney([]interface{}{j.Winnerorder[0]["price"],""})
-			if tmpPrice[len(tmpPrice)-1].(bool){
-				bidamounts = append(bidamounts, &ju.ExtField{Code: "winnerorder", Field: "bidamount", ExtFrom: "j.Winnerorder",SourceValue:j.Winnerorder[0]["price"], Value: tmpPrice[0], Score: 0.5})
+			tmpPrice := clear.ObjToMoney([]interface{}{j.Winnerorder[0]["price"], ""})
+			if tmpPrice[len(tmpPrice)-1].(bool) {
+				bidamounts = append(bidamounts, &ju.ExtField{Code: "winnerorder", Field: "bidamount", ExtFrom: "j.Winnerorder", SourceValue: j.Winnerorder[0]["price"], Value: tmpPrice[0], Score: 0.5})
 			}
 		}
 	}
 	if j.Result["winner"] == nil && len(winners) > 0 {
 		j.Result["winner"] = winners
-	} else {
+	} else if len(winners) > 0 {
 		j.Result["winner"] = append(j.Result["winner"], winners...)
 	}
 	if j.Result["bidamount"] == nil && len(bidamounts) > 0 {
 		j.Result["bidamount"] = bidamounts
-	} else {
+	} else if len(bidamounts) > 0 {
 		j.Result["bidamount"] = append(j.Result["bidamount"], bidamounts...)
 	}
 	//j.Result["winner"] = winners

+ 1 - 2
src/jy/extract/extractInit.go

@@ -1409,8 +1409,7 @@ func (e *ExtractTask) ResultSave(init bool) {
 						log.Debug(err)
 					})
 				}
-
-				time.Sleep(3 * time.Second)
+				time.Sleep(2 * time.Second)
 			}
 		}()
 	} else {

+ 5 - 5
udpfilterdup/src/config.json

@@ -2,12 +2,12 @@
     "udpport": ":1488",
     "dupdays": 5,
     "mongodb": {
-        "addr": "192.168.3.166:27082",
+        "addr": "192.168.3.207:27092",
         "pool": 5,
-        "db": "zhaolongyue",
-        "extract": "huaweiv1_rdata0109",
+        "db": "extract_kf",
+        "extract": "demo_data3.2",
         "site": {
-            "dbname": "zhaolongyue",
+            "dbname": "extract_kf",
             "coll": "site"
         }
     },
@@ -17,7 +17,7 @@
     },
     "nextNode": [],
     "isMerger": false,
-    "threads": 1,
+    "threads": 4,
     "specialwords": "(重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研)",
     "specialtitle_1": "[0-9a-zA-Z一二三四五六七八九十零123456789](次|包|标段|标包|批)",
     "specialtitle_2": "项目([0-9a-zA-Z一二三四五六七八九十零123456789])",

+ 104 - 124
udpfilterdup/src/datamap.go

@@ -11,32 +11,33 @@ import (
 )
 
 type Info struct {
-	id          string	//id
-	title       string  //标题
-
-	area        string  //省份
-	city        string  //城市
-	subtype     string  //信息类型
-	buyer       string  //采购单位
-	agency      string  //代理机构
-	winner      string  //中标单位
-	budget      float64 //预算金额
-	bidamount   float64 //中标金额
-	projectname string  //项目名称
-	projectcode string  //项目编号
-	contractnumber string //合同编号
-	publishtime int64   //发布时间
-	bidopentime int64   //开标时间
-	agencyaddr  string  //开标地点
-
-	site        string  //站点
-	href        string  //正文的url
-
-	repeatid    string  //重复id
+	id    string //id
+	title string //标题
+
+	area           string  //省份
+	city           string  //城市
+	subtype        string  //信息类型
+	buyer          string  //采购单位
+	agency         string  //代理机构
+	winner         string  //中标单位
+	budget         float64 //预算金额
+	bidamount      float64 //中标金额
+	projectname    string  //项目名称
+	projectcode    string  //项目编号
+	contractnumber string  //合同编号
+	publishtime    int64   //发布时间
+	comeintime     int64   //入库时间
+	bidopentime    int64   //开标时间
+	agencyaddr     string  //开标地点
+
+	site string //站点
+	href string //正文的url
+
+	repeatid         string                 //重复id
 	titleSpecialWord bool                   //标题特殊词
 	specialWord      bool                   //再次判断的特殊词
 	mergemap         map[string]interface{} //合并记录
-	is_site     bool   //是否站点城市
+	is_site          bool                   //是否站点城市
 
 }
 
@@ -77,9 +78,9 @@ func NewDatamap(days int, lastid string) *datamap {
 		if qutil.IntAll(tmp["repeat"]) == 1 || qutil.IntAll(tmp["repeat"]) == -1 {
 			continuSum++
 		} else {
-			pt:=tmp["publishtime"]
-			pt_time:=qutil.Int64All(pt)
-			if pt_time<=0 {
+			pt := tmp["publishtime"]
+			pt_time := qutil.Int64All(pt)
+			if pt_time <= 0 {
 				continue
 			}
 			if now1 == 0 {
@@ -123,9 +124,9 @@ func NewHistorymap(startid string, lastid string, startTime int64, lastTime int6
 		true)).Sort("-_id").Iter()
 	m, n := 0, 0
 	for tmp_start := make(map[string]interface{}); it_start.Next(&tmp_start); {
-		pt_s:=tmp_start["publishtime"]
-		pt_time:=qutil.Int64All(pt_s)
-		if pt_time<=0 {
+		pt_s := tmp_start["publishtime"]
+		pt_time := qutil.Int64All(pt_s)
+		if pt_time <= 0 {
 			continue
 		}
 		if qutil.Float64All(startTime-pt_time) <= datelimit {
@@ -154,9 +155,9 @@ func NewHistorymap(startid string, lastid string, startTime int64, lastTime int6
 		true)).Sort("_id").Iter()
 
 	for tmp_last := make(map[string]interface{}); it_last.Next(&tmp_last); {
-		pt_l:=tmp_last["publishtime"]
-		pt_time:=qutil.Int64All(pt_l)
-		if pt_time<=0 {
+		pt_l := tmp_last["publishtime"]
+		pt_time := qutil.Int64All(pt_l)
+		if pt_time <= 0 {
 			continue
 		}
 		if qutil.Float64All(pt_time-lastTime) <= datelimit {
@@ -188,7 +189,7 @@ func NewInfo(tmp map[string]interface{}) *Info {
 		area = "全国"
 	}
 	info := &Info{}
-	info.id = qutil.BsonIdToSId(tmp["_id"])
+	info.id = BsonTOStringId(tmp["_id"])
 	info.title = qutil.ObjToString(tmp["title"])
 	info.area = area
 	info.subtype = subtype
@@ -202,6 +203,7 @@ func NewInfo(tmp map[string]interface{}) *Info {
 	info.budget = qutil.Float64All(tmp["budget"])
 	info.bidamount = qutil.Float64All(tmp["bidamount"])
 	info.publishtime = qutil.Int64All(tmp["publishtime"])
+	info.comeintime = qutil.Int64All(tmp["comeintime"])
 	info.bidopentime = qutil.Int64All(tmp["bidopentime"])
 	info.agencyaddr = qutil.ObjToString(tmp["agencyaddr"])
 	info.site = qutil.ObjToString(tmp["site"])
@@ -217,13 +219,13 @@ func NewInfo(tmp map[string]interface{}) *Info {
 
 	info.is_site = false
 
-
 	return info
 }
+
 //判重方法
 func (d *datamap) check(info *Info) (b bool, source *Info, reasons string) {
 
-	reason:=""
+	reason := ""
 	keys := []string{}
 	d.lock.Lock()
 	for k, _ := range d.keys { //不同时间段
@@ -290,9 +292,9 @@ L:
 							continue //无包含关系
 						}
 						if strings.Contains(v.title, info.title) || strings.Contains(info.title, v.title) {
-							reason = reason+"标题关键词且包含关系"
+							reason = reason + "标题关键词且包含关系"
 							//继续二级金额判断
-							if !againRepeat(v,info){
+							if !againRepeat(v, info) {
 								b = true
 								source = v
 								reasons = reason
@@ -301,10 +303,6 @@ L:
 						}
 					}
 
-
-
-
-
 					//代理机构相同-非空相等
 					if v.agency != "" && info.agency != "" && v.agency == info.agency {
 						reason = reason + "同机构-"
@@ -344,7 +342,8 @@ L:
 
 	//往预存数据 d 添加
 	if !b {
-		ct := info.publishtime
+		//ct := info.publishtime
+		ct := info.comeintime
 		dkey := qutil.FormatDateByInt64(&ct, qutil.Date_yyyyMMdd)
 		k := fmt.Sprintf("%s_%s_%s", dkey, info.subtype, info.area)
 		d.lock.Lock()
@@ -367,7 +366,7 @@ L:
 }
 
 func (h *historymap) checkHistory(info *Info) (b bool, source *Info, reasons string) {
-	reason:=""
+	reason := ""
 	keys := []string{}
 	h.lock.Lock()
 	for k, _ := range h.keys { //不同时间段
@@ -433,9 +432,9 @@ L:
 							continue //无包含关系
 						}
 						if strings.Contains(v.title, info.title) || strings.Contains(info.title, v.title) {
-							reason = reason+"标题关键词且包含关系"
+							reason = reason + "标题关键词且包含关系"
 							//继续二级金额判断
-							if !againRepeat(v,info){
+							if !againRepeat(v, info) {
 								b = true
 								source = v
 								reasons = reason
@@ -485,11 +484,11 @@ L:
 	//
 	if b {
 		if info.repeatid == source.id {
-			b = false//重复-无变化-不处理
+			b = false //重复-无变化-不处理
 		}
 	} else {
 		if source != nil {
-			if source.repeatid != "" {//未判重-有变化--记录
+			if source.repeatid != "" { //未判重-有变化--记录
 				b = true
 				reason = "未判重记录"
 				reasons = reason
@@ -498,7 +497,8 @@ L:
 	}
 	//往预存数据 d 添加
 	if !b {
-		ct := info.publishtime
+		//ct := info.publishtime
+		ct := info.comeintime
 		dkey := qutil.FormatDateByInt64(&ct, qutil.Date_yyyyMMdd)
 		k := fmt.Sprintf("%s_%s_%s", dkey, info.subtype, info.area)
 		data := h.data[k]
@@ -568,8 +568,6 @@ func (h *historymap) replaceSourceData(replaceData *Info, replaceId string) {
 	h.lock.Unlock()
 }
 
-
-
 func (d *datamap) update(t int64) {
 	//每天0点清除历史数据
 	d.keymap = d.GetLatelyFiveDay(t)
@@ -605,21 +603,15 @@ func (d *datamap) GetLatelyFiveDay(t int64) []string {
 	return array
 }
 
-
-
-
-
-
-
 /*
 **************************
 ******* 以下为判重 ********
 **************************
-*/
+ */
 //判重方法1
 func quickHeavyMethodOne(v *Info, info *Info, reason string) (bool, string) {
 
-	isMeet:=false
+	isMeet := false
 	if info.subtype == "招标" || info.subtype == "邀标" || info.subtype == "询价" ||
 		info.subtype == "竞谈" || info.subtype == "单一" || info.subtype == "竞价" ||
 		info.subtype == "变更" || info.subtype == "其他" {
@@ -637,7 +629,7 @@ func quickHeavyMethodOne(v *Info, info *Info, reason string) (bool, string) {
 
 	} else if info.subtype == "中标" || info.subtype == "成交" || info.subtype == "废标" || info.subtype == "流标" {
 		//中标结果
-		if isMeet, reason = winningRepeat_A(v, info, reason);isMeet {
+		if isMeet, reason = winningRepeat_A(v, info, reason); isMeet {
 			if winningRepeat_C(v, info) {
 				return false, reason
 			} else {
@@ -650,7 +642,7 @@ func quickHeavyMethodOne(v *Info, info *Info, reason string) (bool, string) {
 
 	} else if info.subtype == "合同" || info.subtype == "验收" || info.subtype == "违规" {
 		//合同
-		if isMeet, reason = contractRepeat_A(v, info, reason);isMeet {
+		if isMeet, reason = contractRepeat_A(v, info, reason); isMeet {
 			if contractRepeat_C(v, info) {
 				return false, reason
 			} else {
@@ -662,7 +654,7 @@ func quickHeavyMethodOne(v *Info, info *Info, reason string) (bool, string) {
 		}
 	} else {
 		//招标结果
-		if isMeet, reason = tenderRepeat_A(v, info, reason);isMeet {
+		if isMeet, reason = tenderRepeat_A(v, info, reason); isMeet {
 			if tenderRepeat_C(v, info) {
 				return false, reason
 			} else {
@@ -679,13 +671,13 @@ func quickHeavyMethodOne(v *Info, info *Info, reason string) (bool, string) {
 
 //判重方法2
 func quickHeavyMethodTwo(v *Info, info *Info, reason string) (bool, string) {
-	isMeet:=false
+	isMeet := false
 	if v.agency == info.agency && v.agency != "" && info.agency != "" {
 		if info.subtype == "招标" || info.subtype == "邀标" || info.subtype == "询价" ||
 			info.subtype == "竞谈" || info.subtype == "单一" || info.subtype == "竞价" ||
 			info.subtype == "变更" || info.subtype == "其他" {
 			//招标结果
-			if isMeet, reason =  tenderRepeat_B(v, info, reason);isMeet {
+			if isMeet, reason = tenderRepeat_B(v, info, reason); isMeet {
 				if tenderRepeat_C(v, info) { //有不同
 					return false, reason
 				} else {
@@ -698,7 +690,7 @@ func quickHeavyMethodTwo(v *Info, info *Info, reason string) (bool, string) {
 
 		} else if info.subtype == "中标" || info.subtype == "成交" || info.subtype == "废标" || info.subtype == "流标" {
 			//中标结果
-			if isMeet, reason =  winningRepeat_B(v, info, reason);isMeet {
+			if isMeet, reason = winningRepeat_B(v, info, reason); isMeet {
 				if winningRepeat_C(v, info) { //有不同
 					return false, reason
 				} else {
@@ -711,7 +703,7 @@ func quickHeavyMethodTwo(v *Info, info *Info, reason string) (bool, string) {
 
 		} else if info.subtype == "合同" || info.subtype == "验收" || info.subtype == "违规" {
 			//合同
-			if isMeet, reason = contractRepeat_B(v, info, reason);isMeet {
+			if isMeet, reason = contractRepeat_B(v, info, reason); isMeet {
 				if contractRepeat_C(v, info) { //有不同
 					return false, reason
 				} else {
@@ -723,7 +715,7 @@ func quickHeavyMethodTwo(v *Info, info *Info, reason string) (bool, string) {
 			}
 		} else {
 			//招标结果
-			if isMeet, reason = tenderRepeat_B(v, info, reason);isMeet {
+			if isMeet, reason = tenderRepeat_B(v, info, reason); isMeet {
 				if tenderRepeat_C(v, info) { //有不同
 					return false, reason
 				} else {
@@ -755,7 +747,7 @@ func quickHeavyMethodTwo(v *Info, info *Info, reason string) (bool, string) {
 }
 
 //招标_A
-func tenderRepeat_A(v *Info, info *Info, reason string) (bool ,string) {
+func tenderRepeat_A(v *Info, info *Info, reason string) (bool, string) {
 
 	var ss string
 	p1, p2, p3, p4, p9, p10, p11 := false, false, false, false, false, false, false
@@ -767,8 +759,8 @@ func tenderRepeat_A(v *Info, info *Info, reason string) (bool ,string) {
 		ss = ss + "p2(单位)-"
 		p2 = true
 	}
-	if (v.projectcode != "" && v.projectcode == info.projectcode && len(v.projectcode)>=5)||
-		(v.contractnumber != "" && v.contractnumber == info.contractnumber && len(v.contractnumber)>=5){
+	if (v.projectcode != "" && v.projectcode == info.projectcode && len(v.projectcode) >= 5) ||
+		(v.contractnumber != "" && v.contractnumber == info.contractnumber && len(v.contractnumber) >= 5) {
 		ss = ss + "p3(编号组)-"
 		p3 = true
 	}
@@ -798,13 +790,13 @@ func tenderRepeat_A(v *Info, info *Info, reason string) (bool ,string) {
 		(p3 && p4 && p9) || (p3 && p4 && p10) || (p3 && p4 && p11) ||
 		(p4 && p9 && p10) || (p4 && p9 && p11) || (p9 && p10 && p11) {
 		reason = reason + "满足招标A,3要素组合-" + ss + ","
-		return true,reason
+		return true, reason
 	}
-	return false,reason
+	return false, reason
 }
 
 //招标_B
-func tenderRepeat_B(v *Info, info *Info, reason string) (bool,string) {
+func tenderRepeat_B(v *Info, info *Info, reason string) (bool, string) {
 
 	m, n := 0, 0
 	if v.projectname != "" && v.projectname == info.projectname {
@@ -814,8 +806,8 @@ func tenderRepeat_B(v *Info, info *Info, reason string) (bool,string) {
 	if v.buyer != "" && v.buyer == info.buyer {
 		m++
 	}
-	if (v.projectcode != "" && v.projectcode == info.projectcode && len(v.projectcode)>=5)||
-		(v.contractnumber != "" && v.contractnumber == info.contractnumber && len(v.contractnumber)>=5){
+	if (v.projectcode != "" && v.projectcode == info.projectcode && len(v.projectcode) >= 5) ||
+		(v.contractnumber != "" && v.contractnumber == info.contractnumber && len(v.contractnumber) >= 5) {
 		m++
 	}
 	if v.budget != 0 && v.budget == info.budget {
@@ -834,13 +826,13 @@ func tenderRepeat_B(v *Info, info *Info, reason string) (bool,string) {
 	}
 	if m >= 2 {
 		if n == 2 && m == 2 {
-			return false,reason
+			return false, reason
 		} else {
 			reason = reason + "满足招标B,七选二,"
-			return true,reason
+			return true, reason
 		}
 	}
-	return false,reason
+	return false, reason
 }
 
 //招标_C
@@ -865,7 +857,7 @@ func tenderRepeat_C(v *Info, info *Info) bool {
 }
 
 //中标_A
-func winningRepeat_A(v *Info, info *Info, reason string) (bool,string) {
+func winningRepeat_A(v *Info, info *Info, reason string) (bool, string) {
 
 	var ss string
 	p1, p2, p3, p5, p6, p11 := false, false, false, false, false, false
@@ -877,8 +869,8 @@ func winningRepeat_A(v *Info, info *Info, reason string) (bool,string) {
 		ss = ss + "p2(单位)-"
 		p2 = true
 	}
-	if (v.projectcode != "" && v.projectcode == info.projectcode && len(v.projectcode)>=5)||
-		(v.contractnumber != "" && v.contractnumber == info.contractnumber && len(v.contractnumber)>=5){
+	if (v.projectcode != "" && v.projectcode == info.projectcode && len(v.projectcode) >= 5) ||
+		(v.contractnumber != "" && v.contractnumber == info.contractnumber && len(v.contractnumber) >= 5) {
 		ss = ss + "p3(编号组)-"
 		p3 = true
 	}
@@ -903,14 +895,14 @@ func winningRepeat_A(v *Info, info *Info, reason string) (bool,string) {
 		(p3 && p5 && p6) || (p3 && p5 && p11) || (p3 && p6 && p11) ||
 		(p5 && p6 && p11) {
 		reason = reason + "满足中标A,3要素组合-" + ss + ","
-		return true,reason
+		return true, reason
 	}
 
-	return false,reason
+	return false, reason
 }
 
 //中标_B
-func winningRepeat_B(v *Info, info *Info, reason string) (bool,string) {
+func winningRepeat_B(v *Info, info *Info, reason string) (bool, string) {
 
 	m, n := 0, 0
 	if v.projectname != "" && v.projectname == info.projectname {
@@ -920,8 +912,8 @@ func winningRepeat_B(v *Info, info *Info, reason string) (bool,string) {
 	if v.buyer != "" && v.buyer == info.buyer {
 		m++
 	}
-	if (v.projectcode != "" && v.projectcode == info.projectcode && len(v.projectcode)>=5)||
-		(v.contractnumber != "" && v.contractnumber == info.contractnumber && len(v.contractnumber)>=5){
+	if (v.projectcode != "" && v.projectcode == info.projectcode && len(v.projectcode) >= 5) ||
+		(v.contractnumber != "" && v.contractnumber == info.contractnumber && len(v.contractnumber) >= 5) {
 		m++
 	}
 	if v.bidamount != 0 && v.bidamount == info.bidamount {
@@ -937,13 +929,13 @@ func winningRepeat_B(v *Info, info *Info, reason string) (bool,string) {
 	}
 	if m >= 2 {
 		if n == 2 && m == 2 {
-			return false,reason
+			return false, reason
 		} else {
 			reason = reason + "满足中标B.六选二,"
-			return true,reason
+			return true, reason
 		}
 	}
-	return false,reason
+	return false, reason
 }
 
 //中标_C
@@ -963,32 +955,32 @@ func winningRepeat_C(v *Info, info *Info) bool {
 }
 
 // contractRepeat_A (contract rule A) reports true when tenderRepeat_A or, failing that, winningRepeat_A matches; the reason string is passed through both checks and returned.
-func contractRepeat_A(v *Info, info *Info, reason string) (bool,string) {
+func contractRepeat_A(v *Info, info *Info, reason string) (bool, string) {
 
 	isMeet_1 := false
-	if isMeet_1, reason = tenderRepeat_A(v, info, reason);isMeet_1 {
-		return true,reason
+	if isMeet_1, reason = tenderRepeat_A(v, info, reason); isMeet_1 {
+		return true, reason
 	}
 
 	isMeet_2 := false
-	if isMeet_2, reason = winningRepeat_A(v, info, reason);isMeet_2 {
-		return true,reason
+	if isMeet_2, reason = winningRepeat_A(v, info, reason); isMeet_2 {
+		return true, reason
 	}
-	return false,reason
+	return false, reason
 }
 
 // contractRepeat_B (contract rule B) reports true when tenderRepeat_B or, failing that, winningRepeat_B matches; the reason string is passed through both checks and returned.
-func contractRepeat_B(v *Info, info *Info, reason string) (bool,string) {
+func contractRepeat_B(v *Info, info *Info, reason string) (bool, string) {
 
 	isMeet_1 := false
-	if isMeet_1, reason = tenderRepeat_B(v, info, reason);isMeet_1 {
-		return true,reason
+	if isMeet_1, reason = tenderRepeat_B(v, info, reason); isMeet_1 {
+		return true, reason
 	}
 	isMeet_2 := false
-	if isMeet_2, reason = winningRepeat_B(v, info, reason);isMeet_2 {
-		return true,reason
+	if isMeet_2, reason = winningRepeat_B(v, info, reason); isMeet_2 {
+		return true, reason
 	}
-	return false,reason
+	return false, reason
 }
 
 //合同_C
@@ -1003,40 +995,28 @@ func contractRepeat_C(v *Info, info *Info) bool {
 	return false
 }
 
-
-func againRepeat(v *Info ,info *Info) bool {
+func againRepeat(v *Info, info *Info) bool {
 	// Secondary check, applied only when both records share the same non-empty buyer: returns true when their amounts or winners conflict (callers in check/checkHistory flag a duplicate only when this returns false).
-	if info.buyer != "" &&v.buyer == info.buyer {
-		if info.subtype=="招标"||info.subtype=="邀标"||info.subtype=="询价"||
-			info.subtype=="竞谈"||info.subtype=="单一"||info.subtype=="竞价"||
-			info.subtype=="其他"||info.subtype=="变更" {
+	if info.buyer != "" && v.buyer == info.buyer {
+		if info.subtype == "招标" || info.subtype == "邀标" || info.subtype == "询价" ||
+			info.subtype == "竞谈" || info.subtype == "单一" || info.subtype == "竞价" ||
+			info.subtype == "其他" || info.subtype == "变更" {
 			// Budget condition: both budgets are non-zero and differ.
-			if v.budget!=info.budget&&v.budget!=0&&info.budget!=0 {
+			if v.budget != info.budget && v.budget != 0 && info.budget != 0 {
 				return true
 			}
-		}else if info.subtype=="中标"||info.subtype=="成交"||info.subtype=="废标"||
-			info.subtype=="流标"||info.subtype=="合同"||info.subtype=="验收"||
-			info.subtype=="违规"{
+		} else if info.subtype == "中标" || info.subtype == "成交" || info.subtype == "废标" ||
+			info.subtype == "流标" || info.subtype == "合同" || info.subtype == "验收" ||
+			info.subtype == "违规" {
 			// Bid amount / winner condition: both bid amounts are non-zero and differ, or both winners are non-empty and differ.
-			if (v.bidamount!=info.bidamount&&v.bidamount!=0&&info.bidamount!=0)||
-				(v.winner!=info.winner&&v.winner!=""&&info.winner!=""){
+			if (v.bidamount != info.bidamount && v.bidamount != 0 && info.bidamount != 0) ||
+				(v.winner != info.winner && v.winner != "" && info.winner != "") {
 				return true
 			}
-		}else {
+		} else {
 
 		}
 	}
 
 	return false
 }
-
-
-
-
-
-
-
-
-
-
-

+ 100 - 109
udpfilterdup/src/main.go

@@ -13,7 +13,6 @@ import (
 	"net"
 	"os"
 	"qfw/util"
-	"qfw/util/mongodb"
 	"regexp"
 	"sync"
 	"time"
@@ -22,7 +21,7 @@ import (
 var (
 	Sysconfig map[string]interface{} //配置文件
 	mconf     map[string]interface{} //mongodb配置信息
-	mgo       *mongodb.MongodbSim    //mongodb操作对象
+	mgo       *MongodbSim            //mongodb操作对象
 	extract   string
 	udpclient mu.UdpClient             //udp对象
 	nextNode  []map[string]interface{} //下节点数组
@@ -38,14 +37,13 @@ var (
 	FilterRegTitle_1 = regexp.MustCompile("^_$")
 	FilterRegTitle_2 = regexp.MustCompile("^_$")
 
-	isMerger bool                              //是否合并
-	threadNum int								   //线程数量
-	SiteMap  map[string]map[string]interface{} //站点map
-	idtype, sid, eid string //测试人员判重使用
+	isMerger         bool                              //是否合并
+	threadNum        int                               //线程数量
+	SiteMap          map[string]map[string]interface{} //站点map
+	idtype, sid, eid string                            //测试人员判重使用
 )
 
 func init() {
-
 	flag.StringVar(&lastid, "id", "", "最后加载id") //以小于等于此id开始加载最近几天的数据
 	flag.StringVar(&sid, "sid", "", "开始id")
 	flag.StringVar(&eid, "eid", "", "结束id")
@@ -55,13 +53,13 @@ func init() {
 	util.ReadConfig(&Sysconfig)
 	nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
 	mconf = Sysconfig["mongodb"].(map[string]interface{})
-	mgo = &mongodb.MongodbSim{
+	mgo = &MongodbSim{
 		MongodbAddr: mconf["addr"].(string),
 		DbName:      mconf["db"].(string),
 		Size:        util.IntAllDef(mconf["pool"], 10),
 	}
-	extract = mconf["extract"].(string)
 	mgo.InitPool()
+	extract = mconf["extract"].(string)
 
 	dupdays = util.IntAllDef(Sysconfig["dupdays"], 3)
 	//加载数据
@@ -77,8 +75,8 @@ func init() {
 	SiteMap = make(map[string]map[string]interface{}, 0)
 	start := int(time.Now().Unix())
 	sess_site := mgo.GetMgoConn()
-	defer sess_site.Close()
-	res_site := sess_site.DB(site["dbname"].(string)).C(site["coll"].(string)).Find(nil).Sort("_id").Iter()
+	defer mgo.DestoryMongoConn(sess_site)
+	res_site := sess_site.DB(site["dbname"].(string)).C(site["coll"].(string)).Find(map[string]interface{}{}).Sort("_id").Iter()
 	for site_dict := make(map[string]interface{}); res_site.Next(&site_dict); {
 		data_map := map[string]interface{}{
 			"area":     util.ObjToString(site_dict["area"]),
@@ -104,13 +102,13 @@ func main() {
 //测试组人员使用
 func mainT() {
 	/*
-	ObjectId("5da3f31aa5cb26b9b798d3aa")
-	ObjectId("5da418c4a5cb26b9b7e3e9a6")
-	ObjectId("5df5071ce9d1f601e495fa54")
-	ObjectId("5e09c05f0cf41612e0626abc")
+		ObjectId("5da3f31aa5cb26b9b798d3aa")
+		ObjectId("5da418c4a5cb26b9b7e3e9a6")
+		ObjectId("5df5071ce9d1f601e495fa54")
+		ObjectId("5e09c05f0cf41612e0626abc")
 	*/
-	//sid = "5642ae4baf5374672e002200"
-	//eid = "5e169e5250b5ea296ec896f0"
+	sid = "5df5071ce9d1f601e495fa54"
+	eid = "5e09c05f0cf41612e0626abc"
 
 	mapinfo := map[string]interface{}{}
 	if sid == "" || eid == "" {
@@ -166,9 +164,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
 	fmt.Println("开始数据判重")
 	defer util.Catch()
 	//区间id
-	sess := mgo.GetMgoConn()
-	defer mgo.DestoryMongoConn(sess)
-	var q map[string]interface{}
+	q := map[string]interface{}{}
 	if idtype == "1" {
 		q = map[string]interface{}{
 			"_id": map[string]interface{}{
@@ -179,15 +175,18 @@ func task(data []byte, mapInfo map[string]interface{}) {
 	} else {
 		q = map[string]interface{}{
 			"_id": map[string]interface{}{
-				"$gt":  util.StringTOBsonId(mapInfo["gtid"].(string)),
-				"$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
+				"$gt":  StringTOBsonId(mapInfo["gtid"].(string)),
+				"$lte": StringTOBsonId(mapInfo["lteid"].(string)),
 			},
 		}
 	}
-	log.Println(mgo.DbName,extract,q)
-	it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
+	log.Println(mgo.DbName, extract, q)
+	sess := mgo.GetMgoConn()
+	defer mgo.DestoryMongoConn(sess)
+	//it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
+	it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
 	updateExtract := [][]map[string]interface{}{}
-	log.Println("线程数:",threadNum)
+	log.Println("线程数:", threadNum)
 	pool := make(chan bool, threadNum)
 
 	wg := &sync.WaitGroup{}
@@ -206,7 +205,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
 			}()
 			info := NewInfo(tmp)
 			//是否为无效数据
-			if invalidData(info.buyer, info.projectname, info.projectcode,info.contractnumber) {
+			if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
 				updateExtract = append(updateExtract, []map[string]interface{}{
 					map[string]interface{}{
 						"_id": tmp["_id"],
@@ -218,45 +217,45 @@ func task(data []byte, mapInfo map[string]interface{}) {
 					},
 				})
 				if len(updateExtract) > 500 {
-					mgo.UpdateBulk(extract, updateExtract...)
+					mgo.UpSertBulk(extract, updateExtract...)
 					updateExtract = [][]map[string]interface{}{}
 				}
 			} else {
 				b, source, reason := DM.check(info)
 				if b { //有重复,生成更新语句,更新抽取和更新招标
 					repeateN++
-					var is_replace  = false
-					var mergeArr = []int64{} //更改合并数组记录
-					var newData = &Info{}    //更换新的数据池数据
+					var is_replace = false
+					var mergeArr = []int64{}                    //更改合并数组记录
+					var newData = &Info{}                       //更换新的数据池数据
 					var repeat_idMap = map[string]interface{}{} //记录判重的
-					var merge_idMap = map[string]interface{}{} //记录合并的
-					if idtype == "1" { //先临时决定一个id
+					var merge_idMap = map[string]interface{}{}  //记录合并的
+					if idtype == "1" {                          //先临时决定一个id
 						repeat_idMap["_id"] = info.id
 						merge_idMap["_id"] = source.id
 					} else {
-						repeat_idMap["_id"] = util.StringTOBsonId(info.id)
-						merge_idMap["_id"] = util.StringTOBsonId(source.id)
+						repeat_idMap["_id"] = StringTOBsonId(info.id)
+						merge_idMap["_id"] = StringTOBsonId(source.id)
 					}
-					repeat_id:=source.id
+					repeat_id := source.id
 					//以下合并相关
 					if isMerger {
 						basic_bool := basicDataScore(source, info)
 						if basic_bool {
 							//已原始数据为标准 - 对比数据打判重标签-
-							newData, mergeArr,is_replace = mergeDataFields(source, info)
+							newData, mergeArr, is_replace = mergeDataFields(source, info)
 							DM.replaceSourceData(newData, source.id) //替换
 							//对比数据打重复标签的id,原始数据id的记录
 							if idtype == "1" {
 								repeat_idMap["_id"] = info.id
 								merge_idMap["_id"] = source.id
 							} else {
-								repeat_idMap["_id"] = util.StringTOBsonId(info.id)
-								merge_idMap["_id"] = util.StringTOBsonId(source.id)
+								repeat_idMap["_id"] = StringTOBsonId(info.id)
+								merge_idMap["_id"] = StringTOBsonId(source.id)
 							}
 							repeat_id = source.id
 						} else {
 							//已对比数据为标准 ,数据池的数据打判重标签
-							newData, mergeArr,is_replace = mergeDataFields(info, source)
+							newData, mergeArr, is_replace = mergeDataFields(info, source)
 							DM.replaceSourceData(newData, source.id) //替换
 
 							//原始数据打重复标签的id,   对比数据id的记录
@@ -264,19 +263,18 @@ func task(data []byte, mapInfo map[string]interface{}) {
 								repeat_idMap["_id"] = source.id
 								merge_idMap["_id"] = info.id
 							} else {
-								repeat_idMap["_id"] = util.StringTOBsonId(source.id)
-								merge_idMap["_id"] = util.StringTOBsonId(info.id)
+								repeat_idMap["_id"] = StringTOBsonId(source.id)
+								merge_idMap["_id"] = StringTOBsonId(info.id)
 							}
 							repeat_id = info.id
 						}
 
-
-						merge_map := make(map[string]interface{},0)
-						if is_replace {//有过合并-更新数据
+						merge_map := make(map[string]interface{}, 0)
+						if is_replace { //有过合并-更新数据
 
 							merge_map = map[string]interface{}{
 								"$set": map[string]interface{}{
-									"merge":newData.mergemap,
+									"merge": newData.mergemap,
 								},
 							}
 
@@ -304,11 +302,11 @@ func task(data []byte, mapInfo map[string]interface{}) {
 									merge_map["$set"].(map[string]interface{})["bidopentime"] = newData.bidopentime
 								} else if value == 9 {
 									merge_map["$set"].(map[string]interface{})["contractnumber"] = newData.contractnumber
-								}else if value == 10 {
+								} else if value == 10 {
 									merge_map["$set"].(map[string]interface{})["publishtime"] = newData.publishtime
-								}else if value == 11 {
+								} else if value == 11 {
 									merge_map["$set"].(map[string]interface{})["agency"] = newData.agency
-								}else {
+								} else {
 								}
 							}
 							//模板数据更新
@@ -319,15 +317,14 @@ func task(data []byte, mapInfo map[string]interface{}) {
 						}
 					}
 
-
 					//重复数据打标签
 					updateExtract = append(updateExtract, []map[string]interface{}{
 						repeat_idMap,
 						map[string]interface{}{
 							"$set": map[string]interface{}{
-								"repeat": 1,
+								"repeat":        1,
 								"repeat_reason": reason,
-								"repeat_id":repeat_id,
+								"repeat_id":     repeat_id,
 							},
 						},
 					})
@@ -336,14 +333,14 @@ func task(data []byte, mapInfo map[string]interface{}) {
 			}
 		}(tmp)
 		if len(updateExtract) > 500 {
-			mgo.UpdateBulk(extract, updateExtract...)
+			mgo.UpSertBulk(extract, updateExtract...)
 			updateExtract = [][]map[string]interface{}{}
 		}
 		tmp = make(map[string]interface{})
 	}
 	wg.Wait()
 	if len(updateExtract) > 0 {
-		mgo.UpdateBulk(extract, updateExtract...)
+		mgo.UpSertBulk(extract, updateExtract...)
 		//mgo.UpdateBulk(bidding, updateBidding...)
 	}
 	log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
@@ -390,8 +387,8 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
 	} else {
 		q = map[string]interface{}{
 			"_id": map[string]interface{}{
-				"$gt":  util.StringTOBsonId(mapInfo["gtid"].(string)),
-				"$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
+				"$gt":  StringTOBsonId(mapInfo["gtid"].(string)),
+				"$lte": StringTOBsonId(mapInfo["lteid"].(string)),
 			},
 		}
 	}
@@ -400,7 +397,7 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
 	minTime, maxTime := int64(0), int64(0)
 	for tmp := make(map[string]interface{}); it.Next(&tmp); {
 		//取出最大最小时间
-		if minTime == 0 || maxTime == 0 &&util.Int64All(tmp["publishtime"])!=0{
+		if minTime == 0 || maxTime == 0 && util.Int64All(tmp["publishtime"]) != 0 {
 			minTime = util.Int64All(tmp["publishtime"])
 			maxTime = util.Int64All(tmp["publishtime"])
 		} else {
@@ -414,19 +411,19 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
 		}
 	}
 	//时间不正确时
-	if minTime==0&&maxTime==0 {
+	if minTime == 0 && maxTime == 0 {
 		log.Println("段数据区间 publishtime不符合")
 		return
 	}
 	fmt.Println("最小时间==", minTime, "最大时间==", maxTime)
-	gtid,lteid:= util.BsonIdToSId(mapInfo["gtid"].(string)), util.BsonIdToSId(mapInfo["lteid"].(string))
-	fmt.Println(gtid,lteid)
-	HM = NewHistorymap(gtid,lteid, minTime, maxTime)
+	gtid, lteid := util.BsonIdToSId(mapInfo["gtid"].(string)), util.BsonIdToSId(mapInfo["lteid"].(string))
+	fmt.Println(gtid, lteid)
+	HM = NewHistorymap(gtid, lteid, minTime, maxTime)
 	fmt.Println("开始历史数据判重")
 
 	defer util.Catch()
 	//区间id
-	sess_history:= mgo.GetMgoConn()
+	sess_history := mgo.GetMgoConn()
 	defer mgo.DestoryMongoConn(sess_history)
 	var q_history map[string]interface{}
 	if idtype == "1" {
@@ -439,15 +436,15 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
 	} else {
 		q_history = map[string]interface{}{
 			"_id": map[string]interface{}{
-				"$gt":  util.StringTOBsonId(mapInfo["gtid"].(string)),
-				"$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
+				"$gt":  StringTOBsonId(mapInfo["gtid"].(string)),
+				"$lte": StringTOBsonId(mapInfo["lteid"].(string)),
 			},
 		}
 	}
-	log.Println(mgo.DbName,extract,q_history)
+	log.Println(mgo.DbName, extract, q_history)
 	it_history := sess_history.DB(mgo.DbName).C(extract).Find(&q_history).Sort("publishtime").Iter()
 	updateExtract := [][]map[string]interface{}{}
-	log.Println("线程数:",threadNum)
+	log.Println("线程数:", threadNum)
 	pool := make(chan bool, threadNum)
 	wg := &sync.WaitGroup{}
 	//mapLock := &sync.Mutex{}
@@ -464,7 +461,7 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
 				wg.Done()
 			}()
 			info := NewInfo(tmp)
-			if invalidData(info.buyer, info.projectname, info.projectcode,info.contractnumber) {
+			if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
 				updateExtract = append(updateExtract, []map[string]interface{}{
 					map[string]interface{}{
 						"_id": tmp["_id"],
@@ -476,7 +473,7 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
 					},
 				})
 				if len(updateExtract) > 500 {
-					mgo.UpdateBulk(extract, updateExtract...)
+					mgo.UpSertBulk(extract, updateExtract...)
 					updateExtract = [][]map[string]interface{}{}
 				}
 			} else {
@@ -499,38 +496,38 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
 						})
 					} else {
 						repeateN++
-						var is_replace  = false
-						var mergeArr = []int64{} //更改合并数组记录
-						var newData = &Info{}    //更换新的数据池数据
+						var is_replace = false
+						var mergeArr = []int64{}                    //更改合并数组记录
+						var newData = &Info{}                       //更换新的数据池数据
 						var repeat_idMap = map[string]interface{}{} //记录判重的
-						var merge_idMap = map[string]interface{}{} //记录合并的
-						if idtype == "1" { //先临时决定一个id
+						var merge_idMap = map[string]interface{}{}  //记录合并的
+						if idtype == "1" {                          //先临时决定一个id
 							repeat_idMap["_id"] = info.id
 							merge_idMap["_id"] = source.id
 						} else {
-							repeat_idMap["_id"] = util.StringTOBsonId(info.id)
-							merge_idMap["_id"] = util.StringTOBsonId(source.id)
+							repeat_idMap["_id"] = StringTOBsonId(info.id)
+							merge_idMap["_id"] = StringTOBsonId(source.id)
 						}
-						repeat_id:=source.id
+						repeat_id := source.id
 						//以下合并相关
 						if isMerger {
 							basic_bool := basicDataScore(source, info)
 							if basic_bool {
 								//已原始数据为标准 - 对比数据打判重标签-
-								newData, mergeArr,is_replace = mergeDataFields(source, info)
+								newData, mergeArr, is_replace = mergeDataFields(source, info)
 								DM.replaceSourceData(newData, source.id) //替换
 								//对比数据打重复标签的id,原始数据id的记录
 								if idtype == "1" {
 									repeat_idMap["_id"] = info.id
 									merge_idMap["_id"] = source.id
 								} else {
-									repeat_idMap["_id"] = util.StringTOBsonId(info.id)
-									merge_idMap["_id"] = util.StringTOBsonId(source.id)
+									repeat_idMap["_id"] = StringTOBsonId(info.id)
+									merge_idMap["_id"] = StringTOBsonId(source.id)
 								}
 								repeat_id = source.id
 							} else {
 								//已对比数据为标准 ,数据池的数据打判重标签
-								newData, mergeArr,is_replace = mergeDataFields(info, source)
+								newData, mergeArr, is_replace = mergeDataFields(info, source)
 								DM.replaceSourceData(newData, source.id) //替换
 
 								//原始数据打重复标签的id,   对比数据id的记录
@@ -538,19 +535,18 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
 									repeat_idMap["_id"] = source.id
 									merge_idMap["_id"] = info.id
 								} else {
-									repeat_idMap["_id"] = util.StringTOBsonId(source.id)
-									merge_idMap["_id"] = util.StringTOBsonId(info.id)
+									repeat_idMap["_id"] = StringTOBsonId(source.id)
+									merge_idMap["_id"] = StringTOBsonId(info.id)
 								}
 								repeat_id = info.id
 							}
 
-
-							merge_map := make(map[string]interface{},0)
-							if is_replace {//有过合并-更新数据
+							merge_map := make(map[string]interface{}, 0)
+							if is_replace { //有过合并-更新数据
 
 								merge_map = map[string]interface{}{
 									"$set": map[string]interface{}{
-										"merge":newData.mergemap,
+										"merge": newData.mergemap,
 									},
 								}
 
@@ -578,11 +574,11 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
 										merge_map["$set"].(map[string]interface{})["bidopentime"] = newData.bidopentime
 									} else if value == 9 {
 										merge_map["$set"].(map[string]interface{})["contractnumber"] = newData.contractnumber
-									}else if value == 10 {
+									} else if value == 10 {
 										merge_map["$set"].(map[string]interface{})["publishtime"] = newData.publishtime
-									}else if value == 11 {
+									} else if value == 11 {
 										merge_map["$set"].(map[string]interface{})["agency"] = newData.agency
-									}else {
+									} else {
 									}
 								}
 								//模板数据更新
@@ -593,15 +589,14 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
 							}
 						}
 
-
 						//重复数据打标签
 						updateExtract = append(updateExtract, []map[string]interface{}{
 							repeat_idMap,
 							map[string]interface{}{
 								"$set": map[string]interface{}{
-									"repeat": 1,
+									"repeat":        1,
 									"repeat_reason": reason,
-									"repeat_id":repeat_id,
+									"repeat_id":     repeat_id,
 								},
 							},
 						})
@@ -611,14 +606,14 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
 			}
 		}(tmp)
 		if len(updateExtract) > 500 {
-			mgo.UpdateBulk(extract, updateExtract...)
+			mgo.UpSertBulk(extract, updateExtract...)
 			updateExtract = [][]map[string]interface{}{}
 		}
 		tmp = make(map[string]interface{})
 	}
 	wg.Wait()
 	if len(updateExtract) > 0 {
-		mgo.UpdateBulk(extract, updateExtract...)
+		mgo.UpSertBulk(extract, updateExtract...)
 		//mgo.UpdateBulk(bidding, updateBidding...)
 	}
 	log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
@@ -647,14 +642,14 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
 }
 
 //合并字段-并更新merge字段的值
-func mergeDataFields(source *Info, info *Info) (*Info, []int64,bool) {
+func mergeDataFields(source *Info, info *Info) (*Info, []int64, bool) {
 
-	merge_recordMap := make(map[string]interface{},0)
+	merge_recordMap := make(map[string]interface{}, 0)
 	mergeArr := make([]int64, 0)
 	//是否替换数据了-记录原始的数据
-	is_replace :=false
+	is_replace := false
 	//1、城市
-	if source.area == "" || source.area == "全国"{
+	if source.area == "" || source.area == "全国" {
 		//为空
 		if info.area != "全国" && info.area != "" {
 			merge_recordMap["area"] = source.area
@@ -664,7 +659,7 @@ func mergeDataFields(source *Info, info *Info) (*Info, []int64,bool) {
 			mergeArr = append(mergeArr, 1)
 			is_replace = true
 		}
-	}else {
+	} else {
 		//不为空-查看站点相关-有值必替换
 		if source.is_site {
 			//是站点替换的城市
@@ -749,21 +744,17 @@ func mergeDataFields(source *Info, info *Info) (*Info, []int64,bool) {
 		is_replace = true
 	}
 
-
-
-
-	if is_replace {//有过替换更新
+	if is_replace { //有过替换更新
 		//总次数+1
-		source.mergemap["total_num"] = util.Int64All(source.mergemap["total_num"])+1
+		source.mergemap["total_num"] = util.Int64All(source.mergemap["total_num"]) + 1
 		merge_recordMap["num"] = util.Int64All(source.mergemap["total_num"])
 		//和哪一个数据id进行非空替换的-记录
-		key:=info.id
+		key := info.id
 		source.mergemap[key] = merge_recordMap
 	}
 
-
 	//以上合并过于简单,待进一步优化
-	return source, mergeArr,is_replace
+	return source, mergeArr, is_replace
 }
 
 //权重评估
@@ -867,7 +858,7 @@ func basicDataScore(v *Info, info *Info) bool {
 	if v.buyer != "" {
 		m++
 	}
-	if v.projectcode != ""||v.contractnumber != "" {
+	if v.projectcode != "" || v.contractnumber != "" {
 		m++
 	}
 	if v.budget != 0 {
@@ -898,7 +889,7 @@ func basicDataScore(v *Info, info *Info) bool {
 	if info.buyer != "" {
 		n++
 	}
-	if info.projectcode != "" || info.contractnumber != ""{
+	if info.projectcode != "" || info.contractnumber != "" {
 		n++
 	}
 	if info.budget != 0 {
@@ -951,7 +942,7 @@ func invalidData(d1 string, d2 string, d3 string, d4 string) bool {
 	if d4 != "" {
 		n++
 	}
- 	if n == 0 {
+	if n == 0 {
 		return true
 	}
 	return false

+ 315 - 0
udpfilterdup/src/mgo.go

@@ -0,0 +1,315 @@
+package main
+
+import (
+	"context"
+	"log"
+	"time"
+
+	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+	"go.mongodb.org/mongo-driver/mongo"
+	"go.mongodb.org/mongo-driver/mongo/options"
+)
+
+type MgoSess struct { // MgoSess accumulates the parameters of one query; Iter executes it.
+	Db     string      // database name, set by DB
+	Coll   string      // collection name, set by C
+	Query  interface{} // filter handed to the driver's Find, set by Find
+	Sorts  []string    // sort keys, set by Sort; "-" prefix sorts descending, "+" or no prefix ascending
+	fields interface{} // projection, set by Select; not applied when nil
+	limit  int64       // result limit, set by Limit; applied by Iter only when > 0
+	skip   int64       // number of results to skip, set by Skip; applied by Iter only when > 0
+	M      *MongodbSim // owning wrapper whose C and Ctx Iter uses to run the query
+}
+
+type MgoIter struct { // MgoIter wraps a driver cursor so results can be pulled one at a time with Next.
+	Cursor *mongo.Cursor // underlying cursor; left nil by Iter when the query failed
+}
+
+// Next advances the cursor and decodes the current document into result.
+//
+// It returns true when a document was decoded. It returns false when the
+// iterator has no cursor, when the cursor is exhausted, or when decoding
+// fails (the decode error is logged); in the last two cases the cursor is
+// closed before returning.
+func (mt *MgoIter) Next(result interface{}) bool {
+	if mt.Cursor == nil {
+		return false
+	}
+	if !mt.Cursor.Next(nil) {
+		mt.Cursor.Close(nil)
+		return false
+	}
+	if err := mt.Cursor.Decode(result); err != nil {
+		log.Println("mgo cur err", err.Error())
+		mt.Cursor.Close(nil)
+		return false
+	}
+	return true
+}
+
+func (ms *MgoSess) DB(name string) *MgoSess { // DB stores the database name and returns ms for chaining.
+	ms.Db = name
+	return ms
+}
+
+func (ms *MgoSess) C(name string) *MgoSess { // C stores the collection name and returns ms for chaining.
+	ms.Coll = name
+	return ms
+}
+
+func (ms *MgoSess) Find(q interface{}) *MgoSess { // Find stores the query filter (nothing is executed yet) and returns ms for chaining.
+	ms.Query = q
+	return ms
+}
+
+func (ms *MgoSess) Select(fields interface{}) *MgoSess { // Select stores the projection used by Iter and returns ms for chaining.
+	ms.fields = fields
+	return ms
+}
+
+func (ms *MgoSess) Limit(limit int64) *MgoSess { // Limit stores the result limit (Iter applies it only when > 0) and returns ms for chaining.
+	ms.limit = limit
+	return ms
+}
+func (ms *MgoSess) Skip(skip int64) *MgoSess { // Skip stores the skip count (Iter applies it only when > 0) and returns ms for chaining.
+	ms.skip = skip
+	return ms
+}
+
+func (ms *MgoSess) Sort(sorts ...string) *MgoSess { // Sort replaces the stored sort keys ("-" prefix descending, "+" or none ascending) and returns ms for chaining.
+	ms.Sorts = sorts
+	return ms
+}
+
+// Iter runs the query accumulated on ms and returns an iterator over the
+// matching documents.
+//
+// Skip and limit are applied only when positive, and the batch size is fixed
+// at 100. Each sort key is a field name optionally prefixed with "-"
+// (descending) or "+" (ascending); no prefix means ascending. The sort
+// specification is built as an ordered bson.D because the precedence of a
+// multi-key sort depends on key order, which a map does not preserve. Sort
+// keys that are empty, or that consist only of a prefix, are skipped instead
+// of causing a slice-bounds panic or an empty field name.
+//
+// When the driver rejects the query the error is logged and the returned
+// iterator has a nil Cursor, so its Next reports false immediately.
+func (ms *MgoSess) Iter() *MgoIter {
+	it := &MgoIter{}
+	find := options.Find()
+	if ms.skip > 0 {
+		find.SetSkip(ms.skip)
+	}
+	if ms.limit > 0 {
+		find.SetLimit(ms.limit)
+	}
+	find.SetBatchSize(100)
+	if len(ms.Sorts) > 0 {
+		sort := make(bson.D, 0, len(ms.Sorts))
+		for _, k := range ms.Sorts {
+			if k == "" {
+				continue
+			}
+			field, dir := k, 1
+			switch k[0] {
+			case '-':
+				field, dir = k[1:], -1
+			case '+':
+				field = k[1:]
+			}
+			if field == "" {
+				continue
+			}
+			sort = append(sort, bson.E{Key: field, Value: dir})
+		}
+		if len(sort) > 0 {
+			find.SetSort(sort)
+		}
+	}
+	if ms.fields != nil {
+		find.SetProjection(ms.fields)
+	}
+	cur, err := ms.M.C.Database(ms.Db).Collection(ms.Coll).Find(ms.M.Ctx, ms.Query, find)
+	if err != nil {
+		log.Println("mgo find err", err.Error())
+		return it
+	}
+	it.Cursor = cur
+	return it
+}
+
+type MongodbSim struct { // MongodbSim bundles one driver client with the contexts and the gate channel used by the helper methods.
+	MongodbAddr string // address appended to "mongodb://" by InitPool
+	Size        int    // used by InitPool as the driver's max pool size and as the capacity of pool
+	//	MinSize     int
+	DbName   string          // database the helper methods (Save, Find, ...) operate on
+	C        *mongo.Client   // client set by InitPool on success
+	Ctx      context.Context // context passed to the driver operations; created by InitPool with a 99999-hour timeout
+	ShortCtx context.Context // context used for mongo.Connect; created by InitPool with a 1-minute timeout
+	pool     chan bool       // buffered channel; Open sends to it and Close receives from it
+}
+
+// GetMgoConn returns a new, empty query builder bound to m. It does not take
+// a slot from the pool.
+func (m *MongodbSim) GetMgoConn() *MgoSess {
+	return &MgoSess{M: m}
+}
+
+// DestoryMongoConn detaches ms from its client by clearing ms.M. A nil ms is
+// ignored. No pool slot is released here, mirroring GetMgoConn, which does
+// not take one.
+//
+// The former trailing "ms = nil" only reassigned the local parameter and had
+// no effect on the caller's pointer, so it was removed.
+func (m *MongodbSim) DestoryMongoConn(ms *MgoSess) {
+	if ms == nil {
+		return
+	}
+	ms.M = nil
+}
+
+// InitPool prepares m for use: it creates the gate channel with capacity
+// Size, creates the two contexts, and connects to "mongodb://" + MongodbAddr.
+//
+// Client options are applied in this order: a 3-second connect timeout, the
+// URI, a max pool size of Size, and a 2-hour max connection idle time. On a
+// connect error the error is logged and C is left unset; on success C holds
+// the client.
+//
+// NOTE(review): both cancel functions returned by context.WithTimeout are
+// discarded, and ShortCtx expires one minute after this call, so any use of
+// it after that point would see an expired context. Confirm against callers.
+func (m *MongodbSim) InitPool() {
+	m.pool = make(chan bool, m.Size)
+	m.Ctx, _ = context.WithTimeout(context.Background(), 99999*time.Hour)
+	m.ShortCtx, _ = context.WithTimeout(context.Background(), 1*time.Minute)
+
+	opts := options.Client().
+		SetConnectTimeout(3 * time.Second).
+		ApplyURI("mongodb://" + m.MongodbAddr).
+		SetMaxPoolSize(uint64(m.Size)).
+		SetMaxConnIdleTime(2 * time.Hour)
+
+	client, err := mongo.Connect(m.ShortCtx, opts)
+	if err != nil {
+		log.Println("mgo init error:", err.Error())
+		return
+	}
+	m.C = client
+	log.Println("init success")
+}
+
+func (m *MongodbSim) Open() { // Open takes one slot of the pool channel; the send blocks while the channel is full.
+	m.pool <- true
+}
+func (m *MongodbSim) Close() { // Close frees one slot of the pool channel; the receive blocks while the channel is empty.
+	<-m.pool
+}
+
+//批量插入
+func (m *MongodbSim) UpSertBulk(c string, doc ...[]map[string]interface{}) (map[int64]interface{}, bool) {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	var writes []mongo.WriteModel
+	for _, d := range doc {
+		write := mongo.NewUpdateOneModel()
+		write.SetFilter(d[0])
+		write.SetUpdate(d[1])
+		write.SetUpsert(true)
+		writes = append(writes, write)
+	}
+	r, e := coll.BulkWrite(m.Ctx, writes)
+	if e != nil {
+		log.Println("mgo upsert error:", e.Error())
+		return nil, false
+	}
+	//	else {
+	//		if r.UpsertedCount != int64(len(doc)) {
+	//			log.Println("mgo upsert uncomplete:uc/dc", r.UpsertedCount, len(doc))
+	//		}
+	//		return true
+	//	}
+	return r.UpsertedIDs, true
+}
+
+// SaveBulk inserts every document in doc into collection c with a single bulk
+// write. It returns true on success; on failure the error is logged and false
+// is returned.
+func (m *MongodbSim) SaveBulk(c string, doc ...map[string]interface{}) bool {
+	m.Open()
+	defer m.Close()
+	target := m.C.Database(m.DbName).Collection(c)
+	var models []mongo.WriteModel
+	for i := range doc {
+		models = append(models, mongo.NewInsertOneModel().SetDocument(doc[i]))
+	}
+	if _, err := target.BulkWrite(m.Ctx, models); err != nil {
+		log.Println("mgo savebulk error:", err.Error())
+		return false
+	}
+	return true
+}
+
+// Save inserts doc into collection c and returns the inserted document's id.
+// When the insert fails the error is logged, as the bulk helpers do, and nil
+// is returned.
+func (m *MongodbSim) Save(c string, doc map[string]interface{}) interface{} {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r, err := coll.InsertOne(m.Ctx, doc)
+	if err != nil {
+		log.Println("mgo save error:", err.Error())
+		return nil
+	}
+	return r.InsertedID
+}
+
+// UpdateById applies the update document doc to the document in collection c
+// whose _id is the ObjectID written as the hex string id.
+//
+// It returns false, after logging, when id is not a valid ObjectID hex string
+// or when the update fails. Previously a malformed id was silently turned into
+// the zero ObjectID, matched nothing, and was still reported as success.
+func (m *MongodbSim) UpdateById(c, id string, doc map[string]interface{}) bool {
+	oid, err := primitive.ObjectIDFromHex(id)
+	if err != nil {
+		log.Println("mgo updatebyid invalid id:", id, err.Error())
+		return false
+	}
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	if _, err = coll.UpdateOne(m.Ctx, map[string]interface{}{"_id": oid}, doc); err != nil {
+		log.Println("mgo updatebyid error:", err.Error())
+		return false
+	}
+	return true
+}
+
+// DeleteById deletes the document in collection c whose _id is the ObjectID
+// written as the hex string id, and returns the number of deleted documents.
+//
+// It returns 0, after logging, when id is not a valid ObjectID hex string or
+// when the delete fails. A malformed id no longer issues a delete against the
+// zero ObjectID.
+func (m *MongodbSim) DeleteById(c, id string) int64 {
+	oid, err := primitive.ObjectIDFromHex(id)
+	if err != nil {
+		log.Println("mgo deletebyid invalid id:", id, err.Error())
+		return 0
+	}
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r, err := coll.DeleteOne(m.Ctx, map[string]interface{}{"_id": oid})
+	if err != nil {
+		log.Println("mgo deletebyid error:", err.Error())
+		return 0
+	}
+	return r.DeletedCount
+}
+
+// Delete removes every document in collection c that matches query and
+// returns how many were deleted. It returns 0 when the delete fails.
+func (m *MongodbSim) Delete(c string, query map[string]interface{}) int64 {
+	m.Open()
+	defer m.Close()
+	res, err := m.C.Database(m.DbName).Collection(c).DeleteMany(m.Ctx, query)
+	if err == nil {
+		return res.DeletedCount
+	}
+	return 0
+}
+
+// FindById returns the document in collection c whose _id is the ObjectID
+// written as the hex string id.
+//
+// The result is an empty, non-nil map when id is not a valid ObjectID hex
+// string or when no document matches. Lookup or decode errors other than
+// "no documents" are logged instead of being silently dropped.
+func (m *MongodbSim) FindById(c, id string) map[string]interface{} {
+	v := map[string]interface{}{}
+	oid, err := primitive.ObjectIDFromHex(id)
+	if err != nil {
+		log.Println("mgo findbyid invalid id:", id, err.Error())
+		return v
+	}
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r := coll.FindOne(m.Ctx, map[string]interface{}{"_id": oid})
+	if err := r.Decode(&v); err != nil && err != mongo.ErrNoDocuments {
+		log.Println("mgo findbyid error:", err.Error())
+	}
+	return v
+}
+
+// FindOne returns a single document of collection c that matches query.
+//
+// The result is an empty, non-nil map when no document matches. Lookup or
+// decode errors other than "no documents" are logged instead of being
+// silently dropped.
+func (m *MongodbSim) FindOne(c string, query map[string]interface{}) map[string]interface{} {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r := coll.FindOne(m.Ctx, query)
+	v := map[string]interface{}{}
+	if err := r.Decode(&v); err != nil && err != mongo.ErrNoDocuments {
+		log.Println("mgo findone error:", err.Error())
+	}
+	return v
+}
+
+// Find returns every document of collection c that matches query, using the
+// given sort specification and projection.
+//
+// On a query or cursor error the error is logged and returned to the caller.
+// The earlier log.Fatal terminated the whole process on any query error,
+// which made the error return unreachable.
+func (m *MongodbSim) Find(c string, query map[string]interface{}, sort, fields interface{}) ([]map[string]interface{}, error) {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	op := options.Find().SetSort(sort).SetProjection(fields)
+	r, err := coll.Find(m.Ctx, query, op)
+	if err != nil {
+		log.Println("mgo find error:", err.Error())
+		return nil, err
+	}
+	var results []map[string]interface{}
+	if err = r.All(m.Ctx, &results); err != nil {
+		log.Println("mgo find decode error:", err.Error())
+		return nil, err
+	}
+	return results, nil
+}
+
+// NewObjectId returns a newly generated ObjectID for use as an _id.
+func NewObjectId() primitive.ObjectID {
+	return primitive.NewObjectID()
+}
+
+// StringTOBsonId converts the hex string id into an ObjectID. The parse error
+// is not reported: an invalid id yields the zero ObjectID.
+func StringTOBsonId(id string) primitive.ObjectID {
+	if parsed, err := primitive.ObjectIDFromHex(id); err == nil {
+		return parsed
+	}
+	return primitive.NilObjectID
+}
+
+// BsonTOStringId returns the hex form of id when id holds a
+// primitive.ObjectID. For any other dynamic type, including nil, it returns
+// "" instead of panicking on the type assertion.
+func BsonTOStringId(id interface{}) string {
+	oid, ok := id.(primitive.ObjectID)
+	if !ok {
+		return ""
+	}
+	return oid.Hex()
+}