listdatamove.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. package timetask
  2. import (
  3. "github.com/donnie4w/go-logger/logger"
  4. qu "qfw/util"
  5. "sync"
  6. "time"
  7. "util"
  8. )
  9. //列表页历史数据迁移
  10. func MoveListData() {
  11. logger.Debug("开始迁移spider_highlistdata数据...")
  12. //1、spider_highlistdata数据迁移,只保留一个月
  13. defer qu.Catch()
  14. sess := util.MgoS.GetMgoConn()
  15. defer util.MgoS.DestoryMongoConn(sess)
  16. query := map[string]interface{}{
  17. "comeintime": map[string]interface{}{
  18. "$lt": time.Now().Unix() - 30*86400,
  19. },
  20. }
  21. lock := &sync.Mutex{}
  22. wg := &sync.WaitGroup{}
  23. ch := make(chan bool, 3)
  24. arr := []map[string]interface{}{}
  25. count, _ := sess.DB(util.MgoS.DbName).C("spider_highlistdata").Find(&query).Count()
  26. logger.Debug("spider_highlistdata迁移数据量:", count)
  27. it := sess.DB(util.MgoS.DbName).C("spider_highlistdata").Find(&query).Iter()
  28. n := 0
  29. for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
  30. wg.Add(1)
  31. ch <- true
  32. go func(tmp map[string]interface{}) {
  33. defer func() {
  34. <-ch
  35. wg.Done()
  36. }()
  37. lock.Lock()
  38. arr = append(arr, tmp)
  39. if len(arr) > 500 {
  40. util.MgoS.SaveBulk("spider_highlistdata_back", arr...)
  41. arr = []map[string]interface{}{}
  42. }
  43. lock.Unlock()
  44. }(tmp)
  45. if n%1000 == 0 {
  46. logger.Debug(n)
  47. }
  48. tmp = map[string]interface{}{}
  49. }
  50. wg.Wait()
  51. lock.Lock()
  52. if len(arr) > 0 {
  53. util.MgoS.SaveBulk("spider_highlistdata_back", arr...)
  54. arr = []map[string]interface{}{}
  55. }
  56. lock.Unlock()
  57. //删除原表中数据
  58. delNum := util.MgoS.Delete("spider_highlistdata", query)
  59. logger.Debug("spider_highlistdata删除数据量:", delNum)
  60. logger.Debug("开始迁移spider_listdata数据...")
  61. //1、spider_listdata数据迁移,只保留两个月
  62. query["comeintime"] = map[string]interface{}{
  63. "$lt": time.Now().Unix() - 60*86400,
  64. }
  65. count, _ = sess.DB(util.MgoS.DbName).C("spider_listdata").Find(&query).Count()
  66. logger.Debug("spider_listdata迁移数据量:", count)
  67. it1 := sess.DB(util.MgoS.DbName).C("spider_listdata").Find(&query).Iter()
  68. n1 := 0
  69. for tmp := make(map[string]interface{}); it1.Next(&tmp); n1++ {
  70. wg.Add(1)
  71. ch <- true
  72. go func(tmp map[string]interface{}) {
  73. defer func() {
  74. <-ch
  75. wg.Done()
  76. }()
  77. lock.Lock()
  78. arr = append(arr, tmp)
  79. if len(arr) > 500 {
  80. util.MgoS.SaveBulk("spider_listdata_back", arr...)
  81. arr = []map[string]interface{}{}
  82. }
  83. lock.Unlock()
  84. }(tmp)
  85. if n1%1000 == 0 {
  86. logger.Debug(n1)
  87. }
  88. tmp = map[string]interface{}{}
  89. }
  90. wg.Wait()
  91. lock.Lock()
  92. if len(arr) > 0 {
  93. util.MgoS.SaveBulk("spider_listdata_back", arr...)
  94. arr = []map[string]interface{}{}
  95. }
  96. lock.Unlock()
  97. //删除原表中数据
  98. delNum = util.MgoS.Delete("spider_listdata", query)
  99. logger.Debug("spider_listdata删除数据量:", delNum)
  100. logger.Debug("数据迁移完成...")
  101. }