|
@@ -1,74 +0,0 @@
|
|
|
-package main
|
|
|
-
|
|
|
-import (
|
|
|
- "log"
|
|
|
- "qfw/util"
|
|
|
- elastic "qfw/util/elastic"
|
|
|
- "sync"
|
|
|
-
|
|
|
- "gopkg.in/mgo.v2/bson"
|
|
|
-)
|
|
|
-
|
|
|
-func winnerEnterPriseTask(data []byte, mapInfo map[string]interface{}) {
|
|
|
- defer util.Catch()
|
|
|
- q, _ := mapInfo["query"].(map[string]interface{})
|
|
|
- if q == nil {
|
|
|
- q = map[string]interface{}{
|
|
|
- "_id": bson.M{
|
|
|
- "$gt": util.StringTOBsonId(mapInfo["gtid"].(string)),
|
|
|
- "$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
|
|
|
- },
|
|
|
- }
|
|
|
- }
|
|
|
- log.Println("++++++++++++++++++++++")
|
|
|
- session := winnerentermgo.GetMgoConn(1800)
|
|
|
- defer winnerentermgo.DestoryMongoConn(session)
|
|
|
- c, _ := winnerenterprise["collect"].(string)
|
|
|
- db, _ := winnerenterprise["db"].(string)
|
|
|
- index, _ := winnerenterprise["index"].(string)
|
|
|
- itype, _ := winnerenterprise["type"].(string)
|
|
|
- log.Println("index===", index, "itype===", itype)
|
|
|
- count, _ := session.DB(db).C(c).Find(&q).Count()
|
|
|
- savepool := make(chan bool, 10)
|
|
|
- UpdatesLock := sync.Mutex{}
|
|
|
- log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index)
|
|
|
- query := session.DB(db).C(c).Find(q).Select(bson.M{"alias": 0, "tmp_id": 0}).Iter()
|
|
|
-
|
|
|
- tmp := []map[string]interface{}{}
|
|
|
- tmp = append(tmp, map[string]interface{}{
|
|
|
- "test": "test",
|
|
|
- })
|
|
|
-
|
|
|
- elastic.BulkSave(index, itype, &tmp, true)
|
|
|
- arrEs := []map[string]interface{}{}
|
|
|
- var n int
|
|
|
- for tmp := make(map[string]interface{}); query.Next(tmp); n++ {
|
|
|
- //go IS.Add("winner")
|
|
|
- log.Println("tmp=========", tmp)
|
|
|
- UpdatesLock.Lock()
|
|
|
- arrEs = append(arrEs, tmp)
|
|
|
- if len(arrEs) > savesizei {
|
|
|
- tmps := arrEs
|
|
|
- savepool <- true
|
|
|
- go func(tmpn []map[string]interface{}) {
|
|
|
- defer func() {
|
|
|
- <-savepool
|
|
|
- }()
|
|
|
- elastic.BulkSave(index, itype, &tmpn, true)
|
|
|
- }(tmps)
|
|
|
- arrEs = []map[string]interface{}{}
|
|
|
- }
|
|
|
- UpdatesLock.Unlock()
|
|
|
- if n%1000 == 0 {
|
|
|
- log.Println("current:", n, util.BsonIdToSId(tmp["_id"]))
|
|
|
- }
|
|
|
- tmp = make(map[string]interface{})
|
|
|
- }
|
|
|
- UpdatesLock.Lock()
|
|
|
- if len(arrEs) > 0 {
|
|
|
- tmpn := arrEs
|
|
|
- elastic.BulkSave(index, itype, &tmpn, true)
|
|
|
- }
|
|
|
- UpdatesLock.Unlock()
|
|
|
- log.Println(mapInfo, "create winner_enterprise index...over", n)
|
|
|
-}
|