package main import ( "fmt" "github.com/robfig/cron" "sync" ) func init() { ReadConfig(&Config) //初始化 InitMgo() //mgo InitCkh() //clickhouse InitEs() //es InitOther() } func main() { c := cron.New() //增量 c.AddFunc(Config.StartCron, IncTransactionDataFromBidAndPro) //增量bidding和项目数据 c.Start() //历史 //HisTransactionDataFromBid() //历史招标(bidding)数据,截止时间1713196800采购意向 TODO 待补充法人信息 //HisTransactionDataFromBid2() //历史招标(bidding)数据,截止时间1713628800新增项目 TODO 待补充情报信息、法人信息 //HisTransactionDataFromProject() //历史项目数据(projectset_20230904)TODO 待补充业态、情报信息、法人信息 //临时处理(信息补充) //HisTransactionDataAddInformation() //历史信息补充法人库信息,项目信息补充业态property_form //IncTransactionDataMgoToCkhAndEs() //数据迁移 ch := make(chan bool) <-ch } func tmp() { sess := MgoPro.GetMgoConn() defer MgoPro.DestoryMongoConn(sess) ch := make(chan bool, 1) wg := &sync.WaitGroup{} //lock := &sync.Mutex{} query := map[string]interface{}{ //"project_bidstatus": 4, //"_id": map[string]interface{}{ // "$gte": mongodb.StringTOBsonId("66213b290f6ba3eb160617ad"), //}, //"update_time": map[string]interface{}{ // "$lt": 1714959573, //}, //"_id": mongodb.StringTOBsonId("6630eae76f6c86a3962f3a07"), "repeat": true, } repeat := map[string]bool{} count := MgoPro.Count("projectset_wy_back", query) fmt.Println("count:", count) it := sess.DB(MgoPro.DbName).C("projectset_wy_back").Find(&query).Iter() n := 0 //arr := []map[string]interface{}{} for tmp := make(map[string]interface{}); it.Next(tmp); n++ { ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() //update := []map[string]interface{}{} //project_id := gconv.String(tmp["project_id"]) //lock.Lock() //if !repeat[project_id] { // Es.DelById(Config.Es.Index, project_id) // CkhTool.Exec(context.Background(), "ALTER TABLE information.transaction_info_copy DELETE WHERE project_id = ?", project_id) // repeat[project_id] = true //} //lock.Unlock() //err, result := Es.GetById(Config.Es.Index, project_id) //Es.DelById() //if err != nil || len(result) == 0 { // fmt.Println(project_id) // update = append(update, map[string]interface{}{"_id": tmp["_id"]}) // update = append(update, map[string]interface{}{"$set": map[string]interface{}{"es": false}}) //} else { // if gconv.Int(result["project_bidstatus"]) != 0 { // fmt.Println("11", project_id) // } //} if MgoPro.Count("projectset_wy_back", map[string]interface{}{"project_id": tmp["project_id"]}) > 1 { fmt.Println("project_id") //update = append(update, map[string]interface{}{"_id": tmp["_id"]}) //update = append(update, map[string]interface{}{"$set": map[string]interface{}{"repeat": true}}) } //if len(update) > 0 { // lock.Lock() // arr = append(arr, update) // if len(arr) > 500 { // MgoPro.UpdateBulk("projectset_wy_back", arr...) // arr = [][]map[string]interface{}{} // } // lock.Unlock() //} }(tmp) if n%1000 == 0 { fmt.Println("current:", n) } tmp = map[string]interface{}{} } wg.Wait() //if len(arr) > 0 { // MgoPro.SaveBulk("projectset_wy_tmp2", arr...) // arr = []map[string]interface{}{} //} fmt.Println("迁移结束...", len(repeat)) }