main.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/robfig/cron"
  5. "sync"
  6. )
  7. func init() {
  8. ReadConfig(&Config) //初始化
  9. InitMgo() //mgo
  10. InitCkh() //clickhouse
  11. InitEs() //es
  12. InitOther()
  13. }
  14. func main() {
  15. c := cron.New()
  16. //增量
  17. c.AddFunc(Config.StartCron, IncTransactionDataFromBidAndPro) //增量bidding和项目数据
  18. c.Start()
  19. //历史
  20. //HisTransactionDataFromBid() //历史招标(bidding)数据,截止时间1713196800采购意向 TODO 待补充法人信息
  21. //HisTransactionDataFromBid2() //历史招标(bidding)数据,截止时间1713628800新增项目 TODO 待补充情报信息、法人信息
  22. //HisTransactionDataFromProject() //历史项目数据(projectset_20230904)TODO 待补充业态、情报信息、法人信息
  23. //临时处理(信息补充)
  24. //HisTransactionDataAddInformation() //历史信息补充法人库信息,项目信息补充业态property_form
  25. //IncTransactionDataMgoToCkhAndEs() //数据迁移
  26. ch := make(chan bool)
  27. <-ch
  28. }
  29. func tmp() {
  30. sess := MgoPro.GetMgoConn()
  31. defer MgoPro.DestoryMongoConn(sess)
  32. ch := make(chan bool, 1)
  33. wg := &sync.WaitGroup{}
  34. //lock := &sync.Mutex{}
  35. query := map[string]interface{}{
  36. //"project_bidstatus": 4,
  37. //"_id": map[string]interface{}{
  38. // "$gte": mongodb.StringTOBsonId("66213b290f6ba3eb160617ad"),
  39. //},
  40. //"update_time": map[string]interface{}{
  41. // "$lt": 1714959573,
  42. //},
  43. //"_id": mongodb.StringTOBsonId("6630eae76f6c86a3962f3a07"),
  44. "repeat": true,
  45. }
  46. repeat := map[string]bool{}
  47. count := MgoPro.Count("projectset_wy_back", query)
  48. fmt.Println("count:", count)
  49. it := sess.DB(MgoPro.DbName).C("projectset_wy_back").Find(&query).Iter()
  50. n := 0
  51. //arr := []map[string]interface{}{}
  52. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  53. ch <- true
  54. wg.Add(1)
  55. go func(tmp map[string]interface{}) {
  56. defer func() {
  57. <-ch
  58. wg.Done()
  59. }()
  60. //update := []map[string]interface{}{}
  61. //project_id := gconv.String(tmp["project_id"])
  62. //lock.Lock()
  63. //if !repeat[project_id] {
  64. // Es.DelById(Config.Es.Index, project_id)
  65. // CkhTool.Exec(context.Background(), "ALTER TABLE information.transaction_info_copy DELETE WHERE project_id = ?", project_id)
  66. // repeat[project_id] = true
  67. //}
  68. //lock.Unlock()
  69. //err, result := Es.GetById(Config.Es.Index, project_id)
  70. //Es.DelById()
  71. //if err != nil || len(result) == 0 {
  72. // fmt.Println(project_id)
  73. // update = append(update, map[string]interface{}{"_id": tmp["_id"]})
  74. // update = append(update, map[string]interface{}{"$set": map[string]interface{}{"es": false}})
  75. //} else {
  76. // if gconv.Int(result["project_bidstatus"]) != 0 {
  77. // fmt.Println("11", project_id)
  78. // }
  79. //}
  80. if MgoPro.Count("projectset_wy_back", map[string]interface{}{"project_id": tmp["project_id"]}) > 1 {
  81. fmt.Println("project_id")
  82. //update = append(update, map[string]interface{}{"_id": tmp["_id"]})
  83. //update = append(update, map[string]interface{}{"$set": map[string]interface{}{"repeat": true}})
  84. }
  85. //if len(update) > 0 {
  86. // lock.Lock()
  87. // arr = append(arr, update)
  88. // if len(arr) > 500 {
  89. // MgoPro.UpdateBulk("projectset_wy_back", arr...)
  90. // arr = [][]map[string]interface{}{}
  91. // }
  92. // lock.Unlock()
  93. //}
  94. }(tmp)
  95. if n%1000 == 0 {
  96. fmt.Println("current:", n)
  97. }
  98. tmp = map[string]interface{}{}
  99. }
  100. wg.Wait()
  101. //if len(arr) > 0 {
  102. // MgoPro.SaveBulk("projectset_wy_tmp2", arr...)
  103. // arr = []map[string]interface{}{}
  104. //}
  105. fmt.Println("迁移结束...", len(repeat))
  106. }