main.go 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/gogf/gf/v2/util/gconv"
  5. "github.com/robfig/cron"
  6. "sync"
  7. )
  8. func init() {
  9. ReadConfig(&Config) //初始化
  10. InitMgo() //mgo
  11. InitCkh() //clickhouse
  12. InitEs() //es
  13. InitOther()
  14. }
  15. func main() {
  16. c := cron.New()
  17. //增量
  18. c.AddFunc(Config.StartCron, IncTransactionDataFromBidAndPro) //增量bidding和项目数据
  19. c.Start()
  20. //历史
  21. //HisTransactionDataFromBid() //历史招标(bidding)数据,截止时间1713196800采购意向 TODO 待补充法人信息
  22. //HisTransactionDataFromBid2() //历史招标(bidding)数据,截止时间1713628800新增项目 TODO 待补充情报信息、法人信息
  23. //HisTransactionDataFromProject() //历史项目数据(projectset_20230904)TODO 待补充业态、情报信息、法人信息
  24. //临时处理(信息补充)
  25. //HisTransactionDataAddInformation() //历史信息补充法人库信息,项目信息补充业态property_form
  26. //IncTransactionDataMgoToCkhAndEs() //数据迁移
  27. ch := make(chan bool)
  28. <-ch
  29. }
  30. func tmp() {
  31. sess := MgoPro.GetMgoConn()
  32. defer MgoPro.DestoryMongoConn(sess)
  33. ch := make(chan bool, 20)
  34. wg := &sync.WaitGroup{}
  35. lock := &sync.Mutex{}
  36. query := map[string]interface{}{
  37. "project_bidstatus": 2,
  38. }
  39. it := sess.DB(MgoPro.DbName).C("projectset_wy_back").Find(&query).Iter()
  40. n := 0
  41. arr := [][]map[string]interface{}{}
  42. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  43. ch <- true
  44. wg.Add(1)
  45. go func(tmp map[string]interface{}) {
  46. defer func() {
  47. <-ch
  48. wg.Done()
  49. }()
  50. update := []map[string]interface{}{
  51. {"_id": tmp["_id"]},
  52. }
  53. set := map[string]interface{}{}
  54. info_id := gconv.String(tmp["info_id"])
  55. data, _ := MgoPro.FindById("projectset", info_id, map[string]interface{}{"bidstatus": 1})
  56. if gconv.String((*data)["bidstatus"]) == "拟建" {
  57. set["project_bidstatus"] = 4
  58. } else {
  59. return
  60. }
  61. update = append(update, map[string]interface{}{"$set": set})
  62. lock.Lock()
  63. arr = append(arr, update)
  64. if len(arr) > 500 {
  65. MgoPro.UpdateBulk("projectset_wy_back", arr...)
  66. arr = [][]map[string]interface{}{}
  67. }
  68. lock.Unlock()
  69. }(tmp)
  70. if n%10000 == 0 {
  71. fmt.Println("current:", n)
  72. }
  73. tmp = map[string]interface{}{}
  74. }
  75. wg.Wait()
  76. if len(arr) > 0 {
  77. MgoPro.UpdateBulk("projectset_wy_back", arr...)
  78. arr = [][]map[string]interface{}{}
  79. }
  80. fmt.Println("迁移结束...")
  81. }