Эх сурвалжийг харах

Merge branch 'dev3.4' of http://39.105.157.10:10080/qmx/jy-data-extract into dev3.4

maxiaoshan 5 жил өмнө
parent
commit
0c75260d3b

+ 46 - 46
fullproject/src_v1/init.go

@@ -18,10 +18,10 @@ const (
 )
 
 var (
-	Sysconfig                map[string]interface{} //读取配置文件
-	MongoTool                *MongodbSim            //mongodb连接
-	ExtractColl, ProjectColl, BackupColl, SiteColl string	//抽取表、项目表、项目快照表、站点表
-	Thread					 int				    //配置项线程数
+	Sysconfig                                      map[string]interface{} //读取配置文件
+	MongoTool                                      *MongodbSim            //mongodb连接
+	ExtractColl, ProjectColl, BackupColl, SiteColl string                 //抽取表、项目表、项目快照表、站点表
+	Thread                                         int                    //配置项线程数
 	//NextNode                 []interface{}
 )
 
@@ -63,7 +63,7 @@ func init() {
 
 	ExtractColl = Sysconfig["extractColl"].(string)
 	ProjectColl = Sysconfig["projectColl"].(string)
-	BackupColl = Sysconfig["projectColl"].(string)+"_back"
+	BackupColl = Sysconfig["projectColl"].(string) + "_back"
 	SiteColl = Sysconfig["siteColl"].(string)
 	Thread = util.IntAll(Sysconfig["thread"])
 	//NextNode = Sysconfig["nextNode"].([]interface{})
@@ -222,27 +222,27 @@ func NewKeyMap() *KeyMap {
 
 //招标信息实体类
 type Info struct {
-	Id          string                 `json:"_id"`
-	Href        string                 `json:"href"` //源地址
-	Publishtime int64                  `json:"publishtime"`
-	Comeintime  int64				   `json:"comeintime"`
-	Title       string                 `json:"title"`
-	TopType     string                 `json:"toptype"`
-	SubType     string                 `json:"subtype"`
-	ProjectName string                 `json:"projectname"`
-	ProjectCode string                 `json:"projectcode"`
-	ProjectScope string				  	`json:"projectscope"`
-	ContractCode string				   `json:"contractcode"`
-	Buyer       string                 `json:"buyer"`
-	Buyerperson string                 `json:"buyerperson"`
-	Buyertel    string                 `json:"buyertel"`
-	Agency      string                 `json:"agency"`
-	Area        string                 `json:"area"`
-	City        string                 `json:"city"`
-	District    string                 `json:"district"`
-	Infoformat  int					   `json:"infoformat"`
-	HasPackage  bool                   // `json:"haspackage"`
-	Package     map[string]interface{} `json:"package"`
+	Id           string                 `json:"_id"`
+	Href         string                 `json:"href"` //源地址
+	Publishtime  int64                  `json:"publishtime"`
+	Comeintime   int64                  `json:"comeintime"`
+	Title        string                 `json:"title"`
+	TopType      string                 `json:"toptype"`
+	SubType      string                 `json:"subtype"`
+	ProjectName  string                 `json:"projectname"`
+	ProjectCode  string                 `json:"projectcode"`
+	ProjectScope string                 `json:"projectscope"`
+	ContractCode string                 `json:"contractcode"`
+	Buyer        string                 `json:"buyer"`
+	Buyerperson  string                 `json:"buyerperson"`
+	Buyertel     string                 `json:"buyertel"`
+	Agency       string                 `json:"agency"`
+	Area         string                 `json:"area"`
+	City         string                 `json:"city"`
+	District     string                 `json:"district"`
+	Infoformat   int                    `json:"infoformat"`
+	HasPackage   bool                   // `json:"haspackage"`
+	Package      map[string]interface{} `json:"package"`
 	//PNum          string                 `json:"pnum"`
 	Topscopeclass []string `json:"topscopeclass"`
 	Subscopeclass []string `json:"subscopeclass"`
@@ -274,10 +274,10 @@ type ProjectInfo struct {
 	Ids           []string           `json:"ids,omitempty"`
 	Topscopeclass []string           `json:"topscopeclass,omitempty"`
 	Subscopeclass []string           `json:"subscopeclass,omitempty"` //子行业分类
-	Winners       []string           `json:"s_winner,omitempty"`       //中标人
+	Winners       []string           `json:"s_winner,omitempty"`      //中标人
 	ProjectName   string             `json:"projectname,omitempty"`   //项目名称
 	ProjectCode   string             `json:"projectcode,omitempty"`   //项目代码唯一(纯数字的权重低)
-	ContractCode  string			 `json:"contractcode,omitempty"`  //项目编号
+	ContractCode  string             `json:"contractcode,omitempty"`  //项目编号
 	Buyer         string             `json:"buyer,omitempty"`         //采购单位唯一
 	MPN           []string           `json:"mpn,omitempty"`           //合并后多余的项目名称
 	MPC           []string           `json:"mpc,omitempty"`           //合并后多余的项目编号
@@ -287,43 +287,43 @@ type ProjectInfo struct {
 	Area          string             `json:"area"`                    //地区
 	City          string             `json:"city"`                    //地市
 	District      string             `json:"district"`                //区县
-	Bidstatus     string			 `json:"bidstatus"`				  //
-	Bidtype		  string			 `json:"bidtype"`				  //
+	Bidstatus     string             `json:"bidstatus"`               //
+	Bidtype       string             `json:"bidtype"`                 //
 	//HasPackage    bool                   `json:"haspackage"`              //是否有分包
 	Package     map[string]interface{} `json:"package,omitempty"`     //分包的对比对象
 	Buyerclass  string                 `json:"buyerclass"`            //采购单位分类
 	Bidopentime int64                  `json:"bidopentime,omitempty"` //开标时间
 	//	Zbtime        int64                  `json:"zbtime"`        //招标时间
-	Jgtime        int64              `json:"jgtime"`        //结果中标时间
+	Jgtime    int64   `json:"jgtime"`              //结果中标时间
 	Bidamount float64 `json:"bidamount,omitempty"` //中标金额
 	Budget    float64 `json:"budget,omitempty"`    //预算
 	//Winnerorder []string `json:"winnerorder"` //中标候选人
 	score         int
 	comStr        string
 	resVal, pjVal int
-	InfoFiled	map[string]InfoField	`json:"infofiled"`			//逻辑处理需要的info字段
-	Budgettag		int					`json:"budgettag"`			//预算是否有效标记
-	Bidamounttag	int					`json:"bidamounttag"`		//中标金额是否有效标记
+	InfoFiled     map[string]InfoField `json:"infofiled"`    //逻辑处理需要的info字段
+	Budgettag     int                  `json:"budgettag"`    //预算是否有效标记
+	Bidamounttag  int                  `json:"bidamounttag"` //中标金额是否有效标记
 }
 
 //存储部分招标信息字段,业务逻辑处理需要
 type InfoField struct {
-	Budget			float64				`json:"budget"`
-	Bidamount		float64				`json:"bidamount"`
-	ContractCode	string				`json:"contractcode"`
-	ProjectName		string				`json:"projectname"`
-	ProjectCode		string				`json:"projectcode"`
-	Bidstatus		string				`json:"bidstatus"`
+	Budget       float64 `json:"budget"`
+	Bidamount    float64 `json:"bidamount"`
+	ContractCode string  `json:"contractcode"`
+	ProjectName  string  `json:"projectname"`
+	ProjectCode  string  `json:"projectcode"`
+	Bidstatus    string  `json:"bidstatus"`
 }
 
 //站点信息
 type Site struct {
-	Id			string				`json:"_id"`
-	Site		string				`json:"site"`			//站点名字
-	Area 		string				`json:"area"`			//省
-	City		string				`json:"city"`			//市
-	District	string				`json:"district"`		//区、县
-	Domain		string				`json:"domain"`			//地址
+	Id       string `json:"_id"`
+	Site     string `json:"site"`     //站点名字
+	Area     string `json:"area"`     //省
+	City     string `json:"city"`     //市
+	District string `json:"district"` //区、县
+	Domain   string `json:"domain"`   //地址
 }
 
 //二分字符串查找

+ 22 - 9
fullproject/src_v1/load_data.go

@@ -3,6 +3,7 @@ package main
 import (
 	"encoding/json"
 	"log"
+	"qfw/util"
 	"time"
 )
 
@@ -84,15 +85,7 @@ func (p *ProjectTask) loadData(starttime int64) {
 				bys, _ := json.Marshal(result)
 				var tmp *ProjectInfo
 				_ = json.Unmarshal(bys, &tmp)
-				tmpMap := make(map[string]InfoField)
-				infoMap := result["infofield"].(map[string]interface{})
-				for _, v := range infoMap{
-					var field InfoField
-					b, _ := json.Marshal(v)
-					_ = json.Unmarshal(b, &field)
-					tmpMap[tmp.Id.Hex()] = field
-				}
-				tmp.InfoFiled = tmpMap
+				saveFiled(p, result, tmp)
 				pool <- tmp
 			}(result)
 		} else {
@@ -148,3 +141,23 @@ func (p *ProjectTask) loadSite() {
 	log.Println("load site over..", n)
 
 }
+
+func saveFiled(p *ProjectTask, res map[string]interface{}, tmp *ProjectInfo) {
+	if jsonData, ok := res["jsondata"].(map[string]interface{}); ok {
+		proHref := util.ObjToString(jsonData["projecthref"])
+		if jsonData != nil && proHref != "" {
+			p.mapHrefLock.Lock()
+			p.mapHref[proHref] = tmp.Id.Hex()
+			p.mapHrefLock.Unlock()
+		}
+	}
+	tmpMap := make(map[string]InfoField)
+	infoMap := res["infofield"].(map[string]interface{})
+	for _, v := range infoMap{
+		var field InfoField
+		b, _ := json.Marshal(v)
+		_ = json.Unmarshal(b, &field)
+		tmpMap[tmp.Id.Hex()] = field
+	}
+	tmp.InfoFiled = tmpMap
+}

+ 10 - 12
fullproject/src_v1/main.go

@@ -2,7 +2,6 @@ package main
 
 import (
 	"encoding/json"
-	"flag"
 	"log"
 	mu "mfw/util"
 	"net"
@@ -16,10 +15,11 @@ import (
 var (
 	udpclient    mu.UdpClient //udp对象
 	SingleThread = make(chan bool, 1)
+	SingleClear  = 0
 	toaddr       = []*net.UDPAddr{} //下节点对象
 	ChSign       = make(chan os.Signal)
 
-	sid, eid string 	//测试使用
+	sid, eid string //测试使用
 )
 
 func init() {
@@ -58,7 +58,7 @@ func DealSign() {
 	}
 }
 
-func mainT() {
+func main() {
 	//udp跑增量  id段   project
 	//udp跑全量			ql
 	//udp跑历史数据  信息id1,id2/或id段  ls
@@ -77,13 +77,12 @@ func mainT() {
 }
 
 //测试组人员使用
-func main() {
-	//sid = "5649a0fcaf5374672e005704"
-	//eid = "5e169e5250b5ea296ec896f0"
-
-	flag.StringVar(&sid, "sid", "", "开始id")
-	flag.StringVar(&eid, "eid", "", "结束id")
-	flag.Parse()
+func mainT() {
+	sid = "5d18eca4a5cb26b9b7c7f587"
+	eid = "5e381b7650b5ea296ed16e51"
+	//flag.StringVar(&sid, "sid", "", "开始id")
+	//flag.StringVar(&eid, "eid", "", "结束id")
+	//flag.Parse()
 
 	mapinfo := map[string]interface{}{}
 	if sid == "" || eid == "" {
@@ -108,7 +107,6 @@ func main() {
 	time.Sleep(20 * time.Second)
 }
 
-
 //udp调用信号
 func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 	switch act {
@@ -147,7 +145,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 					P_QL.pici = time.Now().Unix()
 					P_QL.taskZl(mapInfo)
 				}()
-			case "updateInfo":		//招标字段变更
+			case "updateInfo": //招标字段变更
 				go func() {
 					defer func() {
 						<-SingleThread

+ 72 - 66
fullproject/src_v1/project.go

@@ -420,7 +420,6 @@ var FIELDS = []string{
 	"buyertel",
 	"winner",
 	"agency",
-	"projectscope",
 	"topscopeclass",
 	"subscopeclass",
 	"winnerorder",
@@ -429,11 +428,11 @@ var FIELDS = []string{
 
 var bidtype = map[string]string{
 	"招标": "招标",
-	"邀标":	"邀标",
-	"询价":	"询价",
-	"单一":	"单一",
-	"竞价":	"竞价",
-	"竞谈":	"竞谈",
+	"邀标": "邀标",
+	"询价": "询价",
+	"单一": "单一",
+	"竞价": "竞价",
+	"竞谈": "竞谈",
 }
 
 var bidstatus = map[string]string{
@@ -479,25 +478,31 @@ func (p *ProjectTask) NewProject(tmp map[string]interface{}, thisinfo *Info) (st
 	bt := qu.ObjToString(tmp["toptype"])
 	bs := qu.ObjToString(tmp["subtype"])
 	p.mapBidLock.Lock()
-	set["bidtype"] = bidtype[bs]
-	if bt == "招标" {
-		set["projectscope"] = qu.ObjToString(tmp["projectscope"])
-		set["bidstatus"] = bt
+	if thisinfo.Infoformat == 2 {
+		set["bidstatus"] = "拟建"
+		bt = "拟建"
 	}else {
-		if bidstatus[bs] != "" {
-			set["bidstatus"] = thisinfo.SubType
-		} else if tmp["infoformat"] == 2 {
-			set["bidstatus"] = "拟建"
-		}else if bs == "" {
-			set["bidstatus"] =  ""
+		set["bidtype"] = bidtype[bs]
+		if bt == "招标" {
+			set["projectscope"] = qu.ObjToString(tmp["projectscope"])
+			set["bidstatus"] = bt
 		} else {
-			set["bidstatus"] = "其它"
+			if bidstatus[bs] != "" {
+				set["bidstatus"] = thisinfo.SubType
+				bt = thisinfo.SubType
+			} else if bs == "" {
+				set["bidstatus"] = ""
+				bt = ""
+			} else {
+				set["bidstatus"] = "其它"
+				bt = "其它"
+			}
 		}
 	}
 	p.mapBidLock.Unlock()
 
 	pkg := PackageFormat(thisinfo, nil)
-	p1 := p.NewCachePinfo(pId, thisinfo, bt, pkg)
+	p1 := p.NewCachePinfo(pId, thisinfo, bs, bt, pkg)
 
 	now := time.Now().Unix()
 	set["createtime"] = now
@@ -512,8 +517,10 @@ func (p *ProjectTask) NewProject(tmp map[string]interface{}, thisinfo *Info) (st
 			set["zbtime"] = tmp["publishtime"]
 		}
 	} else if thisinfo.TopType == "结果" || thisinfo.SubType == "合同" {
-		set["jgtime"] = tmp["publishtime"]
-		p1.Jgtime = thisinfo.Publishtime
+		if thisinfo.Infoformat != 2 {
+			set["jgtime"] = tmp["publishtime"]
+			p1.Jgtime = thisinfo.Publishtime
+		}
 	}
 
 	if len(thisinfo.Subscopeclass) > 0 {
@@ -524,7 +531,7 @@ func (p *ProjectTask) NewProject(tmp map[string]interface{}, thisinfo *Info) (st
 		set["budget"] = thisinfo.Budget
 		p1.Budgettag = 0
 		set["budgettag"] = 0
-	}else {
+	} else {
 		p1.Budgettag = 1
 		set["budgettag"] = 1
 	}
@@ -532,7 +539,7 @@ func (p *ProjectTask) NewProject(tmp map[string]interface{}, thisinfo *Info) (st
 		set["bidamount"] = thisinfo.Bidamount
 		p1.Bidamounttag = 0
 		set["bidamounttag"] = 0
-	}else {
+	} else {
 		p1.Bidamounttag = 1
 		set["bidamounttag"] = 1
 	}
@@ -630,7 +637,7 @@ func (p *ProjectTask) PushListInfo(tmp map[string]interface{}, infoid string) bs
 }
 
 //生成存放在内存中的对象
-func (p *ProjectTask) NewCachePinfo(id primitive.ObjectID, thisinfo *Info, bidtype string, pkg map[string]interface{}) ProjectInfo {
+func (p *ProjectTask) NewCachePinfo(id primitive.ObjectID, thisinfo *Info, bidtype, bidstatus string, pkg map[string]interface{}) ProjectInfo {
 	p1 := ProjectInfo{
 		Id:            id,
 		Ids:           []string{thisinfo.Id},
@@ -654,9 +661,9 @@ func (p *ProjectTask) NewCachePinfo(id primitive.ObjectID, thisinfo *Info, bidty
 		Budget:        thisinfo.Budget,
 		Package:       pkg,
 		Bidamount:     thisinfo.Bidamount,
-		Bidstatus:     thisinfo.SubType,
+		Bidstatus:     bidstatus,
 		Bidtype:       bidtype,
-		Winners: 	   thisinfo.Winners,
+		Winners:       thisinfo.Winners,
 	}
 	if thisinfo.LenPTC > 5 {
 		p1.MPC = append(p1.MPC, thisinfo.PTC)
@@ -683,14 +690,14 @@ func (p *ProjectTask) UpdateProject(tmp map[string]interface{}, thisinfo *Info,
 	pInfo.LastTime = thisinfo.Publishtime
 	set["lasttime"] = thisinfo.Publishtime
 	if thisinfo.TopType == "招标" {
-		if thisinfo.SubType != "变更" && thisinfo.SubType != "其它" {
+		if thisinfo.SubType != "变更" && thisinfo.SubType != "其它" && tmp["zbtime"] == nil {
 			set["zbtime"] = tmp["publishtime"]
 		}
 		if pInfo.Jgtime > 0 {
 			pInfo.Jgtime = int64(0)
 			set["jgtime"] = int64(0)
 		}
-	}else if thisinfo.TopType == "结果" {
+	} else if thisinfo.TopType == "结果" {
 		pInfo.Jgtime = thisinfo.Publishtime
 		set["jgtime"] = thisinfo.Publishtime
 	} else if thisinfo.SubType == "合同" {
@@ -718,19 +725,19 @@ func (p *ProjectTask) UpdateProject(tmp map[string]interface{}, thisinfo *Info,
 		}
 		set["bidstatus"] = bt
 		pInfo.Bidstatus = bt
-	}else {
+	} else {
 		if bidstatus[bs] != "" {
 			set["bidstatus"] = thisinfo.SubType
 			pInfo.Bidstatus = thisinfo.SubType
-		} else if tmp["infoformat"] == 2 {
+		} else if thisinfo.Infoformat == 2 {
 			set["bidstatus"] = "拟建"
 			pInfo.Bidstatus = "拟建"
-		}else if bs == "" {
-			set["bidstatus"] =  ""
+		} else if bs == "" {
+			set["bidstatus"] = ""
 			pInfo.Bidstatus = ""
-		}else {
+		} else {
 			set["bidstatus"] = "其它"
-			pInfo.Bidstatus =  "其它"
+			pInfo.Bidstatus = "其它"
 		}
 	}
 	p.mapBidLock.Unlock()
@@ -780,7 +787,7 @@ func (p *ProjectTask) UpdateProject(tmp map[string]interface{}, thisinfo *Info,
 		set["buyerclass"] = ""
 	}
 	if thisinfo.ContractCode != "" {
-		set["contractcode"] = pInfo.ContractCode + ","+thisinfo.ContractCode
+		set["contractcode"] = pInfo.ContractCode + "," + thisinfo.ContractCode
 	}
 
 	//8--代理机构
@@ -849,13 +856,13 @@ func (p *ProjectTask) UpdateProject(tmp map[string]interface{}, thisinfo *Info,
 	if pInfo.Budget >= 0 && pInfo.Budgettag != 1 {
 		set["budget"] = pInfo.Budget
 		set["budgettag"] = 0
-	}else {
+	} else {
 		set["budgettag"] = 1
 	}
 	if pInfo.Bidamount >= 0 && pInfo.Bidamounttag != 1 {
 		set["bidamount"] = pInfo.Bidamount
 		set["bidamounttag"] = 0
-	}else {
+	} else {
 		set["bidamounttag"] = 1
 	}
 
@@ -870,7 +877,7 @@ func (p *ProjectTask) UpdateProject(tmp map[string]interface{}, thisinfo *Info,
 	copyMap := Copy(pInfo.InfoFiled).(map[string]InfoField)
 	copyMap[thisinfo.Id] = infofiled
 	tmpMap := make(map[string]interface{})
-	for k, v := range copyMap{
+	for k, v := range copyMap {
 		tmpMap[k] = StructToMap(v)
 	}
 	tmpMap[thisinfo.Id] = StructToMap(infofiled)
@@ -908,7 +915,7 @@ func (p *ProjectTask) UpdateProject(tmp map[string]interface{}, thisinfo *Info,
 /**
  *	更新项目时,项目状态的处理
  *	返回是否新增项目,异常标记
- *	1、项目时,新项目时,招标信息的状态(toptype)不是招标、拟建、预告	异常:1
+ *	1、新项目时,招标信息的状态(toptype)不是招标、拟建、预告	异常:1
  *	   异常1是在项目新建的时候才会产生
  *	3、项目合并时,项目状态是”流标“/”废标“,招标信息状态不是”招标“		异常:2
  *	4、项目合并时,项目状态是”合同“/”其它“,招标信息类型是”结果“		异常:3
@@ -917,17 +924,17 @@ func (p *ProjectTask) CompareStatus(project *ProjectInfo, info *Info) (bool, int
 	if info.TopType == "拟建" || info.TopType == "预告" || info.TopType == "招标" {
 		if project.Bidstatus == "拟建" || project.Bidstatus == "预告" || project.Bidstatus == "招标" {
 			return false, 0
-		}else if project.Bidstatus == "废标" || project.Bidstatus == "流标" {
+		} else if project.Bidstatus == "废标" || project.Bidstatus == "流标" {
 			return false, 0
-		}else {
+		} else {
 			return true, 0
 		}
 	} else if info.TopType == "结果" {
 		if project.Bidstatus == "拟建" || project.Bidstatus == "预告" || project.Bidstatus == "招标" {
 			return false, 0
-		}  else if project.Bidstatus == info.SubType {
+		} else if project.Bidstatus == info.SubType {
 			//状态一样,根据发布时间判断是否合并
-			if (info.Publishtime - project.FirstTime) > p.statusTime {
+			if (info.Publishtime - project.LastTime) > p.statusTime {
 				return true, 0
 			} else {
 				return false, 0
@@ -1048,29 +1055,28 @@ func PackageFormat(info *Info, project *ProjectInfo) map[string]interface{} {
 func CountAmount(project *ProjectInfo, info *Info, tmp map[string]interface{}) {
 	if info.HasPackage {
 		budget := 0.0
-		for _, v := range project.Package{
+		for _, v := range project.Package {
 			v1, _ := v.([]map[string]interface{})
-			for _, v2 := range v1{
+			for _, v2 := range v1 {
 				if v2["budget"] != nil {
 					b1 := qu.Float64All(v2["budget"])
 					if b1 > 0 {
 						budget = budget + b1
 						break
 					}
-				}else {
+				} else {
 					project.Budgettag = 1
 				}
-
 			}
 		}
 		if budget > 0 {
 			project.Budget = budget
 			project.Budgettag = 0
-		}else if budget == 0 && info.Budget > 0 {
+		} else if budget == 0 && info.Budget > 0 {
 			project.Budget = info.Budget
 			project.Budgettag = 0
 		}
-	}else {
+	} else {
 		//招标没有多包
 		k := KeyPackage.FindStringSubmatch(info.ProjectName)
 		if len(k) > 0 {
@@ -1078,7 +1084,7 @@ func CountAmount(project *ProjectInfo, info *Info, tmp map[string]interface{}) {
 			if len(project.Package) > 0 {
 				//项目有多包
 				flag := false
-				for _, v := range project.Package{
+				for _, v := range project.Package {
 					v1, _ := v.([]map[string]interface{})
 					if len(v1) > 0 && v1[0]["name"] == info.ProjectName {
 						flag = true
@@ -1088,18 +1094,18 @@ func CountAmount(project *ProjectInfo, info *Info, tmp map[string]interface{}) {
 					project.Budget = project.Budget + info.Budget
 					project.Budgettag = 0
 				}
-			}else {
+			} else {
 				//项目没有多包
 				if info.Budget > 0 {
 					project.Budget = project.Budget + info.Budget
 					project.Budgettag = 0
-				}else if info.Budget == 0 && tmp["budget"] != nil {
+				} else if info.Budget == 0 && tmp["budget"] != nil {
 					project.Budgettag = 0
-				}else {
+				} else {
 					project.Budgettag = 1
 				}
 			}
-		}else {
+		} else {
 			if project.Budget < info.Budget {
 				project.Budget = info.Budget
 				project.Budgettag = 0
@@ -1112,16 +1118,16 @@ func CountAmount(project *ProjectInfo, info *Info, tmp map[string]interface{}) {
 	if info.SubType == "中标" || info.SubType == "成交" || info.SubType == "合同" {
 		if info.HasPackage {
 			bidamount := 0.0
-			for _, v := range project.Package{
+			for _, v := range project.Package {
 				v1, _ := v.([]map[string]interface{})
-				for _, v2 := range v1{
+				for _, v2 := range v1 {
 					if tmp["bidamount"] != nil {
 						b1 := qu.Float64All(v2["bidamount"])
 						if b1 > 0 {
 							bidamount = bidamount + b1
 							break
 						}
-					}else {
+					} else {
 						project.Bidamount = 1
 					}
 				}
@@ -1129,11 +1135,11 @@ func CountAmount(project *ProjectInfo, info *Info, tmp map[string]interface{}) {
 			if bidamount > 0 {
 				project.Bidamount = bidamount
 				project.Bidamounttag = 0
-			}else if bidamount == 0 && info.Budget > 0 {
+			} else if bidamount == 0 && info.Budget > 0 {
 				project.Bidamount = info.Bidamount
 				project.Bidamounttag = 0
 			}
-		}else {
+		} else {
 			//招标没有多包
 			k := KeyPackage.FindStringSubmatch(info.ProjectName)
 			if len(k) > 0 {
@@ -1141,7 +1147,7 @@ func CountAmount(project *ProjectInfo, info *Info, tmp map[string]interface{}) {
 				if len(project.Package) > 0 {
 					//项目有多包
 					flag := false
-					for _, v := range project.Package{
+					for _, v := range project.Package {
 						v1, _ := v.([]map[string]interface{})
 						if len(v1) > 0 && v1[0]["name"] == info.ProjectName {
 							flag = true
@@ -1151,23 +1157,23 @@ func CountAmount(project *ProjectInfo, info *Info, tmp map[string]interface{}) {
 						project.Bidamount = project.Bidamount + info.Bidamount
 						project.Bidamounttag = 0
 					}
-				}else {
+				} else {
 					//项目没有多包
 					if info.Bidamount > 0 {
 						project.Bidamount = project.Bidamount + info.Bidamount
 						project.Bidamounttag = 0
-					}else if info.Bidamount == 0 && tmp["bidamount"] != nil {
+					} else if info.Bidamount == 0 && tmp["bidamount"] != nil {
 						project.Bidamounttag = 0
-					}else {
+					} else {
 						project.Bidamounttag = 1
 					}
 				}
-			}else {
+			} else {
 				if info.SubType == "中标" || info.SubType == "成交" {
 					if project.Bidamount < info.Bidamount {
 						project.Bidamount = info.Bidamount
 						project.Bidamounttag = 0
-					}else {
+					} else {
 						flag := false
 						if project.InfoFiled != nil && len(project.InfoFiled) > 0 {
 							for _, res := range project.InfoFiled {
@@ -1183,7 +1189,7 @@ func CountAmount(project *ProjectInfo, info *Info, tmp map[string]interface{}) {
 							if !flag {
 								project.Bidamount = project.Bidamount + info.Bidamount
 								project.Bidamounttag = 0
-							}else {
+							} else {
 								if project.Bidamount < info.Bidamount {
 									project.Bidamount = info.Bidamount
 									project.Bidamounttag = 0
@@ -1197,7 +1203,7 @@ func CountAmount(project *ProjectInfo, info *Info, tmp map[string]interface{}) {
 				}
 			}
 		}
-	}else {
+	} else {
 		project.Bidamounttag = 1
 	}
 

+ 46 - 21
fullproject/src_v1/task.go

@@ -58,9 +58,9 @@ type ProjectTask struct {
 	//当前时间
 	currentTime int64
 	//保存长度
-	saveSize  int
-	pici      int64
-	validTime int64
+	saveSize   int
+	pici       int64
+	validTime  int64
 	statusTime int64
 	//	LockPool     chan *sync.Mutex
 	//	LockPoolLock sync.Mutex
@@ -87,8 +87,8 @@ func NewPT() *ProjectTask {
 
 		//saveSign:   make(chan bool, 1),
 		//updateSign: make(chan bool, 1),
-		coll:      ProjectColl,
-		validTime: int64(util.IntAllDef(Sysconfig["validdays"], 150) * 86400),
+		coll:       ProjectColl,
+		validTime:  int64(util.IntAllDef(Sysconfig["validdays"], 150) * 86400),
 		statusTime: int64(util.IntAllDef(Sysconfig["statusdays"], 7) * 86400),
 	}
 	return p
@@ -96,6 +96,7 @@ func NewPT() *ProjectTask {
 
 var P_QL *ProjectTask
 var sp = make(chan bool, 5)
+
 //初始化全量合并对象
 func init() {
 	P_QL = NewPT()
@@ -143,9 +144,10 @@ func (p *ProjectTask) updateAllQueue() {
 func (p *ProjectTask) clearMem() {
 	c := cron.New()
 	//在内存中保留最近6个月的信息
-	//跑全量时每4分钟跑一次,跑增量时400分钟跑一次
-	_ = c.AddFunc("50 0/15 * * * *", func() {
-		if p.currentType == "ql" || p.clearContimes >= 60 {
+	//跑全量时每5分钟跑一次,跑增量时400分钟跑一次
+	_ = c.AddFunc("50 0/5 * * * *", func() {
+		if (p.currentType == "ql" && SingleClear == 0) || p.clearContimes >= 80 {
+			SingleClear = 1
 			//跳过的次数清零
 			p.clearContimes = 0
 			//信息进入查找对比全局锁
@@ -157,9 +159,15 @@ func (p *ProjectTask) clearMem() {
 			//所有内存中的项目信息
 			p.AllIdsMapLock.Lock()
 			p.mapHrefLock.Lock()
-
+			log.Println("清除开始")
 			//清除计数
 			clearNum := 0
+			for kHref, pid := range p.mapHref { //删除mapHref,p.AllIdsMap删除之前执行
+				v := p.AllIdsMap[pid]
+				if p.currentTime-v.P.LastTime > p.validTime {
+					delete(p.mapHref, kHref)
+				}
+			}
 			for k, v := range p.AllIdsMap {
 				if p.currentTime-v.P.LastTime > p.validTime {
 					clearNum++
@@ -170,7 +178,7 @@ func (p *ProjectTask) clearMem() {
 						ids := p.mapPb[v.P.Buyer]
 						if ids != nil {
 							ids.Lock.Lock()
-							ids.Arr = deleteSlice(ids.Arr, k)
+							ids.Arr = deleteSlice(ids.Arr, k, "pb")
 							if len(ids.Arr) == 0 {
 								delete(p.mapPb, v.P.Buyer)
 							}
@@ -183,7 +191,7 @@ func (p *ProjectTask) clearMem() {
 							ids := p.mapPn[vn]
 							if ids != nil {
 								ids.Lock.Lock()
-								ids.Arr = deleteSlice(ids.Arr, k)
+								ids.Arr = deleteSlice(ids.Arr, k, "pn")
 								if len(ids.Arr) == 0 {
 									delete(p.mapPn, vn)
 								}
@@ -197,7 +205,7 @@ func (p *ProjectTask) clearMem() {
 							ids := p.mapPc[vn]
 							if ids != nil {
 								ids.Lock.Lock()
-								ids.Arr = deleteSlice(ids.Arr, k)
+								ids.Arr = deleteSlice(ids.Arr, k, "pc")
 								if len(ids.Arr) == 0 {
 									delete(p.mapPc, vn)
 								}
@@ -205,17 +213,13 @@ func (p *ProjectTask) clearMem() {
 							}
 						}
 					}
-					for kHref, pid := range p.mapHref {
-						if pid == k {
-							delete(p.mapHref, kHref)
-						}
-					}
 					v = nil
 				}
 			}
 			p.mapHrefLock.Unlock()
 			p.AllIdsMapLock.Unlock()
 			p.findLock.Unlock()
+			SingleClear = 0
 			log.Println("清除完成:", clearNum, len(p.AllIdsMap), len(p.mapPn), len(p.mapPc), len(p.mapPb), len(p.mapHref))
 		} else {
 			p.clearContimes++
@@ -334,7 +338,7 @@ func (p *ProjectTask) taskUpdateInfo(udpInfo map[string]interface{}) {
 	if q == nil {
 		q = map[string]interface{}{
 			"_id": map[string]interface{}{
-				"$gte":  StringTOBsonId(gtid),
+				"$gte": StringTOBsonId(gtid),
 				"$lte": StringTOBsonId(lteid),
 			},
 			"is_m": 1,
@@ -416,6 +420,7 @@ func (p *ProjectTask) enter(db, coll string, q map[string]interface{}) {
 		ms.Hint(Sysconfig["hints"])
 	}
 	query := ms.Iter()
+	//query := sess.DB(db).C(coll).Find(q).Sort("publishtime").Iter()
 	var lastid interface{}
 L:
 	for {
@@ -591,6 +596,20 @@ func ParseInfo(tmp map[string]interface{}) (info *Info) {
 	thisinfo.LenPC = len([]rune(thisinfo.ProjectCode))
 	thisinfo.LenPTC = len([]rune(thisinfo.PTC))
 	thisinfo.LenPN = len([]rune(thisinfo.ProjectName))
+
+	//处理分包中数据异常问题
+	for k, tmp := range thisinfo.Package {
+		if ps, ok := tmp.([]map[string]interface{}); ok {
+			for i, p := range ps {
+				name, _ := p["name"].(string)
+				if len([]rune(name)) > 100 {
+					p["name"] = fmt.Sprint([]rune(name[:100]))
+				}
+				ps[i] = p
+			}
+			thisinfo.Package[k] = ps
+		}
+	}
 	return thisinfo
 }
 
@@ -718,7 +737,7 @@ func (p *ProjectTask) fillInPlace(tmp map[string]interface{}) {
 					tmp["city"] = site.City
 					tmp["district"] = site.District
 					return
-				}else if site.City == "" {
+				} else if site.City == "" {
 					return
 				}
 			}
@@ -732,10 +751,16 @@ func (p *ProjectTask) fillInPlace(tmp map[string]interface{}) {
 }
 
 //从数组中删除元素
-func deleteSlice(arr []string, v string) []string {
+func deleteSlice(arr []string, v, stype string) []string {
 	for k, v1 := range arr {
 		if v1 == v {
-			return append(arr[:k], arr[k+1:]...)
+			ts := time.Now().Unix()
+			arr = append(arr[:k], arr[k+1:]...)
+			rt := time.Now().Unix() - ts
+			if rt > 0 {
+				log.Println("deleteSlice", stype, rt, v, len(arr))
+			}
+			return arr
 		}
 	}
 	return arr

+ 7 - 3
udp_winner/config.json

@@ -1,11 +1,15 @@
 {
   "elasticsearch": "http://127.0.0.1:9800",
-  "elasticsearch_index": "winner_enterprise",
-  "elasticsearch_type": "winnerent",
+  "elasticsearch_index": "localhost_winner",
+  "elasticsearch_type": "mytestwinner",
+  "elasticsearch_buyer_index": "localhost_buyer",
+  "elasticsearch_buyer_type": "mytestbuyer",
+  "elasticsearch_agency_index": "localhost_agency",
+  "elasticsearch_agency_type": "mytestagency",
   "udpport": "127.0.0.1:12311",
   "port": "12311",
   "pool_size": "10",
-  "mgoinit": "192.168.3.207:27081",
+  "mgoinit": "127.0.0.1:27017",
   "mgodb_bidding": "qfw",
   "mgodb_mgoinit_c": "bidding",
   "mgodb_enterprise": "enterprise",

+ 40 - 238
udp_winner/main.go

@@ -4,8 +4,8 @@ import (
 	"encoding/json"
 	"fmt"
 	"github.com/garyburd/redigo/redis"
+	hisRedis "github.com/go-redis/redis"
 	"go.mongodb.org/mongo-driver/bson"
-	"go.mongodb.org/mongo-driver/bson/primitive"
 	es "gopkg.in/olivere/elastic.v1"
 	"log"
 	mu "mfw/util"
@@ -15,16 +15,16 @@ import (
 	"regexp"
 	"strconv"
 
-	"sort"
 	"strings"
 	"time"
 )
 
 var (
 	Config                                = make(map[string]string)
-	Fields                                []string
+	Fields,BuyerFields,AgencyFields                                []string
 	SourceClient, FClient                 *MongodbSim
 	RedisPool                             redis.Pool
+	HisRedisPool                          *hisRedis.Client
 	Addrs                                 = make(map[string]interface{}, 0) //省市县
 	udpclient                             mu.UdpClient                      //udp对象
 	ElasticClientIndex, ElasticClientType string
@@ -43,9 +43,19 @@ func init() {
 	log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
 	util.ReadConfig(&Config)
 	log.Println(Config)
-	Fields = []string{"_id", "contact", "partners", "business_scope", "company_address", "capital",
-		"establish_date", "legal_person", "company_type", "district", "city", "province", "area_code", "credit_no",
-		"company_name", "history_name", "topscopeclass", "wechat_accounts", "alias", "website", "report_websites"}
+	Fields = []string{"_id", "contact", "partners", "business_scope", "company_address",
+		"capital", "establish_date", "legal_person", "company_type",
+		"district", "city", "province", "area_code", "credit_no",
+		"company_name", "history_name", "topscopeclass", "wechat_accounts",
+		"alias", "website", "report_websites"}
+
+	BuyerFields = []string{"_id", "contact", "type", "ranks", "buyerclass",
+		"address", "district", "city", "province", "area_code", "credit_no", "buyer_name",
+		"history_name", "topscopeclass", "wechat_accounts", "website", "report_websites"}
+
+	AgencyFields = []string{"_id", "contact", "type", "ranks",
+		"address", "district", "city", "province", "area_code", "credit_no", "agency_name",
+		"history_name", "topscopeclass", "wechat_accounts", "website", "report_websites"}
 	var err error
 	pool_size, _ := strconv.Atoi(Config["pool_size"])
 
@@ -60,7 +70,7 @@ func init() {
 	FClient = new(MongodbSim)
 	FClient.MongodbAddr = Config["mgourl"]
 	FClient.Size = pool_size
-	FClient.DbName =Config["mgodb_extract_kf"]
+	FClient.DbName = Config["mgodb_extract_kf"]
 	//mongodbSim.DbName = "qfw"
 	FClient.InitPool()
 	FClientmgoConn := FClient.GetMgoConn()
@@ -104,6 +114,16 @@ func init() {
 		log.Fatalln("redis err:", err)
 	}
 	c.Close()
+	HisRedisPool = hisRedis.NewClient(&hisRedis.Options{
+		Addr:         "127.0.0.1:6380",
+		DB:           1,
+		DialTimeout:  10 * time.Second,
+		ReadTimeout:  30 * time.Second,
+		WriteTimeout: 30 * time.Second,
+		PoolSize:     30,
+		MinIdleConns: 20,
+		PoolTimeout:  30 * time.Second,
+	})
 }
 
 func main() {
@@ -114,7 +134,7 @@ func main() {
 	udpclient.Listen(processUdpMsg)
 	log.Println("Udp服务监听", updport)
 	log.Println("发送端口port:", Updport)
-	go TimedTask() //定时任务
+	go TimedTaskWinner() //定时任务
 	c := make(chan int, 1)
 	<-c
 
@@ -132,242 +152,24 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 			udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
 			return
 		} else if tmp != nil {
-			if key,ok := (*tmp)["key"].(string);ok{
+			if key, ok := (*tmp)["key"].(string); ok {
 				udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
-			}else {
+			} else {
 				udpclient.WriteUdp([]byte("udpok"), mu.OP_NOOP, ra)
 			}
-			go task(tmp)
-		}
-	case mu.OP_NOOP: //下个节点回应
-		log.Println("发送成功", string(data))
-	}
-}
-func task(mapinfo *map[string]interface{}) {
-	defer util.Catch()
-	gtid, lteid := util.ObjToString((*mapinfo)["gtid"]), util.ObjToString((*mapinfo)["lteid"])
-	if gtid == "" || lteid == "" {
-		log.Println(gtid, lteid, "参数错误")
-		return
-	}
-	GId, err := primitive.ObjectIDFromHex(gtid)
-	LtId, err2 := primitive.ObjectIDFromHex(lteid)
-	if err != nil || err2 != nil {
-		log.Println(gtid, lteid, "转换_id错误")
-		return
-	}
-	//udp的id区间查询bidding  中标人 中标联系人 中标联系电话
-	// topscopeclass项目类型-industry行业类型&&topscopeclass联系人项目类型
-	// (area地区-province省份 city城市-city城市 district区县-district区县)
-	// winneraddr-company_address企业地址
-	SourceClientcc := SourceClient.GetMgoConn()
-	defer SourceClient.DestoryMongoConn(SourceClientcc)
-	cursor := SourceClientcc.DB(Config["mgodb_bidding"]).C(Config["mgodb_mgoinit_c"]).Find(bson.M{
-		"_id": bson.M{
-			"$gte": GId,
-			"$lte": LtId,
-		},
-	}).Select(bson.M{"winner": 1, "winnertel": 1, "winnerperson": 1,
-		"topscopeclass": 1, "winneraddr": 1}).Iter()
-	if cursor == nil {
-		log.Println(cursor)
-		return
-	}
-	overid := gtid
-	tmp := map[string]interface{}{}
-	for cursor.Next(&tmp) {
-		overid = tmp["_id"].(primitive.ObjectID).Hex()
-		log.Println(tmp["_id"])
-		if tmp["winner"] == nil || tmp["winner"] == "" {
-			continue
-		}
-		//redis查询是否存在
-		rdb := RedisPool.Get()
-		if reply, err := redis.String(rdb.Do("GET", tmp["winner"])); err != nil {
-			//redis不存在存到临时表,定时任务处理
-			FClient.DbName = Config["mgodb_extract_kf"]
-			if tmpid := FClient.Save("winner_new", tmp) ;tmpid==nil{
-				log.Println("FClient.Save err",tmpid)
-			}
-			//log.Println("get redis id err:定时任务处理", err, tmp)
-			if err := rdb.Close(); err != nil {
-				log.Println(err)
-			}
-			continue
-		} else {
-			//log.Println("redis get :", reply)
-			//redis存在
-			//log.Println(reply)
-			//reply = "5e0316b998a9abaf6535df3d"
-			//id, err := primitive.ObjectIDFromHex(reply)
-			//if err != nil {
-			//	log.Println("get redis id  Hex err:", err, tmp)
-			//	if err := rdb.Close(); err != nil {
-			//		log.Println(err)
-			//	}
-			//	continue
-			//}
-			if err := rdb.Close(); err != nil {
-				log.Println(err)
-			}
-			//拿到合并后的qyk
-			FClient.DbName = Config["mgodb_extract_kf"]
-			oldTmp := FClient.FindById(Config["mgo_qyk_c"], reply)
-			if oldTmp == nil{
-				log.Println("redis id 不存在")
-				continue
-			}
-			//err = FClient.Database(Config["mgodb_extract_kf"]).Collection(Config["mgo_qyk_c"]).
-			//	FindOne(context.TODO(), bson.M{"_id": id}).Decode(&oldTmp)
-			//if err != nil {
-			//	log.Println("qyk id err:", err, id)
-			//	continue
-			//}
-			//比较合并
-			//行业类型
-			tmpTopscopeclass := []string{}
-			tmpTopscopeclassMap := make(map[string]bool)
+			//data_type:winner data_type:buyer data_type:agency
+			//data_info:save//存量   data_info:add //增量
+			if key, ok := (*tmp)["data_type"].(string); ok {
+				if key == "winner" {
+					go TaskWinner(tmp)
+				} else if key == "buyer" {
+
+				} else if key == "agency" {
 
-			if oldTmp["industry"] == nil {
-				//log.Println(reflect.ValueOf(tmp["topscopeclass"]))
-				if v, ok := tmp["topscopeclass"].(primitive.A); ok {
-					for _, vv := range v {
-						if vvv, ok := vv.(string); ok && len(vvv) > 1 {
-							tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
-						}
-					}
-					for k := range tmpTopscopeclassMap {
-						tmpTopscopeclass = append(tmpTopscopeclass, k)
-					}
-				}
-			} else {
-				if v, ok := oldTmp["industry"].(primitive.A); ok {
-					for _, vv := range v {
-						if vvv, ok := vv.(string); ok {
-							tmpTopscopeclassMap[vvv] = true
-						}
-					}
-				}
-				if v, ok := tmp["topscopeclass"].(primitive.A); ok {
-					for _, vv := range v {
-						if vvv, ok := vv.(string); ok && len(vvv) > 1 {
-							tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
-						}
-					}
-					for k := range tmpTopscopeclassMap {
-						tmpTopscopeclass = append(tmpTopscopeclass, k)
-					}
-				}
-			}
-			sort.Strings(tmpTopscopeclass)
-			oldTmp["industry"] = tmpTopscopeclass
-			esId := oldTmp["_id"].(primitive.ObjectID).Hex()
-			//更新行业类型
-			if tmp["winnerperson"] == nil || tmp["winnerperson"] == "" || Reg_xing.MatchString(util.ObjToString(tmp["winnerperson"])) {
-				oldTmp["updatatime"] = time.Now().Unix()
-				//mongo更新
-				FClient.DbName =Config["mgodb_extract_kf"]
-				if !FClient.UpdateById(Config["mgo_qyk_c"],esId,bson.M{"$set": oldTmp}){
-					log.Println("mongo更新err",esId)
-				}
-				//if _, err := FClient.Database(Config["mgodb_extract_kf"]).Collection(Config["mgo_qyk_c"]).
-				//	UpdateOne(context.TODO(), bson.M{"_id": oldTmp["_id"]}, bson.M{"$set": oldTmp}); err != nil {
-				//	log.Println("mongo更新err:", err)
-				//}
-				//es更新
-				delete(oldTmp, "_id")
-				//esConn := elastic.GetEsConn()
-				//defer elastic.DestoryEsConn(esConn)
-				if _, err := EsConn.Update().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
-					log.Println("update es err:", err)
-				}
-				//log.Println( err2,err3)
-				continue
-			}
-			//联系方式合并
-			var tmpperson, winnertel string
-			tmpperson = tmp["winnerperson"].(string)
-			if tmp["winnertel"] == nil || tmp["winnertel"] == "" {
-				winnertel = ""
-			} else {
-				if Reg_xing.MatchString(util.ObjToString(tmp["winnertel"])) || !Reg_tel.MatchString(util.ObjToString(tmp["winnertel"])) {
-					winnertel = ""
-				} else {
-					winnertel = util.ObjToString(tmp["winnertel"])
-				}
-			}
-			contactMaps := make([]interface{}, 0)
-			if oldTmp["contact"] == nil {
-				tmpContact := make(map[string]interface{})
-				tmpContact["contact_person"] = tmpperson
-				tmpContact["contact_type"] = "项目联系人"
-				tmpContact["phone"] = winnertel
-				tmpContact["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
-				tmpContact["updatetime"] = time.Now().Unix()
-				contactMaps = append(contactMaps, tmpContact)
-			} else {
-				//对比前四项,相等丢弃
-				if v, ok := oldTmp["contact"].(primitive.A); ok {
-					var isNotUpdate bool
-					for _, vv := range v {
-						if vvv, ok := vv.(map[string]interface{}); ok {
-							if vvv["contact_person"] == tmpperson && vvv["contact_type"] == "项目联系人" &&
-								vvv["phone"] == winnertel && vvv["topscopeclass"] == strings.Join(tmpTopscopeclass, ";") {
-								isNotUpdate = true
-								vvv["updatetime"] = time.Now().Unix()
-							}
-							contactMaps = append(contactMaps, vvv)
-						}
-					}
-					if !isNotUpdate {
-						vvv := make(map[string]interface{})
-						vvv["contact_person"] = tmp["winnerperson"]
-						vvv["contact_type"] = "项目联系人"
-						vvv["phone"] = winnertel
-						vvv["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
-						vvv["updatetime"] = time.Now().Unix()
-						contactMaps = append(contactMaps, vvv)
-					}
 				}
 			}
-			oldTmp["contact"] = contactMaps
-			//mongo更新
-			oldTmp["updatatime"] = time.Now().Unix()
-			FClient.DbName=Config["mgodb_extract_kf"]
-			if !FClient.UpdateById(Config["mgo_qyk_c"],esId,bson.M{"$set": oldTmp}){
-				log.Println("mongo更新 err",esId,oldTmp)
-			}
-			//if _, err := FClient.Database(Config["mgodb_extract_kf"]).Collection(Config["mgo_qyk_c"]).
-			//	UpdateOne(context.TODO(), bson.M{"_id": oldTmp["_id"]}, bson.M{"$set": oldTmp}); err != nil {
-			//	log.Println("mongo更新 err :", err)
-			//}
-			//es更新
-			delete(oldTmp, "_id")
-			//esConn := elastic.GetEsConn()
-			//defer elastic.DestoryEsConn(esConn)
-			if _, err := EsConn.Update().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
-				log.Println("EsConn err :", err)
-			}
-			//log.Println( err2,err3)
 		}
+	case mu.OP_NOOP: //下个节点回应
+		log.Println("发送成功", string(data))
 	}
-	//defer cursor.Close(context.Background())
-	//log.Println("合并执行完成", gtid, lteid, overid)
-	//if overid != lteid {
-	//	by, _ := json.Marshal(map[string]interface{}{
-	//		"gtid":  overid,
-	//		"lteid": lteid,
-	//		"stype": "",
-	//	})
-	//	if e := udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
-	//		IP:   net.ParseIP("127.0.0.1"),
-	//		Port: Updport,
-	//	}); e != nil {
-	//		log.Println(e)
-	//	}
-	//	log.Println("重新发送udp:", string(by))
-	//	return
-	//}
-	log.Println("合并执行完成 ok", gtid, lteid, overid)
-
 }

+ 0 - 268
udp_winner/timedTask.go

@@ -1,268 +0,0 @@
-package main
-
-import (
-	"encoding/json"
-	"fmt"
-	"github.com/garyburd/redigo/redis"
-	"go.mongodb.org/mongo-driver/bson/primitive"
-	"gopkg.in/mgo.v2/bson"
-	"log"
-	mu "mfw/util"
-	"net"
-	"sort"
-	"strings"
-	"time"
-)
-
-//定时任务
-//1.存异常表
-//2.合并原始库新增
-func TimedTask() {
-	//time.Sleep(time.Hour*70)
-	t2 := time.NewTimer(time.Second * 5)
-	for range t2.C {
-		Fcconn := FClient.GetMgoConn()
-		defer FClient.DestoryMongoConn(Fcconn)
-		tmpLast := map[string]interface{}{}
-		if iter := Fcconn.DB(Config["mgodb_extract_kf"]).C("winner_new").Find(bson.M{}).Sort("-_id").Limit(1).Iter(); iter != nil {
-			if !iter.Next(&tmpLast) {
-				//临时表无数据
-				log.Println("临时表无数据:")
-				t2.Reset(time.Minute * 5)
-				continue
-			} else {
-				log.Println("临时表有数据:", tmpLast)
-				fconn := FClient.GetMgoConn()
-				defer FClient.DestoryMongoConn(fconn)
-				cursor := fconn.DB(Config["mgodb_extract_kf"]).C("winner_new").Find(bson.M{
-					"_id": bson.M{
-						"$lte": tmpLast["_id"],
-					},
-				}).Sort("_id").Iter()
-				if cursor == nil {
-					log.Println("查询失败")
-					t2.Reset(time.Second * 5)
-					continue
-				}
-				//遍历临时表数据,匹配不到原始库存入异常表
-				tmp := make(map[string]interface{})
-				for cursor.Next(&tmp) {
-					tmpId := tmp["_id"].(primitive.ObjectID).Hex()
-					//再重新查找redis,存在发udp处理,不存在走新增合并
-					rdb := RedisPool.Get()
-					if _, err := redis.String(rdb.Do("GET", tmp["winner"])); err == nil {
-						//{"gtid":"57d7ad2f61a0721f152d2ad5","lteid":"5e20968d85a9271abf0ad6c2","stype":""}
-						//redis存在发送udp进行处理
-						by, _ := json.Marshal(map[string]interface{}{
-							"gtid":  tmpId,
-							"lteid": tmpId,
-							"stype": "",
-						})
-						if e := udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
-							IP:   net.ParseIP("127.0.0.1"),
-							Port: Updport,
-						}); e != nil {
-							log.Println(e)
-						}
-						//存在的话删除tmp mongo表
-						FClient.DbName = Config["mgodb_extract_kf"]
-						if DeletedCount := FClient.DeleteById("winner_new", tmpId); DeletedCount == 0 {
-							log.Println("删除临时表err:", DeletedCount)
-						}
-						if err := rdb.Close(); err != nil {
-							log.Println(err)
-						}
-						continue
-					} else {
-						if err = rdb.Close(); err != nil {
-							log.Println(err)
-						}
-					}
-					//查询redis不存在新增
-					FClient.DbName = Config["mgodb_enterprise"]
-					resulttmp := FClient.FindOne(Config["mgodb_enterprise_c"], bson.M{"company_name": tmp["winner"]})
-					if resulttmp != nil {
-						//log.Println(r)
-						//匹配不到原始库,存入异常表删除临时表
-						FClient.DbName = Config["mgodb_extract_kf"]
-						if saveid := FClient.Save("winner_err", tmp); saveid == nil {
-							log.Println("存入异常表错误", tmp)
-						}
-						FClient.DbName = Config["mgodb_extract_kf"]
-						if deleteNum := FClient.DeleteById("winner_new", tmpId); deleteNum == 0 {
-							log.Println("删除临时表错误", deleteNum)
-						}
-						continue
-					} else {
-						//log.Println(123)
-						//匹配到原始库,新增 resulttmp
-						if resulttmp["credit_no"] != nil {
-							if credit_no, ok := resulttmp["credit_no"].(string); ok && strings.TrimSpace(credit_no) != "" &&
-								len(strings.TrimSpace(credit_no)) > 8 {
-								dataNo := strings.TrimSpace(credit_no)[2:8]
-								if Addrs[dataNo] != nil {
-									if v, ok := Addrs[dataNo].(map[string]interface{}); ok {
-										if resulttmp["province"] == nil || resulttmp["province"] == "" {
-											resulttmp["province"] = v["province"]
-										}
-										resulttmp["city"] = v["city"]
-										resulttmp["district"] = v["district"]
-
-									}
-								}
-							}
-						}
-						contacts := make([]map[string]interface{}, 0)
-						contact := make(map[string]interface{}, 0)
-						if resulttmp["legal_person"] != nil {
-							contact["contact_person"] = resulttmp["legal_person"] //联系人
-						} else {
-							contact["contact_person"] = "" //联系人
-						}
-						contact["contact_type"] = "法定代表人" //法定代表人
-						//log.Println(1)
-						if resulttmp["annual_reports"] != nil {
-							bytes, err := json.Marshal(resulttmp["annual_reports"])
-							if err != nil {
-								log.Println("annual_reports err:", err)
-							}
-							phonetmp := make([]map[string]interface{}, 0)
-							err = json.Unmarshal(bytes, &phonetmp)
-							if err != nil {
-								log.Println("Unmarshal err:", err)
-							}
-							for _, vv := range phonetmp {
-								if vv["company_phone"] != nil {
-									if vv["company_phone"] == "" {
-										continue
-									} else {
-										contact["phone"] = vv["company_phone"] //联系电话
-										break
-									}
-								} else {
-									contact["phone"] = "" //联系电话
-								}
-
-							}
-						}
-						//log.Println(k, contact["phone"], resulttmp["_id"])
-						//time.Sleep(10 * time.Second)
-						if contact["phone"] == nil {
-							contact["phone"] = "" //联系电话
-						}
-						contact["topscopeclass"] = "企业公示"         //项目类型
-						contact["updatetime"] = time.Now().Unix() //更新时间
-						contacts = append(contacts, contact)
-						resulttmp["contact"] = contacts
-
-						savetmp := make(map[string]interface{}, 0)
-						for _, sk := range Fields {
-							if sk == "establish_date" {
-								if resulttmp[sk] != nil {
-									savetmp[sk] = resulttmp[sk].(primitive.DateTime).Time().UTC().Unix()
-									continue
-								}
-							} else if sk == "capital" {
-								//log.Println(sk, resulttmp[sk])
-								savetmp[sk] = ObjToMoney([]interface{}{resulttmp[sk], ""})[0]
-								continue
-							} else if sk == "partners" {
-								//log.Println(sk, resulttmp[sk], )
-								//fmt.Println(reflect.TypeOf(resulttmp[sk]))
-								if resulttmp[sk] != nil {
-									if ppms, ok := resulttmp[sk].(primitive.A); ok {
-										for i, _ := range ppms {
-											if ppms[i].(map[string]interface{})["stock_type"] != nil {
-												ppms[i].(map[string]interface{})["stock_type"] = "企业公示"
-											}
-											delete(ppms[i].(map[string]interface{}), "identify_type")
-										}
-										savetmp[sk] = ppms
-
-									}
-								} else {
-									savetmp[sk] = []interface{}{}
-								}
-								continue
-							} else if sk == "_id" {
-								savetmp["tmp"+sk] = resulttmp[sk]
-								continue
-							} else if sk == "area_code" {
-								//行政区划代码
-								savetmp[sk] = fmt.Sprint(resulttmp[sk])
-								continue
-							} else if sk == "report_websites" {
-								//网址
-								if resulttmp["report_websites"] == nil {
-									savetmp["website"] = ""
-								} else {
-									report_websitesArr := []string{}
-									if ppms, ok := resulttmp[sk].(primitive.A); ok {
-										for _, v := range ppms {
-											if vvv, ok := v.(map[string]interface{}); ok {
-												if rv, ok := vvv["website_url"].(string); ok {
-													report_websitesArr = append(report_websitesArr, rv)
-												}
-											}
-										}
-									}
-									sort.Strings(report_websitesArr)
-									savetmp["website"] = strings.Join(report_websitesArr, ";")
-								}
-								continue
-							} else if sk == "wechat_accounts" {
-								savetmp[sk] = []interface{}{}
-								continue
-							}
-							if resulttmp[sk] == nil && sk != "history_name" && sk != "wechat_accounts" && sk != "establish_date" && sk != "capital" && sk != "partners" && sk != "contact" && sk != "report_websites" {
-								savetmp[sk] = ""
-							} else {
-								savetmp[sk] = resulttmp[sk]
-							}
-						}
-						//tmps = append(tmps, savetmp)
-						savetmp["updatatime"] = time.Now().Unix()
-						//保存mongo
-						FClient.DbName = Config["mgodb_extract_kf"]
-						saveid := FClient.Save(Config["mgo_qyk_c"], savetmp)
-						if saveid != nil {
-							//保存redis
-							rc := RedisPool.Get()
-							var _id string
-							if v, ok := saveid.(primitive.ObjectID); ok {
-								_id = v.Hex()
-							}
-							if _, err := rc.Do("SET", savetmp["company_name"], _id); err != nil {
-								log.Println("save redis err:", tmp["_id"], savetmp["_id"], savetmp["company_name"], err)
-								if err := rc.Close(); err != nil {
-									log.Println(err)
-								}
-							} else {
-								//保存es
-								delete(savetmp, "_id")
-								if err := rc.Close(); err != nil {
-									log.Println(err)
-								}
-
-								//esConn := elastic.GetEsConn()
-								//defer elastic.DestoryEsConn(esConn)
-								if _, err := EsConn.Index().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(_id).BodyJson(savetmp).Refresh(true).Do(); err != nil {
-									log.Println("save es err :", tmp["_id"], savetmp["_id"], err)
-								} else {
-									//删除临时表
-									FClient.DbName = Config["mgodb_extract_kf"]
-									if deleteNum := FClient.DeleteById("winner_new", tmpId); deleteNum == 0 {
-										log.Println("删除临时表失败", deleteNum)
-									}
-								}
-							}
-						} else {
-							log.Println("save mongo err:", saveid, tmp["_id"])
-						}
-					}
-				}
-			}
-		}
-		t2.Reset(time.Minute)
-	}
-}

+ 605 - 0
udp_winner/timedTaskAgency.go

@@ -0,0 +1,605 @@
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"github.com/garyburd/redigo/redis"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+	"gopkg.in/mgo.v2/bson"
+	"log"
+	mu "mfw/util"
+	"net"
+	"qfw/util"
+	"sort"
+	"strings"
+	"time"
+)
+
+//之前main方法,只更新
+func TaskAgency(mapinfo *map[string]interface{}) {
+	defer util.Catch()
+	gtid, lteid := util.ObjToString((*mapinfo)["gtid"]), util.ObjToString((*mapinfo)["lteid"])
+	if gtid == "" || lteid == "" {
+		log.Println(gtid, lteid, "参数错误")
+		return
+	}
+	GId, err := primitive.ObjectIDFromHex(gtid)
+	LtId, err2 := primitive.ObjectIDFromHex(lteid)
+	if err != nil || err2 != nil {
+		log.Println(gtid, lteid, "转换_id错误")
+		return
+	}
+	//udp的id区间查询bidding  中标人 中标联系人 中标联系电话
+	// topscopeclass项目类型-industry行业类型&&topscopeclass联系人项目类型
+	// (area地区-province省份 city城市-city城市 district区县-district区县)
+	// winneraddr-company_address企业地址
+	SourceClientcc := SourceClient.GetMgoConn()
+	defer SourceClient.DestoryMongoConn(SourceClientcc)
+	cursor := SourceClientcc.DB(Config["mgodb_bidding"]).C(Config["mgodb_mgoinit_c"]).Find(bson.M{
+		"_id": bson.M{
+			"$gte": GId,
+			"$lte": LtId,
+		},
+	}).Select(bson.M{"agency": 1, "agencytel": 1, "agencyperson": 1,
+		"topscopeclass": 1, "agencyaddr": 1}).Iter()
+	if cursor == nil {
+		log.Println(cursor)
+		return
+	}
+	//判断是否是存量,是存量走Redis遍历
+	if v, ok := (*mapinfo)["data_info"].(string); ok && v == "save" {
+		//存量处理
+		tmp := map[string]interface{}{}
+		conn := HisRedisPool.Conn()
+		defer conn.Close()
+		//选择redis db
+		conn.Select(1)
+		//遍历bidding表保存到redis
+		// key:_id  value:json结构体
+		for cursor.Next(&tmp) {
+			if tmp["agency"] == nil || tmp["agency"] == "" {
+				continue
+			}
+			mgoId:=tmp["_id"].(primitive.ObjectID).Hex()
+			delete(tmp,"_id")
+			bytes, _ := json.Marshal(tmp)
+			if err := conn.Set(mgoId, string(bytes), 0).Err(); err != nil {
+				log.Println(err)
+			}
+		}
+		//遍历redis
+		if scan := conn.Scan(0, "", 100); scan.Err() != nil {
+			log.Println(scan.Err())
+			return
+		} else {
+			iterator := scan.Iterator()
+			for iterator.Next() {
+				redisId := iterator.Val()                       //redis key
+				redisvalue := conn.Get(iterator.Val()).Val() //redis val
+				tmp := make(map[string]interface{})
+				json.Unmarshal([]byte(redisvalue),&tmp)
+				//重复增量操作
+				//redis查询是否存在
+				rdb := RedisPool.Get()
+				rdb.Do("SELECT","1")
+				if reply, err := redis.String(rdb.Do("GET", tmp["agency"])); err != nil {
+					//redis不存在,存到临时表,定时任务处理
+					FClient.DbName = Config["mgodb_extract_kf"]
+					if tmpid := FClient.Save("agency_new", tmp); tmpid == nil {
+						log.Println("存量 FClient.Save err", tmpid)
+					}
+					//log.Println("get redis id err:定时任务处理", err, tmp)
+					if err := rdb.Close(); err != nil {
+						log.Println("存量",err)
+					}
+					//删除存量redis
+					conn.Del(redisId)
+					continue
+				} else {
+					if err := rdb.Close(); err != nil {
+						log.Println(err)
+					}
+					//拿到合并后的qyk
+					FClient.DbName = Config["mgodb_extract_kf"]
+					oldTmp := FClient.FindById(Config["mgo_qyk_c"], reply)
+					if oldTmp == nil {
+						log.Println("存量 redis id 不存在",reply,tmp["agency"])
+						continue
+					}
+
+
+					tmpTopscopeclass := []string{}
+					tmpTopscopeclassMap := make(map[string]bool)
+
+					if v, ok := tmp["topscopeclass"].(primitive.A); ok {
+						for _, vv := range v {
+							if vvv, ok := vv.(string); ok && len(vvv) > 1 {
+								tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
+							}
+						}
+						for k := range tmpTopscopeclassMap {
+							tmpTopscopeclass = append(tmpTopscopeclass, k)
+						}
+					}
+					sort.Strings(tmpTopscopeclass)
+
+
+
+
+					esId := oldTmp["_id"].(primitive.ObjectID).Hex()
+					//更新行业类型
+					if tmp["agencyperson"] == nil || tmp["agencyperson"] == "" ||
+						Reg_xing.MatchString(util.ObjToString(tmp["agencyperson"])) {
+						oldTmp["updatatime"] = time.Now().Unix()
+						//mongo更新
+						FClient.DbName = Config["mgodb_extract_kf"]
+						if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
+							log.Println("mongo更新err", esId)
+						}
+
+						//es更新
+						delete(oldTmp, "_id")
+						if _, err := EsConn.Update().Index(Config["elasticsearch_agency_index"]).Type(Config["elasticsearch_agency_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
+							log.Println("update es err:", err)
+						}
+						//删除存量redis
+						conn.Del(redisId)
+						continue
+					}
+					//联系方式合并
+					var tmpperson, agencytel string
+					tmpperson = tmp["agencyperson"].(string)
+					if tmp["agencytel"] == nil || tmp["agencytel"] == "" {
+						agencytel = ""
+					} else {
+						if Reg_xing.MatchString(util.ObjToString(tmp["agencytel"])) || !Reg_tel.MatchString(util.ObjToString(tmp["agencytel"])) {
+							agencytel = ""
+						} else {
+							agencytel = util.ObjToString(tmp["agencytel"])
+						}
+					}
+					contactMaps := make([]interface{}, 0)
+					if oldTmp["contact"] == nil {
+						tmpContact := make(map[string]interface{})
+						tmpContact["infoid"] = redisId
+						tmpContact["contact_person"] = tmpperson
+						tmpContact["contact_type"] = "项目联系人"
+						tmpContact["phone"] = agencytel
+						tmpContact["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
+						tmpContact["updatetime"] = time.Now().Unix()
+						contactMaps = append(contactMaps, tmpContact)
+					} else {
+						//对比前四项,相等丢弃
+						if v, ok := oldTmp["contact"].(primitive.A); ok {
+							var isNotUpdate bool
+							for _, vv := range v {
+								if vvv, ok := vv.(map[string]interface{}); ok {
+									if vvv["contact_person"] == tmpperson && vvv["contact_type"] == "项目联系人" &&
+										vvv["phone"] == agencytel && vvv["topscopeclass"] == strings.Join(tmpTopscopeclass, ";") {
+										isNotUpdate = true
+										vvv["updatetime"] = time.Now().Unix()
+									}
+									contactMaps = append(contactMaps, vvv)
+								}
+							}
+							if !isNotUpdate {
+								vvv := make(map[string]interface{})
+								vvv["infoid"] = redisId
+								vvv["contact_person"] = tmp["agencyperson"]
+								vvv["contact_type"] = "项目联系人"
+								vvv["phone"] = agencytel
+								vvv["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
+								vvv["updatetime"] = time.Now().Unix()
+								contactMaps = append(contactMaps, vvv)
+							}
+						}
+					}
+					oldTmp["contact"] = contactMaps
+					//mongo更新
+					oldTmp["updatatime"] = time.Now().Unix()
+					FClient.DbName = Config["mgodb_extract_kf"]
+					if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
+						log.Println("存量  mongo更新 err", esId, oldTmp)
+					}
+					//es更新
+					delete(oldTmp, "_id")
+					if _, err := EsConn.Update().Index(Config["elasticsearch_agency_index"]).Type(Config["elasticsearch_agency_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
+						log.Println("存量 EsConn err :", err)
+					}
+					//最后删除redis
+					conn.Del(redisId)
+				}
+			}
+		}
+		log.Println("存量历史合并执行完成 ok", gtid, lteid)
+
+	} else {
+		//增量处理
+		overid := gtid
+		tmp := map[string]interface{}{}
+		for cursor.Next(&tmp) {
+			overid = tmp["_id"].(primitive.ObjectID).Hex()
+			log.Println(tmp["_id"])
+			if tmp["agency"] == nil || tmp["agency"] == "" {
+				continue
+			}
+			//redis查询是否存在
+			rdb := RedisPool.Get()
+			rdb.Do("SELECT","1")
+			if reply, err := redis.String(rdb.Do("GET", tmp["agency"])); err != nil {
+				//redis不存在存到临时表,定时任务处理
+				FClient.DbName = Config["mgodb_extract_kf"]
+				if tmpid := FClient.Save("agency_new", tmp); tmpid == nil {
+					log.Println("FClient.Save err", tmpid)
+				}
+				//log.Println("get redis id err:定时任务处理", err, tmp)
+				if err := rdb.Close(); err != nil {
+					log.Println(err)
+				}
+				continue
+			} else {
+				if err := rdb.Close(); err != nil {
+					log.Println(err)
+				}
+				//拿到合并后的qyk
+				FClient.DbName = Config["mgodb_extract_kf"]
+				oldTmp := FClient.FindById(Config["mgo_qyk_c"], reply)
+				if oldTmp == nil {
+					log.Println("redis id 不存在")
+					continue
+				}
+				//比较合并
+				//行业类型
+				tmpTopscopeclass := []string{}
+				tmpTopscopeclassMap := make(map[string]bool)
+
+				if v, ok := tmp["topscopeclass"].(primitive.A); ok {
+					for _, vv := range v {
+						if vvv, ok := vv.(string); ok && len(vvv) > 1 {
+							tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
+						}
+					}
+					for k := range tmpTopscopeclassMap {
+						tmpTopscopeclass = append(tmpTopscopeclass, k)
+					}
+				}
+				sort.Strings(tmpTopscopeclass)
+
+
+
+				esId := oldTmp["_id"].(primitive.ObjectID).Hex()
+				//更新行业类型
+				if tmp["agencyperson"] == nil || tmp["agencyperson"] == "" || Reg_xing.MatchString(util.ObjToString(tmp["agencyperson"])) {
+					oldTmp["updatatime"] = time.Now().Unix()
+					//mongo更新
+					FClient.DbName = Config["mgodb_extract_kf"]
+					if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
+						log.Println("mongo更新err", esId)
+					}
+
+					//es更新
+					delete(oldTmp, "_id")
+					if _, err := EsConn.Update().Index(Config["elasticsearch_agency_index"]).Type(Config["elasticsearch_agency_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
+						log.Println("update es err:", err)
+					}
+					continue
+				}
+				//联系方式合并
+				var tmpperson, agencytel string
+				tmpperson = tmp["agencyperson"].(string)
+				if tmp["agencytel"] == nil || tmp["agencytel"] == "" {
+					agencytel = ""
+				} else {
+					if Reg_xing.MatchString(util.ObjToString(tmp["agencytel"])) || !Reg_tel.MatchString(util.ObjToString(tmp["agencytel"])) {
+						agencytel = ""
+					} else {
+						agencytel = util.ObjToString(tmp["agencytel"])
+					}
+				}
+				contactMaps := make([]interface{}, 0)
+				if oldTmp["contact"] == nil {
+					tmpContact := make(map[string]interface{})
+					tmpContact["infoid"] = overid
+					tmpContact["contact_person"] = tmpperson
+					tmpContact["contact_type"] = "项目联系人"
+					tmpContact["phone"] = agencytel
+					tmpContact["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
+					tmpContact["updatetime"] = time.Now().Unix()
+					contactMaps = append(contactMaps, tmpContact)
+				} else {
+					//对比前四项,相等丢弃
+					if v, ok := oldTmp["contact"].(primitive.A); ok {
+						var isNotUpdate bool
+						for _, vv := range v {
+							if vvv, ok := vv.(map[string]interface{}); ok {
+								if vvv["contact_person"] == tmpperson && vvv["contact_type"] == "项目联系人" &&
+									vvv["phone"] == agencytel && vvv["topscopeclass"] == strings.Join(tmpTopscopeclass, ";") {
+									isNotUpdate = true
+									vvv["updatetime"] = time.Now().Unix()
+								}
+								contactMaps = append(contactMaps, vvv)
+							}
+						}
+						if !isNotUpdate {
+							vvv := make(map[string]interface{})
+							vvv["infoid"] = overid
+							vvv["contact_person"] = tmp["agencyperson"]
+							vvv["contact_type"] = "项目联系人"
+							vvv["phone"] = agencytel
+							vvv["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
+							vvv["updatetime"] = time.Now().Unix()
+							contactMaps = append(contactMaps, vvv)
+						}
+					}
+				}
+				oldTmp["contact"] = contactMaps
+				//mongo更新
+				oldTmp["updatatime"] = time.Now().Unix()
+				FClient.DbName = Config["mgodb_extract_kf"]
+				if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
+					log.Println("mongo更新 err", esId, oldTmp)
+				}
+				//es更新
+				delete(oldTmp, "_id")
+				if _, err := EsConn.Update().Index(Config["elasticsearch_agency_index"]).Type(Config["elasticsearch_agency_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
+					log.Println("EsConn err :", err)
+				}
+			}
+		}
+		log.Println("增量合并执行完成 ok", gtid, lteid, overid)
+	}
+
+}
+
+
+
+//定时任务
+//1.存异常表
+//2.合并原始库新增
+func TimedTaskAgency() {
+	//time.Sleep(time.Hour*70)
+	t2 := time.NewTimer(time.Second * 5)
+	for range t2.C {
+		Fcconn := FClient.GetMgoConn()
+		defer FClient.DestoryMongoConn(Fcconn)
+		tmpLast := map[string]interface{}{}
+		if iter := Fcconn.DB(Config["mgodb_extract_kf"]).C("agency_new").Find(bson.M{}).Sort("-_id").Limit(1).Iter(); iter != nil {
+			if !iter.Next(&tmpLast) {
+				//临时表无数据
+				log.Println("临时表无数据:")
+				t2.Reset(time.Minute * 5)
+				continue
+			} else {
+				log.Println("临时表有数据:", tmpLast)
+				fconn := FClient.GetMgoConn()
+				defer FClient.DestoryMongoConn(fconn)
+				cursor := fconn.DB(Config["mgodb_extract_kf"]).C("agency_new").Find(bson.M{
+					"_id": bson.M{
+						"$lte": tmpLast["_id"],
+					},
+				}).Sort("_id").Iter()
+				if cursor == nil {
+					log.Println("查询失败")
+					t2.Reset(time.Second * 5)
+					continue
+				}
+				//遍历临时表数据,匹配不到原始库存入异常表
+				tmp := make(map[string]interface{})
+				for cursor.Next(&tmp) {
+					tmpId := tmp["_id"].(primitive.ObjectID).Hex()
+					//再重新查找redis,存在发udp处理,不存在走新增合并
+					rdb := RedisPool.Get()
+					rdb.Do("SELECT","1")
+
+					if _, err := redis.String(rdb.Do("GET", tmp["agency"])); err == nil {
+						//{"gtid":"57d7ad2f61a0721f152d2ad5","lteid":"5e20968d85a9271abf0ad6c2","stype":""}
+						//redis存在发送udp进行处理
+						by, _ := json.Marshal(map[string]interface{}{
+							"gtid":  tmpId,
+							"lteid": tmpId,
+							"stype": "",
+						})
+						if e := udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
+							IP:   net.ParseIP("127.0.0.1"),
+							Port: Updport,
+						}); e != nil {
+							log.Println(e)
+						}
+						//存在的话删除tmp mongo表
+						FClient.DbName = Config["mgodb_extract_kf"]
+						if DeletedCount := FClient.DeleteById("agency_new", tmpId); DeletedCount == 0 {
+							log.Println("删除临时表err:", DeletedCount)
+						}
+						if err := rdb.Close(); err != nil {
+							log.Println(err)
+						}
+						continue
+					} else {
+						if err = rdb.Close(); err != nil {
+							log.Println(err)
+						}
+					}
+					//查询redis不存在新增
+					FClient.DbName = Config["mgodb_enterprise"]
+					resulttmp := FClient.FindOne(Config["mgodb_enterprise_c"], bson.M{"company_name": tmp["agency"]})
+					if resulttmp["_id"] == nil {
+						//log.Println(r)
+						//匹配不到原始库,存入异常表删除临时表
+						FClient.DbName = Config["mgodb_extract_kf"]
+						if saveid := FClient.Save("agency_err", tmp); saveid == nil {
+							log.Println("存入异常表错误", tmp)
+						}
+						FClient.DbName = Config["mgodb_extract_kf"]
+						if deleteNum := FClient.DeleteById("agency_new", tmpId); deleteNum == 0 {
+							log.Println("删除临时表错误", deleteNum)
+						}
+						continue
+					} else {
+						//log.Println(123)
+						//匹配到原始库,新增 resulttmp
+						if resulttmp["credit_no"] != nil {
+							if credit_no, ok := resulttmp["credit_no"].(string); ok && strings.TrimSpace(credit_no) != "" &&
+								len(strings.TrimSpace(credit_no)) > 8 {
+								dataNo := strings.TrimSpace(credit_no)[2:8]
+								if Addrs[dataNo] != nil {
+									if v, ok := Addrs[dataNo].(map[string]interface{}); ok {
+										if resulttmp["province"] == nil || resulttmp["province"] == "" {
+											resulttmp["province"] = v["province"]
+										}
+										resulttmp["city"] = v["city"]
+										resulttmp["district"] = v["district"]
+
+									}
+								}
+							}
+						}
+						contacts := make([]map[string]interface{}, 0)
+						contact := make(map[string]interface{}, 0)
+						if resulttmp["legal_person"] != nil {
+							contact["contact_person"] = resulttmp["legal_person"] //联系人
+						} else {
+							contact["contact_person"] = "" //联系人
+						}
+						contact["contact_type"] = "法定代表人" //法定代表人
+						//log.Println(1)
+						if resulttmp["annual_reports"] != nil {
+							bytes, err := json.Marshal(resulttmp["annual_reports"])
+							if err != nil {
+								log.Println("annual_reports err:", err)
+							}
+							phonetmp := make([]map[string]interface{}, 0)
+							err = json.Unmarshal(bytes, &phonetmp)
+							if err != nil {
+								log.Println("Unmarshal err:", err)
+							}
+							for _, vv := range phonetmp {
+								if vv["company_phone"] != nil {
+									if vv["company_phone"] == "" {
+										continue
+									} else {
+										contact["phone"] = vv["company_phone"] //联系电话
+										break
+									}
+								} else {
+									contact["phone"] = "" //联系电话
+								}
+
+							}
+						}
+						//log.Println(k, contact["phone"], resulttmp["_id"])
+						//time.Sleep(10 * time.Second)
+						if contact["phone"] == nil {
+							contact["phone"] = "" //联系电话
+						}
+						contact["topscopeclass"] = "企业公示"         //项目类型
+						contact["updatetime"] = time.Now().Unix() //更新时间
+						contact["infoid"] = ""                    //招标信息id
+						contacts = append(contacts, contact)
+						resulttmp["contact"] = contacts
+
+						savetmp := make(map[string]interface{}, 0)
+						for _, sk := range AgencyFields {
+							if sk == "_id" {
+								savetmp["tmp"+sk] = resulttmp[sk]
+								continue
+							} else if sk == "area_code" {
+								//行政区划代码
+								savetmp[sk] = fmt.Sprint(resulttmp[sk])
+								continue
+							} else if sk == "report_websites" {
+								//网址
+								if resulttmp["report_websites"] == nil {
+									savetmp["website"] = ""
+								} else {
+									report_websitesArr := []string{}
+									if ppms, ok := resulttmp[sk].(primitive.A); ok {
+										for _, v := range ppms {
+											if vvv, ok := v.(map[string]interface{}); ok {
+												if rv, ok := vvv["website_url"].(string); ok {
+													report_websitesArr = append(report_websitesArr, rv)
+												}
+											}
+										}
+									}
+									sort.Strings(report_websitesArr)
+									savetmp["website"] = strings.Join(report_websitesArr, ";")
+								}
+								continue
+							} else if sk == "wechat_accounts" {
+								savetmp[sk] = []interface{}{}
+								continue
+							}else if sk=="agency_name" {
+								if resulttmp["company_name"] == nil {
+									savetmp[sk] = ""
+								}else {
+									savetmp[sk] = resulttmp[sk]
+								}
+								continue
+							}else if sk=="address"{
+								if resulttmp["company_address"] == nil {
+									savetmp[sk] = ""
+								}else {
+									savetmp[sk] = resulttmp[sk]
+								}
+								continue
+							}
+
+
+
+							if resulttmp[sk] == nil && sk != "history_name" && sk != "wechat_accounts" &&
+								sk != "agency_name" && sk != "address" &&
+								sk != "contact" && sk != "report_websites" {
+								savetmp[sk] = ""
+							} else {
+								savetmp[sk] = resulttmp[sk]
+							}
+						}
+						//tmps = append(tmps, savetmp)
+						savetmp["updatatime"] = time.Now().Unix()
+						//保存mongo
+						FClient.DbName = Config["mgodb_extract_kf"]
+						saveid := FClient.Save(Config["mgo_qyk_c"], savetmp)
+						if saveid != nil {
+							//保存redis
+							//保存redis
+							rc := RedisPool.Get()
+							rc.Do("SELECT","1")
+
+							var _id string
+							if v, ok := saveid.(primitive.ObjectID); ok {
+								_id = v.Hex()
+							}
+							if _, err := rc.Do("SET", savetmp["agency_name"], _id); err != nil {
+								log.Println("save redis err:", tmp["_id"], savetmp["_id"], savetmp["agency_name"], err)
+								if err := rc.Close(); err != nil {
+									log.Println(err)
+								}
+							} else {
+								//保存es
+								delete(savetmp, "_id")
+								if err := rc.Close(); err != nil {
+									log.Println(err)
+								}
+
+								//esConn := elastic.GetEsConn()
+								//defer elastic.DestoryEsConn(esConn)
+								if _, err := EsConn.Index().Index(Config["elasticsearch_agency_index"]).Type(Config["elasticsearch_agency_type"]).Id(_id).BodyJson(savetmp).Refresh(true).Do(); err != nil {
+									log.Println("save es err :", tmp["_id"], savetmp["_id"], err)
+								} else {
+									//删除临时表
+									FClient.DbName = Config["mgodb_extract_kf"]
+									if deleteNum := FClient.DeleteById("agency_new", tmpId); deleteNum == 0 {
+										log.Println("删除临时表失败", deleteNum)
+									}
+								}
+							}
+						} else {
+							log.Println("save mongo err:", saveid, tmp["_id"])
+						}
+					}
+				}
+			}
+		}
+		t2.Reset(time.Minute)
+	}
+}

+ 665 - 0
udp_winner/timedTaskBuyer.go

@@ -0,0 +1,665 @@
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"github.com/garyburd/redigo/redis"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+	"gopkg.in/mgo.v2/bson"
+	"log"
+	mu "mfw/util"
+	"net"
+	"qfw/util"
+	"sort"
+	"strings"
+	"time"
+)
+
+//之前main方法,只更新
+func TaskBuyer(mapinfo *map[string]interface{}) {
+	defer util.Catch()
+	gtid, lteid := util.ObjToString((*mapinfo)["gtid"]), util.ObjToString((*mapinfo)["lteid"])
+	if gtid == "" || lteid == "" {
+		log.Println(gtid, lteid, "参数错误")
+		return
+	}
+	GId, err := primitive.ObjectIDFromHex(gtid)
+	LtId, err2 := primitive.ObjectIDFromHex(lteid)
+	if err != nil || err2 != nil {
+		log.Println(gtid, lteid, "转换_id错误")
+		return
+	}
+	//udp的id区间查询bidding  中标人 中标联系人 中标联系电话
+	// topscopeclass项目类型-industry行业类型&&topscopeclass联系人项目类型
+	// (area地区-province省份 city城市-city城市 district区县-district区县)
+	// winneraddr-company_address企业地址
+	SourceClientcc := SourceClient.GetMgoConn()
+	defer SourceClient.DestoryMongoConn(SourceClientcc)
+	cursor := SourceClientcc.DB(Config["mgodb_bidding"]).C(Config["mgodb_mgoinit_c"]).Find(bson.M{
+		"_id": bson.M{
+			"$gte": GId,
+			"$lte": LtId,
+		},
+	}).Select(bson.M{"buyer": 1, "buyertel": 1, "buyerperson": 1,
+		"topscopeclass": 1, "buyeraddr": 1}).Iter()
+	if cursor == nil {
+		log.Println(cursor)
+		return
+	}
+	//判断是否是存量,是存量走Redis遍历
+	if v, ok := (*mapinfo)["data_info"].(string); ok && v == "save" {
+		//存量处理
+		tmp := map[string]interface{}{}
+		conn := HisRedisPool.Conn()
+		defer conn.Close()
+		//选择redis db
+		conn.Select(1)
+		//遍历bidding表保存到redis
+		// key:_id  value:json结构体
+		for cursor.Next(&tmp) {
+			if tmp["buyer"] == nil || tmp["buyer"] == "" {
+				continue
+			}
+			mgoId:=tmp["_id"].(primitive.ObjectID).Hex()
+			delete(tmp,"_id")
+			bytes, _ := json.Marshal(tmp)
+			if err := conn.Set(mgoId, string(bytes), 0).Err(); err != nil {
+				log.Println(err)
+			}
+		}
+		//遍历redis
+		if scan := conn.Scan(0, "", 100); scan.Err() != nil {
+			log.Println(scan.Err())
+			return
+		} else {
+			iterator := scan.Iterator()
+			for iterator.Next() {
+				redisId := iterator.Val()                       //redis key
+				redisvalue := conn.Get(iterator.Val()).Val() //redis val
+				tmp := make(map[string]interface{})
+				json.Unmarshal([]byte(redisvalue),&tmp)
+				//重复增量操作
+				//redis查询是否存在
+				rdb := RedisPool.Get()
+				rdb.Do("SELECT","1")
+				if reply, err := redis.String(rdb.Do("GET", tmp["buyer"])); err != nil {
+					//redis不存在,存到临时表,定时任务处理
+					FClient.DbName = Config["mgodb_extract_kf"]
+					if tmpid := FClient.Save("buyer_new", tmp); tmpid == nil {
+						log.Println("存量 FClient.Save err", tmpid)
+					}
+					//log.Println("get redis id err:定时任务处理", err, tmp)
+					if err := rdb.Close(); err != nil {
+						log.Println("存量",err)
+					}
+					//删除存量redis
+					conn.Del(redisId)
+					continue
+				} else {
+					if err := rdb.Close(); err != nil {
+						log.Println(err)
+					}
+					//拿到合并后的qyk
+					FClient.DbName = Config["mgodb_extract_kf"]
+					oldTmp := FClient.FindById(Config["mgo_qyk_c"], reply)
+					if oldTmp == nil {
+						log.Println("存量 redis id 不存在",reply,tmp["buyer"])
+						continue
+					}
+					tmpTopscopeclass := []string{}
+					tmpTopscopeclassMap := make(map[string]bool)
+
+					if v, ok := tmp["topscopeclass"].(primitive.A); ok {
+						for _, vv := range v {
+							if vvv, ok := vv.(string); ok && len(vvv) > 1 {
+								tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
+							}
+						}
+						for k := range tmpTopscopeclassMap {
+							tmpTopscopeclass = append(tmpTopscopeclass, k)
+						}
+					}
+					sort.Strings(tmpTopscopeclass)
+
+					//更新buyerclass
+
+
+					esId := oldTmp["_id"].(primitive.ObjectID).Hex()
+					//更新行业类型
+					if tmp["buyerperson"] == nil || tmp["buyerperson"] == "" || Reg_xing.MatchString(util.ObjToString(tmp["buyerperson"])) {
+
+
+						//更新buyerclass合并
+						if tmp["buyerclass"] == nil || tmp["buyerclass"] == "" {
+							//无值,不更新
+						}else {
+							//有值
+							var buyerclass_new,buyerclass_old string
+							buyerclass_new = tmp["buyerclass"].(string)
+							buyerclass_old = oldTmp["buyerclass"].(string)
+							if strings.Contains(buyerclass_old, buyerclass_new) {
+								oldTmp["buyerclass"] = buyerclass_old + ","+buyerclass_new //采购单位类型
+							}
+
+						}
+
+						oldTmp["updatatime"] = time.Now().Unix()
+						//mongo更新
+						FClient.DbName = Config["mgodb_extract_kf"]
+						if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
+							log.Println("mongo更新err", esId)
+						}
+
+						//es更新
+						delete(oldTmp, "_id")
+						if _, err := EsConn.Update().Index(Config["elasticsearch_buyer_index"]).Type(Config["elasticsearch_buyer_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
+							log.Println("update es err:", err)
+						}
+						//删除存量redis
+						conn.Del(redisId)
+						continue
+					}
+					//联系方式合并
+					var tmpperson, buyertel string
+					tmpperson = tmp["buyerperson"].(string)
+					if tmp["buyertel"] == nil || tmp["buyertel"] == "" {
+						buyertel = ""
+					} else {
+						if Reg_xing.MatchString(util.ObjToString(tmp["buyertel"])) || !Reg_tel.MatchString(util.ObjToString(tmp["buyertel"])) {
+							buyertel = ""
+						} else {
+							buyertel = util.ObjToString(tmp["buyertel"])
+						}
+					}
+					contactMaps := make([]interface{}, 0)
+					if oldTmp["contact"] == nil {
+						tmpContact := make(map[string]interface{})
+						tmpContact["infoid"] = redisId
+						tmpContact["contact_person"] = tmpperson
+						tmpContact["contact_type"] = "项目联系人"
+						tmpContact["phone"] = buyertel
+						tmpContact["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
+						tmpContact["updatetime"] = time.Now().Unix()
+						contactMaps = append(contactMaps, tmpContact)
+					} else {
+						//对比前四项,相等丢弃
+						if v, ok := oldTmp["contact"].(primitive.A); ok {
+							var isNotUpdate bool
+							for _, vv := range v {
+								if vvv, ok := vv.(map[string]interface{}); ok {
+									if vvv["contact_person"] == tmpperson && vvv["contact_type"] == "项目联系人" &&
+										vvv["phone"] == buyertel && vvv["topscopeclass"] == strings.Join(tmpTopscopeclass, ";") {
+										isNotUpdate = true
+										vvv["updatetime"] = time.Now().Unix()
+									}
+									contactMaps = append(contactMaps, vvv)
+								}
+							}
+							if !isNotUpdate {
+								vvv := make(map[string]interface{})
+								vvv["infoid"] = redisId
+								vvv["contact_person"] = tmp["buyerperson"]
+								vvv["contact_type"] = "项目联系人"
+								vvv["phone"] = buyertel
+								vvv["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
+								vvv["updatetime"] = time.Now().Unix()
+								contactMaps = append(contactMaps, vvv)
+							}
+						}
+					}
+					oldTmp["contact"] = contactMaps
+
+					//更新buyerclass合并
+					if tmp["buyerclass"] == nil || tmp["buyerclass"] == "" {
+						//无值,不更新
+					}else {
+						//有值
+						var buyerclass_new,buyerclass_old string
+						buyerclass_new = tmp["buyerclass"].(string)
+						buyerclass_old = oldTmp["buyerclass"].(string)
+						if strings.Contains(buyerclass_old, buyerclass_new) {
+							oldTmp["buyerclass"] = buyerclass_old + ","+buyerclass_new //采购单位类型
+						}
+
+					}
+
+					//mongo更新
+					oldTmp["updatatime"] = time.Now().Unix()
+					FClient.DbName = Config["mgodb_extract_kf"]
+					if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
+						log.Println("存量  mongo更新 err", esId, oldTmp)
+					}
+					//es更新
+					delete(oldTmp, "_id")
+					if _, err := EsConn.Update().Index(Config["elasticsearch_buyer_index"]).Type(Config["elasticsearch_buyer_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
+						log.Println("存量 EsConn err :", err)
+					}
+					//最后删除redis
+					conn.Del(redisId)
+				}
+			}
+		}
+		log.Println("存量历史合并执行完成 ok", gtid, lteid)
+
+	} else {
+		//增量处理
+		overid := gtid
+		tmp := map[string]interface{}{}
+		for cursor.Next(&tmp) {
+			overid = tmp["_id"].(primitive.ObjectID).Hex()
+			log.Println(tmp["_id"])
+			if tmp["buyer"] == nil || tmp["buyer"] == "" {
+				continue
+			}
+			//redis查询是否存在
+			rdb := RedisPool.Get()
+			rdb.Do("SELECT","1")
+			if reply, err := redis.String(rdb.Do("GET", tmp["buyer"])); err != nil {
+				//redis不存在存到临时表,定时任务处理
+				FClient.DbName = Config["mgodb_extract_kf"]
+				if tmpid := FClient.Save("buyer_new", tmp); tmpid == nil {
+					log.Println("FClient.Save err", tmpid)
+				}
+				//log.Println("get redis id err:定时任务处理", err, tmp)
+				if err := rdb.Close(); err != nil {
+					log.Println(err)
+				}
+				continue
+			} else {
+				if err := rdb.Close(); err != nil {
+					log.Println(err)
+				}
+				//拿到合并后的qyk
+				FClient.DbName = Config["mgodb_extract_kf"]
+				oldTmp := FClient.FindById(Config["mgo_qyk_c"], reply)
+				if oldTmp == nil {
+					log.Println("redis id 不存在")
+					continue
+				}
+				//比较合并
+				//行业类型
+				tmpTopscopeclass := []string{}
+				tmpTopscopeclassMap := make(map[string]bool)
+
+
+				if v, ok := tmp["topscopeclass"].(primitive.A); ok {
+					for _, vv := range v {
+						if vvv, ok := vv.(string); ok && len(vvv) > 1 {
+							tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
+						}
+					}
+					for k := range tmpTopscopeclassMap {
+						tmpTopscopeclass = append(tmpTopscopeclass, k)
+					}
+				}
+				sort.Strings(tmpTopscopeclass)
+
+
+				esId := oldTmp["_id"].(primitive.ObjectID).Hex()
+
+
+
+
+				//更新行业类型 buyerclass合并
+				if tmp["buyerperson"] == nil || tmp["buyerperson"] == "" || Reg_xing.MatchString(util.ObjToString(tmp["buyerperson"])) {
+
+					//更新buyerclass合并
+					if tmp["buyerclass"] == nil || tmp["buyerclass"] == "" {
+						//无值,不更新
+					}else {
+						//有值
+						var buyerclass_new,buyerclass_old string
+						buyerclass_new = tmp["buyerclass"].(string)
+						buyerclass_old = oldTmp["buyerclass"].(string)
+						if strings.Contains(buyerclass_old, buyerclass_new) {
+							oldTmp["buyerclass"] = buyerclass_old + ","+buyerclass_new //采购单位类型
+						}
+
+					}
+
+					oldTmp["updatatime"] = time.Now().Unix()
+					//mongo更新
+					FClient.DbName = Config["mgodb_extract_kf"]
+					if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
+						log.Println("mongo更新err", esId)
+					}
+
+					//es更新
+					delete(oldTmp, "_id")
+					if _, err := EsConn.Update().Index(Config["elasticsearch_buyer_index"]).Type(Config["elasticsearch_buyer_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
+						log.Println("update es err:", err)
+					}
+					continue
+				}
+				//联系方式合并
+				var tmpperson, buyertel string
+				tmpperson = tmp["buyerperson"].(string)
+				if tmp["buyertel"] == nil || tmp["buyertel"] == "" {
+					buyertel = ""
+				} else {
+					if Reg_xing.MatchString(util.ObjToString(tmp["buyertel"])) || !Reg_tel.MatchString(util.ObjToString(tmp["buyertel"])) {
+						buyertel = ""
+					} else {
+						buyertel = util.ObjToString(tmp["buyertel"])
+					}
+				}
+				contactMaps := make([]interface{}, 0)
+				if oldTmp["contact"] == nil {
+					tmpContact := make(map[string]interface{})
+					tmpContact["infoid"] = overid
+					tmpContact["contact_person"] = tmpperson
+					tmpContact["contact_type"] = "项目联系人"
+					tmpContact["phone"] = buyertel
+					tmpContact["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
+					tmpContact["updatetime"] = time.Now().Unix()
+					contactMaps = append(contactMaps, tmpContact)
+				} else {
+					//对比前四项,相等丢弃
+					if v, ok := oldTmp["contact"].(primitive.A); ok {
+						var isNotUpdate bool
+						for _, vv := range v {
+							if vvv, ok := vv.(map[string]interface{}); ok {
+								if vvv["contact_person"] == tmpperson && vvv["contact_type"] == "项目联系人" &&
+									vvv["phone"] == buyertel && vvv["topscopeclass"] == strings.Join(tmpTopscopeclass, ";") {
+									isNotUpdate = true
+									vvv["updatetime"] = time.Now().Unix()
+								}
+								contactMaps = append(contactMaps, vvv)
+							}
+						}
+						if !isNotUpdate {
+							vvv := make(map[string]interface{})
+							vvv["infoid"] = overid
+							vvv["contact_person"] = tmp["buyerperson"]
+							vvv["contact_type"] = "项目联系人"
+							vvv["phone"] = buyertel
+							vvv["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
+							vvv["updatetime"] = time.Now().Unix()
+							contactMaps = append(contactMaps, vvv)
+						}
+					}
+				}
+				oldTmp["contact"] = contactMaps
+
+				//更新buyerclass合并
+				if tmp["buyerclass"] == nil || tmp["buyerclass"] == "" {
+					//无值,不更新
+				}else {
+					//有值
+					var buyerclass_new,buyerclass_old string
+					buyerclass_new = tmp["buyerclass"].(string)
+					buyerclass_old = oldTmp["buyerclass"].(string)
+					if strings.Contains(buyerclass_old, buyerclass_new) {
+						oldTmp["buyerclass"] = buyerclass_old + ","+buyerclass_new //采购单位类型
+					}
+
+				}
+
+				//mongo更新
+				oldTmp["updatatime"] = time.Now().Unix()
+				FClient.DbName = Config["mgodb_extract_kf"]
+				if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
+					log.Println("mongo更新 err", esId, oldTmp)
+				}
+				//es更新
+				delete(oldTmp, "_id")
+				if _, err := EsConn.Update().Index(Config["elasticsearch_buyer_index"]).Type(Config["elasticsearch_buyer_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
+					log.Println("EsConn err :", err)
+				}
+			}
+		}
+		log.Println("增量合并执行完成 ok", gtid, lteid, overid)
+	}
+
+}
+
+
+
+//定时任务
+//1.存异常表
+//2.合并原始库新增
+func TimedTaskBuyer() {
+	//time.Sleep(time.Hour*70)
+	t2 := time.NewTimer(time.Second * 5)
+	for range t2.C {
+		Fcconn := FClient.GetMgoConn()
+		defer FClient.DestoryMongoConn(Fcconn)
+		tmpLast := map[string]interface{}{}
+		if iter := Fcconn.DB(Config["mgodb_extract_kf"]).C("buyer_new").Find(bson.M{}).Sort("-_id").Limit(1).Iter(); iter != nil {
+			if !iter.Next(&tmpLast) {
+				//临时表无数据
+				log.Println("临时表无数据:")
+				t2.Reset(time.Minute * 5)
+				continue
+			} else {
+				log.Println("临时表有数据:", tmpLast)
+				fconn := FClient.GetMgoConn()
+				defer FClient.DestoryMongoConn(fconn)
+				cursor := fconn.DB(Config["mgodb_extract_kf"]).C("buyer_new").Find(bson.M{
+					"_id": bson.M{
+						"$lte": tmpLast["_id"],
+					},
+				}).Sort("_id").Iter()
+				if cursor == nil {
+					log.Println("查询失败")
+					t2.Reset(time.Second * 5)
+					continue
+				}
+				//遍历临时表数据,匹配不到原始库存入异常表
+				tmp := make(map[string]interface{})
+				for cursor.Next(&tmp) {
+					tmpId := tmp["_id"].(primitive.ObjectID).Hex()
+					//再重新查找redis,存在发udp处理,不存在走新增合并
+					rdb := RedisPool.Get()
+					rdb.Do("SELECT","1")
+					if _, err := redis.String(rdb.Do("GET", tmp["buyer"])); err == nil {
+						//{"gtid":"57d7ad2f61a0721f152d2ad5","lteid":"5e20968d85a9271abf0ad6c2","stype":""}
+						//redis存在发送udp进行处理
+						by, _ := json.Marshal(map[string]interface{}{
+							"gtid":  tmpId,
+							"lteid": tmpId,
+							"stype": "",
+						})
+						if e := udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
+							IP:   net.ParseIP("127.0.0.1"),
+							Port: Updport,
+						}); e != nil {
+							log.Println(e)
+						}
+						//存在的话删除tmp mongo表
+						FClient.DbName = Config["mgodb_extract_kf"]
+						if DeletedCount := FClient.DeleteById("buyer_new", tmpId); DeletedCount == 0 {
+							log.Println("删除临时表err:", DeletedCount)
+						}
+						if err := rdb.Close(); err != nil {
+							log.Println(err)
+						}
+						continue
+					} else {
+						if err = rdb.Close(); err != nil {
+							log.Println(err)
+						}
+					}
+					//查询redis不存在新增
+					FClient.DbName = Config["mgodb_enterprise"]
+					//qyxy 企业库 两亿条
+					resulttmp := FClient.FindOne(Config["mgodb_enterprise_c"], bson.M{"company_name": tmp["buyer"]})
+					if resulttmp["_id"] == nil {
+						//log.Println(r)
+						//匹配不到原始库,存入异常表删除临时表
+						FClient.DbName = Config["mgodb_extract_kf"]
+						if saveid := FClient.Save("buyer_err", tmp); saveid == nil {
+							log.Println("存入异常表错误", tmp)
+						}
+						FClient.DbName = Config["mgodb_extract_kf"]
+						if deleteNum := FClient.DeleteById("buyer_new", tmpId); deleteNum == 0 {
+							log.Println("删除临时表错误", deleteNum)
+						}
+						continue
+					} else {
+						//log.Println(123)
+						//匹配到原始库,新增 resulttmp
+						if resulttmp["credit_no"] != nil {
+							if credit_no, ok := resulttmp["credit_no"].(string); ok && strings.TrimSpace(credit_no) != "" &&
+								len(strings.TrimSpace(credit_no)) > 8 {
+								dataNo := strings.TrimSpace(credit_no)[2:8]
+								if Addrs[dataNo] != nil {
+									if v, ok := Addrs[dataNo].(map[string]interface{}); ok {
+										if resulttmp["province"] == nil || resulttmp["province"] == "" {
+											resulttmp["province"] = v["province"]
+										}
+										resulttmp["city"] = v["city"]
+										resulttmp["district"] = v["district"]
+
+									}
+								}
+							}
+						}
+						contacts := make([]map[string]interface{}, 0)
+						contact := make(map[string]interface{}, 0)
+						if resulttmp["legal_person"] != nil {
+							contact["contact_person"] = resulttmp["legal_person"] //联系人
+						} else {
+							contact["contact_person"] = "" //联系人
+						}
+						contact["contact_type"] = "法定代表人" //法定代表人
+						//log.Println(1)
+						if resulttmp["annual_reports"] != nil {
+							bytes, err := json.Marshal(resulttmp["annual_reports"])
+							if err != nil {
+								log.Println("annual_reports err:", err)
+							}
+							phonetmp := make([]map[string]interface{}, 0)
+							err = json.Unmarshal(bytes, &phonetmp)
+							if err != nil {
+								log.Println("Unmarshal err:", err)
+							}
+							for _, vv := range phonetmp {
+								if vv["company_phone"] != nil {
+									if vv["company_phone"] == "" {
+										continue
+									} else {
+										contact["phone"] = vv["company_phone"] //联系电话
+										break
+									}
+								} else {
+									contact["phone"] = "" //联系电话
+								}
+
+							}
+						}
+						//log.Println(k, contact["phone"], resulttmp["_id"])
+						//time.Sleep(10 * time.Second)
+						if contact["phone"] == nil {
+							contact["phone"] = "" //联系电话
+						}
+						contact["topscopeclass"] = "企业公示"         //项目类型
+						contact["updatetime"] = time.Now().Unix() //更新时间
+						contact["infoid"] = ""                    //招标信息id
+						contacts = append(contacts, contact)
+						resulttmp["contact"] = contacts
+
+						savetmp := make(map[string]interface{}, 0)
+						for _, sk := range BuyerFields {
+							if sk == "_id" {
+								savetmp["tmp"+sk] = resulttmp[sk]
+								continue
+							} else if sk == "area_code" {
+								//行政区划代码
+								savetmp[sk] = fmt.Sprint(resulttmp[sk])
+								continue
+							} else if sk == "report_websites" {
+								//网址
+								if resulttmp["report_websites"] == nil {
+									savetmp["website"] = ""
+								} else {
+									report_websitesArr := []string{}
+									if ppms, ok := resulttmp[sk].(primitive.A); ok {
+										for _, v := range ppms {
+											if vvv, ok := v.(map[string]interface{}); ok {
+												if rv, ok := vvv["website_url"].(string); ok {
+													report_websitesArr = append(report_websitesArr, rv)
+												}
+											}
+										}
+									}
+									sort.Strings(report_websitesArr)
+									savetmp["website"] = strings.Join(report_websitesArr, ";")
+								}
+								continue
+							} else if sk == "wechat_accounts" {
+								savetmp[sk] = []interface{}{}
+								continue
+							}else if sk=="buyer_name" {
+								if resulttmp["company_name"] == nil {
+									savetmp[sk] = ""
+								}else {
+									savetmp[sk] = resulttmp[sk]
+								}
+								continue
+							}else if sk=="address"{
+								if resulttmp["company_address"] == nil {
+									savetmp[sk] = ""
+								}else {
+									savetmp[sk] = resulttmp[sk]
+								}
+								continue
+							}
+
+
+
+							if resulttmp[sk] == nil && sk != "history_name" && sk != "wechat_accounts" &&
+								sk != "buyer_name" && sk != "address" &&
+								sk != "contact" && sk != "report_websites" {
+								savetmp[sk] = ""
+							} else {
+								savetmp[sk] = resulttmp[sk]
+							}
+						}
+						//tmps = append(tmps, savetmp)
+						savetmp["updatatime"] = time.Now().Unix()
+						//保存mongo
+						FClient.DbName = Config["mgodb_extract_kf"]
+						saveid := FClient.Save(Config["mgo_qyk_c"], savetmp)
+						if saveid != nil {
+							//保存redis
+							rc := RedisPool.Get()
+							rc.Do("SELECT","1")
+							var _id string
+							if v, ok := saveid.(primitive.ObjectID); ok {
+								_id = v.Hex()
+							}
+							if _, err := rc.Do("SET", savetmp["buyer_name"], _id); err != nil {
+								log.Println("save redis err:", tmp["_id"], savetmp["_id"], savetmp["buyer_name"], err)
+								if err := rc.Close(); err != nil {
+									log.Println(err)
+								}
+							} else {
+								//保存es
+								delete(savetmp, "_id")
+								if err := rc.Close(); err != nil {
+									log.Println(err)
+								}
+
+								//esConn := elastic.GetEsConn()
+								//defer elastic.DestoryEsConn(esConn)
+								if _, err := EsConn.Index().Index(Config["elasticsearch_buyer_index"]).Type(Config["elasticsearch_buyer_type"]).Id(_id).BodyJson(savetmp).Refresh(true).Do(); err != nil {
+									log.Println("save es err :", tmp["_id"], savetmp["_id"], err)
+								} else {
+									//删除临时表
+									FClient.DbName = Config["mgodb_extract_kf"]
+									if deleteNum := FClient.DeleteById("buyer_new", tmpId); deleteNum == 0 {
+										log.Println("删除临时表失败", deleteNum)
+									}
+								}
+							}
+						} else {
+							log.Println("save mongo err:", saveid, tmp["_id"])
+						}
+					}
+				}
+			}
+		}
+		t2.Reset(time.Minute)
+	}
+}

+ 641 - 0
udp_winner/timedTaskWinner.go

@@ -0,0 +1,641 @@
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"github.com/garyburd/redigo/redis"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+	"gopkg.in/mgo.v2/bson"
+	"log"
+	mu "mfw/util"
+	"net"
+	"qfw/util"
+	"sort"
+	"strings"
+	"time"
+)
+
+//之前main方法,只更新
+func TaskWinner(mapinfo *map[string]interface{}) {
+	defer util.Catch()
+	gtid, lteid := util.ObjToString((*mapinfo)["gtid"]), util.ObjToString((*mapinfo)["lteid"])
+	if gtid == "" || lteid == "" {
+		log.Println(gtid, lteid, "参数错误")
+		return
+	}
+	GId, err := primitive.ObjectIDFromHex(gtid)
+	LtId, err2 := primitive.ObjectIDFromHex(lteid)
+	if err != nil || err2 != nil {
+		log.Println(gtid, lteid, "转换_id错误")
+		return
+	}
+	//udp的id区间查询bidding  中标人 中标联系人 中标联系电话
+	// topscopeclass项目类型-industry行业类型&&topscopeclass联系人项目类型
+	// (area地区-province省份 city城市-city城市 district区县-district区县)
+	// winneraddr-company_address企业地址
+	SourceClientcc := SourceClient.GetMgoConn()
+	defer SourceClient.DestoryMongoConn(SourceClientcc)
+	cursor := SourceClientcc.DB(Config["mgodb_bidding"]).C(Config["mgodb_mgoinit_c"]).Find(bson.M{
+		"_id": bson.M{
+			"$gte": GId,
+			"$lte": LtId,
+		},
+	}).Select(bson.M{"winner": 1, "winnertel": 1, "winnerperson": 1,
+		"topscopeclass": 1, "winneraddr": 1}).Iter()
+	if cursor == nil {
+		log.Println(cursor)
+		return
+	}
+	//判断是否是存量,是存量走Redis遍历
+	if v, ok := (*mapinfo)["data_info"].(string); ok && v == "save" {
+		//存量处理
+		tmp := map[string]interface{}{}
+		conn := HisRedisPool.Conn()
+		defer conn.Close()
+		//选择redis db
+		conn.Select(1)
+		//遍历bidding表保存到redis
+		// key:_id  value:json结构体
+		for cursor.Next(&tmp) {
+			if tmp["winner"] == nil || tmp["winner"] == "" {
+				continue
+			}
+			mgoId:=tmp["_id"].(primitive.ObjectID).Hex()
+			delete(tmp,"_id")
+			bytes, _ := json.Marshal(tmp)
+			if err := conn.Set(mgoId, string(bytes), 0).Err(); err != nil {
+				log.Println(err)
+			}
+		}
+		//遍历redis
+		if scan := conn.Scan(0, "", 100); scan.Err() != nil {
+			log.Println(scan.Err())
+			return
+		} else {
+			iterator := scan.Iterator()
+			for iterator.Next() {
+				redisId := iterator.Val()                       //redis key
+				redisvalue := conn.Get(iterator.Val()).Val() //redis val
+				tmp := make(map[string]interface{})
+				json.Unmarshal([]byte(redisvalue),&tmp)
+				//重复增量操作
+				//redis查询是否存在
+				rdb := RedisPool.Get()
+				rdb.Do("SELECT","1")
+				if reply, err := redis.String(rdb.Do("GET", tmp["winner"])); err != nil {
+					//redis不存在,存到临时表,定时任务处理
+					FClient.DbName = Config["mgodb_extract_kf"]
+					if tmpid := FClient.Save("winner_new", tmp); tmpid == nil {
+						log.Println("存量 FClient.Save err", tmpid)
+					}
+					//log.Println("get redis id err:定时任务处理", err, tmp)
+					if err := rdb.Close(); err != nil {
+						log.Println("存量",err)
+					}
+					//删除存量redis
+					conn.Del(redisId)
+					continue
+				} else {
+					if err := rdb.Close(); err != nil {
+						log.Println(err)
+					}
+					//拿到合并后的qyk
+					FClient.DbName = Config["mgodb_extract_kf"]
+					oldTmp := FClient.FindById(Config["mgo_qyk_c"], reply)
+					if oldTmp == nil {
+						log.Println("存量 redis id 不存在",reply,tmp["winner"])
+						continue
+					}
+					tmpTopscopeclass := []string{}
+					tmpTopscopeclassMap := make(map[string]bool)
+
+					if oldTmp["industry"] == nil {
+						if v, ok := tmp["topscopeclass"].(primitive.A); ok {
+							for _, vv := range v {
+								if vvv, ok := vv.(string); ok && len(vvv) > 1 {
+									tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
+								}
+							}
+							for k := range tmpTopscopeclassMap {
+								tmpTopscopeclass = append(tmpTopscopeclass, k)
+							}
+						}
+					} else {
+						if v, ok := oldTmp["industry"].(primitive.A); ok {
+							for _, vv := range v {
+								if vvv, ok := vv.(string); ok {
+									tmpTopscopeclassMap[vvv] = true
+								}
+							}
+						}
+						if v, ok := tmp["topscopeclass"].(primitive.A); ok {
+							for _, vv := range v {
+								if vvv, ok := vv.(string); ok && len(vvv) > 1 {
+									tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
+								}
+							}
+							for k := range tmpTopscopeclassMap {
+								tmpTopscopeclass = append(tmpTopscopeclass, k)
+							}
+						}
+					}
+					sort.Strings(tmpTopscopeclass)
+					oldTmp["industry"] = tmpTopscopeclass
+					esId := oldTmp["_id"].(primitive.ObjectID).Hex()
+					//更新行业类型
+					if tmp["winnerperson"] == nil || tmp["winnerperson"] == "" || Reg_xing.MatchString(util.ObjToString(tmp["winnerperson"])) {
+						oldTmp["updatatime"] = time.Now().Unix()
+						//mongo更新
+						FClient.DbName = Config["mgodb_extract_kf"]
+						if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
+							log.Println("mongo更新err", esId)
+						}
+
+						//es更新
+						delete(oldTmp, "_id")
+						if _, err := EsConn.Update().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
+							log.Println("update es err:", err)
+						}
+						//删除存量redis
+						conn.Del(redisId)
+						continue
+					}
+					//联系方式合并
+					var tmpperson, winnertel string
+					tmpperson = tmp["winnerperson"].(string)
+					if tmp["winnertel"] == nil || tmp["winnertel"] == "" {
+						winnertel = ""
+					} else {
+						if Reg_xing.MatchString(util.ObjToString(tmp["winnertel"])) || !Reg_tel.MatchString(util.ObjToString(tmp["winnertel"])) {
+							winnertel = ""
+						} else {
+							winnertel = util.ObjToString(tmp["winnertel"])
+						}
+					}
+					contactMaps := make([]interface{}, 0)
+					if oldTmp["contact"] == nil {
+						tmpContact := make(map[string]interface{})
+						tmpContact["infoid"] = redisId
+						tmpContact["contact_person"] = tmpperson
+						tmpContact["contact_type"] = "项目联系人"
+						tmpContact["phone"] = winnertel
+						tmpContact["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
+						tmpContact["updatetime"] = time.Now().Unix()
+						contactMaps = append(contactMaps, tmpContact)
+					} else {
+						//对比前四项,相等丢弃
+						if v, ok := oldTmp["contact"].(primitive.A); ok {
+							var isNotUpdate bool
+							for _, vv := range v {
+								if vvv, ok := vv.(map[string]interface{}); ok {
+									if vvv["contact_person"] == tmpperson && vvv["contact_type"] == "项目联系人" &&
+										vvv["phone"] == winnertel && vvv["topscopeclass"] == strings.Join(tmpTopscopeclass, ";") {
+										isNotUpdate = true
+										vvv["updatetime"] = time.Now().Unix()
+									}
+									contactMaps = append(contactMaps, vvv)
+								}
+							}
+							if !isNotUpdate {
+								vvv := make(map[string]interface{})
+								vvv["infoid"] = redisId
+								vvv["contact_person"] = tmp["winnerperson"]
+								vvv["contact_type"] = "项目联系人"
+								vvv["phone"] = winnertel
+								vvv["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
+								vvv["updatetime"] = time.Now().Unix()
+								contactMaps = append(contactMaps, vvv)
+							}
+						}
+					}
+					oldTmp["contact"] = contactMaps
+					//mongo更新
+					oldTmp["updatatime"] = time.Now().Unix()
+					FClient.DbName = Config["mgodb_extract_kf"]
+					if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
+						log.Println("存量  mongo更新 err", esId, oldTmp)
+					}
+					//es更新
+					delete(oldTmp, "_id")
+					if _, err := EsConn.Update().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
+						log.Println("存量 EsConn err :", err)
+					}
+					//最后删除redis
+					conn.Del(redisId)
+				}
+			}
+		}
+		log.Println("存量历史合并执行完成 ok", gtid, lteid)
+
+	} else {
+		//增量处理
+		overid := gtid
+		tmp := map[string]interface{}{}
+		for cursor.Next(&tmp) {
+			overid = tmp["_id"].(primitive.ObjectID).Hex()
+			log.Println(tmp["_id"])
+			if tmp["winner"] == nil || tmp["winner"] == "" {
+				continue
+			}
+			//redis查询是否存在
+			rdb := RedisPool.Get()
+			rdb.Do("SELECT","1")
+			if reply, err := redis.String(rdb.Do("GET", tmp["winner"])); err != nil {
+				//redis不存在存到临时表,定时任务处理
+				FClient.DbName = Config["mgodb_extract_kf"]
+				if tmpid := FClient.Save("winner_new", tmp); tmpid == nil {
+					log.Println("FClient.Save err", tmpid)
+				}
+				//log.Println("get redis id err:定时任务处理", err, tmp)
+				if err := rdb.Close(); err != nil {
+					log.Println(err)
+				}
+				continue
+			} else {
+				if err := rdb.Close(); err != nil {
+					log.Println(err)
+				}
+				//拿到合并后的qyk
+				FClient.DbName = Config["mgodb_extract_kf"]
+				oldTmp := FClient.FindById(Config["mgo_qyk_c"], reply)
+				if oldTmp == nil {
+					log.Println("redis id 不存在")
+					continue
+				}
+				//比较合并
+				//行业类型
+				tmpTopscopeclass := []string{}
+				tmpTopscopeclassMap := make(map[string]bool)
+
+				if oldTmp["industry"] == nil {
+					if v, ok := tmp["topscopeclass"].(primitive.A); ok {
+						for _, vv := range v {
+							if vvv, ok := vv.(string); ok && len(vvv) > 1 {
+								tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
+							}
+						}
+						for k := range tmpTopscopeclassMap {
+							tmpTopscopeclass = append(tmpTopscopeclass, k)
+						}
+					}
+				} else {
+					if v, ok := oldTmp["industry"].(primitive.A); ok {
+						for _, vv := range v {
+							if vvv, ok := vv.(string); ok {
+								tmpTopscopeclassMap[vvv] = true
+							}
+						}
+					}
+					if v, ok := tmp["topscopeclass"].(primitive.A); ok {
+						for _, vv := range v {
+							if vvv, ok := vv.(string); ok && len(vvv) > 1 {
+								tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
+							}
+						}
+						for k := range tmpTopscopeclassMap {
+							tmpTopscopeclass = append(tmpTopscopeclass, k)
+						}
+					}
+				}
+				sort.Strings(tmpTopscopeclass)
+				oldTmp["industry"] = tmpTopscopeclass
+				esId := oldTmp["_id"].(primitive.ObjectID).Hex()
+				//更新行业类型
+				if tmp["winnerperson"] == nil || tmp["winnerperson"] == "" || Reg_xing.MatchString(util.ObjToString(tmp["winnerperson"])) {
+					oldTmp["updatatime"] = time.Now().Unix()
+					//mongo更新
+					FClient.DbName = Config["mgodb_extract_kf"]
+					if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
+						log.Println("mongo更新err", esId)
+					}
+
+					//es更新
+					delete(oldTmp, "_id")
+					if _, err := EsConn.Update().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
+						log.Println("update es err:", err)
+					}
+					continue
+				}
+				//联系方式合并
+				var tmpperson, winnertel string
+				tmpperson = tmp["winnerperson"].(string)
+				if tmp["winnertel"] == nil || tmp["winnertel"] == "" {
+					winnertel = ""
+				} else {
+					if Reg_xing.MatchString(util.ObjToString(tmp["winnertel"])) || !Reg_tel.MatchString(util.ObjToString(tmp["winnertel"])) {
+						winnertel = ""
+					} else {
+						winnertel = util.ObjToString(tmp["winnertel"])
+					}
+				}
+				contactMaps := make([]interface{}, 0)
+				if oldTmp["contact"] == nil {
+					tmpContact := make(map[string]interface{})
+					tmpContact["infoid"] = overid
+					tmpContact["contact_person"] = tmpperson
+					tmpContact["contact_type"] = "项目联系人"
+					tmpContact["phone"] = winnertel
+					tmpContact["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
+					tmpContact["updatetime"] = time.Now().Unix()
+					contactMaps = append(contactMaps, tmpContact)
+				} else {
+					//对比前四项,相等丢弃
+					if v, ok := oldTmp["contact"].(primitive.A); ok {
+						var isNotUpdate bool
+						for _, vv := range v {
+							if vvv, ok := vv.(map[string]interface{}); ok {
+								if vvv["contact_person"] == tmpperson && vvv["contact_type"] == "项目联系人" &&
+									vvv["phone"] == winnertel && vvv["topscopeclass"] == strings.Join(tmpTopscopeclass, ";") {
+									isNotUpdate = true
+									vvv["updatetime"] = time.Now().Unix()
+								}
+								contactMaps = append(contactMaps, vvv)
+							}
+						}
+						if !isNotUpdate {
+							vvv := make(map[string]interface{})
+							vvv["infoid"] = overid
+							vvv["contact_person"] = tmp["winnerperson"]
+							vvv["contact_type"] = "项目联系人"
+							vvv["phone"] = winnertel
+							vvv["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
+							vvv["updatetime"] = time.Now().Unix()
+							contactMaps = append(contactMaps, vvv)
+						}
+					}
+				}
+				oldTmp["contact"] = contactMaps
+				//mongo更新
+				oldTmp["updatatime"] = time.Now().Unix()
+				FClient.DbName = Config["mgodb_extract_kf"]
+				if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
+					log.Println("mongo更新 err", esId, oldTmp)
+				}
+				//es更新
+				delete(oldTmp, "_id")
+				if _, err := EsConn.Update().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
+					log.Println("EsConn err :", err)
+				}
+			}
+		}
+		log.Println("增量合并执行完成 ok", gtid, lteid, overid)
+	}
+
+}
+
+//定时任务  新增
+//1.存异常表
+//2.合并原始库新增
+func TimedTaskWinner() {
+	//time.Sleep(time.Hour*70)
+	t2 := time.NewTimer(time.Second * 5)
+	for range t2.C {
+		Fcconn := FClient.GetMgoConn()
+		defer FClient.DestoryMongoConn(Fcconn)
+		tmpLast := map[string]interface{}{}
+		if iter := Fcconn.DB(Config["mgodb_extract_kf"]).C("winner_new").Find(bson.M{}).Sort("-_id").Limit(1).Iter(); iter != nil {
+			if !iter.Next(&tmpLast) {
+				//临时表无数据
+				log.Println("临时表无数据:")
+				t2.Reset(time.Minute * 5)
+				continue
+			} else {
+				log.Println("临时表有数据:", tmpLast)
+				fconn := FClient.GetMgoConn()
+				defer FClient.DestoryMongoConn(fconn)
+				cursor := fconn.DB(Config["mgodb_extract_kf"]).C("winner_new").Find(bson.M{
+					"_id": bson.M{
+						"$lte": tmpLast["_id"],
+					},
+				}).Sort("_id").Iter()
+				if cursor == nil {
+					log.Println("查询失败")
+					t2.Reset(time.Second * 5)
+					continue
+				}
+				//遍历临时表数据,匹配不到原始库存入异常表
+				tmp := make(map[string]interface{})
+				for cursor.Next(&tmp) {
+					tmpId := tmp["_id"].(primitive.ObjectID).Hex()
+					//再重新查找redis,存在发udp处理,不存在走新增合并
+					rdb := RedisPool.Get()
+					rdb.Do("SELECT","1")
+					if _, err := redis.String(rdb.Do("GET", tmp["winner"])); err == nil {
+						//{"gtid":"57d7ad2f61a0721f152d2ad5","lteid":"5e20968d85a9271abf0ad6c2","stype":""}
+						//redis存在发送udp进行处理
+						by, _ := json.Marshal(map[string]interface{}{
+							"gtid":  tmpId,
+							"lteid": tmpId,
+							"stype": "",
+						})
+						if e := udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
+							IP:   net.ParseIP("127.0.0.1"),
+							Port: Updport,
+						}); e != nil {
+							log.Println(e)
+						}
+						//存在的话删除tmp mongo表
+						FClient.DbName = Config["mgodb_extract_kf"]
+						if DeletedCount := FClient.DeleteById("winner_new", tmpId); DeletedCount == 0 {
+							log.Println("删除临时表err:", DeletedCount)
+						}
+						if err := rdb.Close(); err != nil {
+							log.Println(err)
+						}
+						continue
+					} else {
+						if err = rdb.Close(); err != nil {
+							log.Println(err)
+						}
+					}
+					//查询redis不存在新增
+					FClient.DbName = Config["mgodb_enterprise"]
+
+					resulttmp := FClient.FindOne(Config["mgodb_enterprise_c"], bson.M{"company_name": tmp["winner"]})
+					if resulttmp["_id"] == nil {
+						//log.Println(r)
+						//匹配不到原始库,存入异常表删除临时表
+						FClient.DbName = Config["mgodb_extract_kf"]
+						if saveid := FClient.Save("winner_err", tmp); saveid == nil {
+							log.Println("存入异常表错误", tmp)
+						}
+						FClient.DbName = Config["mgodb_extract_kf"]
+						if deleteNum := FClient.DeleteById("winner_new", tmpId); deleteNum == 0 {
+							log.Println("删除临时表错误", deleteNum)
+						}
+						continue
+					} else {
+						//log.Println(123)
+						//匹配到原始库,新增 resulttmp
+						if resulttmp["credit_no"] != nil {
+							if credit_no, ok := resulttmp["credit_no"].(string); ok && strings.TrimSpace(credit_no) != "" &&
+								len(strings.TrimSpace(credit_no)) > 8 {
+								dataNo := strings.TrimSpace(credit_no)[2:8]
+								if Addrs[dataNo] != nil {
+									if v, ok := Addrs[dataNo].(map[string]interface{}); ok {
+										if resulttmp["province"] == nil || resulttmp["province"] == "" {
+											resulttmp["province"] = v["province"]
+										}
+										resulttmp["city"] = v["city"]
+										resulttmp["district"] = v["district"]
+
+									}
+								}
+							}
+						}
+						contacts := make([]map[string]interface{}, 0)
+						contact := make(map[string]interface{}, 0)
+						if resulttmp["legal_person"] != nil {
+							contact["contact_person"] = resulttmp["legal_person"] //联系人
+						} else {
+							contact["contact_person"] = "" //联系人
+						}
+						contact["contact_type"] = "法定代表人" //法定代表人
+						//log.Println(1)
+						if resulttmp["annual_reports"] != nil {
+							bytes, err := json.Marshal(resulttmp["annual_reports"])
+							if err != nil {
+								log.Println("annual_reports err:", err)
+							}
+							phonetmp := make([]map[string]interface{}, 0)
+							err = json.Unmarshal(bytes, &phonetmp)
+							if err != nil {
+								log.Println("Unmarshal err:", err)
+							}
+							for _, vv := range phonetmp {
+								if vv["company_phone"] != nil {
+									if vv["company_phone"] == "" {
+										continue
+									} else {
+										contact["phone"] = vv["company_phone"] //联系电话
+										break
+									}
+								} else {
+									contact["phone"] = "" //联系电话
+								}
+
+							}
+						}
+						//log.Println(k, contact["phone"], resulttmp["_id"])
+						//time.Sleep(10 * time.Second)
+						if contact["phone"] == nil {
+							contact["phone"] = "" //联系电话
+						}
+						contact["topscopeclass"] = "企业公示"         //项目类型
+						contact["updatetime"] = time.Now().Unix() //更新时间
+						contact["infoid"] = ""                    //招标信息id
+						contacts = append(contacts, contact)
+						resulttmp["contact"] = contacts
+
+						savetmp := make(map[string]interface{}, 0)
+						for _, sk := range Fields {
+							if sk == "establish_date" {
+								if resulttmp[sk] != nil {
+									savetmp[sk] = resulttmp[sk].(primitive.DateTime).Time().UTC().Unix()
+									continue
+								}
+							} else if sk == "capital" {
+								//log.Println(sk, resulttmp[sk])
+								savetmp[sk] = ObjToMoney([]interface{}{resulttmp[sk], ""})[0]
+								continue
+							} else if sk == "partners" {
+								//log.Println(sk, resulttmp[sk], )
+								//fmt.Println(reflect.TypeOf(resulttmp[sk]))
+								if resulttmp[sk] != nil {
+									if ppms, ok := resulttmp[sk].(primitive.A); ok {
+										for i, _ := range ppms {
+											if ppms[i].(map[string]interface{})["stock_type"] != nil {
+												ppms[i].(map[string]interface{})["stock_type"] = "企业公示"
+											}
+											delete(ppms[i].(map[string]interface{}), "identify_type")
+										}
+										savetmp[sk] = ppms
+
+									}
+								} else {
+									savetmp[sk] = []interface{}{}
+								}
+								continue
+							} else if sk == "_id" {
+								savetmp["tmp"+sk] = resulttmp[sk]
+								continue
+							} else if sk == "area_code" {
+								//行政区划代码
+								savetmp[sk] = fmt.Sprint(resulttmp[sk])
+								continue
+							} else if sk == "report_websites" {
+								//网址
+								if resulttmp["report_websites"] == nil {
+									savetmp["website"] = ""
+								} else {
+									report_websitesArr := []string{}
+									if ppms, ok := resulttmp[sk].(primitive.A); ok {
+										for _, v := range ppms {
+											if vvv, ok := v.(map[string]interface{}); ok {
+												if rv, ok := vvv["website_url"].(string); ok {
+													report_websitesArr = append(report_websitesArr, rv)
+												}
+											}
+										}
+									}
+									sort.Strings(report_websitesArr)
+									savetmp["website"] = strings.Join(report_websitesArr, ";")
+								}
+								continue
+							} else if sk == "wechat_accounts" {
+								savetmp[sk] = []interface{}{}
+								continue
+							}
+							if resulttmp[sk] == nil && sk != "history_name" && sk != "wechat_accounts" && sk != "establish_date" && sk != "capital" && sk != "partners" && sk != "contact" && sk != "report_websites" {
+								savetmp[sk] = ""
+							} else {
+								savetmp[sk] = resulttmp[sk]
+							}
+						}
+						//tmps = append(tmps, savetmp)
+						savetmp["updatatime"] = time.Now().Unix()
+						//保存mongo
+						FClient.DbName = Config["mgodb_extract_kf"]
+						saveid := FClient.Save(Config["mgo_qyk_c"], savetmp)
+						if saveid != nil {
+							//保存redis
+							rc := RedisPool.Get()
+							rc.Do("SELECT","1")
+							var _id string
+							if v, ok := saveid.(primitive.ObjectID); ok {
+								_id = v.Hex()
+							}
+							if _, err := rc.Do("SET", savetmp["company_name"], _id); err != nil {
+								log.Println("save redis err:", tmp["_id"], savetmp["_id"], savetmp["company_name"], err)
+								if err := rc.Close(); err != nil {
+									log.Println(err)
+								}
+							} else {
+								//保存es
+								delete(savetmp, "_id")
+								if err := rc.Close(); err != nil {
+									log.Println(err)
+								}
+
+								//esConn := elastic.GetEsConn()
+								//defer elastic.DestoryEsConn(esConn)
+								if _, err := EsConn.Index().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(_id).BodyJson(savetmp).Refresh(true).Do(); err != nil {
+									log.Println("save es err :", tmp["_id"], savetmp["_id"], err)
+								} else {
+									//删除临时表
+									FClient.DbName = Config["mgodb_extract_kf"]
+									if deleteNum := FClient.DeleteById("winner_new", tmpId); deleteNum == 0 {
+										log.Println("删除临时表失败", deleteNum)
+									}
+								}
+							}
+						} else {
+							log.Println("save mongo err:", saveid, tmp["_id"])
+						}
+					}
+				}
+			}
+		}
+		t2.Reset(time.Minute)
+	}
+}

+ 1 - 1
udpfilterdup/src/main.go

@@ -765,7 +765,7 @@ func mergeDataFields(source *Info, info *Info) (*Info, []int64, bool) {
 		source.mergemap[key] = merge_recordMap
 	}
 
-	//以上合并过于简单,待进一步优化
+	//待进一步优化
 	return source, mergeArr, is_replace
 }