123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126 |
- package main
- import (
- "log"
- "qfw/util"
- "qfw/util/mongodb"
- "qfw/util/redis"
- "sync"
- "time"
- )
// FullCount accumulates how many documents task.query has walked through
// across every day RunFullData has processed so far, including documents
// that were skipped because their "repeat" field equals 1.
var FullCount int
- func RunFullData() {
- defer util.Catch()
- var wg = sync.WaitGroup{}
- startTime := int64(1325347200) //2012-01-01
- ps := 3
- pool := make(chan *task, ps)
- day := 0
- endChan := make(chan bool, 1)
- go func() {
- now := time.Now().Unix()
- bComplete := false
- for {
- if startTime > now || bComplete {
- log.Println("任务结束")
- endChan <- true
- break
- }
- endTime := startTime + 86400
- q := map[string]interface{}{
- "publishtime": map[string]interface{}{
- "$gt": startTime,
- "$lte": endTime,
- },
- }
- //数据正序处理
- sess := MQFW.GetMgoConn()
- var result []map[string]interface{}
- sess.DB(MQFW.DbName).C(extractColl).Find(q).All(&result)
- MQFW.DestoryMongoConn(sess)
- pool <- &task{result}
- wg.Add(1)
- startTime = endTime
- day++
- log.Println("day====", day)
- if day > 0 && day%ps == 0 {
- wg.Wait()
- MQFW.Destory()
- MQFW = mongodb.MongodbSim{
- MongodbAddr: Sysconfig["mongodbServers"].(string),
- Size: 2 * ps,
- DbName: Sysconfig["mongodbName"].(string),
- }
- MQFW.InitPool()
- }
- }
- }()
- for {
- select {
- case t := <-pool:
- t.query()
- t.result = nil
- t = nil
- wg.Done()
- case <-endChan:
- return
- }
- }
- }
// task holds one day's batch of extractColl documents, loaded by RunFullData
// and consumed by task.query.
type task struct{ result []map[string]interface{} }
- func (t *task) query() {
- index := 0
- wg := &sync.WaitGroup{}
- for _, tmp := range t.result {
- if index%10000 == 0 {
- log.Println(index, tmp["_id"])
- }
- index++
- if util.IntAll(tmp["repeat"]) == 1 {
- continue
- }
- pt := util.Int64All(tmp["publishtime"])
- if pt > currentMegerTime {
- currentMegerTime = pt
- }
- currentMegerCount++
- if currentMegerCount > 300000 {
- log.Println("执行清理", currentMegerTime)
- clearPKey()
- currentMegerCount = 0
- }
- wg.Add(1)
- MultiThread <- true
- go func(tmp map[string]interface{}) {
- defer func() {
- <-MultiThread
- wg.Done()
- }()
- thisid := util.BsonIdToSId(tmp["_id"])
- info := PreThisInfo(tmp)
- if info != nil {
- lockPNCBMap(info)
- startProjectMerge(info, tmp)
- redis.Put(INFOID, thisid, 1, INFOTIMEOUT)
- currentMegerTime = info.Publishtime
- unlockPNCBMap(info)
- }
- }(tmp)
- }
- wg.Wait()
- FullCount += index
- log.Println("currentFull", FullCount)
- }
|