package main import ( "gopkg.in/mgo.v2/bson" "log" qu "qfw/util" "sync" "time" ) func winnerEsTaskOnce() { defer qu.Catch() now := time.Now() preTime := time.Date(now.Year(), now.Month(), now.Day()-1, now.Hour(), 0, 0, 0, time.Local) curTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local) task_sid := qu.BsonIdToSId(bson.NewObjectIdWithTime(preTime)) task_eid := qu.BsonIdToSId(bson.NewObjectIdWithTime(curTime)) log.Println("区间id:",task_sid,task_eid) //区间id q := map[string]interface{}{ "_id": map[string]interface{}{ "$gte": qu.StringTOBsonId(task_sid), "$lt": qu.StringTOBsonId(task_eid), }, } //参数 threadnum:=qu.IntAll(winner_extract["threadnum"]) db_c1:=qu.ObjToString(winner_extract["db_c1"]) db_c2:=qu.ObjToString(winner_extract["db_c2"]) es_index:=qu.ObjToString(winner_extract["es_index"]) es_type:=qu.ObjToString(winner_extract["es_type"]) //mongo sess := winnermgo.GetMgoConn() defer mgo.DestoryMongoConn(sess) //es EsConn := winner_es.GetEsConn() defer winner_es.DestoryEsConn(EsConn) it_1 := sess.DB(winnermgo.DbName).C(db_c1).Find(&q).Sort("_id").Iter() num_1:=0 pool := make(chan bool, threadnum) wg := &sync.WaitGroup{} for tmp := make(map[string]interface{}); it_1.Next(&tmp);num_1++{ if num_1%100 == 0 && num_1>0{ log.Println("当前表:",db_c1,"数量:",num_1) } pool <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-pool wg.Done() }() savetmp := make(map[string]interface{}, 0) tmp_id := qu.BsonIdToSId(tmp["_id"]) savetmp["_id"] = tmp_id savetmp["name"] = tmp["company_name"] savetmp["pici"] = tmp["updatetime"] if _, err := EsConn.Index().Index(es_index).Type(es_type).Id(tmp_id).BodyJson(savetmp).Refresh(true).Do(); err != nil { log.Println("save es err :", tmp["_id"], savetmp["_id"], err) } }(tmp) tmp = make(map[string]interface{}) } wg.Wait() it_2 := sess.DB(winnermgo.DbName).C(db_c2).Find(&q).Sort("_id").Iter() num_2:=0 for tmp := make(map[string]interface{}); it_2.Next(&tmp);num_2++{ if num_2%100 == 0 && num_2>0 { log.Println("当前表:",db_c2,"数量:",num_1) } pool <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-pool wg.Done() }() savetmp := make(map[string]interface{}, 0) tmp_id := qu.BsonIdToSId(tmp["_id"]) savetmp["_id"] = tmp_id savetmp["name"] = tmp["name"] savetmp["pici"] = tmp["updatetime"] if _, err := EsConn.Index().Index(es_index).Type(es_type).Id(tmp_id).BodyJson(savetmp).Refresh(true).Do(); err != nil { log.Println("save es err :", tmp["_id"], savetmp["_id"], err) } }(tmp) tmp = make(map[string]interface{}) } wg.Wait() log.Println("winnerextract 索引完毕! 总计:",num_1+num_2) }