123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215 |
- package main
- import (
- "log"
- "qfw/util"
- // "qfw/util/mongodb"
- // "qfw/util/redis"
- "sync"
- "time"
- // "gopkg.in/mgo.v2/bson"
- )
// FullCount is the running total of records walked by task.query during a
// full-data run; each call adds the size of its batch (skipped records
// included) and logs the new total.
var FullCount int
- func RunFullData() {
- startTime, END := int64(0), int64(0)
- sts, bres := MQFW.Find(extractColl, `{}`, "publishtime", `{"publishtime":1}`, true, 1, 1)
- if bres && sts != nil && len(*sts) == 1 {
- startTime = util.Int64All((*sts)[0]["publishtime"])
- startTime -= 1
- sts, bres = MQFW.Find(extractColl, `{}`, "-publishtime", `{"publishtime":1}`, true, 1, 1)
- if bres && sts != nil && len(*sts) == 1 {
- END = util.Int64All((*sts)[0]["publishtime"])
- }
- log.Println(startTime, END)
- } else {
- return
- }
- defer util.Catch()
- var wg = sync.WaitGroup{}
- //2012-01-01 到 2015-01-01 1420041600
- findPoolSize := 4
- pool := make(chan *task, findPoolSize)
- endChan := make(chan bool, 1)
- _ = time.Now().Unix()
- // sess := MQFW.GetMgoConn()
- // var result []map[string]interface{}
- // sess.DB(MQFW.DbName).C(extractColl).Find(map[string]interface{}{}).Sort("publishtime").All(&result)
- // log.Println("查询结果:", len(result))
- // MQFW.DestoryMongoConn(sess)
- // pool <- &task{result}
- //endChan <- true
- before15year := int64(1420041600) //15年之前3天查询一次
- day := 0
- go func() {
- for {
- if startTime >= END {
- log.Println("任务结束")
- endChan <- true
- break
- }
- addDay := 1
- if startTime < before15year {
- addDay = 3
- }
- //endTime := int64(1561828196)
- endTime := startTime + int64(20*86400)
- day += addDay
- log.Println("day====", day, startTime, endTime)
- q := map[string]interface{}{
- "publishtime": map[string]interface{}{
- "$gt": startTime,
- "$lte": endTime,
- },
- }
- // q = bson.M{"_id": bson.M{"$in": []interface{}{
- // util.StringTOBsonId("5a29933f40d2d9bbe87ba510"),
- // util.StringTOBsonId("59cb110740d2d9bbe8a5ea89"),
- // util.StringTOBsonId("59dec3c640d2d9bbe8fc2067"),
- // }}}
- // q = bson.M{"_id": bson.M{"$in": []interface{}{
- // util.StringTOBsonId("58ea834ee1382322d055aba2"),
- // util.StringTOBsonId("5762767261a0721f1504317e"),
- // util.StringTOBsonId("5909acaee138233f2da53ebc"),
- // util.StringTOBsonId("57764ddaedbcdc49e6003b62"),
- // util.StringTOBsonId("590a9b2ee138233f2da964b7"),
- // util.StringTOBsonId("58dab09ae138233607531939"),
- // util.StringTOBsonId("57909adcedbcdc35c8005ab5"),
- // util.StringTOBsonId("57861257edbcdc1cea01478c"),
- // util.StringTOBsonId("57e0ac3861a0721f15324175"),
- // util.StringTOBsonId("58da95b8e138233607524f76"),
- // util.StringTOBsonId("590a88b3e138233f2da9111b"),
- // }}}
- // q = bson.M{"_id": bson.M{"$in": []interface{}{
- // util.StringTOBsonId("59cf6c7940d2d9bbe8c62d42"),
- // util.StringTOBsonId("59dedd1b40d2d9bbe8fe1382"),
- // util.StringTOBsonId("59dedcc140d2d9bbe8fe0f7f"),
- // util.StringTOBsonId("59dedd1b40d2d9bbe8fe1386"),
- // util.StringTOBsonId("59dedd1b40d2d9bbe8fe138d"),
- // util.StringTOBsonId("59dedcc140d2d9bbe8fe0f63"),
- // util.StringTOBsonId("59e9584340d2d9bbe84bfefd"),
- // util.StringTOBsonId("59e9584340d2d9bbe84bff01"),
- // util.StringTOBsonId("59e9584340d2d9bbe84bff08"),
- // util.StringTOBsonId("59e9584340d2d9bbe84bff0c"),
- // util.StringTOBsonId("59e9676340d2d9bbe84d4ba1"),
- // util.StringTOBsonId("59e9795540d2d9bbe84eacf2"),
- // util.StringTOBsonId("59e979af40d2d9bbe84eb0f0"),
- // util.StringTOBsonId("59e978fb40d2d9bbe84ea8b6"),
- // util.StringTOBsonId("59e9ad6740d2d9bbe851fee7"),
- // util.StringTOBsonId("59e9ad6740d2d9bbe851fef6"),
- // util.StringTOBsonId("59e9ae7640d2d9bbe8521746"),
- // util.StringTOBsonId("59e9aed040d2d9bbe8521fbb"),
- // util.StringTOBsonId("59e9aed040d2d9bbe8521feb"),
- // util.StringTOBsonId("59e9b88f40d2d9bbe852f222"),
- // util.StringTOBsonId("59efde8640d2d9bbe87677e0"),
- // util.StringTOBsonId("59efde8640d2d9bbe87677dc"),
- // util.StringTOBsonId("59f0107b40d2d9bbe87a8935"),
- // util.StringTOBsonId("5a026a1e40d2d9bbe8ffbbbc"),
- // util.StringTOBsonId("5a0269c440d2d9bbe8ffb586"),
- // util.StringTOBsonId("5a02840c40d2d9bbe8019c75"),
- // util.StringTOBsonId("5a02840c40d2d9bbe8019c80"),
- // util.StringTOBsonId("5a02be7640d2d9bbe8057516"),
- // }}}
- // startTime = 1561828197
- //数据正序处理
- sess := MQFW.GetMgoConn()
- if sess == nil {
- time.Sleep(10 * time.Second)
- continue
- }
- var result []map[string]interface{}
- sess.DB(MQFW.DbName).C(extractColl).Find(q).Sort("publishtime").All(&result)
- startTime = endTime
- log.Println("查询结果:", len(result))
- if len(result) == 0 {
- continue
- }
- MQFW.DestoryMongoConn(sess)
- pool <- &task{result}
- wg.Add(1)
- startTime = endTime
- if day > 0 && day%(1*findPoolSize) == 0 {
- wg.Wait()
- // MQFW.Destory()
- // MQFW = mongodb.MongodbSim{
- // MongodbAddr: Sysconfig["mongodbServers"].(string),
- // Size: 2 * findPoolSize,
- // DbName: Sysconfig["mongodbName"].(string),
- // }
- // MQFW.InitPool()
- }
- }
- }()
- for {
- select {
- case t := <-pool:
- t.query()
- t.result = nil
- t = nil
- wg.Done()
- case <-endChan:
- return
- }
- }
- }
// task is one batch of documents loaded from the extract collection and
// waiting to be processed by query.
type task struct {
	// result holds the loaded documents, one generic field map per record.
	// It is set to nil once the batch has been processed.
	result []map[string]interface{}
}
- func (t *task) query() {
- index := 0
- wg := &sync.WaitGroup{}
- for _, tmp := range t.result {
- if index%2000 == 0 {
- log.Println(index, tmp["_id"])
- }
- index++
- if util.IntAll(tmp["repeat"]) == 1 {
- continue
- }
- pt := util.Int64All(tmp["publishtime"])
- if pt > currentMegerTime {
- currentMegerTime = pt
- }
- currentMegerCount++
- if currentMegerCount > 600000 {
- log.Println("执行清理", currentMegerTime)
- clearPKey()
- currentMegerCount = 0
- }
- wg.Add(1)
- MultiThread <- true
- go func(tmp map[string]interface{}) {
- defer func() {
- <-MultiThread
- wg.Done()
- }()
- info := PreThisInfo(tmp)
- if info != nil && (info.LenPC > 3 || info.LenPN > 3 || info.LenPTC > 3) {
- startProjectMerge(info, tmp)
- //thisid := util.BsonIdToSId(tmp["_id"])
- //redis.Put(INFOID, thisid, 1, INFOTIMEOUT)
- currentMegerTime = info.Publishtime
- }
- }(tmp)
- //time.Sleep(10 * time.Microsecond)
- }
- wg.Wait()
- FullCount += index
- t.result = nil
- log.Println("currentFull", FullCount)
- }
|