1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162 |
- package main
- import (
- "log"
- "qfw/util"
- elastic "qfw/util/elastic"
- "gopkg.in/mgo.v2/bson"
- )
- func winnerTask(data []byte, mapInfo map[string]interface{}) {
- defer util.Catch()
- q, _ := mapInfo["query"].(map[string]interface{})
- if q == nil {
- q = map[string]interface{}{
- "_id": bson.M{
- "$gt": util.StringTOBsonId(mapInfo["gtid"].(string)),
- "$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
- },
- }
- }
- session := extractmgo.GetMgoConn(1800)
- defer extractmgo.DestoryMongoConn(session)
- c, _ := winner["collect"].(string)
- db, _ := winner["db"].(string)
- index, _ := winner["index"].(string)
- itype, _ := winner["type"].(string)
- count, _ := session.DB(db).C(c).Find(&q).Count()
- savepool := make(chan bool, 10)
- log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index)
- query := session.DB(db).C(c).Find(q).Select(bson.M{"pici": 0}).Iter()
- arr := make([]map[string]interface{}, savesizei)
- var n int
- i := 0
- for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 {
- go IS.Add("winner")
- arr[i] = tmp
- n++
- if i == savesizei-1 {
- savepool <- true
- tmps := arr
- go func(tmpn *[]map[string]interface{}) {
- defer func() {
- <-savepool
- }()
- elastic.BulkSave(index, itype, tmpn, true)
- }(&tmps)
- i = 0
- arr = make([]map[string]interface{}, savesizei)
- }
- if n%savesizei == 0 {
- log.Println("当前:", n)
- }
- tmp = make(map[string]interface{})
- }
- if i > 0 {
- elastic.BulkSave(index, itype, &arr, true)
- }
- log.Println(mapInfo, "create winner index...over", n)
- }
|