main.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/gogf/gf/v2/util/gconv"
  5. "github.com/robfig/cron"
  6. "sync"
  7. )
  8. func init() {
  9. ReadConfig(&Config) //初始化
  10. InitMgo() //mgo
  11. InitCkh() //clickhouse
  12. InitEs() //es
  13. InitOther()
  14. }
  15. func main() {
  16. c := cron.New()
  17. //增量
  18. c.AddFunc(Config.StartCron, IncTransactionDataFromBidAndPro) //增量bidding和项目数据
  19. c.Start()
  20. //历史
  21. //HisTransactionDataFromBid() //历史招标(bidding)数据,截止时间1713196800采购意向 TODO 待补充法人信息
  22. //HisTransactionDataFromBid2() //历史招标(bidding)数据,截止时间1713628800新增项目 TODO 待补充情报信息、法人信息
  23. //HisTransactionDataFromProject() //历史项目数据(projectset_20230904)TODO 待补充业态、情报信息、法人信息
  24. //临时处理(信息补充)
  25. //HisTransactionDataAddInformation() //历史信息补充法人库信息,项目信息补充业态property_form
  26. //IncTransactionDataMgoToCkhAndEs() //数据迁移
  27. ch := make(chan bool)
  28. <-ch
  29. }
  30. func tmp() {
  31. sess := MgoPro.GetMgoConn()
  32. defer MgoPro.DestoryMongoConn(sess)
  33. ch := make(chan bool, 20)
  34. wg := &sync.WaitGroup{}
  35. lock := &sync.Mutex{}
  36. query := map[string]interface{}{
  37. //"project_bidstatus": 4,
  38. //"_id": map[string]interface{}{
  39. // "$gte": mongodb.StringTOBsonId("66213b290f6ba3eb160617ad"),
  40. //},
  41. }
  42. count := MgoPro.Count("projectset_wy", query)
  43. fmt.Println("count:", count)
  44. it := sess.DB(MgoPro.DbName).C("projectset_wy").Find(&query).Iter()
  45. n := 0
  46. arr := [][]map[string]interface{}{}
  47. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  48. ch <- true
  49. wg.Add(1)
  50. go func(tmp map[string]interface{}) {
  51. defer func() {
  52. <-ch
  53. wg.Done()
  54. }()
  55. project_id := gconv.String(tmp["project_id"])
  56. data, _ := MgoB.FindById("bidding", project_id, map[string]interface{}{"buyerclass": 1})
  57. buyerclass := gconv.String((*data)["buyerclass"])
  58. update := []map[string]interface{}{
  59. {"_id": tmp["_id"]},
  60. }
  61. set := map[string]interface{}{
  62. "buyerclass": buyerclass,
  63. }
  64. //info_id := gconv.String(tmp["info_id"])
  65. //data, _ := MgoPro.FindById("projectset", info_id, map[string]interface{}{"bidstatus": 1})
  66. //if gconv.String((*data)["bidstatus"]) == "拟建" {
  67. // set["project_bidstatus"] = 4
  68. //} else {
  69. // return
  70. //}
  71. update = append(update, map[string]interface{}{"$set": set})
  72. //
  73. lock.Lock()
  74. arr = append(arr, update)
  75. if len(arr) > 500 {
  76. MgoPro.UpdateBulk("projectset_wy", arr...)
  77. arr = [][]map[string]interface{}{}
  78. }
  79. lock.Unlock()
  80. }(tmp)
  81. if n%10000 == 0 {
  82. fmt.Println("current:", n)
  83. }
  84. tmp = map[string]interface{}{}
  85. }
  86. wg.Wait()
  87. if len(arr) > 0 {
  88. MgoPro.UpdateBulk("projectset_wy", arr...)
  89. arr = [][]map[string]interface{}{}
  90. }
  91. fmt.Println("迁移结束...")
  92. }