Browse Source

Merge branch 'dev3.4' of http://39.105.157.10:10080/qmx/jy-data-extract into dev3.4

* 'dev3.4' of http://39.105.157.10:10080/qmx/jy-data-extract:
  1
  修改注释
  buyer agency 增量存量  修改
  审核
  buyer,agency存量修改
  redis db选择  本地测试配置
  备份
  采购单位类型更新
  buyer agency 存量
  增量
  定时任务完善
  采购单位 , 招标代理机构 ,
  存量处理
Jianghan 5 years ago
parent
commit
6303a258e1

+ 3 - 3
src/config.json

@@ -1,9 +1,9 @@
 {
     "port": "9090",
-    "mgodb": "192.168.3.207:27092",
-    "dbsize": 10,
+    "mgodb": "127.0.0.1:27092",
+    "dbsize": 2,
     "dbname": "extract_kf",
-    "redis": "buyer=192.168.3.205:6379,winner=192.168.3.205:6379,agency=192.168.3.205:6379",
+    "redis": "buyer=127.0.0.1:6379,winner=127.0.0.1:6379,agency=127.0.0.1:6379",
     "elasticsearch": "http://127.0.0.1:9800",
     "elasticsearch_index": "winner_enterprise",
     "elasticsearch_type": "winnerent",

+ 6 - 2
src/jy/admin/audit/qiyeku.go

@@ -37,6 +37,7 @@ func init() {
 			c.JSON(200, gin.H{"rep": 400})
 			return
 		}
+		//金额转换
 		capitalfloat := clear.ObjToMoney([]interface{}{capital, ""})[0]
 		e := make(map[string]interface{})
 		e["company_name"] = company_name
@@ -58,6 +59,7 @@ func init() {
 				"capital":         capitalfloat,
 				"company_address": company_address,
 			}}, false, false)
+			//更新es
 			if tmpb {
 				escon := elastic.GetEsConn()
 				defer elastic.DestoryEsConn(escon)
@@ -76,6 +78,7 @@ func init() {
 				}
 			}
 		} else {
+			//不存在直接保存新数据
 			sid = Mgo.Save("enterprise_qyxy", e)
 			delete(e, "_id")
 			escon := elastic.GetEsConn()
@@ -101,6 +104,7 @@ func init() {
 			c.JSON(200, gin.H{"data": []map[string]interface{}{}, "recordsFiltered": 0, "recordsTotal": 0})
 		} else {
 			//log.Println(util.ElasticClientIndex, util.ElasticClientType, search)
+			//查询es
 			escon := elastic.GetEsConn()
 			defer elastic.DestoryEsConn(escon)
 			res, err := escon.Search(util.ElasticClientIndex).
@@ -162,7 +166,7 @@ func init() {
 			c.JSON(200, gin.H{"rep": 400})
 		}
 	})
-	//updateIndustrys
+	//updateIndustrys  更新行业类型
 	Admin.POST("/audit/query_qyk/UpdateIndustrys", func(c *gin.Context) {
 		_id := c.PostForm("_id")
 		industrys := c.PostFormArray("industry")
@@ -184,7 +188,7 @@ func init() {
 			c.JSON(200, gin.H{"rep": 400})
 		}
 	})
-	//updateTels
+	//updateTels  更新联系方式
 	Admin.POST("/audit/query_qyk/UpdateTels", func(c *gin.Context) {
 		_id := c.PostForm("_id")
 		//log.Println(_id)

+ 53 - 137
src/jy/admin/audit/rulemanager.go

@@ -7,7 +7,6 @@ import (
 	. "jy/mongodbutil"
 	ju "jy/util"
 	qu "qfw/util"
-	"regexp"
 	"strings"
 	"time"
 
@@ -25,53 +24,29 @@ func init() {
 	Admin.POST("/rulemanager/saverecogfield", SaveRecogField) //保存
 	Admin.POST("/rulemanager/delrecogfield", DelRecogField)   //删除
 	//class
-	Admin.GET("/rulemanager/getclasslist", func(c *gin.Context) {
-		fname := c.Query("fname")
-		fid := c.Query("id")
-		c.HTML(200, "audit_classlist.html", gin.H{"fname": fname, "fid": fid})
-	})
-	Admin.POST("/rulemanager/getclass", GetClass)           //获取分类
-	Admin.POST("/rulemanager/saveclass", SaveClass)         //保存
-	Admin.POST("/rulemanager/getauditfield", GetAuditField) //获取要审核的字段
-	Admin.POST("/rulemanager/delclass", DelClass)           //删除
-	//Admin.POST("/rulemanager/getclist", GetCList)           //获取父分类
+	//	Admin.GET("/rulemanager/getclasslist", func(c *gin.Context) {
+	//		fname := c.Query("fname")
+	//		fid := c.Query("id")
+	//		c.HTML(200, "audit_classlist.html", gin.H{"fname": fname, "fid": fid})
+	//	})
+	//	Admin.POST("/rulemanager/getclass", GetClass)           //获取分类
+	//	Admin.POST("/rulemanager/saveclass", SaveClass)         //保存
+	//	Admin.POST("/rulemanager/getauditfield", GetAuditField) //获取要审核的字段
+	//	Admin.POST("/rulemanager/delclass", DelClass)           //删除
+
 	//rule
 	Admin.GET("/rulemanager/getrulelist", func(c *gin.Context) {
-		cid := c.Query("id")
-		cname := c.Query("cname")
-		fid := c.Query("fid")
-		//		class, _ := Mgo.FindById("rc_class", cid, `{"_id":1,"s_name":1,"s_pid":1}`)
-		//		//级联查询,往上查
-		//		if class != nil && *class != nil {
-		//			pid := qu.ObjToString((*class)["s_pid"])
-		//			if pid != "" {
-		//				GetParent(pid, "rc_class", class)
-		//			}
-		//		}
-		rule, _ := Mgo.Find("rc_rule", `{"s_classid":"`+cid+`","delete":false}`, `{"i_order":1}`, nil, false, 0, 200)
-		//Finds := map[string]interface{}{}
-		data := map[string]interface{}{}
-		//		for _, r := range *rule {
-		//			rpid := qu.ObjToString(r["s_pid"])
-		//			if rpid != "" {
-		//				if Finds[rpid] != nil {
-		//					r["p"] = Finds[rpid]
-		//				} else {
-		//					GetParent(rpid, "rc_rule", &r)
-		//					Finds[rpid] = r["p"]
-		//				}
-		//			}
-		//		}
-		data["rule"] = rule
-		//data["class"] = class
-		c.HTML(200, "audit_rulelist.html", gin.H{"cid": cid, "cname": cname, "data": data, "fid": fid})
+		fid := c.Query("id")
+		fname := c.Query("fname")
+		c.HTML(200, "audit_rulelist.html", gin.H{"fname": fname, "fid": fid})
 	})
 	Admin.POST("/rulemanager/getrule", GetRule)   //获取规则
 	Admin.POST("/rulemanager/saverule", SaveRule) //保存规则
-	//	Admin.POST("/rulemanager/getrlist", GetRList)       //获取父分类中的所有规则
-	Admin.POST("/rulemanager/delrule", DelRule)         //删除
-	Admin.POST("/rulemanager/shift", Shift)             //移动
-	Admin.POST("/rulemanager/runruletest", RunRuleTest) //规则测试
+	Admin.POST("/rulemanager/delrule", DelRule)   //删除
+	//	Admin.POST("/rulemanager/shift", Shift)             //移动
+	//	Admin.POST("/rulemanager/runruletest", RunRuleTest) //规则测试
+
+	Admin.POST("/rulemanager/ruleuse", RuleUse) //是否适用
 
 }
 func GetRecogField(c *gin.Context) {
@@ -202,58 +177,46 @@ func GetOrder(sel string) int {
 }
 
 func GetRule(c *gin.Context) {
-	cid, _ := c.GetPostForm("cid")
-	data, _ := Mgo.Find("rc_rule", `{"s_classid":"`+cid+`","delete":false}`, `{"i_order":1}`, nil, false, -1, -1)
+	fid, _ := c.GetPostForm("fid")
+	rule_type, _ := c.GetPostForm("rule_type")
+	start := c.GetInt("start")
+	limit := c.GetInt("length")
+	query := map[string]interface{}{
+		"s_fid":  fid,
+		"delete": false,
+	}
+	if rule_type != "-1" {
+		query["s_type"] = rule_type
+	}
+
+	data, _ := Mgo.Find("rc_rule", query, `{"_id":1}`, nil, false, start, limit)
+	count := Mgo.Count("rc_rule", query)
 	for _, d := range *data {
 		timeStr := time.Unix(d["l_createtime"].(int64), 0).Format(Date_Short_Layout)
 		d["l_createtime"] = timeStr
 	}
-	c.JSON(200, gin.H{"data": data})
+	c.JSON(200, gin.H{"data": data, "recordsFiltered": count, "recordsTotal": count})
 }
 
 func SaveRule(c *gin.Context) {
 	data := GetPostForm(c)
 	_id, _ := c.GetPostForm("_id")
-	session := sessions.Default(c)
-	var s_ruleArr []string
-	rule, _ := c.GetPostForm("s_rule")
-	rule = regexp.MustCompile("\\n|\\s+").ReplaceAllString(rule, "")
-	s_rule := strings.Split(rule, "'")
-	for k, r := range s_rule {
-		if k%2 == 1 { //奇数为正则
-			r = "'" + r + "'" //为正则加上''
-			if r != "" {
-				s_ruleArr = append(s_ruleArr, r)
-			}
-		} else {
-			fr := strings.Split(r, ",")
-			for _, frs := range fr {
-				if frs != "" {
-					s_ruleArr = append(s_ruleArr, frs)
-				}
-			}
-		}
-	}
-	data["s_rule"] = s_ruleArr
-	if _id != "" {
-		Mgo.UpdateById("rc_rule", _id, map[string]interface{}{"$set": data})
-		c.JSON(200, gin.H{"rep": true})
+	b := false
+	if _id == "" {
+		data["l_createtime"] = time.Now().Unix()
+		data["s_username"] = sessions.Default(c).Get("username")
+		//data["i_order"] = GetOrder("rule")
+		data["delete"] = false
+		//code := ju.GetSyncIndex(E_CE)
+		//data["s_code"] = code
+		b = Mgo.Save("rc_rule", data) != ""
 	} else {
-		s_name, _ := c.GetPostForm("s_name")
-		s_classid, _ := c.GetPostForm("s_classid")
-		d, _ := Mgo.FindOne("rc_rule", `{"s_name":"`+s_name+`","s_classid":"`+s_classid+`","delete":false}`)
-		if len(*d) > 0 {
-			c.JSON(200, gin.H{"msg": "已存在!"})
-		} else {
-			data["l_createtime"] = time.Now().Unix()
-			//data["l_date"] = time.Now().Unix()
-			data["s_user"] = session.Get("username")
-			data["i_order"] = GetOrder("rule")
-			data["delete"] = false
-			Mgo.Save("rc_rule", data)
-			c.JSON(200, gin.H{"rep": true})
-		}
+		data["l_lasttime"] = time.Now().Unix()
+		b = Mgo.Update("rc_rule", `{"_id":"`+_id+`"}`, map[string]interface{}{
+			"$set": data,
+		}, true, false)
 	}
+	c.JSON(200, gin.H{"rep": b})
 }
 
 func DelRule(c *gin.Context) {
@@ -365,56 +328,9 @@ func DelRuleByClass(classid string) {
 	}, false, true)
 }
 
-//func GetCList(c *gin.Context) {
-//	nid := c.Query("nid")
-//	fname := c.Query("fname")
-//	query := map[string]interface{}{
-//		"delete": false,
-//	}
-//	if nid != "" {
-//		query = map[string]interface{}{
-//			"_id": map[string]interface{}{
-//				"$ne": qu.StringTOBsonId(nid),
-//			},
-//		}
-//	}
-//	if fname != "" {
-//		query = map[string]interface{}{
-//			"s_recogfield": map[string]interface{}{
-//				"$ne": fname,
-//			},
-//		}
-//	}
-//	class, _ := Mgo.Find("rc_class", query, nil, `{"s_name":1}`, false, 0, 200)
-//	c.JSON(200, gin.H{"data": class})
-//}
-
-//func GetRList(c *gin.Context) {
-//	cid := c.Query("id")
-//	if cid == "" {
-//		c.JSON(200, gin.H{"data": map[string]interface{}{}})
-//	} else {
-//		//先找类
-//		ids := strings.Split(cid, ",")
-//		ids2 := make([]string, len(ids))
-//		idsmap := []map[string]interface{}{}
-//		for n, val := range ids {
-//			idsmap = append(idsmap, map[string]interface{}{
-//				"_id": qu.StringTOBsonId(val),
-//			})
-//			ids2[n] = fmt.Sprintf(`{"s_classid":"%s"}`, val)
-//		}
-//		class, _ := Mgo.Find("rc_class", &map[string]interface{}{
-//			"$or": idsmap,
-//		}, nil, `{"s_name":1}`, false, 0, 10)
-//		rule, _ := Mgo.Find("rc_rule", fmt.Sprintf(`{"$or":[%s]}`, strings.Join(ids2, ",")), `{"i_order":1}`, `{"s_name":1,"s_code":1,"s_classid":1}`, false, 0, 200)
-//		for _, ru := range *rule {
-//			for _, c := range *class {
-//				if qu.BsonIdToSId(c["_id"]) == ru["s_classid"] {
-//					ru["s_name"] = fmt.Sprintf("%s[%s]", ru["s_name"], c["s_name"])
-//				}
-//			}
-//		}
-//		c.JSON(200, gin.H{"data": rule})
-//	}
-//}
+func RuleUse(c *gin.Context) {
+	_id, _ := c.GetPostForm("_id")
+	isuse, _ := c.GetPostForm("isuse")
+	b := Mgo.UpdateById("rc_rule", _id, `{"$set":{"isuse":`+isuse+`}}`)
+	c.JSON(200, gin.H{"rep": b})
+}

+ 31 - 5
src/jy/admin/rulecheck.go

@@ -31,6 +31,12 @@ func init() {
 		tmp := checkCoreReg(field, con, rule)
 		c.JSON(200, gin.H{"rep": tmp})
 	})
+	Admin.POST("/check/auditrule", func(c *gin.Context) {
+		rule, _ := c.GetPostForm("s_rule")
+		con, _ := c.GetPostForm("s_testcon")
+		tmp := checkAuditReg(con, rule)
+		c.JSON(200, gin.H{"rep": tmp})
+	})
 	Admin.POST("/check/backrule", func(c *gin.Context) {
 		rule, _ := c.GetPostForm("s_rule")
 		con, _ := c.GetPostForm("s_testcon")
@@ -304,10 +310,30 @@ func checkCoreReg(field, content, ruleText string) map[string]string {
 	return rep
 }
 
+func checkAuditReg(content, ruleText string) map[string]interface{} {
+	rep := map[string]interface{}{}
+	qu.Try(func() {
+		var pattern string
+		if strings.Contains(ruleText, "\\u") {
+			ruleText = strings.Replace(ruleText, "\\", "\\\\", -1)
+			ruleText = strings.Replace(ruleText, "\\\\u", "\\u", -1)
+			pattern, _ = strconv.Unquote(`"` + ruleText + `"`)
+		} else {
+			pattern = ruleText
+		}
+		log.Println("pattern", pattern)
+		reg := regexp.MustCompile(pattern)
+		rep[pattern] = reg.MatchString(content)
+	}, func(err interface{}) {
+		rep["err"] = fmt.Sprint(err)
+	})
+	return rep
+}
+
 //lua脚本前置过滤验证
 func checkPreScript(code, name, infoid, script string) map[string]interface{} {
 	doc, _ := Mgo.FindById("bidding", infoid, extract.Fields)
-	j, _,_ := extract.PreInfo(*doc)
+	j, _, _ := extract.PreInfo(*doc)
 	delete(*j.Data, "contenthtml")
 	lua := ju.LuaScript{Code: code, Name: name, Doc: *j.Data, Script: script}
 	lua.Block = j.Block
@@ -332,7 +358,7 @@ func checkBackScript(table, code, name, version, infoid, script string, alone bo
 	e.InitTag(false)
 	e.InitTag(true)
 	tmp, _ := Mgo.FindById("bidding", infoid, extract.Fields)
-	j, _,_ := extract.PreInfo(*tmp)
+	j, _, _ := extract.PreInfo(*tmp)
 	doc := *j.Data
 	//全局前置规则,结果覆盖doc属性
 	for _, v := range e.RulePres {
@@ -352,7 +378,7 @@ func checkBackScript(table, code, name, version, infoid, script string, alone bo
 					tmp = extract.ExtRegPre(tmp, j, v, e.TaskInfo)
 				}
 				//抽取-规则
-				extract.ExtRuleCore(tmp, e, vc, j,false)
+				extract.ExtRuleCore(tmp, e, vc, j, false)
 			}
 		}
 	} else {
@@ -368,7 +394,7 @@ func checkBackScript(table, code, name, version, infoid, script string, alone bo
 					tmp = extract.ExtRegPre(tmp, j, v, e.TaskInfo)
 				}
 				//抽取-规则
-				extract.ExtRuleCore(tmp, e, vc, j,false)
+				extract.ExtRuleCore(tmp, e, vc, j, false)
 			}
 		}
 	}
@@ -395,7 +421,7 @@ func checkBackScript(table, code, name, version, infoid, script string, alone bo
 //lua脚本抽取验证
 func checkCoreScript(code, name, infoid, script string) interface{} {
 	doc, _ := Mgo.FindById("bidding", infoid, extract.Fields)
-	j, _,_ := extract.PreInfo(*doc)
+	j, _, _ := extract.PreInfo(*doc)
 	delete(*j.Data, "contenthtml")
 	lua := ju.LuaScript{Code: code, Name: name, Doc: *j.Data, Script: script}
 	lua.Block = j.Block

+ 5 - 0
src/res/fieldscore.json

@@ -262,6 +262,11 @@
                 "describe": "包含负分",
                 "regstr": "(详见公告|原因|未知|收费|标注|负责人)",
                 "score": -10
+            },
+            {
+                "describe": "时间",
+                "regstr": "^\\d{4}-\\d{1,2}-\\d{1,2}\\s{0,1}\\d{1,2}:\\d{1,2}:\\d{1,2}$",
+                "score": -10
             }
         ],
         "length": [

+ 13 - 14
src/web/templates/admin/audit_recogfield.html

@@ -7,10 +7,10 @@
 <div class="content-wrapper" id="showbtn">
 	<section class="content-header">
 		<h1>
-			<small><a class="btn btn-primary opr" opr="new">新增识别字段</a></small>
+			<small><a class="btn btn-primary opr" opr="new">新增审核字段</a></small>
 		</h1>
 		<ol class="breadcrumb">
-		  <li><a href="/admin/audit/recogfield"><i class="fa fa-dashboard"></i> 识别字段</a></li>		  
+		  <li><a href="/admin/audit/recogfield"><i class="fa fa-dashboard"></i> 审核字段</a></li>		  
 		</ol>
     </section>
   <!-- Main content -->
@@ -22,11 +22,11 @@
 		            <table id="recogfieldTable" class="table table-bordered table-hover">
 		              <thead>
 		              <tr>
-		                <th>名称</th>
-						<th>识别字段</th>
-						<th>时间</th>
-						<th>创建人</th>
-						<th>操作</th>
+	                		<th>名称</th>
+					<th>审核字段</th>
+					<th>时间</th>
+					<th>创建人</th>
+					<th>操作</th>
 		              </tr>
 		              </thead>
 		            </table>
@@ -50,7 +50,7 @@ $(function () {
 		"searching"   : true,
 		"ordering"    : false,
 		"info"        : true,
-		"autoWidth"   : false,
+		"autoWidth"   : true,
 		"ajax": {
 			"url": "/admin/rulemanager/getrecogfield",
 			"type": "post",
@@ -64,10 +64,10 @@ $(function () {
 			{ "data": "s_recogfield"},
 			{ "data": "l_createtime"},
 			{ "data": "s_user"},
-			{ "data": "_id",render:function(val,a,row){
-				return '<a class="btn btn-sm btn-info opr" opr="edit">编辑</a>'+
-					'&nbsp;&nbsp;<a class="btn btn-sm btn-warning" href="/admin/rulemanager/getclasslist?id='+val+'&fname='+row["s_recogfield"]+'">编辑分类</a>'+
+			{ "data": "_id","width":"30%",render:function(val,a,row){
+				 return	 '<a class="btn btn-sm btn-info opr" opr="edit">编辑</a>'+
 					'&nbsp;&nbsp;<a class="btn btn-sm btn-success" href="/admin/audit/dataaudit?name='+row["s_recogfield"]+'">数据审核</a>'+
+					'&nbsp;&nbsp;<a class="btn btn-sm btn-warning" href="/admin/rulemanager/getrulelist?id='+val+'&fname='+row["s_recogfield"]+'">编辑规则</a>'+
 					'&nbsp;&nbsp;<a class="btn btn-sm btn-danger" onclick="del(\''+val+'\')">删除</a>'
 			}}
        	]
@@ -83,12 +83,11 @@ $(function () {
 			case "new":
 				tag=[
 					{label:"名称",s_label:"s_name",must:true},
-					{label:"识别字段",s_label:"s_recogfield",must:true},
-					{label:"前置过滤",s_label:"s_recogfield_prerule",type:"tpl_text"},
+					{label:"审核字段",s_label:"s_recogfield",must:true},
 					{s_label:"_id",type:"tpl_hidden"},
 				]
 				if(n == "new"){
-					_tit="新增识别字段";
+					_tit="新增审核字段";
 				}else{
 					_tit="编辑_"+obj.s_recogfield+"字段";
 					//tag[1]= {label:"识别字段",s_label:"s_recogfield",must:true,disabled:true}

+ 139 - 115
src/web/templates/admin/audit_rulelist.html

@@ -7,12 +7,11 @@
 <div class="content-wrapper" id="showbtn">
 	<section class="content-header">
 		<h1>
-			<small><a class="btn btn-primary opr" opr="new">新增{{.cname}}规则</a></small>
+			<small><a class="btn btn-primary opr" opr="new">新增{{.fname}}规则</a></small>
 		</h1>
 		<ol class="breadcrumb">
-			<li><a href="/admin/audit/recogfield"><i class="fa fa-dashboard"></i> 识别字段</a></li>
-			<li class="active"><a href="/admin/rulemanager/getclasslist?id={{.fid}}&fname={{.cname}}">分类列表</a></li>
-		  	<li class="active"><a href="/admin/rulemanager/getrulelist?id={{.cid}}&cname={{.cname}}&fid={{.fid}}">规则列表</a></li>
+			<li><a href="/admin/audit/recogfield"><i class="fa fa-dashboard"></i> 审核字段</a></li>
+		  	<li class="active"><a href="/admin/rulemanager/getrulelist?id={{.fid}}&fname={{.fname}}">规则列表</a></li>
 		</ol>
     </section>
   <!-- Main content -->
@@ -27,6 +26,9 @@
 		                <th>名称</th>
 						<th>时间</th>
 						<th>创建人</th>
+						<th>描述</th>
+						<th>类型</th>
+						<th>是否启用</th>
 						<th>操作</th>
 		              </tr>
 		              </thead>
@@ -44,8 +46,7 @@
 {{template "footer"}}
 <script>
 menuActive("recogfield")
-var cid = {{.cid}};
-var cname = {{.cname}};
+var fname = {{.fname}};
 var fid = {{.fid}};
 $(function () {
 	ttablerulemanager=$('#rulemanagerTable').DataTable({
@@ -54,101 +55,72 @@ $(function () {
 		"searching"   : true,
 		"ordering"    : false,
 		"info"        : true,
-		"autoWidth"   : false,
+		"autoWidth"   : true,
+		"serverSide": true,
 		"ajax": {
 			"url": "/admin/rulemanager/getrule",
 			"type": "post",
-			"data":{"cid":cid}
+			"data":{"fid":fid}
 		},
 		"language": {
             "url": "/res/dist/js/dataTables.chinese.lang"
         },
 		"columns": [
-            { "data": "s_name"},
+           		{ "data": "s_name"},
 			{ "data": "l_createtime"},
-			{ "data": "s_user"},
+			{ "data": "s_username"},
+			{ "data": "s_descript"},
+			{ "data": "s_type",render:function(val){
+				if(val=="0"){
+					return "正确";
+				}else if(val == "1"){
+					return "异常";
+				}
+			}},
+			{ "data": "isuse",render:function(val,a,row){
+				tmp=""
+				if(val){
+					tmp="<a href='#' title='停用' onclick='use(\""+row._id+"\",false)'><i class='fa fa-fw fa-circle text-green'></i></a>已启用"
+				}else{
+					tmp="<a href='#' title='启用' onclick='use(\""+row._id+"\",true)'><i class='fa fa-fw fa-circle text-red'></i></a>未启用"
+				}
+				return tmp
+			}},
 			{ "data": "_id",render:function(val,a,row,meta){
-				var udhtml = '&nbsp;&nbsp;<a class="btn btn-sm btn-success opr" opr="moveup" num="'+meta.row+'">上移</a>'+
+				/*var udhtml = '&nbsp;&nbsp;<a class="btn btn-sm btn-success opr" opr="moveup" num="'+meta.row+'">上移</a>'+
 					'&nbsp;&nbsp;<a class="btn btn-sm btn-success opr" opr="movedown" num="'+meta.row+'">下移</a>';
-				if({{.data.rule}}.length ==1){//一条数据
+				if(row.count ==1){//一条数据
 					udhtml = '&nbsp;&nbsp;<a class="btn btn-sm btn-success btn-default opr" opr="moveup" num="'+meta.row+'" disabled>上移</a>'+
 					'&nbsp;&nbsp;<a class="btn btn-sm btn-success btn-default opr" opr="movedown" num="'+meta.row+'" disabled>下移</a>';
 				}else if(meta.row == 0){//第一行
 					udhtml = '&nbsp;&nbsp;<a class="btn btn-sm btn-success btn-default opr" opr="moveup" num="'+meta.row+'" disabled>上移</a>'+
 					'&nbsp;&nbsp;<a class="btn btn-sm btn-success opr" opr="movedown" num="'+meta.row+'">下移</a>';
-				}else if(meta.row+1 == {{.data.rule}}.length){//最后一行
+				}else if(meta.row+1 == row.count){//最后一行
 					udhtml = '&nbsp;&nbsp;<a class="btn btn-sm btn-success opr" opr="moveup" num="'+meta.row+'">上移</a>'+
 					'&nbsp;&nbsp;<a class="btn btn-sm btn-success btn-default opr" opr="movedown" num="'+meta.row+'" disabled>下移</a>';
-				}
+				}*/
 				return '<a class="btn btn-sm btn-info opr" opr="edit" num="'+meta.row+'">编辑</a>'+
-					'&nbsp;&nbsp;<a class="btn btn-sm btn-warning opr" opr="ruletest" num="'+meta.row+'">测试</a>'+
-					'&nbsp;&nbsp;<a class="btn btn-sm btn-danger" onclick="del(\''+val+'\')">删除</a>'+udhtml
+					//'&nbsp;&nbsp;<a class="btn btn-sm btn-warning opr" opr="ruletest" num="'+meta.row+'">测试</a>'+
+					'&nbsp;&nbsp;<a class="btn btn-sm btn-danger" onclick="del(\''+val+'\')">删除</a>'
 					
 			}}
-       	]
+       	],
+		"fnServerParams": function (e) {  
+			var rule_type=$("#rule_type").val();
+			if(rule_type){
+				e.rule_type=rule_type;
+			}else{
+				e.rule_type="-1";
+			}
+      	}
 	});
 	ttablerulemanager.on('init.dt', function () {
 		$("#showbtn").on('click','a.opr',function(){
 			var n=$(this).attr("opr");
 			var rownum=parseInt($(this).attr("num"));
-			var tobj = {{.data.rule}}[rownum];//本行数据
+			//var tobj = {{.data.rule}}[rownum];//本行数据
 			var _tit="",htmlObj={},obj,tag=[];
-			var delid=$(this).attr("value");
-			switch(n){
-			//测试规则
-			case "ruletest":
-				htmlObj={
-					title:"测试_规则:"+tobj["s_name"],
-					content:"<div>测试规则...</div>",
-					tag:[
-						{s_label:function(){var str="";for(var i in tobj["s_rule"]){str+="<p>"+(parseInt(i)+1)+"、"+tobj["s_rule"][i]+"</p>";}return str
-								}(),type:"tpl_small"},
-						{s_label:"s_con",type:"tpl_text",rows:18},
-						{s_label:"_id",type:"tpl_hidden",val:tobj["_id"]}
-					],
-					bts:[{label:"RUN",class:"btn-danger",
-							fun:function(){
-								var form = $('<form/>').appendTo("body");  
-							    // 设置属性  
-							    form.attr('action','/admin/rulemanager/runruletest').attr('method', 'post').attr('target', '_blank');  
-							    // 创建Input  
-							    var my_input = $('<input type="text" name="_id" />'); 
-							    my_input.attr('value', $("#_id").val()); 
-								var mytext = $('<textarea name="s_con" />') 
-								mytext.val($("#s_con").val())
-							    // 附加到Form  
-							    form.append(my_input).append(mytext); 
-							    // 提交表单  
-							    form.submit();  
-								form.remove();
-							}
-						}]
-				}
-			OpenDialog(htmlObj,obj);
-			break;
-			case "edit":			
-				obj=ttablerulemanager.row($(this).closest("tr")).data();
-			case "new":
-				tag=[
-					{label:"名称",s_label:"s_name",must:true},
-					{label:"代码",s_label:"s_code",must:true},
-					//{label:"标识",s_label:"s_isok",must:true,type:"tpl_list_local",list:[{"s_name":"wrong","_id":"wrong"},{"s_name":"right","_id":"right"}],default:"wrong"},
-					{label:"前置过滤",s_label:"s_rule_prerule",type:"tpl_text",placeholder:"XXX__YYY"},
-					{label:"规则",s_label:"s_rule",type:"tpl_text",rows:8,must:true},
-					{s_label:"_id",type:"tpl_hidden"},
-					{s_label:"s_classid",type:"tpl_hidden",val:cid},
-					{s_label:"s_pfield",type:"tpl_hidden",val:cname}
-				]
-				if(n == "new"){
-					_tit="新增_"+cname+"规则";
-				}else{
-					_tit="编辑_"+cname+"规则:"+tobj["s_name"];
-				}
-				htmlObj={
-					mutilfield:"s_pid",
-					title:_tit,
-					tag:tag,
-					bts:[
+			var btn =[
 						{label:"保存",class:"btn-primary",
 							fun:function(){
 								var obj={}
@@ -164,7 +136,7 @@ $(function () {
 								if (bcon){								
 									$.post("/admin/rulemanager/saverule",obj,function(data){
 										if(data&&data.rep){
-											window.location.href="/admin/rulemanager/getrulelist?id="+cid+"&cname="+cname+"&fid="+fid;	
+											window.location.href="/admin/rulemanager/getrulelist?id="+fid+"&fname="+fname;	
 										}else{
 											showTip(data.msg,1000)
 										}
@@ -174,46 +146,62 @@ $(function () {
 								}
 							}
 						}
-					]
+					];
+			var delid=$(this).attr("value");
+			switch(n){
+			case "edit":			
+				obj=ttablerulemanager.row($(this).closest("tr")).data();
+			case "new":
+				tag=[
+					{label:"名称",s_label:"s_name",must:true},
+					{label:"描述",s_label:"s_descript"},
+					{label:"启用",s_label:"isuse",type:"tpl_list_local",list:[{"s_name":"是","_id":true},{"s_name":"否","_id":false}],default:true},
+					{label:"类型",s_label:"s_type",type:"tpl_list_local",must:true,list:[{"s_name":"正确","_id":"0"},{"s_name":"异常","_id":"1"}],default:"0"},
+					{label:"正则",s_label:"s_rule",type:"tpl_text",rows:2,must:true},
+					{s_label:"_id",type:"tpl_hidden"},
+					{s_label:"s_fid",type:"tpl_hidden",val:fid},
+					{s_label:"s_field",type:"tpl_hidden",val:fname}
+				]
+				if(n == "new"){
+					_tit="新增"+fname+"规则";
+				}else{
+					_tit="编辑"+fname+"规则";
+					testcon=[{label:"测试内容",s_label:"s_testcon",type:"tpl_text"}];
+					check=[{label:"测试",class:"btn-warning",
+								fun:function(){
+									var obj={}
+									var bcon=true
+									$("#_con").find("input[id!=s_show],textarea").each(function(i,el){
+										var val=$(el).val(); 
+										obj[el.id]=$(el).val()
+										if(el.id!="_id"&&$(el).attr("must")&&!val){
+											bcon=false
+											return false
+										}
+									})
+									if (bcon){								
+										$.post("/admin/check/auditrule",obj,function(data){
+											showMsg(JSON.stringify(data.rep))
+										},'json')
+									}else{
+										alert("红色标签的表单不能为空!")
+									}
+								}
+							}];
+					tag = com.pushArry(tag,testcon)
+					btn = com.pushArry(btn,check)
 				}
-			//if({{.data.class.s_pid}}){
-			//	htmlObj.tag.push({label:"父规则",s_label:"s_pid",type:"tpl_list_ajax",url:"/admin/rulemanager/getrlist?id="+{{.data.class.s_pid}},fun:function(){
-			//		var ids=$("#s_pid").data("ids")
-			//		ids=ids||{}
-			//		var tid=$(this).attr("_id")
-			//		if(!ids[tid]){
-			//			ids[tid]=true
-			//			$("#s_pid").data("ids",ids)
-			//			var tpl1=$('<div class="alert alert-dismissible alert-success" style="min-width:50px;max-width:250px;font-size:10px;padding:3px;margin:5px;display:inline-block"><button type="button" class="close" data-dismiss="alert" style="right:0px;">&times;</button><span></span></div>')
-			//			tpl1.find("span").text($(this).text())							
-			//			tpl1.attr("tid",tid)
-			//			tpl1.find("button").click(function(){
-			//				var ttid=$(this).closest(".alert").attr("tid");
-			//				var iids=$("#s_pid").data("ids")
-			//				delete iids[ttid]
-			//				$("#s_pid").val(function(){
-			//					var strid=[]
-			//					for(var k in iids){
-			//						strid.push(k)
-			//					}
-			//					return strid.join(",")
-			//				}())
-			//			})
-			//			$("#s_pid").prev().append(tpl1)
-			//			$("#s_pid").val(function(){
-			//				var strid=[]
-			//				for(var k in ids){
-			//					strid.push(k)
-			//				}
-			//				return strid.join(",")
-			//			}())
-			//		}
-			//	}})
-			//}
+				htmlObj={
+					mutilfield:"s_pid",
+					title:_tit,
+					tag:tag,
+					bts:btn
+				}
+			
 			OpenDialog(htmlObj,obj)
 			break;
 			//上移下移
-			case "moveup":
+			/*case "moveup":
 			case "movedown":
 				if($(this).attr("disabled")=="disabled"){
 					return
@@ -228,17 +216,32 @@ $(function () {
 				}
 				$.post("/admin/rulemanager/shift",{str:poststr},function(data){
 					if(data&&data.rep){
-						window.location.href="/admin/rulemanager/getrulelist?id="+cid+"&cname="+cname+"&fid="+fid;						
+						window.location.href="/admin/rulemanager/getrulelist?id="+fid+"&fname="+fname;						
 					}else{
 						showTip("移动失败", 1000, function() {});
 					}
 				})
-				break;
+				break;*/
 			}
 		});
+	
+		var opt="<option value='-1'>全部</option>"+
+				"<option value='0'>正确</option>"+
+				"<option value='1'>异常</option>";
+		var select="<div class='form-group'><label for='name'>类型:</label>"+
+			"<select id='rule_type' onchange='checkclick(this.value)' class='form-control input-sm'>"+
+			opt+
+			"</select></div>"
+		$("#rulemanagerTable_filter").prepend("&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;");
+		$("#rulemanagerTable_filter").prepend(select);
 	})
+	
 })
 
+function checkclick(){
+	ttablerulemanager.ajax.reload();
+}
+
 function del(_id){
 	showConfirm("确定删除?", function() {
 		$.ajax({
@@ -247,7 +250,7 @@ function del(_id){
 			data:{"_id":_id},
 			success:function(r){
 				if(r.rep){				
-					window.location.href="/admin/rulemanager/getrulelist?id="+cid+"&cname="+cname+"&fid="+fid;
+					window.location.href="/admin/rulemanager/getrulelist?id="+fid+"&fname="+fname;
 				}else{
 					showTip("删除失败", 1000, function() {});
 				}
@@ -255,5 +258,26 @@ function del(_id){
 		})
 	});
 }
-
+function use(_id,utype){
+	smg=""
+	if(utype){
+		smg="确定启用?"
+	}else{
+		smg="确定停用?"
+	}
+	showConfirm(smg, function() {
+		$.ajax({
+			url:"/admin/rulemanager/ruleuse",
+			type:"post",
+			data:{"_id":_id,"isuse":utype},
+			success:function(r){
+				if(r.rep){				
+					window.location.href="/admin/rulemanager/getrulelist?id="+fid+"&fname="+fname;
+				}else{
+					showTip("启用失败", 1000, function() {});
+				}
+			}
+		})
+	});
+}
 </script>

+ 9 - 3
udp_winner/config.json

@@ -1,11 +1,15 @@
 {
   "elasticsearch": "http://127.0.0.1:9800",
-  "elasticsearch_index": "winner_enterprise",
-  "elasticsearch_type": "winnerent",
+  "elasticsearch_index": "localhost_winner",
+  "elasticsearch_type": "mytestwinner",
+  "elasticsearch_buyer_index": "localhost_buyer",
+  "elasticsearch_buyer_type": "mytestbuyer",
+  "elasticsearch_agency_index": "localhost_agency",
+  "elasticsearch_agency_type": "mytestagency",
   "udpport": "127.0.0.1:12311",
   "port": "12311",
   "pool_size": "10",
-  "mgoinit": "192.168.3.207:27081",
+  "mgoinit": "127.0.0.1:27017",
   "mgodb_bidding": "qfw",
   "mgodb_mgoinit_c": "bidding",
   "mgodb_enterprise": "enterprise",
@@ -14,5 +18,7 @@
   "mgourl": "127.0.0.1:27017",
   "mgodb_extract_kf": "extract_kf",
   "mgo_qyk_c": "enterprise_qyxy",
+  "mgo_qyk_buyer": "buyer_qyxy",
+  "mgo_qyk_agency": "gency_qyxy",
   "redis": "127.0.0.1:6379"
 }

+ 30 - 6
udp_winner/main.go

@@ -4,6 +4,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"github.com/garyburd/redigo/redis"
+	hisRedis "github.com/go-redis/redis"
 	"go.mongodb.org/mongo-driver/bson"
 	es "gopkg.in/olivere/elastic.v1"
 	"log"
@@ -20,9 +21,10 @@ import (
 
 var (
 	Config                                = make(map[string]string)
-	Fields                                []string
+	Fields,BuyerFields,AgencyFields                                []string
 	SourceClient, FClient                 *MongodbSim
 	RedisPool                             redis.Pool
+	HisRedisPool                          *hisRedis.Client
 	Addrs                                 = make(map[string]interface{}, 0) //省市县
 	udpclient                             mu.UdpClient                      //udp对象
 	ElasticClientIndex, ElasticClientType string
@@ -41,9 +43,19 @@ func init() {
 	log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
 	util.ReadConfig(&Config)
 	log.Println(Config)
-	Fields = []string{"_id", "contact", "partners", "business_scope", "company_address", "capital",
-		"establish_date", "legal_person", "company_type", "district", "city", "province", "area_code", "credit_no",
-		"company_name", "history_name", "topscopeclass", "wechat_accounts", "alias", "website", "report_websites"}
+	Fields = []string{"_id", "contact", "partners", "business_scope", "company_address",
+		"capital", "establish_date", "legal_person", "company_type",
+		"district", "city", "province", "area_code", "credit_no",
+		"company_name", "history_name", "topscopeclass", "wechat_accounts",
+		"alias", "website", "report_websites"}
+
+	BuyerFields = []string{"_id", "contact", "type", "ranks", "buyerclass",
+		"address", "district", "city", "province", "area_code", "credit_no", "buyer_name",
+		"history_name", "wechat_accounts", "website", "report_websites"}
+
+	AgencyFields = []string{"_id", "contact", "type", "ranks",
+		"address", "district", "city", "province", "area_code", "credit_no", "agency_name",
+		"history_name", "wechat_accounts", "website", "report_websites"}
 	var err error
 	pool_size, _ := strconv.Atoi(Config["pool_size"])
 
@@ -102,6 +114,16 @@ func init() {
 		log.Fatalln("redis err:", err)
 	}
 	c.Close()
+	HisRedisPool = hisRedis.NewClient(&hisRedis.Options{
+		Addr:         "127.0.0.1:6380",
+		DB:           1,
+		DialTimeout:  10 * time.Second,
+		ReadTimeout:  30 * time.Second,
+		WriteTimeout: 30 * time.Second,
+		PoolSize:     30,
+		MinIdleConns: 20,
+		PoolTimeout:  30 * time.Second,
+	})
 }
 
 func main() {
@@ -113,6 +135,8 @@ func main() {
 	log.Println("Udp服务监听", updport)
 	log.Println("发送端口port:", Updport)
 	go TimedTaskWinner() //定时任务
+	go TimedTaskBuyer() //定时任务
+	go TimedTaskAgency() //定时任务
 	c := make(chan int, 1)
 	<-c
 
@@ -141,9 +165,9 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 				if key == "winner" {
 					go TaskWinner(tmp)
 				} else if key == "buyer" {
-
+					go TaskBuyer(tmp)
 				} else if key == "agency" {
-
+					go TaskAgency(tmp)
 				}
 			}
 		}

+ 605 - 0
udp_winner/timedTaskAgency.go

@@ -0,0 +1,605 @@
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"github.com/garyburd/redigo/redis"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+	"gopkg.in/mgo.v2/bson"
+	"log"
+	mu "mfw/util"
+	"net"
+	"qfw/util"
+	"sort"
+	"strings"
+	"time"
+)
+
+//之前main方法,只更新
+func TaskAgency(mapinfo *map[string]interface{}) {
+	defer util.Catch()
+	gtid, lteid := util.ObjToString((*mapinfo)["gtid"]), util.ObjToString((*mapinfo)["lteid"])
+	if gtid == "" || lteid == "" {
+		log.Println(gtid, lteid, "参数错误")
+		return
+	}
+	GId, err := primitive.ObjectIDFromHex(gtid)
+	LtId, err2 := primitive.ObjectIDFromHex(lteid)
+	if err != nil || err2 != nil {
+		log.Println(gtid, lteid, "转换_id错误")
+		return
+	}
+	//udp的id区间查询bidding  中标人 中标联系人 中标联系电话
+	// topscopeclass项目类型-industry行业类型&&topscopeclass联系人项目类型
+	// (area地区-province省份 city城市-city城市 district区县-district区县)
+	// winneraddr-company_address企业地址
+	SourceClientcc := SourceClient.GetMgoConn()
+	defer SourceClient.DestoryMongoConn(SourceClientcc)
+	cursor := SourceClientcc.DB(Config["mgodb_bidding"]).C(Config["mgodb_mgoinit_c"]).Find(bson.M{
+		"_id": bson.M{
+			"$gte": GId,
+			"$lte": LtId,
+		},
+	}).Select(bson.M{"agency": 1, "agencytel": 1, "agencyperson": 1,
+		"topscopeclass": 1, "agencyaddr": 1}).Iter()
+	if cursor == nil {
+		log.Println(cursor)
+		return
+	}
+	//判断是否是存量,是存量走Redis遍历
+	if v, ok := (*mapinfo)["data_info"].(string); ok && v == "save" {
+		//存量处理
+		tmp := map[string]interface{}{}
+		conn := HisRedisPool.Conn()
+		defer conn.Close()
+		//选择redis db
+		conn.Select(1)
+		//遍历bidding表保存到redis
+		// key:_id  value:json结构体
+		for cursor.Next(&tmp) {
+			if tmp["agency"] == nil || tmp["agency"] == "" {
+				continue
+			}
+			mgoId:=tmp["_id"].(primitive.ObjectID).Hex()
+			delete(tmp,"_id")
+			bytes, _ := json.Marshal(tmp)
+			if err := conn.Set(mgoId, string(bytes), 0).Err(); err != nil {
+				log.Println(err)
+			}
+		}
+		//遍历redis
+		if scan := conn.Scan(0, "", 100); scan.Err() != nil {
+			log.Println(scan.Err())
+			return
+		} else {
+			iterator := scan.Iterator()
+			for iterator.Next() {
+				redisId := iterator.Val()                       //redis key
+				redisvalue := conn.Get(iterator.Val()).Val() //redis val
+				tmp := make(map[string]interface{})
+				json.Unmarshal([]byte(redisvalue),&tmp)
+				//重复增量操作
+				//redis查询是否存在
+				rdb := RedisPool.Get()
+				rdb.Do("SELECT","3")
+				if reply, err := redis.String(rdb.Do("GET", tmp["agency"])); err != nil {
+					//redis不存在,存到临时表,定时任务处理
+					FClient.DbName = Config["mgodb_extract_kf"]
+					if tmpid := FClient.Save("agency_new", tmp); tmpid == nil {
+						log.Println("存量 FClient.Save err", tmpid)
+					}
+					//log.Println("get redis id err:定时任务处理", err, tmp)
+					if err := rdb.Close(); err != nil {
+						log.Println("存量",err)
+					}
+					//删除存量redis
+					conn.Del(redisId)
+					continue
+				} else {
+					if err := rdb.Close(); err != nil {
+						log.Println(err)
+					}
+					//拿到合并后的qyk
+					FClient.DbName = Config["mgodb_extract_kf"]
+					oldTmp := FClient.FindById(Config["mgo_qyk_agency"], reply)
+					if oldTmp == nil {
+						log.Println("存量 redis id 不存在",reply,tmp["agency"])
+						continue
+					}
+
+
+					tmpTopscopeclass := []string{}
+					tmpTopscopeclassMap := make(map[string]bool)
+
+					if v, ok := tmp["topscopeclass"].(primitive.A); ok {
+						for _, vv := range v {
+							if vvv, ok := vv.(string); ok && len(vvv) > 1 {
+								tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
+							}
+						}
+						for k := range tmpTopscopeclassMap {
+							tmpTopscopeclass = append(tmpTopscopeclass, k)
+						}
+					}
+					sort.Strings(tmpTopscopeclass)
+
+
+
+
+					esId := oldTmp["_id"].(primitive.ObjectID).Hex()
+					//更新行业类型
+					if tmp["agencyperson"] == nil || tmp["agencyperson"] == "" ||
+						Reg_xing.MatchString(util.ObjToString(tmp["agencyperson"])) {
+						oldTmp["updatatime"] = time.Now().Unix()
+						//mongo更新
+						FClient.DbName = Config["mgodb_extract_kf"]
+						if !FClient.UpdateById(Config["mgo_qyk_agency"], esId, bson.M{"$set": oldTmp}) {
+							log.Println("mongo更新err", esId)
+						}
+
+						//es更新
+						delete(oldTmp, "_id")
+						if _, err := EsConn.Update().Index(Config["elasticsearch_agency_index"]).Type(Config["elasticsearch_agency_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
+							log.Println("update es err:", err)
+						}
+						//删除存量redis
+						conn.Del(redisId)
+						continue
+					}
+					//联系方式合并
+					var tmpperson, agencytel string
+					tmpperson = tmp["agencyperson"].(string)
+					if tmp["agencytel"] == nil || tmp["agencytel"] == "" {
+						agencytel = ""
+					} else {
+						if Reg_xing.MatchString(util.ObjToString(tmp["agencytel"])) || !Reg_tel.MatchString(util.ObjToString(tmp["agencytel"])) {
+							agencytel = ""
+						} else {
+							agencytel = util.ObjToString(tmp["agencytel"])
+						}
+					}
+					contactMaps := make([]interface{}, 0)
+					if oldTmp["contact"] == nil {
+						tmpContact := make(map[string]interface{})
+						tmpContact["infoid"] = redisId
+						tmpContact["contact_person"] = tmpperson
+						tmpContact["contact_type"] = "项目联系人"
+						tmpContact["phone"] = agencytel
+						tmpContact["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
+						tmpContact["updatetime"] = time.Now().Unix()
+						contactMaps = append(contactMaps, tmpContact)
+					} else {
+						//对比前四项,相等丢弃
+						if v, ok := oldTmp["contact"].(primitive.A); ok {
+							var isNotUpdate bool
+							for _, vv := range v {
+								if vvv, ok := vv.(map[string]interface{}); ok {
+									if vvv["contact_person"] == tmpperson && vvv["contact_type"] == "项目联系人" &&
+										vvv["phone"] == agencytel && vvv["topscopeclass"] == strings.Join(tmpTopscopeclass, ";") {
+										isNotUpdate = true
+										vvv["updatetime"] = time.Now().Unix()
+									}
+									contactMaps = append(contactMaps, vvv)
+								}
+							}
+							if !isNotUpdate {
+								vvv := make(map[string]interface{})
+								vvv["infoid"] = redisId
+								vvv["contact_person"] = tmp["agencyperson"]
+								vvv["contact_type"] = "项目联系人"
+								vvv["phone"] = agencytel
+								vvv["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
+								vvv["updatetime"] = time.Now().Unix()
+								contactMaps = append(contactMaps, vvv)
+							}
+						}
+					}
+					oldTmp["contact"] = contactMaps
+					//mongo更新
+					oldTmp["updatatime"] = time.Now().Unix()
+					FClient.DbName = Config["mgodb_extract_kf"]
+					if !FClient.UpdateById(Config["mgo_qyk_agency"], esId, bson.M{"$set": oldTmp}) {
+						log.Println("存量  mongo更新 err", esId, oldTmp)
+					}
+					//es更新
+					delete(oldTmp, "_id")
+					if _, err := EsConn.Update().Index(Config["elasticsearch_agency_index"]).Type(Config["elasticsearch_agency_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
+						log.Println("存量 EsConn err :", err)
+					}
+					//最后删除redis
+					conn.Del(redisId)
+				}
+			}
+		}
+		log.Println("存量历史合并执行完成 ok", gtid, lteid)
+
+	} else {
+		//增量处理
+		overid := gtid
+		tmp := map[string]interface{}{}
+		for cursor.Next(&tmp) {
+			overid = tmp["_id"].(primitive.ObjectID).Hex()
+			//log.Println(tmp["_id"])
+			if tmp["agency"] == nil || tmp["agency"] == "" {
+				continue
+			}
+			//redis查询是否存在
+			rdb := RedisPool.Get()
+			rdb.Do("SELECT","3")
+			if reply, err := redis.String(rdb.Do("GET", tmp["agency"])); err != nil {
+				//redis不存在存到临时表,定时任务处理
+				FClient.DbName = Config["mgodb_extract_kf"]
+				if tmpid := FClient.Save("agency_new", tmp); tmpid == nil {
+					log.Println("FClient.Save err", tmpid)
+				}
+				//log.Println("get redis id err:定时任务处理", err, tmp)
+				if err := rdb.Close(); err != nil {
+					log.Println(err)
+				}
+				continue
+			} else {
+				if err := rdb.Close(); err != nil {
+					log.Println(err)
+				}
+				//拿到合并后的qyk
+				FClient.DbName = Config["mgodb_extract_kf"]
+				oldTmp := FClient.FindById(Config["mgo_qyk_agency"], reply)
+				if oldTmp == nil {
+					log.Println("redis id 不存在")
+					continue
+				}
+				//比较合并
+				//行业类型
+				tmpTopscopeclass := []string{}
+				tmpTopscopeclassMap := make(map[string]bool)
+
+				if v, ok := tmp["topscopeclass"].(primitive.A); ok {
+					for _, vv := range v {
+						if vvv, ok := vv.(string); ok && len(vvv) > 1 {
+							tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
+						}
+					}
+					for k := range tmpTopscopeclassMap {
+						tmpTopscopeclass = append(tmpTopscopeclass, k)
+					}
+				}
+				sort.Strings(tmpTopscopeclass)
+
+
+
+				esId := oldTmp["_id"].(primitive.ObjectID).Hex()
+				//更新行业类型
+				if tmp["agencyperson"] == nil || tmp["agencyperson"] == "" || Reg_xing.MatchString(util.ObjToString(tmp["agencyperson"])) {
+					oldTmp["updatatime"] = time.Now().Unix()
+					//mongo更新
+					FClient.DbName = Config["mgodb_extract_kf"]
+					if !FClient.UpdateById(Config["mgo_qyk_agency"], esId, bson.M{"$set": oldTmp}) {
+						log.Println("mongo更新err", esId)
+					}
+
+					//es更新
+					delete(oldTmp, "_id")
+					if _, err := EsConn.Update().Index(Config["elasticsearch_agency_index"]).Type(Config["elasticsearch_agency_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
+						log.Println("update es err:", err)
+					}
+					continue
+				}
+				//联系方式合并
+				var tmpperson, agencytel string
+				tmpperson = tmp["agencyperson"].(string)
+				if tmp["agencytel"] == nil || tmp["agencytel"] == "" {
+					agencytel = ""
+				} else {
+					if Reg_xing.MatchString(util.ObjToString(tmp["agencytel"])) || !Reg_tel.MatchString(util.ObjToString(tmp["agencytel"])) {
+						agencytel = ""
+					} else {
+						agencytel = util.ObjToString(tmp["agencytel"])
+					}
+				}
+				contactMaps := make([]interface{}, 0)
+				if oldTmp["contact"] == nil {
+					tmpContact := make(map[string]interface{})
+					tmpContact["infoid"] = overid
+					tmpContact["contact_person"] = tmpperson
+					tmpContact["contact_type"] = "项目联系人"
+					tmpContact["phone"] = agencytel
+					tmpContact["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
+					tmpContact["updatetime"] = time.Now().Unix()
+					contactMaps = append(contactMaps, tmpContact)
+				} else {
+					//对比前四项,相等丢弃
+					if v, ok := oldTmp["contact"].(primitive.A); ok {
+						var isNotUpdate bool
+						for _, vv := range v {
+							if vvv, ok := vv.(map[string]interface{}); ok {
+								if vvv["contact_person"] == tmpperson && vvv["contact_type"] == "项目联系人" &&
+									vvv["phone"] == agencytel && vvv["topscopeclass"] == strings.Join(tmpTopscopeclass, ";") {
+									isNotUpdate = true
+									vvv["updatetime"] = time.Now().Unix()
+								}
+								contactMaps = append(contactMaps, vvv)
+							}
+						}
+						if !isNotUpdate {
+							vvv := make(map[string]interface{})
+							vvv["infoid"] = overid
+							vvv["contact_person"] = tmp["agencyperson"]
+							vvv["contact_type"] = "项目联系人"
+							vvv["phone"] = agencytel
+							vvv["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
+							vvv["updatetime"] = time.Now().Unix()
+							contactMaps = append(contactMaps, vvv)
+						}
+					}
+				}
+				oldTmp["contact"] = contactMaps
+				//mongo更新
+				oldTmp["updatatime"] = time.Now().Unix()
+				FClient.DbName = Config["mgodb_extract_kf"]
+				if !FClient.UpdateById(Config["mgo_qyk_agency"], esId, bson.M{"$set": oldTmp}) {
+					log.Println("mongo更新 err", esId, oldTmp)
+				}
+				//es更新
+				delete(oldTmp, "_id")
+				if _, err := EsConn.Update().Index(Config["elasticsearch_agency_index"]).Type(Config["elasticsearch_agency_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
+					log.Println("EsConn err :", err)
+				}
+			}
+		}
+		log.Println("增量合并执行完成 ok", gtid, lteid, overid)
+	}
+
+}
+
+
+
+//定时任务
+//1.存异常表
+//2.合并原始库新增
+func TimedTaskAgency() {
+	//time.Sleep(time.Hour*70)
+	t2 := time.NewTimer(time.Second * 5)
+	for range t2.C {
+		Fcconn := FClient.GetMgoConn()
+		defer FClient.DestoryMongoConn(Fcconn)
+		tmpLast := map[string]interface{}{}
+		if iter := Fcconn.DB(Config["mgodb_extract_kf"]).C("agency_new").Find(bson.M{}).Sort("-_id").Limit(1).Iter(); iter != nil {
+			if !iter.Next(&tmpLast) {
+				//临时表无数据
+				log.Println("临时表无数据:")
+				t2.Reset(time.Second * 10)
+				continue
+			} else {
+				log.Println("临时表有数据:", tmpLast)
+				fconn := FClient.GetMgoConn()
+				defer FClient.DestoryMongoConn(fconn)
+				cursor := fconn.DB(Config["mgodb_extract_kf"]).C("agency_new").Find(bson.M{
+					"_id": bson.M{
+						"$lte": tmpLast["_id"],
+					},
+				}).Sort("_id").Iter()
+				if cursor == nil {
+					log.Println("查询失败")
+					t2.Reset(time.Second * 5)
+					continue
+				}
+				//遍历临时表数据,匹配不到原始库存入异常表
+				tmp := make(map[string]interface{})
+				for cursor.Next(&tmp) {
+					tmpId := tmp["_id"].(primitive.ObjectID).Hex()
+					//再重新查找redis,存在发udp处理,不存在走新增合并
+					rdb := RedisPool.Get()
+					rdb.Do("SELECT","3")
+
+					if _, err := redis.String(rdb.Do("GET", tmp["agency"])); err == nil {
+						//{"gtid":"57d7ad2f61a0721f152d2ad5","lteid":"5e20968d85a9271abf0ad6c2","stype":""}
+						//redis存在发送udp进行处理
+						by, _ := json.Marshal(map[string]interface{}{
+							"gtid":  tmpId,
+							"lteid": tmpId,
+							"stype": "",
+						})
+						if e := udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
+							IP:   net.ParseIP("127.0.0.1"),
+							Port: Updport,
+						}); e != nil {
+							log.Println(e)
+						}
+						//存在的话删除tmp mongo表
+						FClient.DbName = Config["mgodb_extract_kf"]
+						if DeletedCount := FClient.DeleteById("agency_new", tmpId); DeletedCount == 0 {
+							log.Println("删除临时表err:", DeletedCount)
+						}
+						if err := rdb.Close(); err != nil {
+							log.Println(err)
+						}
+						continue
+					} else {
+						if err = rdb.Close(); err != nil {
+							log.Println(err)
+						}
+					}
+					//查询redis不存在新增
+					FClient.DbName = Config["mgodb_enterprise"]
+					resulttmp := FClient.FindOne(Config["mgodb_enterprise_c"], bson.M{"company_name": tmp["agency"]})
+					if resulttmp["_id"] == nil {
+						//log.Println(r)
+						//匹配不到原始库,存入异常表删除临时表
+						FClient.DbName = Config["mgodb_extract_kf"]
+						if saveid := FClient.Save("agency_err", tmp); saveid == nil {
+							log.Println("存入异常表错误", tmp)
+						}
+						FClient.DbName = Config["mgodb_extract_kf"]
+						if deleteNum := FClient.DeleteById("agency_new", tmpId); deleteNum == 0 {
+							log.Println("删除临时表错误", deleteNum)
+						}
+						continue
+					} else {
+						//log.Println(123)
+						//匹配到原始库,新增 resulttmp
+						if resulttmp["credit_no"] != nil {
+							if credit_no, ok := resulttmp["credit_no"].(string); ok && strings.TrimSpace(credit_no) != "" &&
+								len(strings.TrimSpace(credit_no)) > 8 {
+								dataNo := strings.TrimSpace(credit_no)[2:8]
+								if Addrs[dataNo] != nil {
+									if v, ok := Addrs[dataNo].(map[string]interface{}); ok {
+										if resulttmp["province"] == nil || resulttmp["province"] == "" {
+											resulttmp["province"] = v["province"]
+										}
+										resulttmp["city"] = v["city"]
+										resulttmp["district"] = v["district"]
+
+									}
+								}
+							}
+						}
+						contacts := make([]map[string]interface{}, 0)
+						contact := make(map[string]interface{}, 0)
+						if resulttmp["legal_person"] != nil {
+							contact["contact_person"] = resulttmp["legal_person"] //联系人
+						} else {
+							contact["contact_person"] = "" //联系人
+						}
+						contact["contact_type"] = "法定代表人" //法定代表人
+						//log.Println(1)
+						if resulttmp["annual_reports"] != nil {
+							bytes, err := json.Marshal(resulttmp["annual_reports"])
+							if err != nil {
+								log.Println("annual_reports err:", err)
+							}
+							phonetmp := make([]map[string]interface{}, 0)
+							err = json.Unmarshal(bytes, &phonetmp)
+							if err != nil {
+								log.Println("Unmarshal err:", err)
+							}
+							for _, vv := range phonetmp {
+								if vv["company_phone"] != nil {
+									if vv["company_phone"] == "" {
+										continue
+									} else {
+										contact["phone"] = vv["company_phone"] //联系电话
+										break
+									}
+								} else {
+									contact["phone"] = "" //联系电话
+								}
+
+							}
+						}
+						//log.Println(k, contact["phone"], resulttmp["_id"])
+						//time.Sleep(10 * time.Second)
+						if contact["phone"] == nil {
+							contact["phone"] = "" //联系电话
+						}
+						contact["topscopeclass"] = "企业公示"         //项目类型
+						contact["updatetime"] = time.Now().Unix() //更新时间
+						contact["infoid"] = ""                    //招标信息id
+						contacts = append(contacts, contact)
+						resulttmp["contact"] = contacts
+
+						savetmp := make(map[string]interface{}, 0)
+						for _, sk := range AgencyFields {
+							if sk == "_id" {
+								savetmp["tmp"+sk] = resulttmp[sk]
+								continue
+							} else if sk == "area_code" {
+								//行政区划代码
+								savetmp[sk] = fmt.Sprint(resulttmp[sk])
+								continue
+							} else if sk == "report_websites" {
+								//网址
+								if resulttmp["report_websites"] == nil {
+									savetmp["website"] = ""
+								} else {
+									report_websitesArr := []string{}
+									if ppms, ok := resulttmp[sk].(primitive.A); ok {
+										for _, v := range ppms {
+											if vvv, ok := v.(map[string]interface{}); ok {
+												if rv, ok := vvv["website_url"].(string); ok {
+													report_websitesArr = append(report_websitesArr, rv)
+												}
+											}
+										}
+									}
+									sort.Strings(report_websitesArr)
+									savetmp["website"] = strings.Join(report_websitesArr, ";")
+								}
+								continue
+							} else if sk == "wechat_accounts" {
+								savetmp[sk] = []interface{}{}
+								continue
+							}else if sk=="agency_name" {
+								if resulttmp["company_name"] == nil {
+									savetmp[sk] = ""
+								}else {
+									savetmp[sk] = resulttmp["company_name"]
+								}
+								continue
+							}else if sk=="address"{
+								if resulttmp["company_address"] == nil {
+									savetmp[sk] = ""
+								}else {
+									savetmp[sk] = resulttmp["company_address"]
+								}
+								continue
+							}
+
+
+
+							if resulttmp[sk] == nil && sk != "history_name" && sk != "wechat_accounts" &&
+								sk != "agency_name" && sk != "address" &&
+								sk != "contact" && sk != "report_websites" {
+								savetmp[sk] = ""
+							} else {
+								savetmp[sk] = resulttmp[sk]
+							}
+						}
+						//tmps = append(tmps, savetmp)
+						savetmp["updatatime"] = time.Now().Unix()
+						//保存mongo
+						FClient.DbName = Config["mgodb_extract_kf"]
+						saveid := FClient.Save(Config["mgo_qyk_agency"], savetmp)
+						if saveid != nil {
+							//保存redis
+							//保存redis
+							rc := RedisPool.Get()
+							rc.Do("SELECT","3")
+
+							var _id string
+							if v, ok := saveid.(primitive.ObjectID); ok {
+								_id = v.Hex()
+							}
+							if _, err := rc.Do("SET", savetmp["agency_name"], _id); err != nil {
+								log.Println("save redis err:", tmp["_id"], savetmp["_id"], savetmp["agency_name"], err)
+								if err := rc.Close(); err != nil {
+									log.Println(err)
+								}
+							} else {
+								//保存es
+								delete(savetmp, "_id")
+								if err := rc.Close(); err != nil {
+									log.Println(err)
+								}
+
+								//esConn := elastic.GetEsConn()
+								//defer elastic.DestoryEsConn(esConn)
+								if _, err := EsConn.Index().Index(Config["elasticsearch_agency_index"]).Type(Config["elasticsearch_agency_type"]).Id(_id).BodyJson(savetmp).Refresh(true).Do(); err != nil {
+									log.Println("save es err :", tmp["_id"], savetmp["_id"], err)
+								} else {
+									//删除临时表
+									FClient.DbName = Config["mgodb_extract_kf"]
+									if deleteNum := FClient.DeleteById("agency_new", tmpId); deleteNum == 0 {
+										log.Println("删除临时表失败", deleteNum)
+									}
+								}
+							}
+						} else {
+							log.Println("save mongo err:", saveid, tmp["_id"])
+						}
+					}
+				}
+			}
+		}
+		t2.Reset(time.Minute)
+	}
+}

+ 674 - 0
udp_winner/timedTaskBuyer.go

@@ -0,0 +1,674 @@
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"github.com/garyburd/redigo/redis"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+	"gopkg.in/mgo.v2/bson"
+	"log"
+	mu "mfw/util"
+	"net"
+	"qfw/util"
+	"sort"
+	"strings"
+	"time"
+)
+
+//之前main方法,只更新
+func TaskBuyer(mapinfo *map[string]interface{}) {
+	defer util.Catch()
+	gtid, lteid := util.ObjToString((*mapinfo)["gtid"]), util.ObjToString((*mapinfo)["lteid"])
+	if gtid == "" || lteid == "" {
+		log.Println(gtid, lteid, "参数错误")
+		return
+	}
+	GId, err := primitive.ObjectIDFromHex(gtid)
+	LtId, err2 := primitive.ObjectIDFromHex(lteid)
+	if err != nil || err2 != nil {
+		log.Println(gtid, lteid, "转换_id错误")
+		return
+	}
+	//udp的id区间查询bidding  中标人 中标联系人 中标联系电话
+	// topscopeclass项目类型-industry行业类型&&topscopeclass联系人项目类型
+	// (area地区-province省份 city城市-city城市 district区县-district区县)
+	// winneraddr-company_address企业地址
+	SourceClientcc := SourceClient.GetMgoConn()
+	defer SourceClient.DestoryMongoConn(SourceClientcc)
+	cursor := SourceClientcc.DB(Config["mgodb_bidding"]).C(Config["mgodb_mgoinit_c"]).Find(bson.M{
+		"_id": bson.M{
+			"$gte": GId,
+			"$lte": LtId,
+		},
+	}).Select(bson.M{"buyer": 1, "buyertel": 1, "buyerperson": 1,
+		"topscopeclass": 1, "buyeraddr": 1,"buyerclass":1}).Iter()
+	if cursor == nil {
+		log.Println(cursor)
+		return
+	}
+	//判断是否是存量,是存量走Redis遍历
+	if v, ok := (*mapinfo)["data_info"].(string); ok && v == "save" {
+		//存量处理
+		tmp := map[string]interface{}{}
+		conn := HisRedisPool.Conn()
+		defer conn.Close()
+		//选择redis db
+		conn.Select(1)
+		//遍历bidding表保存到redis
+		// key:_id  value:json结构体
+		for cursor.Next(&tmp) {
+			if tmp["buyer"] == nil || tmp["buyer"] == "" {
+				continue
+			}
+			mgoId:=tmp["_id"].(primitive.ObjectID).Hex()
+			delete(tmp,"_id")
+			bytes, _ := json.Marshal(tmp)
+			if err := conn.Set(mgoId, string(bytes), 0).Err(); err != nil {
+				log.Println(err)
+			}
+		}
+		//遍历redis
+		if scan := conn.Scan(0, "", 100); scan.Err() != nil {
+			log.Println(scan.Err())
+			return
+		} else {
+			iterator := scan.Iterator()
+			for iterator.Next() {
+				redisId := iterator.Val()                       //redis key
+				redisvalue := conn.Get(iterator.Val()).Val() //redis val
+				tmp := make(map[string]interface{})
+				json.Unmarshal([]byte(redisvalue),&tmp)
+				//重复增量操作
+				//redis查询是否存在
+				rdb := RedisPool.Get()
+				rdb.Do("SELECT","2")
+				if reply, err := redis.String(rdb.Do("GET", tmp["buyer"])); err != nil {
+					//redis不存在,存到临时表,定时任务处理
+					FClient.DbName = Config["mgodb_extract_kf"]
+					if tmpid := FClient.Save("buyer_new", tmp); tmpid == nil {
+						log.Println("存量 FClient.Save err", tmpid)
+					}
+					//log.Println("get redis id err:定时任务处理", err, tmp)
+					if err := rdb.Close(); err != nil {
+						log.Println("存量",err)
+					}
+					//删除存量redis
+					conn.Del(redisId)
+					continue
+				} else {
+					if err := rdb.Close(); err != nil {
+						log.Println(err)
+					}
+					//拿到合并后的qyk
+					FClient.DbName = Config["mgodb_extract_kf"]
+					oldTmp := FClient.FindById(Config["mgo_qyk_buyer"], reply)
+					if oldTmp == nil {
+						log.Println("存量 redis id 不存在",reply,tmp["buyer"])
+						continue
+					}
+					tmpTopscopeclass := []string{}
+					tmpTopscopeclassMap := make(map[string]bool)
+
+					if v, ok := tmp["topscopeclass"].(primitive.A); ok {
+						for _, vv := range v {
+							if vvv, ok := vv.(string); ok && len(vvv) > 1 {
+								tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
+							}
+						}
+						for k := range tmpTopscopeclassMap {
+							tmpTopscopeclass = append(tmpTopscopeclass, k)
+						}
+					}
+					sort.Strings(tmpTopscopeclass)
+
+					//更新buyerclass
+					esId := oldTmp["_id"].(primitive.ObjectID).Hex()
+					//更新行业类型
+					if tmp["buyerperson"] == nil || tmp["buyerperson"] == "" || Reg_xing.MatchString(util.ObjToString(tmp["buyerperson"])) {
+						//更新buyerclass合并
+						if tmp["buyerclass"] == nil || tmp["buyerclass"] == "" {
+							//无值,不更新
+						}else {
+							var buyerclass_new,buyerclass_old string
+							buyerclass_new = tmp["buyerclass"].(string)
+							buyerclass_old = oldTmp["buyerclass"].(string)
+							if buyerclass_old=="" {
+								oldTmp["buyerclass"] = buyerclass_new
+							}else {
+								if buyerclass_new!=buyerclass_old {
+									if !strings.Contains(buyerclass_old, buyerclass_new) {
+										oldTmp["buyerclass"] = buyerclass_old + ","+buyerclass_new //采购单位类型
+									}
+								}
+							}
+						}
+
+						oldTmp["updatatime"] = time.Now().Unix()
+						//mongo更新
+						FClient.DbName = Config["mgodb_extract_kf"]
+						if !FClient.UpdateById(Config["mgo_qyk_buyer"], esId, bson.M{"$set": oldTmp}) {
+							log.Println("mongo更新err", esId)
+						}
+
+						//es更新
+						delete(oldTmp, "_id")
+						if _, err := EsConn.Update().Index(Config["elasticsearch_buyer_index"]).Type(Config["elasticsearch_buyer_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
+							log.Println("update es err:", err)
+						}
+						//删除存量redis
+						conn.Del(redisId)
+						continue
+					}
+					//联系方式合并
+					var tmpperson, buyertel string
+					tmpperson = tmp["buyerperson"].(string)
+					if tmp["buyertel"] == nil || tmp["buyertel"] == "" {
+						buyertel = ""
+					} else {
+						if Reg_xing.MatchString(util.ObjToString(tmp["buyertel"])) || !Reg_tel.MatchString(util.ObjToString(tmp["buyertel"])) {
+							buyertel = ""
+						} else {
+							buyertel = util.ObjToString(tmp["buyertel"])
+						}
+					}
+					contactMaps := make([]interface{}, 0)
+					if oldTmp["contact"] == nil {
+						tmpContact := make(map[string]interface{})
+						tmpContact["infoid"] = redisId
+						tmpContact["contact_person"] = tmpperson
+						tmpContact["contact_type"] = "项目联系人"
+						tmpContact["phone"] = buyertel
+						tmpContact["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
+						tmpContact["updatetime"] = time.Now().Unix()
+						contactMaps = append(contactMaps, tmpContact)
+					} else {
+						//对比前四项,相等丢弃
+						if v, ok := oldTmp["contact"].(primitive.A); ok {
+							var isNotUpdate bool
+							for _, vv := range v {
+								if vvv, ok := vv.(map[string]interface{}); ok {
+									if vvv["contact_person"] == tmpperson && vvv["contact_type"] == "项目联系人" &&
+										vvv["phone"] == buyertel && vvv["topscopeclass"] == strings.Join(tmpTopscopeclass, ";") {
+										isNotUpdate = true
+										vvv["updatetime"] = time.Now().Unix()
+									}
+									contactMaps = append(contactMaps, vvv)
+								}
+							}
+							if !isNotUpdate {
+								vvv := make(map[string]interface{})
+								vvv["infoid"] = redisId
+								vvv["contact_person"] = tmp["buyerperson"]
+								vvv["contact_type"] = "项目联系人"
+								vvv["phone"] = buyertel
+								vvv["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
+								vvv["updatetime"] = time.Now().Unix()
+								contactMaps = append(contactMaps, vvv)
+							}
+						}
+					}
+					oldTmp["contact"] = contactMaps
+
+					//更新buyerclass合并
+					if tmp["buyerclass"] == nil || tmp["buyerclass"] == "" {
+						//无值,不更新
+					}else {
+						var buyerclass_new,buyerclass_old string
+						buyerclass_new = tmp["buyerclass"].(string)
+						buyerclass_old = oldTmp["buyerclass"].(string)
+						if buyerclass_old=="" {
+							oldTmp["buyerclass"] = buyerclass_new
+						}else {
+							if buyerclass_new!=buyerclass_old {
+								if !strings.Contains(buyerclass_old, buyerclass_new) {
+									oldTmp["buyerclass"] = buyerclass_old + ","+buyerclass_new //采购单位类型
+								}
+							}
+						}
+					}
+
+					//mongo更新
+					oldTmp["updatatime"] = time.Now().Unix()
+					FClient.DbName = Config["mgodb_extract_kf"]
+					if !FClient.UpdateById(Config["mgo_qyk_buyer"], esId, bson.M{"$set": oldTmp}) {
+						log.Println("存量  mongo更新 err", esId, oldTmp)
+					}
+					//es更新
+					delete(oldTmp, "_id")
+					if _, err := EsConn.Update().Index(Config["elasticsearch_buyer_index"]).Type(Config["elasticsearch_buyer_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
+						log.Println("存量 EsConn err :", err)
+					}
+					//最后删除redis
+					conn.Del(redisId)
+				}
+			}
+		}
+		log.Println("存量历史合并执行完成 ok", gtid, lteid)
+
+	} else {
+		//增量处理
+		overid := gtid
+		tmp := map[string]interface{}{}
+		for cursor.Next(&tmp) {
+			overid = tmp["_id"].(primitive.ObjectID).Hex()
+			//log.Println(tmp["_id"])
+			if tmp["buyer"] == nil || tmp["buyer"] == "" {
+				continue
+			}
+			//redis查询是否存在
+			rdb := RedisPool.Get()
+			rdb.Do("SELECT","2")
+			if reply, err := redis.String(rdb.Do("GET", tmp["buyer"])); err != nil {
+				//redis不存在存到临时表,定时任务处理
+				FClient.DbName = Config["mgodb_extract_kf"]
+				if tmpid := FClient.Save("buyer_new", tmp); tmpid == nil {
+					log.Println("FClient.Save err", tmpid)
+				}
+				//log.Println("get redis id err:定时任务处理", err, tmp)
+				if err := rdb.Close(); err != nil {
+					log.Println(err)
+				}
+				continue
+			} else {
+				if err := rdb.Close(); err != nil {
+					log.Println(err)
+				}
+				//拿到合并后的qyk
+				FClient.DbName = Config["mgodb_extract_kf"]
+				oldTmp := FClient.FindById(Config["mgo_qyk_buyer"], reply)
+				if oldTmp == nil {
+					log.Println("redis id 不存在")
+					continue
+				}
+				//比较合并
+				//行业类型
+				tmpTopscopeclass := []string{}
+				tmpTopscopeclassMap := make(map[string]bool)
+
+
+				if v, ok := tmp["topscopeclass"].(primitive.A); ok {
+					for _, vv := range v {
+						if vvv, ok := vv.(string); ok && len(vvv) > 1 {
+							tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
+						}
+					}
+					for k := range tmpTopscopeclassMap {
+						tmpTopscopeclass = append(tmpTopscopeclass, k)
+					}
+				}
+				sort.Strings(tmpTopscopeclass)
+
+				esId := oldTmp["_id"].(primitive.ObjectID).Hex()
+
+				//更新行业类型 buyerclass合并
+				if tmp["buyerperson"] == nil || tmp["buyerperson"] == "" || Reg_xing.MatchString(util.ObjToString(tmp["buyerperson"])) {
+
+					//更新buyerclass合并
+					if tmp["buyerclass"] == nil || tmp["buyerclass"] == "" {
+						//无值,不更新
+					}else {
+						var buyerclass_new,buyerclass_old string
+						buyerclass_new = tmp["buyerclass"].(string)
+						buyerclass_old = oldTmp["buyerclass"].(string)
+						if buyerclass_old=="" {
+							oldTmp["buyerclass"] = buyerclass_new
+						}else {
+							if buyerclass_new!=buyerclass_old {
+								if !strings.Contains(buyerclass_old, buyerclass_new) {
+									oldTmp["buyerclass"] = buyerclass_old + ","+buyerclass_new //采购单位类型
+								}
+							}
+						}
+					}
+
+					oldTmp["updatatime"] = time.Now().Unix()
+					//mongo更新
+					FClient.DbName = Config["mgodb_extract_kf"]
+					if !FClient.UpdateById(Config["mgo_qyk_buyer"], esId, bson.M{"$set": oldTmp}) {
+						log.Println("mongo更新err", esId)
+					}
+
+					//es更新
+					delete(oldTmp, "_id")
+					if _, err := EsConn.Update().Index(Config["elasticsearch_buyer_index"]).Type(Config["elasticsearch_buyer_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
+						log.Println("update es err:", err)
+					}
+					continue
+				}
+				//联系方式合并
+				var tmpperson, buyertel string
+				tmpperson = tmp["buyerperson"].(string)
+				if tmp["buyertel"] == nil || tmp["buyertel"] == "" {
+					buyertel = ""
+				} else {
+					if Reg_xing.MatchString(util.ObjToString(tmp["buyertel"])) || !Reg_tel.MatchString(util.ObjToString(tmp["buyertel"])) {
+						buyertel = ""
+					} else {
+						buyertel = util.ObjToString(tmp["buyertel"])
+					}
+				}
+				contactMaps := make([]interface{}, 0)
+				if oldTmp["contact"] == nil {
+					tmpContact := make(map[string]interface{})
+					tmpContact["infoid"] = overid
+					tmpContact["contact_person"] = tmpperson
+					tmpContact["contact_type"] = "项目联系人"
+					tmpContact["phone"] = buyertel
+					tmpContact["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
+					tmpContact["updatetime"] = time.Now().Unix()
+					contactMaps = append(contactMaps, tmpContact)
+				} else {
+					//对比前四项,相等丢弃
+					if v, ok := oldTmp["contact"].(primitive.A); ok {
+						var isNotUpdate bool
+						for _, vv := range v {
+							if vvv, ok := vv.(map[string]interface{}); ok {
+								if vvv["contact_person"] == tmpperson && vvv["contact_type"] == "项目联系人" &&
+									vvv["phone"] == buyertel && vvv["topscopeclass"] == strings.Join(tmpTopscopeclass, ";") {
+									isNotUpdate = true
+									vvv["updatetime"] = time.Now().Unix()
+								}
+								contactMaps = append(contactMaps, vvv)
+							}
+						}
+						if !isNotUpdate {
+							vvv := make(map[string]interface{})
+							vvv["infoid"] = overid
+							vvv["contact_person"] = tmp["buyerperson"]
+							vvv["contact_type"] = "项目联系人"
+							vvv["phone"] = buyertel
+							vvv["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
+							vvv["updatetime"] = time.Now().Unix()
+							contactMaps = append(contactMaps, vvv)
+						}
+					}
+				}
+				oldTmp["contact"] = contactMaps
+
+				//更新buyerclass合并
+				if tmp["buyerclass"] == nil || tmp["buyerclass"] == "" {
+					//无值,不更新
+				}else {
+					var buyerclass_new,buyerclass_old string
+					buyerclass_new = tmp["buyerclass"].(string)
+					buyerclass_old = oldTmp["buyerclass"].(string)
+					if buyerclass_old=="" {
+						oldTmp["buyerclass"] = buyerclass_new
+					}else {
+						if buyerclass_new!=buyerclass_old {
+							if !strings.Contains(buyerclass_old, buyerclass_new) {
+								oldTmp["buyerclass"] = buyerclass_old + ","+buyerclass_new //采购单位类型
+							}
+						}
+					}
+				}
+
+				//mongo更新
+				oldTmp["updatatime"] = time.Now().Unix()
+				FClient.DbName = Config["mgodb_extract_kf"]
+				if !FClient.UpdateById(Config["mgo_qyk_buyer"], esId, bson.M{"$set": oldTmp}) {
+					log.Println("mongo更新 err", esId, oldTmp)
+				}
+				//es更新
+				delete(oldTmp, "_id")
+				if _, err := EsConn.Update().Index(Config["elasticsearch_buyer_index"]).Type(Config["elasticsearch_buyer_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
+					log.Println("EsConn err :", err)
+				}
+			}
+		}
+		log.Println("增量合并执行完成 ok", gtid, lteid, overid)
+	}
+
+}
+
+
+
+//定时任务
+//1.存异常表
+//2.合并原始库新增
+func TimedTaskBuyer() {
+	//time.Sleep(time.Hour*70)
+	t2 := time.NewTimer(time.Second * 5)
+	for range t2.C {
+		Fcconn := FClient.GetMgoConn()
+		defer FClient.DestoryMongoConn(Fcconn)
+		tmpLast := map[string]interface{}{}
+		if iter := Fcconn.DB(Config["mgodb_extract_kf"]).C("buyer_new").Find(bson.M{}).Sort("-_id").Limit(1).Iter(); iter != nil {
+			if !iter.Next(&tmpLast) {
+				//临时表无数据
+				log.Println("临时表无数据:")
+				//t2.Reset(time.Second * 10) //增量
+				t2.Reset(time.Minute * 5) //存量
+				continue
+			} else {
+				log.Println("临时表有数据:", tmpLast)
+				fconn := FClient.GetMgoConn()
+				defer FClient.DestoryMongoConn(fconn)
+				cursor := fconn.DB(Config["mgodb_extract_kf"]).C("buyer_new").Find(bson.M{
+					"_id": bson.M{
+						"$lte": tmpLast["_id"],
+					},
+				}).Sort("_id").Iter()
+				if cursor == nil {
+					log.Println("查询失败")
+					t2.Reset(time.Second * 5)
+					continue
+				}
+				//遍历临时表数据,匹配不到原始库存入异常表
+				tmp := make(map[string]interface{})
+				for cursor.Next(&tmp) {
+					tmpId := tmp["_id"].(primitive.ObjectID).Hex()
+					//再重新查找redis,存在发udp处理,不存在走新增合并
+					rdb := RedisPool.Get()
+					rdb.Do("SELECT","2")
+					if _, err := redis.String(rdb.Do("GET", tmp["buyer"])); err == nil {
+						//{"gtid":"57d7ad2f61a0721f152d2ad5","lteid":"5e20968d85a9271abf0ad6c2","stype":""}
+						//redis存在发送udp进行处理
+						by, _ := json.Marshal(map[string]interface{}{
+							"gtid":  tmpId,
+							"lteid": tmpId,
+							"stype": "",
+						})
+						if e := udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
+							IP:   net.ParseIP("127.0.0.1"),
+							Port: Updport,
+						}); e != nil {
+							log.Println(e)
+						}
+						//存在的话删除tmp mongo表
+						FClient.DbName = Config["mgodb_extract_kf"]
+						if DeletedCount := FClient.DeleteById("buyer_new", tmpId); DeletedCount == 0 {
+							log.Println("删除临时表err:", DeletedCount)
+						}
+						if err := rdb.Close(); err != nil {
+							log.Println(err)
+						}
+						continue
+					} else {
+						if err = rdb.Close(); err != nil {
+							log.Println(err)
+						}
+					}
+					//查询redis不存在新增
+					FClient.DbName = Config["mgodb_enterprise"]
+					//qyxy 企业库 两亿条
+					resulttmp := FClient.FindOne(Config["mgodb_enterprise_c"], bson.M{"company_name": tmp["buyer"]})
+					if resulttmp["_id"] == nil {
+						//log.Println(r)
+						//匹配不到原始库,存入异常表删除临时表
+						FClient.DbName = Config["mgodb_extract_kf"]
+						if saveid := FClient.Save("buyer_err", tmp); saveid == nil {
+							log.Println("存入异常表错误", tmp)
+						}
+						FClient.DbName = Config["mgodb_extract_kf"]
+						if deleteNum := FClient.DeleteById("buyer_new", tmpId); deleteNum == 0 {
+							log.Println("删除临时表错误", deleteNum)
+						}
+						continue
+					} else {
+						//log.Println(123)
+						//匹配到原始库,新增 resulttmp
+						if resulttmp["credit_no"] != nil {
+							if credit_no, ok := resulttmp["credit_no"].(string); ok && strings.TrimSpace(credit_no) != "" &&
+								len(strings.TrimSpace(credit_no)) > 8 {
+								dataNo := strings.TrimSpace(credit_no)[2:8]
+								if Addrs[dataNo] != nil {
+									if v, ok := Addrs[dataNo].(map[string]interface{}); ok {
+										if resulttmp["province"] == nil || resulttmp["province"] == "" {
+											resulttmp["province"] = v["province"]
+										}
+										resulttmp["city"] = v["city"]
+										resulttmp["district"] = v["district"]
+
+									}
+								}
+							}
+						}
+						contacts := make([]map[string]interface{}, 0)
+						contact := make(map[string]interface{}, 0)
+						if resulttmp["legal_person"] != nil {
+							contact["contact_person"] = resulttmp["legal_person"] //联系人
+						} else {
+							contact["contact_person"] = "" //联系人
+						}
+						contact["contact_type"] = "法定代表人" //法定代表人
+						//log.Println(1)
+						if resulttmp["annual_reports"] != nil {
+							bytes, err := json.Marshal(resulttmp["annual_reports"])
+							if err != nil {
+								log.Println("annual_reports err:", err)
+							}
+							phonetmp := make([]map[string]interface{}, 0)
+							err = json.Unmarshal(bytes, &phonetmp)
+							if err != nil {
+								log.Println("Unmarshal err:", err)
+							}
+							for _, vv := range phonetmp {
+								if vv["company_phone"] != nil {
+									if vv["company_phone"] == "" {
+										continue
+									} else {
+										contact["phone"] = vv["company_phone"] //联系电话
+										break
+									}
+								} else {
+									contact["phone"] = "" //联系电话
+								}
+
+							}
+						}
+						//log.Println(k, contact["phone"], resulttmp["_id"])
+						//time.Sleep(10 * time.Second)
+						if contact["phone"] == nil {
+							contact["phone"] = "" //联系电话
+						}
+						contact["topscopeclass"] = "企业公示"         //项目类型
+						contact["updatetime"] = time.Now().Unix() //更新时间
+						contact["infoid"] = ""                    //招标信息id
+						contacts = append(contacts, contact)
+						resulttmp["contact"] = contacts
+
+						savetmp := make(map[string]interface{}, 0)
+						for _, sk := range BuyerFields {
+							if sk == "_id" {
+								savetmp["tmp"+sk] = resulttmp[sk]
+								continue
+							} else if sk == "area_code" {
+								//行政区划代码
+								savetmp[sk] = fmt.Sprint(resulttmp[sk])
+								continue
+							} else if sk == "report_websites" {
+								//网址
+								if resulttmp["report_websites"] == nil {
+									savetmp["website"] = ""
+								} else {
+									report_websitesArr := []string{}
+									if ppms, ok := resulttmp[sk].(primitive.A); ok {
+										for _, v := range ppms {
+											if vvv, ok := v.(map[string]interface{}); ok {
+												if rv, ok := vvv["website_url"].(string); ok {
+													report_websitesArr = append(report_websitesArr, rv)
+												}
+											}
+										}
+									}
+									sort.Strings(report_websitesArr)
+									savetmp["website"] = strings.Join(report_websitesArr, ";")
+								}
+								continue
+							} else if sk == "wechat_accounts" {
+								savetmp[sk] = []interface{}{}
+								continue
+							}else if sk=="buyer_name" {
+								if resulttmp["company_name"] == nil {
+									savetmp[sk] = ""
+								}else {
+									savetmp[sk] = resulttmp["company_name"]
+								}
+								continue
+							}else if sk=="address"{
+								if resulttmp["company_address"] == nil {
+									savetmp[sk] = ""
+								}else {
+									savetmp[sk] = resulttmp["company_address"]
+								}
+								continue
+							}
+
+
+
+							if resulttmp[sk] == nil && sk != "history_name" && sk != "wechat_accounts" &&
+								sk != "buyer_name" && sk != "address" &&
+								sk != "contact" && sk != "report_websites" {
+								savetmp[sk] = ""
+							} else {
+								savetmp[sk] = resulttmp[sk]
+							}
+						}
+						//tmps = append(tmps, savetmp)
+						savetmp["updatatime"] = time.Now().Unix()
+						//保存mongo
+						FClient.DbName = Config["mgodb_extract_kf"]
+						saveid := FClient.Save(Config["mgo_qyk_buyer"], savetmp)
+						if saveid != nil {
+							//保存redis
+							rc := RedisPool.Get()
+							rc.Do("SELECT","2")
+							var _id string
+							if v, ok := saveid.(primitive.ObjectID); ok {
+								_id = v.Hex()
+							}
+							if _, err := rc.Do("SET", savetmp["buyer_name"], _id); err != nil {
+								log.Println("save redis err:", tmp["_id"], savetmp["_id"], savetmp["buyer_name"], err)
+								if err := rc.Close(); err != nil {
+									log.Println(err)
+								}
+							} else {
+								//保存es
+								delete(savetmp, "_id")
+								if err := rc.Close(); err != nil {
+									log.Println(err)
+								}
+
+								//esConn := elastic.GetEsConn()
+								//defer elastic.DestoryEsConn(esConn)
+								if _, err := EsConn.Index().Index(Config["elasticsearch_buyer_index"]).Type(Config["elasticsearch_buyer_type"]).Id(_id).BodyJson(savetmp).Refresh(true).Do(); err != nil {
+									log.Println("save es err :", tmp["_id"], savetmp["_id"], err)
+								} else {
+									//删除临时表
+									FClient.DbName = Config["mgodb_extract_kf"]
+									if deleteNum := FClient.DeleteById("buyer_new", tmpId); deleteNum == 0 {
+										log.Println("删除临时表失败", deleteNum)
+									}
+								}
+							}
+						} else {
+							log.Println("save mongo err:", saveid, tmp["_id"])
+						}
+					}
+				}
+			}
+		}
+		t2.Reset(time.Minute)
+	}
+}

+ 311 - 170
udp_winner/timedTaskWinner.go

@@ -15,8 +15,7 @@ import (
 	"time"
 )
 
-
-
+//之前main方法,只更新
 func TaskWinner(mapinfo *map[string]interface{}) {
 	defer util.Catch()
 	gtid, lteid := util.ObjToString((*mapinfo)["gtid"]), util.ObjToString((*mapinfo)["lteid"])
@@ -47,207 +46,345 @@ func TaskWinner(mapinfo *map[string]interface{}) {
 		log.Println(cursor)
 		return
 	}
-	overid := gtid
-	tmp := map[string]interface{}{}
-	for cursor.Next(&tmp) {
-		overid = tmp["_id"].(primitive.ObjectID).Hex()
-		log.Println(tmp["_id"])
-		if tmp["winner"] == nil || tmp["winner"] == "" {
-			continue
-		}
-		//redis查询是否存在
-		rdb := RedisPool.Get()
-		if reply, err := redis.String(rdb.Do("GET", tmp["winner"])); err != nil {
-			//redis不存在存到临时表,定时任务处理
-			FClient.DbName = Config["mgodb_extract_kf"]
-			if tmpid := FClient.Save("winner_new", tmp) ;tmpid==nil{
-				log.Println("FClient.Save err",tmpid)
+	//判断是否是存量,是存量走Redis遍历
+	if v, ok := (*mapinfo)["data_info"].(string); ok && v == "save" {
+		//存量处理
+		tmp := map[string]interface{}{}
+		conn := HisRedisPool.Conn()
+		defer conn.Close()
+		//选择redis db
+		conn.Select(1)
+		//遍历bidding表保存到redis
+		// key:_id  value:json结构体
+		for cursor.Next(&tmp) {
+			if tmp["winner"] == nil || tmp["winner"] == "" {
+				continue
 			}
-			//log.Println("get redis id err:定时任务处理", err, tmp)
-			if err := rdb.Close(); err != nil {
+			mgoId:=tmp["_id"].(primitive.ObjectID).Hex()
+			delete(tmp,"_id")
+			bytes, _ := json.Marshal(tmp)
+			if err := conn.Set(mgoId, string(bytes), 0).Err(); err != nil {
 				log.Println(err)
 			}
-			continue
+		}
+		//遍历redis
+		if scan := conn.Scan(0, "", 100); scan.Err() != nil {
+			log.Println(scan.Err())
+			return
 		} else {
-			//log.Println("redis get :", reply)
-			//redis存在
-			//log.Println(reply)
-			//reply = "5e0316b998a9abaf6535df3d"
-			//id, err := primitive.ObjectIDFromHex(reply)
-			//if err != nil {
-			//	log.Println("get redis id  Hex err:", err, tmp)
-			//	if err := rdb.Close(); err != nil {
-			//		log.Println(err)
-			//	}
-			//	continue
-			//}
-			if err := rdb.Close(); err != nil {
-				log.Println(err)
-			}
-			//拿到合并后的qyk
-			FClient.DbName = Config["mgodb_extract_kf"]
-			oldTmp := FClient.FindById(Config["mgo_qyk_c"], reply)
-			if oldTmp == nil{
-				log.Println("redis id 不存在")
-				continue
-			}
-			//err = FClient.Database(Config["mgodb_extract_kf"]).Collection(Config["mgo_qyk_c"]).
-			//	FindOne(context.TODO(), bson.M{"_id": id}).Decode(&oldTmp)
-			//if err != nil {
-			//	log.Println("qyk id err:", err, id)
-			//	continue
-			//}
-			//比较合并
-			//行业类型
-			tmpTopscopeclass := []string{}
-			tmpTopscopeclassMap := make(map[string]bool)
+			iterator := scan.Iterator()
+			for iterator.Next() {
+				redisId := iterator.Val()                       //redis key
+				redisvalue := conn.Get(iterator.Val()).Val() //redis val
+				tmp := make(map[string]interface{})
+				json.Unmarshal([]byte(redisvalue),&tmp)
+				//重复增量操作
+				//redis查询是否存在
+				rdb := RedisPool.Get()
+				rdb.Do("SELECT","1")
+				if reply, err := redis.String(rdb.Do("GET", tmp["winner"])); err != nil {
+					//redis不存在,存到临时表,定时任务处理
+					FClient.DbName = Config["mgodb_extract_kf"]
+					if tmpid := FClient.Save("winner_new", tmp); tmpid == nil {
+						log.Println("存量 FClient.Save err", tmpid)
+					}
+					//log.Println("get redis id err:定时任务处理", err, tmp)
+					if err := rdb.Close(); err != nil {
+						log.Println("存量",err)
+					}
+					//删除存量redis
+					conn.Del(redisId)
+					continue
+				} else {
+					if err := rdb.Close(); err != nil {
+						log.Println(err)
+					}
+					//拿到合并后的qyk
+					FClient.DbName = Config["mgodb_extract_kf"]
+					oldTmp := FClient.FindById(Config["mgo_qyk_c"], reply)
+					if oldTmp == nil {
+						log.Println("存量 redis id 不存在",reply,tmp["winner"])
+						continue
+					}
+					tmpTopscopeclass := []string{}
+					tmpTopscopeclassMap := make(map[string]bool)
 
-			if oldTmp["industry"] == nil {
-				//log.Println(reflect.ValueOf(tmp["topscopeclass"]))
-				if v, ok := tmp["topscopeclass"].(primitive.A); ok {
-					for _, vv := range v {
-						if vvv, ok := vv.(string); ok && len(vvv) > 1 {
-							tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
+					if oldTmp["industry"] == nil {
+						if v, ok := tmp["topscopeclass"].(primitive.A); ok {
+							for _, vv := range v {
+								if vvv, ok := vv.(string); ok && len(vvv) > 1 {
+									tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
+								}
+							}
+							for k := range tmpTopscopeclassMap {
+								tmpTopscopeclass = append(tmpTopscopeclass, k)
+							}
+						}
+					} else {
+						if v, ok := oldTmp["industry"].(primitive.A); ok {
+							for _, vv := range v {
+								if vvv, ok := vv.(string); ok {
+									tmpTopscopeclassMap[vvv] = true
+								}
+							}
+						}
+						if v, ok := tmp["topscopeclass"].(primitive.A); ok {
+							for _, vv := range v {
+								if vvv, ok := vv.(string); ok && len(vvv) > 1 {
+									tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
+								}
+							}
+							for k := range tmpTopscopeclassMap {
+								tmpTopscopeclass = append(tmpTopscopeclass, k)
+							}
 						}
 					}
-					for k := range tmpTopscopeclassMap {
-						tmpTopscopeclass = append(tmpTopscopeclass, k)
+					sort.Strings(tmpTopscopeclass)
+					oldTmp["industry"] = tmpTopscopeclass
+					esId := oldTmp["_id"].(primitive.ObjectID).Hex()
+					//更新行业类型
+					if tmp["winnerperson"] == nil || tmp["winnerperson"] == "" || Reg_xing.MatchString(util.ObjToString(tmp["winnerperson"])) {
+						oldTmp["updatatime"] = time.Now().Unix()
+						//mongo更新
+						FClient.DbName = Config["mgodb_extract_kf"]
+						if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
+							log.Println("mongo更新err", esId)
+						}
+
+						//es更新
+						delete(oldTmp, "_id")
+						if _, err := EsConn.Update().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
+							log.Println("update es err:", err)
+						}
+						//删除存量redis
+						conn.Del(redisId)
+						continue
 					}
-				}
-			} else {
-				if v, ok := oldTmp["industry"].(primitive.A); ok {
-					for _, vv := range v {
-						if vvv, ok := vv.(string); ok {
-							tmpTopscopeclassMap[vvv] = true
+					//联系方式合并
+					var tmpperson, winnertel string
+					tmpperson = tmp["winnerperson"].(string)
+					if tmp["winnertel"] == nil || tmp["winnertel"] == "" {
+						winnertel = ""
+					} else {
+						if Reg_xing.MatchString(util.ObjToString(tmp["winnertel"])) || !Reg_tel.MatchString(util.ObjToString(tmp["winnertel"])) {
+							winnertel = ""
+						} else {
+							winnertel = util.ObjToString(tmp["winnertel"])
 						}
 					}
-				}
-				if v, ok := tmp["topscopeclass"].(primitive.A); ok {
-					for _, vv := range v {
-						if vvv, ok := vv.(string); ok && len(vvv) > 1 {
-							tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
+					contactMaps := make([]interface{}, 0)
+					if oldTmp["contact"] == nil {
+						tmpContact := make(map[string]interface{})
+						tmpContact["infoid"] = redisId
+						tmpContact["contact_person"] = tmpperson
+						tmpContact["contact_type"] = "项目联系人"
+						tmpContact["phone"] = winnertel
+						tmpContact["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
+						tmpContact["updatetime"] = time.Now().Unix()
+						contactMaps = append(contactMaps, tmpContact)
+					} else {
+						//对比前四项,相等丢弃
+						if v, ok := oldTmp["contact"].(primitive.A); ok {
+							var isNotUpdate bool
+							for _, vv := range v {
+								if vvv, ok := vv.(map[string]interface{}); ok {
+									if vvv["contact_person"] == tmpperson && vvv["contact_type"] == "项目联系人" &&
+										vvv["phone"] == winnertel && vvv["topscopeclass"] == strings.Join(tmpTopscopeclass, ";") {
+										isNotUpdate = true
+										vvv["updatetime"] = time.Now().Unix()
+									}
+									contactMaps = append(contactMaps, vvv)
+								}
+							}
+							if !isNotUpdate {
+								vvv := make(map[string]interface{})
+								vvv["infoid"] = redisId
+								vvv["contact_person"] = tmp["winnerperson"]
+								vvv["contact_type"] = "项目联系人"
+								vvv["phone"] = winnertel
+								vvv["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
+								vvv["updatetime"] = time.Now().Unix()
+								contactMaps = append(contactMaps, vvv)
+							}
 						}
 					}
-					for k := range tmpTopscopeclassMap {
-						tmpTopscopeclass = append(tmpTopscopeclass, k)
+					oldTmp["contact"] = contactMaps
+					//mongo更新
+					oldTmp["updatatime"] = time.Now().Unix()
+					FClient.DbName = Config["mgodb_extract_kf"]
+					if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
+						log.Println("存量  mongo更新 err", esId, oldTmp)
+					}
+					//es更新
+					delete(oldTmp, "_id")
+					if _, err := EsConn.Update().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
+						log.Println("存量 EsConn err :", err)
 					}
+					//最后删除redis
+					conn.Del(redisId)
 				}
 			}
-			sort.Strings(tmpTopscopeclass)
-			oldTmp["industry"] = tmpTopscopeclass
-			esId := oldTmp["_id"].(primitive.ObjectID).Hex()
-			//更新行业类型
-			if tmp["winnerperson"] == nil || tmp["winnerperson"] == "" || Reg_xing.MatchString(util.ObjToString(tmp["winnerperson"])) {
-				oldTmp["updatatime"] = time.Now().Unix()
-				//mongo更新
-				FClient.DbName =Config["mgodb_extract_kf"]
-				if !FClient.UpdateById(Config["mgo_qyk_c"],esId,bson.M{"$set": oldTmp}){
-					log.Println("mongo更新err",esId)
+		}
+		log.Println("存量历史合并执行完成 ok", gtid, lteid)
+
+	} else {
+		//增量处理
+		overid := gtid
+		tmp := map[string]interface{}{}
+		for cursor.Next(&tmp) {
+			overid = tmp["_id"].(primitive.ObjectID).Hex()
+			log.Println(tmp["_id"])
+			if tmp["winner"] == nil || tmp["winner"] == "" {
+				continue
+			}
+			//redis查询是否存在
+			rdb := RedisPool.Get()
+			rdb.Do("SELECT","1")
+			if reply, err := redis.String(rdb.Do("GET", tmp["winner"])); err != nil {
+				//redis不存在存到临时表,定时任务处理
+				FClient.DbName = Config["mgodb_extract_kf"]
+				if tmpid := FClient.Save("winner_new", tmp); tmpid == nil {
+					log.Println("FClient.Save err", tmpid)
 				}
-				//if _, err := FClient.Database(Config["mgodb_extract_kf"]).Collection(Config["mgo_qyk_c"]).
-				//	UpdateOne(context.TODO(), bson.M{"_id": oldTmp["_id"]}, bson.M{"$set": oldTmp}); err != nil {
-				//	log.Println("mongo更新err:", err)
-				//}
-				//es更新
-				delete(oldTmp, "_id")
-				//esConn := elastic.GetEsConn()
-				//defer elastic.DestoryEsConn(esConn)
-				if _, err := EsConn.Update().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
-					log.Println("update es err:", err)
+				//log.Println("get redis id err:定时任务处理", err, tmp)
+				if err := rdb.Close(); err != nil {
+					log.Println(err)
 				}
-				//log.Println( err2,err3)
 				continue
-			}
-			//联系方式合并
-			var tmpperson, winnertel string
-			tmpperson = tmp["winnerperson"].(string)
-			if tmp["winnertel"] == nil || tmp["winnertel"] == "" {
-				winnertel = ""
 			} else {
-				if Reg_xing.MatchString(util.ObjToString(tmp["winnertel"])) || !Reg_tel.MatchString(util.ObjToString(tmp["winnertel"])) {
+				if err := rdb.Close(); err != nil {
+					log.Println(err)
+				}
+				//拿到合并后的qyk
+				FClient.DbName = Config["mgodb_extract_kf"]
+				oldTmp := FClient.FindById(Config["mgo_qyk_c"], reply)
+				if oldTmp == nil {
+					log.Println("redis id 不存在")
+					continue
+				}
+				//比较合并
+				//行业类型
+				tmpTopscopeclass := []string{}
+				tmpTopscopeclassMap := make(map[string]bool)
+
+				if oldTmp["industry"] == nil {
+					if v, ok := tmp["topscopeclass"].(primitive.A); ok {
+						for _, vv := range v {
+							if vvv, ok := vv.(string); ok && len(vvv) > 1 {
+								tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
+							}
+						}
+						for k := range tmpTopscopeclassMap {
+							tmpTopscopeclass = append(tmpTopscopeclass, k)
+						}
+					}
+				} else {
+					if v, ok := oldTmp["industry"].(primitive.A); ok {
+						for _, vv := range v {
+							if vvv, ok := vv.(string); ok {
+								tmpTopscopeclassMap[vvv] = true
+							}
+						}
+					}
+					if v, ok := tmp["topscopeclass"].(primitive.A); ok {
+						for _, vv := range v {
+							if vvv, ok := vv.(string); ok && len(vvv) > 1 {
+								tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
+							}
+						}
+						for k := range tmpTopscopeclassMap {
+							tmpTopscopeclass = append(tmpTopscopeclass, k)
+						}
+					}
+				}
+				sort.Strings(tmpTopscopeclass)
+				oldTmp["industry"] = tmpTopscopeclass
+
+				esId := oldTmp["_id"].(primitive.ObjectID).Hex()
+				//更新行业类型
+				if tmp["winnerperson"] == nil || tmp["winnerperson"] == "" || Reg_xing.MatchString(util.ObjToString(tmp["winnerperson"])) {
+					oldTmp["updatatime"] = time.Now().Unix()
+					//mongo更新
+					FClient.DbName = Config["mgodb_extract_kf"]
+					if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
+						log.Println("mongo更新err", esId)
+					}
+
+					//es更新
+					delete(oldTmp, "_id")
+					if _, err := EsConn.Update().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
+						log.Println("update es err:", err)
+					}
+					continue
+				}
+				//联系方式合并
+				var tmpperson, winnertel string
+				tmpperson = tmp["winnerperson"].(string)
+				if tmp["winnertel"] == nil || tmp["winnertel"] == "" {
 					winnertel = ""
 				} else {
-					winnertel = util.ObjToString(tmp["winnertel"])
+					if Reg_xing.MatchString(util.ObjToString(tmp["winnertel"])) || !Reg_tel.MatchString(util.ObjToString(tmp["winnertel"])) {
+						winnertel = ""
+					} else {
+						winnertel = util.ObjToString(tmp["winnertel"])
+					}
 				}
-			}
-			contactMaps := make([]interface{}, 0)
-			if oldTmp["contact"] == nil {
-				tmpContact := make(map[string]interface{})
-				tmpContact["contact_person"] = tmpperson
-				tmpContact["contact_type"] = "项目联系人"
-				tmpContact["phone"] = winnertel
-				tmpContact["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
-				tmpContact["updatetime"] = time.Now().Unix()
-				contactMaps = append(contactMaps, tmpContact)
-			} else {
-				//对比前四项,相等丢弃
-				if v, ok := oldTmp["contact"].(primitive.A); ok {
-					var isNotUpdate bool
-					for _, vv := range v {
-						if vvv, ok := vv.(map[string]interface{}); ok {
-							if vvv["contact_person"] == tmpperson && vvv["contact_type"] == "项目联系人" &&
-								vvv["phone"] == winnertel && vvv["topscopeclass"] == strings.Join(tmpTopscopeclass, ";") {
-								isNotUpdate = true
-								vvv["updatetime"] = time.Now().Unix()
+				contactMaps := make([]interface{}, 0)
+				if oldTmp["contact"] == nil {
+					tmpContact := make(map[string]interface{})
+					tmpContact["infoid"] = overid
+					tmpContact["contact_person"] = tmpperson
+					tmpContact["contact_type"] = "项目联系人"
+					tmpContact["phone"] = winnertel
+					tmpContact["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
+					tmpContact["updatetime"] = time.Now().Unix()
+					contactMaps = append(contactMaps, tmpContact)
+				} else {
+					//对比前四项,相等丢弃
+					if v, ok := oldTmp["contact"].(primitive.A); ok {
+						var isNotUpdate bool
+						for _, vv := range v {
+							if vvv, ok := vv.(map[string]interface{}); ok {
+								if vvv["contact_person"] == tmpperson && vvv["contact_type"] == "项目联系人" &&
+									vvv["phone"] == winnertel && vvv["topscopeclass"] == strings.Join(tmpTopscopeclass, ";") {
+									isNotUpdate = true
+									vvv["updatetime"] = time.Now().Unix()
+								}
+								contactMaps = append(contactMaps, vvv)
 							}
+						}
+						if !isNotUpdate {
+							vvv := make(map[string]interface{})
+							vvv["infoid"] = overid
+							vvv["contact_person"] = tmp["winnerperson"]
+							vvv["contact_type"] = "项目联系人"
+							vvv["phone"] = winnertel
+							vvv["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
+							vvv["updatetime"] = time.Now().Unix()
 							contactMaps = append(contactMaps, vvv)
 						}
 					}
-					if !isNotUpdate {
-						vvv := make(map[string]interface{})
-						vvv["contact_person"] = tmp["winnerperson"]
-						vvv["contact_type"] = "项目联系人"
-						vvv["phone"] = winnertel
-						vvv["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
-						vvv["updatetime"] = time.Now().Unix()
-						contactMaps = append(contactMaps, vvv)
-					}
+				}
+				oldTmp["contact"] = contactMaps
+				//mongo更新
+				oldTmp["updatatime"] = time.Now().Unix()
+				FClient.DbName = Config["mgodb_extract_kf"]
+				if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
+					log.Println("mongo更新 err", esId, oldTmp)
+				}
+				//es更新
+				delete(oldTmp, "_id")
+				if _, err := EsConn.Update().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
+					log.Println("EsConn err :", err)
 				}
 			}
-			oldTmp["contact"] = contactMaps
-			//mongo更新
-			oldTmp["updatatime"] = time.Now().Unix()
-			FClient.DbName=Config["mgodb_extract_kf"]
-			if !FClient.UpdateById(Config["mgo_qyk_c"],esId,bson.M{"$set": oldTmp}){
-				log.Println("mongo更新 err",esId,oldTmp)
-			}
-			//if _, err := FClient.Database(Config["mgodb_extract_kf"]).Collection(Config["mgo_qyk_c"]).
-			//	UpdateOne(context.TODO(), bson.M{"_id": oldTmp["_id"]}, bson.M{"$set": oldTmp}); err != nil {
-			//	log.Println("mongo更新 err :", err)
-			//}
-			//es更新
-			delete(oldTmp, "_id")
-			//esConn := elastic.GetEsConn()
-			//defer elastic.DestoryEsConn(esConn)
-			if _, err := EsConn.Update().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
-				log.Println("EsConn err :", err)
-			}
-			//log.Println( err2,err3)
 		}
+		log.Println("增量合并执行完成 ok", gtid, lteid, overid)
 	}
-	//defer cursor.Close(context.Background())
-	//log.Println("合并执行完成", gtid, lteid, overid)
-	//if overid != lteid {
-	//	by, _ := json.Marshal(map[string]interface{}{
-	//		"gtid":  overid,
-	//		"lteid": lteid,
-	//		"stype": "",
-	//	})
-	//	if e := udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
-	//		IP:   net.ParseIP("127.0.0.1"),
-	//		Port: Updport,
-	//	}); e != nil {
-	//		log.Println(e)
-	//	}
-	//	log.Println("重新发送udp:", string(by))
-	//	return
-	//}
-	log.Println("合并执行完成 ok", gtid, lteid, overid)
 
 }
 
-
-//定时任务
+//定时任务  新增
 //1.存异常表
 //2.合并原始库新增
 func TimedTaskWinner() {
@@ -283,6 +420,7 @@ func TimedTaskWinner() {
 					tmpId := tmp["_id"].(primitive.ObjectID).Hex()
 					//再重新查找redis,存在发udp处理,不存在走新增合并
 					rdb := RedisPool.Get()
+					rdb.Do("SELECT","1")
 					if _, err := redis.String(rdb.Do("GET", tmp["winner"])); err == nil {
 						//{"gtid":"57d7ad2f61a0721f152d2ad5","lteid":"5e20968d85a9271abf0ad6c2","stype":""}
 						//redis存在发送udp进行处理
@@ -313,6 +451,7 @@ func TimedTaskWinner() {
 					}
 					//查询redis不存在新增
 					FClient.DbName = Config["mgodb_enterprise"]
+
 					resulttmp := FClient.FindOne(Config["mgodb_enterprise_c"], bson.M{"company_name": tmp["winner"]})
 					if resulttmp["_id"] == nil {
 						//log.Println(r)
@@ -385,6 +524,7 @@ func TimedTaskWinner() {
 						}
 						contact["topscopeclass"] = "企业公示"         //项目类型
 						contact["updatetime"] = time.Now().Unix() //更新时间
+						contact["infoid"] = ""                    //招标信息id
 						contacts = append(contacts, contact)
 						resulttmp["contact"] = contacts
 
@@ -461,6 +601,7 @@ func TimedTaskWinner() {
 						if saveid != nil {
 							//保存redis
 							rc := RedisPool.Get()
+							rc.Do("SELECT","1")
 							var _id string
 							if v, ok := saveid.(primitive.ObjectID); ok {
 								_id = v.Hex()