|
@@ -1,6 +1,7 @@
|
|
package elastic
|
|
package elastic
|
|
|
|
|
|
import (
|
|
import (
|
|
|
|
+ util "app.yhyue.com/data_processing/common_utils"
|
|
"context"
|
|
"context"
|
|
"errors"
|
|
"errors"
|
|
"fmt"
|
|
"fmt"
|
|
@@ -157,16 +158,18 @@ func (e *Elastic) Close() {
|
|
// }
|
|
// }
|
|
//}
|
|
//}
|
|
|
|
|
|
-func (e *Elastic) BulkSave(index string, obj map[string]interface{}) {
|
|
|
|
|
|
+func (e *Elastic) BulkSave(index string, obj []map[string]interface{}) {
|
|
client := e.GetEsConn()
|
|
client := e.GetEsConn()
|
|
defer e.DestoryEsConn(client)
|
|
defer e.DestoryEsConn(client)
|
|
if client != nil {
|
|
if client != nil {
|
|
req := client.Bulk()
|
|
req := client.Bulk()
|
|
- for k, v := range obj {
|
|
|
|
|
|
+ for _, v := range obj {
|
|
//if isDelBefore {
|
|
//if isDelBefore {
|
|
// req = req.Add(es.NewBulkDeleteRequest().Index(index).Id(fmt.Sprintf("%v", v["_id"])))
|
|
// req = req.Add(es.NewBulkDeleteRequest().Index(index).Id(fmt.Sprintf("%v", v["_id"])))
|
|
//}
|
|
//}
|
|
- req = req.Add(es.NewBulkIndexRequest().Index(index).Id(k).Doc(v))
|
|
|
|
|
|
+ id := util.ObjToString(v["_id"])
|
|
|
|
+ delete(v, "_id")
|
|
|
|
+ req = req.Add(es.NewBulkIndexRequest().Index(index).Id(id).Doc(v))
|
|
}
|
|
}
|
|
_, err := req.Do(context.Background())
|
|
_, err := req.Do(context.Background())
|
|
if err != nil {
|
|
if err != nil {
|