|
@@ -0,0 +1,738 @@
|
|
|
+package main
|
|
|
+
|
|
|
+import (
|
|
|
+ "context"
|
|
|
+ "fmt"
|
|
|
+ "strings"
|
|
|
+ "sync"
|
|
|
+ "time"
|
|
|
+
|
|
|
+ "github.com/gogf/gf/v2/util/gconv"
|
|
|
+ "go.mongodb.org/mongo-driver/bson"
|
|
|
+ "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
|
|
|
+)
|
|
|
+
|
|
|
+type Transaction struct {
|
|
|
+ Project_Id string `bson:"project_id"`
|
|
|
+ Project_Name string `bson:"project_name"`
|
|
|
+ Project_Budget float64 `bson:"project_budget"`
|
|
|
+ Project_Bidamount float64 `bson:"project_bidamount"`
|
|
|
+ Project_Money float64 `bson:"project_money"`
|
|
|
+ Business_Type string `bson:"business_type"`
|
|
|
+ Project_Bidstatus int `bson:"project_bidstatus"`
|
|
|
+ Info_Id string `bson:"info_id"`
|
|
|
+ Info_Ids []string `bson:"info_ids"`
|
|
|
+ Information_Id string `bson:"information_id"`
|
|
|
+ BuyerClass string `bson:"buyerclass"`
|
|
|
+ Buyer string `bson:"buyer"`
|
|
|
+ Buyer_Id string `bson:"buyer_id"`
|
|
|
+ Winner []string `bson:"winner"`
|
|
|
+ Winner_Id []string `bson:"winner_id"`
|
|
|
+ Agency string `bson:"agency"`
|
|
|
+ Agency_Id string `bson:"agency_id"`
|
|
|
+ Property_Form []string `bson:"property_form"`
|
|
|
+ SubClass []string `bson:"subclass"`
|
|
|
+ MultiPackage int `bson:"multipackage"`
|
|
|
+ Topscopeclass []string `bson:"topscopeclass"`
|
|
|
+ Area string `bson:"area"`
|
|
|
+ City string `bson:"city"`
|
|
|
+ District string `bson:"district"`
|
|
|
+ ZbTime int64 `bson:"zbtime"`
|
|
|
+ JgTime int64 `bson:"jgtime"`
|
|
|
+ StartTime int64 `bson:"starttime"`
|
|
|
+ EndTime int64 `bson:"endtime"`
|
|
|
+ Create_Time int64 `bson:"create_time"`
|
|
|
+ Update_Time int64 `bson:"update_time"`
|
|
|
+ //
|
|
|
+ // From string `bson:"from"`
|
|
|
+}
|
|
|
+
|
|
|
+func IncTransactionDataFromBidAndPro() {
|
|
|
+ // IncTransactionDataFromBid() //bidding
|
|
|
+ IncTransactionDataFromPro() //project
|
|
|
+ // IncTransactionDataMgoToCkhAndEs() //mongodb迁移至clickhouse
|
|
|
+}
|
|
|
+
|
|
|
+// IncTransactionDataFromBid 增量bidding
|
|
|
+func IncTransactionDataFromBid() {
|
|
|
+ // endTime := GetTime(-1) //前一天凌晨
|
|
|
+ // fmt.Println("开始执行增量采购意向、拟建信息", BidStartTime, endTime)
|
|
|
+ // if BidStartTime >= endTime {
|
|
|
+ // fmt.Println("增量bidding采购意向、拟建查询异常:", BidStartTime, endTime)
|
|
|
+ // return
|
|
|
+ // }
|
|
|
+ query := map[string]interface{}{
|
|
|
+ "pici": map[string]interface{}{
|
|
|
+ // "$gte": BidStartTime,
|
|
|
+ "$lt": 1729440000,
|
|
|
+ },
|
|
|
+ }
|
|
|
+ fmt.Println("增量bidding采购意向query:", query)
|
|
|
+ sess := MgoB.GetMgoConn()
|
|
|
+ defer MgoB.DestoryMongoConn(sess)
|
|
|
+ ch := make(chan bool, 10)
|
|
|
+ wg := &sync.WaitGroup{}
|
|
|
+ // lock := &sync.Mutex{}
|
|
|
+ fields := map[string]interface{}{
|
|
|
+ "projectname": 1,
|
|
|
+ "budget": 1,
|
|
|
+ "bidamount": 1,
|
|
|
+ "buyer": 1,
|
|
|
+ "s_winner": 1,
|
|
|
+ "agency": 1,
|
|
|
+ "property_form": 1,
|
|
|
+ "multipackage": 1,
|
|
|
+ "area": 1,
|
|
|
+ "city": 1,
|
|
|
+ "district": 1,
|
|
|
+ "buyerclass": 1,
|
|
|
+ //
|
|
|
+ "owner": 1,
|
|
|
+ "s_topscopeclass": 1,
|
|
|
+ "publishtime": 1,
|
|
|
+ "toptype": 1,
|
|
|
+ "extracttype": 1,
|
|
|
+ "tag_subinformation": 1,
|
|
|
+ "tag_subinformation_ai": 1,
|
|
|
+ "tag_topinformation": 1,
|
|
|
+ "tag_topinformation_ai": 1,
|
|
|
+ }
|
|
|
+ arr := []map[string]interface{}{}
|
|
|
+ it := sess.DB(MgoB.DbName).C("bidding").Find(&query).Select(&fields).Sort("-_id").Iter()
|
|
|
+ n := 0
|
|
|
+ // count := 0
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
|
|
|
+ ch <- true
|
|
|
+ wg.Add(1)
|
|
|
+ go func(tmp map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-ch
|
|
|
+ wg.Done()
|
|
|
+ }()
|
|
|
+ if gconv.Int(tmp["extracttype"]) == -1 { //重复数据过滤
|
|
|
+ return
|
|
|
+ }
|
|
|
+ toptype := gconv.String(tmp["toptype"])
|
|
|
+ // tag_topinformation := gconv.String(tmp["tag_topinformation"])
|
|
|
+ // tag_topinformation_ai := gconv.String(tmp["tag_topinformation_ai"])
|
|
|
+ var business_type string
|
|
|
+ var project_bidstatus int
|
|
|
+ if toptype == "采购意向" { //采购意向数据
|
|
|
+ // if !strings.Contains(tag_topinformation, "物业") && !strings.Contains(tag_topinformation_ai, "物业") {
|
|
|
+ // return
|
|
|
+ // }
|
|
|
+ business_type = "采购意向"
|
|
|
+ project_bidstatus = 3
|
|
|
+ } else if toptype == "拟建" {
|
|
|
+ s_topscopeclass := gconv.String(tmp["s_topscopeclass"])
|
|
|
+ // if !strings.Contains(s_topscopeclass, "建筑工程") || strings.Contains(tag_topinformation, "物业") || strings.Contains(tag_topinformation_ai, "物业") {
|
|
|
+ if !strings.Contains(s_topscopeclass, "建筑工程") {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ business_type = "新增物业项目"
|
|
|
+ project_bidstatus = 4
|
|
|
+ } else {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ result := DealTransactionForBid(tmp, business_type, project_bidstatus)
|
|
|
+ if !SaveDataToEs(result) { //保存、更新es
|
|
|
+ fmt.Println("数据保存es失败,项目project_id", result["project_id"])
|
|
|
+ }
|
|
|
+ SaveDataToClickHouse(result)
|
|
|
+ // lock.Lock()
|
|
|
+ // if len(result) > 0 {
|
|
|
+ // arr = append(arr, result)
|
|
|
+ // count++
|
|
|
+ // }
|
|
|
+ // if len(arr) > 50 {
|
|
|
+ // MgoPro.SaveBulk("projectset_wy", arr...)
|
|
|
+ // arr = []map[string]interface{}{}
|
|
|
+ // }
|
|
|
+ // lock.Unlock()
|
|
|
+ }(tmp)
|
|
|
+ if n%1000 == 0 {
|
|
|
+ fmt.Println("current:", n)
|
|
|
+ }
|
|
|
+ tmp = map[string]interface{}{}
|
|
|
+ }
|
|
|
+ wg.Wait()
|
|
|
+ if len(arr) > 0 {
|
|
|
+ MgoPro.SaveBulk("projectset_wy", arr...)
|
|
|
+ arr = []map[string]interface{}{}
|
|
|
+ }
|
|
|
+ // fmt.Println("执行增量采购意向、拟建信息完毕", BidStartTime, endTime, count)
|
|
|
+ // BidStartTime = endTime //替换
|
|
|
+}
|
|
|
+
|
|
|
+// DealTransactionForBid bidding采购意向、拟建数据处理
|
|
|
+func DealTransactionForBid(tmp map[string]interface{}, business_type string, project_bidstatus int) map[string]interface{} {
|
|
|
+ //基本信息封装
|
|
|
+ id := mongodb.BsonIdToSId(tmp["_id"])
|
|
|
+ buyerclass := gconv.String(tmp["buyerclass"])
|
|
|
+ buyer := gconv.String(tmp["buyer"])
|
|
|
+ if buyer == "" {
|
|
|
+ buyer = gconv.String(tmp["owner"])
|
|
|
+ }
|
|
|
+ winner := gconv.String(tmp["s_winner"])
|
|
|
+ agency := gconv.String(tmp["agency"])
|
|
|
+ property_form := []string{}
|
|
|
+ if tmp["property_form"] != nil {
|
|
|
+ property_form = gconv.Strings(tmp["property_form"])
|
|
|
+ }
|
|
|
+ bidamount := gconv.Float64(tmp["bidamount"])
|
|
|
+ budget := gconv.Float64(tmp["budget"])
|
|
|
+ money := bidamount
|
|
|
+ if money <= 0 {
|
|
|
+ money = budget
|
|
|
+ }
|
|
|
+
|
|
|
+ //物业分类
|
|
|
+ subclass := []string{}
|
|
|
+ if tag_subinformation := tmp["tag_subinformation"]; tag_subinformation != nil {
|
|
|
+ subclass = gconv.Strings(tag_subinformation)
|
|
|
+ } else if tag_subinformation_ai := tmp["tag_subinformation_ai"]; tag_subinformation_ai != nil {
|
|
|
+ subclass = gconv.Strings(tag_subinformation_ai)
|
|
|
+ }
|
|
|
+
|
|
|
+ //情报信息查询
|
|
|
+ // info := FindInfomationData(id)
|
|
|
+ topscopeclass := []string{}
|
|
|
+ s_topscopeclass := gconv.String(tmp["s_topscopeclass"])
|
|
|
+ if s_topscopeclass != "" {
|
|
|
+ topscopeclass = strings.Split(s_topscopeclass, ",")
|
|
|
+ }
|
|
|
+ //法人信息
|
|
|
+ winners := []string{}
|
|
|
+ if winner != "" {
|
|
|
+ winners = strings.Split(winner, ",")
|
|
|
+ }
|
|
|
+ buyer_id, agency_id, winner_ids := FindEntInfoData(id, buyer, agency, winners)
|
|
|
+ //物业信息
|
|
|
+ t := &Transaction{
|
|
|
+ Project_Id: id,
|
|
|
+ Project_Name: gconv.String(tmp["projectname"]),
|
|
|
+ Project_Budget: budget,
|
|
|
+ Project_Bidamount: bidamount,
|
|
|
+ Project_Money: money,
|
|
|
+ Business_Type: business_type,
|
|
|
+ Project_Bidstatus: project_bidstatus,
|
|
|
+ Info_Id: id,
|
|
|
+ Info_Ids: []string{id},
|
|
|
+ // Information_Id: info.Id,
|
|
|
+ BuyerClass: buyerclass,
|
|
|
+ Buyer: buyer,
|
|
|
+ Topscopeclass: topscopeclass,
|
|
|
+ Winner: winners,
|
|
|
+ Agency: agency,
|
|
|
+ Buyer_Id: buyer_id,
|
|
|
+ Winner_Id: winner_ids,
|
|
|
+ Agency_Id: agency_id,
|
|
|
+ Property_Form: property_form,
|
|
|
+ SubClass: subclass,
|
|
|
+ MultiPackage: gconv.Int(tmp["multipackage"]),
|
|
|
+ Area: gconv.String(tmp["area"]),
|
|
|
+ City: gconv.String(tmp["city"]),
|
|
|
+ District: gconv.String(tmp["district"]),
|
|
|
+ ZbTime: gconv.Int64(tmp["publishtime"]),
|
|
|
+ JgTime: int64(0),
|
|
|
+ // StartTime: info.Starttime,
|
|
|
+ // EndTime: info.Endtime,
|
|
|
+ Create_Time: time.Now().Unix(),
|
|
|
+ Update_Time: time.Now().Unix(),
|
|
|
+ //
|
|
|
+ // From: "bidding",
|
|
|
+ }
|
|
|
+ result := map[string]interface{}{}
|
|
|
+ infomation, _ := bson.Marshal(t)
|
|
|
+ bson.Unmarshal(infomation, &result)
|
|
|
+ return result
|
|
|
+}
|
|
|
+
|
|
|
+// IncTransactionDataFromProject 增量project
|
|
|
+func IncTransactionDataFromPro() {
|
|
|
+ // endTime := GetTime(-1) //前一天凌晨
|
|
|
+ // fmt.Println("开始执行增量项目信息", ProStartTime, endTime)
|
|
|
+ // if ProStartTime >= endTime {
|
|
|
+ // fmt.Println("增量项目信息查询异常:", ProStartTime, endTime)
|
|
|
+ // return
|
|
|
+ // }
|
|
|
+ query := map[string]interface{}{
|
|
|
+ "pici": map[string]interface{}{
|
|
|
+ // "$gte": ProStartTime,
|
|
|
+ "$lt": 1729440000,
|
|
|
+ },
|
|
|
+ }
|
|
|
+ fmt.Println("增量项目查询query:", query)
|
|
|
+ sess := MgoPro.GetMgoConn()
|
|
|
+ defer MgoPro.DestoryMongoConn(sess)
|
|
|
+ ch := make(chan bool, 10)
|
|
|
+ wg := &sync.WaitGroup{}
|
|
|
+ // lock := &sync.Mutex{}
|
|
|
+ fields := map[string]interface{}{
|
|
|
+ "projectname": 1,
|
|
|
+ "budget": 1,
|
|
|
+ "bidamount": 1,
|
|
|
+ "buyer": 1,
|
|
|
+ "s_winner": 1,
|
|
|
+ "agency": 1,
|
|
|
+ "property_form": 1,
|
|
|
+ "multipackage": 1,
|
|
|
+ "area": 1,
|
|
|
+ "city": 1,
|
|
|
+ "district": 1,
|
|
|
+ "zbtime": 1,
|
|
|
+ "jgtime": 1,
|
|
|
+ "bidstatus": 1,
|
|
|
+ "buyerclass": 1,
|
|
|
+ "s_topscopeclass": 1,
|
|
|
+ //
|
|
|
+ "firsttime": 1,
|
|
|
+ "pici": 1,
|
|
|
+ "ids": 1,
|
|
|
+ "sourceinfoid": 1,
|
|
|
+ "tag_subinformation": 1,
|
|
|
+ "tag_subinformation_ai": 1,
|
|
|
+ "tag_topinformation": 1,
|
|
|
+ "tag_topinformation_ai": 1,
|
|
|
+ }
|
|
|
+ // arr := [][]map[string]interface{}{}
|
|
|
+ it := sess.DB(MgoPro.DbName).C("projectset_20230904").Find(&query).Select(&fields).Sort("-_id").Iter()
|
|
|
+ n := 0
|
|
|
+ // count := 0
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
|
|
|
+ ch <- true
|
|
|
+ wg.Add(1)
|
|
|
+ go func(tmp map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-ch
|
|
|
+ wg.Done()
|
|
|
+ }()
|
|
|
+ // if tmp["tag_topinformation"] == nil && tmp["tag_topinformation_ai"] == nil { //无效数据过滤
|
|
|
+ // return
|
|
|
+ // }
|
|
|
+ bidstatus := gconv.String(tmp["bidstatus"])
|
|
|
+ if bidstatus == "中标" || bidstatus == "成交" || bidstatus == "合同" || bidstatus == "招标" {
|
|
|
+ result := DealTransactionForPro(tmp)
|
|
|
+ project_id := gconv.String(result["project_id"])
|
|
|
+ if !SaveDataToEs(result) { //保存、更新es
|
|
|
+ fmt.Println("数据保存es失败,项目project_id", result["project_id"])
|
|
|
+ }
|
|
|
+ err := SaveDataToClickHouse(result)
|
|
|
+ if err != nil {
|
|
|
+ fmt.Println("clickhouse保存失败", project_id, result)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // count := FindClickHouseByProjectId(project_id) //查询
|
|
|
+ // if count > 0 { //更新
|
|
|
+ // delete(result, "create_time") //不更新创建时间
|
|
|
+ // delete(result, "project_id") //不更新项目id(主键)
|
|
|
+ // err = UpdateDataToClickHouse(result, map[string]interface{}{"project_id": project_id})
|
|
|
+ // if err != nil {
|
|
|
+ // fmt.Println("clickhouse更新失败", project_id, data)
|
|
|
+ // }
|
|
|
+ // } else { //插入
|
|
|
+
|
|
|
+ // }
|
|
|
+ // lock.Lock()
|
|
|
+ // if len(result) > 0 {
|
|
|
+ // count++
|
|
|
+ // update := []map[string]interface{}{
|
|
|
+ // {"project_id": mongodb.BsonIdToSId(tmp["_id"])},
|
|
|
+ // {"$set": result},
|
|
|
+ // }
|
|
|
+ // arr = append(arr, update)
|
|
|
+ // }
|
|
|
+ // if len(arr) > 50 {
|
|
|
+ // MgoPro.UpSertBulk("projectset_wy_back", arr...)
|
|
|
+ // arr = [][]map[string]interface{}{}
|
|
|
+ // }
|
|
|
+ // lock.Unlock()
|
|
|
+ }(tmp)
|
|
|
+ if n%1000 == 0 {
|
|
|
+ fmt.Println("current:", n)
|
|
|
+ }
|
|
|
+ tmp = map[string]interface{}{}
|
|
|
+ }
|
|
|
+ wg.Wait()
|
|
|
+ // if len(arr) > 0 {
|
|
|
+ // MgoPro.UpSertBulk("projectset_wy_back", arr...)
|
|
|
+ // arr = [][]map[string]interface{}{}
|
|
|
+ // }
|
|
|
+ // fmt.Println("执行增量项目信息完毕", ProStartTime, endTime, count)
|
|
|
+ // ProStartTime = endTime //替换
|
|
|
+}
|
|
|
+
|
|
|
+// DealTransactionForPro project数据处理
|
|
|
+func DealTransactionForPro(data map[string]interface{}) map[string]interface{} {
|
|
|
+ //基本信息封装
|
|
|
+ id := mongodb.BsonIdToSId(data["_id"])
|
|
|
+ buyerclass := gconv.String(data["buyerclass"])
|
|
|
+ buyer := gconv.String(data["buyer"])
|
|
|
+ winner := gconv.String(data["s_winner"])
|
|
|
+ agency := gconv.String(data["agency"])
|
|
|
+ zbtime := gconv.Int64(data["zbtime"])
|
|
|
+ if zbtime == 0 {
|
|
|
+ zbtime = gconv.Int64(data["firsttime"])
|
|
|
+ }
|
|
|
+ property_form := []string{}
|
|
|
+ if data["property_form"] != nil {
|
|
|
+ property_form = gconv.Strings(data["property_form"])
|
|
|
+ }
|
|
|
+ bidamount := gconv.Float64(data["bidamount"])
|
|
|
+ budget := gconv.Float64(data["budget"])
|
|
|
+ money := bidamount
|
|
|
+ if money <= 0 {
|
|
|
+ money = budget
|
|
|
+ }
|
|
|
+
|
|
|
+ //物业分类
|
|
|
+ subclass := []string{}
|
|
|
+ if tag_subinformation := data["tag_subinformation"]; tag_subinformation != nil {
|
|
|
+ subclass = gconv.Strings(tag_subinformation)
|
|
|
+ } else if tag_subinformation_ai := data["tag_subinformation_ai"]; tag_subinformation_ai != nil {
|
|
|
+ subclass = gconv.Strings(tag_subinformation_ai)
|
|
|
+ }
|
|
|
+
|
|
|
+ //项目状态、商机类型
|
|
|
+ business_type := ""
|
|
|
+ project_bidstatus := 2
|
|
|
+ bidstatus := gconv.String(data["bidstatus"])
|
|
|
+ if bidstatus == "中标" || bidstatus == "成交" || bidstatus == "合同" {
|
|
|
+ project_bidstatus = 1
|
|
|
+ business_type = "合约到期项目"
|
|
|
+ } else if bidstatus == "废标" || bidstatus == "流标" {
|
|
|
+ project_bidstatus = 0
|
|
|
+ } else if bidstatus == "拟建" {
|
|
|
+ project_bidstatus = 4
|
|
|
+ } else if bidstatus == "招标" {
|
|
|
+ business_type = "招标项目"
|
|
|
+ }
|
|
|
+ //查询情报信息
|
|
|
+ ids := gconv.Strings(data["ids"])
|
|
|
+ // info := FindInfomationData(ids...) //情报信息查询
|
|
|
+ topscopeclass := []string{}
|
|
|
+ s_topscopeclass := gconv.String(data["s_topscopeclass"])
|
|
|
+ if s_topscopeclass != "" {
|
|
|
+ topscopeclass = strings.Split(s_topscopeclass, ",")
|
|
|
+ }
|
|
|
+ //查询法人信息
|
|
|
+ winners := []string{}
|
|
|
+ if winner != "" {
|
|
|
+ winners = strings.Split(winner, ",")
|
|
|
+ }
|
|
|
+ buyer_id, agency_id, winner_ids := FindEntInfoData(id, buyer, agency, winners)
|
|
|
+ //物业信息
|
|
|
+ t := &Transaction{
|
|
|
+ Project_Id: id,
|
|
|
+ Project_Name: gconv.String(data["projectname"]),
|
|
|
+ Project_Budget: budget,
|
|
|
+ Project_Bidamount: bidamount,
|
|
|
+ Project_Money: money,
|
|
|
+ Business_Type: business_type,
|
|
|
+ Project_Bidstatus: project_bidstatus,
|
|
|
+ Info_Id: gconv.String(data["sourceinfoid"]),
|
|
|
+ Info_Ids: ids,
|
|
|
+ // Information_Id: info.Id,
|
|
|
+ BuyerClass: buyerclass,
|
|
|
+ Buyer: buyer,
|
|
|
+ Topscopeclass: topscopeclass,
|
|
|
+ Winner: winners,
|
|
|
+ Agency: agency,
|
|
|
+ Buyer_Id: buyer_id,
|
|
|
+ Winner_Id: winner_ids,
|
|
|
+ Agency_Id: agency_id,
|
|
|
+ Property_Form: property_form,
|
|
|
+ SubClass: subclass,
|
|
|
+ MultiPackage: gconv.Int(data["multipackage"]),
|
|
|
+ Area: gconv.String(data["area"]),
|
|
|
+ City: gconv.String(data["city"]),
|
|
|
+ District: gconv.String(data["district"]),
|
|
|
+ ZbTime: zbtime,
|
|
|
+ JgTime: gconv.Int64(data["jgtime"]),
|
|
|
+ // StartTime: info.Starttime,
|
|
|
+ // EndTime: info.Endtime,
|
|
|
+ Create_Time: time.Now().Unix(),
|
|
|
+ Update_Time: time.Now().Unix(),
|
|
|
+ //
|
|
|
+ // From: "project",
|
|
|
+ }
|
|
|
+ result := map[string]interface{}{}
|
|
|
+ infomation, _ := bson.Marshal(t)
|
|
|
+ bson.Unmarshal(infomation, &result)
|
|
|
+ return result
|
|
|
+}
|
|
|
+
|
|
|
+// IncTransactionDataMgoToCkhAndEs 数据迁移
|
|
|
+func IncTransactionDataMgoToCkhAndEs() {
|
|
|
+ /*
|
|
|
+ 数据根据update_time查询
|
|
|
+ 1、采购意向数据(from=bidding)只插入
|
|
|
+ 2、项目信息先查,有则更新,无则插入
|
|
|
+ */
|
|
|
+ fmt.Println("开始执行迁移...")
|
|
|
+ sess := MgoPro.GetMgoConn()
|
|
|
+ defer MgoPro.DestoryMongoConn(sess)
|
|
|
+ ch := make(chan bool, 1)
|
|
|
+ wg := &sync.WaitGroup{}
|
|
|
+ query := map[string]interface{}{
|
|
|
+ "update_time": map[string]interface{}{
|
|
|
+ "$gte": GetTime(0),
|
|
|
+ },
|
|
|
+ }
|
|
|
+ it := sess.DB(MgoPro.DbName).C("projectset_wy").Find(&query).Iter()
|
|
|
+ n := 0
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
|
|
|
+ ch <- true
|
|
|
+ wg.Add(1)
|
|
|
+ go func(tmp map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-ch
|
|
|
+ wg.Done()
|
|
|
+ }()
|
|
|
+ from := gconv.String(tmp["from"])
|
|
|
+ delete(tmp, "from") //无用字段删除
|
|
|
+ delete(tmp, "_id") //无用字段删除
|
|
|
+ if !SaveDataToEs(tmp) { //保存、更新es
|
|
|
+ fmt.Println("数据保存es失败,项目project_id", tmp["project_id"])
|
|
|
+ }
|
|
|
+ if from == "bidding" { //采购意向、拟建,插入
|
|
|
+ SaveDataToClickHouse(tmp)
|
|
|
+ } else { //项目信息,更新,插入
|
|
|
+ UpdateOrSaveDataToClickHouse(tmp)
|
|
|
+ }
|
|
|
+ }(tmp)
|
|
|
+ if n%100 == 0 {
|
|
|
+ fmt.Println("current:", n)
|
|
|
+ }
|
|
|
+ tmp = map[string]interface{}{}
|
|
|
+ }
|
|
|
+ wg.Wait()
|
|
|
+ fmt.Println("迁移结束...")
|
|
|
+}
|
|
|
+
|
|
|
+type Infomation struct {
|
|
|
+ Id string
|
|
|
+ Starttime int64
|
|
|
+ Endtime int64
|
|
|
+}
|
|
|
+
|
|
|
+// FindInfomationData 情报信息查询
|
|
|
+func FindInfomationData(ids ...string) (info Infomation) {
|
|
|
+ for _, id := range ids {
|
|
|
+ query := fmt.Sprintf(`SELECT id,starttime,endtime FROM %s WHERE datajson_id = ?`, Config.ClickHouse.DataBase+".information")
|
|
|
+ rows, err := CkhTool.Query(context.Background(), query, id)
|
|
|
+ if err != nil {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ for rows.Next() {
|
|
|
+ info = Infomation{}
|
|
|
+ if err := rows.Scan(&info.Id, &info.Starttime, &info.Endtime); err != nil {
|
|
|
+ fmt.Println("查询情报信息异常:", id, err)
|
|
|
+ }
|
|
|
+ if info.Id != "" {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ //break //目前只有一条结果
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+// FindEntInfoData 法人信息查询
|
|
|
+func FindEntInfoData(bid, buyer, agency string, winners []string) (buyer_id, agency_id string, winner_ids []string) {
|
|
|
+ winner_ids = []string{}
|
|
|
+ winnerMap := map[string]bool{} //记录所有中标单位
|
|
|
+ values := []interface{}{}
|
|
|
+ placeholders := []string{}
|
|
|
+ if buyer != "" {
|
|
|
+ placeholders = append(placeholders, "?")
|
|
|
+ values = append(values, buyer)
|
|
|
+ }
|
|
|
+ if len(winners) > 0 {
|
|
|
+ for _, w := range winners {
|
|
|
+ winnerMap[w] = true
|
|
|
+ placeholders = append(placeholders, "?")
|
|
|
+ values = append(values, w)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if agency != "" {
|
|
|
+ placeholders = append(placeholders, "?")
|
|
|
+ values = append(values, agency)
|
|
|
+ }
|
|
|
+ if len(values) == 0 {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ query := fmt.Sprintf(`SELECT id,company_name FROM %s WHERE company_name IN (%s)`, Config.ClickHouse.DataBase+".ent_info", strings.Join(placeholders, ","))
|
|
|
+ rows, err := CkhTool.Query(context.Background(), query, values...)
|
|
|
+ if err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ for rows.Next() {
|
|
|
+ var id, company_name string
|
|
|
+ if err := rows.Scan(&id, &company_name); err == nil {
|
|
|
+ if company_name == buyer {
|
|
|
+ buyer_id = id
|
|
|
+ } else if company_name == agency {
|
|
|
+ agency_id = id
|
|
|
+ } else if winnerMap[company_name] {
|
|
|
+ winner_ids = append(winner_ids, id)
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ fmt.Println("查询法人信息异常:", err, bid)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+// UpdateOrSaveDataToClickHouse 判断clickhouse更新or保存
|
|
|
+func UpdateOrSaveDataToClickHouse(data map[string]interface{}) (err error) {
|
|
|
+ project_id := gconv.String(data["project_id"])
|
|
|
+ count := FindClickHouseByProjectId(project_id) //查询
|
|
|
+ if count > 0 { //更新
|
|
|
+ delete(data, "create_time") //不更新创建时间
|
|
|
+ delete(data, "project_id") //不更新项目id(主键)
|
|
|
+ err = UpdateDataToClickHouse(data, map[string]interface{}{"project_id": project_id})
|
|
|
+ if err != nil {
|
|
|
+ fmt.Println("clickhouse更新失败", project_id, data)
|
|
|
+ }
|
|
|
+ } else { //插入
|
|
|
+ err = SaveDataToClickHouse(data)
|
|
|
+ if err != nil {
|
|
|
+ fmt.Println("clickhouse保存失败", project_id, data)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+// SaveDataToClickHouse 数据保存clickhouse
|
|
|
+func SaveDataToClickHouse(data map[string]interface{}) error {
|
|
|
+ fields, placeholders := []string{}, []string{}
|
|
|
+ values := []interface{}{}
|
|
|
+ for k, v := range data {
|
|
|
+ fields = append(fields, k)
|
|
|
+ values = append(values, v)
|
|
|
+ placeholders = append(placeholders, "?")
|
|
|
+ }
|
|
|
+ query := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)", Config.ClickHouse.DataBase+".transaction_info_all", strings.Join(fields, ","), strings.Join(placeholders, ","))
|
|
|
+ return CkhTool.Exec(context.Background(), query, values...)
|
|
|
+}
|
|
|
+
|
|
|
+// FindClickHouseByProjectId 根据条件count clickhouse
|
|
|
+func FindClickHouseByProjectId(project_id string) int {
|
|
|
+ query := fmt.Sprintf(`SELECT COUNT(1) FROM %s WHERE project_id = ?`, Config.ClickHouse.DataBase+".transaction_info_all")
|
|
|
+ row := CkhTool.QueryRow(context.Background(), query, project_id)
|
|
|
+ var count uint64
|
|
|
+ row.Scan(&count)
|
|
|
+ return gconv.Int(count)
|
|
|
+}
|
|
|
+
|
|
|
+// UpdateDataToClickHouse 数据更新clickhouse
|
|
|
+func UpdateDataToClickHouse(data, querys map[string]interface{}) error {
|
|
|
+ sets := []string{}
|
|
|
+ values := []interface{}{}
|
|
|
+ for k, v := range data {
|
|
|
+ sets = append(sets, fmt.Sprintf("%s=?", k))
|
|
|
+ values = append(values, v)
|
|
|
+ }
|
|
|
+ qs := []string{}
|
|
|
+ for k, v := range querys {
|
|
|
+ qs = append(qs, fmt.Sprintf("%s=?", k))
|
|
|
+ values = append(values, v)
|
|
|
+ }
|
|
|
+ query := fmt.Sprintf("ALTER TABLE %s UPDATE %s WHERE %s", Config.ClickHouse.DataBase+".transaction_info", strings.Join(sets, ","), strings.Join(qs, ","))
|
|
|
+ //query := `ALTER TABLE information.transaction_info UPDATE update_time = ? WHERE project_id = '5c9ee78ca5cb26b9b7fd0b57'`
|
|
|
+ return CkhTool.Exec(context.Background(), query, values...)
|
|
|
+}
|
|
|
+
|
|
|
+// SaveDataToEs es存储
|
|
|
+func SaveDataToEs(data map[string]interface{}) bool {
|
|
|
+ tmp := map[string]interface{}{}
|
|
|
+ for k, v := range data {
|
|
|
+ if k == "project_id" {
|
|
|
+ k = "_id"
|
|
|
+ } else if k == "winner" || k == "winner_id" { //winner和winner_id无值不进es
|
|
|
+ if len(gconv.Strings(v)) == 0 {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ }
|
|
|
+ tmp[k] = v
|
|
|
+ }
|
|
|
+ err, result := Es.GetById(Config.Es.Index, gconv.String(tmp["_id"]))
|
|
|
+ if err == nil && len(result) > 0 { //存在,更新
|
|
|
+ tmp["create_time"] = result["create_time"] //不更新create_time
|
|
|
+ }
|
|
|
+ return Es.Save(Config.Es.Index, tmp)
|
|
|
+}
|
|
|
+
|
|
|
+func FindEntInfoData2(bid, buyer, agency string, winners []string) (buyer_id, agency_id string, winner_ids []string) {
|
|
|
+ query := fmt.Sprintf(`SELECT id FROM %s WHERE company_name = ?`, Config.ClickHouse.DataBase+".ent_info")
|
|
|
+ if buyer != "" {
|
|
|
+ buyer_id = GetClickHouseData(bid, query, buyer)
|
|
|
+ }
|
|
|
+ if agency != "" {
|
|
|
+ agency_id = GetClickHouseData(bid, query, agency)
|
|
|
+ }
|
|
|
+ if len(winners) > 0 {
|
|
|
+ for _, w := range winners {
|
|
|
+ winner_id := GetClickHouseData(bid, query, w)
|
|
|
+ if winner_id != "" {
|
|
|
+ winner_ids = append(winner_ids, winner_id)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+func GetClickHouseData(bid, query, value string) string {
|
|
|
+ rows, err := CkhTool.Query(context.Background(), query, value)
|
|
|
+ if err != nil {
|
|
|
+ return ""
|
|
|
+ }
|
|
|
+ for rows.Next() {
|
|
|
+ var id string
|
|
|
+ if err := rows.Scan(&id); err == nil {
|
|
|
+ return id
|
|
|
+ } else {
|
|
|
+ fmt.Println("查询情报信息异常:", err, bid)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return ""
|
|
|
+}
|
|
|
+
|
|
|
+/*// SaveTransactionData 保存增量物业信息
|
|
|
+func SaveTransactionData() {
|
|
|
+ fmt.Println("save projectset_wy...")
|
|
|
+ savearr := make([]map[string]interface{}, 100)
|
|
|
+ indexdb := 0
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case v := <-TransactionSaveCache:
|
|
|
+ savearr[indexdb] = v
|
|
|
+ indexdb++
|
|
|
+ if indexdb == 100 {
|
|
|
+ Transaction_Ch <- true
|
|
|
+ go func(tmp []map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-Transaction_Ch
|
|
|
+ }()
|
|
|
+ MgoPro.SaveBulk("projectset_wy", tmp...)
|
|
|
+ }(savearr)
|
|
|
+ savearr = make([]map[string]interface{}, 100)
|
|
|
+ indexdb = 0
|
|
|
+ }
|
|
|
+ case <-time.After(30 * time.Second):
|
|
|
+ if indexdb > 0 {
|
|
|
+ Transaction_Ch <- true
|
|
|
+ go func(tmp []map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-Transaction_Ch
|
|
|
+ }()
|
|
|
+ MgoPro.SaveBulk("projectset_wy", tmp...)
|
|
|
+ }(savearr[:indexdb])
|
|
|
+ savearr = make([]map[string]interface{}, 100)
|
|
|
+ indexdb = 0
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}*/
|