package main import ( "log" "strings" "qfw/util" "qfw/util/mongodb" "qfw/util/redis" "sync" "time" ) var FullCount = 0 func RunFullData() { defer util.Catch() var wg = sync.WaitGroup{} startTime := int64(1325347200) //2012-01-01 ps := 3 pool := make(chan *task, ps) day := 0 endChan := make(chan bool, 1) go func() { now := time.Now().Unix() bComplete := false for { if startTime > now || bComplete { log.Println("任务结束") endChan <- true break } endTime := startTime + 86400 q := map[string]interface{}{ "publishtime": map[string]interface{}{ "$gt": startTime, "$lte": endTime, }, } //数据正序处理 sess := MQFW.GetMgoConn() var result []map[string]interface{} sess.DB(MQFW.DbName).C(extractColl).Find(q).All(&result) MQFW.DestoryMongoConn(sess) pool <- &task{result} wg.Add(1) startTime = endTime day++ log.Println("day====", day) if day > 0 && day%ps == 0 { wg.Wait() MQFW.Destory() MQFW = mongodb.MongodbSim{ MongodbAddr: Sysconfig["mongodbServers"].(string), Size: 2 * ps, DbName: Sysconfig["mongodbName"].(string), } MQFW.InitPool() } } }() for { select { case t := <-pool: t.query() t.result = nil t = nil wg.Done() case <-endChan: return } } } type task struct { result []map[string]interface{} } func (t *task) query() { index := 0 wg := &sync.WaitGroup{} for _, tmp := range t.result { if index%10000 == 0 { log.Println(index, tmp["_id"]) } index++ if util.IntAll(tmp["repeat"]) == 1 { continue } pt := util.Int64All(tmp["publishtime"]) if pt > currentMegerTime { currentMegerTime = pt } currentMegerCount++ if currentMegerCount > 300000 { log.Println("执行清理", currentMegerTime) clearPKey() currentMegerCount = 0 } wg.Add(1) MultiThread <- true go func(tmp map[string]interface{}) { defer func() { <-MultiThread wg.Done() }() thisid := util.BsonIdToSId(tmp["_id"]) info := PreThisInfo(tmp) if info != nil { lockPNCBMap(info) startProjectMerge(info, tmp) redis.Put(INFOID, thisid, 1, INFOTIMEOUT) currentMegerTime = info.Publishtime unlockPNCBMap(info) } }(tmp) } wg.Wait() FullCount += index log.Println("currentFull", FullCount) } //获取对比项目数组 func getComeperProjects2(p PCBV, thisinfo *Info) (res []interface{}, pncb []*CompareInfo) { newarr := []string{} repeatId := map[string]bool{} if p.PnameLen > 0 { pn := NewCompareInfo("pn", thisinfo.PNKey, PNKey) pncb = append(pncb, pn) thisinfo.AllRelatePNKeyMap = map[string]*Key{} pn.KeyMap.Lock.Lock() for k, v := range pn.KeyMap.Map { if strings.Contains(k, pn.Key) || strings.Contains(pn.Key, k) { thisinfo.AllRelatePNKeyMap[k] = v for _, id := range *v.Arr { if !repeatId[id] { newarr = append(newarr, id) repeatId[id] = true } } } } if thisinfo.AllRelatePNKeyMap[pn.Key] == nil { K := &Key{&[]string{}, &sync.Mutex{}} thisinfo.AllRelatePNKeyMap[pn.Key] = K pn.KeyMap.Map[pn.Key] = K } pn.KeyMap.Lock.Unlock() } if p.PcodeLen > 0 { pc := NewCompareInfo("pc", thisinfo.PCKey, PCKey) pncb = append(pncb, pc) thisinfo.AllRelatePCKeyMap = map[string]*Key{} pc.KeyMap.Lock.Lock() for k, v := range pc.KeyMap.Map { if strings.Contains(k, pc.Key) || strings.Contains(pc.Key, k) { thisinfo.AllRelatePCKeyMap[k] = v for _, id := range *v.Arr { if !repeatId[id] { newarr = append(newarr, id) repeatId[id] = true } } } } if thisinfo.AllRelatePCKeyMap[pc.Key] == nil { K := &Key{&[]string{}, &sync.Mutex{}} thisinfo.AllRelatePCKeyMap[pc.Key] = K pc.KeyMap.Map[pc.Key] = K } pc.KeyMap.Lock.Unlock() } if p.BuyerLen > 0 { pb := NewCompareInfo("pb", thisinfo.PBKey, PBKey) pncb = append(pncb, pb) pb.KeyMap.Lock.Lock() K := pb.KeyMap.Map[pb.Key] if K == nil { K = &Key{&[]string{}, &sync.Mutex{}} pb.KeyMap.Map[pb.Key] = K } else { for _, id := range *K.Arr { if !repeatId[id] { newarr = append(newarr, id) repeatId[id] = true } } } pb.KeyMap.Lock.Unlock() } if len(newarr) > 0 { res = redis.Mget(REDISIDS, newarr) } return }