Преглед изворни кода

Merge branch 'dev3.4' of http://39.105.157.10:10080/qmx/jy-data-extract into dev3.4

# Conflicts:
#	udp_winner/config.json
fengweiqiang пре 5 година
родитељ
комит
eec4979e0e
4 измењених фајлова са 1275 додато и 6 уклоњено
  1. 6 2
      udp_winner/config.json
  2. 14 4
      udp_winner/main.go
  3. 596 0
      udp_winner/timedTaskAgency.go
  4. 659 0
      udp_winner/timedTaskBuyer.go

+ 6 - 2
udp_winner/config.json

@@ -1,7 +1,11 @@
 {
   "elasticsearch": "http://127.0.0.1:9800",
-  "elasticsearch_index": "localhost_mytest",
-  "elasticsearch_type": "mytest",
+  "elasticsearch_index": "localhost_winner",
+  "elasticsearch_type": "mytestwinner",
+  "elasticsearch_buyer_index": "buyer_enterprise",
+  "elasticsearch_buyer_type": "buyerent",
+  "elasticsearch_agency_index": "agency_enterprise",
+  "elasticsearch_agency_type": "agencyent",
   "udpport": "127.0.0.1:12311",
   "port": "12311",
   "pool_size": "10",

+ 14 - 4
udp_winner/main.go

@@ -21,7 +21,7 @@ import (
 
 var (
 	Config                                = make(map[string]string)
-	Fields                                []string
+	Fields,BuyerFields,AgencyFields                                []string
 	SourceClient, FClient                 *MongodbSim
 	RedisPool                             redis.Pool
 	HisRedisPool                          *hisRedis.Client
@@ -43,9 +43,19 @@ func init() {
 	log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
 	util.ReadConfig(&Config)
 	log.Println(Config)
-	Fields = []string{"_id", "contact", "partners", "business_scope", "company_address", "capital",
-		"establish_date", "legal_person", "company_type", "district", "city", "province", "area_code", "credit_no",
-		"company_name", "history_name", "topscopeclass", "wechat_accounts", "alias", "website", "report_websites"}
+	Fields = []string{"_id", "contact", "partners", "business_scope", "company_address",
+		"capital", "establish_date", "legal_person", "company_type",
+		"district", "city", "province", "area_code", "credit_no",
+		"company_name", "history_name", "topscopeclass", "wechat_accounts",
+		"alias", "website", "report_websites"}
+
+	BuyerFields = []string{"_id", "contact", "type", "ranks", "buyerclass",
+		"address", "district", "city", "province", "area_code", "credit_no", "buyer_name",
+		"history_name", "topscopeclass", "wechat_accounts", "website", "report_websites"}
+
+	AgencyFields = []string{"_id", "contact", "type", "ranks",
+		"address", "district", "city", "province", "area_code", "credit_no", "agency_name",
+		"history_name", "topscopeclass", "wechat_accounts", "website", "report_websites"}
 	var err error
 	pool_size, _ := strconv.Atoi(Config["pool_size"])
 

+ 596 - 0
udp_winner/timedTaskAgency.go

@@ -0,0 +1,596 @@
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"github.com/garyburd/redigo/redis"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+	"gopkg.in/mgo.v2/bson"
+	"log"
+	mu "mfw/util"
+	"net"
+	"qfw/util"
+	"sort"
+	"strings"
+	"time"
+)
+
+//之前main方法,只更新
+func TaskAgency(mapinfo *map[string]interface{}) {
+	defer util.Catch()
+	gtid, lteid := util.ObjToString((*mapinfo)["gtid"]), util.ObjToString((*mapinfo)["lteid"])
+	if gtid == "" || lteid == "" {
+		log.Println(gtid, lteid, "参数错误")
+		return
+	}
+	GId, err := primitive.ObjectIDFromHex(gtid)
+	LtId, err2 := primitive.ObjectIDFromHex(lteid)
+	if err != nil || err2 != nil {
+		log.Println(gtid, lteid, "转换_id错误")
+		return
+	}
+	//udp的id区间查询bidding  中标人 中标联系人 中标联系电话
+	// topscopeclass项目类型-industry行业类型&&topscopeclass联系人项目类型
+	// (area地区-province省份 city城市-city城市 district区县-district区县)
+	// winneraddr-company_address企业地址
+	SourceClientcc := SourceClient.GetMgoConn()
+	defer SourceClient.DestoryMongoConn(SourceClientcc)
+	cursor := SourceClientcc.DB(Config["mgodb_bidding"]).C(Config["mgodb_mgoinit_c"]).Find(bson.M{
+		"_id": bson.M{
+			"$gte": GId,
+			"$lte": LtId,
+		},
+	}).Select(bson.M{"agency": 1, "agencytel": 1, "agencyperson": 1,
+		"topscopeclass": 1, "agencyaddr": 1}).Iter()
+	if cursor == nil {
+		log.Println(cursor)
+		return
+	}
+	//判断是否是存量,是存量走Redis遍历
+	if v, ok := (*mapinfo)["data_info"].(string); ok && v == "save" {
+		//存量处理
+		tmp := map[string]interface{}{}
+		conn := HisRedisPool.Conn()
+		defer conn.Close()
+		//选择redis db
+		conn.Select(1)
+		//遍历bidding表保存到redis
+		// key:_id  value:json结构体
+		for cursor.Next(&tmp) {
+			if tmp["agency"] == nil || tmp["agency"] == "" {
+				continue
+			}
+			bytes, _ := json.Marshal(tmp)
+			if err := conn.Set(tmp["_id"].(primitive.ObjectID).Hex(), string(bytes), 0).Err(); err != nil {
+				log.Println(err)
+			}
+		}
+		//遍历redis
+		if scan := conn.Scan(0, "", 100); scan.Err() != nil {
+			log.Println(scan.Err())
+			return
+		} else {
+			iterator := scan.Iterator()
+			for iterator.Next() {
+				redisId := iterator.Val()                       //redis key
+				redisvalue := conn.Get(iterator.Val()).String() //redis val
+				tmp := make(map[string]interface{})
+				json.Unmarshal([]byte(redisvalue),&tmp)
+				//重复增量操作
+				//redis查询是否存在
+				rdb := RedisPool.Get()
+				if reply, err := redis.String(rdb.Do("GET", tmp["agency"])); err != nil {
+					//redis不存在,存到临时表,定时任务处理
+					FClient.DbName = Config["mgodb_extract_kf"]
+					if tmpid := FClient.Save("agency_new", tmp); tmpid == nil {
+						log.Println("存量 FClient.Save err", tmpid)
+					}
+					//log.Println("get redis id err:定时任务处理", err, tmp)
+					if err := rdb.Close(); err != nil {
+						log.Println("存量",err)
+					}
+					//删除存量redis
+					conn.Del(redisId)
+					continue
+				} else {
+					if err := rdb.Close(); err != nil {
+						log.Println(err)
+					}
+					//拿到合并后的qyk
+					FClient.DbName = Config["mgodb_extract_kf"]
+					oldTmp := FClient.FindById(Config["mgo_qyk_c"], reply)
+					if oldTmp == nil {
+						log.Println("存量 redis id 不存在",reply,tmp["agency"])
+						continue
+					}
+
+
+					tmpTopscopeclass := []string{}
+					tmpTopscopeclassMap := make(map[string]bool)
+
+					if v, ok := tmp["topscopeclass"].(primitive.A); ok {
+						for _, vv := range v {
+							if vvv, ok := vv.(string); ok && len(vvv) > 1 {
+								tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
+							}
+						}
+						for k := range tmpTopscopeclassMap {
+							tmpTopscopeclass = append(tmpTopscopeclass, k)
+						}
+					}
+					sort.Strings(tmpTopscopeclass)
+
+
+
+
+					esId := oldTmp["_id"].(primitive.ObjectID).Hex()
+					//更新行业类型
+					if tmp["agencyperson"] == nil || tmp["agencyperson"] == "" ||
+						Reg_xing.MatchString(util.ObjToString(tmp["agencyperson"])) {
+						oldTmp["updatatime"] = time.Now().Unix()
+						//mongo更新
+						FClient.DbName = Config["mgodb_extract_kf"]
+						if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
+							log.Println("mongo更新err", esId)
+						}
+
+						//es更新
+						delete(oldTmp, "_id")
+						if _, err := EsConn.Update().Index(Config["elasticsearch_agency_index"]).Type(Config["elasticsearch_agency_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
+							log.Println("update es err:", err)
+						}
+						//删除存量redis
+						conn.Del(redisId)
+						continue
+					}
+					//联系方式合并
+					var tmpperson, agencytel string
+					tmpperson = tmp["agencyperson"].(string)
+					if tmp["agencytel"] == nil || tmp["agencytel"] == "" {
+						agencytel = ""
+					} else {
+						if Reg_xing.MatchString(util.ObjToString(tmp["agencytel"])) || !Reg_tel.MatchString(util.ObjToString(tmp["agencytel"])) {
+							agencytel = ""
+						} else {
+							agencytel = util.ObjToString(tmp["agencytel"])
+						}
+					}
+					contactMaps := make([]interface{}, 0)
+					if oldTmp["contact"] == nil {
+						tmpContact := make(map[string]interface{})
+						tmpContact["infoid"] = redisId
+						tmpContact["contact_person"] = tmpperson
+						tmpContact["contact_type"] = "项目联系人"
+						tmpContact["phone"] = agencytel
+						tmpContact["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
+						tmpContact["updatetime"] = time.Now().Unix()
+						contactMaps = append(contactMaps, tmpContact)
+					} else {
+						//对比前四项,相等丢弃
+						if v, ok := oldTmp["contact"].(primitive.A); ok {
+							var isNotUpdate bool
+							for _, vv := range v {
+								if vvv, ok := vv.(map[string]interface{}); ok {
+									if vvv["contact_person"] == tmpperson && vvv["contact_type"] == "项目联系人" &&
+										vvv["phone"] == agencytel && vvv["topscopeclass"] == strings.Join(tmpTopscopeclass, ";") {
+										isNotUpdate = true
+										vvv["updatetime"] = time.Now().Unix()
+									}
+									contactMaps = append(contactMaps, vvv)
+								}
+							}
+							if !isNotUpdate {
+								vvv := make(map[string]interface{})
+								vvv["infoid"] = redisId
+								vvv["contact_person"] = tmp["agencyperson"]
+								vvv["contact_type"] = "项目联系人"
+								vvv["phone"] = agencytel
+								vvv["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
+								vvv["updatetime"] = time.Now().Unix()
+								contactMaps = append(contactMaps, vvv)
+							}
+						}
+					}
+					oldTmp["contact"] = contactMaps
+					//mongo更新
+					oldTmp["updatatime"] = time.Now().Unix()
+					FClient.DbName = Config["mgodb_extract_kf"]
+					if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
+						log.Println("存量  mongo更新 err", esId, oldTmp)
+					}
+					//es更新
+					delete(oldTmp, "_id")
+					if _, err := EsConn.Update().Index(Config["elasticsearch_agency_index"]).Type(Config["elasticsearch_agency_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
+						log.Println("存量 EsConn err :", err)
+					}
+					//最后删除redis
+					conn.Del(redisId)
+				}
+			}
+		}
+		log.Println("存量历史合并执行完成 ok", gtid, lteid)
+
+	} else {
+		//增量处理
+		overid := gtid
+		tmp := map[string]interface{}{}
+		for cursor.Next(&tmp) {
+			overid = tmp["_id"].(primitive.ObjectID).Hex()
+			log.Println(tmp["_id"])
+			if tmp["agency"] == nil || tmp["agency"] == "" {
+				continue
+			}
+			//redis查询是否存在
+			rdb := RedisPool.Get()
+			if reply, err := redis.String(rdb.Do("GET", tmp["agency"])); err != nil {
+				//redis不存在存到临时表,定时任务处理
+				FClient.DbName = Config["mgodb_extract_kf"]
+				if tmpid := FClient.Save("agency_new", tmp); tmpid == nil {
+					log.Println("FClient.Save err", tmpid)
+				}
+				//log.Println("get redis id err:定时任务处理", err, tmp)
+				if err := rdb.Close(); err != nil {
+					log.Println(err)
+				}
+				continue
+			} else {
+				if err := rdb.Close(); err != nil {
+					log.Println(err)
+				}
+				//拿到合并后的qyk
+				FClient.DbName = Config["mgodb_extract_kf"]
+				oldTmp := FClient.FindById(Config["mgo_qyk_c"], reply)
+				if oldTmp == nil {
+					log.Println("redis id 不存在")
+					continue
+				}
+				//比较合并
+				//行业类型
+				tmpTopscopeclass := []string{}
+				tmpTopscopeclassMap := make(map[string]bool)
+
+				if v, ok := tmp["topscopeclass"].(primitive.A); ok {
+					for _, vv := range v {
+						if vvv, ok := vv.(string); ok && len(vvv) > 1 {
+							tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
+						}
+					}
+					for k := range tmpTopscopeclassMap {
+						tmpTopscopeclass = append(tmpTopscopeclass, k)
+					}
+				}
+				sort.Strings(tmpTopscopeclass)
+
+
+
+				esId := oldTmp["_id"].(primitive.ObjectID).Hex()
+				//更新行业类型
+				if tmp["agencyperson"] == nil || tmp["agencyperson"] == "" || Reg_xing.MatchString(util.ObjToString(tmp["agencyperson"])) {
+					oldTmp["updatatime"] = time.Now().Unix()
+					//mongo更新
+					FClient.DbName = Config["mgodb_extract_kf"]
+					if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
+						log.Println("mongo更新err", esId)
+					}
+
+					//es更新
+					delete(oldTmp, "_id")
+					if _, err := EsConn.Update().Index(Config["elasticsearch_agency_index"]).Type(Config["elasticsearch_agency_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
+						log.Println("update es err:", err)
+					}
+					continue
+				}
+				//联系方式合并
+				var tmpperson, agencytel string
+				tmpperson = tmp["agencyperson"].(string)
+				if tmp["agencytel"] == nil || tmp["agencytel"] == "" {
+					agencytel = ""
+				} else {
+					if Reg_xing.MatchString(util.ObjToString(tmp["agencytel"])) || !Reg_tel.MatchString(util.ObjToString(tmp["agencytel"])) {
+						agencytel = ""
+					} else {
+						agencytel = util.ObjToString(tmp["agencytel"])
+					}
+				}
+				contactMaps := make([]interface{}, 0)
+				if oldTmp["contact"] == nil {
+					tmpContact := make(map[string]interface{})
+					tmpContact["infoid"] = overid
+					tmpContact["contact_person"] = tmpperson
+					tmpContact["contact_type"] = "项目联系人"
+					tmpContact["phone"] = agencytel
+					tmpContact["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
+					tmpContact["updatetime"] = time.Now().Unix()
+					contactMaps = append(contactMaps, tmpContact)
+				} else {
+					//对比前四项,相等丢弃
+					if v, ok := oldTmp["contact"].(primitive.A); ok {
+						var isNotUpdate bool
+						for _, vv := range v {
+							if vvv, ok := vv.(map[string]interface{}); ok {
+								if vvv["contact_person"] == tmpperson && vvv["contact_type"] == "项目联系人" &&
+									vvv["phone"] == agencytel && vvv["topscopeclass"] == strings.Join(tmpTopscopeclass, ";") {
+									isNotUpdate = true
+									vvv["updatetime"] = time.Now().Unix()
+								}
+								contactMaps = append(contactMaps, vvv)
+							}
+						}
+						if !isNotUpdate {
+							vvv := make(map[string]interface{})
+							vvv["infoid"] = overid
+							vvv["contact_person"] = tmp["agencyperson"]
+							vvv["contact_type"] = "项目联系人"
+							vvv["phone"] = agencytel
+							vvv["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
+							vvv["updatetime"] = time.Now().Unix()
+							contactMaps = append(contactMaps, vvv)
+						}
+					}
+				}
+				oldTmp["contact"] = contactMaps
+				//mongo更新
+				oldTmp["updatatime"] = time.Now().Unix()
+				FClient.DbName = Config["mgodb_extract_kf"]
+				if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
+					log.Println("mongo更新 err", esId, oldTmp)
+				}
+				//es更新
+				delete(oldTmp, "_id")
+				if _, err := EsConn.Update().Index(Config["elasticsearch_agency_index"]).Type(Config["elasticsearch_agency_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
+					log.Println("EsConn err :", err)
+				}
+			}
+		}
+		log.Println("增量合并执行完成 ok", gtid, lteid, overid)
+	}
+
+}
+
+
+
+//定时任务
+//1.存异常表
+//2.合并原始库新增
+func TimedTaskAgency() {
+	//time.Sleep(time.Hour*70)
+	t2 := time.NewTimer(time.Second * 5)
+	for range t2.C {
+		Fcconn := FClient.GetMgoConn()
+		defer FClient.DestoryMongoConn(Fcconn)
+		tmpLast := map[string]interface{}{}
+		if iter := Fcconn.DB(Config["mgodb_extract_kf"]).C("agency_new").Find(bson.M{}).Sort("-_id").Limit(1).Iter(); iter != nil {
+			if !iter.Next(&tmpLast) {
+				//临时表无数据
+				log.Println("临时表无数据:")
+				t2.Reset(time.Minute * 5)
+				continue
+			} else {
+				log.Println("临时表有数据:", tmpLast)
+				fconn := FClient.GetMgoConn()
+				defer FClient.DestoryMongoConn(fconn)
+				cursor := fconn.DB(Config["mgodb_extract_kf"]).C("agency_new").Find(bson.M{
+					"_id": bson.M{
+						"$lte": tmpLast["_id"],
+					},
+				}).Sort("_id").Iter()
+				if cursor == nil {
+					log.Println("查询失败")
+					t2.Reset(time.Second * 5)
+					continue
+				}
+				//遍历临时表数据,匹配不到原始库存入异常表
+				tmp := make(map[string]interface{})
+				for cursor.Next(&tmp) {
+					tmpId := tmp["_id"].(primitive.ObjectID).Hex()
+					//再重新查找redis,存在发udp处理,不存在走新增合并
+					rdb := RedisPool.Get()
+					if _, err := redis.String(rdb.Do("GET", tmp["agency"])); err == nil {
+						//{"gtid":"57d7ad2f61a0721f152d2ad5","lteid":"5e20968d85a9271abf0ad6c2","stype":""}
+						//redis存在发送udp进行处理
+						by, _ := json.Marshal(map[string]interface{}{
+							"gtid":  tmpId,
+							"lteid": tmpId,
+							"stype": "",
+						})
+						if e := udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
+							IP:   net.ParseIP("127.0.0.1"),
+							Port: Updport,
+						}); e != nil {
+							log.Println(e)
+						}
+						//存在的话删除tmp mongo表
+						FClient.DbName = Config["mgodb_extract_kf"]
+						if DeletedCount := FClient.DeleteById("agency_new", tmpId); DeletedCount == 0 {
+							log.Println("删除临时表err:", DeletedCount)
+						}
+						if err := rdb.Close(); err != nil {
+							log.Println(err)
+						}
+						continue
+					} else {
+						if err = rdb.Close(); err != nil {
+							log.Println(err)
+						}
+					}
+					//查询redis不存在新增
+					FClient.DbName = Config["mgodb_enterprise"]
+					resulttmp := FClient.FindOne(Config["mgodb_enterprise_c"], bson.M{"company_name": tmp["agency"]})
+					if resulttmp["_id"] == nil {
+						//log.Println(r)
+						//匹配不到原始库,存入异常表删除临时表
+						FClient.DbName = Config["mgodb_extract_kf"]
+						if saveid := FClient.Save("agency_err", tmp); saveid == nil {
+							log.Println("存入异常表错误", tmp)
+						}
+						FClient.DbName = Config["mgodb_extract_kf"]
+						if deleteNum := FClient.DeleteById("agency_new", tmpId); deleteNum == 0 {
+							log.Println("删除临时表错误", deleteNum)
+						}
+						continue
+					} else {
+						//log.Println(123)
+						//匹配到原始库,新增 resulttmp
+						if resulttmp["credit_no"] != nil {
+							if credit_no, ok := resulttmp["credit_no"].(string); ok && strings.TrimSpace(credit_no) != "" &&
+								len(strings.TrimSpace(credit_no)) > 8 {
+								dataNo := strings.TrimSpace(credit_no)[2:8]
+								if Addrs[dataNo] != nil {
+									if v, ok := Addrs[dataNo].(map[string]interface{}); ok {
+										if resulttmp["province"] == nil || resulttmp["province"] == "" {
+											resulttmp["province"] = v["province"]
+										}
+										resulttmp["city"] = v["city"]
+										resulttmp["district"] = v["district"]
+
+									}
+								}
+							}
+						}
+						contacts := make([]map[string]interface{}, 0)
+						contact := make(map[string]interface{}, 0)
+						if resulttmp["legal_person"] != nil {
+							contact["contact_person"] = resulttmp["legal_person"] //联系人
+						} else {
+							contact["contact_person"] = "" //联系人
+						}
+						contact["contact_type"] = "法定代表人" //法定代表人
+						//log.Println(1)
+						if resulttmp["annual_reports"] != nil {
+							bytes, err := json.Marshal(resulttmp["annual_reports"])
+							if err != nil {
+								log.Println("annual_reports err:", err)
+							}
+							phonetmp := make([]map[string]interface{}, 0)
+							err = json.Unmarshal(bytes, &phonetmp)
+							if err != nil {
+								log.Println("Unmarshal err:", err)
+							}
+							for _, vv := range phonetmp {
+								if vv["company_phone"] != nil {
+									if vv["company_phone"] == "" {
+										continue
+									} else {
+										contact["phone"] = vv["company_phone"] //联系电话
+										break
+									}
+								} else {
+									contact["phone"] = "" //联系电话
+								}
+
+							}
+						}
+						//log.Println(k, contact["phone"], resulttmp["_id"])
+						//time.Sleep(10 * time.Second)
+						if contact["phone"] == nil {
+							contact["phone"] = "" //联系电话
+						}
+						contact["topscopeclass"] = "企业公示"         //项目类型
+						contact["updatetime"] = time.Now().Unix() //更新时间
+						contact["infoid"] = ""                    //招标信息id
+						contacts = append(contacts, contact)
+						resulttmp["contact"] = contacts
+
+						savetmp := make(map[string]interface{}, 0)
+						for _, sk := range AgencyFields {
+							if sk == "_id" {
+								savetmp["tmp"+sk] = resulttmp[sk]
+								continue
+							} else if sk == "area_code" {
+								//行政区划代码
+								savetmp[sk] = fmt.Sprint(resulttmp[sk])
+								continue
+							} else if sk == "report_websites" {
+								//网址
+								if resulttmp["report_websites"] == nil {
+									savetmp["website"] = ""
+								} else {
+									report_websitesArr := []string{}
+									if ppms, ok := resulttmp[sk].(primitive.A); ok {
+										for _, v := range ppms {
+											if vvv, ok := v.(map[string]interface{}); ok {
+												if rv, ok := vvv["website_url"].(string); ok {
+													report_websitesArr = append(report_websitesArr, rv)
+												}
+											}
+										}
+									}
+									sort.Strings(report_websitesArr)
+									savetmp["website"] = strings.Join(report_websitesArr, ";")
+								}
+								continue
+							} else if sk == "wechat_accounts" {
+								savetmp[sk] = []interface{}{}
+								continue
+							}else if sk=="agency_name" {
+								if resulttmp["company_name"] == nil {
+									savetmp[sk] = ""
+								}else {
+									savetmp[sk] = resulttmp[sk]
+								}
+								continue
+							}else if sk=="address"{
+								if resulttmp["company_address"] == nil {
+									savetmp[sk] = ""
+								}else {
+									savetmp[sk] = resulttmp[sk]
+								}
+								continue
+							}
+
+
+
+							if resulttmp[sk] == nil && sk != "history_name" && sk != "wechat_accounts" &&
+								sk != "agency_name" && sk != "address" &&
+								sk != "contact" && sk != "report_websites" {
+								savetmp[sk] = ""
+							} else {
+								savetmp[sk] = resulttmp[sk]
+							}
+						}
+						//tmps = append(tmps, savetmp)
+						savetmp["updatatime"] = time.Now().Unix()
+						//保存mongo
+						FClient.DbName = Config["mgodb_extract_kf"]
+						saveid := FClient.Save(Config["mgo_qyk_c"], savetmp)
+						if saveid != nil {
+							//保存redis
+							rc := RedisPool.Get()
+							var _id string
+							if v, ok := saveid.(primitive.ObjectID); ok {
+								_id = v.Hex()
+							}
+							if _, err := rc.Do("SET", savetmp["agency_name"], _id); err != nil {
+								log.Println("save redis err:", tmp["_id"], savetmp["_id"], savetmp["agency_name"], err)
+								if err := rc.Close(); err != nil {
+									log.Println(err)
+								}
+							} else {
+								//保存es
+								delete(savetmp, "_id")
+								if err := rc.Close(); err != nil {
+									log.Println(err)
+								}
+
+								//esConn := elastic.GetEsConn()
+								//defer elastic.DestoryEsConn(esConn)
+								if _, err := EsConn.Index().Index(Config["elasticsearch_agency_index"]).Type(Config["elasticsearch_agency_type"]).Id(_id).BodyJson(savetmp).Refresh(true).Do(); err != nil {
+									log.Println("save es err :", tmp["_id"], savetmp["_id"], err)
+								} else {
+									//删除临时表
+									FClient.DbName = Config["mgodb_extract_kf"]
+									if deleteNum := FClient.DeleteById("agency_new", tmpId); deleteNum == 0 {
+										log.Println("删除临时表失败", deleteNum)
+									}
+								}
+							}
+						} else {
+							log.Println("save mongo err:", saveid, tmp["_id"])
+						}
+					}
+				}
+			}
+		}
+		t2.Reset(time.Minute)
+	}
+}

+ 659 - 0
udp_winner/timedTaskBuyer.go

@@ -0,0 +1,659 @@
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"github.com/garyburd/redigo/redis"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+	"gopkg.in/mgo.v2/bson"
+	"log"
+	mu "mfw/util"
+	"net"
+	"qfw/util"
+	"sort"
+	"strings"
+	"time"
+)
+
+//之前main方法,只更新
+func TaskBuyer(mapinfo *map[string]interface{}) {
+	defer util.Catch()
+	gtid, lteid := util.ObjToString((*mapinfo)["gtid"]), util.ObjToString((*mapinfo)["lteid"])
+	if gtid == "" || lteid == "" {
+		log.Println(gtid, lteid, "参数错误")
+		return
+	}
+	GId, err := primitive.ObjectIDFromHex(gtid)
+	LtId, err2 := primitive.ObjectIDFromHex(lteid)
+	if err != nil || err2 != nil {
+		log.Println(gtid, lteid, "转换_id错误")
+		return
+	}
+	//udp的id区间查询bidding  中标人 中标联系人 中标联系电话
+	// topscopeclass项目类型-industry行业类型&&topscopeclass联系人项目类型
+	// (area地区-province省份 city城市-city城市 district区县-district区县)
+	// winneraddr-company_address企业地址
+	SourceClientcc := SourceClient.GetMgoConn()
+	defer SourceClient.DestoryMongoConn(SourceClientcc)
+	cursor := SourceClientcc.DB(Config["mgodb_bidding"]).C(Config["mgodb_mgoinit_c"]).Find(bson.M{
+		"_id": bson.M{
+			"$gte": GId,
+			"$lte": LtId,
+		},
+	}).Select(bson.M{"buyer": 1, "buyertel": 1, "buyerperson": 1,
+		"topscopeclass": 1, "buyeraddr": 1}).Iter()
+	if cursor == nil {
+		log.Println(cursor)
+		return
+	}
+	//判断是否是存量,是存量走Redis遍历
+	if v, ok := (*mapinfo)["data_info"].(string); ok && v == "save" {
+		//存量处理
+		tmp := map[string]interface{}{}
+		conn := HisRedisPool.Conn()
+		defer conn.Close()
+		//选择redis db
+		conn.Select(1)
+		//遍历bidding表保存到redis
+		// key:_id  value:json结构体
+		for cursor.Next(&tmp) {
+			if tmp["buyer"] == nil || tmp["buyer"] == "" {
+				continue
+			}
+			bytes, _ := json.Marshal(tmp)
+			if err := conn.Set(tmp["_id"].(primitive.ObjectID).Hex(), string(bytes), 0).Err(); err != nil {
+				log.Println(err)
+			}
+		}
+		//遍历redis
+		if scan := conn.Scan(0, "", 100); scan.Err() != nil {
+			log.Println(scan.Err())
+			return
+		} else {
+			iterator := scan.Iterator()
+			for iterator.Next() {
+				redisId := iterator.Val()                       //redis key
+				redisvalue := conn.Get(iterator.Val()).String() //redis val
+				tmp := make(map[string]interface{})
+				json.Unmarshal([]byte(redisvalue),&tmp)
+				//重复增量操作
+				//redis查询是否存在
+				rdb := RedisPool.Get()
+				if reply, err := redis.String(rdb.Do("GET", tmp["buyer"])); err != nil {
+					//redis不存在,存到临时表,定时任务处理
+					FClient.DbName = Config["mgodb_extract_kf"]
+					if tmpid := FClient.Save("buyer_new", tmp); tmpid == nil {
+						log.Println("存量 FClient.Save err", tmpid)
+					}
+					//log.Println("get redis id err:定时任务处理", err, tmp)
+					if err := rdb.Close(); err != nil {
+						log.Println("存量",err)
+					}
+					//删除存量redis
+					conn.Del(redisId)
+					continue
+				} else {
+					if err := rdb.Close(); err != nil {
+						log.Println(err)
+					}
+					//拿到合并后的qyk
+					FClient.DbName = Config["mgodb_extract_kf"]
+					oldTmp := FClient.FindById(Config["mgo_qyk_c"], reply)
+					if oldTmp == nil {
+						log.Println("存量 redis id 不存在",reply,tmp["buyer"])
+						continue
+					}
+					tmpTopscopeclass := []string{}
+					tmpTopscopeclassMap := make(map[string]bool)
+
+					if v, ok := tmp["topscopeclass"].(primitive.A); ok {
+						for _, vv := range v {
+							if vvv, ok := vv.(string); ok && len(vvv) > 1 {
+								tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
+							}
+						}
+						for k := range tmpTopscopeclassMap {
+							tmpTopscopeclass = append(tmpTopscopeclass, k)
+						}
+					}
+					sort.Strings(tmpTopscopeclass)
+
+					//更新buyerclass
+
+
+					esId := oldTmp["_id"].(primitive.ObjectID).Hex()
+					//更新行业类型
+					if tmp["buyerperson"] == nil || tmp["buyerperson"] == "" || Reg_xing.MatchString(util.ObjToString(tmp["buyerperson"])) {
+
+
+						//更新buyerclass合并
+						if tmp["buyerclass"] == nil || tmp["buyerclass"] == "" {
+							//无值,不更新
+						}else {
+							//有值
+							var buyerclass_new,buyerclass_old string
+							buyerclass_new = tmp["buyerclass"].(string)
+							buyerclass_old = oldTmp["buyerclass"].(string)
+							if strings.Contains(buyerclass_old, buyerclass_new) {
+								oldTmp["buyerclass"] = buyerclass_old + ","+buyerclass_new //采购单位类型
+							}
+
+						}
+
+						oldTmp["updatatime"] = time.Now().Unix()
+						//mongo更新
+						FClient.DbName = Config["mgodb_extract_kf"]
+						if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
+							log.Println("mongo更新err", esId)
+						}
+
+						//es更新
+						delete(oldTmp, "_id")
+						if _, err := EsConn.Update().Index(Config["elasticsearch_buyer_index"]).Type(Config["elasticsearch_buyer_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
+							log.Println("update es err:", err)
+						}
+						//删除存量redis
+						conn.Del(redisId)
+						continue
+					}
+					//联系方式合并
+					var tmpperson, buyertel string
+					tmpperson = tmp["buyerperson"].(string)
+					if tmp["buyertel"] == nil || tmp["buyertel"] == "" {
+						buyertel = ""
+					} else {
+						if Reg_xing.MatchString(util.ObjToString(tmp["buyertel"])) || !Reg_tel.MatchString(util.ObjToString(tmp["buyertel"])) {
+							buyertel = ""
+						} else {
+							buyertel = util.ObjToString(tmp["buyertel"])
+						}
+					}
+					contactMaps := make([]interface{}, 0)
+					if oldTmp["contact"] == nil {
+						tmpContact := make(map[string]interface{})
+						tmpContact["infoid"] = redisId
+						tmpContact["contact_person"] = tmpperson
+						tmpContact["contact_type"] = "项目联系人"
+						tmpContact["phone"] = buyertel
+						tmpContact["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
+						tmpContact["updatetime"] = time.Now().Unix()
+						contactMaps = append(contactMaps, tmpContact)
+					} else {
+						//对比前四项,相等丢弃
+						if v, ok := oldTmp["contact"].(primitive.A); ok {
+							var isNotUpdate bool
+							for _, vv := range v {
+								if vvv, ok := vv.(map[string]interface{}); ok {
+									if vvv["contact_person"] == tmpperson && vvv["contact_type"] == "项目联系人" &&
+										vvv["phone"] == buyertel && vvv["topscopeclass"] == strings.Join(tmpTopscopeclass, ";") {
+										isNotUpdate = true
+										vvv["updatetime"] = time.Now().Unix()
+									}
+									contactMaps = append(contactMaps, vvv)
+								}
+							}
+							if !isNotUpdate {
+								vvv := make(map[string]interface{})
+								vvv["infoid"] = redisId
+								vvv["contact_person"] = tmp["buyerperson"]
+								vvv["contact_type"] = "项目联系人"
+								vvv["phone"] = buyertel
+								vvv["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
+								vvv["updatetime"] = time.Now().Unix()
+								contactMaps = append(contactMaps, vvv)
+							}
+						}
+					}
+					oldTmp["contact"] = contactMaps
+
+					//更新buyerclass合并
+					if tmp["buyerclass"] == nil || tmp["buyerclass"] == "" {
+						//无值,不更新
+					}else {
+						//有值
+						var buyerclass_new,buyerclass_old string
+						buyerclass_new = tmp["buyerclass"].(string)
+						buyerclass_old = oldTmp["buyerclass"].(string)
+						if strings.Contains(buyerclass_old, buyerclass_new) {
+							oldTmp["buyerclass"] = buyerclass_old + ","+buyerclass_new //采购单位类型
+						}
+
+					}
+
+					//mongo更新
+					oldTmp["updatatime"] = time.Now().Unix()
+					FClient.DbName = Config["mgodb_extract_kf"]
+					if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
+						log.Println("存量  mongo更新 err", esId, oldTmp)
+					}
+					//es更新
+					delete(oldTmp, "_id")
+					if _, err := EsConn.Update().Index(Config["elasticsearch_buyer_index"]).Type(Config["elasticsearch_buyer_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
+						log.Println("存量 EsConn err :", err)
+					}
+					//最后删除redis
+					conn.Del(redisId)
+				}
+			}
+		}
+		log.Println("存量历史合并执行完成 ok", gtid, lteid)
+
+	} else {
+		//增量处理
+		overid := gtid
+		tmp := map[string]interface{}{}
+		for cursor.Next(&tmp) {
+			overid = tmp["_id"].(primitive.ObjectID).Hex()
+			log.Println(tmp["_id"])
+			if tmp["buyer"] == nil || tmp["buyer"] == "" {
+				continue
+			}
+			//redis查询是否存在
+			rdb := RedisPool.Get()
+			if reply, err := redis.String(rdb.Do("GET", tmp["buyer"])); err != nil {
+				//redis不存在存到临时表,定时任务处理
+				FClient.DbName = Config["mgodb_extract_kf"]
+				if tmpid := FClient.Save("buyer_new", tmp); tmpid == nil {
+					log.Println("FClient.Save err", tmpid)
+				}
+				//log.Println("get redis id err:定时任务处理", err, tmp)
+				if err := rdb.Close(); err != nil {
+					log.Println(err)
+				}
+				continue
+			} else {
+				if err := rdb.Close(); err != nil {
+					log.Println(err)
+				}
+				//拿到合并后的qyk
+				FClient.DbName = Config["mgodb_extract_kf"]
+				oldTmp := FClient.FindById(Config["mgo_qyk_c"], reply)
+				if oldTmp == nil {
+					log.Println("redis id 不存在")
+					continue
+				}
+				//比较合并
+				//行业类型
+				tmpTopscopeclass := []string{}
+				tmpTopscopeclassMap := make(map[string]bool)
+
+
+				if v, ok := tmp["topscopeclass"].(primitive.A); ok {
+					for _, vv := range v {
+						if vvv, ok := vv.(string); ok && len(vvv) > 1 {
+							tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
+						}
+					}
+					for k := range tmpTopscopeclassMap {
+						tmpTopscopeclass = append(tmpTopscopeclass, k)
+					}
+				}
+				sort.Strings(tmpTopscopeclass)
+
+
+				esId := oldTmp["_id"].(primitive.ObjectID).Hex()
+
+
+
+
+				//更新行业类型 buyerclass合并
+				if tmp["buyerperson"] == nil || tmp["buyerperson"] == "" || Reg_xing.MatchString(util.ObjToString(tmp["buyerperson"])) {
+
+					//更新buyerclass合并
+					if tmp["buyerclass"] == nil || tmp["buyerclass"] == "" {
+						//无值,不更新
+					}else {
+						//有值
+						var buyerclass_new,buyerclass_old string
+						buyerclass_new = tmp["buyerclass"].(string)
+						buyerclass_old = oldTmp["buyerclass"].(string)
+						if strings.Contains(buyerclass_old, buyerclass_new) {
+							oldTmp["buyerclass"] = buyerclass_old + ","+buyerclass_new //采购单位类型
+						}
+
+					}
+
+					oldTmp["updatatime"] = time.Now().Unix()
+					//mongo更新
+					FClient.DbName = Config["mgodb_extract_kf"]
+					if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
+						log.Println("mongo更新err", esId)
+					}
+
+					//es更新
+					delete(oldTmp, "_id")
+					if _, err := EsConn.Update().Index(Config["elasticsearch_buyer_index"]).Type(Config["elasticsearch_buyer_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
+						log.Println("update es err:", err)
+					}
+					continue
+				}
+				//联系方式合并
+				var tmpperson, buyertel string
+				tmpperson = tmp["buyerperson"].(string)
+				if tmp["buyertel"] == nil || tmp["buyertel"] == "" {
+					buyertel = ""
+				} else {
+					if Reg_xing.MatchString(util.ObjToString(tmp["buyertel"])) || !Reg_tel.MatchString(util.ObjToString(tmp["buyertel"])) {
+						buyertel = ""
+					} else {
+						buyertel = util.ObjToString(tmp["buyertel"])
+					}
+				}
+				contactMaps := make([]interface{}, 0)
+				if oldTmp["contact"] == nil {
+					tmpContact := make(map[string]interface{})
+					tmpContact["infoid"] = overid
+					tmpContact["contact_person"] = tmpperson
+					tmpContact["contact_type"] = "项目联系人"
+					tmpContact["phone"] = buyertel
+					tmpContact["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
+					tmpContact["updatetime"] = time.Now().Unix()
+					contactMaps = append(contactMaps, tmpContact)
+				} else {
+					//对比前四项,相等丢弃
+					if v, ok := oldTmp["contact"].(primitive.A); ok {
+						var isNotUpdate bool
+						for _, vv := range v {
+							if vvv, ok := vv.(map[string]interface{}); ok {
+								if vvv["contact_person"] == tmpperson && vvv["contact_type"] == "项目联系人" &&
+									vvv["phone"] == buyertel && vvv["topscopeclass"] == strings.Join(tmpTopscopeclass, ";") {
+									isNotUpdate = true
+									vvv["updatetime"] = time.Now().Unix()
+								}
+								contactMaps = append(contactMaps, vvv)
+							}
+						}
+						if !isNotUpdate {
+							vvv := make(map[string]interface{})
+							vvv["infoid"] = overid
+							vvv["contact_person"] = tmp["buyerperson"]
+							vvv["contact_type"] = "项目联系人"
+							vvv["phone"] = buyertel
+							vvv["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
+							vvv["updatetime"] = time.Now().Unix()
+							contactMaps = append(contactMaps, vvv)
+						}
+					}
+				}
+				oldTmp["contact"] = contactMaps
+
+				//更新buyerclass合并
+				if tmp["buyerclass"] == nil || tmp["buyerclass"] == "" {
+					//无值,不更新
+				}else {
+					//有值
+					var buyerclass_new,buyerclass_old string
+					buyerclass_new = tmp["buyerclass"].(string)
+					buyerclass_old = oldTmp["buyerclass"].(string)
+					if strings.Contains(buyerclass_old, buyerclass_new) {
+						oldTmp["buyerclass"] = buyerclass_old + ","+buyerclass_new //采购单位类型
+					}
+
+				}
+
+				//mongo更新
+				oldTmp["updatatime"] = time.Now().Unix()
+				FClient.DbName = Config["mgodb_extract_kf"]
+				if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
+					log.Println("mongo更新 err", esId, oldTmp)
+				}
+				//es更新
+				delete(oldTmp, "_id")
+				if _, err := EsConn.Update().Index(Config["elasticsearch_buyer_index"]).Type(Config["elasticsearch_buyer_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
+					log.Println("EsConn err :", err)
+				}
+			}
+		}
+		log.Println("增量合并执行完成 ok", gtid, lteid, overid)
+	}
+
+}
+
+
+
+//定时任务
+//1.存异常表
+//2.合并原始库新增
+func TimedTaskBuyer() {
+	//time.Sleep(time.Hour*70)
+	t2 := time.NewTimer(time.Second * 5)
+	for range t2.C {
+		Fcconn := FClient.GetMgoConn()
+		defer FClient.DestoryMongoConn(Fcconn)
+		tmpLast := map[string]interface{}{}
+		if iter := Fcconn.DB(Config["mgodb_extract_kf"]).C("buyer_new").Find(bson.M{}).Sort("-_id").Limit(1).Iter(); iter != nil {
+			if !iter.Next(&tmpLast) {
+				//临时表无数据
+				log.Println("临时表无数据:")
+				t2.Reset(time.Minute * 5)
+				continue
+			} else {
+				log.Println("临时表有数据:", tmpLast)
+				fconn := FClient.GetMgoConn()
+				defer FClient.DestoryMongoConn(fconn)
+				cursor := fconn.DB(Config["mgodb_extract_kf"]).C("buyer_new").Find(bson.M{
+					"_id": bson.M{
+						"$lte": tmpLast["_id"],
+					},
+				}).Sort("_id").Iter()
+				if cursor == nil {
+					log.Println("查询失败")
+					t2.Reset(time.Second * 5)
+					continue
+				}
+				//遍历临时表数据,匹配不到原始库存入异常表
+				tmp := make(map[string]interface{})
+				for cursor.Next(&tmp) {
+					tmpId := tmp["_id"].(primitive.ObjectID).Hex()
+					//再重新查找redis,存在发udp处理,不存在走新增合并
+					rdb := RedisPool.Get()
+					if _, err := redis.String(rdb.Do("GET", tmp["buyer"])); err == nil {
+						//{"gtid":"57d7ad2f61a0721f152d2ad5","lteid":"5e20968d85a9271abf0ad6c2","stype":""}
+						//redis存在发送udp进行处理
+						by, _ := json.Marshal(map[string]interface{}{
+							"gtid":  tmpId,
+							"lteid": tmpId,
+							"stype": "",
+						})
+						if e := udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
+							IP:   net.ParseIP("127.0.0.1"),
+							Port: Updport,
+						}); e != nil {
+							log.Println(e)
+						}
+						//存在的话删除tmp mongo表
+						FClient.DbName = Config["mgodb_extract_kf"]
+						if DeletedCount := FClient.DeleteById("buyer_new", tmpId); DeletedCount == 0 {
+							log.Println("删除临时表err:", DeletedCount)
+						}
+						if err := rdb.Close(); err != nil {
+							log.Println(err)
+						}
+						continue
+					} else {
+						if err = rdb.Close(); err != nil {
+							log.Println(err)
+						}
+					}
+					//查询redis不存在新增
+					FClient.DbName = Config["mgodb_enterprise"]
+					//qyxy 企业库 两亿条
+					resulttmp := FClient.FindOne(Config["mgodb_enterprise_c"], bson.M{"company_name": tmp["buyer"]})
+					if resulttmp["_id"] == nil {
+						//log.Println(r)
+						//匹配不到原始库,存入异常表删除临时表
+						FClient.DbName = Config["mgodb_extract_kf"]
+						if saveid := FClient.Save("buyer_err", tmp); saveid == nil {
+							log.Println("存入异常表错误", tmp)
+						}
+						FClient.DbName = Config["mgodb_extract_kf"]
+						if deleteNum := FClient.DeleteById("buyer_new", tmpId); deleteNum == 0 {
+							log.Println("删除临时表错误", deleteNum)
+						}
+						continue
+					} else {
+						//log.Println(123)
+						//匹配到原始库,新增 resulttmp
+						if resulttmp["credit_no"] != nil {
+							if credit_no, ok := resulttmp["credit_no"].(string); ok && strings.TrimSpace(credit_no) != "" &&
+								len(strings.TrimSpace(credit_no)) > 8 {
+								dataNo := strings.TrimSpace(credit_no)[2:8]
+								if Addrs[dataNo] != nil {
+									if v, ok := Addrs[dataNo].(map[string]interface{}); ok {
+										if resulttmp["province"] == nil || resulttmp["province"] == "" {
+											resulttmp["province"] = v["province"]
+										}
+										resulttmp["city"] = v["city"]
+										resulttmp["district"] = v["district"]
+
+									}
+								}
+							}
+						}
+						contacts := make([]map[string]interface{}, 0)
+						contact := make(map[string]interface{}, 0)
+						if resulttmp["legal_person"] != nil {
+							contact["contact_person"] = resulttmp["legal_person"] //联系人
+						} else {
+							contact["contact_person"] = "" //联系人
+						}
+						contact["contact_type"] = "法定代表人" //法定代表人
+						//log.Println(1)
+						if resulttmp["annual_reports"] != nil {
+							bytes, err := json.Marshal(resulttmp["annual_reports"])
+							if err != nil {
+								log.Println("annual_reports err:", err)
+							}
+							phonetmp := make([]map[string]interface{}, 0)
+							err = json.Unmarshal(bytes, &phonetmp)
+							if err != nil {
+								log.Println("Unmarshal err:", err)
+							}
+							for _, vv := range phonetmp {
+								if vv["company_phone"] != nil {
+									if vv["company_phone"] == "" {
+										continue
+									} else {
+										contact["phone"] = vv["company_phone"] //联系电话
+										break
+									}
+								} else {
+									contact["phone"] = "" //联系电话
+								}
+
+							}
+						}
+						//log.Println(k, contact["phone"], resulttmp["_id"])
+						//time.Sleep(10 * time.Second)
+						if contact["phone"] == nil {
+							contact["phone"] = "" //联系电话
+						}
+						contact["topscopeclass"] = "企业公示"         //项目类型
+						contact["updatetime"] = time.Now().Unix() //更新时间
+						contact["infoid"] = ""                    //招标信息id
+						contacts = append(contacts, contact)
+						resulttmp["contact"] = contacts
+
+						savetmp := make(map[string]interface{}, 0)
+						for _, sk := range BuyerFields {
+							if sk == "_id" {
+								savetmp["tmp"+sk] = resulttmp[sk]
+								continue
+							} else if sk == "area_code" {
+								//行政区划代码
+								savetmp[sk] = fmt.Sprint(resulttmp[sk])
+								continue
+							} else if sk == "report_websites" {
+								//网址
+								if resulttmp["report_websites"] == nil {
+									savetmp["website"] = ""
+								} else {
+									report_websitesArr := []string{}
+									if ppms, ok := resulttmp[sk].(primitive.A); ok {
+										for _, v := range ppms {
+											if vvv, ok := v.(map[string]interface{}); ok {
+												if rv, ok := vvv["website_url"].(string); ok {
+													report_websitesArr = append(report_websitesArr, rv)
+												}
+											}
+										}
+									}
+									sort.Strings(report_websitesArr)
+									savetmp["website"] = strings.Join(report_websitesArr, ";")
+								}
+								continue
+							} else if sk == "wechat_accounts" {
+								savetmp[sk] = []interface{}{}
+								continue
+							}else if sk=="buyer_name" {
+								if resulttmp["company_name"] == nil {
+									savetmp[sk] = ""
+								}else {
+									savetmp[sk] = resulttmp[sk]
+								}
+								continue
+							}else if sk=="address"{
+								if resulttmp["company_address"] == nil {
+									savetmp[sk] = ""
+								}else {
+									savetmp[sk] = resulttmp[sk]
+								}
+								continue
+							}
+
+
+
+							if resulttmp[sk] == nil && sk != "history_name" && sk != "wechat_accounts" &&
+								sk != "buyer_name" && sk != "address" &&
+								sk != "contact" && sk != "report_websites" {
+								savetmp[sk] = ""
+							} else {
+								savetmp[sk] = resulttmp[sk]
+							}
+						}
+						//tmps = append(tmps, savetmp)
+						savetmp["updatatime"] = time.Now().Unix()
+						//保存mongo
+						FClient.DbName = Config["mgodb_extract_kf"]
+						saveid := FClient.Save(Config["mgo_qyk_c"], savetmp)
+						if saveid != nil {
+							//保存redis
+							rc := RedisPool.Get()
+							var _id string
+							if v, ok := saveid.(primitive.ObjectID); ok {
+								_id = v.Hex()
+							}
+							if _, err := rc.Do("SET", savetmp["buyer_name"], _id); err != nil {
+								log.Println("save redis err:", tmp["_id"], savetmp["_id"], savetmp["buyer_name"], err)
+								if err := rc.Close(); err != nil {
+									log.Println(err)
+								}
+							} else {
+								//保存es
+								delete(savetmp, "_id")
+								if err := rc.Close(); err != nil {
+									log.Println(err)
+								}
+
+								//esConn := elastic.GetEsConn()
+								//defer elastic.DestoryEsConn(esConn)
+								if _, err := EsConn.Index().Index(Config["elasticsearch_buyer_index"]).Type(Config["elasticsearch_buyer_type"]).Id(_id).BodyJson(savetmp).Refresh(true).Do(); err != nil {
+									log.Println("save es err :", tmp["_id"], savetmp["_id"], err)
+								} else {
+									//删除临时表
+									FClient.DbName = Config["mgodb_extract_kf"]
+									if deleteNum := FClient.DeleteById("buyer_new", tmpId); deleteNum == 0 {
+										log.Println("删除临时表失败", deleteNum)
+									}
+								}
+							}
+						} else {
+							log.Println("save mongo err:", saveid, tmp["_id"])
+						}
+					}
+				}
+			}
+		}
+		t2.Reset(time.Minute)
+	}
+}