main.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/gogf/gf/v2/util/gconv"
  5. "github.com/robfig/cron"
  6. "sync"
  7. )
  8. func init() {
  9. ReadConfig(&Config) //初始化
  10. InitMgo() //mgo
  11. InitCkh() //clickhouse
  12. InitEs() //es
  13. InitOther()
  14. }
  15. func main() {
  16. c := cron.New()
  17. //增量
  18. c.AddFunc(Config.StartCron, IncTransactionDataFromBidAndPro) //增量bidding和项目数据
  19. c.Start()
  20. //历史
  21. //HisTransactionDataFromBid() //历史招标(bidding)数据,截止时间1713196800采购意向 TODO 待补充法人信息
  22. //HisTransactionDataFromBid2() //历史招标(bidding)数据,截止时间1713628800新增项目 TODO 待补充情报信息、法人信息
  23. //HisTransactionDataFromProject() //历史项目数据(projectset_20230904)TODO 待补充业态、情报信息、法人信息
  24. //临时处理(信息补充)
  25. //HisTransactionDataAddInformation() //历史信息补充法人库信息,项目信息补充业态property_form
  26. //IncTransactionDataMgoToCkh() //数据迁移
  27. ch := make(chan bool)
  28. <-ch
  29. }
  30. func tmp() {
  31. sess := MgoPro.GetMgoConn()
  32. defer MgoPro.DestoryMongoConn(sess)
  33. ch := make(chan bool, 20)
  34. wg := &sync.WaitGroup{}
  35. lock := &sync.Mutex{}
  36. query := map[string]interface{}{
  37. //"zbtime": 0,
  38. }
  39. it := sess.DB(MgoPro.DbName).C("projectset_wy").Find(&query).Iter()
  40. n := 0
  41. arr := [][]map[string]interface{}{}
  42. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  43. ch <- true
  44. wg.Add(1)
  45. go func(tmp map[string]interface{}) {
  46. defer func() {
  47. <-ch
  48. wg.Done()
  49. }()
  50. info_id := gconv.String(tmp["info_id"])
  51. update := []map[string]interface{}{
  52. {"_id": tmp["_id"]},
  53. }
  54. set := map[string]interface{}{
  55. "info_ids": []string{info_id},
  56. }
  57. update = append(update, map[string]interface{}{"$set": set})
  58. lock.Lock()
  59. arr = append(arr, update)
  60. if len(arr) > 500 {
  61. MgoPro.UpdateBulk("projectset_wy", arr...)
  62. arr = [][]map[string]interface{}{}
  63. }
  64. lock.Unlock()
  65. }(tmp)
  66. if n%10000 == 0 {
  67. fmt.Println("current:", n)
  68. }
  69. tmp = map[string]interface{}{}
  70. }
  71. wg.Wait()
  72. if len(arr) > 0 {
  73. MgoPro.UpdateBulk("projectset_wy", arr...)
  74. arr = [][]map[string]interface{}{}
  75. }
  76. fmt.Println("迁移结束...")
  77. }