package main import ( "fmt" "github.com/gogf/gf/v2/util/gconv" "github.com/robfig/cron" "strings" "sync" ) func init() { ReadConfig(&Config) //初始化 InitMgo() //mgo InitCkh() //clickhouse InitEs() //es InitOther() } func main() { c := cron.New() //增量 c.AddFunc(Config.StartCron, IncTransactionDataFromBidAndPro) //增量bidding和项目数据 c.Start() //历史 //HisTransactionDataFromBid() //历史招标(bidding)数据,截止时间1713196800采购意向 TODO 待补充法人信息 //HisTransactionDataFromBid2() //历史招标(bidding)数据,截止时间1713628800新增项目 TODO 待补充情报信息、法人信息 //HisTransactionDataFromProject() //历史项目数据(projectset_20230904)TODO 待补充业态、情报信息、法人信息 //临时处理(信息补充) //HisTransactionDataAddInformation() //历史信息补充法人库信息,项目信息补充业态property_form //IncTransactionDataMgoToCkhAndEs() //数据迁移 ch := make(chan bool) <-ch } func tmp() { sess := MgoPro.GetMgoConn() defer MgoPro.DestoryMongoConn(sess) ch := make(chan bool, 20) wg := &sync.WaitGroup{} lock := &sync.Mutex{} query := map[string]interface{}{ "project_bidstatus": 4, //"_id": map[string]interface{}{ // "$gte": mongodb.StringTOBsonId("66213b290f6ba3eb160617ad"), //}, } count := MgoPro.Count("projectset_wy", query) fmt.Println("count:", count) it := sess.DB(MgoPro.DbName).C("projectset_wy").Find(&query).Iter() n := 0 //arr := [][]map[string]interface{}{} for tmp := make(map[string]interface{}); it.Next(tmp); n++ { ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() info_id := gconv.String(tmp["info_id"]) data, _ := MgoB.FindById("bidding", info_id, map[string]interface{}{"s_topscopeclass": 1}) s_topscopeclass := gconv.String((*data)["s_topscopeclass"]) if !strings.Contains(s_topscopeclass, "建筑工程") { MgoPro.Del("projectset_wy", map[string]interface{}{"_id": tmp["_id"]}) } //update := []map[string]interface{}{ // {"_id": tmp["_id"]}, //} //set := map[string]interface{}{} //info_id := gconv.String(tmp["info_id"]) //data, _ := MgoPro.FindById("projectset", info_id, map[string]interface{}{"bidstatus": 1}) //if gconv.String((*data)["bidstatus"]) == "拟建" { // set["project_bidstatus"] = 4 //} else { // return //} //update = append(update, map[string]interface{}{"$set": set}) // lock.Lock() //arr = append(arr, update) //if len(arr) > 500 { // MgoPro.UpdateBulk("projectset_wy_back", arr...) // arr = [][]map[string]interface{}{} //} lock.Unlock() }(tmp) if n%10000 == 0 { fmt.Println("current:", n) } tmp = map[string]interface{}{} } wg.Wait() //if len(arr) > 0 { // MgoPro.UpdateBulk("projectset_wy_back", arr...) // arr = [][]map[string]interface{}{} //} fmt.Println("迁移结束...") }