|
@@ -37,7 +37,7 @@ func buyerEsTaskOnce() {
|
|
|
//参数
|
|
|
buyerent, _ := standard["buyerent"].(map[string]interface{})
|
|
|
buyer_ent := qu.ObjToString(buyerent["collect1"])
|
|
|
- buyer_enterr := qu.ObjToString(buyerent["collect2"])
|
|
|
+ //buyer_enterr := qu.ObjToString(buyerent["collect2"])
|
|
|
index, _ := buyerent["index"].(string)
|
|
|
itype, _ := buyerent["type"].(string)
|
|
|
//mongo
|
|
@@ -101,44 +101,44 @@ func buyerEsTaskOnce() {
|
|
|
tmp = make(map[string]interface{})
|
|
|
}
|
|
|
|
|
|
- log.Println("q:", q, "db:", mgostandard.DbName, "coll:", buyer_enterr)
|
|
|
- it_2 := sess.DB(mgostandard.DbName).C(buyer_enterr).Find(&q).Sort("_id").Iter()
|
|
|
- num_2 := 0
|
|
|
- for tmp := make(map[string]interface{}); it_2.Next(&tmp); num_2++ {
|
|
|
- if num_2%100 == 0 && num_2 > 0 {
|
|
|
- log.Println("当前表:", buyer_enterr, "数量:", num_2)
|
|
|
- }
|
|
|
- pool <- true
|
|
|
- wg.Add(1)
|
|
|
- go func(tmp map[string]interface{}) {
|
|
|
- defer func() {
|
|
|
- <-pool
|
|
|
- wg.Done()
|
|
|
- }()
|
|
|
- savetmp := map[string]interface{}{}
|
|
|
- tmp_id := qu.BsonIdToSId(tmp["_id"])
|
|
|
- savetmp["_id"] = tmp_id
|
|
|
- savetmp["name"] = tmp["name"]
|
|
|
- savetmp["buyer_name"] = tmp["name"]
|
|
|
- if tmp["buyerclass"] != nil {
|
|
|
- savetmp["buyerclass"] = tmp["buyerclass"]
|
|
|
- }
|
|
|
- for _, f := range fieldArr {
|
|
|
- if val := qu.ObjToString(tmp[f]); val != "" {
|
|
|
- savetmp[f] = val
|
|
|
- }
|
|
|
- }
|
|
|
- buyerEsLock.Lock()
|
|
|
- arrEs = append(arrEs, savetmp)
|
|
|
- if len(arrEs) >= BulkSize {
|
|
|
- tmps := arrEs
|
|
|
- elastic.BulkSave(index, itype, &tmps, true)
|
|
|
- arrEs = []map[string]interface{}{}
|
|
|
- }
|
|
|
- buyerEsLock.Unlock()
|
|
|
- }(tmp)
|
|
|
- tmp = make(map[string]interface{})
|
|
|
- }
|
|
|
+ // log.Println("q:", q, "db:", mgostandard.DbName, "coll:", buyer_enterr)
|
|
|
+ // it_2 := sess.DB(mgostandard.DbName).C(buyer_enterr).Find(&q).Sort("_id").Iter()
|
|
|
+ // num_2 := 0
|
|
|
+ // for tmp := make(map[string]interface{}); it_2.Next(&tmp); num_2++ {
|
|
|
+ // if num_2%100 == 0 && num_2 > 0 {
|
|
|
+ // log.Println("当前表:", buyer_enterr, "数量:", num_2)
|
|
|
+ // }
|
|
|
+ // pool <- true
|
|
|
+ // wg.Add(1)
|
|
|
+ // go func(tmp map[string]interface{}) {
|
|
|
+ // defer func() {
|
|
|
+ // <-pool
|
|
|
+ // wg.Done()
|
|
|
+ // }()
|
|
|
+ // savetmp := map[string]interface{}{}
|
|
|
+ // tmp_id := qu.BsonIdToSId(tmp["_id"])
|
|
|
+ // savetmp["_id"] = tmp_id
|
|
|
+ // savetmp["name"] = tmp["name"]
|
|
|
+ // savetmp["buyer_name"] = tmp["name"]
|
|
|
+ // if tmp["buyerclass"] != nil {
|
|
|
+ // savetmp["buyerclass"] = tmp["buyerclass"]
|
|
|
+ // }
|
|
|
+ // for _, f := range fieldArr {
|
|
|
+ // if val := qu.ObjToString(tmp[f]); val != "" {
|
|
|
+ // savetmp[f] = val
|
|
|
+ // }
|
|
|
+ // }
|
|
|
+ // buyerEsLock.Lock()
|
|
|
+ // arrEs = append(arrEs, savetmp)
|
|
|
+ // if len(arrEs) >= BulkSize {
|
|
|
+ // tmps := arrEs
|
|
|
+ // elastic.BulkSave(index, itype, &tmps, true)
|
|
|
+ // arrEs = []map[string]interface{}{}
|
|
|
+ // }
|
|
|
+ // buyerEsLock.Unlock()
|
|
|
+ // }(tmp)
|
|
|
+ // tmp = make(map[string]interface{})
|
|
|
+ // }
|
|
|
|
|
|
wg.Wait()
|
|
|
buyerEsLock.Lock()
|
|
@@ -148,5 +148,5 @@ func buyerEsTaskOnce() {
|
|
|
arrEs = []map[string]interface{}{}
|
|
|
}
|
|
|
buyerEsLock.Unlock()
|
|
|
- log.Println("buyeres 索引完毕! 总计:", num_1+num_2)
|
|
|
+ log.Println("buyeres 索引完毕! 总计:", num_1)
|
|
|
}
|