|
@@ -4,10 +4,12 @@ import (
|
|
|
"log"
|
|
|
"qfw/util"
|
|
|
elastic "qfw/util/elastic"
|
|
|
+ "sync"
|
|
|
|
|
|
"gopkg.in/mgo.v2/bson"
|
|
|
)
|
|
|
|
|
|
+/*
|
|
|
func buyerTask(data []byte, mapInfo map[string]interface{}) {
|
|
|
defer util.Catch()
|
|
|
q, _ := mapInfo["query"].(map[string]interface{})
|
|
@@ -67,3 +69,92 @@ func buyerTask(data []byte, mapInfo map[string]interface{}) {
|
|
|
}
|
|
|
log.Println(mapInfo, "create buyer index...over", n)
|
|
|
}
|
|
|
+*/
|
|
|
+
|
|
|
+func buyerTask(data []byte, mapInfo map[string]interface{}) {
|
|
|
+ defer util.Catch()
|
|
|
+ q, _ := mapInfo["query"].(map[string]interface{})
|
|
|
+ if q == nil {
|
|
|
+ q = map[string]interface{}{
|
|
|
+ "_id": bson.M{
|
|
|
+ "$gt": util.StringTOBsonId(mapInfo["gtid"].(string)),
|
|
|
+ "$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
|
|
|
+ },
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //mongo
|
|
|
+ sess := mgostandard.GetMgoConn()
|
|
|
+ defer mgostandard.DestoryMongoConn(sess)
|
|
|
+ c, _ := buyer["collect"].(string)
|
|
|
+ db, _ := buyer["db"].(string)
|
|
|
+ index, _ := buyer["index"].(string)
|
|
|
+ itype, _ := buyer["type"].(string)
|
|
|
+
|
|
|
+ count, _ := sess.DB(db).C(c).Find(&q).Count()
|
|
|
+ log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index)
|
|
|
+ it := sess.DB(db).C(c).Find(&q).Select(map[string]interface{}{
|
|
|
+ "name": 1,
|
|
|
+ "buyer_name": 1,
|
|
|
+ "institute_type": 1,
|
|
|
+ "buyerclass": 1,
|
|
|
+ "fixedphone": 1,
|
|
|
+ "mobilephone": 1,
|
|
|
+ "latestfixedphone": 1,
|
|
|
+ "latestmobilephone": 1,
|
|
|
+ "province": 1,
|
|
|
+ "city": 1,
|
|
|
+ }).Sort("_id").Iter()
|
|
|
+
|
|
|
+ arrEs := []map[string]interface{}{}
|
|
|
+ buyerEsLock := &sync.Mutex{}
|
|
|
+ pool := make(chan bool, 3)
|
|
|
+ wg := &sync.WaitGroup{}
|
|
|
+ i := 0
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(tmp); i = i + 1 {
|
|
|
+ if i%1000 == 0 {
|
|
|
+ log.Println("current:", i)
|
|
|
+ }
|
|
|
+ pool <- true
|
|
|
+ wg.Add(1)
|
|
|
+ go func(tmp map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-pool
|
|
|
+ wg.Done()
|
|
|
+ }()
|
|
|
+ util.Debug(tmp)
|
|
|
+ savetmp := map[string]interface{}{}
|
|
|
+ tmp_id := util.BsonIdToSId(tmp["_id"])
|
|
|
+ savetmp["_id"] = tmp_id
|
|
|
+ savetmp["name"] = tmp["name"]
|
|
|
+ savetmp["buyer_name"] = tmp["name"]
|
|
|
+ util.Debug(tmp["buyerclass"])
|
|
|
+ if tmp["buyerclass"] != nil {
|
|
|
+ savetmp["buyerclass"] = tmp["buyerclass"]
|
|
|
+ }
|
|
|
+ for _, f := range fieldArr {
|
|
|
+ if val := util.ObjToString(tmp[f]); val != "" {
|
|
|
+ savetmp[f] = val
|
|
|
+ }
|
|
|
+ }
|
|
|
+ util.Debug(savetmp)
|
|
|
+ buyerEsLock.Lock()
|
|
|
+ arrEs = append(arrEs, savetmp)
|
|
|
+ if len(arrEs) >= BulkSize {
|
|
|
+ tmps := arrEs
|
|
|
+ elastic.BulkSave(index, itype, &tmps, true)
|
|
|
+ arrEs = []map[string]interface{}{}
|
|
|
+ }
|
|
|
+ buyerEsLock.Unlock()
|
|
|
+ }(tmp)
|
|
|
+ tmp = make(map[string]interface{})
|
|
|
+ }
|
|
|
+ wg.Wait()
|
|
|
+ buyerEsLock.Lock()
|
|
|
+ if len(arrEs) > 0 {
|
|
|
+ tmps := arrEs
|
|
|
+ elastic.BulkSave(index, itype, &tmps, true)
|
|
|
+ arrEs = []map[string]interface{}{}
|
|
|
+ }
|
|
|
+ buyerEsLock.Unlock()
|
|
|
+ log.Println(mapInfo, "create buyer index...over", i)
|
|
|
+}
|