1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374 |
- package main
- import (
- "log"
- "qfw/util"
- elastic "qfw/util/elastic"
- "sync"
- "gopkg.in/mgo.v2/bson"
- )
- func winnerEnterPriseTask(data []byte, mapInfo map[string]interface{}) {
- defer util.Catch()
- q, _ := mapInfo["query"].(map[string]interface{})
- if q == nil {
- q = map[string]interface{}{
- "_id": bson.M{
- "$gt": util.StringTOBsonId(mapInfo["gtid"].(string)),
- "$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
- },
- }
- }
- log.Println("++++++++++++++++++++++")
- session := winnerentermgo.GetMgoConn(1800)
- defer winnerentermgo.DestoryMongoConn(session)
- c, _ := winnerenterprise["collect"].(string)
- db, _ := winnerenterprise["db"].(string)
- index, _ := winnerenterprise["index"].(string)
- itype, _ := winnerenterprise["type"].(string)
- log.Println("index===", index, "itype===", itype)
- count, _ := session.DB(db).C(c).Find(&q).Count()
- savepool := make(chan bool, 10)
- UpdatesLock := sync.Mutex{}
- log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index)
- query := session.DB(db).C(c).Find(q).Select(bson.M{"alias": 0, "tmp_id": 0}).Iter()
- tmp := []map[string]interface{}{}
- tmp = append(tmp, map[string]interface{}{
- "test": "test",
- })
- elastic.BulkSave(index, itype, &tmp, true)
- arrEs := []map[string]interface{}{}
- var n int
- for tmp := make(map[string]interface{}); query.Next(tmp); n++ {
- //go IS.Add("winner")
- log.Println("tmp=========", tmp)
- UpdatesLock.Lock()
- arrEs = append(arrEs, tmp)
- if len(arrEs) > savesizei {
- tmps := arrEs
- savepool <- true
- go func(tmpn []map[string]interface{}) {
- defer func() {
- <-savepool
- }()
- elastic.BulkSave(index, itype, &tmpn, true)
- }(tmps)
- arrEs = []map[string]interface{}{}
- }
- UpdatesLock.Unlock()
- if n%1000 == 0 {
- log.Println("current:", n, util.BsonIdToSId(tmp["_id"]))
- }
- tmp = make(map[string]interface{})
- }
- UpdatesLock.Lock()
- if len(arrEs) > 0 {
- tmpn := arrEs
- elastic.BulkSave(index, itype, &tmpn, true)
- }
- UpdatesLock.Unlock()
- log.Println(mapInfo, "create winner_enterprise index...over", n)
- }
|