package main import ( "fmt" "github.com/gogf/gf/v2/util/gconv" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "strings" "sync" ) // HisTransactionDataFromBid 历史bidding(指定截止comeintime,采购意向) func HisTransactionDataFromBid() { sess := MgoB.GetMgoConn() defer MgoB.DestoryMongoConn(sess) ch := make(chan bool, 10) wg := &sync.WaitGroup{} lock := &sync.Mutex{} query := map[string]interface{}{ "toptype": "采购意向", } fields := map[string]interface{}{ "projectname": 1, "budget": 1, "bidamount": 1, "buyer": 1, "s_winner": 1, "agency": 1, "property_form": 1, "multipackage": 1, "area": 1, "city": 1, "district": 1, // "publishtime": 1, "comeintime": 1, "extracttype": 1, "tag_subinformation": 1, "tag_subinformation_ai": 1, "tag_topinformation": 1, "tag_topinformation_ai": 1, } arr := []map[string]interface{}{} it := sess.DB(MgoB.DbName).C("bidding").Find(&query).Select(&fields).Iter() n := 0 for tmp := make(map[string]interface{}); it.Next(tmp); n++ { ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() if gconv.Int64(tmp["comeintime"]) >= 1713196800 { //截止时间1713196800 return } if gconv.Int(tmp["extracttype"]) == -1 { //重复数据过滤 return } if tmp["tag_topinformation"] == nil && tmp["tag_topinformation_ai"] == nil { //无效数据过滤 return } result := DealTransactionForBid(tmp, "采购意向", 3) lock.Lock() if len(result) > 0 { arr = append(arr, result) } if len(arr) > 50 { MgoPro.SaveBulk("projectset_wy", arr...) arr = []map[string]interface{}{} } lock.Unlock() }(tmp) if n%10000 == 0 { fmt.Println("current:", n) } tmp = map[string]interface{}{} } wg.Wait() if len(arr) > 0 { MgoPro.SaveBulk("projectset_wy", arr...) arr = []map[string]interface{}{} } fmt.Println("结束") } // HisTransactionDataFromBid2 历史bidding(指定截止comeintime,新增物业项目) func HisTransactionDataFromBid2() { sess := MgoB.GetMgoConn() defer MgoB.DestoryMongoConn(sess) ch := make(chan bool, 20) wg := &sync.WaitGroup{} lock := &sync.Mutex{} query := map[string]interface{}{ "comeintime": map[string]interface{}{ "$gte": 1713715200, "$lt": 1713801600, }, "toptype": "拟建", } fields := map[string]interface{}{ "projectname": 1, "budget": 1, "bidamount": 1, "buyer": 1, "s_winner": 1, "agency": 1, "property_form": 1, "multipackage": 1, "area": 1, "city": 1, "district": 1, // "owner": 1, "s_topscopeclass": 1, "publishtime": 1, "toptype": 1, "comeintime": 1, "extracttype": 1, "tag_subinformation": 1, "tag_subinformation_ai": 1, "tag_topinformation": 1, "tag_topinformation_ai": 1, } arr := []map[string]interface{}{} it := sess.DB(MgoB.DbName).C("bidding").Find(&query).Select(&fields).Iter() n := 0 for tmp := make(map[string]interface{}); it.Next(tmp); n++ { ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() //comeintime := gconv.Int64(tmp["comeintime"]) //if comeintime < 1609430400 || comeintime >= 1713715200 { // return //} if gconv.Int(tmp["extracttype"]) == -1 { //重复数据过滤 return } if s_topscopeclass := gconv.String(tmp["s_topscopeclass"]); !strings.Contains(s_topscopeclass, "建筑工程") { //排除非建筑工程 return } if tag_topinformation := gconv.String(tmp["tag_topinformation"]); strings.Contains(tag_topinformation, "物业") { //排除物业 return } else if tag_topinformation_ai := gconv.String(tmp["tag_topinformation_ai"]); strings.Contains(tag_topinformation_ai, "物业") { return } //if tmp["tag_topinformation"] != nil || tmp["tag_topinformation_ai"] != nil { //不包含物业 // return //} project_bidstatus := 4 //拟建 business_type := "新增物业项目" result := DealTransactionForBid(tmp, business_type, project_bidstatus) lock.Lock() if len(result) > 0 { arr = append(arr, result) } if len(arr) > 50 { MgoPro.SaveBulk("projectset_wy_nj", arr...) arr = []map[string]interface{}{} } lock.Unlock() }(tmp) if n%10000 == 0 { fmt.Println("current:", n) } tmp = map[string]interface{}{} } wg.Wait() if len(arr) > 0 { MgoPro.SaveBulk("projectset_wy_nj", arr...) arr = []map[string]interface{}{} } fmt.Println("结束") } // HisTransactionDataFromProject 历史project(指定截止pici:1713196800) func HisTransactionDataFromProject() { sess := MgoPro.GetMgoConn() defer MgoPro.DestoryMongoConn(sess) ch := make(chan bool, 20) wg := &sync.WaitGroup{} lock := &sync.Mutex{} query := map[string]interface{}{ "pici": map[string]interface{}{ "$lt": 1713196800, //"$gt": 1711900800, }, } fields := map[string]interface{}{ "projectname": 1, "budget": 1, "bidamount": 1, "buyer": 1, "s_winner": 1, "agency": 1, "property_form": 1, "multipackage": 1, "area": 1, "city": 1, "district": 1, "zbtime": 1, "jgtime": 1, "bidstatus": 1, // "firsttime": 1, "ids": 1, "pici": 1, "sourceinfoid": 1, "tag_subinformation": 1, "tag_subinformation_ai": 1, "tag_topinformation": 1, "tag_topinformation_ai": 1, } arr := []map[string]interface{}{} it := sess.DB(MgoPro.DbName).C("projectset_20230904").Find(&query).Select(&fields).Iter() n := 0 for tmp := make(map[string]interface{}); it.Next(tmp); n++ { ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() if tmp["tag_topinformation"] == nil && tmp["tag_topinformation_ai"] == nil { //无效数据过滤 return } result := DealTransactionForPro(tmp) lock.Lock() if len(result) > 0 { arr = append(arr, result) } if len(arr) > 50 { MgoPro.SaveBulk("projectset_wy_newback", arr...) arr = []map[string]interface{}{} } lock.Unlock() }(tmp) if n%10000 == 0 { fmt.Println("current:", n) } tmp = map[string]interface{}{} } wg.Wait() if len(arr) > 0 { MgoPro.SaveBulk("projectset_wy_newback", arr...) arr = []map[string]interface{}{} } fmt.Println("结束") } // HisTransactionDataAddInformation 补充字段信息 func HisTransactionDataAddInformation() { sess := MgoPro.GetMgoConn() defer MgoPro.DestoryMongoConn(sess) ch := make(chan bool, 1) wg := &sync.WaitGroup{} lock := &sync.Mutex{} query := map[string]interface{}{ //"_id": mongodb.StringTOBsonId("662f01d8397fa006e2e75e6c"), //项目 //"_id": map[string]interface{}{ // "$gte": mongodb.StringTOBsonId("66308fa06f6c86a3960ae83f"), // "$lte": mongodb.StringTOBsonId("66308feb6f6c86a3960b0f4e"), //}, //拟建 //"project_bidstatus": 4, //"_id": map[string]interface{}{ // "$lte": mongodb.StringTOBsonId("6627227819c5408c474c3802"), //}, //采购意向 //"project_bidstatus": 3, //"_id": map[string]interface{}{ // "$lte": mongodb.StringTOBsonId("661f798b5a4e6cc01349dad0"), //}, //历史projectset_wy //"project_id": map[string]interface{}{ // //"$gt": "662143800000000000000000", // "$gt": "667c3b5166cf0db42ae965e6", //}, "project_id": "6637ae0866cf0db42aeeb5d4", //历史projectset_wy_back //"update_time": map[string]interface{}{ // "$gte": 1714959573, // "$lte": 1719795791, //}, } count := MgoPro.Count("projectset_wy_back", query) fmt.Println("count:", count) it := sess.DB(MgoPro.DbName).C("projectset_wy_back").Find(&query).Iter() n := 0 arr := [][]map[string]interface{}{} for tmp := make(map[string]interface{}); it.Next(tmp); n++ { ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() id := mongodb.BsonIdToSId(tmp["_id"]) update := []map[string]interface{}{ {"_id": mongodb.StringTOBsonId(id)}, } set := map[string]interface{}{} //法人信息 buyer_id, agency_id, winner_ids := FindEntInfoData(id, gconv.String(tmp["buyer"]), gconv.String(tmp["agency"]), gconv.Strings(tmp["winner"])) //更新 set["buyer_id"] = buyer_id set["agency_id"] = agency_id set["winner_id"] = winner_ids //保存 tmp["buyer_id"] = buyer_id tmp["agency_id"] = agency_id tmp["winner_id"] = winner_ids if from := gconv.String(tmp["from"]); from == "project" { //项目信息补充业态 //project_id := gconv.String(tmp["project_id"]) //pro, _ := MgoPro.FindById("projectset_20230904", project_id, map[string]interface{}{"property_form": 1}) //if len(*pro) > 0 && (*pro)["property_form"] != nil { // //更新 // set["property_form"] = (*pro)["property_form"] // //保存 // tmp["property_form"] = (*pro)["property_form"] //} //查询情报信息 ids := gconv.Strings(tmp["info_ids"]) info := FindInfomationData(ids...) //情报信息查询 //更新 set["information_id"] = info.Id set["starttime"] = info.Starttime set["endtime"] = info.Endtime //保存 tmp["information_id"] = info.Id tmp["starttime"] = info.Starttime tmp["endtime"] = info.Endtime } else { if project_bidstatus := gconv.Int(tmp["project_bidstatus"]); project_bidstatus == 4 { //拟建新增物业项目,补充情报信息 //查询情报信息 id := gconv.String(tmp["info_id"]) info := FindInfomationData(id) //情报信息查询 //更新 set["information_id"] = info.Id set["starttime"] = info.Starttime set["endtime"] = info.Endtime //保存 tmp["information_id"] = info.Id tmp["starttime"] = info.Starttime tmp["endtime"] = info.Endtime } } delete(tmp, "from") //无用字段删除 delete(tmp, "_id") //无用字段删除 if !SaveDataToEs(tmp) { //保存、更新es fmt.Println("数据保存es失败,数据类型 项目project_id", tmp["project_id"]) } var err error err = UpdateOrSaveDataToClickHouse(tmp) //保存、更新clickhouse if err != nil { fmt.Println("数据迁移失败,数据类型 项目project_id", tmp["project_id"], err) } //更新 update = append(update, map[string]interface{}{"$set": set}) lock.Lock() arr = append(arr, update) if len(arr) > 100 { MgoPro.UpdateBulk("projectset_wy_back", arr...) arr = [][]map[string]interface{}{} } lock.Unlock() }(tmp) if n%100 == 0 { fmt.Println("current:", n) } tmp = map[string]interface{}{} } wg.Wait() if len(arr) > 0 { MgoPro.UpdateBulk("projectset_wy_back", arr...) arr = [][]map[string]interface{}{} } fmt.Println("迁移结束...") }