package main import ( "fmt" "github.com/gogf/gf/v2/util/gconv" "github.com/robfig/cron" "sync" ) func init() { ReadConfig(&Config) //初始化 InitMgo() //mgo InitCkh() //clickhouse InitEs() //es InitOther() } func main() { c := cron.New() //增量 c.AddFunc(Config.StartCron, IncTransactionDataFromBidAndPro) //增量bidding和项目数据 c.Start() //历史 //HisTransactionDataFromBid() //历史招标(bidding)数据,截止时间1713196800采购意向 TODO 待补充法人信息 //HisTransactionDataFromBid2() //历史招标(bidding)数据,截止时间1713628800新增项目 TODO 待补充情报信息、法人信息 //HisTransactionDataFromProject() //历史项目数据(projectset_20230904)TODO 待补充业态、情报信息、法人信息 //临时处理(信息补充) //HisTransactionDataAddInformation() //历史信息补充法人库信息,项目信息补充业态property_form //IncTransactionDataMgoToCkhAndEs() //数据迁移 ch := make(chan bool) <-ch } func tmp() { sess := MgoPro.GetMgoConn() defer MgoPro.DestoryMongoConn(sess) ch := make(chan bool, 20) wg := &sync.WaitGroup{} lock := &sync.Mutex{} query := map[string]interface{}{ "project_bidstatus": 2, } it := sess.DB(MgoPro.DbName).C("projectset_wy_back").Find(&query).Iter() n := 0 arr := [][]map[string]interface{}{} for tmp := make(map[string]interface{}); it.Next(tmp); n++ { ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() update := []map[string]interface{}{ {"_id": tmp["_id"]}, } set := map[string]interface{}{} info_id := gconv.String(tmp["info_id"]) data, _ := MgoPro.FindById("projectset", info_id, map[string]interface{}{"bidstatus": 1}) if gconv.String((*data)["bidstatus"]) == "拟建" { set["project_bidstatus"] = 4 } else { return } update = append(update, map[string]interface{}{"$set": set}) lock.Lock() arr = append(arr, update) if len(arr) > 500 { MgoPro.UpdateBulk("projectset_wy_back", arr...) arr = [][]map[string]interface{}{} } lock.Unlock() }(tmp) if n%10000 == 0 { fmt.Println("current:", n) } tmp = map[string]interface{}{} } wg.Wait() if len(arr) > 0 { MgoPro.UpdateBulk("projectset_wy_back", arr...) arr = [][]map[string]interface{}{} } fmt.Println("迁移结束...") }