|
@@ -1,7 +1,9 @@
|
|
|
package elastic
|
|
|
|
|
|
import (
|
|
|
+ "context"
|
|
|
"encoding/json"
|
|
|
+ "errors"
|
|
|
"fmt"
|
|
|
es "gopkg.in/olivere/elastic.v2"
|
|
|
"log"
|
|
@@ -355,3 +357,60 @@ func (e *Elastic) BulkUpdateMultipleFields(index, itype string, arrs [][]map[str
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+// UpdateBulk 批量修改文档
|
|
|
+func (e *Elastic) UpdateBulk(index, itype string, docs ...[]map[string]interface{}) {
|
|
|
+ client := e.GetEsConn()
|
|
|
+ defer e.DestoryEsConn(client)
|
|
|
+ //bulkService := client.Bulk().Index(index).Refresh("true")
|
|
|
+ bulkService := client.Bulk().Index(index).Refresh(true)
|
|
|
+ bulkService.Type(itype)
|
|
|
+ for _, d := range docs {
|
|
|
+ id := d[0]["_id"].(string)
|
|
|
+ doc := es.NewBulkUpdateRequest().Id(id).Doc(d[1])
|
|
|
+ bulkService.Add(doc)
|
|
|
+ }
|
|
|
+ _, err := bulkService.Do()
|
|
|
+ if err != nil {
|
|
|
+ fmt.Printf("UpdateBulk all success err is %v\n", err)
|
|
|
+ }
|
|
|
+ //if len(res.Failed()) > 0 {
|
|
|
+ // fmt.Printf("UpdateBulk all success failed is %v\n", (res.Items[0]))
|
|
|
+ //}
|
|
|
+}
|
|
|
+
|
|
|
+// UpsertBulk 批量修改文档(不存在则插入)
|
|
|
+func (e *Elastic) UpsertBulk(ctx context.Context, index string, ids []string, docs []interface{}) error {
|
|
|
+ client := e.GetEsConn()
|
|
|
+ defer e.DestoryEsConn(client)
|
|
|
+ bulkService := client.Bulk().Index(index).Refresh(true)
|
|
|
+ bulkService.Type("bidding")
|
|
|
+ for i := range ids {
|
|
|
+ doc := es.NewBulkUpdateRequest().Id(ids[i]).Doc(docs[i]).Upsert(docs[i])
|
|
|
+ bulkService.Add(doc)
|
|
|
+ }
|
|
|
+ res, err := bulkService.Do()
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ if len(res.Failed()) > 0 {
|
|
|
+ return errors.New(res.Failed()[0].Error)
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+// 批量删除
|
|
|
+func (e *Elastic) DeleteBulk(index string, ids []string) {
|
|
|
+ client := e.GetEsConn()
|
|
|
+ defer e.DestoryEsConn(client)
|
|
|
+ bulkService := client.Bulk().Index(index).Refresh(true)
|
|
|
+ bulkService.Type("bidding")
|
|
|
+ for i := range ids {
|
|
|
+ req := es.NewBulkDeleteRequest().Id(ids[i])
|
|
|
+ bulkService.Add(req)
|
|
|
+ }
|
|
|
+ res, err := bulkService.Do()
|
|
|
+ if err != nil {
|
|
|
+ fmt.Printf("DeleteBulk success is %v\n", len(res.Succeeded()))
|
|
|
+ }
|
|
|
+}
|