|
@@ -7,6 +7,7 @@ import (
|
|
|
"fmt"
|
|
|
es "github.com/olivere/elastic/v7"
|
|
|
util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
|
|
|
+ "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
|
|
|
"log"
|
|
|
"runtime"
|
|
|
"strings"
|
|
@@ -467,7 +468,7 @@ func (e *Elastic) InsertOrUpdate(index string, docs []map[string]interface{}) er
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-//ExistsIndex 判断索引是否存在
|
|
|
+// ExistsIndex 判断索引是否存在
|
|
|
func (e *Elastic) ExistsIndex(index string) (exists bool, err error) {
|
|
|
client := e.GetEsConn()
|
|
|
defer e.DestoryEsConn(client)
|
|
@@ -476,7 +477,7 @@ func (e *Elastic) ExistsIndex(index string) (exists bool, err error) {
|
|
|
return
|
|
|
}
|
|
|
|
|
|
-//CreateIndex 创建索引
|
|
|
+// CreateIndex 创建索引
|
|
|
func (e *Elastic) CreateIndex(index string, mapping string) (err error) {
|
|
|
client := e.GetEsConn()
|
|
|
defer e.DestoryEsConn(client)
|
|
@@ -485,7 +486,7 @@ func (e *Elastic) CreateIndex(index string, mapping string) (err error) {
|
|
|
return
|
|
|
}
|
|
|
|
|
|
-//DeleteIndex 删除索引
|
|
|
+// DeleteIndex 删除索引
|
|
|
func (e *Elastic) DeleteIndex(index string) (err error) {
|
|
|
client := e.GetEsConn()
|
|
|
defer e.DestoryEsConn(client)
|
|
@@ -494,7 +495,7 @@ func (e *Elastic) DeleteIndex(index string) (err error) {
|
|
|
return
|
|
|
}
|
|
|
|
|
|
-//RemoveAlias 移除别名
|
|
|
+// RemoveAlias 移除别名
|
|
|
func (e *Elastic) RemoveAlias(index, aliasName string) (err error) {
|
|
|
client := e.GetEsConn()
|
|
|
defer e.DestoryEsConn(client)
|
|
@@ -503,7 +504,7 @@ func (e *Elastic) RemoveAlias(index, aliasName string) (err error) {
|
|
|
return
|
|
|
}
|
|
|
|
|
|
-//SetAlias 添加别名
|
|
|
+// SetAlias 添加别名
|
|
|
func (e *Elastic) SetAlias(index, aliasName string) (err error) {
|
|
|
client := e.GetEsConn()
|
|
|
defer e.DestoryEsConn(client)
|
|
@@ -512,7 +513,7 @@ func (e *Elastic) SetAlias(index, aliasName string) (err error) {
|
|
|
return
|
|
|
}
|
|
|
|
|
|
-//DeleteByID 根据ID 删除索引数据;id 或者索引名称不存在,都不会报错
|
|
|
+// DeleteByID 根据ID 删除索引数据;id 或者索引名称不存在,都不会报错
|
|
|
func (e *Elastic) DeleteByID(index, id string) error {
|
|
|
client := e.GetEsConn()
|
|
|
defer e.DestoryEsConn(client)
|
|
@@ -546,3 +547,33 @@ func (e *Elastic) UpdateDocument(indexName string, documentID string, updateData
|
|
|
|
|
|
return nil
|
|
|
}
|
|
|
+
|
|
|
+// Save 保存对象
|
|
|
+func (e *Elastic) Save(index string, obj interface{}) bool {
|
|
|
+ client := e.GetEsConn()
|
|
|
+ defer e.DestoryEsConn(client)
|
|
|
+ defer func() {
|
|
|
+ if r := recover(); r != nil {
|
|
|
+ log.Println("[E]", r)
|
|
|
+ for skip := 1; ; skip++ {
|
|
|
+ _, file, line, ok := runtime.Caller(skip)
|
|
|
+ if !ok {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ go log.Printf("%v,%v\n", file, line)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }()
|
|
|
+ data := util.ObjToMap(obj)
|
|
|
+ _id := mongodb.BsonIdToSId((*data)["_id"])
|
|
|
+ (*data)["id"] = _id
|
|
|
+ delete((*data), "_id")
|
|
|
+ _, err := client.Index().Index(index).Id(_id).BodyJson(data).Do(context.TODO())
|
|
|
+ if err != nil {
|
|
|
+ log.Println("保存到ES出错", err.Error(), obj)
|
|
|
+ return false
|
|
|
+ } else {
|
|
|
+ return true
|
|
|
+ }
|
|
|
+
|
|
|
+}
|