|
@@ -419,3 +419,46 @@ func (e *Elastic) DeleteBulk(index string, ids []string) {
|
|
|
fmt.Printf("DeleteBulk success is %v\n", len(res.Succeeded()))
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+//InsertOrUpdate 插入或更新
|
|
|
+func (e *Elastic) InsertOrUpdate(index string, docs []map[string]interface{}) error {
|
|
|
+ client := e.GetEsConn()
|
|
|
+ defer e.DestoryEsConn(client)
|
|
|
+
|
|
|
+ for _, item := range docs {
|
|
|
+ // 获取唯一标识符
|
|
|
+ id := item["id"].(string)
|
|
|
+ if id == "" {
|
|
|
+ id = item["_id"].(string)
|
|
|
+ }
|
|
|
+ // 根据id判断记录是否存在
|
|
|
+ exists, err := client.Exists().
|
|
|
+ Index(index).
|
|
|
+ Id(id).
|
|
|
+ Do(context.Background())
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ // 存在则更新,不存在则插入
|
|
|
+ if exists {
|
|
|
+ _, err := client.Update().
|
|
|
+ Index(index).
|
|
|
+ Id(id).
|
|
|
+ Doc(item).
|
|
|
+ Do(context.Background())
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ _, err := client.Index().
|
|
|
+ Index(index).
|
|
|
+ Id(id).
|
|
|
+ BodyJson(item).
|
|
|
+ Do(context.Background())
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|