main.go 2.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/gogf/gf/v2/util/gconv"
  5. "github.com/robfig/cron"
  6. "strings"
  7. "sync"
  8. )
  9. func init() {
  10. ReadConfig(&Config) //初始化
  11. InitMgo() //mgo
  12. InitCkh() //clickhouse
  13. InitEs() //es
  14. InitOther()
  15. }
  16. func main() {
  17. c := cron.New()
  18. //增量
  19. c.AddFunc(Config.StartCron, IncTransactionDataFromBidAndPro) //增量bidding和项目数据
  20. c.Start()
  21. //历史
  22. //HisTransactionDataFromBid() //历史招标(bidding)数据,截止时间1713196800采购意向 TODO 待补充法人信息
  23. //HisTransactionDataFromBid2() //历史招标(bidding)数据,截止时间1713628800新增项目 TODO 待补充情报信息、法人信息
  24. //HisTransactionDataFromProject() //历史项目数据(projectset_20230904)TODO 待补充业态、情报信息、法人信息
  25. //临时处理(信息补充)
  26. //HisTransactionDataAddInformation() //历史信息补充法人库信息,项目信息补充业态property_form
  27. //IncTransactionDataMgoToCkhAndEs() //数据迁移
  28. ch := make(chan bool)
  29. <-ch
  30. }
  31. func tmp() {
  32. sess := MgoPro.GetMgoConn()
  33. defer MgoPro.DestoryMongoConn(sess)
  34. ch := make(chan bool, 20)
  35. wg := &sync.WaitGroup{}
  36. lock := &sync.Mutex{}
  37. query := map[string]interface{}{
  38. "project_bidstatus": 4,
  39. //"_id": map[string]interface{}{
  40. // "$gte": mongodb.StringTOBsonId("66213b290f6ba3eb160617ad"),
  41. //},
  42. }
  43. count := MgoPro.Count("projectset_wy", query)
  44. fmt.Println("count:", count)
  45. it := sess.DB(MgoPro.DbName).C("projectset_wy").Find(&query).Iter()
  46. n := 0
  47. //arr := [][]map[string]interface{}{}
  48. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  49. ch <- true
  50. wg.Add(1)
  51. go func(tmp map[string]interface{}) {
  52. defer func() {
  53. <-ch
  54. wg.Done()
  55. }()
  56. info_id := gconv.String(tmp["info_id"])
  57. data, _ := MgoB.FindById("bidding", info_id, map[string]interface{}{"s_topscopeclass": 1})
  58. s_topscopeclass := gconv.String((*data)["s_topscopeclass"])
  59. if !strings.Contains(s_topscopeclass, "建筑工程") {
  60. MgoPro.Del("projectset_wy", map[string]interface{}{"_id": tmp["_id"]})
  61. }
  62. //update := []map[string]interface{}{
  63. // {"_id": tmp["_id"]},
  64. //}
  65. //set := map[string]interface{}{}
  66. //info_id := gconv.String(tmp["info_id"])
  67. //data, _ := MgoPro.FindById("projectset", info_id, map[string]interface{}{"bidstatus": 1})
  68. //if gconv.String((*data)["bidstatus"]) == "拟建" {
  69. // set["project_bidstatus"] = 4
  70. //} else {
  71. // return
  72. //}
  73. //update = append(update, map[string]interface{}{"$set": set})
  74. //
  75. lock.Lock()
  76. //arr = append(arr, update)
  77. //if len(arr) > 500 {
  78. // MgoPro.UpdateBulk("projectset_wy_back", arr...)
  79. // arr = [][]map[string]interface{}{}
  80. //}
  81. lock.Unlock()
  82. }(tmp)
  83. if n%10000 == 0 {
  84. fmt.Println("current:", n)
  85. }
  86. tmp = map[string]interface{}{}
  87. }
  88. wg.Wait()
  89. //if len(arr) > 0 {
  90. // MgoPro.UpdateBulk("projectset_wy_back", arr...)
  91. // arr = [][]map[string]interface{}{}
  92. //}
  93. fmt.Println("迁移结束...")
  94. }