fulldata.go 6.2 KB


  1. package main
  2. import (
  3. "log"
  4. "qfw/util"
  5. // "qfw/util/mongodb"
  6. // "qfw/util/redis"
  7. "sync"
  8. "time"
  9. // "gopkg.in/mgo.v2/bson"
  10. )
  11. var FullCount = 0
  12. func RunFullData() {
  13. startTime, END := int64(0), int64(0)
  14. sts, bres := MQFW.Find(extractColl, `{}`, "publishtime", `{"publishtime":1}`, true, 1, 1)
  15. if bres && sts != nil && len(*sts) == 1 {
  16. startTime = util.Int64All((*sts)[0]["publishtime"])
  17. startTime -= 1
  18. sts, bres = MQFW.Find(extractColl, `{}`, "-publishtime", `{"publishtime":1}`, true, 1, 1)
  19. if bres && sts != nil && len(*sts) == 1 {
  20. END = util.Int64All((*sts)[0]["publishtime"])
  21. }
  22. log.Println(startTime, END)
  23. } else {
  24. return
  25. }
  26. defer util.Catch()
  27. var wg = sync.WaitGroup{}
  28. //2012-01-01 到 2015-01-01 1420041600
  29. findPoolSize := 4
  30. pool := make(chan *task, findPoolSize)
  31. endChan := make(chan bool, 1)
  32. _ = time.Now().Unix()
  33. // sess := MQFW.GetMgoConn()
  34. // var result []map[string]interface{}
  35. // sess.DB(MQFW.DbName).C(extractColl).Find(map[string]interface{}{}).Sort("publishtime").All(&result)
  36. // log.Println("查询结果:", len(result))
  37. // MQFW.DestoryMongoConn(sess)
  38. // pool <- &task{result}
  39. //endChan <- true
  40. before15year := int64(1420041600) //15年之前3天查询一次
  41. day := 0
  42. go func() {
  43. for {
  44. if startTime >= END {
  45. log.Println("任务结束")
  46. endChan <- true
  47. break
  48. }
  49. addDay := 1
  50. if startTime < before15year {
  51. addDay = 3
  52. }
  53. //endTime := int64(1561828196)
  54. endTime := startTime + int64(20*86400)
  55. day += addDay
  56. log.Println("day====", day, startTime, endTime)
  57. q := map[string]interface{}{
  58. "publishtime": map[string]interface{}{
  59. "$gt": startTime,
  60. "$lte": endTime,
  61. },
  62. }
  63. // q = bson.M{"_id": bson.M{"$in": []interface{}{
  64. // util.StringTOBsonId("5a29933f40d2d9bbe87ba510"),
  65. // util.StringTOBsonId("59cb110740d2d9bbe8a5ea89"),
  66. // util.StringTOBsonId("59dec3c640d2d9bbe8fc2067"),
  67. // }}}
  68. // q = bson.M{"_id": bson.M{"$in": []interface{}{
  69. // util.StringTOBsonId("58ea834ee1382322d055aba2"),
  70. // util.StringTOBsonId("5762767261a0721f1504317e"),
  71. // util.StringTOBsonId("5909acaee138233f2da53ebc"),
  72. // util.StringTOBsonId("57764ddaedbcdc49e6003b62"),
  73. // util.StringTOBsonId("590a9b2ee138233f2da964b7"),
  74. // util.StringTOBsonId("58dab09ae138233607531939"),
  75. // util.StringTOBsonId("57909adcedbcdc35c8005ab5"),
  76. // util.StringTOBsonId("57861257edbcdc1cea01478c"),
  77. // util.StringTOBsonId("57e0ac3861a0721f15324175"),
  78. // util.StringTOBsonId("58da95b8e138233607524f76"),
  79. // util.StringTOBsonId("590a88b3e138233f2da9111b"),
  80. // }}}
  81. // q = bson.M{"_id": bson.M{"$in": []interface{}{
  82. // util.StringTOBsonId("59cf6c7940d2d9bbe8c62d42"),
  83. // util.StringTOBsonId("59dedd1b40d2d9bbe8fe1382"),
  84. // util.StringTOBsonId("59dedcc140d2d9bbe8fe0f7f"),
  85. // util.StringTOBsonId("59dedd1b40d2d9bbe8fe1386"),
  86. // util.StringTOBsonId("59dedd1b40d2d9bbe8fe138d"),
  87. // util.StringTOBsonId("59dedcc140d2d9bbe8fe0f63"),
  88. // util.StringTOBsonId("59e9584340d2d9bbe84bfefd"),
  89. // util.StringTOBsonId("59e9584340d2d9bbe84bff01"),
  90. // util.StringTOBsonId("59e9584340d2d9bbe84bff08"),
  91. // util.StringTOBsonId("59e9584340d2d9bbe84bff0c"),
  92. // util.StringTOBsonId("59e9676340d2d9bbe84d4ba1"),
  93. // util.StringTOBsonId("59e9795540d2d9bbe84eacf2"),
  94. // util.StringTOBsonId("59e979af40d2d9bbe84eb0f0"),
  95. // util.StringTOBsonId("59e978fb40d2d9bbe84ea8b6"),
  96. // util.StringTOBsonId("59e9ad6740d2d9bbe851fee7"),
  97. // util.StringTOBsonId("59e9ad6740d2d9bbe851fef6"),
  98. // util.StringTOBsonId("59e9ae7640d2d9bbe8521746"),
  99. // util.StringTOBsonId("59e9aed040d2d9bbe8521fbb"),
  100. // util.StringTOBsonId("59e9aed040d2d9bbe8521feb"),
  101. // util.StringTOBsonId("59e9b88f40d2d9bbe852f222"),
  102. // util.StringTOBsonId("59efde8640d2d9bbe87677e0"),
  103. // util.StringTOBsonId("59efde8640d2d9bbe87677dc"),
  104. // util.StringTOBsonId("59f0107b40d2d9bbe87a8935"),
  105. // util.StringTOBsonId("5a026a1e40d2d9bbe8ffbbbc"),
  106. // util.StringTOBsonId("5a0269c440d2d9bbe8ffb586"),
  107. // util.StringTOBsonId("5a02840c40d2d9bbe8019c75"),
  108. // util.StringTOBsonId("5a02840c40d2d9bbe8019c80"),
  109. // util.StringTOBsonId("5a02be7640d2d9bbe8057516"),
  110. // }}}
  111. // startTime = 1561828197
  112. //数据正序处理
  113. sess := MQFW.GetMgoConn()
  114. if sess == nil {
  115. time.Sleep(10 * time.Second)
  116. continue
  117. }
  118. var result []map[string]interface{}
  119. sess.DB(MQFW.DbName).C(extractColl).Find(q).Sort("publishtime").All(&result)
  120. startTime = endTime
  121. log.Println("查询结果:", len(result))
  122. if len(result) == 0 {
  123. continue
  124. }
  125. MQFW.DestoryMongoConn(sess)
  126. pool <- &task{result}
  127. wg.Add(1)
  128. startTime = endTime
  129. if day > 0 && day%(1*findPoolSize) == 0 {
  130. wg.Wait()
  131. // MQFW.Destory()
  132. // MQFW = mongodb.MongodbSim{
  133. // MongodbAddr: Sysconfig["mongodbServers"].(string),
  134. // Size: 2 * findPoolSize,
  135. // DbName: Sysconfig["mongodbName"].(string),
  136. // }
  137. // MQFW.InitPool()
  138. }
  139. }
  140. }()
  141. for {
  142. select {
  143. case t := <-pool:
  144. t.query()
  145. t.result = nil
  146. t = nil
  147. wg.Done()
  148. case <-endChan:
  149. return
  150. }
  151. }
  152. }
  153. type task struct {
  154. result []map[string]interface{}
  155. }
  156. func (t *task) query() {
  157. index := 0
  158. wg := &sync.WaitGroup{}
  159. for _, tmp := range t.result {
  160. if index%2000 == 0 {
  161. log.Println(index, tmp["_id"])
  162. }
  163. index++
  164. if util.IntAll(tmp["repeat"]) == 1 {
  165. continue
  166. }
  167. pt := util.Int64All(tmp["publishtime"])
  168. if pt > currentMegerTime {
  169. currentMegerTime = pt
  170. }
  171. currentMegerCount++
  172. if currentMegerCount > 600000 {
  173. log.Println("执行清理", currentMegerTime)
  174. clearPKey()
  175. currentMegerCount = 0
  176. }
  177. wg.Add(1)
  178. MultiThread <- true
  179. go func(tmp map[string]interface{}) {
  180. defer func() {
  181. <-MultiThread
  182. wg.Done()
  183. }()
  184. info := PreThisInfo(tmp)
  185. if info != nil && (info.LenPC > 3 || info.LenPN > 3 || info.LenPTC > 3) {
  186. startProjectMerge(info, tmp)
  187. //thisid := util.BsonIdToSId(tmp["_id"])
  188. //redis.Put(INFOID, thisid, 1, INFOTIMEOUT)
  189. currentMegerTime = info.Publishtime
  190. }
  191. }(tmp)
  192. //time.Sleep(10 * time.Microsecond)
  193. }
  194. wg.Wait()
  195. FullCount += index
  196. t.result = nil
  197. log.Println("currentFull", FullCount)
  198. }