wkyuer 1 ngày trước cách đây
mục cha
commit
8a1226d15a
1 tập tin đã thay đổi với 52 bổ sung0 xóa
  1. 52 0
      elastic/elasticSim.go

+ 52 - 0
elastic/elasticSim.go

@@ -185,6 +185,58 @@ func (e *Elastic) BulkSave(index string, obj []map[string]interface{}) {
 	}
 }
 
+// BulkSaveReturnFails 批量保存 返回保存失败的doc
+func (e *Elastic) BulkSaveReturnFails(index string, obj []map[string]interface{}) []map[string]interface{} {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	if client == nil {
+		return obj
+	}
+
+	// 存储 ID 与原始数据的映射关系
+	idToData := make(map[string]map[string]interface{})
+	// 收集失败的文档
+	var failedDocs []map[string]interface{}
+
+	req := client.Bulk()
+	for _, v := range obj {
+		id := util.ObjToString(v["_id"])
+		idToData[id] = v // 建立映射
+
+		doc := make(map[string]interface{})
+		for k, va := range v {
+			doc[k] = va
+		}
+		delete(doc, "_id")
+		req = req.Add(es.NewBulkIndexRequest().Index(index).Id(id).Doc(doc))
+	}
+
+	res, err := req.Do(context.Background())
+	if err != nil {
+		log.Println("批量保存到ES出错", err.Error())
+		return obj
+	}
+
+	if res.Errors {
+		for _, item := range res.Items {
+			for _, result := range item {
+				if result.Error != nil {
+					// 通过 ID 找到原始数据并添加到失败列表
+					if originalData, exists := idToData[result.Id]; exists {
+						// 可以在失败数据中添加错误信息,方便排查
+						originalData["_error_reason"] = result.Error.Reason
+						failedDocs = append(failedDocs, originalData)
+					}
+				}
+			}
+		}
+		// 返回失败的文档列表,无错误(因为请求已被处理)
+		return failedDocs
+	}
+	// 全部成功,返回空列表
+	return nil
+}
+
 // 根据id删除索引对象
 func (e *Elastic) DelById(index, id string) bool {
 	client := e.GetEsConn()