package main

import (
	"log"
	"sync"
	"time"

	"qfw/util"
	"qfw/util/mongodb"
	"qfw/util/redis"
)

// FullCount accumulates how many records task.query has iterated over
// (repeat records included) across all days of the full-data run.
var FullCount = 0

// RunFullData replays the records of extractColl day by day, in ascending
// publishtime order, from 2012-01-01 00:00 (+08:00) up to the moment the run
// started, pushing each day's records through task.query.
//
// A producer goroutine loads one day per iteration and hands it to this
// goroutine over a channel buffered to ps days. After every ps days the
// producer waits until every handed-over day has been processed and then
// rebuilds the MQFW connection pool from Sysconfig. RunFullData returns only
// after the last loaded day has been processed.
func RunFullData() {
	defer util.Catch()

	const ps = 3                   // days in flight between Mongo pool rebuilds
	startTime := int64(1325347200) // 2012-01-01 00:00:00 +08:00
	pool := make(chan *task, ps)
	var wg sync.WaitGroup

	go func() {
		// Closing pool (instead of sending a separate end signal) guarantees
		// the consumer loop drains every buffered day before it returns.
		defer close(pool)

		now := time.Now().Unix()
		day := 0
		for startTime <= now {
			endTime := startTime + 86400
			q := map[string]interface{}{
				"publishtime": map[string]interface{}{
					"$gt":  startTime,
					"$lte": endTime,
				},
			}
			// Days are loaded and processed in ascending date order.
			sess := MQFW.GetMgoConn()
			var result []map[string]interface{}
			if err := sess.DB(MQFW.DbName).C(extractColl).Find(q).All(&result); err != nil {
				// Best effort: log and move on to the next day.
				log.Println("full data query error", startTime, endTime, err)
			}
			MQFW.DestoryMongoConn(sess)

			// Register the task before handing it over, so the consumer's
			// wg.Done can never run ahead of wg.Add (negative counter panic).
			wg.Add(1)
			pool <- &task{result}
			startTime = endTime
			day++
			log.Println("day====", day)
			if day%ps == 0 {
				// Wait until every handed-over day is merged, then rebuild
				// the Mongo connection pool from configuration.
				wg.Wait()
				MQFW.Destory()
				MQFW = mongodb.MongodbSim{
					MongodbAddr: Sysconfig["mongodbServers"].(string),
					Size:        2 * ps,
					DbName:      Sysconfig["mongodbName"].(string),
				}
				MQFW.InitPool()
			}
		}
		log.Println("任务结束")
	}()

	// Single consumer: days are processed strictly one after another.
	for t := range pool {
		t.query()
		t.result = nil // drop the day's records as soon as they are merged
		wg.Done()
	}
}

// task carries the records loaded for one publishtime day.
type task struct {
	result []map[string]interface{}
}

// query merges every record of t.result that is not flagged repeat == 1.
// Each merge runs in its own goroutine, gated by the MultiThread channel
// (used here as a semaphore; its capacity is defined elsewhere). query
// blocks until all merges for the day have finished, then adds the number
// of iterated records to FullCount.
func (t *task) query() {
	index := 0
	wg := &sync.WaitGroup{}
	for _, tmp := range t.result {
		if index%10000 == 0 {
			log.Println(index, tmp["_id"])
		}
		index++
		if util.IntAll(tmp["repeat"]) == 1 {
			continue
		}
		pt := util.Int64All(tmp["publishtime"])
		if pt > currentMegerTime {
			currentMegerTime = pt
		}
		currentMegerCount++
		// Every 300000 merged records, clearPKey is invoked.
		// NOTE(review): merges dispatched earlier in this loop may still be
		// running when clearPKey executes — confirm that is safe.
		if currentMegerCount > 300000 {
			log.Println("执行清理", currentMegerTime)
			clearPKey()
			currentMegerCount = 0
		}
		wg.Add(1)
		MultiThread <- true
		go func(tmp map[string]interface{}) {
			defer func() {
				<-MultiThread
				wg.Done()
			}()
			thisid := util.BsonIdToSId(tmp["_id"])
			info := PreThisInfo(tmp)
			if info != nil {
				lockPNCBMap(info)
				startProjectMerge(info, tmp)
				redis.Put(INFOID, thisid, 1, INFOTIMEOUT)
				// NOTE(review): currentMegerTime is also read/written by the
				// dispatching loop above; unless lockPNCBMap serializes all
				// writers, this is a data race — verify.
				currentMegerTime = info.Publishtime
				unlockPNCBMap(info)
			}
		}(tmp)
	}
	wg.Wait()
	FullCount += index
	log.Println("currentFull", FullCount)
}