1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798 |
- package main
import (
	"fmt"
	"log"
	"strings"
	"sync"

	"github.com/gogf/gf/v2/util/gconv"
	"github.com/robfig/cron"
)
- func init() {
- ReadConfig(&Config) //初始化
- InitMgo() //mgo
- InitCkh() //clickhouse
- InitEs() //es
- InitOther()
- }
- func main() {
- c := cron.New()
- //增量
- c.AddFunc(Config.StartCron, IncTransactionDataFromBidAndPro) //增量bidding和项目数据
- c.Start()
- //历史
- //HisTransactionDataFromBid() //历史招标(bidding)数据,截止时间1713196800采购意向 TODO 待补充法人信息
- //HisTransactionDataFromBid2() //历史招标(bidding)数据,截止时间1713628800新增项目 TODO 待补充情报信息、法人信息
- //HisTransactionDataFromProject() //历史项目数据(projectset_20230904)TODO 待补充业态、情报信息、法人信息
- //临时处理(信息补充)
- //HisTransactionDataAddInformation() //历史信息补充法人库信息,项目信息补充业态property_form
- //IncTransactionDataMgoToCkhAndEs() //数据迁移
- ch := make(chan bool)
- <-ch
- }
- func tmp() {
- sess := MgoPro.GetMgoConn()
- defer MgoPro.DestoryMongoConn(sess)
- ch := make(chan bool, 20)
- wg := &sync.WaitGroup{}
- lock := &sync.Mutex{}
- query := map[string]interface{}{
- "project_bidstatus": 4,
- //"_id": map[string]interface{}{
- // "$gte": mongodb.StringTOBsonId("66213b290f6ba3eb160617ad"),
- //},
- }
- count := MgoPro.Count("projectset_wy", query)
- fmt.Println("count:", count)
- it := sess.DB(MgoPro.DbName).C("projectset_wy").Find(&query).Iter()
- n := 0
- //arr := [][]map[string]interface{}{}
- for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- info_id := gconv.String(tmp["info_id"])
- data, _ := MgoB.FindById("bidding", info_id, map[string]interface{}{"s_topscopeclass": 1})
- s_topscopeclass := gconv.String((*data)["s_topscopeclass"])
- if !strings.Contains(s_topscopeclass, "建筑工程") {
- MgoPro.Del("projectset_wy", map[string]interface{}{"_id": tmp["_id"]})
- }
- //update := []map[string]interface{}{
- // {"_id": tmp["_id"]},
- //}
- //set := map[string]interface{}{}
- //info_id := gconv.String(tmp["info_id"])
- //data, _ := MgoPro.FindById("projectset", info_id, map[string]interface{}{"bidstatus": 1})
- //if gconv.String((*data)["bidstatus"]) == "拟建" {
- // set["project_bidstatus"] = 4
- //} else {
- // return
- //}
- //update = append(update, map[string]interface{}{"$set": set})
- //
- lock.Lock()
- //arr = append(arr, update)
- //if len(arr) > 500 {
- // MgoPro.UpdateBulk("projectset_wy_back", arr...)
- // arr = [][]map[string]interface{}{}
- //}
- lock.Unlock()
- }(tmp)
- if n%10000 == 0 {
- fmt.Println("current:", n)
- }
- tmp = map[string]interface{}{}
- }
- wg.Wait()
- //if len(arr) > 0 {
- // MgoPro.UpdateBulk("projectset_wy_back", arr...)
- // arr = [][]map[string]interface{}{}
- //}
- fmt.Println("迁移结束...")
- }
|