fulldata.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. package main
  2. import (
  3. "log"
  4. "qfw/util"
  5. "qfw/util/mongodb"
  6. "qfw/util/redis"
  7. "sync"
  8. "time"
  9. )
  // FullCount is the running total of documents that task.query has iterated
  // over across all batches; it starts at zero.
  var FullCount int
  11. func RunFullData(startTime int64) {
  12. if startTime < 1325347200 {
  13. log.Println("时间错误", startTime)
  14. }
  15. defer util.Catch()
  16. var wg = sync.WaitGroup{}
  17. //startTime := int64(1325347200) //2012-01-01
  18. ps := 3
  19. pool := make(chan *task, ps)
  20. day := 0
  21. endChan := make(chan bool, 1)
  22. go func() {
  23. now := time.Now().Unix()
  24. bComplete := false
  25. for {
  26. if startTime > now || bComplete {
  27. log.Println("任务结束", startTime)
  28. endChan <- true
  29. break
  30. }
  31. endTime := startTime + 86400
  32. q := map[string]interface{}{
  33. "publishtime": map[string]interface{}{
  34. "$gt": startTime,
  35. "$lte": endTime,
  36. },
  37. }
  38. //数据正序处理
  39. sess := MQFW.GetMgoConn()
  40. var result []map[string]interface{}
  41. sess.DB(MQFW.DbName).C(extractColl).Find(q).All(&result)
  42. MQFW.DestoryMongoConn(sess)
  43. pool <- &task{result}
  44. wg.Add(1)
  45. startTime = endTime
  46. day++
  47. log.Println("day====", day)
  48. if day > 0 && day%ps == 0 {
  49. wg.Wait()
  50. MQFW.Destory()
  51. MQFW = mongodb.MongodbSim{
  52. MongodbAddr: Sysconfig["mongodbServers"].(string),
  53. Size: 2 * ps,
  54. DbName: Sysconfig["mongodbName"].(string),
  55. }
  56. MQFW.InitPool()
  57. }
  58. }
  59. }()
  60. for {
  61. select {
  62. case t := <-pool:
  63. t.query()
  64. t.result = nil
  65. t = nil
  66. wg.Done()
  67. case <-endChan:
  68. return
  69. }
  70. }
  71. }
  // task is one unit of work for the full-data run: the documents loaded for a
  // single time window.
  type task struct {
  	// result holds the raw documents of the batch, one map per document.
  	result []map[string]interface{}
  }
  75. func (t *task) query() {
  76. index := 0
  77. wg := &sync.WaitGroup{}
  78. for _, tmp := range t.result {
  79. if index%10000 == 0 {
  80. log.Println(index, tmp["_id"])
  81. }
  82. index++
  83. if util.IntAll(tmp["repeat"]) == 1 {
  84. continue
  85. }
  86. pt := util.Int64All(tmp["publishtime"])
  87. if pt > currentMegerTime {
  88. currentMegerTime = pt
  89. }
  90. currentMegerCount++
  91. if currentMegerCount > 300000 {
  92. log.Println("执行清理", currentMegerTime)
  93. clearPKey()
  94. currentMegerCount = 0
  95. }
  96. wg.Add(1)
  97. MultiThread <- true
  98. go func(tmp map[string]interface{}) {
  99. defer func() {
  100. <-MultiThread
  101. wg.Done()
  102. }()
  103. thisid := util.BsonIdToSId(tmp["_id"])
  104. info := PreThisInfo(tmp)
  105. if info != nil {
  106. lockPNCBMap(info)
  107. storeLock(info)
  108. startProjectMerge(info, tmp)
  109. redis.Put(INFOID, thisid, 1, INFOTIMEOUT)
  110. currentMegerTime = info.Publishtime
  111. unlockPNCBMap(info)
  112. }
  113. }(tmp)
  114. }
  115. wg.Wait()
  116. FullCount += index
  117. log.Println("currentFull", FullCount)
  118. }