package main import ( "log" "qfw/util" elastic "qfw/util/elastic" "sync" "gopkg.in/mgo.v2/bson" ) func winnerEnterPriseTask(data []byte, mapInfo map[string]interface{}) { defer util.Catch() q, _ := mapInfo["query"].(map[string]interface{}) if q == nil { q = map[string]interface{}{ "_id": bson.M{ "$gt": util.StringTOBsonId(mapInfo["gtid"].(string)), "$lte": util.StringTOBsonId(mapInfo["lteid"].(string)), }, } } log.Println("++++++++++++++++++++++") session := winnerentermgo.GetMgoConn(1800) defer winnerentermgo.DestoryMongoConn(session) c, _ := winnerenterprise["collect"].(string) db, _ := winnerenterprise["db"].(string) index, _ := winnerenterprise["index"].(string) itype, _ := winnerenterprise["type"].(string) log.Println("index===", index, "itype===", itype) count, _ := session.DB(db).C(c).Find(&q).Count() savepool := make(chan bool, 10) UpdatesLock := sync.Mutex{} log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index) query := session.DB(db).C(c).Find(q).Select(bson.M{"alias": 0, "tmp_id": 0}).Iter() tmp := []map[string]interface{}{} tmp = append(tmp, map[string]interface{}{ "test": "test", }) elastic.BulkSave(index, itype, &tmp, true) arrEs := []map[string]interface{}{} var n int for tmp := make(map[string]interface{}); query.Next(tmp); n++ { //go IS.Add("winner") log.Println("tmp=========", tmp) UpdatesLock.Lock() arrEs = append(arrEs, tmp) if len(arrEs) > savesizei { tmps := arrEs savepool <- true go func(tmpn []map[string]interface{}) { defer func() { <-savepool }() elastic.BulkSave(index, itype, &tmpn, true) }(tmps) arrEs = []map[string]interface{}{} } UpdatesLock.Unlock() if n%1000 == 0 { log.Println("current:", n, util.BsonIdToSId(tmp["_id"])) } tmp = make(map[string]interface{}) } UpdatesLock.Lock() if len(arrEs) > 0 { tmpn := arrEs elastic.BulkSave(index, itype, &tmpn, true) } UpdatesLock.Unlock() log.Println(mapInfo, "create winner_enterprise index...over", n) }