mxs 1 рік тому
батько
коміт
ef7e599a0f

+ 7 - 0
data_project_wy/config.json

@@ -20,6 +20,13 @@
     "username": "jytop",
     "password": "pwdTopJy123"
   },
+  "es": {
+    "addr": "http://192.168.3.149:9201",
+    "size": 10,
+    "index": "transaction_info",
+    "username": "",
+    "password": ""
+  },
   "bidstarttime": 1713196800,
   "prostarttime": 1713196800,
   "startcron": "0 0 9 ? * *"

+ 11 - 3
data_project_wy/go.mod

@@ -7,18 +7,24 @@ require (
 	github.com/gogf/gf/v2 v2.7.0
 	github.com/robfig/cron v1.2.0
 	go.mongodb.org/mongo-driver v1.11.4
-	jygit.jydev.jianyu360.cn/data_processing/common_utils v0.0.0-20240202055658-e2ef72e18b40
+	jygit.jydev.jianyu360.cn/data_processing/common_utils v0.0.0-20240412074219-927f3f682cb3
 )
 
 require (
 	github.com/ClickHouse/ch-go v0.61.5 // indirect
+	github.com/PuerkitoBio/goquery v1.8.0 // indirect
 	github.com/andybalholm/brotli v1.1.0 // indirect
+	github.com/andybalholm/cascadia v1.3.1 // indirect
+	github.com/dchest/captcha v1.0.0 // indirect
 	github.com/go-faster/city v1.0.1 // indirect
 	github.com/go-faster/errors v0.7.1 // indirect
 	github.com/golang/snappy v0.0.1 // indirect
 	github.com/google/uuid v1.6.0 // indirect
+	github.com/josharian/intern v1.0.0 // indirect
 	github.com/klauspost/compress v1.17.7 // indirect
+	github.com/mailru/easyjson v0.7.7 // indirect
 	github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
+	github.com/olivere/elastic/v7 v7.0.32 // indirect
 	github.com/paulmach/orb v0.11.1 // indirect
 	github.com/pierrec/lz4/v4 v4.1.21 // indirect
 	github.com/pkg/errors v0.9.1 // indirect
@@ -30,9 +36,11 @@ require (
 	github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
 	go.opentelemetry.io/otel v1.24.0 // indirect
 	go.opentelemetry.io/otel/trace v1.24.0 // indirect
-	golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect
+	golang.org/x/crypto v0.21.0 // indirect
+	golang.org/x/net v0.22.0 // indirect
 	golang.org/x/sync v0.6.0 // indirect
 	golang.org/x/sys v0.18.0 // indirect
-	golang.org/x/text v0.13.0 // indirect
+	golang.org/x/text v0.14.0 // indirect
+	gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 // indirect
 	gopkg.in/yaml.v3 v3.0.1 // indirect
 )

+ 127 - 15
data_project_wy/history.go

@@ -4,10 +4,11 @@ import (
 	"fmt"
 	"github.com/gogf/gf/v2/util/gconv"
 	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
+	"strings"
 	"sync"
 )
 
-// HisTransactionDataFromBid 历史bidding(指定截止comeintime)
+// HisTransactionDataFromBid 历史bidding(指定截止comeintime,采购意向
 func HisTransactionDataFromBid() {
 	sess := MgoB.GetMgoConn()
 	defer MgoB.DestoryMongoConn(sess)
@@ -58,7 +59,7 @@ func HisTransactionDataFromBid() {
 			if tmp["tag_topinformation"] == nil && tmp["tag_topinformation_ai"] == nil { //无效数据过滤
 				return
 			}
-			result := DealTransactionForBid(tmp)
+			result := DealTransactionForBid(tmp, "采购意向", 3)
 			lock.Lock()
 			if len(result) > 0 {
 				arr = append(arr, result)
@@ -82,6 +83,98 @@ func HisTransactionDataFromBid() {
 	fmt.Println("结束")
 }
 
+// HisTransactionDataFromBid2 历史bidding(指定截止comeintime,新增项目)
+func HisTransactionDataFromBid2() {
+	sess := MgoB.GetMgoConn()
+	defer MgoB.DestoryMongoConn(sess)
+	ch := make(chan bool, 20)
+	wg := &sync.WaitGroup{}
+	lock := &sync.Mutex{}
+	query := map[string]interface{}{
+		"comeintime": map[string]interface{}{
+			"$gte": 1713715200,
+			"$lt":  1713801600,
+		},
+		"toptype": "拟建",
+	}
+	fields := map[string]interface{}{
+		"projectname":   1,
+		"budget":        1,
+		"bidamount":     1,
+		"buyer":         1,
+		"s_winner":      1,
+		"agency":        1,
+		"property_form": 1,
+		"multipackage":  1,
+		"area":          1,
+		"city":          1,
+		"district":      1,
+		//
+		"s_topscopeclass":       1,
+		"publishtime":           1,
+		"toptype":               1,
+		"comeintime":            1,
+		"extracttype":           1,
+		"tag_subinformation":    1,
+		"tag_subinformation_ai": 1,
+		"tag_topinformation":    1,
+		"tag_topinformation_ai": 1,
+	}
+	arr := []map[string]interface{}{}
+	it := sess.DB(MgoB.DbName).C("bidding").Find(&query).Select(&fields).Iter()
+	n := 0
+	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			//comeintime := gconv.Int64(tmp["comeintime"])
+			//if comeintime < 1609430400 || comeintime >= 1713715200 {
+			//	return
+			//}
+			if gconv.Int(tmp["extracttype"]) == -1 { //重复数据过滤
+				return
+			}
+			if s_topscopeclass := gconv.String(tmp["s_topscopeclass"]); !strings.Contains(s_topscopeclass, "建筑工程") { //排除非建筑工程
+				return
+			}
+			if tag_topinformation := gconv.String(tmp["tag_topinformation"]); strings.Contains(tag_topinformation, "物业") { //排除物业
+				return
+			} else if tag_topinformation_ai := gconv.String(tmp["tag_topinformation_ai"]); strings.Contains(tag_topinformation_ai, "物业") {
+				return
+			}
+			//if tmp["tag_topinformation"] != nil || tmp["tag_topinformation_ai"] != nil { //不包含物业
+			//	return
+			//}
+			project_bidstatus := 4 //拟建
+			business_type := "新增项目"
+			result := DealTransactionForBid(tmp, business_type, project_bidstatus)
+			lock.Lock()
+			if len(result) > 0 {
+				arr = append(arr, result)
+			}
+			if len(arr) > 50 {
+				MgoPro.SaveBulk("projectset_wy_nj", arr...)
+				arr = []map[string]interface{}{}
+			}
+			lock.Unlock()
+		}(tmp)
+		if n%10000 == 0 {
+			fmt.Println("current:", n)
+		}
+		tmp = map[string]interface{}{}
+	}
+	wg.Wait()
+	if len(arr) > 0 {
+		MgoPro.SaveBulk("projectset_wy_nj", arr...)
+		arr = []map[string]interface{}{}
+	}
+	fmt.Println("结束")
+}
+
 // HisTransactionDataFromProject 历史project(指定截止pici:1713196800)
 func HisTransactionDataFromProject() {
 	sess := MgoPro.GetMgoConn()
@@ -162,11 +255,15 @@ func HisTransactionDataFromProject() {
 func HisTransactionDataAddInformation() {
 	sess := MgoPro.GetMgoConn()
 	defer MgoPro.DestoryMongoConn(sess)
-	ch := make(chan bool, 20)
+	ch := make(chan bool, 5)
 	wg := &sync.WaitGroup{}
 	lock := &sync.Mutex{}
-	query := map[string]interface{}{}
-	it := sess.DB(MgoPro.DbName).C("projectset_wy_back").Find(&query).Iter()
+	query := map[string]interface{}{
+		//"_id": map[string]interface{}{
+		//	"$gte": mongodb.StringTOBsonId("6627289319c5408c478125d4"),
+		//},
+	}
+	it := sess.DB(MgoPro.DbName).C("projectset_wy").Find(&query).Iter()
 	n := 0
 	arr := [][]map[string]interface{}{}
 	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
@@ -179,27 +276,42 @@ func HisTransactionDataAddInformation() {
 			}()
 			id := mongodb.BsonIdToSId(tmp["_id"])
 			update := []map[string]interface{}{
-				{"_id": tmp["_id"]},
+				{"_id": mongodb.StringTOBsonId(id)},
 			}
 			set := map[string]interface{}{}
 			//法人信息
 			buyer_id, agency_id, winner_ids := FindEntInfoData(id, gconv.String(tmp["buyer"]), gconv.String(tmp["agency"]), gconv.Strings(tmp["winner"]))
 			set["buyer_id"] = buyer_id
 			set["agency_id"] = agency_id
-			set["winner_ids"] = winner_ids
+			set["winner_id"] = winner_ids
+
 			//项目信息补充业态
-			if from := gconv.String(tmp["from"]); from == "project" {
-				project_id := gconv.String(tmp["project_id"])
-				pro, _ := MgoPro.FindById("projectset_20230904", project_id, map[string]interface{}{"property_form": 1})
-				if len(*pro) > 0 && (*pro)["property_form"] != nil {
-					set["property_form"] = (*pro)["property_form"]
-				}
+			//if from := gconv.String(tmp["from"]); from == "project" {
+			//	project_id := gconv.String(tmp["project_id"])
+			//	pro, _ := MgoPro.FindById("projectset_20230904", project_id, map[string]interface{}{"property_form": 1})
+			//	if len(*pro) > 0 && (*pro)["property_form"] != nil {
+			//		set["property_form"] = (*pro)["property_form"]
+			//	}
+			//}
+			delete(tmp, "from") //无用字段删除
+			delete(tmp, "_id")  //无用字段删除
+			tmp["buyer_id"] = buyer_id
+			tmp["agency_id"] = agency_id
+			tmp["winner_id"] = winner_ids
+			if !SaveDataToEs(tmp) { //保存、更新es
+				fmt.Println("数据保存es失败,数据类型  项目project_id", tmp["project_id"])
+			}
+			var err error
+			err = UpdateOrSaveDataToClickHouse(tmp)
+			if err != nil {
+				fmt.Println("数据迁移失败,数据类型 项目project_id", tmp["project_id"], err)
 			}
+			//更新
 			update = append(update, map[string]interface{}{"$set": set})
 			lock.Lock()
 			arr = append(arr, update)
 			if len(arr) > 100 {
-				MgoPro.UpdateBulk("projectset_wy_back", arr...)
+				MgoPro.UpdateBulk("projectset_wy", arr...)
 				arr = [][]map[string]interface{}{}
 			}
 			lock.Unlock()
@@ -211,7 +323,7 @@ func HisTransactionDataAddInformation() {
 	}
 	wg.Wait()
 	if len(arr) > 0 {
-		MgoPro.UpdateBulk("projectset_wy_back", arr...)
+		MgoPro.UpdateBulk("projectset_wy", arr...)
 		arr = [][]map[string]interface{}{}
 	}
 	fmt.Println("迁移结束...")

+ 14 - 0
data_project_wy/init.go

@@ -2,6 +2,7 @@ package main
 
 import (
 	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
+	"jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
 	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
 )
 
@@ -10,6 +11,7 @@ type conf struct {
 	MgoPro db `json:"mgopro"`
 	//MysqlDb    db  `json:"mysqldb"`
 	ClickHouse   ckh    `json:"clickhouse"`
+	Es           db     `json:"es"`
 	BidStartTime int64  `json:"bidstarttime"` //bidding增量起始id
 	ProStartTime int64  `json:"prostarttime"` //project增量起始id
 	StartCron    string `json:"startcron"`
@@ -20,6 +22,7 @@ type db struct {
 	Size     int    `json:"size"`
 	Username string `json:"username"`
 	Password string `json:"password"`
+	Index    string `json:"index"`
 }
 type ckh struct {
 	Addr     []string `json:"addr"`
@@ -34,6 +37,7 @@ var (
 	MgoB    *mongodb.MongodbSim //bidding
 	MgoPro  *mongodb.MongodbSim //project
 	CkhTool driver.Conn         //
+	Es      *elastic.Elastic
 	//MysqlTool *mysqldb.Mysql
 	BidStartTime int64
 	ProStartTime int64
@@ -85,6 +89,16 @@ func InitCkh() {
 	)
 }
 
+func InitEs() {
+	Es = &elastic.Elastic{
+		S_esurl:  Config.Es.Addr,
+		I_size:   Config.Es.Size,
+		Username: Config.Es.Username,
+		Password: Config.Es.Password,
+	}
+	Es.InitElasticSize()
+}
+
 func InitOther() {
 	BidStartTime = Config.BidStartTime
 	ProStartTime = Config.ProStartTime

+ 60 - 3
data_project_wy/main.go

@@ -1,13 +1,17 @@
 package main
 
 import (
+	"fmt"
+	"github.com/gogf/gf/v2/util/gconv"
 	"github.com/robfig/cron"
+	"sync"
 )
 
 func init() {
 	ReadConfig(&Config) //初始化
 	InitMgo()           //mgo
 	InitCkh()           //clickhouse
+	InitEs()            //es
 	InitOther()
 }
 
@@ -17,10 +21,63 @@ func main() {
 	c.AddFunc(Config.StartCron, IncTransactionDataFromBidAndPro) //增量bidding和项目数据
 	c.Start()
 	//历史
-	//HisTransactionDataFromBid() //历史招标(bidding)数据,截止时间1713196800
-	//HisTransactionDataFromProject() //历史项目数据(projectset_20230904)
+	//HisTransactionDataFromBid() //历史招标(bidding)数据,截止时间1713196800采购意向 TODO 待补充法人信息
+	//HisTransactionDataFromBid2() //历史招标(bidding)数据,截止时间1713628800新增项目 TODO 待补充情报信息、法人信息
+	//HisTransactionDataFromProject() //历史项目数据(projectset_20230904)TODO 待补充业态、情报信息、法人信息
+	//临时处理(信息补充)
 	//HisTransactionDataAddInformation() //历史信息补充法人库信息,项目信息补充业态property_form
-	//IncTransactionDataMgoToCkh()//数据迁移
+	//IncTransactionDataMgoToCkh() //数据迁移
 	ch := make(chan bool)
 	<-ch
 }
+
+func tmp() {
+	sess := MgoPro.GetMgoConn()
+	defer MgoPro.DestoryMongoConn(sess)
+	ch := make(chan bool, 20)
+	wg := &sync.WaitGroup{}
+	lock := &sync.Mutex{}
+	query := map[string]interface{}{
+		"zbtime": 0,
+	}
+	it := sess.DB(MgoPro.DbName).C("projectset_wy_back").Find(&query).Iter()
+	n := 0
+	arr := [][]map[string]interface{}{}
+	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			projectid := gconv.String(tmp["project_id"])
+			data, _ := MgoPro.FindById("projectset_20230904", projectid, map[string]interface{}{"firsttime": 1})
+			firsttime := gconv.Int64((*data)["firsttime"])
+			update := []map[string]interface{}{
+				{"_id": tmp["_id"]},
+			}
+			set := map[string]interface{}{
+				"zbtime": firsttime,
+			}
+			update = append(update, map[string]interface{}{"$set": set})
+			lock.Lock()
+			arr = append(arr, update)
+			if len(arr) > 100 {
+				MgoPro.UpdateBulk("projectset_wy_back", arr...)
+				arr = [][]map[string]interface{}{}
+			}
+			lock.Unlock()
+		}(tmp)
+		if n%10000 == 0 {
+			fmt.Println("current:", n)
+		}
+		tmp = map[string]interface{}{}
+	}
+	wg.Wait()
+	if len(arr) > 0 {
+		MgoPro.UpdateBulk("projectset_wy_back", arr...)
+		arr = [][]map[string]interface{}{}
+	}
+	fmt.Println("迁移结束...")
+}

+ 121 - 36
data_project_wy/task.go

@@ -47,15 +47,15 @@ func IncTransactionDataFromBidAndPro() {
 	IncTransactionDataFromBid() //bidding
 	IncTransactionDataFromPro() //project
 	return
-	IncTransactionDataMgoToCkh() //mongodb迁移至clickhouse
+	IncTransactionDataMgoToCkhAndEs() //mongodb迁移至clickhouse
 }
 
 // IncTransactionDataFromBid 增量bidding
 func IncTransactionDataFromBid() {
 	endTime := GetTime(-1) //前一天凌晨
-	fmt.Println("开始执行增量采购意向信息", BidStartTime, endTime)
+	fmt.Println("开始执行增量采购意向、拟建信息", BidStartTime, endTime)
 	if BidStartTime >= endTime {
-		fmt.Println("增量bidding采购意向查询异常:", BidStartTime, endTime)
+		fmt.Println("增量bidding采购意向、拟建查询异常:", BidStartTime, endTime)
 		return
 	}
 	query := map[string]interface{}{
@@ -83,6 +83,7 @@ func IncTransactionDataFromBid() {
 		"city":          1,
 		"district":      1,
 		//
+		"s_topscopeclass":       1,
 		"publishtime":           1,
 		"toptype":               1,
 		"extracttype":           1,
@@ -94,6 +95,7 @@ func IncTransactionDataFromBid() {
 	arr := []map[string]interface{}{}
 	it := sess.DB(MgoB.DbName).C("bidding").Find(&query).Select(&fields).Iter()
 	n := 0
+	count := 0
 	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
 		ch <- true
 		wg.Add(1)
@@ -102,19 +104,34 @@ func IncTransactionDataFromBid() {
 				<-ch
 				wg.Done()
 			}()
-			if gconv.String(tmp["toptype"]) != "采购意向" { //非采购意向数据过滤
-				return
-			}
 			if gconv.Int(tmp["extracttype"]) == -1 { //重复数据过滤
 				return
 			}
-			if tmp["tag_topinformation"] == nil && tmp["tag_topinformation_ai"] == nil { //无效数据过滤
+			toptype := gconv.String(tmp["toptype"])
+			tag_topinformation := gconv.String(tmp["tag_topinformation"])
+			tag_topinformation_ai := gconv.String(tmp["tag_topinformation_ai"])
+			var business_type string
+			var project_bidstatus int
+			if toptype == "采购意向" { //采购意向数据
+				if !strings.Contains(tag_topinformation, "物业") && !strings.Contains(tag_topinformation_ai, "物业") {
+					return
+				}
+				business_type = "采购意向"
+				project_bidstatus = 3
+			} else if toptype == "拟建" {
+				if strings.Contains(tag_topinformation, "物业") || strings.Contains(tag_topinformation_ai, "物业") {
+					return
+				}
+				business_type = "新增项目"
+				project_bidstatus = 4
+			} else {
 				return
 			}
-			result := DealTransactionForBid(tmp)
+			result := DealTransactionForBid(tmp, business_type, project_bidstatus)
 			lock.Lock()
 			if len(result) > 0 {
 				arr = append(arr, result)
+				count++
 			}
 			if len(arr) > 50 {
 				MgoPro.SaveBulk("projectset_wy", arr...)
@@ -132,12 +149,12 @@ func IncTransactionDataFromBid() {
 		MgoPro.SaveBulk("projectset_wy", arr...)
 		arr = []map[string]interface{}{}
 	}
-	fmt.Println("执行增量采购意向信息完毕", BidStartTime, endTime)
+	fmt.Println("执行增量采购意向、拟建信息完毕", BidStartTime, endTime, count)
 	BidStartTime = endTime //替换
 }
 
 // DealTransactionForBid bidding采购意向数据处理
-func DealTransactionForBid(tmp map[string]interface{}) map[string]interface{} {
+func DealTransactionForBid(tmp map[string]interface{}, business_type string, project_bidstatus int) map[string]interface{} {
 	//基本信息封装
 	id := mongodb.BsonIdToSId(tmp["_id"])
 	buyer := gconv.String(tmp["buyer"])
@@ -162,6 +179,17 @@ func DealTransactionForBid(tmp map[string]interface{}) map[string]interface{} {
 		subclass = gconv.Strings(tag_subinformation_ai)
 	}
 
+	//TODO 情报库信息(待补充)
+	information_id := ""
+	starttime, endtime := int64(0), int64(0)
+	//if project_bidstatus == 4 {//补充情报信息
+	//	//bidId = "65fbf3f566cf0db42a2a99d2"
+	//	info := FindInfomationData(id) //情报信息查询
+	//	information_id = info.Id
+	//	starttime = info.Starttime
+	//	endtime = info.Endtime
+	//}
+
 	//TODO 查询法人库信息(待补充)
 	winners := []string{}
 	if winner != "" {
@@ -177,10 +205,10 @@ func DealTransactionForBid(tmp map[string]interface{}) map[string]interface{} {
 		Project_Budget:    budget,
 		Project_Bidamount: bidamount,
 		Project_Money:     money,
-		Business_Type:     "采购意向",
-		Project_Bidstatus: 3,
+		Business_Type:     business_type,
+		Project_Bidstatus: project_bidstatus,
 		Info_Id:           id,
-		Information_Id:    "",
+		Information_Id:    information_id,
 		Buyer:             buyer,
 		Winner:            winners,
 		Agency:            agency,
@@ -195,8 +223,8 @@ func DealTransactionForBid(tmp map[string]interface{}) map[string]interface{} {
 		District:          gconv.String(tmp["district"]),
 		ZbTime:            gconv.Int64(tmp["publishtime"]),
 		JgTime:            int64(0),
-		StartTime:         int64(0),
-		EndTime:           int64(0),
+		StartTime:         starttime,
+		EndTime:           endtime,
 		Create_Time:       time.Now().Unix(),
 		Update_Time:       time.Now().Unix(),
 		//
@@ -256,6 +284,7 @@ func IncTransactionDataFromPro() {
 	arr := [][]map[string]interface{}{}
 	it := sess.DB(MgoPro.DbName).C("projectset_20230904").Find(&query).Select(&fields).Iter()
 	n := 0
+	count := 0
 	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
 		ch <- true
 		wg.Add(1)
@@ -270,6 +299,7 @@ func IncTransactionDataFromPro() {
 			result := DealTransactionForPro(tmp)
 			lock.Lock()
 			if len(result) > 0 {
+				count++
 				update := []map[string]interface{}{
 					{"project_id": tmp["_id"]},
 					{"$set": result},
@@ -292,7 +322,7 @@ func IncTransactionDataFromPro() {
 		MgoPro.UpSertBulk("projectset_wy_back", arr...)
 		arr = [][]map[string]interface{}{}
 	}
-	fmt.Println("执行增量项目信息完毕", ProStartTime, endTime)
+	fmt.Println("执行增量项目信息完毕", ProStartTime, endTime, count)
 	ProStartTime = endTime //替换
 }
 
@@ -304,6 +334,9 @@ func DealTransactionForPro(data map[string]interface{}) map[string]interface{} {
 	winner := gconv.String(data["s_winner"])
 	agency := gconv.String(data["agency"])
 	zbtime := gconv.Int64(data["zbtime"])
+	if zbtime == 0 {
+		zbtime = gconv.Int64(data["firsttime"])
+	}
 	property_form := []string{}
 	if data["property_form"] != nil {
 		property_form = gconv.Strings(data["property_form"])
@@ -334,11 +367,7 @@ func DealTransactionForPro(data map[string]interface{}) map[string]interface{} {
 		project_bidstatus = 0
 	} else if bidstatus == "招标" {
 		business_type = "招标项目"
-		if zbtime == 0 {
-			zbtime = gconv.Int64(data["firsttime"])
-		}
 	}
-
 	//查询情报信息
 	//bidId = "65fbf3f566cf0db42a2a99d2"
 	ids := gconv.Strings(data["ids"])
@@ -390,8 +419,8 @@ func DealTransactionForPro(data map[string]interface{}) map[string]interface{} {
 	return result
 }
 
-// IncTransactionDataMgoToCkh 数据迁移
-func IncTransactionDataMgoToCkh() {
+// IncTransactionDataMgoToCkhAndEs 数据迁移
+func IncTransactionDataMgoToCkhAndEs() {
 	/*
 		数据根据update_time查询
 		1、采购意向数据(from=bidding)只插入
@@ -400,7 +429,7 @@ func IncTransactionDataMgoToCkh() {
 	fmt.Println("开始执行迁移...")
 	sess := MgoPro.GetMgoConn()
 	defer MgoPro.DestoryMongoConn(sess)
-	ch := make(chan bool, 10)
+	ch := make(chan bool, 2)
 	wg := &sync.WaitGroup{}
 	query := map[string]interface{}{
 		"update_time": map[string]interface{}{
@@ -417,18 +446,16 @@ func IncTransactionDataMgoToCkh() {
 				<-ch
 				wg.Done()
 			}()
-			var err error
 			from := gconv.String(tmp["from"])
-			delete(tmp, "from")    //无用字段删除
-			delete(tmp, "_id")     //无用字段删除
-			if from == "bidding" { //采购意向,插入
-				err = SaveDataToClickHouse(tmp)
-			} else { //项目信息,更新,插入
-				project_id := gconv.String(tmp["project_id"])
-				err = UpdateOrSaveDataToClickHouse(project_id, tmp)
+			delete(tmp, "from")     //无用字段删除
+			delete(tmp, "_id")      //无用字段删除
+			if !SaveDataToEs(tmp) { //保存、更新es
+				fmt.Println("数据保存es失败,项目project_id", tmp["project_id"])
 			}
-			if err != nil {
-				fmt.Println("数据迁移失败,数据类型", from, "  项目project_id", tmp["project_id"], err)
+			if from == "bidding" { //采购意向、拟建,插入
+				SaveDataToClickHouse(tmp)
+			} else { //项目信息,更新,插入
+				UpdateOrSaveDataToClickHouse(tmp)
 			}
 		}(tmp)
 		if n%100 == 0 {
@@ -446,7 +473,7 @@ type Infomation struct {
 	Endtime   int64
 }
 
-// 情报信息查询
+// FindInfomationData 情报信息查询
 func FindInfomationData(ids ...string) (info Infomation) {
 	for _, id := range ids {
 		query := fmt.Sprintf(`SELECT id,starttime,endtime FROM %s WHERE datajson_id = ?`, Config.ClickHouse.DataBase+".information")
@@ -468,7 +495,7 @@ func FindInfomationData(ids ...string) (info Infomation) {
 	return
 }
 
-// 法人信息查询
+// FindEntInfoData 法人信息查询
 func FindEntInfoData(bid, buyer, agency string, winners []string) (buyer_id, agency_id string, winner_ids []string) {
 	winner_ids = []string{}
 	winnerMap := map[string]bool{} //记录所有中标单位
@@ -515,14 +542,21 @@ func FindEntInfoData(bid, buyer, agency string, winners []string) (buyer_id, age
 }
 
 // UpdateOrSaveDataToClickHouse 判断clickhouse更新or保存
-func UpdateOrSaveDataToClickHouse(project_id string, data map[string]interface{}) (err error) {
+func UpdateOrSaveDataToClickHouse(data map[string]interface{}) (err error) {
+	project_id := gconv.String(data["project_id"])
 	count := FindClickHouseByProjectId(project_id) //查询
 	if count > 0 {                                 //更新
 		delete(data, "create_time") //不更新创建时间
 		delete(data, "project_id")  //不更新项目id(主键)
 		err = UpdateDataToClickHouse(data, map[string]interface{}{"project_id": project_id})
+		if err != nil {
+			fmt.Println("clickhouse更新失败", project_id, data)
+		}
 	} else { //插入
 		err = SaveDataToClickHouse(data)
+		if err != nil {
+			fmt.Println("clickhouse保存失败", project_id, data)
+		}
 	}
 	return
 }
@@ -567,6 +601,57 @@ func UpdateDataToClickHouse(data, querys map[string]interface{}) error {
 	return CkhTool.Exec(context.Background(), query, values...)
 }
 
+// SaveDataToEs es存储
+func SaveDataToEs(data map[string]interface{}) bool {
+	tmp := map[string]interface{}{}
+	for k, v := range data {
+		if k == "project_id" {
+			k = "_id"
+		}
+		tmp[k] = v
+	}
+	err, result := Es.GetById(Config.Es.Index, gconv.String(tmp["_id"]))
+	if err == nil && len(result) > 0 { //存在,更新
+		tmp["create_time"] = result["create_time"] //不更新create_time
+	}
+	return Es.Save(Config.Es.Index, tmp)
+}
+
+func FindEntInfoData2(bid, buyer, agency string, winners []string) (buyer_id, agency_id string, winner_ids []string) {
+	query := fmt.Sprintf(`SELECT id FROM %s WHERE company_name = ?`, Config.ClickHouse.DataBase+".ent_info")
+	if buyer != "" {
+		buyer_id = GetClickHouseData(bid, query, buyer)
+	}
+	if agency != "" {
+		agency_id = GetClickHouseData(bid, query, agency)
+	}
+	if len(winners) > 0 {
+		for _, w := range winners {
+			winner_id := GetClickHouseData(bid, query, w)
+			if winner_id != "" {
+				winner_ids = append(winner_ids, winner_id)
+			}
+		}
+	}
+	return
+}
+
+func GetClickHouseData(bid, query, value string) string {
+	rows, err := CkhTool.Query(context.Background(), query, value)
+	if err != nil {
+		return ""
+	}
+	for rows.Next() {
+		var id string
+		if err := rows.Scan(&id); err == nil {
+			return id
+		} else {
+			fmt.Println("查询情报信息异常:", err, bid)
+		}
+	}
+	return ""
+}
+
 /*// SaveTransactionData 保存增量物业信息
 func SaveTransactionData() {
 	fmt.Println("save projectset_wy...")