package main

import (
	"log"
	"sync"
	"time"

	"qfw/util"
)

// FullCount is the running total of documents examined by task.query
// (including skipped repeat == 1 documents) across all batches.
var FullCount = 0

// RunFullData re-processes every document in extractColl in ascending
// publishtime order.
//
// It reads the lowest and highest publishtime in the collection. A producer
// goroutine then walks that range in fixed 20-day windows, matching documents
// with publishtime in (start, end]. It loads each non-empty window and hands it
// to the calling goroutine as a task. Tasks are processed one at a time with
// task.query.
//
// RunFullData returns after every window has been queried and every queued task
// has been processed. It returns immediately if the lower bound cannot be read
// (e.g. the collection is empty).
func RunFullData() {
	startTime, ok := fullDataPublishtimeBound("publishtime")
	if !ok {
		return
	}
	// Windows use an exclusive lower bound ($gt), so step back one second to
	// include documents carrying exactly the minimum publishtime.
	startTime--
	// If the upper bound cannot be read, maxTime stays 0 and no window is queried.
	maxTime, _ := fullDataPublishtimeBound("-publishtime")
	log.Println(startTime, maxTime)

	defer util.Catch()

	const (
		findPoolSize  = 4
		windowSeconds = int64(20 * 86400) // width of each query window: 20 days
		// 2015-01-01 00:00:00 in UTC+8. Windows starting before this advance the
		// batch counter by 3 instead of 1 (the window width itself is unchanged).
		before15year = int64(1420041600)
	)

	var wg sync.WaitGroup // counts tasks sent to pool but not yet processed
	pool := make(chan *task, findPoolSize)
	endChan := make(chan struct{}, 1)

	// Producer: walks the publishtime range window by window.
	go func() {
		day := 0
		for {
			if startTime >= maxTime {
				// Wait until the consumer has processed every queued task. Otherwise
				// its select could pick endChan while batches remain in pool.
				wg.Wait()
				log.Println("任务结束")
				endChan <- struct{}{}
				return
			}

			step := 1
			if startTime < before15year {
				step = 3
			}
			endTime := startTime + windowSeconds
			day += step
			log.Println("day====", day, startTime, endTime)

			q := map[string]interface{}{
				"publishtime": map[string]interface{}{
					"$gt":  startTime,
					"$lte": endTime,
				},
			}

			// Process data in ascending order.
			sess := MQFW.GetMgoConn()
			if sess == nil {
				time.Sleep(10 * time.Second)
				continue
			}
			var result []map[string]interface{}
			err := sess.DB(MQFW.DbName).C(extractColl).Find(q).Sort("publishtime").All(&result)
			// Return the connection on every path, including empty and failed
			// windows, so the pool is not drained.
			MQFW.DestoryMongoConn(sess)
			if err != nil {
				// Retry the same window rather than silently skipping it.
				log.Println("query failed, retrying window:", startTime, endTime, err)
				time.Sleep(10 * time.Second)
				continue
			}

			startTime = endTime
			log.Println("查询结果:", len(result))
			if len(result) == 0 {
				continue
			}

			// Add before sending: the consumer may call Done as soon as it receives.
			wg.Add(1)
			pool <- &task{result: result}

			// Periodically let the consumer catch up before loading more windows.
			if day%findPoolSize == 0 {
				wg.Wait()
			}
		}
	}()

	// Consumer: process one batch at a time on the calling goroutine.
	for {
		select {
		case t := <-pool:
			t.query()
			t.result = nil // release the batch before the next one arrives
			wg.Done()
		case <-endChan:
			return
		}
	}
}

// fullDataPublishtimeBound fetches the publishtime of the first extractColl
// document under the given sort order. RunFullData uses "publishtime" for the
// lower bound and "-publishtime" for the upper bound. The boolean is false when
// the lookup fails or returns no document.
func fullDataPublishtimeBound(sortSpec string) (int64, bool) {
	docs, ok := MQFW.Find(extractColl, `{}`, sortSpec, `{"publishtime":1}`, true, 1, 1)
	if !ok || docs == nil || len(*docs) != 1 {
		return 0, false
	}
	return util.Int64All((*docs)[0]["publishtime"]), true
}

// task is one publishtime window's worth of documents loaded by RunFullData.
type task struct {
	result []map[string]interface{}
}

// query runs the merge step over every document in the batch.
//
// Documents with repeat == 1 are skipped. For each remaining document it:
//   - raises currentMegerTime to the document's publishtime if that is larger;
//   - bumps currentMegerCount, calling clearPKey and resetting the count once
//     it exceeds 600000;
//   - runs PreThisInfo and, when any of LenPC/LenPN/LenPTC exceeds 3,
//     startProjectMerge on a goroutine. Concurrency is bounded by MultiThread,
//     presumably a buffered channel used as a semaphore (verify at its
//     declaration).
//
// It waits for all goroutines, then adds the number of documents examined
// (including skipped ones) to FullCount and drops the batch.
func (t *task) query() {
	var wg sync.WaitGroup
	index := 0
	for _, tmp := range t.result {
		if index%2000 == 0 {
			log.Println(index, tmp["_id"])
		}
		index++
		if util.IntAll(tmp["repeat"]) == 1 {
			continue
		}
		pt := util.Int64All(tmp["publishtime"])
		if pt > currentMegerTime {
			currentMegerTime = pt
		}
		currentMegerCount++
		if currentMegerCount > 600000 {
			log.Println("执行清理", currentMegerTime)
			clearPKey()
			currentMegerCount = 0
		}
		wg.Add(1)
		MultiThread <- true
		go func(tmp map[string]interface{}) {
			defer func() {
				<-MultiThread
				wg.Done()
			}()
			info := PreThisInfo(tmp)
			if info != nil && (info.LenPC > 3 || info.LenPN > 3 || info.LenPTC > 3) {
				startProjectMerge(info, tmp)
				// NOTE(review): this write races with the read/write of
				// currentMegerTime in the loop above (and with other workers).
				// It can also move the value backwards. Guard it with a mutex
				// or atomic if it matters.
				currentMegerTime = info.Publishtime
			}
		}(tmp)
	}
	wg.Wait()
	FullCount += index
	t.result = nil
	log.Println("currentFull", FullCount)
}