Răsfoiți Sursa

Merge branch 'dev3.4' of http://39.105.157.10:10080/qmx/jy-data-extract into dev3.4

apple 5 ani în urmă
părinte
comite
61c7b8c424

+ 2 - 2
fullproject/src_v1/load_data.go

@@ -85,7 +85,7 @@ func (p *ProjectTask) loadData(starttime int64) {
 				bys, _ := json.Marshal(result)
 				var tmp *ProjectInfo
 				_ = json.Unmarshal(bys, &tmp)
-				saveData(p, result, tmp)
+				saveFiled(p, result, tmp)
 				pool <- tmp
 			}(result)
 		} else {
@@ -142,7 +142,7 @@ func (p *ProjectTask) loadSite() {
 
 }
 
-func saveData(p *ProjectTask, res map[string]interface{}, tmp *ProjectInfo) {
+func saveFiled(p *ProjectTask, res map[string]interface{}, tmp *ProjectInfo) {
 	if jsonData, ok := res["jsondata"].(map[string]interface{}); ok {
 		proHref := util.ObjToString(jsonData["projecthref"])
 		if jsonData != nil && proHref != "" {

+ 5 - 6
fullproject/src_v1/main.go

@@ -2,7 +2,6 @@ package main
 
 import (
 	"encoding/json"
-	"flag"
 	"log"
 	mu "mfw/util"
 	"net"
@@ -79,11 +78,11 @@ func main() {
 
 //测试组人员使用
 func mainT() {
-	//sid = "5649a0fcaf5374672e005704"
-	//eid = "5e169e5250b5ea296ec896f0"
-	flag.StringVar(&sid, "sid", "", "开始id")
-	flag.StringVar(&eid, "eid", "", "结束id")
-	flag.Parse()
+	sid = "5d18eca4a5cb26b9b7c7f587"
+	eid = "5e381b7650b5ea296ed16e51"
+	//flag.StringVar(&sid, "sid", "", "开始id")
+	//flag.StringVar(&eid, "eid", "", "结束id")
+	//flag.Parse()
 
 	mapinfo := map[string]interface{}{}
 	if sid == "" || eid == "" {

+ 29 - 23
fullproject/src_v1/project.go

@@ -420,7 +420,6 @@ var FIELDS = []string{
 	"buyertel",
 	"winner",
 	"agency",
-	"projectscope",
 	"topscopeclass",
 	"subscopeclass",
 	"winnerorder",
@@ -479,25 +478,31 @@ func (p *ProjectTask) NewProject(tmp map[string]interface{}, thisinfo *Info) (st
 	bt := qu.ObjToString(tmp["toptype"])
 	bs := qu.ObjToString(tmp["subtype"])
 	p.mapBidLock.Lock()
-	set["bidtype"] = bidtype[bs]
-	if bt == "招标" {
-		set["projectscope"] = qu.ObjToString(tmp["projectscope"])
-		set["bidstatus"] = bt
-	} else {
-		if bidstatus[bs] != "" {
-			set["bidstatus"] = thisinfo.SubType
-		} else if tmp["infoformat"] == 2 {
-			set["bidstatus"] = "拟建"
-		} else if bs == "" {
-			set["bidstatus"] = ""
+	if thisinfo.Infoformat == 2 {
+		set["bidstatus"] = "拟建"
+		bt = "拟建"
+	}else {
+		set["bidtype"] = bidtype[bs]
+		if bt == "招标" {
+			set["projectscope"] = qu.ObjToString(tmp["projectscope"])
+			set["bidstatus"] = bt
 		} else {
-			set["bidstatus"] = "其它"
+			if bidstatus[bs] != "" {
+				set["bidstatus"] = thisinfo.SubType
+				bt = thisinfo.SubType
+			} else if bs == "" {
+				set["bidstatus"] = ""
+				bt = ""
+			} else {
+				set["bidstatus"] = "其它"
+				bt = "其它"
+			}
 		}
 	}
 	p.mapBidLock.Unlock()
 
 	pkg := PackageFormat(thisinfo, nil)
-	p1 := p.NewCachePinfo(pId, thisinfo, bt, pkg)
+	p1 := p.NewCachePinfo(pId, thisinfo, bs, bt, pkg)
 
 	now := time.Now().Unix()
 	set["createtime"] = now
@@ -512,8 +517,10 @@ func (p *ProjectTask) NewProject(tmp map[string]interface{}, thisinfo *Info) (st
 			set["zbtime"] = tmp["publishtime"]
 		}
 	} else if thisinfo.TopType == "结果" || thisinfo.SubType == "合同" {
-		set["jgtime"] = tmp["publishtime"]
-		p1.Jgtime = thisinfo.Publishtime
+		if thisinfo.Infoformat != 2 {
+			set["jgtime"] = tmp["publishtime"]
+			p1.Jgtime = thisinfo.Publishtime
+		}
 	}
 
 	if len(thisinfo.Subscopeclass) > 0 {
@@ -630,7 +637,7 @@ func (p *ProjectTask) PushListInfo(tmp map[string]interface{}, infoid string) bs
 }
 
 //生成存放在内存中的对象
-func (p *ProjectTask) NewCachePinfo(id primitive.ObjectID, thisinfo *Info, bidtype string, pkg map[string]interface{}) ProjectInfo {
+func (p *ProjectTask) NewCachePinfo(id primitive.ObjectID, thisinfo *Info, bidtype, bidstatus string, pkg map[string]interface{}) ProjectInfo {
 	p1 := ProjectInfo{
 		Id:            id,
 		Ids:           []string{thisinfo.Id},
@@ -654,7 +661,7 @@ func (p *ProjectTask) NewCachePinfo(id primitive.ObjectID, thisinfo *Info, bidty
 		Budget:        thisinfo.Budget,
 		Package:       pkg,
 		Bidamount:     thisinfo.Bidamount,
-		Bidstatus:     thisinfo.SubType,
+		Bidstatus:     bidstatus,
 		Bidtype:       bidtype,
 		Winners:       thisinfo.Winners,
 	}
@@ -683,7 +690,7 @@ func (p *ProjectTask) UpdateProject(tmp map[string]interface{}, thisinfo *Info,
 	pInfo.LastTime = thisinfo.Publishtime
 	set["lasttime"] = thisinfo.Publishtime
 	if thisinfo.TopType == "招标" {
-		if thisinfo.SubType != "变更" && thisinfo.SubType != "其它" {
+		if thisinfo.SubType != "变更" && thisinfo.SubType != "其它" && tmp["zbtime"] == nil {
 			set["zbtime"] = tmp["publishtime"]
 		}
 		if pInfo.Jgtime > 0 {
@@ -722,7 +729,7 @@ func (p *ProjectTask) UpdateProject(tmp map[string]interface{}, thisinfo *Info,
 		if bidstatus[bs] != "" {
 			set["bidstatus"] = thisinfo.SubType
 			pInfo.Bidstatus = thisinfo.SubType
-		} else if tmp["infoformat"] == 2 {
+		} else if thisinfo.Infoformat == 2 {
 			set["bidstatus"] = "拟建"
 			pInfo.Bidstatus = "拟建"
 		} else if bs == "" {
@@ -908,7 +915,7 @@ func (p *ProjectTask) UpdateProject(tmp map[string]interface{}, thisinfo *Info,
 /**
  *	更新项目时,项目状态的处理
  *	返回是否新增项目,异常标记
- *	1、项目时,新项目时,招标信息的状态(toptype)不是招标、拟建、预告	异常:1
+ *	1、新项目时,招标信息的状态(toptype)不是招标、拟建、预告	异常:1
  *	   异常1是在项目新建的时候才会产生
  *	3、项目合并时,项目状态是”流标“/”废标“,招标信息状态不是”招标“		异常:2
  *	4、项目合并时,项目状态是”合同“/”其它“,招标信息类型是”结果“		异常:3
@@ -927,7 +934,7 @@ func (p *ProjectTask) CompareStatus(project *ProjectInfo, info *Info) (bool, int
 			return false, 0
 		} else if project.Bidstatus == info.SubType {
 			//状态一样,根据发布时间判断是否合并
-			if (info.Publishtime - project.FirstTime) > p.statusTime {
+			if (info.Publishtime - project.LastTime) > p.statusTime {
 				return true, 0
 			} else {
 				return false, 0
@@ -1060,7 +1067,6 @@ func CountAmount(project *ProjectInfo, info *Info, tmp map[string]interface{}) {
 				} else {
 					project.Budgettag = 1
 				}
-
 			}
 		}
 		if budget > 0 {

+ 12 - 0
udp_winner/main.go

@@ -4,6 +4,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"github.com/garyburd/redigo/redis"
+	hisRedis "github.com/go-redis/redis"
 	"go.mongodb.org/mongo-driver/bson"
 	es "gopkg.in/olivere/elastic.v1"
 	"log"
@@ -23,6 +24,7 @@ var (
 	Fields,BuyerFields,AgencyFields                                []string
 	SourceClient, FClient                 *MongodbSim
 	RedisPool                             redis.Pool
+	HisRedisPool                          *hisRedis.Client
 	Addrs                                 = make(map[string]interface{}, 0) //省市县
 	udpclient                             mu.UdpClient                      //udp对象
 	ElasticClientIndex, ElasticClientType string
@@ -112,6 +114,16 @@ func init() {
 		log.Fatalln("redis err:", err)
 	}
 	c.Close()
+	HisRedisPool = hisRedis.NewClient(&hisRedis.Options{
+		Addr:         "127.0.0.1:6379",
+		DB:           1,
+		DialTimeout:  10 * time.Second,
+		ReadTimeout:  30 * time.Second,
+		WriteTimeout: 30 * time.Second,
+		PoolSize:     30,
+		MinIdleConns: 20,
+		PoolTimeout:  30 * time.Second,
+	})
 }
 
 func main() {

+ 303 - 170
udp_winner/timedTaskWinner.go

@@ -15,8 +15,7 @@ import (
 	"time"
 )
 
-
-
+//之前main方法,只更新
 func TaskWinner(mapinfo *map[string]interface{}) {
 	defer util.Catch()
 	gtid, lteid := util.ObjToString((*mapinfo)["gtid"]), util.ObjToString((*mapinfo)["lteid"])
@@ -47,207 +46,340 @@ func TaskWinner(mapinfo *map[string]interface{}) {
 		log.Println(cursor)
 		return
 	}
-	overid := gtid
-	tmp := map[string]interface{}{}
-	for cursor.Next(&tmp) {
-		overid = tmp["_id"].(primitive.ObjectID).Hex()
-		log.Println(tmp["_id"])
-		if tmp["winner"] == nil || tmp["winner"] == "" {
-			continue
-		}
-		//redis查询是否存在
-		rdb := RedisPool.Get()
-		if reply, err := redis.String(rdb.Do("GET", tmp["winner"])); err != nil {
-			//redis不存在存到临时表,定时任务处理
-			FClient.DbName = Config["mgodb_extract_kf"]
-			if tmpid := FClient.Save("winner_new", tmp) ;tmpid==nil{
-				log.Println("FClient.Save err",tmpid)
+	//判断是否是存量,是存量走Redis遍历
+	if v, ok := (*mapinfo)["data_info"].(string); ok && v == "save" {
+		//存量处理
+		tmp := map[string]interface{}{}
+		conn := HisRedisPool.Conn()
+		defer conn.Close()
+		//选择redis db
+		conn.Select(1)
+		//遍历bidding表保存到redis
+		// key:_id  value:json结构体
+		for cursor.Next(&tmp) {
+			if tmp["winner"] == nil || tmp["winner"] == "" {
+				continue
 			}
-			//log.Println("get redis id err:定时任务处理", err, tmp)
-			if err := rdb.Close(); err != nil {
+			bytes, _ := json.Marshal(tmp)
+			if err := conn.Set(tmp["_id"].(primitive.ObjectID).Hex(), string(bytes), 0).Err(); err != nil {
 				log.Println(err)
 			}
-			continue
+		}
+		//遍历redis
+		if scan := conn.Scan(0, "", 100); scan.Err() != nil {
+			log.Println(scan.Err())
+			return
 		} else {
-			//log.Println("redis get :", reply)
-			//redis存在
-			//log.Println(reply)
-			//reply = "5e0316b998a9abaf6535df3d"
-			//id, err := primitive.ObjectIDFromHex(reply)
-			//if err != nil {
-			//	log.Println("get redis id  Hex err:", err, tmp)
-			//	if err := rdb.Close(); err != nil {
-			//		log.Println(err)
-			//	}
-			//	continue
-			//}
-			if err := rdb.Close(); err != nil {
-				log.Println(err)
-			}
-			//拿到合并后的qyk
-			FClient.DbName = Config["mgodb_extract_kf"]
-			oldTmp := FClient.FindById(Config["mgo_qyk_c"], reply)
-			if oldTmp == nil{
-				log.Println("redis id 不存在")
-				continue
-			}
-			//err = FClient.Database(Config["mgodb_extract_kf"]).Collection(Config["mgo_qyk_c"]).
-			//	FindOne(context.TODO(), bson.M{"_id": id}).Decode(&oldTmp)
-			//if err != nil {
-			//	log.Println("qyk id err:", err, id)
-			//	continue
-			//}
-			//比较合并
-			//行业类型
-			tmpTopscopeclass := []string{}
-			tmpTopscopeclassMap := make(map[string]bool)
+			iterator := scan.Iterator()
+			for iterator.Next() {
+				redisId := iterator.Val()                       //redis key
+				redisvalue := conn.Get(iterator.Val()).String() //redis val
+				tmp := make(map[string]interface{})
+				json.Unmarshal([]byte(redisvalue),&tmp)
+				//重复增量操作
+				//redis查询是否存在
+				rdb := RedisPool.Get()
+				if reply, err := redis.String(rdb.Do("GET", tmp["winner"])); err != nil {
+					//redis不存在,存到临时表,定时任务处理
+					FClient.DbName = Config["mgodb_extract_kf"]
+					if tmpid := FClient.Save("winner_new", tmp); tmpid == nil {
+						log.Println("存量 FClient.Save err", tmpid)
+					}
+					//log.Println("get redis id err:定时任务处理", err, tmp)
+					if err := rdb.Close(); err != nil {
+						log.Println("存量",err)
+					}
+					//删除存量redis
+					conn.Del(redisId)
+					continue
+				} else {
+					if err := rdb.Close(); err != nil {
+						log.Println(err)
+					}
+					//拿到合并后的qyk
+					FClient.DbName = Config["mgodb_extract_kf"]
+					oldTmp := FClient.FindById(Config["mgo_qyk_c"], reply)
+					if oldTmp == nil {
+						log.Println("存量 redis id 不存在",reply,tmp["winner"])
+						continue
+					}
+					tmpTopscopeclass := []string{}
+					tmpTopscopeclassMap := make(map[string]bool)
 
-			if oldTmp["industry"] == nil {
-				//log.Println(reflect.ValueOf(tmp["topscopeclass"]))
-				if v, ok := tmp["topscopeclass"].(primitive.A); ok {
-					for _, vv := range v {
-						if vvv, ok := vv.(string); ok && len(vvv) > 1 {
-							tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
+					if oldTmp["industry"] == nil {
+						if v, ok := tmp["topscopeclass"].(primitive.A); ok {
+							for _, vv := range v {
+								if vvv, ok := vv.(string); ok && len(vvv) > 1 {
+									tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
+								}
+							}
+							for k := range tmpTopscopeclassMap {
+								tmpTopscopeclass = append(tmpTopscopeclass, k)
+							}
+						}
+					} else {
+						if v, ok := oldTmp["industry"].(primitive.A); ok {
+							for _, vv := range v {
+								if vvv, ok := vv.(string); ok {
+									tmpTopscopeclassMap[vvv] = true
+								}
+							}
+						}
+						if v, ok := tmp["topscopeclass"].(primitive.A); ok {
+							for _, vv := range v {
+								if vvv, ok := vv.(string); ok && len(vvv) > 1 {
+									tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
+								}
+							}
+							for k := range tmpTopscopeclassMap {
+								tmpTopscopeclass = append(tmpTopscopeclass, k)
+							}
 						}
 					}
-					for k := range tmpTopscopeclassMap {
-						tmpTopscopeclass = append(tmpTopscopeclass, k)
+					sort.Strings(tmpTopscopeclass)
+					oldTmp["industry"] = tmpTopscopeclass
+					esId := oldTmp["_id"].(primitive.ObjectID).Hex()
+					//更新行业类型
+					if tmp["winnerperson"] == nil || tmp["winnerperson"] == "" || Reg_xing.MatchString(util.ObjToString(tmp["winnerperson"])) {
+						oldTmp["updatatime"] = time.Now().Unix()
+						//mongo更新
+						FClient.DbName = Config["mgodb_extract_kf"]
+						if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
+							log.Println("mongo更新err", esId)
+						}
+
+						//es更新
+						delete(oldTmp, "_id")
+						if _, err := EsConn.Update().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
+							log.Println("update es err:", err)
+						}
+						//删除存量redis
+						conn.Del(redisId)
+						continue
 					}
-				}
-			} else {
-				if v, ok := oldTmp["industry"].(primitive.A); ok {
-					for _, vv := range v {
-						if vvv, ok := vv.(string); ok {
-							tmpTopscopeclassMap[vvv] = true
+					//联系方式合并
+					var tmpperson, winnertel string
+					tmpperson = tmp["winnerperson"].(string)
+					if tmp["winnertel"] == nil || tmp["winnertel"] == "" {
+						winnertel = ""
+					} else {
+						if Reg_xing.MatchString(util.ObjToString(tmp["winnertel"])) || !Reg_tel.MatchString(util.ObjToString(tmp["winnertel"])) {
+							winnertel = ""
+						} else {
+							winnertel = util.ObjToString(tmp["winnertel"])
 						}
 					}
-				}
-				if v, ok := tmp["topscopeclass"].(primitive.A); ok {
-					for _, vv := range v {
-						if vvv, ok := vv.(string); ok && len(vvv) > 1 {
-							tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
+					contactMaps := make([]interface{}, 0)
+					if oldTmp["contact"] == nil {
+						tmpContact := make(map[string]interface{})
+						tmpContact["infoid"] = redisId
+						tmpContact["contact_person"] = tmpperson
+						tmpContact["contact_type"] = "项目联系人"
+						tmpContact["phone"] = winnertel
+						tmpContact["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
+						tmpContact["updatetime"] = time.Now().Unix()
+						contactMaps = append(contactMaps, tmpContact)
+					} else {
+						//对比前四项,相等丢弃
+						if v, ok := oldTmp["contact"].(primitive.A); ok {
+							var isNotUpdate bool
+							for _, vv := range v {
+								if vvv, ok := vv.(map[string]interface{}); ok {
+									if vvv["contact_person"] == tmpperson && vvv["contact_type"] == "项目联系人" &&
+										vvv["phone"] == winnertel && vvv["topscopeclass"] == strings.Join(tmpTopscopeclass, ";") {
+										isNotUpdate = true
+										vvv["updatetime"] = time.Now().Unix()
+									}
+									contactMaps = append(contactMaps, vvv)
+								}
+							}
+							if !isNotUpdate {
+								vvv := make(map[string]interface{})
+								vvv["infoid"] = redisId
+								vvv["contact_person"] = tmp["winnerperson"]
+								vvv["contact_type"] = "项目联系人"
+								vvv["phone"] = winnertel
+								vvv["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
+								vvv["updatetime"] = time.Now().Unix()
+								contactMaps = append(contactMaps, vvv)
+							}
 						}
 					}
-					for k := range tmpTopscopeclassMap {
-						tmpTopscopeclass = append(tmpTopscopeclass, k)
+					oldTmp["contact"] = contactMaps
+					//mongo更新
+					oldTmp["updatatime"] = time.Now().Unix()
+					FClient.DbName = Config["mgodb_extract_kf"]
+					if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
+						log.Println("存量  mongo更新 err", esId, oldTmp)
+					}
+					//es更新
+					delete(oldTmp, "_id")
+					if _, err := EsConn.Update().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
+						log.Println("存量 EsConn err :", err)
 					}
+					//最后删除redis
+					conn.Del(redisId)
 				}
 			}
-			sort.Strings(tmpTopscopeclass)
-			oldTmp["industry"] = tmpTopscopeclass
-			esId := oldTmp["_id"].(primitive.ObjectID).Hex()
-			//更新行业类型
-			if tmp["winnerperson"] == nil || tmp["winnerperson"] == "" || Reg_xing.MatchString(util.ObjToString(tmp["winnerperson"])) {
-				oldTmp["updatatime"] = time.Now().Unix()
-				//mongo更新
-				FClient.DbName =Config["mgodb_extract_kf"]
-				if !FClient.UpdateById(Config["mgo_qyk_c"],esId,bson.M{"$set": oldTmp}){
-					log.Println("mongo更新err",esId)
+		}
+		log.Println("存量历史合并执行完成 ok", gtid, lteid)
+
+	} else {
+		//增量处理
+		overid := gtid
+		tmp := map[string]interface{}{}
+		for cursor.Next(&tmp) {
+			overid = tmp["_id"].(primitive.ObjectID).Hex()
+			log.Println(tmp["_id"])
+			if tmp["winner"] == nil || tmp["winner"] == "" {
+				continue
+			}
+			//redis查询是否存在
+			rdb := RedisPool.Get()
+			if reply, err := redis.String(rdb.Do("GET", tmp["winner"])); err != nil {
+				//redis不存在存到临时表,定时任务处理
+				FClient.DbName = Config["mgodb_extract_kf"]
+				if tmpid := FClient.Save("winner_new", tmp); tmpid == nil {
+					log.Println("FClient.Save err", tmpid)
 				}
-				//if _, err := FClient.Database(Config["mgodb_extract_kf"]).Collection(Config["mgo_qyk_c"]).
-				//	UpdateOne(context.TODO(), bson.M{"_id": oldTmp["_id"]}, bson.M{"$set": oldTmp}); err != nil {
-				//	log.Println("mongo更新err:", err)
-				//}
-				//es更新
-				delete(oldTmp, "_id")
-				//esConn := elastic.GetEsConn()
-				//defer elastic.DestoryEsConn(esConn)
-				if _, err := EsConn.Update().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
-					log.Println("update es err:", err)
+				//log.Println("get redis id err:定时任务处理", err, tmp)
+				if err := rdb.Close(); err != nil {
+					log.Println(err)
 				}
-				//log.Println( err2,err3)
 				continue
-			}
-			//联系方式合并
-			var tmpperson, winnertel string
-			tmpperson = tmp["winnerperson"].(string)
-			if tmp["winnertel"] == nil || tmp["winnertel"] == "" {
-				winnertel = ""
 			} else {
-				if Reg_xing.MatchString(util.ObjToString(tmp["winnertel"])) || !Reg_tel.MatchString(util.ObjToString(tmp["winnertel"])) {
+				if err := rdb.Close(); err != nil {
+					log.Println(err)
+				}
+				//拿到合并后的qyk
+				FClient.DbName = Config["mgodb_extract_kf"]
+				oldTmp := FClient.FindById(Config["mgo_qyk_c"], reply)
+				if oldTmp == nil {
+					log.Println("redis id 不存在")
+					continue
+				}
+				//比较合并
+				//行业类型
+				tmpTopscopeclass := []string{}
+				tmpTopscopeclassMap := make(map[string]bool)
+
+				if oldTmp["industry"] == nil {
+					if v, ok := tmp["topscopeclass"].(primitive.A); ok {
+						for _, vv := range v {
+							if vvv, ok := vv.(string); ok && len(vvv) > 1 {
+								tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
+							}
+						}
+						for k := range tmpTopscopeclassMap {
+							tmpTopscopeclass = append(tmpTopscopeclass, k)
+						}
+					}
+				} else {
+					if v, ok := oldTmp["industry"].(primitive.A); ok {
+						for _, vv := range v {
+							if vvv, ok := vv.(string); ok {
+								tmpTopscopeclassMap[vvv] = true
+							}
+						}
+					}
+					if v, ok := tmp["topscopeclass"].(primitive.A); ok {
+						for _, vv := range v {
+							if vvv, ok := vv.(string); ok && len(vvv) > 1 {
+								tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
+							}
+						}
+						for k := range tmpTopscopeclassMap {
+							tmpTopscopeclass = append(tmpTopscopeclass, k)
+						}
+					}
+				}
+				sort.Strings(tmpTopscopeclass)
+				oldTmp["industry"] = tmpTopscopeclass
+				esId := oldTmp["_id"].(primitive.ObjectID).Hex()
+				//更新行业类型
+				if tmp["winnerperson"] == nil || tmp["winnerperson"] == "" || Reg_xing.MatchString(util.ObjToString(tmp["winnerperson"])) {
+					oldTmp["updatatime"] = time.Now().Unix()
+					//mongo更新
+					FClient.DbName = Config["mgodb_extract_kf"]
+					if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
+						log.Println("mongo更新err", esId)
+					}
+
+					//es更新
+					delete(oldTmp, "_id")
+					if _, err := EsConn.Update().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
+						log.Println("update es err:", err)
+					}
+					continue
+				}
+				//联系方式合并
+				var tmpperson, winnertel string
+				tmpperson = tmp["winnerperson"].(string)
+				if tmp["winnertel"] == nil || tmp["winnertel"] == "" {
 					winnertel = ""
 				} else {
-					winnertel = util.ObjToString(tmp["winnertel"])
+					if Reg_xing.MatchString(util.ObjToString(tmp["winnertel"])) || !Reg_tel.MatchString(util.ObjToString(tmp["winnertel"])) {
+						winnertel = ""
+					} else {
+						winnertel = util.ObjToString(tmp["winnertel"])
+					}
 				}
-			}
-			contactMaps := make([]interface{}, 0)
-			if oldTmp["contact"] == nil {
-				tmpContact := make(map[string]interface{})
-				tmpContact["contact_person"] = tmpperson
-				tmpContact["contact_type"] = "项目联系人"
-				tmpContact["phone"] = winnertel
-				tmpContact["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
-				tmpContact["updatetime"] = time.Now().Unix()
-				contactMaps = append(contactMaps, tmpContact)
-			} else {
-				//对比前四项,相等丢弃
-				if v, ok := oldTmp["contact"].(primitive.A); ok {
-					var isNotUpdate bool
-					for _, vv := range v {
-						if vvv, ok := vv.(map[string]interface{}); ok {
-							if vvv["contact_person"] == tmpperson && vvv["contact_type"] == "项目联系人" &&
-								vvv["phone"] == winnertel && vvv["topscopeclass"] == strings.Join(tmpTopscopeclass, ";") {
-								isNotUpdate = true
-								vvv["updatetime"] = time.Now().Unix()
+				contactMaps := make([]interface{}, 0)
+				if oldTmp["contact"] == nil {
+					tmpContact := make(map[string]interface{})
+					tmpContact["infoid"] = overid
+					tmpContact["contact_person"] = tmpperson
+					tmpContact["contact_type"] = "项目联系人"
+					tmpContact["phone"] = winnertel
+					tmpContact["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
+					tmpContact["updatetime"] = time.Now().Unix()
+					contactMaps = append(contactMaps, tmpContact)
+				} else {
+					//对比前四项,相等丢弃
+					if v, ok := oldTmp["contact"].(primitive.A); ok {
+						var isNotUpdate bool
+						for _, vv := range v {
+							if vvv, ok := vv.(map[string]interface{}); ok {
+								if vvv["contact_person"] == tmpperson && vvv["contact_type"] == "项目联系人" &&
+									vvv["phone"] == winnertel && vvv["topscopeclass"] == strings.Join(tmpTopscopeclass, ";") {
+									isNotUpdate = true
+									vvv["updatetime"] = time.Now().Unix()
+								}
+								contactMaps = append(contactMaps, vvv)
 							}
+						}
+						if !isNotUpdate {
+							vvv := make(map[string]interface{})
+							vvv["infoid"] = overid
+							vvv["contact_person"] = tmp["winnerperson"]
+							vvv["contact_type"] = "项目联系人"
+							vvv["phone"] = winnertel
+							vvv["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
+							vvv["updatetime"] = time.Now().Unix()
 							contactMaps = append(contactMaps, vvv)
 						}
 					}
-					if !isNotUpdate {
-						vvv := make(map[string]interface{})
-						vvv["contact_person"] = tmp["winnerperson"]
-						vvv["contact_type"] = "项目联系人"
-						vvv["phone"] = winnertel
-						vvv["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
-						vvv["updatetime"] = time.Now().Unix()
-						contactMaps = append(contactMaps, vvv)
-					}
+				}
+				oldTmp["contact"] = contactMaps
+				//mongo更新
+				oldTmp["updatatime"] = time.Now().Unix()
+				FClient.DbName = Config["mgodb_extract_kf"]
+				if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
+					log.Println("mongo更新 err", esId, oldTmp)
+				}
+				//es更新
+				delete(oldTmp, "_id")
+				if _, err := EsConn.Update().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
+					log.Println("EsConn err :", err)
 				}
 			}
-			oldTmp["contact"] = contactMaps
-			//mongo更新
-			oldTmp["updatatime"] = time.Now().Unix()
-			FClient.DbName=Config["mgodb_extract_kf"]
-			if !FClient.UpdateById(Config["mgo_qyk_c"],esId,bson.M{"$set": oldTmp}){
-				log.Println("mongo更新 err",esId,oldTmp)
-			}
-			//if _, err := FClient.Database(Config["mgodb_extract_kf"]).Collection(Config["mgo_qyk_c"]).
-			//	UpdateOne(context.TODO(), bson.M{"_id": oldTmp["_id"]}, bson.M{"$set": oldTmp}); err != nil {
-			//	log.Println("mongo更新 err :", err)
-			//}
-			//es更新
-			delete(oldTmp, "_id")
-			//esConn := elastic.GetEsConn()
-			//defer elastic.DestoryEsConn(esConn)
-			if _, err := EsConn.Update().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
-				log.Println("EsConn err :", err)
-			}
-			//log.Println( err2,err3)
 		}
+		log.Println("增量合并执行完成 ok", gtid, lteid, overid)
 	}
-	//defer cursor.Close(context.Background())
-	//log.Println("合并执行完成", gtid, lteid, overid)
-	//if overid != lteid {
-	//	by, _ := json.Marshal(map[string]interface{}{
-	//		"gtid":  overid,
-	//		"lteid": lteid,
-	//		"stype": "",
-	//	})
-	//	if e := udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
-	//		IP:   net.ParseIP("127.0.0.1"),
-	//		Port: Updport,
-	//	}); e != nil {
-	//		log.Println(e)
-	//	}
-	//	log.Println("重新发送udp:", string(by))
-	//	return
-	//}
-	log.Println("合并执行完成 ok", gtid, lteid, overid)
 
 }
 
-
-//定时任务
+//定时任务  新增
 //1.存异常表
 //2.合并原始库新增
 func TimedTaskWinner() {
@@ -385,6 +517,7 @@ func TimedTaskWinner() {
 						}
 						contact["topscopeclass"] = "企业公示"         //项目类型
 						contact["updatetime"] = time.Now().Unix() //更新时间
+						contact["infoid"] = ""                    //招标信息id
 						contacts = append(contacts, contact)
 						resulttmp["contact"] = contacts