Browse Source

Merge branch 'dev3.4' of http://39.105.157.10:10080/qmx/jy-data-extract into dev3.4

maxiaoshan 5 years ago
parent
commit
8f9578194f

+ 2 - 1
src/jy/admin/audit/agencyinfo.go

@@ -76,7 +76,7 @@ func init() {
 			e["contact"] = contacts
 		}
 
-
+		e["updatatime"]=time.Now().Unix()
 		var sid string
 		if bson.IsObjectIdHex(_id) {
 			//更新
@@ -105,6 +105,7 @@ func init() {
 			}
 			c.JSON(200, gin.H{"rep": 200, "updateid": sid})
 		} else {
+			e["comeintime"] =time.Now().Unix()
 			//不存在直接保存新数据
 			sid = Mgo.Save(util.ElasticClientAgencyDB, e)
 			if sid == ""{

+ 2 - 2
src/jy/admin/audit/buyerinfo.go

@@ -77,8 +77,7 @@ func init() {
 			}
 			e["contact"] = contacts
 		}
-
-
+		e["updatatime"]=time.Now().Unix()
 		var sid string
 		if bson.IsObjectIdHex(_id) {
 			//更新
@@ -107,6 +106,7 @@ func init() {
 			}
 			c.JSON(200, gin.H{"rep": 200, "updateid": sid})
 		} else {
+			e["comeintime"] =time.Now().Unix()
 			//不存在直接保存新数据
 			sid = Mgo.Save(util.ElasticClientBuyerDB, e)
 			if sid == ""{

+ 2 - 1
src/jy/admin/audit/qiyeku.go

@@ -75,7 +75,7 @@ func init() {
 			}
 			e["contact"] = contacts
 		}
-		e["comeintime"] =time.Now().Unix()
+		e["updatatime"]=time.Now().Unix()
 		var sid string
 		if bson.IsObjectIdHex(_id) {
 			//更新
@@ -104,6 +104,7 @@ func init() {
 			}
 			c.JSON(200, gin.H{"rep": 200, "updateid": sid})
 		} else {
+			e["comeintime"] =time.Now().Unix()
 			//不存在直接保存新数据
 			sid = Mgo.Save(util.ElasticClientDB, e)
 			if sid == "" {

+ 25 - 18
udp_winner/config.json

@@ -1,30 +1,37 @@
 {
-  "elasticsearch": "http://172.17.145.170:9800",
-  "elasticsearch_index": "winner_enterprise",
-  "elasticsearch_type": "winnerent",
-  "elasticsearch_buyer_index": "buyer_enterprise",
-  "elasticsearch_buyer_type": "buyerent",
-  "elasticsearch_agency_index": "agency_enterprise",
-  "elasticsearch_agency_type": "agencyent",
   "udpport": "127.0.0.1:12678",
   "port": "12678",
   "pool_size": "10",
-  "mgoinit": "172.17.145.163:27082",
-  "mgodb_bidding": "extract_v3",
-  "mgodb_mgoinit_c": "buyer",
+  "mgoinit": "172.17.4.187:27083",
+  "mgodb_bidding": "qfw",
+  "mgodb_mgoinit_c": "result_20200116",
   "mgodb_enterprise": "enterprise",
   "mgodb_enterprise_c": "qyxy",
-  "mgourl2": "192.168.3.207:27092",
   "mgourl": "172.17.145.163:27082",
-  "mgodb_extract_kf": "extract_v3",
-  "mgo_qyk_c": "winner_enterprise",
-  "mgo_qyk_buyer": "buyer_enterprise",
-  "mgo_qyk_agency": "agency_enterprise",
+  "mgodb_extract_kf": "extract_kf",
+  "mgo_qyk_c": "winner_2",
+  "mgo_qyk_c_w_new": "winner_new_2",
+  "mgo_qyk_c_w_err": "winner_err_2",
+  "mgo_qyk_buyer": "buyer_2",
+  "mgo_qyk_c_b_new": "buyer_new_2",
+  "mgo_qyk_c_b_err": "buyer_err_2",
+  "mgo_qyk_agency": "agency_2",
+  "mgo_qyk_c_a_new": "agency_new_2",
+  "mgo_qyk_c_a_err": "agency_err_2",
   "mgo_qyk_reg": "rc_rule",
-  "redis": "172.17.148.44:1479",
+  "redis": "172.17.148.44:2579",
   "redis_winner_db": "1",
   "redis_buyer_db": "2",
   "redis_agency_db": "3",
   "chan_pool_num": "10",
-  "his_redis": "172.17.148.44:2579"
-}
+  "his_redis": "172.17.148.44:2679",
+  "jkmail": {
+    "to": "zhangjinkun@topnet.net.cn",
+    "api": "http://10.171.112.160:19281/_send/_mail"
+  },
+  "nextNode": {
+    "addr": "127.0.0.1",
+    "port": 1483,
+    "memo": "标准库"
+  }
+}

BIN
udp_winner/go-redis.zip


+ 101 - 43
udp_winner/main.go

@@ -3,41 +3,39 @@ package main
 import (
 	"encoding/json"
 	"fmt"
-	"github.com/garyburd/redigo/redis"
-	hisRedis "github.com/go-redis/redis"
-	"gopkg.in/mgo.v2/bson"
-	es "gopkg.in/olivere/elastic.v1"
 	"log"
 	mu "mfw/util"
 	"net"
-	"qfw/common/src/qfw/util/elastic"
 	"qfw/util"
-	"qfw/util/mongodb"
+	"qfw/common/src/qfw/util/mongodb"
 	"regexp"
 	"strconv"
 	"strings"
 	"time"
+
+	"github.com/garyburd/redigo/redis"
+	hisRedis "github.com/go-redis/redis"
+	"gopkg.in/mgo.v2/bson"
 )
 
 var (
-	Config                                = make(map[string]string)
-	Fields, BuyerFields, AgencyFields     []string
-	SourceClient, FClient                 *mongodb.MongodbSim
-	RedisPool                             redis.Pool
-	HisRedisPool                          *hisRedis.Client
-	Addrs                                 = make(map[string]interface{}, 0) //省市县
-	udpclient                             mu.UdpClient                      //udp对象
-	ElasticClientIndex, ElasticClientType string
-	Reg_xing                              = regexp.MustCompile(`\*{1,}`)
-	Reg_person                            = regexp.MustCompile("[\u4E00-\u9FA5\\s]+")
-	Reg_tel                               = regexp.MustCompile(`^[0-9\-\s]*$`)
-	EsConn                                *es.Client
-	Updport                               int
-	CPool                                 chan bool
+	Config                               = make(map[string]string)
+	Sysconfig                            map[string]interface{}
+	Fields, BuyerFields, AgencyFields    []string
+	SourceClient, FClient                *mongodb.MongodbSim
+	RedisPool                            redis.Pool
+	HisRedisPool                         *hisRedis.Client
+	Addrs                                = make(map[string]interface{}, 0) //省市县
+	udpclient                            mu.UdpClient                      //udp对象
+	Reg_person                           = regexp.MustCompile("[\u4e00-\u9fa5]+")
+	Reg_xing                             = regexp.MustCompile(`\*{1,}`)
+	Reg_tel                              = regexp.MustCompile(`^[0-9\-\s]*$`)
+	Updport                              int
+	CPoolWinner, CPoolBuery, CPoolAgency chan bool
 	//his_redis db
 	redis_winner_db, redis_buyer_db, redis_agency_db int
 	//异常表正则匹配处理
-	WinnerRegOk, WinnerRegErr, AgencyRegOk, AgencyRegErr, BuerRegOk, BuyerRegErr []regexp.Regexp
+	WinnerRegOk, WinnerRegErr, AgencyRegOk, AgencyRegErr, BuyerRegOk, BuyerRegErr []regexp.Regexp
 )
 
 /**
@@ -47,13 +45,16 @@ var (
 func init() {
 	log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
 	util.ReadConfig(&Config)
-	log.Println(Config)
+	util.ReadConfig(&Sysconfig)
+	log.Println(Sysconfig)
 	var err error
 	cpnum, err := strconv.Atoi(Config["chan_pool_num"])
 	if err != nil {
 		log.Fatalln(err)
 	}
-	CPool = make(chan bool, cpnum)
+	CPoolWinner = make(chan bool, cpnum)
+	CPoolBuery = make(chan bool, cpnum)
+	CPoolAgency = make(chan bool, cpnum)
 	Fields = []string{"_id", "contact", "partners", "business_scope", "company_address",
 		"capital", "establish_date", "legal_person", "company_type",
 		"district", "city", "province", "area_code", "credit_no",
@@ -68,27 +69,25 @@ func init() {
 		"address", "district", "city", "province", "area_code", "credit_no", "agency_name",
 		"history_name", "wechat_accounts", "website", "report_websites"}
 
-	//es.NewClient(es.SetURL(addrs...), es.SetMaxRetries(2), es.SetSniff(false))
-	//es init
-	elastic.InitElasticSize(Config["elasticsearch"], 50)
-	EsConn = elastic.GetEsConn()
-	defer elastic.DestoryEsConn(EsConn)
 	initRdis()
 	initMongo()
 	initReg()
+	Updport, err = strconv.Atoi(Config["port"])
+	if err != nil{
+		log.Fatalln(err)
+	}
 }
 
 func main() {
 	//udp
 	updport := Config["udpport"]
-	Updport, _ = strconv.Atoi(Config["port"])
 	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
 	udpclient.Listen(processUdpMsg)
 	log.Println("Udp服务监听", updport)
-	log.Println("发送端口port:", Updport)
 	go TimedTaskWinner() //定时任务
 	go TimedTaskBuyer()  //定时任务
 	go TimedTaskAgency() //定时任务
+	go checkMapJob()
 	c := make(chan int, 1)
 	<-c
 
@@ -113,13 +112,43 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 			}
 			//data_info:save//存量   data_info:add //增量
 			//阻塞
-			CPool <- true
-			go func(mapinfo *map[string]interface{}) {
-				defer func() { <-CPool }()
-				go TaskWinner(mapinfo)
-				go TaskBuyer(mapinfo)
-				go TaskAgency(mapinfo)
-			}(tmp)
+			tmpstype,ok := (*tmp)["stype"].(string)
+			if ok&& tmpstype ==""{
+				CPoolWinner <- true
+				go func(mapinfo *map[string]interface{}) {
+					defer func() { <-CPoolWinner }()
+					TaskWinner(mapinfo)
+				}(tmp)
+				CPoolBuery <- true
+				go func(mapinfo *map[string]interface{}) {
+					defer func() { <-CPoolBuery }()
+					TaskBuyer(mapinfo)
+				}(tmp)
+				CPoolAgency <- true
+				go func(mapinfo *map[string]interface{}) {
+					defer func() { <-CPoolAgency }()
+					TaskAgency(mapinfo)
+				}(tmp)
+			}else if tmpstype =="winner" {
+				CPoolWinner <- true
+				go func(mapinfo *map[string]interface{}) {
+					defer func() { <-CPoolWinner }()
+					TaskWinner(mapinfo)
+				}(tmp)
+			}else if tmpstype=="buyer"{
+				CPoolBuery <- true
+				go func(mapinfo *map[string]interface{}) {
+					defer func() { <-CPoolBuery }()
+					TaskBuyer(mapinfo)
+				}(tmp)
+			}else if tmpstype=="agency"{
+				CPoolAgency <- true
+				go func(mapinfo *map[string]interface{}) {
+					defer func() { <-CPoolAgency }()
+					TaskAgency(mapinfo)
+				}(tmp)
+			}
+
 		}
 	case mu.OP_NOOP: //下个节点回应
 		log.Println("发送成功", string(data))
@@ -170,14 +199,12 @@ func initMongo() {
 	SourceClient.MongodbAddr = Config["mgoinit"]
 	SourceClient.Size = pool_size
 	SourceClient.DbName = Config["mgodb_bidding"]
-	//mongodbSim.DbName = "qfw"
 	SourceClient.InitPool()
 
 	FClient = new(mongodb.MongodbSim)
 	FClient.MongodbAddr = Config["mgourl"]
 	FClient.Size = pool_size
 	FClient.DbName = Config["mgodb_extract_kf"]
-	//mongodbSim.DbName = "qfw"
 	FClient.InitPool()
 	FClientmgoConn := FClient.GetMgoConn()
 	defer FClient.DestoryMongoConn(FClientmgoConn)
@@ -204,14 +231,22 @@ func initReg() {
 	if !b {
 		log.Fatalln("查询正则失败")
 	}
-	for _, v := range (*findReg) {
+	for _, v := range *findReg {
 		s_field, ok := v["s_field"].(string) //字段
 		s_rule, ok2 := v["s_rule"].(string)  //正则
 		s_type, ok3 := v["s_type"].(string)  //ok or err
 		if !ok || !ok2 || !ok3 || s_field == "" || s_rule == "" || s_type == "" {
 			continue
 		}
-		regtmp := regexp.MustCompile(s_rule)
+		var pattern string
+		if strings.Contains(s_rule, "\\u") {
+			s_rule = strings.Replace(s_rule, "\\", "\\\\", -1)
+			s_rule = strings.Replace(s_rule, "\\\\u", "\\u", -1)
+			pattern, _ = strconv.Unquote(`"` + s_rule + `"`)
+		} else {
+			pattern = s_rule
+		}
+		regtmp := regexp.MustCompile(pattern)
 		if s_field == "winner" {
 			if s_type == "ok" {
 				WinnerRegOk = append(WinnerRegOk, *regtmp)
@@ -220,7 +255,7 @@ func initReg() {
 			}
 		} else if s_field == "buyer" {
 			if s_type == "ok" {
-				BuerRegOk = append(BuerRegOk, *regtmp)
+				BuyerRegOk = append(BuyerRegOk, *regtmp)
 			} else if s_type == "err" {
 				BuyerRegErr = append(BuyerRegErr, *regtmp)
 			}
@@ -232,5 +267,28 @@ func initReg() {
 			}
 		}
 	}
-	log.Println(len(WinnerRegOk), len(WinnerRegErr), len(BuerRegOk), len(BuyerRegErr), len(AgencyRegOk), len(AgencyRegErr))
+	log.Println(len(WinnerRegOk), len(WinnerRegErr), len(BuyerRegOk), len(BuyerRegErr), len(AgencyRegOk), len(AgencyRegErr))
+}
+
+//通知下个节点nextNode
+func nextNode(stype string, updatatime int64) {
+	to, _ := Sysconfig["nextNode"].(map[string]interface{})
+	log.Println(stype, to)
+	key := stype + "-" + fmt.Sprint(updatatime)
+	by, _ := json.Marshal(map[string]interface{}{
+		"query": map[string]interface{}{
+			"updatatime": map[string]interface{}{
+				"$gte": updatatime,
+			},
+		},
+		"stype": stype,
+		"key":   key,
+	})
+	addr := &net.UDPAddr{
+		IP:   net.ParseIP(to["addr"].(string)),
+		Port: util.IntAll(to["port"]),
+	}
+	node := &udpNode{by, addr, time.Now().Unix(), 0}
+	udptaskmap.Store(key, node)
+	udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
 }

+ 48 - 32
udp_winner/text.md

@@ -13,17 +13,11 @@
   "redis": "172.17.148.44:1479"
 }
 
-//本地
+
+本地
 {
-  "elasticsearch": "http://127.0.0.1:9800",
-  "elasticsearch_index": "localhost_winner",
-  "elasticsearch_type": "mytestwinner",
-  "elasticsearch_buyer_index": "localhost_buyer",
-  "elasticsearch_buyer_type": "mytestbuyer",
-  "elasticsearch_agency_index": "localhost_agency",
-  "elasticsearch_agency_type": "mytestagency",
-  "udpport": "127.0.0.1:12311",
-  "port": "12311",
+  "udpport": "127.0.0.1:12678",
+  "port": "12678",
   "pool_size": "10",
   "mgoinit": "127.0.0.1:27017",
   "mgodb_bidding": "qfw",
@@ -33,53 +27,75 @@
   "mgourl2": "192.168.3.207:27092",
   "mgourl": "127.0.0.1:27017",
   "mgodb_extract_kf": "extract_kf",
-  "mgo_qyk_c": "enterprise_qyxy",
-  "mgo_qyk_buyer": "buyer_qyxy",
-  "mgo_qyk_agency": "agency_qyxy",
+  "mgo_qyk_c": "winner_enterprise",
+  "mgo_qyk_c_w_new": "winner_new_2",
+  "mgo_qyk_c_w_err": "winner_err_2",
+  "mgo_qyk_buyer": "buyer_enterprise",
+  "mgo_qyk_c_b_new": "buyer_new_2",
+  "mgo_qyk_c_b_err": "buyer_err_2",
+  "mgo_qyk_agency": "agency_enterprise",
+  "mgo_qyk_c_a_new": "agency_new_2",
+  "mgo_qyk_c_a_err": "agency_err_2",
   "mgo_qyk_reg": "rc_rule",
   "redis": "127.0.0.1:6379",
   "redis_winner_db": "1",
   "redis_buyer_db": "2",
   "redis_agency_db": "3",
   "chan_pool_num": "10",
-  "his_redis": "127.0.0.1:6380"
+  "his_redis": "127.0.0.1:6380",
+  "jkmail": {
+        "to": "zhangjinkun@topnet.net.cn",
+        "api": "http://10.171.112.160:19281/_send/_mail"
+    },
+  "nextNode": {
+        "addr": "127.0.0.1",
+        "port": 1483,
+        "memo": "标准库"
+    }
 }
 
 
-//线上
 {
-  "elasticsearch": "http://172.17.145.170:9800",
-  "elasticsearch_index": "winner_enterprise",
-  "elasticsearch_type": "winnerent",
-  "elasticsearch_buyer_index": "localhost_buyer",
-  "elasticsearch_buyer_type": "mytestbuyer",
-  "elasticsearch_agency_index": "localhost_agency",
-  "elasticsearch_agency_type": "mytestagency",
-  "udpport": "127.0.0.1:1486",
-  "port": "12311",
+  "udpport": "127.0.0.1:12678",
+  "port": "12678",
   "pool_size": "10",
-  "mgoinit": "10.30.94.175:27081,10.81.232.246:27082,10.172.242.243:27080",
+  "mgoinit": "127.0.0.1:27017",
   "mgodb_bidding": "qfw",
   "mgodb_mgoinit_c": "bidding",
   "mgodb_enterprise": "enterprise",
   "mgodb_enterprise_c": "qyxy",
-  "mgourl2": "192.168.3.207:27092",
   "mgourl": "172.17.145.163:27082",
-  "mgodb_extract_kf": "extract_v3",
-  "mgo_qyk_c": "winner_enterprise",
-  "mgo_qyk_buyer": "buyer_enterprise",
-  "mgo_qyk_agency": "agency_enterprise",
+  "mgodb_extract_kf": "extract_kf",
+  "mgo_qyk_c": "winner_2",
+  "mgo_qyk_c_w_new": "winner_new_2",
+  "mgo_qyk_c_w_err": "winner_err_2",
+  "mgo_qyk_buyer": "buyer_2",
+  "mgo_qyk_c_b_new": "buyer_new_2",
+  "mgo_qyk_c_b_err": "buyer_err_2",
+  "mgo_qyk_agency": "agency_2",
+  "mgo_qyk_c_a_new": "agency_new_2",
+  "mgo_qyk_c_a_err": "agency_err_2",
   "mgo_qyk_reg": "rc_rule",
-  "redis": "172.17.148.44:1479",
+  "redis": "172.17.148.44:2579",
   "redis_winner_db": "1",
   "redis_buyer_db": "2",
   "redis_agency_db": "3",
   "chan_pool_num": "10",
-  "his_redis": "127.0.0.1:6380"
+  "his_redis": "172.17.148.44:2679",
+  "jkmail": {
+        "to": "zhangjinkun@topnet.net.cn",
+        "api": "http://10.171.112.160:19281/_send/_mail"
+    },
+  "nextNode": {
+        "addr": "127.0.0.1",
+        "port": 1483,
+        "memo": "标准库"
+    }
 }
 
 
 
+
 中标单位企业库
 /file/udpwinnerent_1486
 172.17.145.163 

+ 273 - 249
udp_winner/timedTaskAgency.go

@@ -6,7 +6,7 @@ import (
 	"github.com/garyburd/redigo/redis"
 	"gopkg.in/mgo.v2/bson"
 	"log"
-	mu "mfw/util"
+	util2 "mfw/util"
 	"net"
 	"qfw/util"
 	"sort"
@@ -31,11 +31,12 @@ func TaskAgency(mapinfo *map[string]interface{}) {
 		log.Println(gtid, lteid, "不是Objectid,转换_id错误", gtid, lteid)
 		return
 	}
+	//timenow := time.Now().Unix()
 	//udp的id区间查询bidding  中标人 中标联系人 中标联系电话
 	// topscopeclass项目类型-industry行业类型&&topscopeclass联系人项目类型
 	// (area地区-province省份 city城市-city城市 district区县-district区县)
 	// agencyaddr-company_address企业地址
-	SourceClientcc := SourceClient.GetMgoConn(86400)
+	SourceClientcc := SourceClient.GetMgoConn(8640000)
 	cursor := SourceClientcc.DB(Config["mgodb_bidding"]).C(Config["mgodb_mgoinit_c"]).Find(bson.M{
 		"_id": bson.M{
 			"$gte": GId,
@@ -58,7 +59,12 @@ func TaskAgency(mapinfo *map[string]interface{}) {
 		//遍历bidding表保存到redis
 		//key:企业名  value:json结构体{"agency": 1, "agencytel": 1, "agencyperson": 1,"topscopeclass": 1, "agencyaddr": 1,"_id":1}
 		tmp := make(map[string]interface{})
+		var num int
+		var tmpRangeId string
 		for cursor.Next(&tmp) {
+			num++
+			mgoId := tmp["_id"].(bson.ObjectId).Hex()
+			tmpRangeId = mgoId
 			agency, ok := tmp["agency"].(string)
 			if !ok || utf8.RuneCountInString(agency) < 4 {
 				continue
@@ -66,7 +72,6 @@ func TaskAgency(mapinfo *map[string]interface{}) {
 			//判断redis key是否存在
 			e_num := conn.Exists(agency).Val()
 			//获取字符串_id
-			mgoId := tmp["_id"].(bson.ObjectId).Hex()
 			//替换_id
 			tmp["_id"] = mgoId
 			//创建value数组
@@ -83,6 +88,24 @@ func TaskAgency(mapinfo *map[string]interface{}) {
 				log.Println(err)
 			}
 		}
+		log.Println("存量 agency mongo遍历完成:",num)
+
+		if tmpRangeId != lteid{
+			by, _ := json.Marshal(map[string]interface{}{
+				"gtid":  tmpRangeId,
+				"lteid": lteid,
+				"data_info":"save",
+				"stype": "agency",
+			})
+			if e := udpclient.WriteUdp(by, util2.OP_TYPE_DATA, &net.UDPAddr{
+				IP:   net.ParseIP("127.0.0.1"),
+				Port: Updport,
+			}); e != nil {
+				log.Println(e)
+			}
+			SourceClient.DestoryMongoConn(SourceClientcc)
+			return
+		}
 
 		SourceClient.DestoryMongoConn(SourceClientcc)
 
@@ -102,11 +125,10 @@ func TaskAgency(mapinfo *map[string]interface{}) {
 				rdb.Do("SELECT", redis_agency_db)
 				if reply, err := redis.String(rdb.Do("GET", redisCName)); err != nil {
 					//redis不存在,存到临时表,定时任务处理
-					FClient.DbName = Config["mgodb_extract_kf"]
 					for _, vmap := range rValuesMaps {
 						vmap["_id"] = bson.ObjectIdHex(vmap["_id"].(string))
-						if err = FClient.SaveForOld("agency_new", vmap); err != nil {
-							log.Println("存量 FClient.Save err", err, vmap)
+						if errb := FClient.SaveByOriID(Config["mgo_qyk_c_a_new"], vmap); !errb  {
+							log.Println("存量 FClient.Save err", errb, vmap)
 						}
 					}
 					//log.Println("get redis id err:定时任务处理", err, tmp)
@@ -188,138 +210,140 @@ func TaskAgency(mapinfo *map[string]interface{}) {
 					(*oldTmp)["contact"] = contactMaps
 					//mongo更新
 					(*oldTmp)["updatatime"] = time.Now().Unix()
-					FClient.DbName = Config["mgodb_extract_kf"]
 					if !FClient.UpdateById(Config["mgo_qyk_agency"], esId, bson.M{"$set": oldTmp}) {
 						log.Println("存量  mongo更新 err", esId, oldTmp)
 					}
 					//es更新
 					delete((*oldTmp), "_id")
-					if _, err := EsConn.Update().Index(Config["elasticsearch_agency_index"]).Type(Config["elasticsearch_agency_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
-						log.Println("存量 EsConn err :", err)
-					}
 				}
 			}
 		}
 		log.Println("存量历史合并执行完成 ok", gtid, lteid)
+		//发送udp 更新es段
 
 	} else {
-		//增量处理
 		overid := gtid
 		tmp := map[string]interface{}{}
 		for cursor.Next(&tmp) {
-			overid = tmp["_id"].(bson.ObjectId).Hex()
-			//log.Println(tmp["_id"])
-			agency, ok := tmp["agency"].(string)
-			if !ok || utf8.RuneCountInString(agency) < 4 {
-				continue
-			}
-			//redis查询是否存在
-			rdb := RedisPool.Get()
-			rdb.Do("SELECT", redis_agency_db)
-			if reply, err := redis.String(rdb.Do("GET", agency)); err != nil {
-				//redis不存在存到临时表,定时任务处理
-				FClient.DbName = Config["mgodb_extract_kf"]
-				if err := FClient.SaveForOld("agency_new", tmp); err != nil {
-					log.Println("FClient.Save err", err, tmp)
-				}
-				//log.Println("get redis id err:定时任务处理", err, tmp)
-				if err := rdb.Close(); err != nil {
-					log.Println(err)
-				}
-				continue
-			} else {
-				if err := rdb.Close(); err != nil {
-					log.Println(err)
-				}
-				//拿到合并后的qyk
-				FClient.DbName = Config["mgodb_extract_kf"]
-				oldTmp, b := FClient.FindById(Config["mgo_qyk_agency"], reply, bson.M{})
-				if !b || (*oldTmp) == nil|| reply==""||(*oldTmp)["_id"]==nil{
-					log.Println("redis id 不存在")
-					continue
-				}
-				//比较合并
-				//行业类型
-				tmpTopscopeclass := []string{}
-				tmpConTopscopeclass := []string{}
-				tmpTopscopeclassMap := make(map[string]bool)
+			overid = AddAgency(overid, tmp)
+		}
+		SourceClient.DestoryMongoConn(SourceClientcc)
+		log.Println("增量合并执行完成 ok", gtid, lteid, overid)
+		//发送udp 更新es段
+		//nextNode("agencyent",timenow)
+	}
 
-				if v, ok := tmp["topscopeclass"].([]interface{}); ok {
-					for _, vv := range v {
-						if vvv, ok := vv.(string); ok && len(vvv) > 1 {
-							tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
-							tmpConTopscopeclass = append(tmpConTopscopeclass, vvv[:len(vvv)-1])
-						}
-					}
-				}
-				for k := range tmpTopscopeclassMap {
-					tmpTopscopeclass = append(tmpTopscopeclass, k)
-				}
-				sort.Strings(tmpTopscopeclass)
-				esId := (*oldTmp)["_id"].(bson.ObjectId).Hex()
+}
 
-				//更新行业类型
-				if tmp["agencyperson"] == nil || tmp["agencyperson"] == "" || Reg_xing.MatchString(util.ObjToString(tmp["agencyperson"])) {
 
-					(*oldTmp)["updatatime"] = time.Now().Unix()
-					//mongo更新
-					FClient.DbName = Config["mgodb_extract_kf"]
-					if !FClient.UpdateById(Config["mgo_qyk_agency"], esId, bson.M{"$set": oldTmp}) {
-						log.Println("mongo更新err", esId)
-					}
+//增量
+func AddAgency(overid string, tmp map[string]interface{}) string {
+	overid = tmp["_id"].(bson.ObjectId).Hex()
+	agency, ok := tmp["agency"].(string)
+	if !ok || utf8.RuneCountInString(agency) < 4 {
+		return overid
+	}
+	//redis查询是否存在
+	rdb := RedisPool.Get()
+	rdb.Do("SELECT", redis_agency_db)
+	if reply, err := redis.String(rdb.Do("GET", agency)); err != nil {
+		//redis不存在存到临时表,定时任务处理
+		if errb := FClient.SaveByOriID(Config["mgo_qyk_c_a_new"], tmp); !errb {
+			log.Println("FClient.Save err", errb, tmp)
+		}
+		//log.Println("get redis id err:定时任务处理", err, tmp)
+		if err := rdb.Close(); err != nil {
+			log.Println(err)
+		}
+		return overid
+	} else {
+		if err := rdb.Close(); err != nil {
+			log.Println(err)
+		}
+		//拿到合并后的qyk
+		oldTmp, b := FClient.FindById(Config["mgo_qyk_agency"], reply, bson.M{})
+		if !b || (*oldTmp) == nil || reply == "" || (*oldTmp)["_id"] == nil {
+			log.Println("redis id 不存在", reply)
+			return overid
+		}
+		//比较合并 行业类型
+		tmpTopscopeclass := []string{}
+		tmpConTopscopeclass := []string{}
+		tmpTopscopeclassMap := make(map[string]bool)
 
-					//es更新
-					delete((*oldTmp), "_id")
-					if _, err := EsConn.Update().Index(Config["elasticsearch_agency_index"]).Type(Config["elasticsearch_agency_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
-						log.Println("update es err:", err)
-					}
-					continue
-				}
-				//联系方式合并
-				var tmpperson, agencytel string
-				if tmppersona, ok := tmp["agencyperson"].(string); ok {
-					tmpperson = tmppersona
+		if v, ok := tmp["topscopeclass"].([]interface{}); ok {
+			for _, vv := range v {
+				if vvv, ok := vv.(string); ok && len(vvv) > 1 {
+					tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
+					tmpConTopscopeclass = append(tmpConTopscopeclass, vvv[:len(vvv)-1])
 				}
-				if agencyteltmp, ok := tmp["agencytel"].(string); ok {
-					agencytel = agencyteltmp
-				}
-				if Reg_xing.MatchString(agencytel) || !Reg_tel.MatchString(agencytel) {
-					agencytel = ""
-				} else {
-					agencytel = agencytel
-				}
-				contactMaps := make([]interface{}, 0)
-				if (*oldTmp)["contact"] != nil {
-					//直接添加联系人,不再判断
-					if v, ok := (*oldTmp)["contact"].([]interface{}); ok {
-						contactMaps = append(contactMaps, v...)
-					}
-				}
-				vvv := make(map[string]interface{})
-				vvv["infoid"] = overid
-				vvv["contact_person"] = tmpperson
-				vvv["contact_type"] = "项目联系人"
-				vvv["phone"] = agencytel
-				vvv["topscopeclass"] = strings.Join(tmpConTopscopeclass, ";")
-				vvv["updatetime"] = time.Now().Unix()
-				contactMaps = append(contactMaps, vvv)
-				(*oldTmp)["contact"] = contactMaps
-				//mongo更新
-				(*oldTmp)["updatatime"] = time.Now().Unix()
-				FClient.DbName = Config["mgodb_extract_kf"]
-				if !FClient.UpdateById(Config["mgo_qyk_agency"], esId, bson.M{"$set": oldTmp}) {
-					log.Println("mongo更新 err", esId, oldTmp)
-				}
-				//es更新
-				delete((*oldTmp), "_id")
-				if _, err := EsConn.Update().Index(Config["elasticsearch_agency_index"]).Type(Config["elasticsearch_agency_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
-					log.Println("EsConn err :", err)
+			}
+		}
+		for k := range tmpTopscopeclassMap {
+			tmpTopscopeclass = append(tmpTopscopeclass, k)
+		}
+		sort.Strings(tmpTopscopeclass)
+		esId := (*oldTmp)["_id"].(bson.ObjectId).Hex()
+
+		//更新行业类型
+		if tmp["agencyperson"] == nil || tmp["agencyperson"] == "" || Reg_xing.MatchString(util.ObjToString(tmp["agencyperson"])) {
+			(*oldTmp)["updatatime"] = time.Now().Unix()
+			//mongo更新
+			if !FClient.UpdateById(Config["mgo_qyk_agency"], esId, bson.M{"$set": oldTmp}) {
+				log.Println("mongo更新err", esId)
+			}
+
+			//es更新
+			delete((*oldTmp), "_id")
+			return overid
+		}
+		//联系方式合并
+		contactMaps := make([]map[string]interface{}, 0)
+		if (*oldTmp)["contact"] != nil {
+			//直接添加联系人,不再判断
+			if v, ok := (*oldTmp)["contact"].([]interface{}); ok {
+				for _, vv := range v {
+					contactMaps = append(contactMaps, vv.(map[string]interface{}))
 				}
 			}
 		}
-		SourceClient.DestoryMongoConn(SourceClientcc)
-		log.Println("增量合并执行完成 ok", gtid, lteid, overid)
+		var tmpperson, agencytel string
+		if tmppersona, ok := tmp["agencyperson"].(string); ok && tmppersona != "" && Reg_person.MatchString(tmppersona) && !Reg_xing.MatchString(tmppersona) {
+			tmpperson = tmppersona
+		}
+		if tmpperson != "" {
+			if agencyteltmp, ok := tmp["agencytel"].(string); ok {
+				agencytel = agencyteltmp
+			}
+			if Reg_xing.MatchString(agencytel) || !Reg_tel.MatchString(agencytel) {
+				agencytel = ""
+			} else {
+				agencytel = agencytel
+			}
+
+			vvv := make(map[string]interface{})
+			vvv["infoid"] = overid
+			vvv["contact_person"] = tmpperson
+			vvv["contact_type"] = "项目联系人"
+			vvv["phone"] = agencytel
+			vvv["topscopeclass"] = strings.Join(tmpConTopscopeclass, ";")
+			vvv["updatetime"] = time.Now().Unix()
+			contactMaps = append(contactMaps, vvv)
+		}
+		//分包处理
+		if tmp["package"] != nil {
+			PackageDealWithAgency(oldTmp, tmp, agency)
+		}
+		(*oldTmp)["contact"] = contactMaps
+		//mongo更新
+		(*oldTmp)["updatatime"] = time.Now().Unix()
+		if !FClient.UpdateById(Config["mgo_qyk_agency"], esId, bson.M{"$set": oldTmp}) {
+			log.Println("mongo更新 err", esId, oldTmp)
+		}
+		//es更新
+		delete((*oldTmp), "_id")
 	}
+	return overid
 
 }
 
@@ -330,9 +354,10 @@ func TimedTaskAgency() {
 	//time.Sleep(time.Hour*70)
 	t2 := time.NewTimer(time.Second * 5)
 	for range t2.C {
+		//timenow:=time.Now().Unix()
 		Fcconn := FClient.GetMgoConn(86400)
 		tmpLast := map[string]interface{}{}
-		if iter := Fcconn.DB(Config["mgodb_extract_kf"]).C("agency_new").Find(bson.M{}).Sort("-_id").Limit(1).Iter(); iter != nil {
+		if iter := Fcconn.DB(Config["mgodb_extract_kf"]).C(Config["mgo_qyk_c_a_new"]).Find(bson.M{}).Sort("-_id").Limit(1).Iter(); iter != nil {
 			if !iter.Next(&tmpLast) {
 				//临时表无数据
 				log.Println("临时表无数据:")
@@ -340,9 +365,9 @@ func TimedTaskAgency() {
 				FClient.DestoryMongoConn(Fcconn)
 				continue
 			} else {
-				log.Println("临时表有数据:", tmpLast)
+				log.Println("临时表有数据:", tmpLast["_id"])
 				fconn := FClient.GetMgoConn(86400)
-				cursor := fconn.DB(Config["mgodb_extract_kf"]).C("agency_new").Find(bson.M{
+				cursor := fconn.DB(Config["mgodb_extract_kf"]).C(Config["mgo_qyk_c_a_new"]).Find(bson.M{
 					"_id": bson.M{
 						"$lte": tmpLast["_id"],
 					},
@@ -365,23 +390,10 @@ func TimedTaskAgency() {
 					rdb := RedisPool.Get()
 					rdb.Do("SELECT", redis_agency_db)
 					if _, err := redis.String(rdb.Do("GET", erragency)); err == nil {
-						//redis存在发送udp进行处理
-						by, _ := json.Marshal(map[string]interface{}{
-							"gtid":      tmpId,
-							"lteid":     tmpId,
-							"stype":     "",
-							"data_type": "agency",
-							"data_info": "add",
-						})
-						if e := udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
-							IP:   net.ParseIP("127.0.0.1"),
-							Port: Updport,
-						}); e != nil {
-							log.Println(e)
-						}
+						//增量合并
+						AddAgency(tmpId, tmp)
 						//存在的话删除tmp mongo表
-						FClient.DbName = Config["mgodb_extract_kf"]
-						if DeletedCount := FClient.Del("agency_new", bson.M{"_id": bson.ObjectIdHex(tmpId)}); !DeletedCount {
+						if DeletedCount := FClient.Del(Config["mgo_qyk_c_a_new"], bson.M{"_id": bson.ObjectIdHex(tmpId)}); !DeletedCount {
 							log.Println("删除临时表err:", DeletedCount)
 						}
 						if err := rdb.Close(); err != nil {
@@ -394,138 +406,156 @@ func TimedTaskAgency() {
 						}
 					}
 					//查询redis不存在新增
-					FClient.DbName = Config["mgodb_enterprise"]
-
-					resulttmp, b := FClient.FindOne(Config["mgodb_enterprise_c"], bson.M{"company_name": erragency})
-					if !b || (*resulttmp)["_id"] == nil {
+					sessionfinone := FClient.GetMgoConn()
+					resulttmp := make(map[string]interface{})
+					err := sessionfinone.DB(Config["mgodb_enterprise"]).C(Config["mgodb_enterprise_c"]).Find(bson.M{"company_name": erragency}).One(&resulttmp)
+					FClient.DestoryMongoConn(sessionfinone)
+					if err != nil || resulttmp["_id"] == nil {
 						//log.Println(r)
 						//人工审核正则
 						var isok bool
+						//先遍历ok
 						for _, v := range AgencyRegOk {
 							isok = v.MatchString(erragency)
 							if isok {
-								tmp["agency_ok"] = 1
-								break
+								//匹配ok完,匹配err
+								for _, vRegErr := range AgencyRegErr {
+									isok = vRegErr.MatchString(erragency)
+									//匹配到ok 也匹配到err 按err算
+									if isok {
+										tmp["agency_err"] = 1
+										break
+									}
+								}
+								//匹配ok,没匹配err 按ok算
+								if tmp["agency_err"] == nil {
+									tmp["agency_ok"] = 1
+									break
+								}
 							}
 						}
-						if tmp["agency_ok"] == nil{
-							tmp["agency_err"] =1
+						//都没匹配
+						if tmp["agency_ok"] == nil && tmp["agency_err"] == nil {
+							tmp["agency_err"] = 1
 						}
 						//匹配不到原始库,存入异常表删除临时表
-						FClient.DbName = Config["mgodb_extract_kf"]
-						//winner_err winner_ok buyer_err buyer_err agency_err agency_ok
-						if err := FClient.SaveForOld("agency_err", tmp); err != nil {
-							log.Println("存入异常表错误", err, tmp)
+						if errb := FClient.SaveByOriID(Config["mgo_qyk_c_a_err"], tmp); !errb {
+							log.Println("存入异常表错误", errb, tmp)
 						}
-						if deleteNum := FClient.Del("agency_new", bson.M{"_id": bson.ObjectIdHex(tmpId)}); !b {
+						if deleteNum := FClient.Del(Config["mgo_qyk_c_a_new"], bson.M{"_id": bson.ObjectIdHex(tmpId)}); !deleteNum {
 							log.Println("删除临时表错误", deleteNum)
 						}
 						continue
 					} else {
 						//log.Println(123)
-						//匹配到原始库,新增 resulttmp
-						if (*resulttmp)["credit_no"] != nil {
-							if credit_no, ok := (*resulttmp)["credit_no"].(string); ok && strings.TrimSpace(credit_no) != "" &&
+						//匹配到原始库,新增 resulttmp    angency
+						if resulttmp["credit_no"] != nil {
+							if credit_no, ok := resulttmp["credit_no"].(string); ok && strings.TrimSpace(credit_no) != "" &&
 								len(strings.TrimSpace(credit_no)) > 8 {
 								dataNo := strings.TrimSpace(credit_no)[2:8]
 								if Addrs[dataNo] != nil {
 									if v, ok := Addrs[dataNo].(map[string]interface{}); ok {
-										if (*resulttmp)["province"] == nil || (*resulttmp)["province"] == "" {
-											(*resulttmp)["province"] = v["province"]
+										if resulttmp["province"] == nil || resulttmp["province"] == "" {
+											resulttmp["province"] = v["province"]
 										}
-										(*resulttmp)["city"] = v["city"]
-										(*resulttmp)["district"] = v["district"]
+										resulttmp["city"] = v["city"]
+										resulttmp["district"] = v["district"]
 
 									}
 								}
 							}
 						}
-						contacts := make([]map[string]interface{}, 0)
-						contact := make(map[string]interface{}, 0)
-						if (*resulttmp)["legal_person"] != nil {
-							contact["contact_person"] = (*resulttmp)["legal_person"] //联系人
-						} else {
-							contact["contact_person"] = "" //联系人
-						}
-						contact["contact_type"] = "法定代表人" //法定代表人
-						//log.Println(1)
-						if (*resulttmp)["annual_reports"] != nil {
-							bytes, err := json.Marshal((*resulttmp)["annual_reports"])
-							if err != nil {
-								log.Println("annual_reports err:", err)
-							}
-							phonetmp := make([]map[string]interface{}, 0)
-							err = json.Unmarshal(bytes, &phonetmp)
-							if err != nil {
-								log.Println("Unmarshal err:", err)
+
+						//行业类型
+						tmpclass := make([]string, 0)
+						if tclasss, ok := tmp["topscopeclass"].([]interface{}); ok {
+							for _, vv := range tclasss {
+								if vvv, ok := vv.(string); ok {
+									if len(vvv) > 1 {
+										tmpclass = append(tmpclass, vvv[:len(vvv)-1])
+									}
+								}
 							}
-							for _, vv := range phonetmp {
-								if vv["company_phone"] != nil {
-									if vv["company_phone"] == "" {
-										continue
+						}
+						contacts := make([]map[string]interface{}, 0)
+						if legal_person, ok := resulttmp["legal_person"].(string); ok && legal_person != "" && !Reg_xing.MatchString(legal_person) && Reg_person.MatchString(legal_person) {
+							contact := make(map[string]interface{}, 0)
+							contact["contact_person"] = legal_person //联系人
+							contact["contact_type"] = "法定代表人"        //法定代表人
+							if resulttmp["annual_reports"] != nil {
+								bytes, err := json.Marshal(resulttmp["annual_reports"])
+								if err != nil {
+									log.Println("annual_reports err:", err)
+								}
+								phonetmp := make([]map[string]interface{}, 0)
+								err = json.Unmarshal(bytes, &phonetmp)
+								if err != nil {
+									log.Println("Unmarshal err:", err)
+								}
+								for _, vv := range phonetmp {
+									if vv["company_phone"] != nil {
+										if vv["company_phone"] == "" {
+											continue
+										} else {
+											contact["phone"] = vv["company_phone"] //联系电话
+											break
+										}
 									} else {
-										contact["phone"] = vv["company_phone"] //联系电话
-										break
+										contact["phone"] = "" //联系电话
 									}
-								} else {
+
+								}
+							}
+							//log.Println(k, contact["phone"], resulttmp["_id"])
+							//time.Sleep(10 * time.Second)
+							if phone, ok := contact["phone"].(string); ok && phone != "" {
+								if Reg_xing.MatchString(phone) || !Reg_tel.MatchString(phone) {
 									contact["phone"] = "" //联系电话
 								}
-
+							} else {
+								contact["phone"] = "" //联系电话
 							}
+							contact["topscopeclass"] = "企业公示"         //项目类型
+							contact["updatetime"] = time.Now().Unix() //更新时间
+							contact["infoid"] = ""                    //招标信息id
+							contacts = append(contacts, contact)
 						}
-						//log.Println(k, contact["phone"], resulttmp["_id"])
-						//time.Sleep(10 * time.Second)
-						if contact["phone"] == nil {
-							contact["phone"] = "" //联系电话
-						}
-						contact["topscopeclass"] = "企业公示"         //项目类型
-						contact["updatetime"] = time.Now().Unix() //更新时间
-						contact["infoid"] = ""                    //招标信息id
-						contacts = append(contacts, contact)
+
+
 						//添加临时表匹配到的联系人
-						vvv := make(map[string]interface{})
-						vvv["infoid"] = tmp["_id"].(bson.ObjectId).Hex()
-						if tmp["agencyperson"] != nil {
-							vvv["contact_person"] = tmp["agencyperson"]
-						} else {
-							vvv["contact_person"] = ""
-						}
-						vvv["contact_type"] = "项目联系人"
-						//	"agency": 1, "agencytel": 1, "agencyperson": 1, "topscopeclass": 1
-						if tmp["agencytel"] != nil {
-							vvv["phone"] = tmp["agencytel"]
-						} else {
-							vvv["phone"] = ""
-						}
-						tmpclass := make([]string, 0)
-						if tclasss, ok := tmp["topscopeclass"].([]string); ok {
-							for _, vv := range tclasss {
-								if len(vv) > 1 {
-									tmpclass = append(tmpclass, vv[:len(vv)-1])
-								}
+						if agencyperson, ok := tmp["agencyperson"].(string); ok && agencyperson != "" &&
+							!Reg_xing.MatchString(agencyperson) && Reg_person.MatchString(agencyperson) {
+							vvv := make(map[string]interface{})
+							vvv["infoid"] = tmp["_id"].(bson.ObjectId).Hex()
+							vvv["contact_person"] = agencyperson
+							vvv["contact_type"] = "项目联系人"
+							if agencytel, ok := tmp["agencytel"].(string); ok && !Reg_xing.MatchString(agencytel) && Reg_tel.MatchString(agencytel) {
+								vvv["phone"] = agencytel
+							} else {
+								vvv["phone"] = ""
 							}
+
+							vvv["topscopeclass"] = strings.Join(tmpclass, ";")
+							vvv["updatetime"] = time.Now().Unix()
+							contacts = append(contacts, vvv)
 						}
-						vvv["topscopeclass"] = strings.Join(tmpclass, ";")
-						vvv["updatetime"] = time.Now().Unix()
-						contacts = append(contacts, vvv)
-						(*resulttmp)["contact"] = contacts
 
 						savetmp := make(map[string]interface{}, 0)
 						for _, sk := range AgencyFields {
 							if sk == "_id" {
-								savetmp["tmp"+sk] = (*resulttmp)[sk]
+								savetmp["tmp"+sk] = resulttmp[sk]
 								continue
 							} else if sk == "area_code" {
 								//行政区划代码
-								savetmp[sk] = fmt.Sprint((*resulttmp)[sk])
+								savetmp[sk] = fmt.Sprint(resulttmp[sk])
 								continue
 							} else if sk == "report_websites" {
 								//网址
-								if (*resulttmp)["report_websites"] == nil {
+								if resulttmp["report_websites"] == nil {
 									savetmp["website"] = ""
 								} else {
 									report_websitesArr := []string{}
-									if ppms, ok := (*resulttmp)[sk].([]interface{}); ok {
+									if ppms, ok := resulttmp[sk].([]interface{}); ok {
 										for _, v := range ppms {
 											if vvv, ok := v.(map[string]interface{}); ok {
 												if rv, ok := vvv["website_url"].(string); ok {
@@ -542,77 +572,71 @@ func TimedTaskAgency() {
 								savetmp[sk] = []interface{}{}
 								continue
 							} else if sk == "agency_name" {
-								if (*resulttmp)["company_name"] == nil {
+								if resulttmp["company_name"] == nil {
 									savetmp[sk] = ""
 								} else {
-									savetmp[sk] = (*resulttmp)["company_name"]
+									savetmp[sk] = resulttmp["company_name"]
 								}
 								continue
 							} else if sk == "address" {
-								if (*resulttmp)["company_address"] == nil {
+								if resulttmp["company_address"] == nil {
 									savetmp[sk] = ""
 								} else {
-									savetmp[sk] = (*resulttmp)["company_address"]
+									savetmp[sk] = resulttmp["company_address"]
 								}
 								continue
 							}
 
-							if (*resulttmp)[sk] == nil && sk != "history_name" && sk != "wechat_accounts" &&
+							if resulttmp[sk] == nil && sk != "history_name" && sk != "wechat_accounts" &&
 								sk != "agency_name" && sk != "address" &&
 								sk != "contact" && sk != "report_websites" {
 								savetmp[sk] = ""
 							} else {
-								savetmp[sk] = (*resulttmp)[sk]
+								savetmp[sk] = resulttmp[sk]
 							}
 						}
 						//tmps = append(tmps, savetmp)
+						savetmp["comeintime"] = time.Now().Unix()
 						savetmp["updatatime"] = time.Now().Unix()
 						//保存mongo
-						FClient.DbName = Config["mgodb_extract_kf"]
-
 						saveid := FClient.Save(Config["mgo_qyk_agency"], savetmp)
 						if saveid != "" {
 							//保存redis
 							rc := RedisPool.Get()
 							rc.Do("SELECT", redis_agency_db)
-							//var _id string
-							//if v, ok := saveid.(primitive.ObjectID); ok {
-							//	_id = v.Hex()
-							//}
 							if _, err := rc.Do("SET", savetmp["agency_name"], saveid); err != nil {
 								log.Println("save redis err:", tmp["_id"], savetmp["_id"], savetmp["agency_name"], err)
-								if err := rc.Close(); err != nil {
-									log.Println(err)
-								}
 							} else {
-								//保存es
-								delete(savetmp, "_id")
-								if err := rc.Close(); err != nil {
-									log.Println(err)
-								}
-
-								//esConn := elastic.GetEsConn()
-								//defer elastic.DestoryEsConn(esConn)
-								if _, err := EsConn.Index().Index(Config["elasticsearch_agency_index"]).Type(Config["elasticsearch_agency_type"]).Id(saveid).BodyJson(savetmp).Refresh(true).Do(); err != nil {
-									log.Println("save es err :", tmp["_id"], savetmp["_id"], err)
-								} else {
-									//删除临时表
-									FClient.DbName = Config["mgodb_extract_kf"]
-									if deleteNum := FClient.Del("agency_new", bson.M{"_id": bson.ObjectIdHex(tmpId)}); !deleteNum {
-										log.Println("删除临时表失败", deleteNum)
-									}
+								//删除临时表
+								if deleteNum := FClient.Del(Config["mgo_qyk_c_a_new"], bson.M{"_id": bson.ObjectIdHex(tmpId)}); !deleteNum {
+									log.Println("删除临时表失败", deleteNum)
 								}
 							}
+							if err := rc.Close(); err != nil {
+								log.Println(err)
+							}
 						} else {
 							log.Println("save mongo err:", saveid, tmp["_id"])
 						}
 					}
 				}
 				FClient.DestoryMongoConn(fconn)
-				log.Println("buyer_new,遍历完成")
+				log.Println("agency_new,遍历完成")
 			}
 		}
 		FClient.DestoryMongoConn(Fcconn)
 		t2.Reset(time.Minute)
+		//nextNode("agencyent",timenow)
 	}
 }
+
+//分包处理
+func PackageDealWithAgency(contactMap *map[string]interface{}, tmp map[string]interface{}, comName string) []interface{} {
+	util.Catch()
+	//if v, ok := tmp["package"].(map[string]interface{}); ok {
+	//for i, pv := range v {
+	//	log.Println(i, pv)
+	//}
+	//}
+	return nil
+}

+ 297 - 264
udp_winner/timedTaskBuyer.go

@@ -6,7 +6,7 @@ import (
 	"github.com/garyburd/redigo/redis"
 	"gopkg.in/mgo.v2/bson"
 	"log"
-	mu "mfw/util"
+	util2 "mfw/util"
 	"net"
 	"qfw/util"
 	"sort"
@@ -31,11 +31,12 @@ func TaskBuyer(mapinfo *map[string]interface{}) {
 		log.Println(gtid, lteid, "不是Objectid,转换_id错误", gtid, lteid)
 		return
 	}
+	//timenow:=time.Now().Unix()
 	//udp的id区间查询bidding  中标人 中标联系人 中标联系电话
 	// topscopeclass项目类型-industry行业类型&&topscopeclass联系人项目类型
 	// (area地区-province省份 city城市-city城市 district区县-district区县)
 	// buyeraddr-company_address企业地址
-	SourceClientcc := SourceClient.GetMgoConn(86400)
+	SourceClientcc := SourceClient.GetMgoConn(8640000)
 	cursor := SourceClientcc.DB(Config["mgodb_bidding"]).C(Config["mgodb_mgoinit_c"]).Find(bson.M{
 		"_id": bson.M{
 			"$gte": GId,
@@ -58,7 +59,12 @@ func TaskBuyer(mapinfo *map[string]interface{}) {
 		//遍历bidding表保存到redis
 		//key:企业名  value:json结构体{"buyer": 1, "buyertel": 1, "buyerperson": 1,"topscopeclass": 1, "buyeraddr": 1,"_id":1}
 		tmp := make(map[string]interface{})
+		var num int
+		var tmpRangeId string
 		for cursor.Next(&tmp) {
+			num++
+			mgoId := tmp["_id"].(bson.ObjectId).Hex()
+			tmpRangeId = mgoId
 			buyer, ok := tmp["buyer"].(string)
 			if !ok || utf8.RuneCountInString(buyer) < 4 {
 				continue
@@ -66,7 +72,6 @@ func TaskBuyer(mapinfo *map[string]interface{}) {
 			//判断redis key是否存在
 			e_num := conn.Exists(buyer).Val()
 			//获取字符串_id
-			mgoId := tmp["_id"].(bson.ObjectId).Hex()
 			//替换_id
 			tmp["_id"] = mgoId
 			//创建value数组
@@ -83,6 +88,25 @@ func TaskBuyer(mapinfo *map[string]interface{}) {
 				log.Println(err)
 			}
 		}
+		log.Println("存量buyer mongo遍历完成:",num)
+
+
+		if tmpRangeId != lteid{
+			by, _ := json.Marshal(map[string]interface{}{
+				"gtid":  tmpRangeId,
+				"lteid": lteid,
+				"data_info":"save",
+				"stype": "buyer",
+			})
+			if e := udpclient.WriteUdp(by, util2.OP_TYPE_DATA, &net.UDPAddr{
+				IP:   net.ParseIP("127.0.0.1"),
+				Port: Updport,
+			}); e != nil {
+				log.Println(e)
+			}
+			SourceClient.DestoryMongoConn(SourceClientcc)
+			return
+		}
 
 		SourceClient.DestoryMongoConn(SourceClientcc)
 
@@ -102,11 +126,10 @@ func TaskBuyer(mapinfo *map[string]interface{}) {
 				rdb.Do("SELECT", redis_buyer_db)
 				if reply, err := redis.String(rdb.Do("GET", redisCName)); err != nil {
 					//redis不存在,存到临时表,定时任务处理
-					FClient.DbName = Config["mgodb_extract_kf"]
 					for _, vmap := range rValuesMaps {
 						vmap["_id"] = bson.ObjectIdHex(vmap["_id"].(string))
-						if err = FClient.SaveForOld("buyer_new", vmap); err != nil {
-							log.Println("存量 FClient.Save err", err, vmap)
+						if errb := FClient.SaveByOriID(Config["mgo_qyk_c_b_new"], vmap); !errb {
+							log.Println("存量 FClient.Save err", errb, vmap)
 						}
 					}
 					//log.Println("get redis id err:定时任务处理", err, tmp)
@@ -206,15 +229,11 @@ func TaskBuyer(mapinfo *map[string]interface{}) {
 					(*oldTmp)["contact"] = contactMaps
 					//mongo更新
 					(*oldTmp)["updatatime"] = time.Now().Unix()
-					FClient.DbName = Config["mgodb_extract_kf"]
 					if !FClient.UpdateById(Config["mgo_qyk_buyer"], esId, bson.M{"$set": oldTmp}) {
 						log.Println("存量  mongo更新 err", esId, oldTmp)
 					}
 					//es更新
 					delete((*oldTmp), "_id")
-					if _, err := EsConn.Update().Index(Config["elasticsearch_buyer_index"]).Type(Config["elasticsearch_buyer_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
-						log.Println("存量 EsConn err :", err)
-					}
 				}
 			}
 		}
@@ -225,139 +244,143 @@ func TaskBuyer(mapinfo *map[string]interface{}) {
 		overid := gtid
 		tmp := map[string]interface{}{}
 		for cursor.Next(&tmp) {
-			overid = tmp["_id"].(bson.ObjectId).Hex()
-			//log.Println(tmp["_id"])
-			buyer, ok := tmp["buyer"].(string)
-			if !ok || utf8.RuneCountInString(buyer) < 4 {
-				continue
-			}
-			//redis查询是否存在
-			rdb := RedisPool.Get()
-			rdb.Do("SELECT", redis_buyer_db)
-			if reply, err := redis.String(rdb.Do("GET", buyer)); err != nil {
-				//redis不存在存到临时表,定时任务处理
-				FClient.DbName = Config["mgodb_extract_kf"]
-				if err := FClient.SaveForOld("buyer_new", tmp); err != nil {
-					log.Println("FClient.Save err", err, tmp)
-				}
-				//log.Println("get redis id err:定时任务处理", err, tmp)
-				if err := rdb.Close(); err != nil {
-					log.Println(err)
-				}
-				continue
-			} else {
-				if err := rdb.Close(); err != nil {
-					log.Println(err)
-				}
-				//拿到合并后的qyk
-				FClient.DbName = Config["mgodb_extract_kf"]
-				oldTmp, b := FClient.FindById(Config["mgo_qyk_buyer"], reply, bson.M{})
-				if !b || (*oldTmp) == nil|| reply==""||(*oldTmp)["_id"]==nil{
-					log.Println("redis id 不存在")
-					continue
-				}
-				//比较合并
-				//行业类型
-				tmpTopscopeclass := []string{}
-				tmpConTopscopeclass := []string{}
-				tmpTopscopeclassMap := make(map[string]bool)
-
-				if v, ok := tmp["topscopeclass"].([]interface{}); ok {
-					for _, vv := range v {
-						if vvv, ok := vv.(string); ok && len(vvv) > 1 {
-							tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
-							tmpConTopscopeclass = append(tmpConTopscopeclass, vvv[:len(vvv)-1])
-						}
-					}
-				}
-				for k := range tmpTopscopeclassMap {
-					tmpTopscopeclass = append(tmpTopscopeclass, k)
-				}
-				sort.Strings(tmpTopscopeclass)
-				esId := (*oldTmp)["_id"].(bson.ObjectId).Hex()
+			overid = AddBuyer(overid, tmp)
+		}
+		SourceClient.DestoryMongoConn(SourceClientcc)
+		log.Println("增量合并执行完成 ok", gtid, lteid, overid)
+		//发送udp 更新es段
+		//nextNode("buyerent",timenow)
+	}
 
-				//更新buyerclass合并
-				if tmp["buyerclass"] == nil || tmp["buyerclass"] == "" {
-					//无值,不更新
-				} else {
-					var buyerclass_new, buyerclass_old string
-					buyerclass_new = tmp["buyerclass"].(string)
-					buyerclass_old = (*oldTmp)["buyerclass"].(string)
-					if buyerclass_old == "" {
-						(*oldTmp)["buyerclass"] = buyerclass_new
-					} else {
-						if buyerclass_new != buyerclass_old {
-							if !strings.Contains(buyerclass_old, buyerclass_new) {
-								(*oldTmp)["buyerclass"] = buyerclass_old + "," + buyerclass_new //采购单位类型
-							}
-						}
-					}
-				}
+}
 
-				//更新行业类型
-				if tmp["buyerperson"] == nil || tmp["buyerperson"] == "" || Reg_xing.MatchString(util.ObjToString(tmp["buyerperson"])) {
 
-					(*oldTmp)["updatatime"] = time.Now().Unix()
-					//mongo更新
-					FClient.DbName = Config["mgodb_extract_kf"]
-					if !FClient.UpdateById(Config["mgo_qyk_buyer"], esId, bson.M{"$set": oldTmp}) {
-						log.Println("mongo更新err", esId)
-					}
+//增量
+func AddBuyer(overid string, tmp map[string]interface{}) string {
+	overid = tmp["_id"].(bson.ObjectId).Hex()
+	buyer, ok := tmp["buyer"].(string)
+	if !ok || utf8.RuneCountInString(buyer) < 4 {
+		return overid
+	}
+	//redis查询是否存在
+	rdb := RedisPool.Get()
+	rdb.Do("SELECT", redis_buyer_db)
+	if reply, err := redis.String(rdb.Do("GET", buyer)); err != nil {
+		//redis不存在存到临时表,定时任务处理
+		if errb := FClient.SaveByOriID(Config["mgo_qyk_c_b_new"], tmp); !errb {
+			log.Println("FClient.Save err", errb, tmp)
+		}
+		//log.Println("get redis id err:定时任务处理", err, tmp)
+		if err := rdb.Close(); err != nil {
+			log.Println(err)
+		}
+		return overid
+	} else {
+		if err := rdb.Close(); err != nil {
+			log.Println(err)
+		}
+		//拿到合并后的qyk
+		oldTmp, b := FClient.FindById(Config["mgo_qyk_buyer"], reply, bson.M{})
+		if !b || (*oldTmp) == nil || reply == "" || (*oldTmp)["_id"] == nil {
+			log.Println("redis id 不存在", reply)
+			return overid
+		}
+		//比较合并 行业类型
+		tmpTopscopeclass := []string{}
+		tmpConTopscopeclass := []string{}
+		tmpTopscopeclassMap := make(map[string]bool)
 
-					//es更新
-					delete((*oldTmp), "_id")
-					if _, err := EsConn.Update().Index(Config["elasticsearch_buyer_index"]).Type(Config["elasticsearch_buyer_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
-						log.Println("update es err:", err)
-					}
-					continue
+		if v, ok := tmp["topscopeclass"].([]interface{}); ok {
+			for _, vv := range v {
+				if vvv, ok := vv.(string); ok && len(vvv) > 1 {
+					tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
+					tmpConTopscopeclass = append(tmpConTopscopeclass, vvv[:len(vvv)-1])
 				}
-				//联系方式合并
-				var tmpperson, buyertel string
-				if tmppersona, ok := tmp["buyerperson"].(string); ok {
-					tmpperson = tmppersona
-				}
-				if buyerteltmp, ok := tmp["buyertel"].(string); ok {
-					buyertel = buyerteltmp
-				}
-				if Reg_xing.MatchString(buyertel) || !Reg_tel.MatchString(buyertel) {
-					buyertel = ""
-				} else {
-					buyertel = buyertel
-				}
-				contactMaps := make([]interface{}, 0)
-				if (*oldTmp)["contact"] != nil {
-					//直接添加联系人,不再判断
-					if v, ok := (*oldTmp)["contact"].([]interface{}); ok {
-						contactMaps = append(contactMaps, v...)
+			}
+		}
+		for k := range tmpTopscopeclassMap {
+			tmpTopscopeclass = append(tmpTopscopeclass, k)
+		}
+		sort.Strings(tmpTopscopeclass)
+		esId := (*oldTmp)["_id"].(bson.ObjectId).Hex()
+
+		//更新buyerclass合并
+		if tmp["buyerclass"] == nil || tmp["buyerclass"] == "" {
+			//无值,不更新
+		} else {
+			var buyerclass_new, buyerclass_old string
+			buyerclass_new = tmp["buyerclass"].(string)
+			buyerclass_old = (*oldTmp)["buyerclass"].(string)
+			if buyerclass_old == "" {
+				(*oldTmp)["buyerclass"] = buyerclass_new
+			} else {
+				if buyerclass_new != buyerclass_old {
+					if !strings.Contains(buyerclass_old, buyerclass_new) {
+						(*oldTmp)["buyerclass"] = buyerclass_old + "," + buyerclass_new //采购单位类型
 					}
 				}
-				vvv := make(map[string]interface{})
-				vvv["infoid"] = overid
-				vvv["contact_person"] = tmpperson
-				vvv["contact_type"] = "项目联系人"
-				vvv["phone"] = buyertel
-				vvv["topscopeclass"] = strings.Join(tmpConTopscopeclass, ";")
-				vvv["updatetime"] = time.Now().Unix()
-				contactMaps = append(contactMaps, vvv)
-				(*oldTmp)["contact"] = contactMaps
-				//mongo更新
-				(*oldTmp)["updatatime"] = time.Now().Unix()
-				FClient.DbName = Config["mgodb_extract_kf"]
-				if !FClient.UpdateById(Config["mgo_qyk_buyer"], esId, bson.M{"$set": oldTmp}) {
-					log.Println("mongo更新 err", esId, oldTmp)
-				}
-				//es更新
-				delete((*oldTmp), "_id")
-				if _, err := EsConn.Update().Index(Config["elasticsearch_buyer_index"]).Type(Config["elasticsearch_buyer_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
-					log.Println("EsConn err :", err)
+			}
+		}
+
+		//更新行业类型
+		if tmp["buyerperson"] == nil || tmp["buyerperson"] == "" || Reg_xing.MatchString(util.ObjToString(tmp["buyerperson"])) {
+			(*oldTmp)["updatatime"] = time.Now().Unix()
+			//mongo更新
+			if !FClient.UpdateById(Config["mgo_qyk_buyer"], esId, bson.M{"$set": oldTmp}) {
+				log.Println("mongo更新err", esId)
+			}
+
+			//es更新
+			delete((*oldTmp), "_id")
+			return overid
+		}
+		//联系方式合并
+		contactMaps := make([]map[string]interface{}, 0)
+		if (*oldTmp)["contact"] != nil {
+			//直接添加联系人,不再判断
+			if v, ok := (*oldTmp)["contact"].([]interface{}); ok {
+				for _, vv := range v {
+					contactMaps = append(contactMaps, vv.(map[string]interface{}))
 				}
 			}
 		}
-		SourceClient.DestoryMongoConn(SourceClientcc)
+		var tmpperson, buyertel string
+		if tmppersona, ok := tmp["buyerperson"].(string); ok && tmppersona != "" && Reg_person.MatchString(tmppersona) && !Reg_xing.MatchString(tmppersona) {
+			tmpperson = tmppersona
+		}
+		if tmpperson != "" {
+			if buyerteltmp, ok := tmp["buyertel"].(string); ok {
+				buyertel = buyerteltmp
+			}
+			if Reg_xing.MatchString(buyertel) || !Reg_tel.MatchString(buyertel) {
+				buyertel = ""
+			} else {
+				buyertel = buyertel
+			}
 
-		log.Println("增量合并执行完成 ok", gtid, lteid, overid)
+			vvv := make(map[string]interface{})
+			vvv["infoid"] = overid
+			vvv["contact_person"] = tmpperson
+			vvv["contact_type"] = "项目联系人"
+			vvv["phone"] = buyertel
+			vvv["topscopeclass"] = strings.Join(tmpConTopscopeclass, ";")
+			vvv["updatetime"] = time.Now().Unix()
+			contactMaps = append(contactMaps, vvv)
+		}
+		//分包处理
+		if tmp["package"] != nil {
+			PackageDealWithBuyer(oldTmp, tmp, buyer)
+		}
+		(*oldTmp)["contact"] = contactMaps
+		//mongo更新
+		(*oldTmp)["updatatime"] = time.Now().Unix()
+		if !FClient.UpdateById(Config["mgo_qyk_buyer"], esId, bson.M{"$set": oldTmp}) {
+			log.Println("mongo更新 err", esId, oldTmp)
+		}
+		//es更新
+		delete((*oldTmp), "_id")
 	}
-
+	return overid
 }
 
 //定时任务  新增
@@ -367,9 +390,10 @@ func TimedTaskBuyer() {
 	//time.Sleep(time.Hour*70)
 	t2 := time.NewTimer(time.Second * 5)
 	for range t2.C {
+		//timenow := time.Now().Unix()
 		Fcconn := FClient.GetMgoConn(86400)
 		tmpLast := map[string]interface{}{}
-		if iter := Fcconn.DB(Config["mgodb_extract_kf"]).C("buyer_new").Find(bson.M{}).Sort("-_id").Limit(1).Iter(); iter != nil {
+		if iter := Fcconn.DB(Config["mgodb_extract_kf"]).C(Config["mgo_qyk_c_b_new"]).Find(bson.M{}).Sort("-_id").Limit(1).Iter(); iter != nil {
 			if !iter.Next(&tmpLast) {
 				//临时表无数据
 				log.Println("临时表无数据:")
@@ -377,9 +401,9 @@ func TimedTaskBuyer() {
 				FClient.DestoryMongoConn(Fcconn)
 				continue
 			} else {
-				log.Println("临时表有数据:", tmpLast)
+				log.Println("临时表有数据:", tmpLast["_id"])
 				fconn := FClient.GetMgoConn(86400)
-				cursor := fconn.DB(Config["mgodb_extract_kf"]).C("buyer_new").Find(bson.M{
+				cursor := fconn.DB(Config["mgodb_extract_kf"]).C(Config["mgo_qyk_c_b_new"]).Find(bson.M{
 					"_id": bson.M{
 						"$lte": tmpLast["_id"],
 					},
@@ -402,23 +426,10 @@ func TimedTaskBuyer() {
 					rdb := RedisPool.Get()
 					rdb.Do("SELECT", redis_buyer_db)
 					if _, err := redis.String(rdb.Do("GET", errbuyer)); err == nil {
-						//redis存在发送udp进行处理
-						by, _ := json.Marshal(map[string]interface{}{
-							"gtid":      tmpId,
-							"lteid":     tmpId,
-							"stype":     "",
-							"data_type": "buyer",
-							"data_info": "add",
-						})
-						if e := udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
-							IP:   net.ParseIP("127.0.0.1"),
-							Port: Updport,
-						}); e != nil {
-							log.Println(e)
-						}
+						//增量合并
+						AddBuyer(tmpId, tmp)
 						//存在的话删除tmp mongo表
-						FClient.DbName = Config["mgodb_extract_kf"]
-						if DeletedCount := FClient.Del("buyer_new", bson.M{"_id": bson.ObjectIdHex(tmpId)}); !DeletedCount {
+						if DeletedCount := FClient.Del(Config["mgo_qyk_c_b_new"], bson.M{"_id": bson.ObjectIdHex(tmpId)}); !DeletedCount {
 							log.Println("删除临时表err:", DeletedCount)
 						}
 						if err := rdb.Close(); err != nil {
@@ -430,138 +441,160 @@ func TimedTaskBuyer() {
 							log.Println(err)
 						}
 					}
-					//查询redis不存在新增
-					FClient.DbName = Config["mgodb_enterprise"]
 
-					resulttmp, b := FClient.FindOne(Config["mgodb_enterprise_c"], bson.M{"company_name": errbuyer})
-					if !b || (*resulttmp)["_id"] == nil {
+					//查询redis不存在新增
+					sessionfinone := FClient.GetMgoConn()
+					resulttmp := make(map[string]interface{})
+					err := sessionfinone.DB(Config["mgodb_enterprise"]).C(Config["mgodb_enterprise_c"]).Find(bson.M{"company_name": errbuyer}).One(&resulttmp)
+					FClient.DestoryMongoConn(sessionfinone)
+					if err != nil || resulttmp["_id"] == nil {
 						//log.Println(r)
-						//人工审核
+						//人工审核正则
 						var isok bool
-						for _, v := range BuerRegOk {
+						//先遍历ok
+						for _, v := range BuyerRegOk {
 							isok = v.MatchString(errbuyer)
 							if isok {
-								tmp["buyer_ok"] = 1
-								break
+								//匹配ok完,匹配err
+								for _, vRegErr := range BuyerRegErr {
+									isok = vRegErr.MatchString(errbuyer)
+									//匹配到ok 也匹配到err 按err算
+									if isok {
+										tmp["buyer_err"] = 1
+										break
+									}
+								}
+								//匹配ok,没匹配err 按ok算
+								if tmp["buyer_err"] == nil {
+									tmp["buyer_ok"] = 1
+									break
+								}
 							}
 						}
-						if tmp["buyer_ok"] == nil {
+						//都没匹配
+						if tmp["buyer_ok"] == nil && tmp["bnuyer_err"] == nil {
 							tmp["buyer_err"] = 1
 						}
 						//匹配不到原始库,存入异常表删除临时表
-						FClient.DbName = Config["mgodb_extract_kf"]
-						if err := FClient.SaveForOld("buyer_err", tmp); err != nil {
-							log.Println("存入异常表错误", err, tmp)
+						if errb := FClient.SaveByOriID(Config["mgo_qyk_c_b_err"], tmp); !errb{
+							log.Println("存入异常表错误", errb, tmp)
 						}
-						if deleteNum := FClient.Del("buyer_new", bson.M{"_id": bson.ObjectIdHex(tmpId)}); !b {
+						if deleteNum := FClient.Del(Config["mgo_qyk_c_b_new"], bson.M{"_id": bson.ObjectIdHex(tmpId)}); !deleteNum {
 							log.Println("删除临时表错误", deleteNum)
 						}
 						continue
 					} else {
 						//log.Println(123)
 						//匹配到原始库,新增 resulttmp    buyer
-						if (*resulttmp)["credit_no"] != nil {
-							if credit_no, ok := (*resulttmp)["credit_no"].(string); ok && strings.TrimSpace(credit_no) != "" &&
+						if resulttmp["credit_no"] != nil {
+							if credit_no, ok := resulttmp["credit_no"].(string); ok && strings.TrimSpace(credit_no) != "" &&
 								len(strings.TrimSpace(credit_no)) > 8 {
 								dataNo := strings.TrimSpace(credit_no)[2:8]
 								if Addrs[dataNo] != nil {
 									if v, ok := Addrs[dataNo].(map[string]interface{}); ok {
-										if (*resulttmp)["province"] == nil || (*resulttmp)["province"] == "" {
-											(*resulttmp)["province"] = v["province"]
+										if resulttmp["province"] == nil || resulttmp["province"] == "" {
+											resulttmp["province"] = v["province"]
 										}
-										(*resulttmp)["city"] = v["city"]
-										(*resulttmp)["district"] = v["district"]
+										resulttmp["city"] = v["city"]
+										resulttmp["district"] = v["district"]
 
 									}
 								}
 							}
 						}
-						contacts := make([]map[string]interface{}, 0)
-						contact := make(map[string]interface{}, 0)
-						if (*resulttmp)["legal_person"] != nil {
-							contact["contact_person"] = (*resulttmp)["legal_person"] //联系人
-						} else {
-							contact["contact_person"] = "" //联系人
-						}
-						contact["contact_type"] = "法定代表人" //法定代表人
-						//log.Println(1)
-						if (*resulttmp)["annual_reports"] != nil {
-							bytes, err := json.Marshal((*resulttmp)["annual_reports"])
-							if err != nil {
-								log.Println("annual_reports err:", err)
-							}
-							phonetmp := make([]map[string]interface{}, 0)
-							err = json.Unmarshal(bytes, &phonetmp)
-							if err != nil {
-								log.Println("Unmarshal err:", err)
+
+						//行业类型
+						tmpclass := make([]string, 0)
+						if tclasss, ok := tmp["topscopeclass"].([]interface{}); ok {
+							for _, vv := range tclasss {
+								if vvv, ok := vv.(string); ok {
+									if len(vvv) > 1 {
+										tmpclass = append(tmpclass, vvv[:len(vvv)-1])
+									}
+								}
 							}
-							for _, vv := range phonetmp {
-								if vv["company_phone"] != nil {
-									if vv["company_phone"] == "" {
-										continue
+						}
+						contacts := make([]map[string]interface{}, 0)
+						if legal_person, ok := resulttmp["legal_person"].(string); ok && legal_person != "" && !Reg_xing.MatchString(legal_person) && Reg_person.MatchString(legal_person) {
+							contact := make(map[string]interface{}, 0)
+							contact["contact_person"] = legal_person //联系人
+							contact["contact_type"] = "法定代表人"        //法定代表人
+							if resulttmp["annual_reports"] != nil {
+								bytes, err := json.Marshal(resulttmp["annual_reports"])
+								if err != nil {
+									log.Println("annual_reports err:", err)
+								}
+								phonetmp := make([]map[string]interface{}, 0)
+								err = json.Unmarshal(bytes, &phonetmp)
+								if err != nil {
+									log.Println("Unmarshal err:", err)
+								}
+								for _, vv := range phonetmp {
+									if vv["company_phone"] != nil {
+										if vv["company_phone"] == "" {
+											continue
+										} else {
+											contact["phone"] = vv["company_phone"] //联系电话
+											break
+										}
 									} else {
-										contact["phone"] = vv["company_phone"] //联系电话
-										break
+										contact["phone"] = "" //联系电话
 									}
-								} else {
+
+								}
+							}
+							//log.Println(k, contact["phone"], resulttmp["_id"])
+							//time.Sleep(10 * time.Second)
+							if phone, ok := contact["phone"].(string); ok && phone != "" {
+								if Reg_xing.MatchString(phone) || !Reg_tel.MatchString(phone) {
 									contact["phone"] = "" //联系电话
 								}
-
+							} else {
+								contact["phone"] = "" //联系电话
 							}
+							contact["topscopeclass"] = "企业公示"         //项目类型
+							contact["updatetime"] = time.Now().Unix() //更新时间
+							contact["infoid"] = ""                    //招标信息id
+							contacts = append(contacts, contact)
 						}
-						//log.Println(k, contact["phone"], resulttmp["_id"])
-						//time.Sleep(10 * time.Second)
-						if contact["phone"] == nil {
-							contact["phone"] = "" //联系电话
-						}
-						contact["topscopeclass"] = "企业公示"         //项目类型
-						contact["updatetime"] = time.Now().Unix() //更新时间
-						contact["infoid"] = ""                    //招标信息id
-						contacts = append(contacts, contact)
+
+
 						//添加临时表匹配到的联系人
-						vvv := make(map[string]interface{})
-						vvv["infoid"] = tmp["_id"].(bson.ObjectId).Hex()
-						if tmp["buyerperson"] != nil {
-							vvv["contact_person"] = tmp["buyerperson"]
-						} else {
-							vvv["contact_person"] = ""
-						}
-						vvv["contact_type"] = "项目联系人"
-						//	"buyer": 1, "buyertel": 1, "buyerperson": 1, "topscopeclass": 1 buyerclass : 1
-						if tmp["buyertel"] != nil {
-							vvv["phone"] = tmp["buyertel"]
-						} else {
-							vvv["phone"] = ""
-						}
-						tmpclass := make([]string, 0)
-						if tclasss, ok := tmp["topscopeclass"].([]string); ok {
-							for _, vv := range tclasss {
-								if len(vv) > 1 {
-									tmpclass = append(tmpclass, vv[:len(vv)-1])
-								}
+						if buyerperson, ok := tmp["buyerperson"].(string); ok && buyerperson != "" &&
+							!Reg_xing.MatchString(buyerperson) && Reg_person.MatchString(buyerperson) {
+							vvv := make(map[string]interface{})
+							vvv["infoid"] = tmp["_id"].(bson.ObjectId).Hex()
+							vvv["contact_person"] = buyerperson
+							vvv["contact_type"] = "项目联系人"
+							if buyertel, ok := tmp["buyertel"].(string); ok && !Reg_xing.MatchString(buyertel) && Reg_tel.MatchString(buyertel) {
+								vvv["phone"] = buyertel
+							} else {
+								vvv["phone"] = ""
 							}
+
+							vvv["topscopeclass"] = strings.Join(tmpclass, ";")
+							vvv["updatetime"] = time.Now().Unix()
+							contacts = append(contacts, vvv)
 						}
-						vvv["topscopeclass"] = strings.Join(tmpclass, ";")
-						vvv["updatetime"] = time.Now().Unix()
-						contacts = append(contacts, vvv)
-						(*resulttmp)["contact"] = contacts
+
+
 
 						savetmp := make(map[string]interface{}, 0)
 						for _, sk := range BuyerFields {
 							if sk == "_id" {
-								savetmp["tmp"+sk] = (*resulttmp)[sk]
+								savetmp["tmp"+sk] = resulttmp[sk]
 								continue
 							} else if sk == "area_code" {
 								//行政区划代码
-								savetmp[sk] = fmt.Sprint((*resulttmp)[sk])
+								savetmp[sk] = fmt.Sprint(resulttmp[sk])
 								continue
 							} else if sk == "report_websites" {
 								//网址
-								if (*resulttmp)["report_websites"] == nil {
+								if resulttmp["report_websites"] == nil {
 									savetmp["website"] = ""
 								} else {
 									report_websitesArr := []string{}
-									if ppms, ok := (*resulttmp)[sk].([]interface{}); ok {
+									if ppms, ok := resulttmp[sk].([]interface{}); ok {
 										for _, v := range ppms {
 											if vvv, ok := v.(map[string]interface{}); ok {
 												if rv, ok := vvv["website_url"].(string); ok {
@@ -578,17 +611,17 @@ func TimedTaskBuyer() {
 								savetmp[sk] = []interface{}{}
 								continue
 							} else if sk == "buyer_name" {
-								if (*resulttmp)["company_name"] == nil {
+								if resulttmp["company_name"] == nil {
 									savetmp[sk] = ""
 								} else {
-									savetmp[sk] = (*resulttmp)["company_name"]
+									savetmp[sk] = resulttmp["company_name"]
 								}
 								continue
 							} else if sk == "address" {
-								if (*resulttmp)["company_address"] == nil {
+								if resulttmp["company_address"] == nil {
 									savetmp[sk] = ""
 								} else {
-									savetmp[sk] = (*resulttmp)["company_address"]
+									savetmp[sk] = resulttmp["company_address"]
 								}
 								continue
 							}else if sk == "buyerclass" {
@@ -601,57 +634,45 @@ func TimedTaskBuyer() {
 							}
 
 
-							if (*resulttmp)[sk] == nil && sk != "history_name" && sk != "wechat_accounts" &&
+							if resulttmp[sk] == nil && sk != "history_name" && sk != "wechat_accounts" &&
 								sk != "buyer_name" && sk != "address" && sk != "buyerclass"&&
 								sk != "contact" && sk != "report_websites" {
 								savetmp[sk] = ""
 							} else {
-								savetmp[sk] = (*resulttmp)[sk]
+								savetmp[sk] = resulttmp[sk]
 							}
 						}
+
+
+
+						//判断分包
+						if tmp["package"] != nil {
+							PackageDealWithBuyer(&savetmp, tmp, errbuyer)
+						}
 						//tmps = append(tmps, savetmp)
+						savetmp["comeintime"] = time.Now().Unix()
 						savetmp["updatatime"] = time.Now().Unix()
 						//保存mongo
-						FClient.DbName = Config["mgodb_extract_kf"]
-
 						saveid := FClient.Save(Config["mgo_qyk_buyer"], savetmp)
 						if saveid != "" {
 							//保存redis
 							rc := RedisPool.Get()
 							rc.Do("SELECT", redis_buyer_db)
-							//var _id string
-							//if v, ok := saveid.(primitive.ObjectID); ok {
-							//	_id = v.Hex()
-							//}
 							if _, err := rc.Do("SET", savetmp["buyer_name"], saveid); err != nil {
 								log.Println("save redis err:", tmp["_id"], savetmp["_id"], savetmp["buyer_name"], err)
-								if err := rc.Close(); err != nil {
-									log.Println(err)
-								}
 							} else {
-								//保存es
-								delete(savetmp, "_id")
-								if err := rc.Close(); err != nil {
-									log.Println(err)
-								}
-
-								//esConn := elastic.GetEsConn()
-								//defer elastic.DestoryEsConn(esConn)
-								if _, err := EsConn.Index().Index(Config["elasticsearch_buyer_index"]).Type(Config["elasticsearch_buyer_type"]).Id(saveid).BodyJson(savetmp).Refresh(true).Do(); err != nil {
-									log.Println("save es err :", tmp["_id"], savetmp["_id"], err)
-								} else {
-									//删除临时表
-									FClient.DbName = Config["mgodb_extract_kf"]
-									if deleteNum := FClient.Del("buyer_new", bson.M{"_id": bson.ObjectIdHex(tmpId)}); !deleteNum {
-										log.Println("删除临时表失败", deleteNum)
-									}
+								//删除临时表
+								if deleteNum := FClient.Del(Config["mgo_qyk_c_b_new"], bson.M{"_id": bson.ObjectIdHex(tmpId)}); !deleteNum {
+									log.Println("删除临时表失败", deleteNum)
 								}
 							}
+							if err := rc.Close(); err != nil {
+								log.Println(err)
+							}
 						} else {
 							log.Println("save mongo err:", saveid, tmp["_id"])
 						}
 					}
-
 				}
 				FClient.DestoryMongoConn(fconn)
 				log.Println("buyer_new,遍历完成")
@@ -659,5 +680,17 @@ func TimedTaskBuyer() {
 		}
 		FClient.DestoryMongoConn(Fcconn)
 		t2.Reset(time.Minute)
+		//nextNode("buyerent",timenow)
 	}
 }
+
+//分包处理
+func PackageDealWithBuyer(contactMap *map[string]interface{}, tmp map[string]interface{}, comName string) []interface{} {
+	util.Catch()
+	//if v, ok := tmp["package"].(map[string]interface{}); ok {
+	//for i, pv := range v {
+	//	log.Println(i, pv)
+	//}
+	//}
+	return nil
+}

+ 306 - 234
udp_winner/timedTaskWinner.go

@@ -4,9 +4,10 @@ import (
 	"encoding/json"
 	"fmt"
 	"github.com/garyburd/redigo/redis"
-	"gopkg.in/mgo.v2"
 	"gopkg.in/mgo.v2/bson"
 	"log"
+	util2 "mfw/util"
+	"net"
 	"qfw/util"
 	"sort"
 	"strings"
@@ -30,11 +31,12 @@ func TaskWinner(mapinfo *map[string]interface{}) {
 		log.Println(gtid, lteid, "不是Objectid,转换_id错误", gtid, lteid)
 		return
 	}
+	//timenow := time.Now().Unix()
 	//udp的id区间查询bidding  中标人 中标联系人 中标联系电话
 	// topscopeclass项目类型-industry行业类型&&topscopeclass联系人项目类型
 	// (area地区-province省份 city城市-city城市 district区县-district区县)
 	// winneraddr-company_address企业地址
-	SourceClientcc := SourceClient.GetMgoConn(86400)
+	SourceClientcc := SourceClient.GetMgoConn(8640000)
 	cursor := SourceClientcc.DB(Config["mgodb_bidding"]).C(Config["mgodb_mgoinit_c"]).Find(bson.M{
 		"_id": bson.M{
 			"$gte": GId,
@@ -57,7 +59,12 @@ func TaskWinner(mapinfo *map[string]interface{}) {
 		//遍历bidding表保存到redis
 		//key:企业名  value:json结构体{"winner": 1, "winnertel": 1, "winnerperson": 1,"topscopeclass": 1, "winneraddr": 1,"_id":1}
 		tmp := make(map[string]interface{})
+		var num int
+		var tmpRangeId string
 		for cursor.Next(&tmp) {
+			num++
+			mgoId := tmp["_id"].(bson.ObjectId).Hex()
+			tmpRangeId = mgoId
 			winner, ok := tmp["winner"].(string)
 			if !ok || utf8.RuneCountInString(winner) < 4 {
 				continue
@@ -65,11 +72,57 @@ func TaskWinner(mapinfo *map[string]interface{}) {
 			//判断redis key是否存在
 			e_num := conn.Exists(winner).Val()
 			//获取字符串_id
-			mgoId := tmp["_id"].(bson.ObjectId).Hex()
 			//替换_id
 			tmp["_id"] = mgoId
 			//创建value数组
 			tmps := make([]map[string]interface{}, 0)
+			//存量删除分包
+			if v, ok := tmp["package"].(map[string]interface{}); ok {
+				for i, vv := range v {
+					if vvv, ok2 := vv.(map[string]interface{}); ok2 {
+						if pwinner, ok := vvv["winner"].(string); ok && pwinner != "" {
+							if pwinper, ok4 := vvv["winnerperson"].(string); ok4 && pwinper != "" {
+								ptmp := make(map[string]interface{})
+								ptmp["p_id"] = mgoId + "_pkg_" + i
+								ptmp["_id"] = bson.NewObjectId().Hex()
+								ptmp["winner"] = winner
+								ptmp["winnerperson"] = pwinper
+								if pkgtel, ok6 := vvv["winnertel"].(string); ok6 {
+									ptmp["winnertel"] = pkgtel
+								}
+								if tmp["topscopeclass"] != nil {
+									ptmp["topscopeclass"] = tmp["topscopeclass"]
+								}
+								//分包里中标单位和企业名一样
+								if pwinner == winner {
+									tmps = append(tmps, ptmp)
+								} else if conn.Exists(pwinner).Val() > 0 {
+									//分包里中标单位和企业名不一样,存量里匹配上了
+									bytes, _ := conn.Get(pwinner).Bytes()
+									cltmps := make([]map[string]interface{}, 0)
+									json.Unmarshal(bytes, &cltmps)
+									cltmps = append(cltmps, ptmp)
+									bytes2, _ := json.Marshal(cltmps)
+									//存量redis的key存在,合并  key :企业名 val :[]map
+									if err := conn.Set(pwinner, string(bytes2), 0).Err(); err != nil {
+										log.Println(err)
+									}
+								} else {
+									//分包和企业名不一样,而且存量没匹配上
+									cltmps := make([]map[string]interface{}, 0)
+									cltmps = append(cltmps, ptmp)
+									bytes2, _ := json.Marshal(cltmps)
+									//存量redis的key新增,新增  key :企业名 val :[]map
+									if err := conn.Set(pwinner, string(bytes2), 0).Err(); err != nil {
+										log.Println(err)
+									}
+								}
+							}
+						}
+					}
+				}
+				delete(tmp, "package")
+			}
 			if e_num > 0 {
 				//存量redis的key存在,累加更新
 				bytes, _ := conn.Get(winner).Bytes()
@@ -82,6 +135,23 @@ func TaskWinner(mapinfo *map[string]interface{}) {
 				log.Println(err)
 			}
 		}
+		log.Println("存量 winner mongo遍历完成:", num)
+		if tmpRangeId != lteid{
+			by, _ := json.Marshal(map[string]interface{}{
+				"gtid":  tmpRangeId,
+				"lteid": lteid,
+				"data_info":"save",
+				"stype": "winner",
+			})
+			if e := udpclient.WriteUdp(by, util2.OP_TYPE_DATA, &net.UDPAddr{
+				IP:   net.ParseIP("127.0.0.1"),
+				Port: Updport,
+			}); e != nil {
+				log.Println(e)
+			}
+			SourceClient.DestoryMongoConn(SourceClientcc)
+			return
+		}
 		SourceClient.DestoryMongoConn(SourceClientcc)
 		//遍历redis
 		if scan := conn.Scan(0, "", 100); scan.Err() != nil {
@@ -99,14 +169,10 @@ func TaskWinner(mapinfo *map[string]interface{}) {
 				rdb.Do("SELECT", redis_winner_db)
 				if reply, err := redis.String(rdb.Do("GET", redisCName)); err != nil {
 					//redis不存在,存到临时表,定时任务处理
-					FClient.DbName = Config["mgodb_extract_kf"]
-					//if tmpid := FClient.Save("winner_new", tmps); tmpid == nil {
-					//	log.Println("存量 FClient.Save err", tmpid)
-					//}
 					for _, vmap := range rValuesMaps {
 						vmap["_id"] = bson.ObjectIdHex(vmap["_id"].(string))
-						if err = FClient.SaveForOld("winner_new", vmap); err != nil {
-							log.Println("存量 FClient.Save err", err, vmap)
+						if errb := FClient.SaveByOriID(Config["mgo_qyk_c_w_new"], vmap); !errb {
+							log.Println("存量 FClient.Save err", errb, vmap)
 						}
 					}
 					//log.Println("get redis id err:定时任务处理", err, tmp)
@@ -120,7 +186,6 @@ func TaskWinner(mapinfo *map[string]interface{}) {
 						log.Println(err)
 					}
 					//拿到合并后的qyk
-					FClient.DbName = Config["mgodb_extract_kf"]
 					oldTmp, b := FClient.FindById(Config["mgo_qyk_c"], reply, nil)
 					if !b || (*oldTmp) == nil || reply == "" || (*oldTmp)["_id"] == nil {
 						log.Println(redisCName, "存量 redis id 不存在", reply)
@@ -197,116 +262,118 @@ func TaskWinner(mapinfo *map[string]interface{}) {
 					(*oldTmp)["contact"] = contactMaps
 					//mongo更新
 					(*oldTmp)["updatatime"] = time.Now().Unix()
-					FClient.DbName = Config["mgodb_extract_kf"]
 					if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
 						log.Println("存量  mongo更新 err", esId, oldTmp)
 					}
 					//es更新
 					delete((*oldTmp), "_id")
-					if _, err := EsConn.Update().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
-						log.Println("存量 EsConn err :", err)
-					}
 				}
 			}
 		}
 		log.Println("存量历史合并执行完成 ok", gtid, lteid)
+		//发送udp 更新es段
 
 	} else {
-		//增量
-		overid := addfunc(gtid, cursor)
+		//增量处理
+		overid := gtid
+		tmp := map[string]interface{}{}
+		for cursor.Next(&tmp) {
+			overid = Add(overid, tmp)
+		}
 		SourceClient.DestoryMongoConn(SourceClientcc)
 		log.Println("增量合并执行完成 ok", gtid, lteid, overid)
+		//发送udp 更新es段
+		//nextNode("winnerent", timenow)
 	}
 
 }
 
 //增量
-func addfunc(gtid string, cursor *mgo.Iter) string {
-	//增量处理
-	overid := gtid
-	tmp := map[string]interface{}{}
-	for cursor.Next(&tmp) {
-		overid = tmp["_id"].(bson.ObjectId).Hex()
-		//log.Println(tmp["_id"])
-		winner, ok := tmp["winner"].(string)
-		if !ok || utf8.RuneCountInString(winner) < 4 {
-			continue
+func Add(overid string, tmp map[string]interface{}) string {
+	overid = tmp["_id"].(bson.ObjectId).Hex()
+	winner, ok := tmp["winner"].(string)
+	if !ok || utf8.RuneCountInString(winner) < 4 {
+		return overid
+	}
+	//redis查询是否存在
+	rdb := RedisPool.Get()
+	rdb.Do("SELECT", redis_winner_db)
+	if reply, err := redis.String(rdb.Do("GET", winner)); err != nil {
+		//redis不存在存到临时表,定时任务处理
+		if errb := FClient.SaveByOriID(Config["mgo_qyk_c_w_new"], tmp); !errb {
+			log.Println("FClient.Save err", errb, tmp)
 		}
-		//redis查询是否存在
-		rdb := RedisPool.Get()
-		rdb.Do("SELECT", redis_winner_db)
-		if reply, err := redis.String(rdb.Do("GET", winner)); err != nil {
-			//redis不存在存到临时表,定时任务处理
-			FClient.DbName = Config["mgodb_extract_kf"]
-			if err := FClient.SaveForOld("winner_new", tmp); err != nil {
-				log.Println("FClient.Save err", err, tmp)
-			}
-			//log.Println("get redis id err:定时任务处理", err, tmp)
-			if err := rdb.Close(); err != nil {
-				log.Println(err)
-			}
-			continue
-		} else {
-			if err := rdb.Close(); err != nil {
-				log.Println(err)
-			}
-			//拿到合并后的qyk
-			FClient.DbName = Config["mgodb_extract_kf"]
-			oldTmp, b := FClient.FindById(Config["mgo_qyk_c"], reply, bson.M{})
-			if !b || (*oldTmp) == nil || reply == "" || (*oldTmp)["_id"] == nil {
-				log.Println("redis id 不存在")
-				continue
-			}
-			//比较合并 行业类型
-			tmpTopscopeclass := []string{}
-			tmpConTopscopeclass := []string{}
-			tmpTopscopeclassMap := make(map[string]bool)
+		//log.Println("get redis id err:定时任务处理", err, tmp)
+		if err := rdb.Close(); err != nil {
+			log.Println(err)
+		}
+		return overid
+	} else {
+		if err := rdb.Close(); err != nil {
+			log.Println(err)
+		}
+		//拿到合并后的qyk
+		oldTmp, b := FClient.FindById(Config["mgo_qyk_c"], reply, bson.M{})
+		if !b || (*oldTmp) == nil || reply == "" || (*oldTmp)["_id"] == nil {
+			log.Println("redis id 不存在", reply)
+			return overid
+		}
+		//比较合并 行业类型
+		tmpTopscopeclass := []string{}
+		tmpConTopscopeclass := []string{}
+		tmpTopscopeclassMap := make(map[string]bool)
 
-			if (*oldTmp)["industry"] != nil {
-				if v, ok := (*oldTmp)["industry"].([]interface{}); ok {
-					for _, vv := range v {
-						if vvv, ok := vv.(string); ok {
-							tmpTopscopeclassMap[vvv] = true
-						}
-					}
-				}
-			}
-			if v, ok := tmp["topscopeclass"].([]interface{}); ok {
+		if (*oldTmp)["industry"] != nil {
+			if v, ok := (*oldTmp)["industry"].([]interface{}); ok {
 				for _, vv := range v {
-					if vvv, ok := vv.(string); ok && len(vvv) > 1 {
-						tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
-						tmpConTopscopeclass = append(tmpConTopscopeclass, vvv[:len(vvv)-1])
+					if vvv, ok := vv.(string); ok {
+						tmpTopscopeclassMap[vvv] = true
 					}
 				}
 			}
-			for k := range tmpTopscopeclassMap {
-				tmpTopscopeclass = append(tmpTopscopeclass, k)
+		}
+		if v, ok := tmp["topscopeclass"].([]interface{}); ok {
+			for _, vv := range v {
+				if vvv, ok := vv.(string); ok && len(vvv) > 1 {
+					tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
+					tmpConTopscopeclass = append(tmpConTopscopeclass, vvv[:len(vvv)-1])
+				}
 			}
-			sort.Strings(tmpTopscopeclass)
-			(*oldTmp)["industry"] = tmpTopscopeclass
+		}
+		for k := range tmpTopscopeclassMap {
+			tmpTopscopeclass = append(tmpTopscopeclass, k)
+		}
+		sort.Strings(tmpTopscopeclass)
+		(*oldTmp)["industry"] = tmpTopscopeclass
 
-			esId := (*oldTmp)["_id"].(bson.ObjectId).Hex()
-			//更新行业类型
-			if tmp["winnerperson"] == nil || tmp["winnerperson"] == "" || Reg_xing.MatchString(util.ObjToString(tmp["winnerperson"])) {
-				(*oldTmp)["updatatime"] = time.Now().Unix()
-				//mongo更新
-				FClient.DbName = Config["mgodb_extract_kf"]
-				if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
-					log.Println("mongo更新err", esId)
-				}
+		esId := (*oldTmp)["_id"].(bson.ObjectId).Hex()
+		//更新行业类型
+		if tmp["winnerperson"] == nil || tmp["winnerperson"] == "" || Reg_xing.MatchString(util.ObjToString(tmp["winnerperson"])) {
+			(*oldTmp)["updatatime"] = time.Now().Unix()
+			//mongo更新
+			if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
+				log.Println("mongo更新err", esId)
+			}
 
-				//es更新
-				delete((*oldTmp), "_id")
-				if _, err := EsConn.Update().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
-					log.Println("update es err:", err)
+			//es更新
+			delete((*oldTmp), "_id")
+			return overid
+		}
+		//联系方式合并
+		contactMaps := make([]map[string]interface{}, 0)
+		if (*oldTmp)["contact"] != nil {
+			//直接添加联系人,不再判断
+			if v, ok := (*oldTmp)["contact"].([]interface{}); ok {
+				for _, vv := range v {
+					contactMaps = append(contactMaps, vv.(map[string]interface{}))
 				}
-				continue
-			}
-			//联系方式合并
-			var tmpperson, winnertel string
-			if tmppersona, ok := tmp["winnerperson"].(string); ok {
-				tmpperson = tmppersona
 			}
+		}
+		var tmpperson, winnertel string
+		if tmppersona, ok := tmp["winnerperson"].(string); ok && tmppersona != "" && Reg_person.MatchString(tmppersona) && !Reg_xing.MatchString(tmppersona) {
+			tmpperson = tmppersona
+		}
+		if tmpperson != "" {
 			if winnerteltmp, ok := tmp["winnertel"].(string); ok {
 				winnertel = winnerteltmp
 			}
@@ -315,13 +382,7 @@ func addfunc(gtid string, cursor *mgo.Iter) string {
 			} else {
 				winnertel = winnertel
 			}
-			contactMaps := make([]interface{}, 0)
-			if (*oldTmp)["contact"] != nil {
-				//直接添加联系人,不再判断
-				if v, ok := (*oldTmp)["contact"].([]interface{}); ok {
-					contactMaps = append(contactMaps, v...)
-				}
-			}
+
 			vvv := make(map[string]interface{})
 			vvv["infoid"] = overid
 			vvv["contact_person"] = tmpperson
@@ -330,23 +391,22 @@ func addfunc(gtid string, cursor *mgo.Iter) string {
 			vvv["topscopeclass"] = strings.Join(tmpConTopscopeclass, ";")
 			vvv["updatetime"] = time.Now().Unix()
 			contactMaps = append(contactMaps, vvv)
-			//分包处理
-			PackageDealWith(&contactMaps, tmp)
-			(*oldTmp)["contact"] = contactMaps
-			//mongo更新
-			(*oldTmp)["updatatime"] = time.Now().Unix()
-			FClient.DbName = Config["mgodb_extract_kf"]
-			if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
-				log.Println("mongo更新 err", esId, oldTmp)
-			}
-			//es更新
-			delete((*oldTmp), "_id")
-			if _, err := EsConn.Update().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
-				log.Println("EsConn err :", err)
-			}
 		}
+		//分包处理
+		if tmp["package"] != nil {
+			PackageDealWith(tmp, tmpConTopscopeclass, overid)
+		}
+		(*oldTmp)["contact"] = contactMaps
+		//mongo更新
+		(*oldTmp)["updatatime"] = time.Now().Unix()
+		if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
+			log.Println("mongo更新 err", esId, oldTmp)
+		}
+		//es更新
+		delete((*oldTmp), "_id")
 	}
 	return overid
+
 }
 
 //定时任务  新增
@@ -356,9 +416,10 @@ func TimedTaskWinner() {
 	//time.Sleep(time.Hour*70)
 	t2 := time.NewTimer(time.Second * 5)
 	for range t2.C {
+		//timenow:=time.Now().Unix()
 		Fcconn := FClient.GetMgoConn(86400)
 		tmpLast := map[string]interface{}{}
-		if iter := Fcconn.DB(Config["mgodb_extract_kf"]).C("winner_new").Find(bson.M{}).Sort("-_id").Limit(1).Iter(); iter != nil {
+		if iter := Fcconn.DB(Config["mgodb_extract_kf"]).C(Config["mgo_qyk_c_w_new"]).Find(bson.M{}).Sort("-_id").Limit(1).Iter(); iter != nil {
 			if !iter.Next(&tmpLast) {
 				//临时表无数据
 				log.Println("临时表无数据:")
@@ -366,9 +427,9 @@ func TimedTaskWinner() {
 				FClient.DestoryMongoConn(Fcconn)
 				continue
 			} else {
-				log.Println("临时表有数据:", tmpLast)
+				log.Println("临时表有数据:", tmpLast["_id"])
 				fconn := FClient.GetMgoConn(86400)
-				cursor := fconn.DB(Config["mgodb_extract_kf"]).C("winner_new").Find(bson.M{
+				cursor := fconn.DB(Config["mgodb_extract_kf"]).C(Config["mgo_qyk_c_w_new"]).Find(bson.M{
 					"_id": bson.M{
 						"$lte": tmpLast["_id"],
 					},
@@ -391,14 +452,10 @@ func TimedTaskWinner() {
 					rdb := RedisPool.Get()
 					rdb.Do("SELECT", redis_winner_db)
 					if _, err := redis.String(rdb.Do("GET", errwinner)); err == nil {
-						//redis存在发送udp进行处理
-						TaskWinner(&map[string]interface{}{
-							"gtid":  tmpId,
-							"lteid": tmpId,
-						})
+						//增量合并
+						Add(tmpId, tmp)
 						//存在的话删除tmp mongo表
-						FClient.DbName = Config["mgodb_extract_kf"]
-						if DeletedCount := FClient.Del("winner_new", bson.M{"_id": bson.ObjectIdHex(tmpId)}); !DeletedCount {
+						if DeletedCount := FClient.Del(Config["mgo_qyk_c_w_new"], bson.M{"_id": bson.ObjectIdHex(tmpId)}); !DeletedCount {
 							log.Println("删除临时表err:", DeletedCount)
 						}
 						if err := rdb.Close(); err != nil {
@@ -411,10 +468,11 @@ func TimedTaskWinner() {
 						}
 					}
 					//查询redis不存在新增
-					FClient.DbName = Config["mgodb_enterprise"]
-
-					resulttmp, b := FClient.FindOne(Config["mgodb_enterprise_c"], bson.M{"company_name": errwinner})
-					if !b || (*resulttmp)["_id"] == nil {
+					sessionfinone := FClient.GetMgoConn()
+					resulttmp := make(map[string]interface{})
+					err := sessionfinone.DB(Config["mgodb_enterprise"]).C(Config["mgodb_enterprise_c"]).Find(bson.M{"company_name": errwinner}).One(&resulttmp)
+					FClient.DestoryMongoConn(sessionfinone)
+					if err != nil || resulttmp["_id"] == nil {
 						//log.Println(r)
 						//人工审核正则
 						var isok bool
@@ -439,122 +497,126 @@ func TimedTaskWinner() {
 							}
 						}
 						//都没匹配
-						if tmp["winner_ok"] == nil  && tmp["winner_err"] == nil{
+						if tmp["winner_ok"] == nil && tmp["winner_err"] == nil {
 							tmp["winner_err"] = 1
 						}
 						//匹配不到原始库,存入异常表删除临时表
-						FClient.DbName = Config["mgodb_extract_kf"]
-						if err := FClient.SaveForOld("winner_err", tmp); err != nil {
-							log.Println("存入异常表错误", err, tmp)
+						if errb := FClient.SaveByOriID(Config["mgo_qyk_c_w_err"], tmp); !errb {
+							log.Println("存入异常表错误", errb, tmp)
 						}
-						if deleteNum := FClient.Del("winner_new", bson.M{"_id": bson.ObjectIdHex(tmpId)}); !b {
+						if deleteNum := FClient.Del(Config["mgo_qyk_c_w_new"], bson.M{"_id": bson.ObjectIdHex(tmpId)}); !deleteNum {
 							log.Println("删除临时表错误", deleteNum)
 						}
 						continue
 					} else {
 						//log.Println(123)
 						//匹配到原始库,新增 resulttmp
-						if (*resulttmp)["credit_no"] != nil {
-							if credit_no, ok := (*resulttmp)["credit_no"].(string); ok && strings.TrimSpace(credit_no) != "" &&
+						if resulttmp["credit_no"] != nil {
+							if credit_no, ok := resulttmp["credit_no"].(string); ok && strings.TrimSpace(credit_no) != "" &&
 								len(strings.TrimSpace(credit_no)) > 8 {
 								dataNo := strings.TrimSpace(credit_no)[2:8]
 								if Addrs[dataNo] != nil {
 									if v, ok := Addrs[dataNo].(map[string]interface{}); ok {
-										if (*resulttmp)["province"] == nil || (*resulttmp)["province"] == "" {
-											(*resulttmp)["province"] = v["province"]
+										if resulttmp["province"] == nil || resulttmp["province"] == "" {
+											resulttmp["province"] = v["province"]
 										}
-										(*resulttmp)["city"] = v["city"]
-										(*resulttmp)["district"] = v["district"]
+										resulttmp["city"] = v["city"]
+										resulttmp["district"] = v["district"]
 
 									}
 								}
 							}
 						}
-						contacts := make([]map[string]interface{}, 0)
-						contact := make(map[string]interface{}, 0)
-						if (*resulttmp)["legal_person"] != nil {
-							contact["contact_person"] = (*resulttmp)["legal_person"] //联系人
-						} else {
-							contact["contact_person"] = "" //联系人
-						}
-						contact["contact_type"] = "法定代表人" //法定代表人
-						//log.Println(1)
-						if (*resulttmp)["annual_reports"] != nil {
-							bytes, err := json.Marshal((*resulttmp)["annual_reports"])
-							if err != nil {
-								log.Println("annual_reports err:", err)
-							}
-							phonetmp := make([]map[string]interface{}, 0)
-							err = json.Unmarshal(bytes, &phonetmp)
-							if err != nil {
-								log.Println("Unmarshal err:", err)
+						//行业类型
+						tmpclass := make([]string, 0)
+						if tclasss, ok := tmp["topscopeclass"].([]interface{}); ok {
+							for _, vv := range tclasss {
+								if vvv, ok := vv.(string); ok {
+									if len(vvv) > 1 {
+										tmpclass = append(tmpclass, vvv[:len(vvv)-1])
+									}
+								}
 							}
-							for _, vv := range phonetmp {
-								if vv["company_phone"] != nil {
-									if vv["company_phone"] == "" {
-										continue
+						}
+						contacts := make([]map[string]interface{}, 0)
+						if legal_person, ok := resulttmp["legal_person"].(string); ok && legal_person != "" && !Reg_xing.MatchString(legal_person) && Reg_person.MatchString(legal_person) {
+							contact := make(map[string]interface{}, 0)
+							contact["contact_person"] = legal_person //联系人
+							contact["contact_type"] = "法定代表人"        //法定代表人
+							if resulttmp["annual_reports"] != nil {
+								bytes, err := json.Marshal(resulttmp["annual_reports"])
+								if err != nil {
+									log.Println("annual_reports err:", err)
+								}
+								phonetmp := make([]map[string]interface{}, 0)
+								err = json.Unmarshal(bytes, &phonetmp)
+								if err != nil {
+									log.Println("Unmarshal err:", err)
+								}
+								for _, vv := range phonetmp {
+									if vv["company_phone"] != nil {
+										if vv["company_phone"] == "" {
+											continue
+										} else {
+											contact["phone"] = vv["company_phone"] //联系电话
+											break
+										}
 									} else {
-										contact["phone"] = vv["company_phone"] //联系电话
-										break
+										contact["phone"] = "" //联系电话
 									}
-								} else {
+
+								}
+							}
+							//log.Println(k, contact["phone"], resulttmp["_id"])
+							//time.Sleep(10 * time.Second)
+							if phone, ok := contact["phone"].(string); ok && phone != "" {
+								if Reg_xing.MatchString(phone) || !Reg_tel.MatchString(phone) {
 									contact["phone"] = "" //联系电话
 								}
-
+							} else {
+								contact["phone"] = "" //联系电话
 							}
+							contact["topscopeclass"] = "企业公示"         //项目类型
+							contact["updatetime"] = time.Now().Unix() //更新时间
+							contact["infoid"] = ""                    //招标信息id
+							contacts = append(contacts, contact)
 						}
-						//log.Println(k, contact["phone"], resulttmp["_id"])
-						//time.Sleep(10 * time.Second)
-						if contact["phone"] == nil {
-							contact["phone"] = "" //联系电话
-						}
-						contact["topscopeclass"] = "企业公示"         //项目类型
-						contact["updatetime"] = time.Now().Unix() //更新时间
-						contact["infoid"] = ""                    //招标信息id
-						contacts = append(contacts, contact)
 						//添加临时表匹配到的联系人
-						vvv := make(map[string]interface{})
-						vvv["infoid"] = tmp["_id"].(bson.ObjectId).Hex()
-						if tmp["winnerperson"] != nil {
-							vvv["contact_person"] = tmp["winnerperson"]
-						} else {
-							vvv["contact_person"] = ""
-						}
-						vvv["contact_type"] = "项目联系人"
-						//	"winner": 1, "winnertel": 1, "winnerperson": 1, "topscopeclass": 1
-						if tmp["winnertel"] != nil {
-							vvv["phone"] = tmp["winnertel"]
-						} else {
-							vvv["phone"] = ""
-						}
-						tmpclass := make([]string, 0)
-						if tclasss, ok := tmp["topscopeclass"].([]string); ok {
-							for _, vv := range tclasss {
-								if len(vv) > 1 {
-									tmpclass = append(tmpclass, vv[:len(vv)-1])
-								}
+						if winnerperson, ok := tmp["winnerperson"].(string); ok && winnerperson != "" &&
+							!Reg_xing.MatchString(winnerperson) && Reg_person.MatchString(winnerperson) {
+							vvv := make(map[string]interface{})
+							vvv["infoid"] = tmp["_id"].(bson.ObjectId).Hex()
+							vvv["contact_person"] = winnerperson
+							vvv["contact_type"] = "项目联系人"
+							//	"winner": 1, "winnertel": 1, "winnerperson": 1, "topscopeclass": 1
+							if winnertel, ok := tmp["winnertel"].(string); ok && !Reg_xing.MatchString(winnertel) && Reg_tel.MatchString(winnertel) {
+								vvv["phone"] = winnertel
+							} else {
+								vvv["phone"] = ""
 							}
+
+							vvv["topscopeclass"] = strings.Join(tmpclass, ";")
+							vvv["updatetime"] = time.Now().Unix()
+							contacts = append(contacts, vvv)
 						}
-						vvv["topscopeclass"] = strings.Join(tmpclass, ";")
-						vvv["updatetime"] = time.Now().Unix()
-						contacts = append(contacts, vvv)
-						(*resulttmp)["contact"] = contacts
+
+						resulttmp["contact"] = contacts
 
 						savetmp := make(map[string]interface{}, 0)
 						for _, sk := range Fields {
 							if sk == "establish_date" {
-								if (*resulttmp)[sk] != nil {
-									savetmp[sk] = (*resulttmp)[sk].(time.Time).UTC().Unix()
+								if resulttmp[sk] != nil {
+									savetmp[sk] = resulttmp[sk].(time.Time).UTC().Unix()
 									continue
 								}
 							} else if sk == "capital" {
 								//log.Println(sk, resulttmp[sk])
-								savetmp[sk] = ObjToMoney([]interface{}{(*resulttmp)[sk], ""})[0]
+								savetmp[sk] = ObjToMoney([]interface{}{resulttmp[sk], ""})[0]
 								continue
 							} else if sk == "partners" {
 								//log.Println(sk, resulttmp[sk], )
-								if (*resulttmp)[sk] != nil {
-									if ppms, ok := (*resulttmp)[sk].([]interface{}); ok {
+								if resulttmp[sk] != nil {
+									if ppms, ok := resulttmp[sk].([]interface{}); ok {
 										for i, _ := range ppms {
 											if ppms[i].(map[string]interface{})["stock_type"] != nil {
 												ppms[i].(map[string]interface{})["stock_type"] = "企业公示"
@@ -569,19 +631,19 @@ func TimedTaskWinner() {
 								}
 								continue
 							} else if sk == "_id" {
-								savetmp["tmp"+sk] = (*resulttmp)[sk]
+								savetmp["tmp"+sk] = resulttmp[sk]
 								continue
 							} else if sk == "area_code" {
 								//行政区划代码
-								savetmp[sk] = fmt.Sprint((*resulttmp)[sk])
+								savetmp[sk] = fmt.Sprint(resulttmp[sk])
 								continue
 							} else if sk == "report_websites" {
 								//网址
-								if (*resulttmp)["report_websites"] == nil {
+								if resulttmp["report_websites"] == nil {
 									savetmp["website"] = ""
 								} else {
 									report_websitesArr := []string{}
-									if ppms, ok := (*resulttmp)[sk].([]interface{}); ok {
+									if ppms, ok := resulttmp[sk].([]interface{}); ok {
 										for _, v := range ppms {
 											if vvv, ok := v.(map[string]interface{}); ok {
 												if rv, ok := vvv["website_url"].(string); ok {
@@ -610,50 +672,36 @@ func TimedTaskWinner() {
 								savetmp[sk] = tmpTopscopeclass
 								continue
 							}
-							if (*resulttmp)[sk] == nil && sk != "history_name" && sk != "wechat_accounts" && sk != "establish_date" && sk != "capital" && sk != "partners" && sk != "contact" && sk != "report_websites" {
+							if resulttmp[sk] == nil && sk != "history_name" && sk != "wechat_accounts" && sk != "establish_date" && sk != "capital" && sk != "partners" && sk != "contact" && sk != "report_websites" {
 								savetmp[sk] = ""
 							} else {
-								savetmp[sk] = (*resulttmp)[sk]
+								savetmp[sk] = resulttmp[sk]
 							}
 						}
+						//判断分包
+						if tmp["package"] != nil {
+							PackageDealWith(tmp,tmpclass, tmpId)
+						}
 						//tmps = append(tmps, savetmp)
+						savetmp["comeintime"] = time.Now().Unix()
 						savetmp["updatatime"] = time.Now().Unix()
 						//保存mongo
-						FClient.DbName = Config["mgodb_extract_kf"]
-
 						saveid := FClient.Save(Config["mgo_qyk_c"], savetmp)
 						if saveid != "" {
 							//保存redis
 							rc := RedisPool.Get()
 							rc.Do("SELECT", redis_winner_db)
-							//var _id string
-							//if v, ok := saveid.(primitive.ObjectID); ok {
-							//	_id = v.Hex()
-							//}
 							if _, err := rc.Do("SET", savetmp["company_name"], saveid); err != nil {
 								log.Println("save redis err:", tmp["_id"], savetmp["_id"], savetmp["company_name"], err)
-								if err := rc.Close(); err != nil {
-									log.Println(err)
-								}
 							} else {
-								//保存es
-								delete(savetmp, "_id")
-								if err := rc.Close(); err != nil {
-									log.Println(err)
-								}
-
-								//esConn := elastic.GetEsConn()
-								//defer elastic.DestoryEsConn(esConn)
-								if _, err := EsConn.Index().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(saveid).BodyJson(savetmp).Refresh(true).Do(); err != nil {
-									log.Println("save es err :", tmp["_id"], savetmp["_id"], err)
-								} else {
-									//删除临时表
-									FClient.DbName = Config["mgodb_extract_kf"]
-									if deleteNum := FClient.Del("winner_new", bson.M{"_id": bson.ObjectIdHex(tmpId)}); !deleteNum {
-										log.Println("删除临时表失败", deleteNum)
-									}
+								//删除临时表
+								if deleteNum := FClient.Del(Config["mgo_qyk_c_w_new"], bson.M{"_id": bson.ObjectIdHex(tmpId)}); !deleteNum {
+									log.Println("删除临时表失败", deleteNum)
 								}
 							}
+							if err := rc.Close(); err != nil {
+								log.Println(err)
+							}
 						} else {
 							log.Println("save mongo err:", saveid, tmp["_id"])
 						}
@@ -665,10 +713,34 @@ func TimedTaskWinner() {
 		}
 		FClient.DestoryMongoConn(Fcconn)
 		t2.Reset(time.Minute)
+		//nextNode("winnerent", timenow)
 	}
 }
 
 //分包处理
-func PackageDealWith(contactMaps *[]interface{}, tmp map[string]interface{}) {
-
+func PackageDealWith(tmp map[string]interface{},  tmpclass []string,pkgid string) {
+	util.Catch()
+	if v, ok := tmp["package"].(map[string]interface{}); ok {
+		for i, pv := range v {
+			if ppv, ok2 := pv.(map[string]interface{}); ok2 {
+				pkgwinner, ok3 := ppv["winner"].(string)
+				if !ok3 || utf8.RuneCountInString(pkgwinner) < 4 {
+					continue
+				}
+				//创建中标单位放入临时表
+				vvv := make(map[string]interface{})
+				vvv["winner"] = pkgwinner
+				vvv["topscopeclass"] = tmpclass
+				vvv["pkg_id"] =pkgid+"_"+i
+				if winnerperson, ok := tmp["winnerperson"].(string); ok && winnerperson != "" &&
+					!Reg_xing.MatchString(winnerperson) && Reg_person.MatchString(winnerperson) {
+					vvv["winnerperson"] = winnerperson
+					if winnertel, ok := tmp["winnertel"].(string); ok && !Reg_xing.MatchString(winnertel) && Reg_tel.MatchString(winnertel) {
+						vvv["winnertel"] = winnertel
+					}
+				}
+				FClient.Save(Config["mgo_qyk_c_w_new"],vvv)
+			}
+		}
+	}
 }

+ 59 - 0
udp_winner/udptaskmap.go

@@ -0,0 +1,59 @@
+package main
+
+import (
+	"fmt"
+	"io/ioutil"
+	"log"
+	mu "mfw/util"
+	"net"
+	"net/http"
+	"sync"
+	"time"
+)
+
+var udptaskmap = &sync.Map{}
+var tomail string
+var api string
+
+type udpNode struct {
+	data      []byte
+	addr      *net.UDPAddr
+	timestamp int64
+	retry     int
+}
+
+func checkMapJob() {
+	//阿里云内网无法发送邮件
+	jkmail, _ := Sysconfig["jkmail"].(map[string]interface{})
+	if jkmail != nil {
+		tomail, _ = jkmail["to"].(string)
+		api, _ = jkmail["api"].(string)
+	}
+	log.Println("start checkMapJob", tomail, Sysconfig["jkmail"])
+	for {
+		udptaskmap.Range(func(k, v interface{}) bool {
+			now := time.Now().Unix()
+			node, _ := v.(*udpNode)
+			if now-node.timestamp > 120 {
+				node.retry++
+				if node.retry > 5 {
+					log.Println("udp重试失败", k)
+					udptaskmap.Delete(k)
+					res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", api, tomail, "extract-send-fail", k.(string)))
+					if err == nil {
+						defer res.Body.Close()
+						read, err := ioutil.ReadAll(res.Body)
+						log.Println("邮件发发送:", string(read), err)
+					}
+				} else {
+					log.Println("udp重发", k)
+					udpclient.WriteUdp(node.data, mu.OP_TYPE_DATA, node.addr)
+				}
+			} else if now-node.timestamp > 10 {
+				log.Println("udp任务超时中..", k)
+			}
+			return true
+		})
+		time.Sleep(60 * time.Second)
+	}
+}

+ 20 - 0
udpcreateindex/src/config.json

@@ -47,6 +47,26 @@
         "index": "projectset_v1",
         "type": "projectset"
     },
+    "standard": {
+ 		"addr": "172.17.145.163:27082",
+        "size": 10,
+        "db": "qfw",
+    	"winnerent":{
+			"collect": "winner_enterprise",
+        	"index": "winnerent_v1",
+        	"type": "winnerent"
+		},
+        "buyerent":{
+			"collect": "buyer_enterprise",
+        	"index": "buyerent_v1",
+        	"type": "buyerent"
+		},
+ 		"agencyent":{
+			"collect": "agency_enterprise",
+       	 	"index": "agencyent_v1",
+       		"type": "agencyent"
+		}
+    },
     "mongodb": {
         "addr": "10.172.242.243:27080,10.30.94.175:27081,10.81.232.246:27082",
         "pool": 10,

+ 45 - 12
udpcreateindex/src/main.go

@@ -13,17 +13,19 @@ import (
 )
 
 var (
-	Sysconfig                                                      map[string]interface{} //配置文件
-	mgo                                                            *mongodb.MongodbSim    //mongodb操作对象
-	extractmgo                                                     *mongodb.MongodbSim    //mongodb操作对象
-	udpclient                                                      mu.UdpClient           //udp对象
-	updport                                                        string
-	winner, winnerenterprise, bidding, biddingback, project, buyer map[string]interface{}
-	savesizei                                                      = 500
-	biddingIndexFields                                             = []string{"_id", "buyerclass", "s_winner", "title", "detail", "detail_bak", "area", "areaval", "site", "type", "amount", "bidopendate", "bidopentime", "buyer", "channel", "city", "comeintime", "contenthtml", "descript", "description", "extracttype", "href", "infoformat", "keywords", "projectcode", "projectname", "publishtime", "s_sha", "spidercode", "subtype", "summary", "toptype", "urltop", "winner", "agency", "budget", "bidamount", "s_subscopeclass", "projectscope", "bidstatus"}
-	projectinfoFields                                              []string
-	multiIndex                                                     []string
-	BulkSize                                                       = 400
+	Sysconfig          map[string]interface{} //配置文件
+	mgo                *mongodb.MongodbSim    //mongodb操作对象
+	extractmgo         *mongodb.MongodbSim    //mongodb操作对象
+	mgostandard        *mongodb.MongodbSim    //mongodb操作对象
+	udpclient          mu.UdpClient           //udp对象
+	updport            string
+	savesizei          = 500
+	biddingIndexFields = []string{"_id", "buyerclass", "s_winner", "title", "detail", "detail_bak", "area", "areaval", "site", "type", "amount", "bidopendate", "bidopentime", "buyer", "channel", "city", "comeintime", "contenthtml", "descript", "description", "extracttype", "href", "infoformat", "keywords", "projectcode", "projectname", "publishtime", "s_sha", "spidercode", "subtype", "summary", "toptype", "urltop", "winner", "agency", "budget", "bidamount", "s_subscopeclass", "projectscope", "bidstatus"}
+	projectinfoFields  []string
+	multiIndex         []string
+	BulkSize           = 400
+
+	winner, bidding, biddingback, project, buyer, standard map[string]interface{}
 )
 
 func init() {
@@ -32,7 +34,7 @@ func init() {
 	go checkMapJob()
 	updport, _ = Sysconfig["updport"].(string)
 	winner, _ = Sysconfig["winner"].(map[string]interface{})
-	winnerenterprise, _ = Sysconfig["winnerenterprise"].(map[string]interface{})
+	standard, _ = Sysconfig["standard"].(map[string]interface{})
 	buyer, _ = Sysconfig["buyer"].(map[string]interface{})
 	bidding, _ = Sysconfig["bidding"].(map[string]interface{})
 	biddingback, _ = Sysconfig["biddingback"].(map[string]interface{})
@@ -60,6 +62,13 @@ func init() {
 		}
 		extractmgo.InitPool()
 	}
+	mgostandard = &mongodb.MongodbSim{
+		MongodbAddr: standard["addr"].(string),
+		Size:        util.IntAllDef(standard["pool"], 5),
+		DbName:      standard["db"].(string),
+	}
+	mgostandard.InitPool()
+	log.Println(standard["addr"].(string))
 
 	econf := Sysconfig["elastic"].(map[string]interface{})
 	elastic.InitElasticSize(econf["addr"].(string), util.IntAllDef(econf["pool"], 5))
@@ -173,6 +182,30 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 					}()
 					buyerTask(data, mapInfo)
 				}()
+			case "winnerent": //标准库
+				pool <- true
+				go func() {
+					defer func() {
+						<-pool
+					}()
+					standardTask("winnerent", mapInfo)
+				}()
+			case "buyerent": //标准库
+				pool <- true
+				go func() {
+					defer func() {
+						<-pool
+					}()
+					standardTask("buyerent", mapInfo)
+				}()
+			case "agencyent": //标准库
+				pool <- true
+				go func() {
+					defer func() {
+						<-pool
+					}()
+					standardTask("agencyent", mapInfo)
+				}()
 			default:
 				pool <- true
 				go func() {

+ 178 - 0
udpcreateindex/src/standardata.go

@@ -0,0 +1,178 @@
+package main
+
+import (
+	"log"
+	"qfw/util"
+	elastic "qfw/util/elastic"
+
+	"gopkg.in/mgo.v2/bson"
+)
+
+func standardTask(stype string, mapInfo map[string]interface{}) {
+	defer util.Catch()
+	q, _ := mapInfo["query"].(map[string]interface{})
+	if q == nil {
+		q = map[string]interface{}{
+			"_id": bson.M{
+				"$gt":  util.StringTOBsonId(mapInfo["gtid"].(string)),
+				"$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
+			},
+		}
+	}
+	switch stype {
+	case "winnerent":
+		winnerEnt(q)
+	case "buyerent":
+		buyerEnt(q)
+	case "agencyent":
+		agencyEnt(q)
+	}
+}
+
+//winnerent
+func winnerEnt(q map[string]interface{}) {
+	session := mgostandard.GetMgoConn(3600)
+	defer mgostandard.DestoryMongoConn(session)
+	db, _ := standard["db"].(string)
+	winnerent, _ := standard["winnerent"].(map[string]interface{})
+	c, _ := winnerent["collect"].(string)
+	index, _ := winnerent["index"].(string)
+	itype, _ := winnerent["type"].(string)
+	count, _ := session.DB(db).C(c).Find(&q).Count()
+	savepool := make(chan bool, 10)
+
+	log.Println(db, c, "查询语句:", q, "同步总数:", count, "elastic库:", index)
+
+	query := session.DB(db).C(c).Find(q).Iter()
+	arr := make([]map[string]interface{}, savesizei)
+	var n int
+	i := 0
+	for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 {
+		//不生索引字段
+		delete(tmp, "partners")
+		delete(tmp, "wechat_accounts")
+		delete(tmp, "tmp_id")
+		tmp["company"] = tmp["company_name"]
+		arr[i] = tmp
+		n++
+		if i == savesizei-1 {
+			savepool <- true
+			tmps := arr
+			go func(tmpn *[]map[string]interface{}) {
+				defer func() {
+					<-savepool
+				}()
+				elastic.BulkSave(index, itype, tmpn, true)
+			}(&tmps)
+			i = 0
+			arr = make([]map[string]interface{}, savesizei)
+		}
+		if n%savesizei == 0 {
+			log.Println("当前:", n)
+		}
+		tmp = make(map[string]interface{})
+	}
+	if i > 0 {
+		elastic.BulkSave(index, itype, &arr, true)
+	}
+	log.Println("create winnerent index...over", n)
+}
+
+//buyerent
+func buyerEnt(q map[string]interface{}) {
+	session := mgostandard.GetMgoConn(3600)
+	defer mgostandard.DestoryMongoConn(session)
+	db, _ := standard["db"].(string)
+	buyerent, _ := standard["buyerent"].(map[string]interface{})
+	c, _ := buyerent["collect"].(string)
+	index, _ := buyerent["index"].(string)
+	itype, _ := buyerent["type"].(string)
+	count, _ := session.DB(db).C(c).Find(&q).Count()
+	savepool := make(chan bool, 10)
+
+	log.Println(db, c, "查询语句:", q, "同步总数:", count, "elastic库:", index)
+
+	query := session.DB(db).C(c).Find(q).Iter()
+	arr := make([]map[string]interface{}, savesizei)
+	var n int
+	i := 0
+	for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 {
+		//不生索引字段
+		delete(tmp, "partners")
+		delete(tmp, "wechat_accounts")
+		delete(tmp, "tmp_id")
+		tmp["buyer"] = tmp["buyer_name"]
+		arr[i] = tmp
+		n++
+		if i == savesizei-1 {
+			savepool <- true
+			tmps := arr
+			go func(tmpn *[]map[string]interface{}) {
+				defer func() {
+					<-savepool
+				}()
+				elastic.BulkSave(index, itype, tmpn, true)
+			}(&tmps)
+			i = 0
+			arr = make([]map[string]interface{}, savesizei)
+		}
+		if n%savesizei == 0 {
+			log.Println("当前:", n)
+		}
+		tmp = make(map[string]interface{})
+	}
+	if i > 0 {
+		elastic.BulkSave(index, itype, &arr, true)
+	}
+	log.Println("create buyerent index...over", n)
+}
+
+//agencyent
+func agencyEnt(q map[string]interface{}) {
+	session := mgostandard.GetMgoConn(3600)
+	defer mgostandard.DestoryMongoConn(session)
+	db, _ := standard["db"].(string)
+	agencyent, _ := standard["agencyent"].(map[string]interface{})
+	c, _ := agencyent["collect"].(string)
+	index, _ := agencyent["index"].(string)
+	itype, _ := agencyent["type"].(string)
+	count, _ := session.DB(db).C(c).Find(&q).Count()
+	savepool := make(chan bool, 10)
+
+	log.Println(db, c, "查询语句:", q, "同步总数:", count, "elastic库:", index)
+
+	query := session.DB(db).C(c).Find(q).Iter()
+	arr := make([]map[string]interface{}, savesizei)
+	var n int
+	i := 0
+	for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 {
+		//不生索引字段
+		delete(tmp, "partners")
+		delete(tmp, "wechat_accounts")
+		delete(tmp, "tmp_id")
+
+		tmp["agency"] = tmp["agency_name"]
+		arr[i] = tmp
+		n++
+		if i == savesizei-1 {
+			savepool <- true
+			tmps := arr
+			go func(tmpn *[]map[string]interface{}) {
+				defer func() {
+					<-savepool
+				}()
+				elastic.BulkSave(index, itype, tmpn, true)
+			}(&tmps)
+			i = 0
+			arr = make([]map[string]interface{}, savesizei)
+		}
+		if n%savesizei == 0 {
+			log.Println("当前:", n)
+		}
+		tmp = make(map[string]interface{})
+	}
+	if i > 0 {
+		elastic.BulkSave(index, itype, &arr, true)
+	}
+	log.Println("create agencyent index...over", n)
+}

+ 1 - 1
udpfilterdup/src/config.json

@@ -2,7 +2,7 @@
     "udpport": ":1485",
     "dupdays": 5,
     "mongodb": {
-        "addr": "127.0.0.1:27080",
+        "addr": "127.0.0.1:27092",
         "pool": 10,
         "db": "qfw",
         "extract": "extract_v20190111",

+ 1 - 1
udpfilterdup/src/mgo.go

@@ -144,7 +144,7 @@ func (m *MongodbSim) InitPool() {
 	opts := options.Client()
 	opts.SetConnectTimeout(3 * time.Second)
 	opts.ApplyURI("mongodb://" + m.MongodbAddr)
-	opts.SetMaxPoolSize(uint64(m.Size))
+	opts.SetMaxPoolSize(uint16(m.Size))
 	m.pool = make(chan bool, m.Size)
 	opts.SetMaxConnIdleTime(2 * time.Hour)
 	m.Ctx, _ = context.WithTimeout(context.Background(), 99999*time.Hour)

+ 61 - 55
udps/main.go

@@ -6,72 +6,78 @@ import (
 	"log"
 	mu "mfw/util"
 	"net"
-	qu "qfw/util"
+	"os"
+	qutil "qfw/util"
+	"qfw/util/mongodb"
 	"time"
 
 	"gopkg.in/mgo.v2/bson"
 )
 
-var udpclient mu.UdpClient //udp对象
-var nextNodes []map[string]interface{}
-
-var startDate, endDate, ip, port, stype, sid, eid string
+var startDate, endDate string
 
 func main() {
-	//2015-11-03,2017-04-01
-	//2017-04-01,2017-06-01
-	//2017-06-01,2018-06-01
-	//2018-06-01,2019-02-20
-	/*
-ObjectId("5da3f31aa5cb26b9b798d3aa")
-ObjectId("5da418c4a5cb26b9b7e3e9a6")
-*/
-
-	flag.StringVar(&sid, "sid", "", "开始id")
-	flag.StringVar(&eid, "eid", "", "结束id")
+	ip, p, tmptime, tmpkey, id1, id2, stype, q, bkey, param := "", 0, 0, "", "", "", "", "", "", ""
 	flag.StringVar(&startDate, "start", "", "开始日期2006-01-02")
-	flag.StringVar(&endDate, "end", "2019-11-10", "结束日期2006-01-02")
+	flag.StringVar(&endDate, "end", "", "结束日期2006-01-02")
 	flag.StringVar(&ip, "ip", "127.0.0.1", "ip")
-	flag.StringVar(&port, "port", "1488", "dup端口")
-	flag.StringVar(&stype, "stype", "", "stype")
+	flag.IntVar(&p, "p", 0, "端口")
+	flag.IntVar(&tmptime, "tmptime", 0, "时间查询")
+	flag.StringVar(&tmpkey, "tmpkey", "", "时间字段")
+	flag.StringVar(&id1, "gtid", "", "gtid")
+	flag.StringVar(&id2, "lteid", "", "lteid")
+	flag.StringVar(&stype, "stype", "", "stype,传递类型")
+	flag.StringVar(&bkey, "bkey", "", "bkey,加上此参数表示不生关键词和摘要")
+	flag.StringVar(&q, "q", "", "q查询语句\"{'':''}\",有q就不要gtid,lteid")
+	flag.StringVar(&param, "param", "", "param,生信息发布或其他索引时用双引号套单引号\"{'mgoaddr':'','d':'','c':'','index':'','type':''}\"")
 	flag.Parse()
-	var startid, endid bson.ObjectId
-	if sid != "" && eid != "" {
-		startid = qu.StringTOBsonId(sid)
-		endid = qu.StringTOBsonId(eid)
-	} else {
-		start, _ := time.ParseInLocation(qu.Date_Short_Layout, startDate, time.Local)
-		end, _ := time.ParseInLocation(qu.Date_Short_Layout, endDate, time.Local)
-		startid = bson.NewObjectIdWithTime(start)
-		endid = bson.NewObjectIdWithTime(end)
+	if startDate != "" || endDate != "" {
+		start, _ := time.ParseInLocation(qutil.Date_Short_Layout, startDate, time.Local)
+		end, _ := time.ParseInLocation(qutil.Date_Short_Layout, endDate, time.Local)
+		id1 = qutil.BsonIdToSId(bson.NewObjectIdWithTime(start))
+		id2 = qutil.BsonIdToSId(bson.NewObjectIdWithTime(end))
+		log.Println(id1, id2)
 	}
-	log.Println(startid, endid, ip, port, stype)
-	udpclient = mu.UdpClient{Local: ":1470", BufSize: 1024}
-	udpclient.Listen(processUdpMsg)
-	by, _ := json.Marshal(map[string]interface{}{
-		"gtid":  startid,
-		"lteid": endid,
-		"stype": stype,
-	})
-	udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
-		IP:   net.ParseIP(ip),
-		Port: qu.IntAll(port),
-	})
-	b := make(chan bool, 1)
-	<-b
-}
-
-func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
-	switch act {
-	case mu.OP_TYPE_DATA:
-		var mapInfo map[string]interface{}
-		err := json.Unmarshal(data, &mapInfo)
-		if err != nil {
-			log.Println(err)
-		} else {
-			log.Println(mapInfo)
+	if ip != "" && p > 0 && ((id1 != "" && id2 != "") || (q != "" || tmptime > 0)) {
+		toadd := &net.UDPAddr{
+			IP:   net.ParseIP(ip),
+			Port: p,
+		}
+		udp := mu.UdpClient{Local: ":50010", BufSize: 1024}
+		udp.Listen(func(b byte, data []byte, add *net.UDPAddr) {
+			switch b {
+			case mu.OP_NOOP:
+				log.Println(string(data))
+				os.Exit(0)
+			}
+		})
+		m1 := map[string]interface{}{
+			"gtid":  id1,
+			"lteid": id2,
+			"stype": stype,
 		}
-	case mu.OP_NOOP: //下个节点回应
-		log.Println("发送成功", string(data))
+		if bkey != "" {
+			m1["bkey"] = bkey
+		}
+		if q != "" {
+			m1["query"] = mongodb.ObjToMQ(q, true) //qutil.ObjToMap(q)
+		}
+		if tmptime > 0 && tmpkey != "" {
+			m1["query"] = map[string]interface{}{tmpkey: map[string]interface{}{"$gte": tmptime}}
+		}
+		if param != "" {
+			pm := qutil.ObjToMap(param)
+			for k, v := range *pm {
+				m1[k] = v
+			}
+		}
+
+		by, _ := json.Marshal(m1)
+		log.Println(string(by))
+		udp.WriteUdp(by, mu.OP_TYPE_DATA, toadd)
+		time.Sleep(30 * time.Second)
+	} else {
+		flag.PrintDefaults()
+		log.Println("参数错误.")
 	}
 }