Ver Fonte

Merge branch 'dev3.4' of http://192.168.3.207:10080/qmx/jy-data-extract into dev3.4

apple há 5 anos atrás
pai
commit
b9421f4379
3 ficheiros alterados com 152 adições e 206 exclusões
  1. 1 0
      udp_winner/config.json
  2. 90 194
      udp_winner/main.go
  3. 61 12
      udp_winner/timedTask.go

+ 1 - 0
udp_winner/config.json

@@ -3,6 +3,7 @@
   "elasticsearch_index": "winner_enterprise",
   "elasticsearch_type": "winnerent",
   "udpport": "127.0.0.1:12311",
+  "port": "12311",
   "mgoinit": "192.168.3.207:27081",
   "mgodb_bidding": "qfw",
   "mgodb_mgoinit_c": "bidding",

+ 90 - 194
udp_winner/main.go

@@ -9,12 +9,14 @@ import (
 	"go.mongodb.org/mongo-driver/bson/primitive"
 	"go.mongodb.org/mongo-driver/mongo"
 	"go.mongodb.org/mongo-driver/mongo/options"
+	es "gopkg.in/olivere/elastic.v1"
 	"log"
 	mu "mfw/util"
 	"net"
-	elastic "qfw/common/src/qfw/util/elastic"
+	"qfw/common/src/qfw/util/elastic"
 	"qfw/util"
 	"regexp"
+	"strconv"
 
 	"sort"
 	"strings"
@@ -33,6 +35,8 @@ var
 	Reg_xing                              = regexp.MustCompile(`\*{1,}`)
 	Reg_person                            = regexp.MustCompile("[\u4E00-\u9FA5\\s]+")
 	Reg_tel                               = regexp.MustCompile(`^[0-9\-\s]*$`)
+	EsConn                                *es.Client
+	Updport                               int
 )
 /**
 新增
@@ -47,35 +51,37 @@ func init() {
 		"company_name", "history_name", "topscopeclass", "wechat_accounts", "alias", "website", "report_websites"}
 	var err error
 	//mongo init
-	SourceClient, err = mongo.NewClient(options.Client().ApplyURI("mongodb://" + Config["mgoinit"]).SetMaxPoolSize(20))
+	SourceClient, err = mongo.NewClient(options.Client().ApplyURI("mongodb://" + Config["mgoinit"]).SetMaxPoolSize(20).SetMaxConnIdleTime(time.Hour * 24))
 	if err != nil {
 		log.Fatalln(err)
 	}
-	c1 := context.Background()
+	c1, _ := context.WithTimeout(context.Background(), 9999*time.Hour)
+
 	err = SourceClient.Connect(c1)
 	//defer SourceClient.Disconnect(c1)
 	if err != nil {
 		log.Fatalln(err)
 	}
-	FClient, err = mongo.NewClient(options.Client().ApplyURI("mongodb://" + Config["mgourl"]).SetMaxPoolSize(20))
+	FClient, err = mongo.NewClient(options.Client().ApplyURI("mongodb://" + Config["mgourl"]).SetMaxPoolSize(20).SetMaxConnIdleTime(time.Hour * 999999))
 	if err != nil {
 		log.Fatalln(err)
 	}
-	cc := context.Background()
-	err = FClient.Connect(cc)
+	err = FClient.Connect(c1)
 	//defer FClient.Disconnect(cc)
 	if err != nil {
 		log.Fatalln(err)
 	}
 	//加载省市县代码
-	cursor2, err := FClient.Database(Config["mgodb_extract_kf"]).Collection("address").Find(cc, bson.M{},
+	cursor2, err := FClient.Database(Config["mgodb_extract_kf"]).Collection("address").Find(c1, bson.M{},
 		options.Find().SetProjection(bson.M{"province": 1, "code": 1, "city": 1, "district": 1}))
-	defer cursor2.Close(cc)
-	defer FClient.Connect(cc)
+	//defer FClient.Connect(cc)
 	if err != nil {
 		log.Fatalln(err)
 	}
-	for cursor2.Next(cc) {
+	for cursor2.Next(context.TODO()) {
+		if err := cursor2.Err(); err != nil {
+			log.Println("cursor.Err();", err)
+		}
 		tmp := make(map[string]interface{})
 		if err := cursor2.Decode(&tmp); err != nil {
 			log.Println(err)
@@ -87,27 +93,17 @@ func init() {
 			}
 		}
 	}
+	defer cursor2.Close(context.TODO())
 	log.Println(len(Addrs))
 	//es.NewClient(es.SetURL(addrs...), es.SetMaxRetries(2), es.SetSniff(false))
 	//es init
-	elastic.InitElasticSize(Config["elasticsearch"], 10)
-	//esConn := elastic.GetEsConn()
-	//defer elastic.DestoryEsConn(esConn)
-	//log.Println(esConn.Index().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id("123").BodyJson(map[string]interface{}{"testname":"六盘水市钟山开发区亿农科贸有限公司"}).Refresh(true).Do())
-	//log.Println(esConn.Delete().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id("123").Refresh(true).Do())
-
-	//if ESclient, err = elastic.NewClient(elastic.SetURL(Config["elasticsearch"]), elastic.SetHealthcheckTimeout(time.Minute)); err != nil {
-	//	log.Println(Config["elasticsearch"])
-	//	log.Fatalln("ElasticClient err:", err)
-	//} else {
-	//	ElasticClientIndex = Config["elasticsearch_index"]
-	//	ElasticClientType = Config["elasticsearch_type"]
-	//}
-
+	elastic.InitElasticSize(Config["elasticsearch"], 50)
+	EsConn = elastic.GetEsConn()
+	defer elastic.DestoryEsConn(EsConn)
 	//redis
 	RedisPool = redis.Pool{
-		MaxIdle:     10,
-		IdleTimeout: 240 * time.Second,
+		MaxIdle:     50,
+		IdleTimeout: 10 * time.Second,
 		Dial: func() (redis.Conn, error) {
 			conn, e := redis.Dial("tcp", Config["redis"])
 			if e != nil {
@@ -121,17 +117,19 @@ func init() {
 		},}
 	c := RedisPool.Get()
 	if _, err := c.Do("PING"); err != nil {
-		log.Fatalln(err)
+		log.Fatalln("redis err:", err)
 	}
-	defer c.Close()
+	c.Close()
 }
 
 func main() {
 	//udp
 	updport := Config["udpport"]
+	Updport, _ = strconv.Atoi(Config["port"])
 	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
 	udpclient.Listen(processUdpMsg)
 	log.Println("Udp服务监听", updport)
+	log.Println("发送端口port:",Updport)
 	go TimedTask() //定时任务
 	c := make(chan int, 1)
 	<-c
@@ -174,18 +172,23 @@ func task(mapinfo *map[string]interface{}) {
 	// topscopeclass项目类型-industry行业类型&&topscopeclass联系人项目类型
 	// (area地区-province省份 city城市-city城市 district区县-district区县)
 	// winneraddr-company_address企业地址
-	cursor, err := SourceClient.Database(Config["mgodb_bidding"]).Collection(Config["mgodb_mgoinit_c"]).Find(context.TODO(), bson.M{
+	cursor, err := SourceClient.Database(Config["mgodb_bidding"]).Collection(Config["mgodb_mgoinit_c"]).Find(context.Background(), bson.M{
 		"_id": bson.M{
 			"$gte": GId,
 			"$lte": LtId,
 		},
 	}, options.Find().SetProjection(bson.M{"winner": 1, "winnertel": 1, "winnerperson": 1,
-		"topscopeclass": 1, "winneraddr": 1}))
+		"topscopeclass": 1, "winneraddr": 1}).SetSort(bson.M{"_id": 1}).SetBatchSize(60000000).SetMaxTime(time.Hour*24))
 	if err != nil {
 		log.Println(err)
 		return
 	}
-	for cursor.Next(context.TODO()) {
+	overid := ""
+	c2, _ := context.WithTimeout(context.Background(), 9999*time.Hour)
+	for cursor.Next(c2) {
+		if err := cursor.Err(); err != nil {
+			log.Println("cursor.Err();", err)
+		}
 		tmp := map[string]interface{}{}
 		if err := cursor.Decode(&tmp); err == nil {
 			if tmp["winner"] == nil || tmp["winner"] == "" {
@@ -193,21 +196,32 @@ func task(mapinfo *map[string]interface{}) {
 			}
 			//redis查询是否存在
 			rdb := RedisPool.Get()
-			defer rdb.Close()
 			if reply, err := redis.String(rdb.Do("GET", tmp["winner"])); err != nil {
 				//redis不存在存到临时表,定时任务处理
-				FClient.Database(Config["mgodb_extract_kf"]).Collection("winner_new").InsertOne(context.TODO(), tmp)
-				//log.Println(tmp, err)
+				if _, err := FClient.Database(Config["mgodb_extract_kf"]).Collection("winner_new").InsertOne(context.TODO(), tmp); err != nil {
+					log.Println(err, tmp)
+				}
+				//log.Println("get redis id err:定时任务处理", err, tmp)
+				if err := rdb.Close(); err != nil {
+					log.Println(err)
+				}
 				continue
 			} else {
+				log.Println("redis get :", reply)
 				//redis存在
 				//log.Println(reply)
 				//reply = "5e0316b998a9abaf6535df3d"
 				id, err := primitive.ObjectIDFromHex(reply)
 				if err != nil {
-					log.Println("get redis id err:", err, tmp)
+					log.Println("get redis id  Hex err:", err, tmp)
+					if err := rdb.Close(); err != nil {
+						log.Println(err)
+					}
 					continue
 				}
+				if err := rdb.Close(); err != nil {
+					log.Println(err)
+				}
 				//拿到合并后的qyk
 				oldTmp := make(map[string]interface{})
 				err = FClient.Database(Config["mgodb_extract_kf"]).Collection(Config["mgo_qyk_c"]).
@@ -221,6 +235,7 @@ func task(mapinfo *map[string]interface{}) {
 				tmpTopscopeclass := []string{}
 				tmpTopscopeclassMap := make(map[string]bool)
 				log.Println(tmp["_id"])
+				overid = tmp["_id"].(primitive.ObjectID).Hex()
 				if oldTmp["industry"] == nil {
 					//log.Println(reflect.ValueOf(tmp["topscopeclass"]))
 					if v, ok := tmp["topscopeclass"].(primitive.A); ok {
@@ -259,25 +274,29 @@ func task(mapinfo *map[string]interface{}) {
 				if tmp["winnerperson"] == nil || tmp["winnerperson"] == "" || Reg_xing.MatchString(util.ObjToString(tmp["winnerperson"])) {
 					oldTmp["updatatime"] = time.Now().Unix()
 					//mongo更新
-					FClient.Database(Config["mgodb_extract_kf"]).Collection(Config["mgo_qyk_c"]).
-						UpdateOne(context.TODO(), bson.M{"_id": oldTmp["_id"]}, bson.M{"$set": oldTmp})
+					if _, err := FClient.Database(Config["mgodb_extract_kf"]).Collection(Config["mgo_qyk_c"]).
+						UpdateOne(context.TODO(), bson.M{"_id": oldTmp["_id"]}, bson.M{"$set": oldTmp}); err != nil {
+						log.Println("mongo更新err:", err)
+					}
 					//es更新
 					delete(oldTmp, "_id")
-					esConn := elastic.GetEsConn()
-					defer elastic.DestoryEsConn(esConn)
-					esConn.Update().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do()
+					//esConn := elastic.GetEsConn()
+					//defer elastic.DestoryEsConn(esConn)
+					if _, err := EsConn.Update().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
+						log.Println("update es err:", err)
+					}
 					//log.Println( err2,err3)
 					continue
 				}
 				//联系方式合并
 				var tmpperson, winnertel string
 				tmpperson = tmp["winnerperson"].(string)
-				if tmp["winnertel"] == nil || tmp["winnertel"]==""{
+				if tmp["winnertel"] == nil || tmp["winnertel"] == "" {
 					winnertel = ""
-				}else {
-					if Reg_xing.MatchString(util.ObjToString(tmp["winnertel"]))||!Reg_tel.MatchString(util.ObjToString(tmp["winnertel"])){
+				} else {
+					if Reg_xing.MatchString(util.ObjToString(tmp["winnertel"])) || !Reg_tel.MatchString(util.ObjToString(tmp["winnertel"])) {
 						winnertel = ""
-					}else {
+					} else {
 						winnertel = util.ObjToString(tmp["winnertel"])
 					}
 				}
@@ -318,13 +337,17 @@ func task(mapinfo *map[string]interface{}) {
 				oldTmp["contact"] = contactMaps
 				//mongo更新
 				oldTmp["updatatime"] = time.Now().Unix()
-				FClient.Database(Config["mgodb_extract_kf"]).Collection(Config["mgo_qyk_c"]).
-					UpdateOne(context.TODO(), bson.M{"_id": oldTmp["_id"]}, bson.M{"$set": oldTmp})
+				if _, err := FClient.Database(Config["mgodb_extract_kf"]).Collection(Config["mgo_qyk_c"]).
+					UpdateOne(context.TODO(), bson.M{"_id": oldTmp["_id"]}, bson.M{"$set": oldTmp}); err != nil {
+					log.Println("mongo更新 err :", err)
+				}
 				//es更新
 				delete(oldTmp, "_id")
-				esConn := elastic.GetEsConn()
-				defer elastic.DestoryEsConn(esConn)
-				esConn.Update().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do()
+				//esConn := elastic.GetEsConn()
+				//defer elastic.DestoryEsConn(esConn)
+				if _, err := EsConn.Update().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
+					log.Println("EsConn err :", err)
+				}
 				//log.Println( err2,err3)
 			}
 		} else {
@@ -332,149 +355,22 @@ func task(mapinfo *map[string]interface{}) {
 			continue
 		}
 	}
-	defer cursor.Close(context.TODO())
-	//tmps := make([]interface{}, 0)
-	//num, snum := 0, 0
-	//for k := range keys {
-	//	//if num == 6 {
-	//	//	return
-	//	//}
-	//	tmp := make(map[string]interface{})
-	//	err := Client.Database("enterprise").Collection("qyxy").FindOne(context.TODO(), bson.M{"company_name": k}).Decode(&tmp)
-	//	if err != nil {
-	//		//log.Println(k, err)
-	//		continue
-	//	}
-	//	if tmp["credit_no"] != nil {
-	//		if credit_no, ok := tmp["credit_no"].(string); ok && strings.TrimSpace(credit_no) != "" &&
-	//			len(strings.TrimSpace(credit_no)) > 8 {
-	//			dataNo := strings.TrimSpace(credit_no)[2:8]
-	//			if Addrs[dataNo] != nil {
-	//				if v, ok := Addrs[dataNo].(map[string]interface{}); ok {
-	//					if tmp["province"] == nil || tmp["province"] == "" {
-	//						tmp["province"] = v["province"]
-	//					}
-	//					tmp["city"] = v["city"]
-	//					tmp["district"] = v["district"]
-	//
-	//				}
-	//			}
-	//		}
-	//	}
-	//	contacts := make([]map[string]interface{}, 0)
-	//	contact := make(map[string]interface{}, 0)
-	//	if tmp["legal_person"] != nil {
-	//		contact["contact_person"] = tmp["legal_person"] //联系人
-	//	} else {
-	//		contact["contact_person"] = "" //联系人
-	//	}
-	//	contact["contact_type"] = "法定代表人" //法定代表人
-	//	//log.Println(1)
-	//	if tmp["annual_reports"] != nil {
-	//		bytes, err := json.Marshal(tmp["annual_reports"])
-	//		if err != nil {
-	//			log.Println("annual_reports err:", err)
-	//		}
-	//		//log.Println(2, string(bytes))
-	//		phonetmp := make([]map[string]interface{}, 0)
-	//		err = json.Unmarshal(bytes, &phonetmp)
-	//		if err != nil {
-	//			log.Println("Unmarshal err:", err)
-	//		}
-	//		//log.Println(44, err)
-	//		for _, vv := range phonetmp {
-	//			if vv["company_phone"] != nil {
-	//				if vv["company_phone"] == "" {
-	//					continue
-	//				} else {
-	//					contact["phone"] = vv["company_phone"] //联系电话
-	//					break
-	//				}
-	//			} else {
-	//				contact["phone"] = "" //联系电话
-	//			}
-	//
-	//		}
-	//	}
-	//	//log.Println(k, contact["phone"], tmp["_id"])
-	//	//time.Sleep(10 * time.Second)
-	//	if contact["phone"] == nil {
-	//		contact["phone"] = "" //联系电话
-	//	}
-	//	contact["topscopeclass"] = "企业公示"         //项目类型
-	//	contact["updatetime"] = time.Now().Unix() //更新时间
-	//	contacts = append(contacts, contact)
-	//	tmp["contact"] = contacts
-	//
-	//	savetmp := make(map[string]interface{}, 0)
-	//	//字段处理
-	//	for _, sk := range Fields {
-	//		if sk == "establish_date" { //成立日期
-	//			if tmp[sk] != nil {
-	//				savetmp[sk] = tmp[sk].(primitive.DateTime).Time().UTC().Unix()
-	//				continue
-	//			}
-	//		} else if sk == "capital" { //注册资本
-	//			//log.Println(sk, tmp[sk])
-	//			savetmp[sk] = ObjToMoney([]interface{}{tmp[sk], ""})[0]
-	//			continue
-	//		} else if sk == "partners" { //股东及出资信息
-	//			//log.Println(sk, tmp[sk], )
-	//			//fmt.Println(reflect.TypeOf(tmp[sk]))
-	//			if tmp[sk] != nil {
-	//				if ppms, ok := tmp[sk].(primitive.A); ok {
-	//					for i, _ := range ppms {
-	//						if ppms[i].(map[string]interface{})["stock_type"] != nil {
-	//							ppms[i].(map[string]interface{})["stock_type"] = "企业公示"
-	//						}
-	//						delete(ppms[i].(map[string]interface{}), "identify_type")
-	//					}
-	//					savetmp[sk] = ppms
-	//					continue
-	//				}
-	//			}
-	//		} else if sk == "_id" { //_id备份企业库
-	//			savetmp["tmp"+sk] = tmp[sk]
-	//			continue
-	//		}
-	//		if tmp[sk] == nil && sk != "history_name" && sk != "establish_date" && sk != "capital" && sk != "partners" && sk != "contact" && sk != "wechat_accounts" {
-	//			savetmp[sk] = ""
-	//		} else {
-	//			if sk == "wechat_accounts" { //微信公众号
-	//				if savetmp[sk] == nil {
-	//					//TODO 微信公众号取值未确认
-	//					savetmp[sk] = []string{}
-	//				}
-	//				continue
-	//			} else if sk == "website" { //网址
-	//				//TODO 网址取值未确认
-	//				continue
-	//			}
-	//			savetmp[sk] = tmp[sk]
-	//		}
-	//	}
-	//	savetmp["alias"] = "" //别名
-	//	tmps = append(tmps, savetmp)
-	//	num++
-	//	snum++
-	//	if snum >= 300 {
-	//		_, err := Client.Database("extract_v3").Collection("enterprise_qyxy").InsertMany(context.TODO(), tmps)
-	//		if err != nil {
-	//			log.Println("save:", err)
-	//			continue
-	//		} else {
-	//			log.Println(num)
-	//			tmps = []interface{}{}
-	//			snum = 0
-	//		}
-	//	}
-	//}
-	//if len(tmps) > 0 {
-	//	result, err := Client.Database("extract_v3").Collection("enterprise_qyxy").InsertMany(context.TODO(), tmps)
-	//	if err != nil {
-	//		log.Println("save over:", err)
-	//	} else {
-	//		log.Println("last save num:", len(result.InsertedIDs))
-	//	}
-	//}
+	defer cursor.Close(c2)
+	log.Println("合并执行完成", gtid, lteid ,overid)
+	if gtid != lteid{
+		by, _ := json.Marshal(map[string]interface{}{
+			"gtid":  overid,
+			"lteid": lteid,
+			"stype": "",
+		})
+		if e := udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
+			IP:   net.ParseIP("127.0.0.1"),
+			Port: Updport,
+		});e != nil{
+			log.Println(e)
+		}
+		log.Println("重新发送udp:",string(by))
+	}
+	log.Println("合并执行完成 ok", gtid, lteid,overid)
+
 }

+ 61 - 12
udp_winner/timedTask.go

@@ -4,14 +4,16 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
-	elastic "qfw/common/src/qfw/util/elastic"
+	"github.com/garyburd/redigo/redis"
 	"go.mongodb.org/mongo-driver/bson/primitive"
 	"go.mongodb.org/mongo-driver/mongo/options"
 	"gopkg.in/mgo.v2/bson"
 	"log"
+	"net"
 	"sort"
 	"strings"
 	"time"
+	mu "mfw/util"
 )
 
 //定时任务
@@ -41,15 +43,54 @@ func TimedTask() {
 			}
 			//遍历临时表数据,匹配不到原始库存入异常表
 			for cursor.Next(context.TODO()) {
+				if err := cursor.Err(); err != nil {
+					log.Println("cursor.Err();", err)
+				}
 				tmp := make(map[string]interface{})
 				if err := cursor.Decode(&tmp); err == nil {
+					//再重新查找redis,存在发udp处理,不存在走新增合并
+					rdb := RedisPool.Get()
+					if reply, err := redis.String(rdb.Do("GET", tmp["winner"])); err == nil {
+						//{"gtid":"57d7ad2f61a0721f152d2ad5","lteid":"5e20968d85a9271abf0ad6c2","stype":""}
+						//redis存在发送udp进行处理
+						go func(reply string) {
+							by, _ := json.Marshal(map[string]interface{}{
+								"gtid":  reply,
+								"lteid": reply,
+								"stype": "",
+							})
+							if e := udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
+								IP:   net.ParseIP("127.0.0.1"),
+								Port: Updport,
+							});e != nil{
+								log.Println(e)
+							}
+						}(reply)
+						//存在的话删除tmp mongo表
+						if _, err = FClient.Database(Config["mgodb_extract_kf"]).Collection("winner_new").DeleteOne(context.TODO(), tmp);err != nil{
+							log.Println("删除临时表err:",err)
+						}
+						if err := rdb.Close(); err != nil {
+							log.Println(err)
+						}
+						continue
+					} else {
+						if err = rdb.Close(); err != nil {
+							log.Println(err)
+						}
+					}
+					//查询redis不存在新增
 					resulttmp := make(map[string]interface{})
 					r := FClient.Database(Config["mgodb_enterprise"]).Collection(Config["mgodb_enterprise_c"]).FindOne(context.TODO(), bson.M{"company_name": tmp["winner"]}).Decode(&resulttmp)
 					if r != nil {
 						//log.Println(r)
 						//匹配不到原始库,存入异常表删除临时表
-						FClient.Database(Config["mgodb_extract_kf"]).Collection("winner_err").InsertOne(context.TODO(), tmp)
-						FClient.Database(Config["mgodb_extract_kf"]).Collection("winner_new").DeleteOne(context.TODO(), tmp)
+						if _, err = FClient.Database(Config["mgodb_extract_kf"]).Collection("winner_err").InsertOne(context.TODO(), tmp); err != nil {
+							log.Println(err)
+						}
+						if _, err = FClient.Database(Config["mgodb_extract_kf"]).Collection("winner_new").DeleteOne(context.TODO(), tmp); err != nil {
+							log.Println(err)
+						}
 						continue
 					} else {
 						//log.Println(123)
@@ -138,7 +179,7 @@ func TimedTask() {
 										savetmp[sk] = ppms
 
 									}
-								}else {
+								} else {
 									savetmp[sk] = []interface{}{}
 								}
 								continue
@@ -172,37 +213,45 @@ func TimedTask() {
 								savetmp[sk] = []interface{}{}
 								continue
 							}
-							if resulttmp[sk] == nil && sk != "history_name" &&sk != "wechat_accounts" && sk != "establish_date" && sk != "capital" && sk != "partners" && sk != "contact" && sk != "report_websites" {
+							if resulttmp[sk] == nil && sk != "history_name" && sk != "wechat_accounts" && sk != "establish_date" && sk != "capital" && sk != "partners" && sk != "contact" && sk != "report_websites" {
 								savetmp[sk] = ""
 							} else {
 								savetmp[sk] = resulttmp[sk]
 							}
 						}
 						//tmps = append(tmps, savetmp)
-						savetmp["updatatime"] =time.Now().Unix()
+						savetmp["updatatime"] = time.Now().Unix()
 						//保存mongo
 						result, err := FClient.Database(Config["mgodb_extract_kf"]).Collection(Config["mgo_qyk_c"]).
 							InsertOne(context.TODO(), savetmp)
 						if err == nil {
 							//保存redis
 							rc := RedisPool.Get()
-							defer rc.Close()
 							var _id string
 							if v, ok := result.InsertedID.(primitive.ObjectID); ok {
 								_id = v.Hex()
 							}
-							if _, err := rc.Do("SET", savetmp["company_name"], result.InsertedID.(primitive.ObjectID).Hex()); err != nil {
+							if _, err := rc.Do("SET", savetmp["company_name"], _id); err != nil {
 								log.Println("save redis err:", tmp["_id"], savetmp["_id"], savetmp["company_name"], err)
+								if err := rc.Close(); err != nil {
+									log.Println(err)
+								}
 							} else {
 								//保存es
 								delete(savetmp, "_id")
-								esConn := elastic.GetEsConn()
-								defer elastic.DestoryEsConn(esConn)
-								if _, err := esConn.Index().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(_id).BodyJson(savetmp).Refresh(true).Do(); err != nil {
+								if err := rc.Close(); err != nil {
+									log.Println(err)
+								}
+
+								//esConn := elastic.GetEsConn()
+								//defer elastic.DestoryEsConn(esConn)
+								if _, err := EsConn.Index().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(_id).BodyJson(savetmp).Refresh(true).Do(); err != nil {
 									log.Println("save es err :", tmp["_id"], savetmp["_id"], err)
 								} else {
 									//删除临时表
-									FClient.Database(Config["mgodb_extract_kf"]).Collection("winner_new").DeleteOne(context.TODO(), tmp)
+									if _, err := FClient.Database(Config["mgodb_extract_kf"]).Collection("winner_new").DeleteOne(context.TODO(), tmp); err != nil {
+										log.Println(err)
+									}
 								}
 							}
 						} else {