123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899 |
- package main
- import (
- "gopkg.in/mgo.v2/bson"
- "log"
- qu "qfw/util"
- "sync"
- "time"
- )
- func winnerEsTaskOnce() {
- defer qu.Catch()
- now := time.Now()
- preTime := time.Date(now.Year(), now.Month(), now.Day()-1, now.Hour(), 0, 0, 0, time.Local)
- curTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local)
- task_sid := qu.BsonIdToSId(bson.NewObjectIdWithTime(preTime))
- task_eid := qu.BsonIdToSId(bson.NewObjectIdWithTime(curTime))
- log.Println("区间id:",task_sid,task_eid)
- //区间id
- q := map[string]interface{}{
- "_id": map[string]interface{}{
- "$gte": qu.StringTOBsonId(task_sid),
- "$lt": qu.StringTOBsonId(task_eid),
- },
- }
- //参数
- threadnum:=qu.IntAll(winner_extract["threadnum"])
- db_c1:=qu.ObjToString(winner_extract["db_c1"])
- db_c2:=qu.ObjToString(winner_extract["db_c2"])
- es_index:=qu.ObjToString(winner_extract["es_index"])
- es_type:=qu.ObjToString(winner_extract["es_type"])
- //mongo
- sess := winnermgo.GetMgoConn()
- defer mgo.DestoryMongoConn(sess)
- //es
- EsConn := winner_es.GetEsConn()
- defer winner_es.DestoryEsConn(EsConn)
- it_1 := sess.DB(winnermgo.DbName).C(db_c1).Find(&q).Sort("_id").Iter()
- num_1:=0
- pool := make(chan bool, threadnum)
- wg := &sync.WaitGroup{}
- for tmp := make(map[string]interface{}); it_1.Next(&tmp);num_1++{
- if num_1%100 == 0 && num_1>0{
- log.Println("当前表:",db_c1,"数量:",num_1)
- }
- pool <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-pool
- wg.Done()
- }()
- savetmp := make(map[string]interface{}, 0)
- tmp_id := qu.BsonIdToSId(tmp["_id"])
- savetmp["_id"] = tmp_id
- savetmp["name"] = tmp["company_name"]
- savetmp["pici"] = tmp["updatetime"]
- if _, err := EsConn.Index().Index(es_index).Type(es_type).Id(tmp_id).BodyJson(savetmp).Refresh(true).Do(); err != nil {
- log.Println("save es err :", tmp["_id"], savetmp["_id"], err)
- }
- }(tmp)
- tmp = make(map[string]interface{})
- }
- wg.Wait()
- it_2 := sess.DB(winnermgo.DbName).C(db_c2).Find(&q).Sort("_id").Iter()
- num_2:=0
- for tmp := make(map[string]interface{}); it_2.Next(&tmp);num_2++{
- if num_2%100 == 0 && num_2>0 {
- log.Println("当前表:",db_c2,"数量:",num_1)
- }
- pool <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-pool
- wg.Done()
- }()
- savetmp := make(map[string]interface{}, 0)
- tmp_id := qu.BsonIdToSId(tmp["_id"])
- savetmp["_id"] = tmp_id
- savetmp["name"] = tmp["name"]
- savetmp["pici"] = tmp["updatetime"]
- if _, err := EsConn.Index().Index(es_index).Type(es_type).Id(tmp_id).BodyJson(savetmp).Refresh(true).Do(); err != nil {
- log.Println("save es err :", tmp["_id"], savetmp["_id"], err)
- }
- }(tmp)
- tmp = make(map[string]interface{})
- }
- wg.Wait()
- log.Println("winnerextract 索引完毕! 总计:",num_1+num_2)
- }
|