1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283 |
- package main
- import (
- "fmt"
- "github.com/gogf/gf/v2/util/gconv"
- "github.com/robfig/cron"
- "sync"
- )
- func init() {
- ReadConfig(&Config) //初始化
- InitMgo() //mgo
- InitCkh() //clickhouse
- InitEs() //es
- InitOther()
- }
- func main() {
- c := cron.New()
- //增量
- c.AddFunc(Config.StartCron, IncTransactionDataFromBidAndPro) //增量bidding和项目数据
- c.Start()
- //历史
- //HisTransactionDataFromBid() //历史招标(bidding)数据,截止时间1713196800采购意向 TODO 待补充法人信息
- //HisTransactionDataFromBid2() //历史招标(bidding)数据,截止时间1713628800新增项目 TODO 待补充情报信息、法人信息
- //HisTransactionDataFromProject() //历史项目数据(projectset_20230904)TODO 待补充业态、情报信息、法人信息
- //临时处理(信息补充)
- //HisTransactionDataAddInformation() //历史信息补充法人库信息,项目信息补充业态property_form
- //IncTransactionDataMgoToCkh() //数据迁移
- ch := make(chan bool)
- <-ch
- }
- func tmp() {
- sess := MgoPro.GetMgoConn()
- defer MgoPro.DestoryMongoConn(sess)
- ch := make(chan bool, 20)
- wg := &sync.WaitGroup{}
- lock := &sync.Mutex{}
- query := map[string]interface{}{
- "zbtime": 0,
- }
- it := sess.DB(MgoPro.DbName).C("projectset_wy_back").Find(&query).Iter()
- n := 0
- arr := [][]map[string]interface{}{}
- for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- projectid := gconv.String(tmp["project_id"])
- data, _ := MgoPro.FindById("projectset_20230904", projectid, map[string]interface{}{"firsttime": 1})
- firsttime := gconv.Int64((*data)["firsttime"])
- update := []map[string]interface{}{
- {"_id": tmp["_id"]},
- }
- set := map[string]interface{}{
- "zbtime": firsttime,
- }
- update = append(update, map[string]interface{}{"$set": set})
- lock.Lock()
- arr = append(arr, update)
- if len(arr) > 100 {
- MgoPro.UpdateBulk("projectset_wy_back", arr...)
- arr = [][]map[string]interface{}{}
- }
- lock.Unlock()
- }(tmp)
- if n%10000 == 0 {
- fmt.Println("current:", n)
- }
- tmp = map[string]interface{}{}
- }
- wg.Wait()
- if len(arr) > 0 {
- MgoPro.UpdateBulk("projectset_wy_back", arr...)
- arr = [][]map[string]interface{}{}
- }
- fmt.Println("迁移结束...")
- }
|