package main import ( "context" "fmt" "regexp" "strings" "sync" "time" "github.com/gogf/gf/v2/util/gconv" "go.mongodb.org/mongo-driver/bson" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" ) type Transaction struct { Project_Id string `bson:"project_id"` Project_Name string `bson:"project_name"` Project_Budget float64 `bson:"project_budget"` Project_Bidamount float64 `bson:"project_bidamount"` Project_Money float64 `bson:"project_money"` Business_Type string `bson:"business_type"` Project_Bidstatus int `bson:"project_bidstatus"` Info_Id string `bson:"info_id"` Info_Ids []string `bson:"info_ids"` Information_Id string `bson:"information_id"` BuyerClass string `bson:"buyerclass"` Buyer string `bson:"buyer"` Buyer_Id string `bson:"buyer_id"` Winner []string `bson:"winner"` Winner_Id []string `bson:"winner_id"` Agency string `bson:"agency"` Agency_Id string `bson:"agency_id"` Property_Form []string `bson:"property_form"` SubClass []string `bson:"subclass"` MultiPackage int `bson:"multipackage"` Topscopeclass []string `bson:"topscopeclass"` Area string `bson:"area"` City string `bson:"city"` District string `bson:"district"` ZbTime int64 `bson:"zbtime"` JgTime int64 `bson:"jgtime"` StartTime int64 `bson:"starttime"` EndTime int64 `bson:"endtime"` Create_Time int64 `bson:"create_time"` Update_Time int64 `bson:"update_time"` // // From string `bson:"from"` } var regLetter = regexp.MustCompile("[a-z]*") func IncTransactionDataFromBidAndPro() { // IncTransactionDataFromBid() //bidding IncTransactionDataFromPro() //project // IncTransactionDataMgoToCkhAndEs() //mongodb迁移至clickhouse } // IncTransactionDataFromBid 增量bidding func IncTransactionDataFromBid() { stime := time.Now().AddDate(0, 0, -1) BidStartTime := time.Date(stime.Year(), stime.Month(), stime.Day(), 0, 0, 0, 0, stime.Location()).Unix() fmt.Println("开始执行增量采购意向、拟建信息") query := map[string]interface{}{ "pici": map[string]interface{}{ "$gte": BidStartTime, "$lt": BidStartTime + 86400, }, } fmt.Println("增量bidding采购意向query:", query) sess := MgoB.GetMgoConn() defer MgoB.DestoryMongoConn(sess) ch := make(chan bool, 1) wg := &sync.WaitGroup{} // lock := &sync.Mutex{} fields := map[string]interface{}{ "projectname": 1, "budget": 1, "bidamount": 1, "buyer": 1, "s_winner": 1, "agency": 1, "property_form": 1, "multipackage": 1, "area": 1, "city": 1, "district": 1, "buyerclass": 1, // "owner": 1, "s_topscopeclass": 1, "publishtime": 1, "toptype": 1, "extracttype": 1, "tag_subinformation": 1, "tag_subinformation_ai": 1, "tag_topinformation": 1, "tag_topinformation_ai": 1, } // arr := []map[string]interface{}{} it := sess.DB(MgoB.DbName).C("bidding").Find(&query).Select(&fields).Sort("-_id").Iter() n := 0 // count := 0 for tmp := make(map[string]interface{}); it.Next(tmp); n++ { ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() if gconv.Int(tmp["extracttype"]) == -1 { //重复数据过滤 return } toptype := gconv.String(tmp["toptype"]) // tag_topinformation := gconv.String(tmp["tag_topinformation"]) // tag_topinformation_ai := gconv.String(tmp["tag_topinformation_ai"]) var business_type string var project_bidstatus int if toptype == "采购意向" { //采购意向数据 // if !strings.Contains(tag_topinformation, "物业") && !strings.Contains(tag_topinformation_ai, "物业") { // return // } business_type = "采购意向" project_bidstatus = 3 } else if toptype == "拟建" { s_topscopeclass := gconv.String(tmp["s_topscopeclass"]) // if !strings.Contains(s_topscopeclass, "建筑工程") || strings.Contains(tag_topinformation, "物业") || strings.Contains(tag_topinformation_ai, "物业") { if !strings.Contains(s_topscopeclass, "建筑工程") { return } business_type = "新增物业项目" project_bidstatus = 4 } else { return } result := DealTransactionForBid(tmp, business_type, project_bidstatus) if !SaveDataToEs(result) { //保存、更新es fmt.Println("数据保存es失败,项目project_id", result["project_id"]) } SaveDataToClickHouse(result) }(tmp) if n%1000 == 0 { fmt.Println("current:", n) } tmp = map[string]interface{}{} } wg.Wait() fmt.Println("增量采购意向、拟建信息结束") } // DealTransactionForBid bidding采购意向、拟建数据处理 func DealTransactionForBid(tmp map[string]interface{}, business_type string, project_bidstatus int) map[string]interface{} { //基本信息封装 id := mongodb.BsonIdToSId(tmp["_id"]) buyerclass := gconv.String(tmp["buyerclass"]) buyer := gconv.String(tmp["buyer"]) if buyer == "" { buyer = gconv.String(tmp["owner"]) } winner := gconv.String(tmp["s_winner"]) agency := gconv.String(tmp["agency"]) property_form := []string{} if tmp["property_form"] != nil { property_form = gconv.Strings(tmp["property_form"]) } bidamount := gconv.Float64(tmp["bidamount"]) budget := gconv.Float64(tmp["budget"]) money := bidamount if money <= 0 { money = budget } //物业分类 subclass := []string{} if tag_subinformation := tmp["tag_subinformation"]; tag_subinformation != nil { subclass = gconv.Strings(tag_subinformation) } else if tag_subinformation_ai := tmp["tag_subinformation_ai"]; tag_subinformation_ai != nil { subclass = gconv.Strings(tag_subinformation_ai) } //情报信息查询 // info := FindInfomationData(id) topscopeclass := []string{} s_topscopeclass := gconv.String(tmp["s_topscopeclass"]) if s_topscopeclass != "" { topscopeclass = strings.Split(s_topscopeclass, ",") } //法人信息 winners := []string{} if winner != "" { winners = strings.Split(winner, ",") } buyer_id, agency_id, winner_ids := FindEntInfoData(id, buyer, agency, winners) //物业信息 t := &Transaction{ Project_Id: id, Project_Name: gconv.String(tmp["projectname"]), Project_Budget: budget, Project_Bidamount: bidamount, Project_Money: money, Business_Type: business_type, Project_Bidstatus: project_bidstatus, Info_Id: id, Info_Ids: []string{id}, // Information_Id: info.Id, BuyerClass: buyerclass, Buyer: buyer, Topscopeclass: topscopeclass, Winner: winners, Agency: agency, Buyer_Id: buyer_id, Winner_Id: winner_ids, Agency_Id: agency_id, Property_Form: property_form, SubClass: subclass, MultiPackage: gconv.Int(tmp["multipackage"]), Area: gconv.String(tmp["area"]), City: gconv.String(tmp["city"]), District: gconv.String(tmp["district"]), ZbTime: gconv.Int64(tmp["publishtime"]), JgTime: int64(0), // StartTime: info.Starttime, // EndTime: info.Endtime, Create_Time: time.Now().Unix(), Update_Time: time.Now().Unix(), // // From: "bidding", } result := map[string]interface{}{} infomation, _ := bson.Marshal(t) bson.Unmarshal(infomation, &result) return result } // IncTransactionDataFromProject 增量project func IncTransactionDataFromPro() { stime := time.Now().AddDate(0, 0, -1) BidStartTime := time.Date(stime.Year(), stime.Month(), stime.Day(), 0, 0, 0, 0, stime.Location()).Unix() fmt.Println("开始执行增量项目信息") query := map[string]interface{}{ "pici": map[string]interface{}{ "$gte": BidStartTime, "$lt": BidStartTime + 86400, }, } fmt.Println("增量项目查询query:", query) sess := MgoPro.GetMgoConn() defer MgoPro.DestoryMongoConn(sess) ch := make(chan bool, 1) wg := &sync.WaitGroup{} // lock := &sync.Mutex{} fields := map[string]interface{}{ "projectname": 1, "budget": 1, "bidamount": 1, "buyer": 1, "s_winner": 1, "agency": 1, "property_form": 1, "multipackage": 1, "area": 1, "city": 1, "district": 1, "zbtime": 1, "jgtime": 1, "bidstatus": 1, "buyerclass": 1, "topscopeclass": 1, // "firsttime": 1, "pici": 1, "ids": 1, "sourceinfoid": 1, "tag_subinformation": 1, "tag_subinformation_ai": 1, "tag_topinformation": 1, "tag_topinformation_ai": 1, } it := sess.DB(MgoPro.DbName).C("projectset_20230904").Find(&query).Select(&fields).Iter() n := 0 for tmp := make(map[string]interface{}); it.Next(tmp); n++ { ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() bidstatus := gconv.String(tmp["bidstatus"]) if bidstatus == "中标" || bidstatus == "成交" || bidstatus == "合同" || bidstatus == "招标" { result := DealTransactionForPro(tmp) project_id := gconv.String(result["project_id"]) if !SaveDataToEs(result) { //保存、更新es fmt.Println("数据保存es失败,项目project_id", result["project_id"]) } count := FindClickHouseByProjectId(project_id) //查询 if count > 0 { //更新 delete(result, "create_time") //不更新创建时间 delete(result, "project_id") //不更新项目id(主键) err := UpdateDataToClickHouse(result, map[string]interface{}{"project_id": project_id}) if err != nil { fmt.Println("clickhouse更新失败", project_id, result) } } else { //插入 err := SaveDataToClickHouse(result) if err != nil { fmt.Println("clickhouse保存失败", project_id, result) } } } }(tmp) if n%1000 == 0 { fmt.Println("current:", n) } tmp = map[string]interface{}{} } wg.Wait() fmt.Println("增量项目信息结束") } // DealTransactionForPro project数据处理 func DealTransactionForPro(data map[string]interface{}) map[string]interface{} { //基本信息封装 id := mongodb.BsonIdToSId(data["_id"]) buyerclass := gconv.String(data["buyerclass"]) buyer := gconv.String(data["buyer"]) winner := gconv.String(data["s_winner"]) agency := gconv.String(data["agency"]) zbtime := gconv.Int64(data["zbtime"]) if zbtime == 0 { zbtime = gconv.Int64(data["firsttime"]) } property_form := []string{} if data["property_form"] != nil { property_form = gconv.Strings(data["property_form"]) } bidamount := gconv.Float64(data["bidamount"]) budget := gconv.Float64(data["budget"]) money := bidamount if money <= 0 { money = budget } //物业分类 subclass := []string{} if tag_subinformation := data["tag_subinformation"]; tag_subinformation != nil { subclass = gconv.Strings(tag_subinformation) } else if tag_subinformation_ai := data["tag_subinformation_ai"]; tag_subinformation_ai != nil { subclass = gconv.Strings(tag_subinformation_ai) } //项目状态、商机类型 business_type := "" project_bidstatus := 2 bidstatus := gconv.String(data["bidstatus"]) if bidstatus == "中标" || bidstatus == "成交" || bidstatus == "合同" { project_bidstatus = 1 business_type = "合约到期项目" } else if bidstatus == "废标" || bidstatus == "流标" { project_bidstatus = 0 } else if bidstatus == "拟建" { project_bidstatus = 4 } else if bidstatus == "招标" { business_type = "招标项目" } //查询情报信息 ids := gconv.Strings(data["ids"]) // info := FindInfomationData(ids...) //情报信息查询 topscopeclass := []string{} // s_topscopeclass := gconv.String(data["s_topscopeclass"]) // if s_topscopeclass != "" { // topscopeclass = strings.Split(s_topscopeclass, ",") // } if data["topscopeclass"] != nil { if topscopeclasss, ok := data["topscopeclass"].([]interface{}); ok { for _, v := range topscopeclasss { tclass := regLetter.ReplaceAllString(gconv.String(v), "") // 去除字母 topscopeclass = append(topscopeclass, tclass) } } } //查询法人信息 winners := []string{} if winner != "" { winners = strings.Split(winner, ",") } buyer_id, agency_id, winner_ids := FindEntInfoData(id, buyer, agency, winners) //物业信息 t := &Transaction{ Project_Id: id, Project_Name: gconv.String(data["projectname"]), Project_Budget: budget, Project_Bidamount: bidamount, Project_Money: money, Business_Type: business_type, Project_Bidstatus: project_bidstatus, Info_Id: gconv.String(data["sourceinfoid"]), Info_Ids: ids, // Information_Id: info.Id, BuyerClass: buyerclass, Buyer: buyer, Topscopeclass: topscopeclass, Winner: winners, Agency: agency, Buyer_Id: buyer_id, Winner_Id: winner_ids, Agency_Id: agency_id, Property_Form: property_form, SubClass: subclass, MultiPackage: gconv.Int(data["multipackage"]), Area: gconv.String(data["area"]), City: gconv.String(data["city"]), District: gconv.String(data["district"]), ZbTime: zbtime, JgTime: gconv.Int64(data["jgtime"]), // StartTime: info.Starttime, // EndTime: info.Endtime, Create_Time: time.Now().Unix(), Update_Time: time.Now().Unix(), // // From: "project", } result := map[string]interface{}{} infomation, _ := bson.Marshal(t) bson.Unmarshal(infomation, &result) return result } // IncTransactionDataMgoToCkhAndEs 数据迁移 func IncTransactionDataMgoToCkhAndEs() { /* 数据根据update_time查询 1、采购意向数据(from=bidding)只插入 2、项目信息先查,有则更新,无则插入 */ fmt.Println("开始执行迁移...") sess := MgoPro.GetMgoConn() defer MgoPro.DestoryMongoConn(sess) ch := make(chan bool, 1) wg := &sync.WaitGroup{} query := map[string]interface{}{ "update_time": map[string]interface{}{ "$gte": GetTime(0), }, } it := sess.DB(MgoPro.DbName).C("projectset_wy").Find(&query).Iter() n := 0 for tmp := make(map[string]interface{}); it.Next(tmp); n++ { ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() from := gconv.String(tmp["from"]) delete(tmp, "from") //无用字段删除 delete(tmp, "_id") //无用字段删除 if !SaveDataToEs(tmp) { //保存、更新es fmt.Println("数据保存es失败,项目project_id", tmp["project_id"]) } if from == "bidding" { //采购意向、拟建,插入 SaveDataToClickHouse(tmp) } else { //项目信息,更新,插入 UpdateOrSaveDataToClickHouse(tmp) } }(tmp) if n%100 == 0 { fmt.Println("current:", n) } tmp = map[string]interface{}{} } wg.Wait() fmt.Println("迁移结束...") } type Infomation struct { Id string Starttime int64 Endtime int64 } // FindInfomationData 情报信息查询 func FindInfomationData(ids ...string) (info Infomation) { for _, id := range ids { query := fmt.Sprintf(`SELECT id,starttime,endtime FROM %s WHERE datajson_id = ?`, Config.ClickHouse.DataBase+".information") rows, err := CkhTool.Query(context.Background(), query, id) if err != nil { continue } for rows.Next() { info = Infomation{} if err := rows.Scan(&info.Id, &info.Starttime, &info.Endtime); err != nil { fmt.Println("查询情报信息异常:", id, err) } if info.Id != "" { return } //break //目前只有一条结果 } } return } // FindEntInfoData 法人信息查询 func FindEntInfoData(bid, buyer, agency string, winners []string) (buyer_id, agency_id string, winner_ids []string) { winner_ids = []string{} winnerMap := map[string]bool{} //记录所有中标单位 values := []interface{}{} placeholders := []string{} if buyer != "" { placeholders = append(placeholders, "?") values = append(values, buyer) } if len(winners) > 0 { for _, w := range winners { winnerMap[w] = true placeholders = append(placeholders, "?") values = append(values, w) } } if agency != "" { placeholders = append(placeholders, "?") values = append(values, agency) } if len(values) == 0 { return } query := fmt.Sprintf(`SELECT id,company_name FROM %s WHERE company_name IN (%s)`, Config.ClickHouse.DataBase+".ent_info", strings.Join(placeholders, ",")) rows, err := CkhTool.Query(context.Background(), query, values...) if err != nil { return } for rows.Next() { var id, company_name string if err := rows.Scan(&id, &company_name); err == nil { if company_name == buyer { buyer_id = id } else if company_name == agency { agency_id = id } else if winnerMap[company_name] { winner_ids = append(winner_ids, id) } } else { fmt.Println("查询法人信息异常:", err, bid) } } return } // UpdateOrSaveDataToClickHouse 判断clickhouse更新or保存 func UpdateOrSaveDataToClickHouse(data map[string]interface{}) (err error) { project_id := gconv.String(data["project_id"]) count := FindClickHouseByProjectId(project_id) //查询 if count > 0 { //更新 delete(data, "create_time") //不更新创建时间 delete(data, "project_id") //不更新项目id(主键) err = UpdateDataToClickHouse(data, map[string]interface{}{"project_id": project_id}) if err != nil { fmt.Println("clickhouse更新失败", project_id, data) } } else { //插入 err = SaveDataToClickHouse(data) if err != nil { fmt.Println("clickhouse保存失败", project_id, data) } } return } // SaveDataToClickHouse 数据保存clickhouse func SaveDataToClickHouse(data map[string]interface{}) error { fields, placeholders := []string{}, []string{} values := []interface{}{} for k, v := range data { fields = append(fields, k) values = append(values, v) placeholders = append(placeholders, "?") } query := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)", Config.ClickHouse.DataBase+".transaction_info_all", strings.Join(fields, ","), strings.Join(placeholders, ",")) return CkhTool.Exec(context.Background(), query, values...) } // FindClickHouseByProjectId 根据条件count clickhouse func FindClickHouseByProjectId(project_id string) int { query := fmt.Sprintf(`SELECT COUNT(1) FROM %s WHERE project_id = ?`, Config.ClickHouse.DataBase+".transaction_info_all") row := CkhTool.QueryRow(context.Background(), query, project_id) var count uint64 row.Scan(&count) return gconv.Int(count) } // UpdateDataToClickHouse 数据更新clickhouse func UpdateDataToClickHouse(data, querys map[string]interface{}) error { sets := []string{} values := []interface{}{} for k, v := range data { sets = append(sets, fmt.Sprintf("%s=?", k)) values = append(values, v) } qs := []string{} for k, v := range querys { qs = append(qs, fmt.Sprintf("%s=?", k)) values = append(values, v) } query := fmt.Sprintf("ALTER TABLE %s UPDATE %s WHERE %s", Config.ClickHouse.DataBase+".transaction_info_all", strings.Join(sets, ","), strings.Join(qs, ",")) //query := `ALTER TABLE information.transaction_info UPDATE update_time = ? WHERE project_id = '5c9ee78ca5cb26b9b7fd0b57'` return CkhTool.Exec(context.Background(), query, values...) } // SaveDataToEs es存储 func SaveDataToEs(data map[string]interface{}) bool { tmp := map[string]interface{}{} for k, v := range data { if k == "project_id" { k = "_id" } else if k == "winner" || k == "winner_id" { //winner和winner_id无值不进es if len(gconv.Strings(v)) == 0 { continue } } tmp[k] = v } err, result := Es.GetById(Config.Es.Index, gconv.String(tmp["_id"])) if err == nil && len(result) > 0 { //存在,更新 tmp["create_time"] = result["create_time"] //不更新create_time } return Es.Save(Config.Es.Index, tmp) } func FindEntInfoData2(bid, buyer, agency string, winners []string) (buyer_id, agency_id string, winner_ids []string) { query := fmt.Sprintf(`SELECT id FROM %s WHERE company_name = ?`, Config.ClickHouse.DataBase+".ent_info") if buyer != "" { buyer_id = GetClickHouseData(bid, query, buyer) } if agency != "" { agency_id = GetClickHouseData(bid, query, agency) } if len(winners) > 0 { for _, w := range winners { winner_id := GetClickHouseData(bid, query, w) if winner_id != "" { winner_ids = append(winner_ids, winner_id) } } } return } func GetClickHouseData(bid, query, value string) string { rows, err := CkhTool.Query(context.Background(), query, value) if err != nil { return "" } for rows.Next() { var id string if err := rows.Scan(&id); err == nil { return id } else { fmt.Println("查询情报信息异常:", err, bid) } } return "" } /*// SaveTransactionData 保存增量物业信息 func SaveTransactionData() { fmt.Println("save projectset_wy...") savearr := make([]map[string]interface{}, 100) indexdb := 0 for { select { case v := <-TransactionSaveCache: savearr[indexdb] = v indexdb++ if indexdb == 100 { Transaction_Ch <- true go func(tmp []map[string]interface{}) { defer func() { <-Transaction_Ch }() MgoPro.SaveBulk("projectset_wy", tmp...) }(savearr) savearr = make([]map[string]interface{}, 100) indexdb = 0 } case <-time.After(30 * time.Second): if indexdb > 0 { Transaction_Ch <- true go func(tmp []map[string]interface{}) { defer func() { <-Transaction_Ch }() MgoPro.SaveBulk("projectset_wy", tmp...) }(savearr[:indexdb]) savearr = make([]map[string]interface{}, 100) indexdb = 0 } } } }*/