浏览代码

更改增存量buyer agency

apple 5 年之前
父节点
当前提交
0efae98941
共有 3 个文件被更改,包括 471 次插入503 次删除
  1. 5 4
      udp_winner/main.go
  2. 216 217
      udp_winner/timedTaskAgency.go
  3. 250 282
      udp_winner/timedTaskBuyer.go

+ 5 - 4
udp_winner/main.go

@@ -72,6 +72,7 @@ func init() {
 	SourceClient.DbName = Config["mgodb_bidding"]
 	//mongodbSim.DbName = "qfw"
 	SourceClient.InitPool()
+	
 	FClient = new(mongodb.MongodbSim)
 	FClient.MongodbAddr = Config["mgourl"]
 	FClient.Size = pool_size
@@ -140,8 +141,8 @@ func main() {
 	log.Println("Udp服务监听", updport)
 	log.Println("发送端口port:", Updport)
 	go TimedTaskWinner() //定时任务
-	//go TimedTaskBuyer()  //定时任务
-	//go TimedTaskAgency() //定时任务
+	go TimedTaskBuyer()  //定时任务
+	go TimedTaskAgency() //定时任务
 	c := make(chan int, 1)
 	<-c
 
@@ -172,9 +173,9 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 				if key == "winner" {
 					go TaskWinner(tmp)
 				} else if key == "buyer" {
-					//go TaskBuyer(tmp)
+					go TaskBuyer(tmp)
 				} else if key == "agency" {
-					//go TaskAgency(tmp)
+					go TaskAgency(tmp)
 				}
 			}
 		}

+ 216 - 217
udp_winner/timedTaskAgency.go

@@ -4,13 +4,13 @@ import (
 	"encoding/json"
 	"fmt"
 	"github.com/garyburd/redigo/redis"
-	"go.mongodb.org/mongo-driver/bson/primitive"
 	"gopkg.in/mgo.v2/bson"
 	"log"
 	mu "mfw/util"
 	"net"
 	"qfw/util"
 	"sort"
+	"strconv"
 	"strings"
 	"time"
 )
@@ -18,21 +18,25 @@ import (
 //之前main方法,只更新
 func TaskAgency(mapinfo *map[string]interface{}) {
 	defer util.Catch()
+	//释放
+	defer func() { <-CPool }()
 	gtid, lteid := util.ObjToString((*mapinfo)["gtid"]), util.ObjToString((*mapinfo)["lteid"])
 	if gtid == "" || lteid == "" {
 		log.Println(gtid, lteid, "参数错误")
 		return
 	}
-	GId, err := primitive.ObjectIDFromHex(gtid)
-	LtId, err2 := primitive.ObjectIDFromHex(lteid)
-	if err != nil || err2 != nil {
-		log.Println(gtid, lteid, "转换_id错误")
+	var GId, LtId bson.ObjectId
+	if bson.IsObjectIdHex(gtid) && bson.IsObjectIdHex(lteid) {
+		GId = bson.ObjectIdHex(gtid)
+		LtId = bson.ObjectIdHex(lteid)
+	} else {
+		log.Println(gtid, lteid, "不是Objectid,转换_id错误", gtid, lteid)
 		return
 	}
 	//udp的id区间查询bidding  中标人 中标联系人 中标联系电话
 	// topscopeclass项目类型-industry行业类型&&topscopeclass联系人项目类型
 	// (area地区-province省份 city城市-city城市 district区县-district区县)
-	// winneraddr-company_address企业地址
+	// agencyaddr-company_address企业地址
 	SourceClientcc := SourceClient.GetMgoConn()
 	defer SourceClient.DestoryMongoConn(SourceClientcc)
 	cursor := SourceClientcc.DB(Config["mgodb_bidding"]).C(Config["mgodb_mgoinit_c"]).Find(bson.M{
@@ -40,30 +44,45 @@ func TaskAgency(mapinfo *map[string]interface{}) {
 			"$gte": GId,
 			"$lte": LtId,
 		},
-	}).Select(bson.M{"agency": 1, "agencytel": 1, "agencyperson": 1,
-		"topscopeclass": 1, "agencyaddr": 1}).Iter()
-	if cursor == nil {
-		log.Println(cursor)
+	}).Select(bson.M{"agency": 1, "agencytel": 1, "agencyperson": 1, "topscopeclass": 1,
+		"agencyaddr": 1}).Iter()
+	if cursor.Err() != nil {
+		log.Println(cursor.Err())
 		return
 	}
 	//判断是否是存量,是存量走Redis遍历
 	if v, ok := (*mapinfo)["data_info"].(string); ok && v == "save" {
 		//存量处理
-		tmp := map[string]interface{}{}
 		conn := HisRedisPool.Conn()
 		defer conn.Close()
 		//选择redis db
-		conn.Select(1)
+		redis_agency_db, _ := strconv.Atoi(Config["redis_agency_db"])
+		conn.Select(redis_agency_db)
 		//遍历bidding表保存到redis
-		// key:_id  value:json结构体
+		//key:企业名  value:json结构体{"agency": 1, "agencytel": 1, "agencyperson": 1,"topscopeclass": 1, "agencyaddr": 1,"_id":1}
+		tmp := make(map[string]interface{})
 		for cursor.Next(&tmp) {
-			if tmp["agency"] == nil || tmp["agency"] == "" {
+			agency, ok := tmp["agency"].(string)
+			if !ok || agency == "" {
 				continue
 			}
-			mgoId:=tmp["_id"].(primitive.ObjectID).Hex()
-			delete(tmp,"_id")
-			bytes, _ := json.Marshal(tmp)
-			if err := conn.Set(mgoId, string(bytes), 0).Err(); err != nil {
+			//判断redis key是否存在
+			e_num := conn.Exists(agency).Val()
+			//获取字符串_id
+			mgoId := tmp["_id"].(bson.ObjectId).Hex()
+			//替换_id
+			tmp["_id"] = mgoId
+			//创建value数组
+			tmps := make([]map[string]interface{}, 0)
+			if e_num > 0 {
+				//存量redis的key存在,累加更新
+				bytes, _ := conn.Get(agency).Bytes()
+				json.Unmarshal(bytes, &tmps)
+			}
+			tmps = append(tmps, tmp)
+			bytes, _ := json.Marshal(tmps)
+			//存量redis的key不存在,新增  key :企业名 val :[]map
+			if err := conn.Set(agency, string(bytes), 0).Err(); err != nil {
 				log.Println(err)
 			}
 		}
@@ -74,140 +93,114 @@ func TaskAgency(mapinfo *map[string]interface{}) {
 		} else {
 			iterator := scan.Iterator()
 			for iterator.Next() {
-				redisId := iterator.Val()                       //redis key
-				redisvalue := conn.Get(iterator.Val()).Val() //redis val
-				tmp := make(map[string]interface{})
-				json.Unmarshal([]byte(redisvalue),&tmp)
-				//重复增量操作
+				redisCName := iterator.Val()                       //redis key 企业名
+				redisvalueBytes, _ := conn.Get(redisCName).Bytes() //redis val []数组
+				rValuesMaps := make([]map[string]interface{}, 0)
+				json.Unmarshal(redisvalueBytes, &rValuesMaps)
 				//redis查询是否存在
 				rdb := RedisPool.Get()
-				rdb.Do("SELECT","3")
-				if reply, err := redis.String(rdb.Do("GET", tmp["agency"])); err != nil {
+				rdb.Do("SELECT", Config["redis_agency_db"])
+				if reply, err := redis.String(rdb.Do("GET", redisCName)); err != nil {
 					//redis不存在,存到临时表,定时任务处理
-					FClient.DbName = Config["mgodb_extract_kf"]
-					if tmpid := FClient.Save("agency_new", tmp); tmpid == nil {
-						log.Println("存量 FClient.Save err", tmpid)
+					//FClient.DbName = Config["mgodb_extract_kf"]
+					//if tmpid := FClient.Save("agency_new", tmps); tmpid == nil {
+					//	log.Println("存量 FClient.Save err", tmpid)
+					//}
+					fsavec := FClient.GetMgoConn().DB(Config["mgodb_extract_kf"]).C("agency_new")
+					for _, vmap := range rValuesMaps {
+						vmap["_id"] = bson.ObjectIdHex(vmap["_id"].(string))
+						if err = fsavec.Insert(vmap); err != nil{
+							log.Println("存量 FClient.Save err", err)
+						}
 					}
 					//log.Println("get redis id err:定时任务处理", err, tmp)
 					if err := rdb.Close(); err != nil {
-						log.Println("存量",err)
+						log.Println("存量", err)
 					}
-					//删除存量redis
-					conn.Del(redisId)
 					continue
 				} else {
+					//redis存在更新合并
 					if err := rdb.Close(); err != nil {
 						log.Println(err)
 					}
 					//拿到合并后的qyk
 					FClient.DbName = Config["mgodb_extract_kf"]
-					oldTmp := FClient.FindById(Config["mgo_qyk_agency"], reply)
-					if oldTmp == nil {
-						log.Println("存量 redis id 不存在",reply,tmp["agency"])
+					oldTmp, b := FClient.FindById(Config["mgo_qyk_agency"], reply, nil)
+					if !b || oldTmp == nil {
+						log.Println(redisCName, "存量 redis id 不存在", reply)
 						continue
 					}
-
-
 					tmpTopscopeclass := []string{}
 					tmpTopscopeclassMap := make(map[string]bool)
 
-					if v, ok := tmp["topscopeclass"].(primitive.A); ok {
-						for _, vv := range v {
-							if vvv, ok := vv.(string); ok && len(vvv) > 1 {
-								tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
+					for _, rvaluemaps := range rValuesMaps {
+						if tclasss, ok := rvaluemaps["topscopeclass"].([]string); ok {
+							for _, vv := range tclasss {
+								if len(vv) > 1 {
+									tmpTopscopeclassMap[vv[:len(vv)-1]] = true
+								}
 							}
 						}
-						for k := range tmpTopscopeclassMap {
-							tmpTopscopeclass = append(tmpTopscopeclass, k)
-						}
+					}
+					for k := range tmpTopscopeclassMap {
+						tmpTopscopeclass = append(tmpTopscopeclass, k)
 					}
 					sort.Strings(tmpTopscopeclass)
+					esId := (*oldTmp)["_id"].(bson.ObjectId).Hex()
 
-
-
-
-					esId := oldTmp["_id"].(primitive.ObjectID).Hex()
-					//更新行业类型
-					if tmp["agencyperson"] == nil || tmp["agencyperson"] == "" ||
-						Reg_xing.MatchString(util.ObjToString(tmp["agencyperson"])) {
-						oldTmp["updatatime"] = time.Now().Unix()
-						//mongo更新
-						FClient.DbName = Config["mgodb_extract_kf"]
-						if !FClient.UpdateById(Config["mgo_qyk_agency"], esId, bson.M{"$set": oldTmp}) {
-							log.Println("mongo更新err", esId)
-						}
-
-						//es更新
-						delete(oldTmp, "_id")
-						if _, err := EsConn.Update().Index(Config["elasticsearch_agency_index"]).Type(Config["elasticsearch_agency_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
-							log.Println("update es err:", err)
+					//联系方式合并
+					contactMaps := make([]interface{}, 0)
+					if (*oldTmp)["contact"] != nil {
+						//直接添加联系人,不再判断
+						if v, ok := (*oldTmp)["contact"].([]interface{}); ok {
+							contactMaps = append(contactMaps, v...)
 						}
-						//删除存量redis
-						conn.Del(redisId)
-						continue
 					}
-					//联系方式合并
-					var tmpperson, agencytel string
-					tmpperson = tmp["agencyperson"].(string)
-					if tmp["agencytel"] == nil || tmp["agencytel"] == "" {
-						agencytel = ""
-					} else {
-						if Reg_xing.MatchString(util.ObjToString(tmp["agencytel"])) || !Reg_tel.MatchString(util.ObjToString(tmp["agencytel"])) {
-							agencytel = ""
+					//遍历redis value联系人
+					for _, rvmap := range rValuesMaps {
+						var tmpperson, agencytel string
+						if rvmapperson, ok := rvmap["agencyperson"].(string); ok && rvmapperson != "" {
+							tmpperson = rvmapperson
 						} else {
-							agencytel = util.ObjToString(tmp["agencytel"])
+							continue
+						}
+						if rvmapwintel, ok := rvmap["agencytel"].(string); ok {
+							agencytel = rvmapwintel
+						} else {
+							agencytel = ""
+						}
+						if Reg_xing.MatchString(agencytel) || !Reg_tel.MatchString(agencytel) {
+							agencytel = ""
 						}
-					}
-					contactMaps := make([]interface{}, 0)
-					if oldTmp["contact"] == nil {
 						tmpContact := make(map[string]interface{})
-						tmpContact["infoid"] = redisId
+						tmpContact["infoid"] = rvmap["_id"]
 						tmpContact["contact_person"] = tmpperson
 						tmpContact["contact_type"] = "项目联系人"
 						tmpContact["phone"] = agencytel
-						tmpContact["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
-						tmpContact["updatetime"] = time.Now().Unix()
-						contactMaps = append(contactMaps, tmpContact)
-					} else {
-						//对比前四项,相等丢弃
-						if v, ok := oldTmp["contact"].(primitive.A); ok {
-							var isNotUpdate bool
-							for _, vv := range v {
-								if vvv, ok := vv.(map[string]interface{}); ok {
-									if vvv["contact_person"] == tmpperson && vvv["contact_type"] == "项目联系人" &&
-										vvv["phone"] == agencytel && vvv["topscopeclass"] == strings.Join(tmpTopscopeclass, ";") {
-										isNotUpdate = true
-										vvv["updatetime"] = time.Now().Unix()
-									}
-									contactMaps = append(contactMaps, vvv)
+						tmpclass := make([]string, 0)
+						if tclasss, ok := rvmap["topscopeclass"].([]string); ok {
+							for _, vv := range tclasss {
+								if len(vv) > 1 {
+									tmpclass = append(tmpclass, vv[:len(vv)-1])
 								}
 							}
-							if !isNotUpdate {
-								vvv := make(map[string]interface{})
-								vvv["infoid"] = redisId
-								vvv["contact_person"] = tmp["agencyperson"]
-								vvv["contact_type"] = "项目联系人"
-								vvv["phone"] = agencytel
-								vvv["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
-								vvv["updatetime"] = time.Now().Unix()
-								contactMaps = append(contactMaps, vvv)
-							}
 						}
+						tmpContact["topscopeclass"] = strings.Join(tmpclass, ";")
+						tmpContact["updatetime"] = time.Now().Unix()
+						contactMaps = append(contactMaps, tmpContact)
 					}
-					oldTmp["contact"] = contactMaps
+					(*oldTmp)["contact"] = contactMaps
 					//mongo更新
-					oldTmp["updatatime"] = time.Now().Unix()
+					(*oldTmp)["updatatime"] = time.Now().Unix()
 					FClient.DbName = Config["mgodb_extract_kf"]
 					if !FClient.UpdateById(Config["mgo_qyk_agency"], esId, bson.M{"$set": oldTmp}) {
 						log.Println("存量  mongo更新 err", esId, oldTmp)
 					}
 					//es更新
-					delete(oldTmp, "_id")
+					delete((*oldTmp), "_id")
 					if _, err := EsConn.Update().Index(Config["elasticsearch_agency_index"]).Type(Config["elasticsearch_agency_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
 						log.Println("存量 EsConn err :", err)
 					}
-					//最后删除redis
-					conn.Del(redisId)
 				}
 			}
 		}
@@ -218,18 +211,18 @@ func TaskAgency(mapinfo *map[string]interface{}) {
 		overid := gtid
 		tmp := map[string]interface{}{}
 		for cursor.Next(&tmp) {
-			overid = tmp["_id"].(primitive.ObjectID).Hex()
+			overid = tmp["_id"].(bson.ObjectId).Hex()
 			//log.Println(tmp["_id"])
 			if tmp["agency"] == nil || tmp["agency"] == "" {
 				continue
 			}
 			//redis查询是否存在
 			rdb := RedisPool.Get()
-			rdb.Do("SELECT","3")
+			rdb.Do("SELECT", Config["redis_agency_db"])
 			if reply, err := redis.String(rdb.Do("GET", tmp["agency"])); err != nil {
 				//redis不存在存到临时表,定时任务处理
 				FClient.DbName = Config["mgodb_extract_kf"]
-				if tmpid := FClient.Save("agency_new", tmp); tmpid == nil {
+				if tmpid := FClient.Save("agency_new", tmp); tmpid == "" {
 					log.Println("FClient.Save err", tmpid)
 				}
 				//log.Println("get redis id err:定时任务处理", err, tmp)
@@ -243,34 +236,35 @@ func TaskAgency(mapinfo *map[string]interface{}) {
 				}
 				//拿到合并后的qyk
 				FClient.DbName = Config["mgodb_extract_kf"]
-				oldTmp := FClient.FindById(Config["mgo_qyk_agency"], reply)
-				if oldTmp == nil {
+				oldTmp, b := FClient.FindById(Config["mgo_qyk_agency"], reply, bson.M{})
+				if !b || oldTmp == nil {
 					log.Println("redis id 不存在")
 					continue
 				}
 				//比较合并
 				//行业类型
 				tmpTopscopeclass := []string{}
+				tmpConTopscopeclass := []string{}
 				tmpTopscopeclassMap := make(map[string]bool)
 
-				if v, ok := tmp["topscopeclass"].(primitive.A); ok {
+				if v, ok := tmp["topscopeclass"].([]interface{}); ok {
 					for _, vv := range v {
 						if vvv, ok := vv.(string); ok && len(vvv) > 1 {
 							tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
+							tmpConTopscopeclass = append(tmpConTopscopeclass, vvv[:len(vvv)-1])
 						}
 					}
-					for k := range tmpTopscopeclassMap {
-						tmpTopscopeclass = append(tmpTopscopeclass, k)
-					}
+				}
+				for k := range tmpTopscopeclassMap {
+					tmpTopscopeclass = append(tmpTopscopeclass, k)
 				}
 				sort.Strings(tmpTopscopeclass)
+				esId := (*oldTmp)["_id"].(bson.ObjectId).Hex()
 
-
-
-				esId := oldTmp["_id"].(primitive.ObjectID).Hex()
 				//更新行业类型
 				if tmp["agencyperson"] == nil || tmp["agencyperson"] == "" || Reg_xing.MatchString(util.ObjToString(tmp["agencyperson"])) {
-					oldTmp["updatatime"] = time.Now().Unix()
+
+					(*oldTmp)["updatatime"] = time.Now().Unix()
 					//mongo更新
 					FClient.DbName = Config["mgodb_extract_kf"]
 					if !FClient.UpdateById(Config["mgo_qyk_agency"], esId, bson.M{"$set": oldTmp}) {
@@ -278,7 +272,7 @@ func TaskAgency(mapinfo *map[string]interface{}) {
 					}
 
 					//es更新
-					delete(oldTmp, "_id")
+					delete((*oldTmp), "_id")
 					if _, err := EsConn.Update().Index(Config["elasticsearch_agency_index"]).Type(Config["elasticsearch_agency_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
 						log.Println("update es err:", err)
 					}
@@ -286,61 +280,41 @@ func TaskAgency(mapinfo *map[string]interface{}) {
 				}
 				//联系方式合并
 				var tmpperson, agencytel string
-				tmpperson = tmp["agencyperson"].(string)
-				if tmp["agencytel"] == nil || tmp["agencytel"] == "" {
+				if tmppersona, ok := tmp["agencyperson"].(string); ok {
+					tmpperson = tmppersona
+				}
+				if agencyteltmp, ok := tmp["agencytel"].(string); ok {
+					agencytel = agencyteltmp
+				}
+				if Reg_xing.MatchString(agencytel) || !Reg_tel.MatchString(agencytel) {
 					agencytel = ""
 				} else {
-					if Reg_xing.MatchString(util.ObjToString(tmp["agencytel"])) || !Reg_tel.MatchString(util.ObjToString(tmp["agencytel"])) {
-						agencytel = ""
-					} else {
-						agencytel = util.ObjToString(tmp["agencytel"])
-					}
+					agencytel = agencytel
 				}
 				contactMaps := make([]interface{}, 0)
-				if oldTmp["contact"] == nil {
-					tmpContact := make(map[string]interface{})
-					tmpContact["infoid"] = overid
-					tmpContact["contact_person"] = tmpperson
-					tmpContact["contact_type"] = "项目联系人"
-					tmpContact["phone"] = agencytel
-					tmpContact["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
-					tmpContact["updatetime"] = time.Now().Unix()
-					contactMaps = append(contactMaps, tmpContact)
-				} else {
-					//对比前四项,相等丢弃
-					if v, ok := oldTmp["contact"].(primitive.A); ok {
-						var isNotUpdate bool
-						for _, vv := range v {
-							if vvv, ok := vv.(map[string]interface{}); ok {
-								if vvv["contact_person"] == tmpperson && vvv["contact_type"] == "项目联系人" &&
-									vvv["phone"] == agencytel && vvv["topscopeclass"] == strings.Join(tmpTopscopeclass, ";") {
-									isNotUpdate = true
-									vvv["updatetime"] = time.Now().Unix()
-								}
-								contactMaps = append(contactMaps, vvv)
-							}
-						}
-						if !isNotUpdate {
-							vvv := make(map[string]interface{})
-							vvv["infoid"] = overid
-							vvv["contact_person"] = tmp["agencyperson"]
-							vvv["contact_type"] = "项目联系人"
-							vvv["phone"] = agencytel
-							vvv["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
-							vvv["updatetime"] = time.Now().Unix()
-							contactMaps = append(contactMaps, vvv)
-						}
+				if (*oldTmp)["contact"] != nil {
+					//直接添加联系人,不再判断
+					if v, ok := (*oldTmp)["contact"].([]interface{}); ok {
+						contactMaps = append(contactMaps, v...)
 					}
 				}
-				oldTmp["contact"] = contactMaps
+				vvv := make(map[string]interface{})
+				vvv["infoid"] = overid
+				vvv["contact_person"] = tmpperson
+				vvv["contact_type"] = "项目联系人"
+				vvv["phone"] = agencytel
+				vvv["topscopeclass"] = strings.Join(tmpConTopscopeclass, ";")
+				vvv["updatetime"] = time.Now().Unix()
+				contactMaps = append(contactMaps, vvv)
+				(*oldTmp)["contact"] = contactMaps
 				//mongo更新
-				oldTmp["updatatime"] = time.Now().Unix()
+				(*oldTmp)["updatatime"] = time.Now().Unix()
 				FClient.DbName = Config["mgodb_extract_kf"]
 				if !FClient.UpdateById(Config["mgo_qyk_agency"], esId, bson.M{"$set": oldTmp}) {
 					log.Println("mongo更新 err", esId, oldTmp)
 				}
 				//es更新
-				delete(oldTmp, "_id")
+				delete((*oldTmp), "_id")
 				if _, err := EsConn.Update().Index(Config["elasticsearch_agency_index"]).Type(Config["elasticsearch_agency_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
 					log.Println("EsConn err :", err)
 				}
@@ -351,9 +325,7 @@ func TaskAgency(mapinfo *map[string]interface{}) {
 
 }
 
-
-
-//定时任务
+//定时任务  新增
 //1.存异常表
 //2.合并原始库新增
 func TimedTaskAgency() {
@@ -367,7 +339,7 @@ func TimedTaskAgency() {
 			if !iter.Next(&tmpLast) {
 				//临时表无数据
 				log.Println("临时表无数据:")
-				t2.Reset(time.Second * 10)
+				t2.Reset(time.Second * 15)
 				continue
 			} else {
 				log.Println("临时表有数据:", tmpLast)
@@ -386,18 +358,18 @@ func TimedTaskAgency() {
 				//遍历临时表数据,匹配不到原始库存入异常表
 				tmp := make(map[string]interface{})
 				for cursor.Next(&tmp) {
-					tmpId := tmp["_id"].(primitive.ObjectID).Hex()
+					tmpId := tmp["_id"].(bson.ObjectId).Hex()
 					//再重新查找redis,存在发udp处理,不存在走新增合并
 					rdb := RedisPool.Get()
-					rdb.Do("SELECT","3")
-
+					rdb.Do("SELECT", Config["redis_agency_db"])
 					if _, err := redis.String(rdb.Do("GET", tmp["agency"])); err == nil {
-						//{"gtid":"57d7ad2f61a0721f152d2ad5","lteid":"5e20968d85a9271abf0ad6c2","stype":""}
 						//redis存在发送udp进行处理
 						by, _ := json.Marshal(map[string]interface{}{
-							"gtid":  tmpId,
-							"lteid": tmpId,
-							"stype": "",
+							"gtid":      tmpId,
+							"lteid":     tmpId,
+							"stype":     "",
+							"data_type": "agency",
+							"data_info": "add",
 						})
 						if e := udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
 							IP:   net.ParseIP("127.0.0.1"),
@@ -407,7 +379,7 @@ func TimedTaskAgency() {
 						}
 						//存在的话删除tmp mongo表
 						FClient.DbName = Config["mgodb_extract_kf"]
-						if DeletedCount := FClient.DeleteById("agency_new", tmpId); DeletedCount == 0 {
+						if DeletedCount := FClient.Del("agency_new", bson.M{"_id": bson.ObjectIdHex(tmpId)}); !DeletedCount {
 							log.Println("删除临时表err:", DeletedCount)
 						}
 						if err := rdb.Close(); err != nil {
@@ -421,33 +393,33 @@ func TimedTaskAgency() {
 					}
 					//查询redis不存在新增
 					FClient.DbName = Config["mgodb_enterprise"]
-					resulttmp := FClient.FindOne(Config["mgodb_enterprise_c"], bson.M{"company_name": tmp["agency"]})
-					if resulttmp["_id"] == nil {
+
+					resulttmp, b := FClient.FindOne(Config["mgodb_enterprise_c"], bson.M{"company_name": tmp["agency"]})
+					if !b || (*resulttmp)["_id"] == nil {
 						//log.Println(r)
 						//匹配不到原始库,存入异常表删除临时表
-						FClient.DbName = Config["mgodb_extract_kf"]
-						if saveid := FClient.Save("agency_err", tmp); saveid == nil {
-							log.Println("存入异常表错误", tmp)
+						fdmongo := FClient.GetMgoConn().DB(Config["mgodb_extract_kf"])
+						if err := fdmongo.C("agency_err").Insert( tmp); err != nil {
+							log.Println("存入异常表错误", err,tmp)
 						}
-						FClient.DbName = Config["mgodb_extract_kf"]
-						if deleteNum := FClient.DeleteById("agency_new", tmpId); deleteNum == 0 {
+						if deleteNum := fdmongo.C("agency_new").RemoveId( bson.ObjectIdHex(tmpId)); !b {
 							log.Println("删除临时表错误", deleteNum)
 						}
 						continue
 					} else {
 						//log.Println(123)
 						//匹配到原始库,新增 resulttmp
-						if resulttmp["credit_no"] != nil {
-							if credit_no, ok := resulttmp["credit_no"].(string); ok && strings.TrimSpace(credit_no) != "" &&
+						if (*resulttmp)["credit_no"] != nil {
+							if credit_no, ok := (*resulttmp)["credit_no"].(string); ok && strings.TrimSpace(credit_no) != "" &&
 								len(strings.TrimSpace(credit_no)) > 8 {
 								dataNo := strings.TrimSpace(credit_no)[2:8]
 								if Addrs[dataNo] != nil {
 									if v, ok := Addrs[dataNo].(map[string]interface{}); ok {
-										if resulttmp["province"] == nil || resulttmp["province"] == "" {
-											resulttmp["province"] = v["province"]
+										if (*resulttmp)["province"] == nil || (*resulttmp)["province"] == "" {
+											(*resulttmp)["province"] = v["province"]
 										}
-										resulttmp["city"] = v["city"]
-										resulttmp["district"] = v["district"]
+										(*resulttmp)["city"] = v["city"]
+										(*resulttmp)["district"] = v["district"]
 
 									}
 								}
@@ -455,15 +427,15 @@ func TimedTaskAgency() {
 						}
 						contacts := make([]map[string]interface{}, 0)
 						contact := make(map[string]interface{}, 0)
-						if resulttmp["legal_person"] != nil {
-							contact["contact_person"] = resulttmp["legal_person"] //联系人
+						if (*resulttmp)["legal_person"] != nil {
+							contact["contact_person"] = (*resulttmp)["legal_person"] //联系人
 						} else {
 							contact["contact_person"] = "" //联系人
 						}
 						contact["contact_type"] = "法定代表人" //法定代表人
 						//log.Println(1)
-						if resulttmp["annual_reports"] != nil {
-							bytes, err := json.Marshal(resulttmp["annual_reports"])
+						if (*resulttmp)["annual_reports"] != nil {
+							bytes, err := json.Marshal((*resulttmp)["annual_reports"])
 							if err != nil {
 								log.Println("annual_reports err:", err)
 							}
@@ -495,24 +467,50 @@ func TimedTaskAgency() {
 						contact["updatetime"] = time.Now().Unix() //更新时间
 						contact["infoid"] = ""                    //招标信息id
 						contacts = append(contacts, contact)
-						resulttmp["contact"] = contacts
+						//添加临时表匹配到的联系人
+						vvv := make(map[string]interface{})
+						vvv["infoid"] = tmp["_id"].(bson.ObjectId).Hex()
+						if tmp["agencyperson"] != nil{
+							vvv["contact_person"] = tmp["agencyperson"]
+						}else {
+							vvv["contact_person"] = ""
+						}
+						vvv["contact_type"] = "项目联系人"
+						//	"agency": 1, "agencytel": 1, "agencyperson": 1, "topscopeclass": 1
+						if tmp["agencytel"] != nil{
+							vvv["phone"] = tmp["agencytel"]
+						}else {
+							vvv["phone"] = ""
+						}
+						tmpclass := make([]string, 0)
+						if tclasss, ok := tmp["topscopeclass"].([]string); ok {
+							for _, vv := range tclasss {
+								if len(vv) > 1 {
+									tmpclass = append(tmpclass, vv[:len(vv)-1])
+								}
+							}
+						}
+						vvv["topscopeclass"] = strings.Join(tmpclass, ";")
+						vvv["updatetime"] = time.Now().Unix()
+						contacts = append(contacts, vvv)
+						(*resulttmp)["contact"] = contacts
 
 						savetmp := make(map[string]interface{}, 0)
 						for _, sk := range AgencyFields {
 							if sk == "_id" {
-								savetmp["tmp"+sk] = resulttmp[sk]
+								savetmp["tmp"+sk] = (*resulttmp)[sk]
 								continue
 							} else if sk == "area_code" {
 								//行政区划代码
-								savetmp[sk] = fmt.Sprint(resulttmp[sk])
+								savetmp[sk] = fmt.Sprint((*resulttmp)[sk])
 								continue
 							} else if sk == "report_websites" {
 								//网址
-								if resulttmp["report_websites"] == nil {
+								if (*resulttmp)["report_websites"] == nil {
 									savetmp["website"] = ""
 								} else {
 									report_websitesArr := []string{}
-									if ppms, ok := resulttmp[sk].(primitive.A); ok {
+									if ppms, ok := (*resulttmp)[sk].([]interface{}); ok {
 										for _, v := range ppms {
 											if vvv, ok := v.(map[string]interface{}); ok {
 												if rv, ok := vvv["website_url"].(string); ok {
@@ -529,47 +527,47 @@ func TimedTaskAgency() {
 								savetmp[sk] = []interface{}{}
 								continue
 							}else if sk=="agency_name" {
-								if resulttmp["company_name"] == nil {
+								if (*resulttmp)["company_name"] == nil {
 									savetmp[sk] = ""
 								}else {
-									savetmp[sk] = resulttmp["company_name"]
+									savetmp[sk] = (*resulttmp)["company_name"]
 								}
 								continue
 							}else if sk=="address"{
-								if resulttmp["company_address"] == nil {
+								if (*resulttmp)["company_address"] == nil {
 									savetmp[sk] = ""
 								}else {
-									savetmp[sk] = resulttmp["company_address"]
+									savetmp[sk] = (*resulttmp)["company_address"]
 								}
 								continue
 							}
 
 
-
-							if resulttmp[sk] == nil && sk != "history_name" && sk != "wechat_accounts" &&
+							if (*resulttmp)[sk] == nil && sk != "history_name" && sk != "wechat_accounts" &&
 								sk != "agency_name" && sk != "address" &&
 								sk != "contact" && sk != "report_websites" {
 								savetmp[sk] = ""
 							} else {
-								savetmp[sk] = resulttmp[sk]
+								savetmp[sk] = (*resulttmp)[sk]
 							}
 						}
 						//tmps = append(tmps, savetmp)
 						savetmp["updatatime"] = time.Now().Unix()
 						//保存mongo
 						FClient.DbName = Config["mgodb_extract_kf"]
+
+
+
 						saveid := FClient.Save(Config["mgo_qyk_agency"], savetmp)
-						if saveid != nil {
-							//保存redis
+						if saveid != "" {
 							//保存redis
 							rc := RedisPool.Get()
-							rc.Do("SELECT","3")
-
-							var _id string
-							if v, ok := saveid.(primitive.ObjectID); ok {
-								_id = v.Hex()
-							}
-							if _, err := rc.Do("SET", savetmp["agency_name"], _id); err != nil {
+							rc.Do("SELECT", Config["redis_agency_db"])
+							//var _id string
+							//if v, ok := saveid.(primitive.ObjectID); ok {
+							//	_id = v.Hex()
+							//}
+							if _, err := rc.Do("SET", savetmp["agency_name"], saveid); err != nil {
 								log.Println("save redis err:", tmp["_id"], savetmp["_id"], savetmp["agency_name"], err)
 								if err := rc.Close(); err != nil {
 									log.Println(err)
@@ -583,12 +581,12 @@ func TimedTaskAgency() {
 
 								//esConn := elastic.GetEsConn()
 								//defer elastic.DestoryEsConn(esConn)
-								if _, err := EsConn.Index().Index(Config["elasticsearch_agency_index"]).Type(Config["elasticsearch_agency_type"]).Id(_id).BodyJson(savetmp).Refresh(true).Do(); err != nil {
+								if _, err := EsConn.Index().Index(Config["elasticsearch_agency_index"]).Type(Config["elasticsearch_agency_type"]).Id(saveid).BodyJson(savetmp).Refresh(true).Do(); err != nil {
 									log.Println("save es err :", tmp["_id"], savetmp["_id"], err)
 								} else {
 									//删除临时表
 									FClient.DbName = Config["mgodb_extract_kf"]
-									if deleteNum := FClient.DeleteById("agency_new", tmpId); deleteNum == 0 {
+									if deleteNum := FClient.Del("agency_new", bson.M{"_id": bson.ObjectIdHex(tmpId)}); !deleteNum {
 										log.Println("删除临时表失败", deleteNum)
 									}
 								}
@@ -602,4 +600,5 @@ func TimedTaskAgency() {
 		}
 		t2.Reset(time.Minute)
 	}
-}
+}
+

+ 250 - 282
udp_winner/timedTaskBuyer.go

@@ -4,13 +4,14 @@ import (
 	"encoding/json"
 	"fmt"
 	"github.com/garyburd/redigo/redis"
-	"go.mongodb.org/mongo-driver/bson/primitive"
 	"gopkg.in/mgo.v2/bson"
 	"log"
 	mu "mfw/util"
 	"net"
 	"qfw/util"
+	"reflect"
 	"sort"
+	"strconv"
 	"strings"
 	"time"
 )
@@ -18,21 +19,25 @@ import (
 //之前main方法,只更新
 func TaskBuyer(mapinfo *map[string]interface{}) {
 	defer util.Catch()
+	//释放
+	defer func() { <-CPool }()
 	gtid, lteid := util.ObjToString((*mapinfo)["gtid"]), util.ObjToString((*mapinfo)["lteid"])
 	if gtid == "" || lteid == "" {
 		log.Println(gtid, lteid, "参数错误")
 		return
 	}
-	GId, err := primitive.ObjectIDFromHex(gtid)
-	LtId, err2 := primitive.ObjectIDFromHex(lteid)
-	if err != nil || err2 != nil {
-		log.Println(gtid, lteid, "转换_id错误")
+	var GId, LtId bson.ObjectId
+	if bson.IsObjectIdHex(gtid) && bson.IsObjectIdHex(lteid) {
+		GId = bson.ObjectIdHex(gtid)
+		LtId = bson.ObjectIdHex(lteid)
+	} else {
+		log.Println(gtid, lteid, "不是Objectid,转换_id错误", gtid, lteid)
 		return
 	}
 	//udp的id区间查询bidding  中标人 中标联系人 中标联系电话
 	// topscopeclass项目类型-industry行业类型&&topscopeclass联系人项目类型
 	// (area地区-province省份 city城市-city城市 district区县-district区县)
-	// winneraddr-company_address企业地址
+	// buyeraddr-company_address企业地址
 	SourceClientcc := SourceClient.GetMgoConn()
 	defer SourceClient.DestoryMongoConn(SourceClientcc)
 	cursor := SourceClientcc.DB(Config["mgodb_bidding"]).C(Config["mgodb_mgoinit_c"]).Find(bson.M{
@@ -40,30 +45,45 @@ func TaskBuyer(mapinfo *map[string]interface{}) {
 			"$gte": GId,
 			"$lte": LtId,
 		},
-	}).Select(bson.M{"buyer": 1, "buyertel": 1, "buyerperson": 1,
-		"topscopeclass": 1, "buyeraddr": 1,"buyerclass":1}).Iter()
-	if cursor == nil {
-		log.Println(cursor)
+	}).Select(bson.M{"buyer": 1, "buyertel": 1, "buyerperson": 1, "topscopeclass": 1,
+		"buyeraddr": 1,"buyerclass":1}).Iter()
+	if cursor.Err() != nil {
+		log.Println(cursor.Err())
 		return
 	}
 	//判断是否是存量,是存量走Redis遍历
 	if v, ok := (*mapinfo)["data_info"].(string); ok && v == "save" {
 		//存量处理
-		tmp := map[string]interface{}{}
 		conn := HisRedisPool.Conn()
 		defer conn.Close()
 		//选择redis db
-		conn.Select(1)
+		redis_buyer_db, _ := strconv.Atoi(Config["redis_buyer_db"])
+		conn.Select(redis_buyer_db)
 		//遍历bidding表保存到redis
-		// key:_id  value:json结构体
+		//key:企业名  value:json结构体{"buyer": 1, "buyertel": 1, "buyerperson": 1,"topscopeclass": 1, "buyeraddr": 1,"_id":1}
+		tmp := make(map[string]interface{})
 		for cursor.Next(&tmp) {
-			if tmp["buyer"] == nil || tmp["buyer"] == "" {
+			buyer, ok := tmp["buyer"].(string)
+			if !ok || buyer == "" {
 				continue
 			}
-			mgoId:=tmp["_id"].(primitive.ObjectID).Hex()
-			delete(tmp,"_id")
-			bytes, _ := json.Marshal(tmp)
-			if err := conn.Set(mgoId, string(bytes), 0).Err(); err != nil {
+			//判断redis key是否存在
+			e_num := conn.Exists(buyer).Val()
+			//获取字符串_id
+			mgoId := tmp["_id"].(bson.ObjectId).Hex()
+			//替换_id
+			tmp["_id"] = mgoId
+			//创建value数组
+			tmps := make([]map[string]interface{}, 0)
+			if e_num > 0 {
+				//存量redis的key存在,累加更新
+				bytes, _ := conn.Get(buyer).Bytes()
+				json.Unmarshal(bytes, &tmps)
+			}
+			tmps = append(tmps, tmp)
+			bytes, _ := json.Marshal(tmps)
+			//存量redis的key不存在,新增  key :企业名 val :[]map
+			if err := conn.Set(buyer, string(bytes), 0).Err(); err != nil {
 				log.Println(err)
 			}
 		}
@@ -74,172 +94,133 @@ func TaskBuyer(mapinfo *map[string]interface{}) {
 		} else {
 			iterator := scan.Iterator()
 			for iterator.Next() {
-				redisId := iterator.Val()                       //redis key
-				redisvalue := conn.Get(iterator.Val()).Val() //redis val
-				tmp := make(map[string]interface{})
-				json.Unmarshal([]byte(redisvalue),&tmp)
-				//重复增量操作
+				redisCName := iterator.Val()                       //redis key 企业名
+				redisvalueBytes, _ := conn.Get(redisCName).Bytes() //redis val []数组
+				rValuesMaps := make([]map[string]interface{}, 0)
+				json.Unmarshal(redisvalueBytes, &rValuesMaps)
 				//redis查询是否存在
 				rdb := RedisPool.Get()
-				rdb.Do("SELECT","2")
-				if reply, err := redis.String(rdb.Do("GET", tmp["buyer"])); err != nil {
+				rdb.Do("SELECT", Config["redis_buyer_db"])
+				if reply, err := redis.String(rdb.Do("GET", redisCName)); err != nil {
 					//redis不存在,存到临时表,定时任务处理
-					FClient.DbName = Config["mgodb_extract_kf"]
-					if tmpid := FClient.Save("buyer_new", tmp); tmpid == nil {
-						log.Println("存量 FClient.Save err", tmpid)
+					//FClient.DbName = Config["mgodb_extract_kf"]
+					//if tmpid := FClient.Save("buyer_new", tmps); tmpid == nil {
+					//	log.Println("存量 FClient.Save err", tmpid)
+					//}
+					fsavec := FClient.GetMgoConn().DB(Config["mgodb_extract_kf"]).C("buyer_new")
+					for _, vmap := range rValuesMaps {
+						vmap["_id"] = bson.ObjectIdHex(vmap["_id"].(string))
+						if err = fsavec.Insert(vmap); err != nil{
+							log.Println("存量 FClient.Save err", err)
+						}
 					}
 					//log.Println("get redis id err:定时任务处理", err, tmp)
 					if err := rdb.Close(); err != nil {
-						log.Println("存量",err)
+						log.Println("存量", err)
 					}
-					//删除存量redis
-					conn.Del(redisId)
 					continue
 				} else {
+					//redis存在更新合并
 					if err := rdb.Close(); err != nil {
 						log.Println(err)
 					}
 					//拿到合并后的qyk
 					FClient.DbName = Config["mgodb_extract_kf"]
-					oldTmp := FClient.FindById(Config["mgo_qyk_buyer"], reply)
-					if oldTmp == nil {
-						log.Println("存量 redis id 不存在",reply,tmp["buyer"])
+					oldTmp, b := FClient.FindById(Config["mgo_qyk_buyer"], reply, nil)
+					if !b || oldTmp == nil {
+						log.Println(redisCName, "存量 redis id 不存在", reply)
 						continue
 					}
 					tmpTopscopeclass := []string{}
 					tmpTopscopeclassMap := make(map[string]bool)
 
-					if v, ok := tmp["topscopeclass"].(primitive.A); ok {
-						for _, vv := range v {
-							if vvv, ok := vv.(string); ok && len(vvv) > 1 {
-								tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
+					for _, rvaluemaps := range rValuesMaps {
+						if tclasss, ok := rvaluemaps["topscopeclass"].([]string); ok {
+							for _, vv := range tclasss {
+								if len(vv) > 1 {
+									tmpTopscopeclassMap[vv[:len(vv)-1]] = true
+								}
 							}
 						}
-						for k := range tmpTopscopeclassMap {
-							tmpTopscopeclass = append(tmpTopscopeclass, k)
-						}
+					}
+					for k := range tmpTopscopeclassMap {
+						tmpTopscopeclass = append(tmpTopscopeclass, k)
 					}
 					sort.Strings(tmpTopscopeclass)
+					esId := (*oldTmp)["_id"].(bson.ObjectId).Hex()
 
-					//更新buyerclass
-					esId := oldTmp["_id"].(primitive.ObjectID).Hex()
-					//更新行业类型
-					if tmp["buyerperson"] == nil || tmp["buyerperson"] == "" || Reg_xing.MatchString(util.ObjToString(tmp["buyerperson"])) {
-						//更新buyerclass合并
-						if tmp["buyerclass"] == nil || tmp["buyerclass"] == "" {
-							//无值,不更新
+
+					//更新buyerclass合并
+					if tmp["buyerclass"] == nil || tmp["buyerclass"] == "" {
+						//无值,不更新
+					}else {
+						var buyerclass_new,buyerclass_old string
+						buyerclass_new = tmp["buyerclass"].(string)
+						buyerclass_old = (*oldTmp)["buyerclass"].(string)
+						if buyerclass_old=="" {
+							(*oldTmp)["buyerclass"] = buyerclass_new
 						}else {
-							var buyerclass_new,buyerclass_old string
-							buyerclass_new = tmp["buyerclass"].(string)
-							buyerclass_old = oldTmp["buyerclass"].(string)
-							if buyerclass_old=="" {
-								oldTmp["buyerclass"] = buyerclass_new
-							}else {
-								if buyerclass_new!=buyerclass_old {
-									if !strings.Contains(buyerclass_old, buyerclass_new) {
-										oldTmp["buyerclass"] = buyerclass_old + ","+buyerclass_new //采购单位类型
-									}
+							if buyerclass_new!=buyerclass_old {
+								if !strings.Contains(buyerclass_old, buyerclass_new) {
+									(*oldTmp)["buyerclass"] = buyerclass_old + ","+buyerclass_new //采购单位类型
 								}
 							}
 						}
+					}
 
-						oldTmp["updatatime"] = time.Now().Unix()
-						//mongo更新
-						FClient.DbName = Config["mgodb_extract_kf"]
-						if !FClient.UpdateById(Config["mgo_qyk_buyer"], esId, bson.M{"$set": oldTmp}) {
-							log.Println("mongo更新err", esId)
-						}
-
-						//es更新
-						delete(oldTmp, "_id")
-						if _, err := EsConn.Update().Index(Config["elasticsearch_buyer_index"]).Type(Config["elasticsearch_buyer_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
-							log.Println("update es err:", err)
+					//联系方式合并
+					contactMaps := make([]interface{}, 0)
+					if (*oldTmp)["contact"] != nil {
+						//直接添加联系人,不再判断
+						if v, ok := (*oldTmp)["contact"].([]interface{}); ok {
+							contactMaps = append(contactMaps, v...)
 						}
-						//删除存量redis
-						conn.Del(redisId)
-						continue
 					}
-					//联系方式合并
-					var tmpperson, buyertel string
-					tmpperson = tmp["buyerperson"].(string)
-					if tmp["buyertel"] == nil || tmp["buyertel"] == "" {
-						buyertel = ""
-					} else {
-						if Reg_xing.MatchString(util.ObjToString(tmp["buyertel"])) || !Reg_tel.MatchString(util.ObjToString(tmp["buyertel"])) {
-							buyertel = ""
+					//遍历redis value联系人
+					for _, rvmap := range rValuesMaps {
+						var tmpperson, buyertel string
+						if rvmapperson, ok := rvmap["buyerperson"].(string); ok && rvmapperson != "" {
+							tmpperson = rvmapperson
 						} else {
-							buyertel = util.ObjToString(tmp["buyertel"])
+							continue
+						}
+						if rvmapwintel, ok := rvmap["buyertel"].(string); ok {
+							buyertel = rvmapwintel
+						} else {
+							buyertel = ""
+						}
+						if Reg_xing.MatchString(buyertel) || !Reg_tel.MatchString(buyertel) {
+							buyertel = ""
 						}
-					}
-					contactMaps := make([]interface{}, 0)
-					if oldTmp["contact"] == nil {
 						tmpContact := make(map[string]interface{})
-						tmpContact["infoid"] = redisId
+						tmpContact["infoid"] = rvmap["_id"]
 						tmpContact["contact_person"] = tmpperson
 						tmpContact["contact_type"] = "项目联系人"
 						tmpContact["phone"] = buyertel
-						tmpContact["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
-						tmpContact["updatetime"] = time.Now().Unix()
-						contactMaps = append(contactMaps, tmpContact)
-					} else {
-						//对比前四项,相等丢弃
-						if v, ok := oldTmp["contact"].(primitive.A); ok {
-							var isNotUpdate bool
-							for _, vv := range v {
-								if vvv, ok := vv.(map[string]interface{}); ok {
-									if vvv["contact_person"] == tmpperson && vvv["contact_type"] == "项目联系人" &&
-										vvv["phone"] == buyertel && vvv["topscopeclass"] == strings.Join(tmpTopscopeclass, ";") {
-										isNotUpdate = true
-										vvv["updatetime"] = time.Now().Unix()
-									}
-									contactMaps = append(contactMaps, vvv)
-								}
-							}
-							if !isNotUpdate {
-								vvv := make(map[string]interface{})
-								vvv["infoid"] = redisId
-								vvv["contact_person"] = tmp["buyerperson"]
-								vvv["contact_type"] = "项目联系人"
-								vvv["phone"] = buyertel
-								vvv["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
-								vvv["updatetime"] = time.Now().Unix()
-								contactMaps = append(contactMaps, vvv)
-							}
-						}
-					}
-					oldTmp["contact"] = contactMaps
-
-					//更新buyerclass合并
-					if tmp["buyerclass"] == nil || tmp["buyerclass"] == "" {
-						//无值,不更新
-					}else {
-						var buyerclass_new,buyerclass_old string
-						buyerclass_new = tmp["buyerclass"].(string)
-						buyerclass_old = oldTmp["buyerclass"].(string)
-						if buyerclass_old=="" {
-							oldTmp["buyerclass"] = buyerclass_new
-						}else {
-							if buyerclass_new!=buyerclass_old {
-								if !strings.Contains(buyerclass_old, buyerclass_new) {
-									oldTmp["buyerclass"] = buyerclass_old + ","+buyerclass_new //采购单位类型
+						tmpclass := make([]string, 0)
+						if tclasss, ok := rvmap["topscopeclass"].([]string); ok {
+							for _, vv := range tclasss {
+								if len(vv) > 1 {
+									tmpclass = append(tmpclass, vv[:len(vv)-1])
 								}
 							}
 						}
+						tmpContact["topscopeclass"] = strings.Join(tmpclass, ";")
+						tmpContact["updatetime"] = time.Now().Unix()
+						contactMaps = append(contactMaps, tmpContact)
 					}
-
+					(*oldTmp)["contact"] = contactMaps
 					//mongo更新
-					oldTmp["updatatime"] = time.Now().Unix()
+					(*oldTmp)["updatatime"] = time.Now().Unix()
 					FClient.DbName = Config["mgodb_extract_kf"]
 					if !FClient.UpdateById(Config["mgo_qyk_buyer"], esId, bson.M{"$set": oldTmp}) {
 						log.Println("存量  mongo更新 err", esId, oldTmp)
 					}
 					//es更新
-					delete(oldTmp, "_id")
+					delete((*oldTmp), "_id")
 					if _, err := EsConn.Update().Index(Config["elasticsearch_buyer_index"]).Type(Config["elasticsearch_buyer_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
 						log.Println("存量 EsConn err :", err)
 					}
-					//最后删除redis
-					conn.Del(redisId)
 				}
 			}
 		}
@@ -250,18 +231,18 @@ func TaskBuyer(mapinfo *map[string]interface{}) {
 		overid := gtid
 		tmp := map[string]interface{}{}
 		for cursor.Next(&tmp) {
-			overid = tmp["_id"].(primitive.ObjectID).Hex()
+			overid = tmp["_id"].(bson.ObjectId).Hex()
 			//log.Println(tmp["_id"])
 			if tmp["buyer"] == nil || tmp["buyer"] == "" {
 				continue
 			}
 			//redis查询是否存在
 			rdb := RedisPool.Get()
-			rdb.Do("SELECT","2")
+			rdb.Do("SELECT", Config["redis_buyer_db"])
 			if reply, err := redis.String(rdb.Do("GET", tmp["buyer"])); err != nil {
 				//redis不存在存到临时表,定时任务处理
 				FClient.DbName = Config["mgodb_extract_kf"]
-				if tmpid := FClient.Save("buyer_new", tmp); tmpid == nil {
+				if tmpid := FClient.Save("buyer_new", tmp); tmpid == "" {
 					log.Println("FClient.Save err", tmpid)
 				}
 				//log.Println("get redis id err:定时任务处理", err, tmp)
@@ -275,53 +256,54 @@ func TaskBuyer(mapinfo *map[string]interface{}) {
 				}
 				//拿到合并后的qyk
 				FClient.DbName = Config["mgodb_extract_kf"]
-				oldTmp := FClient.FindById(Config["mgo_qyk_buyer"], reply)
-				if oldTmp == nil {
+				oldTmp, b := FClient.FindById(Config["mgo_qyk_buyer"], reply, bson.M{})
+				if !b || oldTmp == nil {
 					log.Println("redis id 不存在")
 					continue
 				}
 				//比较合并
 				//行业类型
 				tmpTopscopeclass := []string{}
+				tmpConTopscopeclass := []string{}
 				tmpTopscopeclassMap := make(map[string]bool)
 
-
-				if v, ok := tmp["topscopeclass"].(primitive.A); ok {
+				if v, ok := tmp["topscopeclass"].([]interface{}); ok {
 					for _, vv := range v {
 						if vvv, ok := vv.(string); ok && len(vvv) > 1 {
 							tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
+							tmpConTopscopeclass = append(tmpConTopscopeclass, vvv[:len(vvv)-1])
 						}
 					}
-					for k := range tmpTopscopeclassMap {
-						tmpTopscopeclass = append(tmpTopscopeclass, k)
-					}
+				}
+				for k := range tmpTopscopeclassMap {
+					tmpTopscopeclass = append(tmpTopscopeclass, k)
 				}
 				sort.Strings(tmpTopscopeclass)
+				esId := (*oldTmp)["_id"].(bson.ObjectId).Hex()
 
-				esId := oldTmp["_id"].(primitive.ObjectID).Hex()
-
-				//更新行业类型 buyerclass合并
-				if tmp["buyerperson"] == nil || tmp["buyerperson"] == "" || Reg_xing.MatchString(util.ObjToString(tmp["buyerperson"])) {
 
-					//更新buyerclass合并
-					if tmp["buyerclass"] == nil || tmp["buyerclass"] == "" {
-						//无值,不更新
+				//更新buyerclass合并
+				if tmp["buyerclass"] == nil || tmp["buyerclass"] == "" {
+					//无值,不更新
+				}else {
+					var buyerclass_new,buyerclass_old string
+					buyerclass_new = tmp["buyerclass"].(string)
+					buyerclass_old = (*oldTmp)["buyerclass"].(string)
+					if buyerclass_old=="" {
+						(*oldTmp)["buyerclass"] = buyerclass_new
 					}else {
-						var buyerclass_new,buyerclass_old string
-						buyerclass_new = tmp["buyerclass"].(string)
-						buyerclass_old = oldTmp["buyerclass"].(string)
-						if buyerclass_old=="" {
-							oldTmp["buyerclass"] = buyerclass_new
-						}else {
-							if buyerclass_new!=buyerclass_old {
-								if !strings.Contains(buyerclass_old, buyerclass_new) {
-									oldTmp["buyerclass"] = buyerclass_old + ","+buyerclass_new //采购单位类型
-								}
+						if buyerclass_new!=buyerclass_old {
+							if !strings.Contains(buyerclass_old, buyerclass_new) {
+								(*oldTmp)["buyerclass"] = buyerclass_old + ","+buyerclass_new //采购单位类型
 							}
 						}
 					}
+				}
 
-					oldTmp["updatatime"] = time.Now().Unix()
+				//更新行业类型
+				if tmp["buyerperson"] == nil || tmp["buyerperson"] == "" || Reg_xing.MatchString(util.ObjToString(tmp["buyerperson"])) {
+
+					(*oldTmp)["updatatime"] = time.Now().Unix()
 					//mongo更新
 					FClient.DbName = Config["mgodb_extract_kf"]
 					if !FClient.UpdateById(Config["mgo_qyk_buyer"], esId, bson.M{"$set": oldTmp}) {
@@ -329,7 +311,7 @@ func TaskBuyer(mapinfo *map[string]interface{}) {
 					}
 
 					//es更新
-					delete(oldTmp, "_id")
+					delete((*oldTmp), "_id")
 					if _, err := EsConn.Update().Index(Config["elasticsearch_buyer_index"]).Type(Config["elasticsearch_buyer_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
 						log.Println("update es err:", err)
 					}
@@ -337,80 +319,41 @@ func TaskBuyer(mapinfo *map[string]interface{}) {
 				}
 				//联系方式合并
 				var tmpperson, buyertel string
-				tmpperson = tmp["buyerperson"].(string)
-				if tmp["buyertel"] == nil || tmp["buyertel"] == "" {
+				if tmppersona, ok := tmp["buyerperson"].(string); ok {
+					tmpperson = tmppersona
+				}
+				if buyerteltmp, ok := tmp["buyertel"].(string); ok {
+					buyertel = buyerteltmp
+				}
+				if Reg_xing.MatchString(buyertel) || !Reg_tel.MatchString(buyertel) {
 					buyertel = ""
 				} else {
-					if Reg_xing.MatchString(util.ObjToString(tmp["buyertel"])) || !Reg_tel.MatchString(util.ObjToString(tmp["buyertel"])) {
-						buyertel = ""
-					} else {
-						buyertel = util.ObjToString(tmp["buyertel"])
-					}
+					buyertel = buyertel
 				}
 				contactMaps := make([]interface{}, 0)
-				if oldTmp["contact"] == nil {
-					tmpContact := make(map[string]interface{})
-					tmpContact["infoid"] = overid
-					tmpContact["contact_person"] = tmpperson
-					tmpContact["contact_type"] = "项目联系人"
-					tmpContact["phone"] = buyertel
-					tmpContact["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
-					tmpContact["updatetime"] = time.Now().Unix()
-					contactMaps = append(contactMaps, tmpContact)
-				} else {
-					//对比前四项,相等丢弃
-					if v, ok := oldTmp["contact"].(primitive.A); ok {
-						var isNotUpdate bool
-						for _, vv := range v {
-							if vvv, ok := vv.(map[string]interface{}); ok {
-								if vvv["contact_person"] == tmpperson && vvv["contact_type"] == "项目联系人" &&
-									vvv["phone"] == buyertel && vvv["topscopeclass"] == strings.Join(tmpTopscopeclass, ";") {
-									isNotUpdate = true
-									vvv["updatetime"] = time.Now().Unix()
-								}
-								contactMaps = append(contactMaps, vvv)
-							}
-						}
-						if !isNotUpdate {
-							vvv := make(map[string]interface{})
-							vvv["infoid"] = overid
-							vvv["contact_person"] = tmp["buyerperson"]
-							vvv["contact_type"] = "项目联系人"
-							vvv["phone"] = buyertel
-							vvv["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
-							vvv["updatetime"] = time.Now().Unix()
-							contactMaps = append(contactMaps, vvv)
-						}
-					}
-				}
-				oldTmp["contact"] = contactMaps
-
-				//更新buyerclass合并
-				if tmp["buyerclass"] == nil || tmp["buyerclass"] == "" {
-					//无值,不更新
-				}else {
-					var buyerclass_new,buyerclass_old string
-					buyerclass_new = tmp["buyerclass"].(string)
-					buyerclass_old = oldTmp["buyerclass"].(string)
-					if buyerclass_old=="" {
-						oldTmp["buyerclass"] = buyerclass_new
-					}else {
-						if buyerclass_new!=buyerclass_old {
-							if !strings.Contains(buyerclass_old, buyerclass_new) {
-								oldTmp["buyerclass"] = buyerclass_old + ","+buyerclass_new //采购单位类型
-							}
-						}
+				if (*oldTmp)["contact"] != nil {
+					//直接添加联系人,不再判断
+					if v, ok := (*oldTmp)["contact"].([]interface{}); ok {
+						contactMaps = append(contactMaps, v...)
 					}
 				}
-
+				vvv := make(map[string]interface{})
+				vvv["infoid"] = overid
+				vvv["contact_person"] = tmpperson
+				vvv["contact_type"] = "项目联系人"
+				vvv["phone"] = buyertel
+				vvv["topscopeclass"] = strings.Join(tmpConTopscopeclass, ";")
+				vvv["updatetime"] = time.Now().Unix()
+				contactMaps = append(contactMaps, vvv)
+				(*oldTmp)["contact"] = contactMaps
 				//mongo更新
-				oldTmp["updatatime"] = time.Now().Unix()
+				(*oldTmp)["updatatime"] = time.Now().Unix()
 				FClient.DbName = Config["mgodb_extract_kf"]
 				if !FClient.UpdateById(Config["mgo_qyk_buyer"], esId, bson.M{"$set": oldTmp}) {
 					log.Println("mongo更新 err", esId, oldTmp)
 				}
 				//es更新
-				delete(oldTmp, "_id")
+				delete((*oldTmp), "_id")
 				if _, err := EsConn.Update().Index(Config["elasticsearch_buyer_index"]).Type(Config["elasticsearch_buyer_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
 					log.Println("EsConn err :", err)
 				}
@@ -421,9 +364,7 @@ func TaskBuyer(mapinfo *map[string]interface{}) {
 
 }
 
-
-
-//定时任务
+//定时任务  新增
 //1.存异常表
 //2.合并原始库新增
 func TimedTaskBuyer() {
@@ -437,8 +378,7 @@ func TimedTaskBuyer() {
 			if !iter.Next(&tmpLast) {
 				//临时表无数据
 				log.Println("临时表无数据:")
-				//t2.Reset(time.Second * 10) //增量
-				t2.Reset(time.Minute * 5) //存量
+				t2.Reset(time.Second * 15)
 				continue
 			} else {
 				log.Println("临时表有数据:", tmpLast)
@@ -457,17 +397,18 @@ func TimedTaskBuyer() {
 				//遍历临时表数据,匹配不到原始库存入异常表
 				tmp := make(map[string]interface{})
 				for cursor.Next(&tmp) {
-					tmpId := tmp["_id"].(primitive.ObjectID).Hex()
+					tmpId := tmp["_id"].(bson.ObjectId).Hex()
 					//再重新查找redis,存在发udp处理,不存在走新增合并
 					rdb := RedisPool.Get()
-					rdb.Do("SELECT","2")
+					rdb.Do("SELECT", Config["redis_buyer_db"])
 					if _, err := redis.String(rdb.Do("GET", tmp["buyer"])); err == nil {
-						//{"gtid":"57d7ad2f61a0721f152d2ad5","lteid":"5e20968d85a9271abf0ad6c2","stype":""}
 						//redis存在发送udp进行处理
 						by, _ := json.Marshal(map[string]interface{}{
-							"gtid":  tmpId,
-							"lteid": tmpId,
-							"stype": "",
+							"gtid":      tmpId,
+							"lteid":     tmpId,
+							"stype":     "",
+							"data_type": "buyer",
+							"data_info": "add",
 						})
 						if e := udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
 							IP:   net.ParseIP("127.0.0.1"),
@@ -477,7 +418,7 @@ func TimedTaskBuyer() {
 						}
 						//存在的话删除tmp mongo表
 						FClient.DbName = Config["mgodb_extract_kf"]
-						if DeletedCount := FClient.DeleteById("buyer_new", tmpId); DeletedCount == 0 {
+						if DeletedCount := FClient.Del("buyer_new", bson.M{"_id": bson.ObjectIdHex(tmpId)}); !DeletedCount {
 							log.Println("删除临时表err:", DeletedCount)
 						}
 						if err := rdb.Close(); err != nil {
@@ -491,34 +432,34 @@ func TimedTaskBuyer() {
 					}
 					//查询redis不存在新增
 					FClient.DbName = Config["mgodb_enterprise"]
-					//qyxy 企业库 两亿条
-					resulttmp := FClient.FindOne(Config["mgodb_enterprise_c"], bson.M{"company_name": tmp["buyer"]})
-					if resulttmp["_id"] == nil {
+
+					resulttmp, b := FClient.FindOne(Config["mgodb_enterprise_c"], bson.M{"company_name": tmp["buyer"]})
+					if !b || (*resulttmp)["_id"] == nil {
 						//log.Println(r)
 						//匹配不到原始库,存入异常表删除临时表
-						FClient.DbName = Config["mgodb_extract_kf"]
-						if saveid := FClient.Save("buyer_err", tmp); saveid == nil {
-							log.Println("存入异常表错误", tmp)
+						log.Println(len(tmp),reflect.TypeOf(tmp))
+						fdmongo := FClient.GetMgoConn().DB(Config["mgodb_extract_kf"])
+						if err := fdmongo.C("buyer_err").Insert(tmp); err != nil {
+							log.Println("存入异常表错误", err, tmp)
 						}
-						FClient.DbName = Config["mgodb_extract_kf"]
-						if deleteNum := FClient.DeleteById("buyer_new", tmpId); deleteNum == 0 {
+						if deleteNum := fdmongo.C("buyer_new").RemoveId(bson.ObjectIdHex(tmpId)); !b {
 							log.Println("删除临时表错误", deleteNum)
 						}
 						continue
 					} else {
 						//log.Println(123)
 						//匹配到原始库,新增 resulttmp
-						if resulttmp["credit_no"] != nil {
-							if credit_no, ok := resulttmp["credit_no"].(string); ok && strings.TrimSpace(credit_no) != "" &&
+						if (*resulttmp)["credit_no"] != nil {
+							if credit_no, ok := (*resulttmp)["credit_no"].(string); ok && strings.TrimSpace(credit_no) != "" &&
 								len(strings.TrimSpace(credit_no)) > 8 {
 								dataNo := strings.TrimSpace(credit_no)[2:8]
 								if Addrs[dataNo] != nil {
 									if v, ok := Addrs[dataNo].(map[string]interface{}); ok {
-										if resulttmp["province"] == nil || resulttmp["province"] == "" {
-											resulttmp["province"] = v["province"]
+										if (*resulttmp)["province"] == nil || (*resulttmp)["province"] == "" {
+											(*resulttmp)["province"] = v["province"]
 										}
-										resulttmp["city"] = v["city"]
-										resulttmp["district"] = v["district"]
+										(*resulttmp)["city"] = v["city"]
+										(*resulttmp)["district"] = v["district"]
 
 									}
 								}
@@ -526,15 +467,15 @@ func TimedTaskBuyer() {
 						}
 						contacts := make([]map[string]interface{}, 0)
 						contact := make(map[string]interface{}, 0)
-						if resulttmp["legal_person"] != nil {
-							contact["contact_person"] = resulttmp["legal_person"] //联系人
+						if (*resulttmp)["legal_person"] != nil {
+							contact["contact_person"] = (*resulttmp)["legal_person"] //联系人
 						} else {
 							contact["contact_person"] = "" //联系人
 						}
 						contact["contact_type"] = "法定代表人" //法定代表人
 						//log.Println(1)
-						if resulttmp["annual_reports"] != nil {
-							bytes, err := json.Marshal(resulttmp["annual_reports"])
+						if (*resulttmp)["annual_reports"] != nil {
+							bytes, err := json.Marshal((*resulttmp)["annual_reports"])
 							if err != nil {
 								log.Println("annual_reports err:", err)
 							}
@@ -566,24 +507,50 @@ func TimedTaskBuyer() {
 						contact["updatetime"] = time.Now().Unix() //更新时间
 						contact["infoid"] = ""                    //招标信息id
 						contacts = append(contacts, contact)
-						resulttmp["contact"] = contacts
+						//添加临时表匹配到的联系人
+						vvv := make(map[string]interface{})
+						vvv["infoid"] = tmp["_id"].(bson.ObjectId).Hex()
+						if tmp["buyerperson"] != nil {
+							vvv["contact_person"] = tmp["buyerperson"]
+						} else {
+							vvv["contact_person"] = ""
+						}
+						vvv["contact_type"] = "项目联系人"
+						//	"buyer": 1, "buyertel": 1, "buyerperson": 1, "topscopeclass": 1 buyerclass : 1
+						if tmp["buyertel"] != nil {
+							vvv["phone"] = tmp["buyertel"]
+						} else {
+							vvv["phone"] = ""
+						}
+						tmpclass := make([]string, 0)
+						if tclasss, ok := tmp["topscopeclass"].([]string); ok {
+							for _, vv := range tclasss {
+								if len(vv) > 1 {
+									tmpclass = append(tmpclass, vv[:len(vv)-1])
+								}
+							}
+						}
+						vvv["topscopeclass"] = strings.Join(tmpclass, ";")
+						vvv["updatetime"] = time.Now().Unix()
+						contacts = append(contacts, vvv)
+						(*resulttmp)["contact"] = contacts
 
 						savetmp := make(map[string]interface{}, 0)
 						for _, sk := range BuyerFields {
 							if sk == "_id" {
-								savetmp["tmp"+sk] = resulttmp[sk]
+								savetmp["tmp"+sk] = (*resulttmp)[sk]
 								continue
 							} else if sk == "area_code" {
 								//行政区划代码
-								savetmp[sk] = fmt.Sprint(resulttmp[sk])
+								savetmp[sk] = fmt.Sprint((*resulttmp)[sk])
 								continue
 							} else if sk == "report_websites" {
 								//网址
-								if resulttmp["report_websites"] == nil {
+								if (*resulttmp)["report_websites"] == nil {
 									savetmp["website"] = ""
 								} else {
 									report_websitesArr := []string{}
-									if ppms, ok := resulttmp[sk].(primitive.A); ok {
+									if ppms, ok := (*resulttmp)[sk].([]interface{}); ok {
 										for _, v := range ppms {
 											if vvv, ok := v.(map[string]interface{}); ok {
 												if rv, ok := vvv["website_url"].(string); ok {
@@ -599,46 +566,45 @@ func TimedTaskBuyer() {
 							} else if sk == "wechat_accounts" {
 								savetmp[sk] = []interface{}{}
 								continue
-							}else if sk=="buyer_name" {
-								if resulttmp["company_name"] == nil {
+							} else if sk == "buyer_name" {
+								if (*resulttmp)["company_name"] == nil {
 									savetmp[sk] = ""
-								}else {
-									savetmp[sk] = resulttmp["company_name"]
+								} else {
+									savetmp[sk] = (*resulttmp)["company_name"]
 								}
 								continue
-							}else if sk=="address"{
-								if resulttmp["company_address"] == nil {
+							} else if sk == "address" {
+								if (*resulttmp)["company_address"] == nil {
 									savetmp[sk] = ""
-								}else {
-									savetmp[sk] = resulttmp["company_address"]
+								} else {
+									savetmp[sk] = (*resulttmp)["company_address"]
 								}
 								continue
 							}
 
-
-
-							if resulttmp[sk] == nil && sk != "history_name" && sk != "wechat_accounts" &&
+							if (*resulttmp)[sk] == nil && sk != "history_name" && sk != "wechat_accounts" &&
 								sk != "buyer_name" && sk != "address" &&
 								sk != "contact" && sk != "report_websites" {
 								savetmp[sk] = ""
 							} else {
-								savetmp[sk] = resulttmp[sk]
+								savetmp[sk] = (*resulttmp)[sk]
 							}
 						}
 						//tmps = append(tmps, savetmp)
 						savetmp["updatatime"] = time.Now().Unix()
 						//保存mongo
 						FClient.DbName = Config["mgodb_extract_kf"]
+
 						saveid := FClient.Save(Config["mgo_qyk_buyer"], savetmp)
-						if saveid != nil {
+						if saveid != "" {
 							//保存redis
 							rc := RedisPool.Get()
-							rc.Do("SELECT","2")
-							var _id string
-							if v, ok := saveid.(primitive.ObjectID); ok {
-								_id = v.Hex()
-							}
-							if _, err := rc.Do("SET", savetmp["buyer_name"], _id); err != nil {
+							rc.Do("SELECT", Config["redis_buyer_db"])
+							//var _id string
+							//if v, ok := saveid.(primitive.ObjectID); ok {
+							//	_id = v.Hex()
+							//}
+							if _, err := rc.Do("SET", savetmp["buyer_name"], saveid); err != nil {
 								log.Println("save redis err:", tmp["_id"], savetmp["_id"], savetmp["buyer_name"], err)
 								if err := rc.Close(); err != nil {
 									log.Println(err)
@@ -652,12 +618,12 @@ func TimedTaskBuyer() {
 
 								//esConn := elastic.GetEsConn()
 								//defer elastic.DestoryEsConn(esConn)
-								if _, err := EsConn.Index().Index(Config["elasticsearch_buyer_index"]).Type(Config["elasticsearch_buyer_type"]).Id(_id).BodyJson(savetmp).Refresh(true).Do(); err != nil {
+								if _, err := EsConn.Index().Index(Config["elasticsearch_buyer_index"]).Type(Config["elasticsearch_buyer_type"]).Id(saveid).BodyJson(savetmp).Refresh(true).Do(); err != nil {
 									log.Println("save es err :", tmp["_id"], savetmp["_id"], err)
 								} else {
 									//删除临时表
 									FClient.DbName = Config["mgodb_extract_kf"]
-									if deleteNum := FClient.DeleteById("buyer_new", tmpId); deleteNum == 0 {
+									if deleteNum := FClient.Del("buyer_new", bson.M{"_id": bson.ObjectIdHex(tmpId)}); !deleteNum {
 										log.Println("删除临时表失败", deleteNum)
 									}
 								}
@@ -666,9 +632,11 @@ func TimedTaskBuyer() {
 							log.Println("save mongo err:", saveid, tmp["_id"])
 						}
 					}
+
 				}
+
 			}
 		}
 		t2.Reset(time.Minute)
 	}
-}
+}