123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110 |
- package main
- import (
- "fmt"
- "github.com/robfig/cron"
- "sync"
- )
- func init() {
- ReadConfig(&Config) //初始化
- InitMgo() //mgo
- InitCkh() //clickhouse
- InitEs() //es
- InitOther()
- }
- func main() {
- c := cron.New()
- //增量
- c.AddFunc(Config.StartCron, IncTransactionDataFromBidAndPro) //增量bidding和项目数据
- c.Start()
- //历史
- //HisTransactionDataFromBid() //历史招标(bidding)数据,截止时间1713196800采购意向 TODO 待补充法人信息
- //HisTransactionDataFromBid2() //历史招标(bidding)数据,截止时间1713628800新增项目 TODO 待补充情报信息、法人信息
- //HisTransactionDataFromProject() //历史项目数据(projectset_20230904)TODO 待补充业态、情报信息、法人信息
- //临时处理(信息补充)
- //HisTransactionDataAddInformation() //历史信息补充法人库信息,项目信息补充业态property_form
- //IncTransactionDataMgoToCkhAndEs() //数据迁移
- ch := make(chan bool)
- <-ch
- }
- func tmp() {
- sess := MgoPro.GetMgoConn()
- defer MgoPro.DestoryMongoConn(sess)
- ch := make(chan bool, 1)
- wg := &sync.WaitGroup{}
- //lock := &sync.Mutex{}
- query := map[string]interface{}{
- //"project_bidstatus": 4,
- //"_id": map[string]interface{}{
- // "$gte": mongodb.StringTOBsonId("66213b290f6ba3eb160617ad"),
- //},
- //"update_time": map[string]interface{}{
- // "$lt": 1714959573,
- //},
- //"_id": mongodb.StringTOBsonId("6630eae76f6c86a3962f3a07"),
- "repeat": true,
- }
- repeat := map[string]bool{}
- count := MgoPro.Count("projectset_wy_back", query)
- fmt.Println("count:", count)
- it := sess.DB(MgoPro.DbName).C("projectset_wy_back").Find(&query).Iter()
- n := 0
- //arr := []map[string]interface{}{}
- for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- //update := []map[string]interface{}{}
- //project_id := gconv.String(tmp["project_id"])
- //lock.Lock()
- //if !repeat[project_id] {
- // Es.DelById(Config.Es.Index, project_id)
- // CkhTool.Exec(context.Background(), "ALTER TABLE information.transaction_info_copy DELETE WHERE project_id = ?", project_id)
- // repeat[project_id] = true
- //}
- //lock.Unlock()
- //err, result := Es.GetById(Config.Es.Index, project_id)
- //Es.DelById()
- //if err != nil || len(result) == 0 {
- // fmt.Println(project_id)
- // update = append(update, map[string]interface{}{"_id": tmp["_id"]})
- // update = append(update, map[string]interface{}{"$set": map[string]interface{}{"es": false}})
- //} else {
- // if gconv.Int(result["project_bidstatus"]) != 0 {
- // fmt.Println("11", project_id)
- // }
- //}
- if MgoPro.Count("projectset_wy_back", map[string]interface{}{"project_id": tmp["project_id"]}) > 1 {
- fmt.Println("project_id")
- //update = append(update, map[string]interface{}{"_id": tmp["_id"]})
- //update = append(update, map[string]interface{}{"$set": map[string]interface{}{"repeat": true}})
- }
- //if len(update) > 0 {
- // lock.Lock()
- // arr = append(arr, update)
- // if len(arr) > 500 {
- // MgoPro.UpdateBulk("projectset_wy_back", arr...)
- // arr = [][]map[string]interface{}{}
- // }
- // lock.Unlock()
- //}
- }(tmp)
- if n%1000 == 0 {
- fmt.Println("current:", n)
- }
- tmp = map[string]interface{}{}
- }
- wg.Wait()
- //if len(arr) > 0 {
- // MgoPro.SaveBulk("projectset_wy_tmp2", arr...)
- // arr = []map[string]interface{}{}
- //}
- fmt.Println("迁移结束...", len(repeat))
- }
|