package main

import (
	"fmt"
	"sync"

	"github.com/gogf/gf/v2/util/gconv"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
)

// HisTransactionDataFromBid migrates historical records from the "bidding"
// collection into "projectset_wy".
//
// It scans every document whose toptype is "采购意向", drops records that are
// at or past the comeintime cutoff, marked as duplicates (extracttype == -1),
// or carry neither tag_topinformation nor tag_topinformation_ai, converts the
// remainder with DealTransactionForBid and bulk-saves the non-empty results.
// At most 10 documents are processed concurrently.
func HisTransactionDataFromBid() {
	// Cutoff for comeintime; records at or after it are skipped.
	// NOTE(review): presumably Unix seconds — confirm against the data.
	const cutoff = 1713196800

	sess := MgoB.GetMgoConn()
	defer MgoB.DestoryMongoConn(sess)

	ch := make(chan bool, 10) // bounds the number of in-flight workers
	wg := &sync.WaitGroup{}
	lock := &sync.Mutex{} // guards arr
	query := map[string]interface{}{
		"toptype": "采购意向",
	}
	fields := map[string]interface{}{
		"projectname":   1,
		"budget":        1,
		"bidamount":     1,
		"buyer":         1,
		"s_winner":      1,
		"agency":        1,
		"property_form": 1,
		"multipackage":  1,
		"area":          1,
		"city":          1,
		"district":      1,
		//
		"publishtime":           1,
		"comeintime":            1,
		"extracttype":           1,
		"tag_subinformation":    1,
		"tag_subinformation_ai": 1,
		"tag_topinformation":    1,
		"tag_topinformation_ai": 1,
	}
	arr := []map[string]interface{}{}
	it := sess.DB(MgoB.DbName).C("bidding").Find(&query).Select(&fields).Iter()
	n := 0
	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
		ch <- true
		wg.Add(1)
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			if gconv.Int64(tmp["comeintime"]) >= cutoff { // past the cutoff time
				return
			}
			if gconv.Int(tmp["extracttype"]) == -1 { // filter out duplicate records
				return
			}
			if tmp["tag_topinformation"] == nil && tmp["tag_topinformation_ai"] == nil { // filter out invalid records
				return
			}
			result := DealTransactionForBid(tmp)
			lock.Lock()
			if len(result) > 0 {
				arr = append(arr, result)
			}
			if len(arr) > 50 {
				MgoPro.SaveBulk("projectset_wy", arr...)
				// Start a fresh slice instead of truncating: SaveBulk may still hold the old one.
				arr = []map[string]interface{}{}
			}
			lock.Unlock()
		}(tmp)
		if n%10000 == 0 {
			fmt.Println("current:", n)
		}
		// Each worker owns its document, so decode the next one into a new map.
		tmp = map[string]interface{}{}
	}
	// Release the cursor and surface a scan that ended early because of an error.
	if err := it.Close(); err != nil {
		fmt.Println("iterate bidding error:", err)
	}
	wg.Wait()
	// Flush whatever is left below the batch threshold.
	if len(arr) > 0 {
		MgoPro.SaveBulk("projectset_wy", arr...)
	}
	fmt.Println("结束")
}

// HisTransactionDataFromProject migrates historical records from the
// "projectset_20230904" collection into "projectset_wy_back".
//
// It scans every document whose pici is below the cutoff, drops records that
// carry neither tag_topinformation nor tag_topinformation_ai, converts the
// remainder with DealTransactionForPro and bulk-saves the non-empty results.
// At most 20 documents are processed concurrently.
func HisTransactionDataFromProject() {
	// Upper bound (exclusive) for pici.
	// NOTE(review): presumably Unix seconds — confirm against the data.
	const cutoff = 1713196800

	sess := MgoPro.GetMgoConn()
	defer MgoPro.DestoryMongoConn(sess)

	ch := make(chan bool, 20) // bounds the number of in-flight workers
	wg := &sync.WaitGroup{}
	lock := &sync.Mutex{} // guards arr
	query := map[string]interface{}{
		"pici": map[string]interface{}{
			"$lt": cutoff,
			//"$gt": 1711900800,
		},
	}
	fields := map[string]interface{}{
		"projectname":   1,
		"budget":        1,
		"bidamount":     1,
		"buyer":         1,
		"s_winner":      1,
		"agency":        1,
		"property_form": 1,
		"multipackage":  1,
		"area":          1,
		"city":          1,
		"district":      1,
		"zbtime":        1,
		"jgtime":        1,
		"bidstatus":     1,
		//
		"firsttime":             1,
		"ids":                   1,
		"pici":                  1,
		"sourceinfoid":          1,
		"tag_subinformation":    1,
		"tag_subinformation_ai": 1,
		"tag_topinformation":    1,
		"tag_topinformation_ai": 1,
	}
	arr := []map[string]interface{}{}
	it := sess.DB(MgoPro.DbName).C("projectset_20230904").Find(&query).Select(&fields).Iter()
	n := 0
	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
		ch <- true
		wg.Add(1)
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			if tmp["tag_topinformation"] == nil && tmp["tag_topinformation_ai"] == nil { // filter out invalid records
				return
			}
			result := DealTransactionForPro(tmp)
			lock.Lock()
			if len(result) > 0 {
				arr = append(arr, result)
			}
			if len(arr) > 50 {
				MgoPro.SaveBulk("projectset_wy_back", arr...)
				// Start a fresh slice instead of truncating: SaveBulk may still hold the old one.
				arr = []map[string]interface{}{}
			}
			lock.Unlock()
		}(tmp)
		if n%10000 == 0 {
			fmt.Println("current:", n)
		}
		// Each worker owns its document, so decode the next one into a new map.
		tmp = map[string]interface{}{}
	}
	// Release the cursor and surface a scan that ended early because of an error.
	if err := it.Close(); err != nil {
		fmt.Println("iterate projectset_20230904 error:", err)
	}
	wg.Wait()
	// Flush whatever is left below the batch threshold.
	if len(arr) > 0 {
		MgoPro.SaveBulk("projectset_wy_back", arr...)
	}
	fmt.Println("结束")
}

// HisTransactionDataAddInformation back-fills extra fields on every document
// of "projectset_wy_back".
//
// For each document it sets buyer_id, agency_id and winner_ids from
// FindEntInfoData, and, when the document's "from" field is "project", copies
// property_form from the "projectset_20230904" document identified by
// project_id. Updates are applied in bulk; at most 20 documents are processed
// concurrently.
func HisTransactionDataAddInformation() {
	sess := MgoPro.GetMgoConn()
	defer MgoPro.DestoryMongoConn(sess)

	ch := make(chan bool, 20) // bounds the number of in-flight workers
	wg := &sync.WaitGroup{}
	lock := &sync.Mutex{} // guards arr
	query := map[string]interface{}{}
	it := sess.DB(MgoPro.DbName).C("projectset_wy_back").Find(&query).Iter()
	n := 0
	arr := [][]map[string]interface{}{}
	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
		ch <- true
		wg.Add(1)
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			id := mongodb.BsonIdToSId(tmp["_id"])
			// Each bulk entry is a [selector, update] pair.
			update := []map[string]interface{}{
				{"_id": tmp["_id"]},
			}
			set := map[string]interface{}{}
			// Legal-entity information.
			buyer_id, agency_id, winner_ids := FindEntInfoData(id, gconv.String(tmp["buyer"]), gconv.String(tmp["agency"]), gconv.Strings(tmp["winner"]))
			set["buyer_id"] = buyer_id
			set["agency_id"] = agency_id
			set["winner_ids"] = winner_ids
			// For project-sourced records, supplement property_form from the source project.
			if from := gconv.String(tmp["from"]); from == "project" {
				project_id := gconv.String(tmp["project_id"])
				// The second return value is deliberately ignored: a failed or
				// empty lookup simply leaves property_form unset.
				pro, _ := MgoPro.FindById("projectset_20230904", project_id, map[string]interface{}{"property_form": 1})
				// Guard against a nil result so a failed lookup cannot crash the worker.
				if pro != nil && len(*pro) > 0 && (*pro)["property_form"] != nil {
					set["property_form"] = (*pro)["property_form"]
				}
			}
			update = append(update, map[string]interface{}{"$set": set})
			lock.Lock()
			arr = append(arr, update)
			if len(arr) > 100 {
				MgoPro.UpdateBulk("projectset_wy_back", arr...)
				// Start a fresh slice instead of truncating: UpdateBulk may still hold the old one.
				arr = [][]map[string]interface{}{}
			}
			lock.Unlock()
		}(tmp)
		if n%100 == 0 {
			fmt.Println("current:", n)
		}
		// Each worker owns its document, so decode the next one into a new map.
		tmp = map[string]interface{}{}
	}
	// Release the cursor and surface a scan that ended early because of an error.
	if err := it.Close(); err != nil {
		fmt.Println("iterate projectset_wy_back error:", err)
	}
	wg.Wait()
	// Flush whatever is left below the batch threshold.
	if len(arr) > 0 {
		MgoPro.UpdateBulk("projectset_wy_back", arr...)
	}
	fmt.Println("迁移结束...")
}