main.go 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/gogf/gf/v2/util/gconv"
  5. "github.com/robfig/cron"
  6. "sync"
  7. )
  8. func init() {
  9. ReadConfig(&Config) //初始化
  10. InitMgo() //mgo
  11. InitCkh() //clickhouse
  12. InitEs() //es
  13. InitOther()
  14. }
  15. func main() {
  16. c := cron.New()
  17. //增量
  18. c.AddFunc(Config.StartCron, IncTransactionDataFromBidAndPro) //增量bidding和项目数据
  19. c.Start()
  20. //历史
  21. //HisTransactionDataFromBid() //历史招标(bidding)数据,截止时间1713196800采购意向 TODO 待补充法人信息
  22. //HisTransactionDataFromBid2() //历史招标(bidding)数据,截止时间1713628800新增项目 TODO 待补充情报信息、法人信息
  23. //HisTransactionDataFromProject() //历史项目数据(projectset_20230904)TODO 待补充业态、情报信息、法人信息
  24. //临时处理(信息补充)
  25. //HisTransactionDataAddInformation() //历史信息补充法人库信息,项目信息补充业态property_form
  26. //IncTransactionDataMgoToCkh() //数据迁移
  27. ch := make(chan bool)
  28. <-ch
  29. }
  30. func tmp() {
  31. sess := MgoPro.GetMgoConn()
  32. defer MgoPro.DestoryMongoConn(sess)
  33. ch := make(chan bool, 20)
  34. wg := &sync.WaitGroup{}
  35. lock := &sync.Mutex{}
  36. query := map[string]interface{}{
  37. "zbtime": 0,
  38. }
  39. it := sess.DB(MgoPro.DbName).C("projectset_wy_back").Find(&query).Iter()
  40. n := 0
  41. arr := [][]map[string]interface{}{}
  42. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  43. ch <- true
  44. wg.Add(1)
  45. go func(tmp map[string]interface{}) {
  46. defer func() {
  47. <-ch
  48. wg.Done()
  49. }()
  50. projectid := gconv.String(tmp["project_id"])
  51. data, _ := MgoPro.FindById("projectset_20230904", projectid, map[string]interface{}{"firsttime": 1})
  52. firsttime := gconv.Int64((*data)["firsttime"])
  53. update := []map[string]interface{}{
  54. {"_id": tmp["_id"]},
  55. }
  56. set := map[string]interface{}{
  57. "zbtime": firsttime,
  58. }
  59. update = append(update, map[string]interface{}{"$set": set})
  60. lock.Lock()
  61. arr = append(arr, update)
  62. if len(arr) > 100 {
  63. MgoPro.UpdateBulk("projectset_wy_back", arr...)
  64. arr = [][]map[string]interface{}{}
  65. }
  66. lock.Unlock()
  67. }(tmp)
  68. if n%10000 == 0 {
  69. fmt.Println("current:", n)
  70. }
  71. tmp = map[string]interface{}{}
  72. }
  73. wg.Wait()
  74. if len(arr) > 0 {
  75. MgoPro.UpdateBulk("projectset_wy_back", arr...)
  76. arr = [][]map[string]interface{}{}
  77. }
  78. fmt.Println("迁移结束...")
  79. }