|
@@ -1,12 +1,12 @@
|
|
|
package elastic
|
|
|
|
|
|
import (
|
|
|
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
|
|
|
"context"
|
|
|
"encoding/json"
|
|
|
"errors"
|
|
|
"fmt"
|
|
|
es "github.com/olivere/elastic/v7"
|
|
|
+ util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
|
|
|
"log"
|
|
|
"runtime"
|
|
|
"strings"
|
|
@@ -37,7 +37,7 @@ func (e *Elastic) InitElasticSize() {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-//关闭连接
|
|
|
+// 关闭连接
|
|
|
func (e *Elastic) DestoryEsConn(client *es.Client) {
|
|
|
select {
|
|
|
case e.Pool <- client:
|
|
@@ -127,7 +127,7 @@ func (e *Elastic) Get(index, query string) *[]map[string]interface{} {
|
|
|
return &res
|
|
|
}
|
|
|
|
|
|
-//关闭elastic
|
|
|
+// 关闭elastic
|
|
|
func (e *Elastic) Close() {
|
|
|
for i := 0; i < e.I_size; i++ {
|
|
|
cli := <-e.Pool
|
|
@@ -180,7 +180,59 @@ func (e *Elastic) BulkSave(index string, obj []map[string]interface{}) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-//根据id删除索引对象
|
|
|
+// BulkSaveReturnFails 批量保存 返回保存失败的doc
|
|
|
+func (e *Elastic) BulkSaveReturnFails(index string, obj []map[string]interface{}) []map[string]interface{} {
|
|
|
+ client := e.GetEsConn()
|
|
|
+ defer e.DestoryEsConn(client)
|
|
|
+ if client == nil {
|
|
|
+ return obj
|
|
|
+ }
|
|
|
+
|
|
|
+ // 存储 ID 与原始数据的映射关系
|
|
|
+ idToData := make(map[string]map[string]interface{})
|
|
|
+ // 收集失败的文档
|
|
|
+ var failedDocs []map[string]interface{}
|
|
|
+
|
|
|
+ req := client.Bulk()
|
|
|
+ for _, v := range obj {
|
|
|
+ id := util.ObjToString(v["_id"])
|
|
|
+ idToData[id] = v // 建立映射
|
|
|
+
|
|
|
+ doc := make(map[string]interface{})
|
|
|
+ for k, va := range v {
|
|
|
+ doc[k] = va
|
|
|
+ }
|
|
|
+ delete(doc, "_id")
|
|
|
+ req = req.Add(es.NewBulkIndexRequest().Index(index).Id(id).Doc(doc))
|
|
|
+ }
|
|
|
+
|
|
|
+ res, err := req.Do(context.Background())
|
|
|
+ if err != nil {
|
|
|
+ log.Println("批量保存到ES出错", err.Error())
|
|
|
+ return obj
|
|
|
+ }
|
|
|
+
|
|
|
+ if res.Errors {
|
|
|
+ for _, item := range res.Items {
|
|
|
+ for _, result := range item {
|
|
|
+ if result.Error != nil {
|
|
|
+ // 通过 ID 找到原始数据并添加到失败列表
|
|
|
+ if originalData, exists := idToData[result.Id]; exists {
|
|
|
+ // 可以在失败数据中添加错误信息,方便排查
|
|
|
+ originalData["_error_reason"] = result.Error.Reason
|
|
|
+ failedDocs = append(failedDocs, originalData)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // 返回失败的文档列表,无错误(因为请求已被处理)
|
|
|
+ return failedDocs
|
|
|
+ }
|
|
|
+ // 全部成功,返回空列表
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+// 根据id删除索引对象
|
|
|
func (e *Elastic) DelById(index, id string) bool {
|
|
|
client := e.GetEsConn()
|
|
|
defer e.DestoryEsConn(client)
|