fulldata.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. package main
  2. import (
  3. "log"
  4. "qfw/util"
  5. "qfw/util/mongodb"
  6. "qfw/util/redis"
  7. "sync"
  8. "time"
  9. )
  // FullCount is the running total of documents scanned by task.query; each
  // call adds the size of the batch it walked.
  var FullCount int
  11. func RunFullData() {
  12. defer util.Catch()
  13. var wg = sync.WaitGroup{}
  14. startTime := int64(1325347200) //2012-01-01
  15. ps := 3
  16. pool := make(chan *task, ps)
  17. day := 0
  18. endChan := make(chan bool, 1)
  19. go func() {
  20. now := time.Now().Unix()
  21. bComplete := false
  22. for {
  23. if startTime > now || bComplete {
  24. log.Println("任务结束")
  25. endChan <- true
  26. break
  27. }
  28. endTime := startTime + 86400
  29. q := map[string]interface{}{
  30. "publishtime": map[string]interface{}{
  31. "$gt": startTime,
  32. "$lte": endTime,
  33. },
  34. }
  35. //数据正序处理
  36. sess := MQFW.GetMgoConn()
  37. var result []map[string]interface{}
  38. sess.DB(MQFW.DbName).C(extractColl).Find(q).All(&result)
  39. MQFW.DestoryMongoConn(sess)
  40. pool <- &task{result}
  41. wg.Add(1)
  42. startTime = endTime
  43. day++
  44. log.Println("day====", day)
  45. if day > 0 && day%ps == 0 {
  46. wg.Wait()
  47. MQFW.Destory()
  48. MQFW = mongodb.MongodbSim{
  49. MongodbAddr: Sysconfig["mongodbServers"].(string),
  50. Size: 2 * ps,
  51. DbName: Sysconfig["mongodbName"].(string),
  52. }
  53. MQFW.InitPool()
  54. }
  55. }
  56. }()
  57. for {
  58. select {
  59. case t := <-pool:
  60. t.query()
  61. t.result = nil
  62. t = nil
  63. wg.Done()
  64. case <-endChan:
  65. return
  66. }
  67. }
  68. }
  // task carries one batch of documents from the producer goroutine in
  // RunFullData to the consumer that calls query on it.
  type task struct {
  	// result holds the raw documents of the batch, one map per document.
  	result []map[string]interface{}
  }
  72. func (t *task) query() {
  73. index := 0
  74. wg := &sync.WaitGroup{}
  75. for _, tmp := range t.result {
  76. if index%10000 == 0 {
  77. log.Println(index, tmp["_id"])
  78. }
  79. index++
  80. if util.IntAll(tmp["repeat"]) == 1 {
  81. continue
  82. }
  83. pt := util.Int64All(tmp["publishtime"])
  84. if pt > currentMegerTime {
  85. currentMegerTime = pt
  86. }
  87. currentMegerCount++
  88. if currentMegerCount > 300000 {
  89. log.Println("执行清理", currentMegerTime)
  90. clearPKey()
  91. currentMegerCount = 0
  92. }
  93. wg.Add(1)
  94. MultiThread <- true
  95. go func(tmp map[string]interface{}) {
  96. defer func() {
  97. <-MultiThread
  98. wg.Done()
  99. }()
  100. thisid := util.BsonIdToSId(tmp["_id"])
  101. info := PreThisInfo(tmp)
  102. if info != nil {
  103. lockPNCBMap(info)
  104. startProjectMerge(info, tmp)
  105. redis.Put(INFOID, thisid, 1, INFOTIMEOUT)
  106. currentMegerTime = info.Publishtime
  107. unlockPNCBMap(info)
  108. }
  109. }(tmp)
  110. }
  111. wg.Wait()
  112. FullCount += index
  113. log.Println("currentFull", FullCount)
  114. }