|
@@ -2,10 +2,9 @@ package elastic
|
|
|
|
|
|
import (
|
|
|
"context"
|
|
|
- "encoding/json"
|
|
|
"errors"
|
|
|
"fmt"
|
|
|
- es "gopkg.in/olivere/elastic.v2"
|
|
|
+ es "github.com/olivere/elastic/v7"
|
|
|
"log"
|
|
|
"runtime"
|
|
|
"strings"
|
|
@@ -82,47 +81,48 @@ func (e *Elastic) GetEsConn() *es.Client {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (e *Elastic) Get(index, itype, query string) *[]map[string]interface{} {
|
|
|
- client := e.GetEsConn()
|
|
|
- defer func() {
|
|
|
- go e.DestoryEsConn(client)
|
|
|
- }()
|
|
|
- var res []map[string]interface{}
|
|
|
- if client != nil {
|
|
|
- defer func() {
|
|
|
- if r := recover(); r != nil {
|
|
|
- log.Println("[E]", r)
|
|
|
- for skip := 1; ; skip++ {
|
|
|
- _, file, line, ok := runtime.Caller(skip)
|
|
|
- if !ok {
|
|
|
- break
|
|
|
- }
|
|
|
- go log.Printf("%v,%v\n", file, line)
|
|
|
- }
|
|
|
- }
|
|
|
- }()
|
|
|
- searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do()
|
|
|
- if err != nil {
|
|
|
- log.Println("从ES查询出错", err.Error())
|
|
|
- return nil
|
|
|
- }
|
|
|
- if searchResult.Hits != nil {
|
|
|
- resNum := len(searchResult.Hits.Hits)
|
|
|
- if resNum < 5000 {
|
|
|
- res = make([]map[string]interface{}, resNum)
|
|
|
- for i, hit := range searchResult.Hits.Hits {
|
|
|
- parseErr := json.Unmarshal(*hit.Source, &res[i])
|
|
|
- if parseErr == nil && hit.Highlight != nil && res[i] != nil {
|
|
|
- res[i]["highlight"] = map[string][]string(hit.Highlight)
|
|
|
- }
|
|
|
- }
|
|
|
- } else {
|
|
|
- log.Println("查询结果太多,查询到:", resNum, "条")
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- return &res
|
|
|
-}
|
|
|
+//
|
|
|
+//func (e *Elastic) Get(index, itype, query string) *[]map[string]interface{} {
|
|
|
+// client := e.GetEsConn()
|
|
|
+// defer func() {
|
|
|
+// go e.DestoryEsConn(client)
|
|
|
+// }()
|
|
|
+// var res []map[string]interface{}
|
|
|
+// if client != nil {
|
|
|
+// defer func() {
|
|
|
+// if r := recover(); r != nil {
|
|
|
+// log.Println("[E]", r)
|
|
|
+// for skip := 1; ; skip++ {
|
|
|
+// _, file, line, ok := runtime.Caller(skip)
|
|
|
+// if !ok {
|
|
|
+// break
|
|
|
+// }
|
|
|
+// go log.Printf("%v,%v\n", file, line)
|
|
|
+// }
|
|
|
+// }
|
|
|
+// }()
|
|
|
+// searchResult, err := client.Search().Index(index).Source(query).Do(context.Background())
|
|
|
+// if err != nil {
|
|
|
+// log.Println("从ES查询出错", err.Error())
|
|
|
+// return nil
|
|
|
+// }
|
|
|
+// if searchResult.Hits != nil {
|
|
|
+// resNum := len(searchResult.Hits.Hits)
|
|
|
+// if resNum < 5000 {
|
|
|
+// res = make([]map[string]interface{}, resNum)
|
|
|
+// for i, hit := range searchResult.Hits.Hits {
|
|
|
+// parseErr := json.Unmarshal(*hit.Source, &res[i])
|
|
|
+// if parseErr == nil && hit.Highlight != nil && res[i] != nil {
|
|
|
+// res[i]["highlight"] = map[string][]string(hit.Highlight)
|
|
|
+// }
|
|
|
+// }
|
|
|
+// } else {
|
|
|
+// log.Println("查询结果太多,查询到:", resNum, "条")
|
|
|
+// }
|
|
|
+// }
|
|
|
+// }
|
|
|
+// return &res
|
|
|
+//}
|
|
|
|
|
|
//关闭elastic
|
|
|
func (e *Elastic) Close() {
|
|
@@ -168,7 +168,7 @@ func (e *Elastic) BulkSave(index, itype string, obj *[]map[string]interface{}, i
|
|
|
}
|
|
|
req = req.Add(es.NewBulkIndexRequest().Index(index).Type(itype).Doc(v))
|
|
|
}
|
|
|
- _, err := req.Do()
|
|
|
+ _, err := req.Do(context.Background())
|
|
|
if err != nil {
|
|
|
log.Println("批量保存到ES出错", err.Error())
|
|
|
}
|
|
@@ -182,7 +182,7 @@ func (e *Elastic) DelById(index, itype, id string) bool {
|
|
|
b := false
|
|
|
if client != nil {
|
|
|
var err error
|
|
|
- _, err = client.Delete().Index(index).Type(itype).Id(id).Do()
|
|
|
+ _, err = client.Delete().Index(index).Type(itype).Id(id).Do(context.Background())
|
|
|
if err != nil {
|
|
|
log.Println("更新检索出错:", err.Error())
|
|
|
} else {
|
|
@@ -191,80 +191,82 @@ func (e *Elastic) DelById(index, itype, id string) bool {
|
|
|
}
|
|
|
return b
|
|
|
}
|
|
|
-func (e *Elastic) GetNoLimit(index, itype, query string) *[]map[string]interface{} {
|
|
|
- //log.Println("query -- ", query)
|
|
|
- client := e.GetEsConn()
|
|
|
- defer e.DestoryEsConn(client)
|
|
|
- var res []map[string]interface{}
|
|
|
- if client != nil {
|
|
|
- defer func() {
|
|
|
- if r := recover(); r != nil {
|
|
|
- log.Println("[E]", r)
|
|
|
- for skip := 1; ; skip++ {
|
|
|
- _, file, line, ok := runtime.Caller(skip)
|
|
|
- if !ok {
|
|
|
- break
|
|
|
- }
|
|
|
- go log.Printf("%v,%v\n", file, line)
|
|
|
- }
|
|
|
- }
|
|
|
- }()
|
|
|
- searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do()
|
|
|
- if err != nil {
|
|
|
- log.Println("从ES查询出错", err.Error())
|
|
|
- return nil
|
|
|
- }
|
|
|
|
|
|
- if searchResult.Hits != nil {
|
|
|
- resNum := len(searchResult.Hits.Hits)
|
|
|
- res = make([]map[string]interface{}, resNum)
|
|
|
- for i, hit := range searchResult.Hits.Hits {
|
|
|
- json.Unmarshal(*hit.Source, &res[i])
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- return &res
|
|
|
-}
|
|
|
-func (e *Elastic) GetByIdField(index, itype, id, fields string) *map[string]interface{} {
|
|
|
- client := e.GetEsConn()
|
|
|
- defer e.DestoryEsConn(client)
|
|
|
- if client != nil {
|
|
|
- defer func() {
|
|
|
- if r := recover(); r != nil {
|
|
|
- log.Println("[E]", r)
|
|
|
- for skip := 1; ; skip++ {
|
|
|
- _, file, line, ok := runtime.Caller(skip)
|
|
|
- if !ok {
|
|
|
- break
|
|
|
- }
|
|
|
- go log.Printf("%v,%v\n", file, line)
|
|
|
- }
|
|
|
- }
|
|
|
- }()
|
|
|
- query := `{"query":{"term":{"_id":"` + id + `"}}`
|
|
|
- if len(fields) > 0 {
|
|
|
- query = query + `,"_source":[` + fields + `]`
|
|
|
- }
|
|
|
- query = query + "}"
|
|
|
- searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do()
|
|
|
- if err != nil {
|
|
|
- log.Println("从ES查询出错", err.Error())
|
|
|
- return nil
|
|
|
- }
|
|
|
- var res map[string]interface{}
|
|
|
- if searchResult.Hits != nil {
|
|
|
- resNum := len(searchResult.Hits.Hits)
|
|
|
- if resNum == 1 {
|
|
|
- res = make(map[string]interface{})
|
|
|
- for _, hit := range searchResult.Hits.Hits {
|
|
|
- json.Unmarshal(*hit.Source, &res)
|
|
|
- }
|
|
|
- return &res
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- return nil
|
|
|
-}
|
|
|
+//func (e *Elastic) GetNoLimit(index, itype, query string) *[]map[string]interface{} {
|
|
|
+// //log.Println("query -- ", query)
|
|
|
+// client := e.GetEsConn()
|
|
|
+// defer e.DestoryEsConn(client)
|
|
|
+// var res []map[string]interface{}
|
|
|
+// if client != nil {
|
|
|
+// defer func() {
|
|
|
+// if r := recover(); r != nil {
|
|
|
+// log.Println("[E]", r)
|
|
|
+// for skip := 1; ; skip++ {
|
|
|
+// _, file, line, ok := runtime.Caller(skip)
|
|
|
+// if !ok {
|
|
|
+// break
|
|
|
+// }
|
|
|
+// go log.Printf("%v,%v\n", file, line)
|
|
|
+// }
|
|
|
+// }
|
|
|
+// }()
|
|
|
+// searchResult, err := client.Search().Index(index).Source(query).Do(context.Background())
|
|
|
+// if err != nil {
|
|
|
+// log.Println("从ES查询出错", err.Error())
|
|
|
+// return nil
|
|
|
+// }
|
|
|
+//
|
|
|
+// if searchResult.Hits != nil {
|
|
|
+// resNum := len(searchResult.Hits.Hits)
|
|
|
+// res = make([]map[string]interface{}, resNum)
|
|
|
+// for i, hit := range searchResult.Hits.Hits {
|
|
|
+// json.Unmarshal(*hit.Source, &res[i])
|
|
|
+// }
|
|
|
+// }
|
|
|
+// }
|
|
|
+// return &res
|
|
|
+//}
|
|
|
+
|
|
|
+//func (e *Elastic) GetByIdField(index, itype, id, fields string) *map[string]interface{} {
|
|
|
+// client := e.GetEsConn()
|
|
|
+// defer e.DestoryEsConn(client)
|
|
|
+// if client != nil {
|
|
|
+// defer func() {
|
|
|
+// if r := recover(); r != nil {
|
|
|
+// log.Println("[E]", r)
|
|
|
+// for skip := 1; ; skip++ {
|
|
|
+// _, file, line, ok := runtime.Caller(skip)
|
|
|
+// if !ok {
|
|
|
+// break
|
|
|
+// }
|
|
|
+// go log.Printf("%v,%v\n", file, line)
|
|
|
+// }
|
|
|
+// }
|
|
|
+// }()
|
|
|
+// query := `{"query":{"term":{"_id":"` + id + `"}}`
|
|
|
+// if len(fields) > 0 {
|
|
|
+// query = query + `,"_source":[` + fields + `]`
|
|
|
+// }
|
|
|
+// query = query + "}"
|
|
|
+// searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do()
|
|
|
+// if err != nil {
|
|
|
+// log.Println("从ES查询出错", err.Error())
|
|
|
+// return nil
|
|
|
+// }
|
|
|
+// var res map[string]interface{}
|
|
|
+// if searchResult.Hits != nil {
|
|
|
+// resNum := len(searchResult.Hits.Hits)
|
|
|
+// if resNum == 1 {
|
|
|
+// res = make(map[string]interface{})
|
|
|
+// for _, hit := range searchResult.Hits.Hits {
|
|
|
+// json.Unmarshal(*hit.Source., &res)
|
|
|
+// }
|
|
|
+// return &res
|
|
|
+// }
|
|
|
+// }
|
|
|
+// }
|
|
|
+// return nil
|
|
|
+//}
|
|
|
|
|
|
func (e *Elastic) Count(index, itype string, query interface{}) int64 {
|
|
|
client := e.GetEsConn()
|
|
@@ -286,7 +288,7 @@ func (e *Elastic) Count(index, itype string, query interface{}) int64 {
|
|
|
if qi, ok2 := query.(es.Query); ok2 {
|
|
|
qq = qi
|
|
|
}
|
|
|
- n, err := client.Count(index).Type(itype).Query(qq).Do()
|
|
|
+ n, err := client.Count(index).Query(qq).Do(context.Background())
|
|
|
if err != nil {
|
|
|
log.Println("统计出错", err.Error())
|
|
|
}
|
|
@@ -297,80 +299,79 @@ func (e *Elastic) Count(index, itype string, query interface{}) int64 {
|
|
|
}
|
|
|
|
|
|
//更新一个字段
|
|
|
-func (e *Elastic) BulkUpdateArr(index, itype string, update []map[string]string) {
|
|
|
- client := e.GetEsConn()
|
|
|
- defer e.DestoryEsConn(client)
|
|
|
- if client != nil {
|
|
|
- defer func() {
|
|
|
- if r := recover(); r != nil {
|
|
|
- log.Println("[E]", r)
|
|
|
- for skip := 1; ; skip++ {
|
|
|
- _, file, line, ok := runtime.Caller(skip)
|
|
|
- if !ok {
|
|
|
- break
|
|
|
- }
|
|
|
- go log.Printf("%v,%v\n", file, line)
|
|
|
- }
|
|
|
- }
|
|
|
- }()
|
|
|
- for _, data := range update {
|
|
|
- id := data["id"]
|
|
|
- updateStr := data["updateStr"]
|
|
|
- if id != "" && updateStr != "" {
|
|
|
- _, err := client.Update().Index(index).Type(itype).Id(id).Script(updateStr).ScriptLang("groovy").Do()
|
|
|
- if err != nil {
|
|
|
- log.Println("更新检索出错:", err.Error())
|
|
|
- }
|
|
|
- } else {
|
|
|
- log.Println("数据错误")
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-}
|
|
|
+//func (e *Elastic) BulkUpdateArr(index, itype string, update []map[string]string) {
|
|
|
+// client := e.GetEsConn()
|
|
|
+// defer e.DestoryEsConn(client)
|
|
|
+// if client != nil {
|
|
|
+// defer func() {
|
|
|
+// if r := recover(); r != nil {
|
|
|
+// log.Println("[E]", r)
|
|
|
+// for skip := 1; ; skip++ {
|
|
|
+// _, file, line, ok := runtime.Caller(skip)
|
|
|
+// if !ok {
|
|
|
+// break
|
|
|
+// }
|
|
|
+// go log.Printf("%v,%v\n", file, line)
|
|
|
+// }
|
|
|
+// }
|
|
|
+// }()
|
|
|
+// for _, data := range update {
|
|
|
+// id := data["id"]
|
|
|
+// updateStr := data["updateStr"]
|
|
|
+// if id != "" && updateStr != "" {
|
|
|
+// _, err := client.Update().Index(index).Type(itype).Id(id).Script(updateStr).ScriptLang("groovy").Do()
|
|
|
+// if err != nil {
|
|
|
+// log.Println("更新检索出错:", err.Error())
|
|
|
+// }
|
|
|
+// } else {
|
|
|
+// log.Println("数据错误")
|
|
|
+// }
|
|
|
+// }
|
|
|
+// }
|
|
|
+//}
|
|
|
|
|
|
//更新多个字段
|
|
|
-func (e *Elastic) BulkUpdateMultipleFields(index, itype string, arrs [][]map[string]interface{}) {
|
|
|
- client := e.GetEsConn()
|
|
|
- defer e.DestoryEsConn(client)
|
|
|
- if client != nil {
|
|
|
- defer func() {
|
|
|
- if r := recover(); r != nil {
|
|
|
- log.Println("[E]", r)
|
|
|
- for skip := 1; ; skip++ {
|
|
|
- _, file, line, ok := runtime.Caller(skip)
|
|
|
- if !ok {
|
|
|
- break
|
|
|
- }
|
|
|
- go log.Printf("%v,%v\n", file, line)
|
|
|
- }
|
|
|
- }
|
|
|
- }()
|
|
|
- for _, arr := range arrs {
|
|
|
- id := arr[0]["id"].(string)
|
|
|
- update := arr[1]["update"].([]string)
|
|
|
- for _, str := range update {
|
|
|
- _, err := client.Update().Index(index).Type(itype).Id(id).Script(str).ScriptLang("groovy").Do()
|
|
|
- if err != nil {
|
|
|
- log.Println("更新检索出错:", err.Error())
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-}
|
|
|
+//func (e *Elastic) BulkUpdateMultipleFields(index, itype string, arrs [][]map[string]interface{}) {
|
|
|
+// client := e.GetEsConn()
|
|
|
+// defer e.DestoryEsConn(client)
|
|
|
+// if client != nil {
|
|
|
+// defer func() {
|
|
|
+// if r := recover(); r != nil {
|
|
|
+// log.Println("[E]", r)
|
|
|
+// for skip := 1; ; skip++ {
|
|
|
+// _, file, line, ok := runtime.Caller(skip)
|
|
|
+// if !ok {
|
|
|
+// break
|
|
|
+// }
|
|
|
+// go log.Printf("%v,%v\n", file, line)
|
|
|
+// }
|
|
|
+// }
|
|
|
+// }()
|
|
|
+// for _, arr := range arrs {
|
|
|
+// id := arr[0]["id"].(string)
|
|
|
+// update := arr[1]["update"].([]string)
|
|
|
+// for _, str := range update {
|
|
|
+// _, err := client.Update().Index(index).Type(itype).Id(id).Script(str).ScriptLang("groovy").Do()
|
|
|
+// if err != nil {
|
|
|
+// log.Println("更新检索出错:", err.Error())
|
|
|
+// }
|
|
|
+// }
|
|
|
+// }
|
|
|
+// }
|
|
|
+//}
|
|
|
|
|
|
// UpdateBulk 批量修改文档
|
|
|
func (e *Elastic) UpdateBulk(index, itype string, docs ...[]map[string]interface{}) {
|
|
|
client := e.GetEsConn()
|
|
|
defer e.DestoryEsConn(client)
|
|
|
- //bulkService := client.Bulk().Index(index).Refresh("true")
|
|
|
- bulkService := client.Bulk().Index(index).Refresh(true)
|
|
|
+ bulkService := client.Bulk().Index(index).Refresh("true")
|
|
|
bulkService.Type(itype)
|
|
|
for _, d := range docs {
|
|
|
id := d[0]["_id"].(string)
|
|
|
doc := es.NewBulkUpdateRequest().Id(id).Doc(d[1])
|
|
|
bulkService.Add(doc)
|
|
|
}
|
|
|
- _, err := bulkService.Do()
|
|
|
+ _, err := bulkService.Do(context.Background())
|
|
|
if err != nil {
|
|
|
fmt.Printf("UpdateBulk all success err is %v\n", err)
|
|
|
}
|
|
@@ -383,18 +384,18 @@ func (e *Elastic) UpdateBulk(index, itype string, docs ...[]map[string]interface
|
|
|
func (e *Elastic) UpsertBulk(ctx context.Context, index string, ids []string, docs []interface{}) error {
|
|
|
client := e.GetEsConn()
|
|
|
defer e.DestoryEsConn(client)
|
|
|
- bulkService := client.Bulk().Index(index).Refresh(true)
|
|
|
+ bulkService := client.Bulk().Index(index).Refresh("true")
|
|
|
bulkService.Type("bidding")
|
|
|
for i := range ids {
|
|
|
doc := es.NewBulkUpdateRequest().Id(ids[i]).Doc(docs[i]).Upsert(docs[i])
|
|
|
bulkService.Add(doc)
|
|
|
}
|
|
|
- res, err := bulkService.Do()
|
|
|
+ res, err := bulkService.Do(context.Background())
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
if len(res.Failed()) > 0 {
|
|
|
- return errors.New(res.Failed()[0].Error)
|
|
|
+ return errors.New(res.Failed()[0].Error.Reason)
|
|
|
}
|
|
|
return nil
|
|
|
}
|
|
@@ -403,13 +404,13 @@ func (e *Elastic) UpsertBulk(ctx context.Context, index string, ids []string, do
|
|
|
func (e *Elastic) DeleteBulk(index string, ids []string) {
|
|
|
client := e.GetEsConn()
|
|
|
defer e.DestoryEsConn(client)
|
|
|
- bulkService := client.Bulk().Index(index).Refresh(true)
|
|
|
+ bulkService := client.Bulk().Index(index).Refresh("true")
|
|
|
bulkService.Type("bidding")
|
|
|
for i := range ids {
|
|
|
req := es.NewBulkDeleteRequest().Id(ids[i])
|
|
|
bulkService.Add(req)
|
|
|
}
|
|
|
- res, err := bulkService.Do()
|
|
|
+ res, err := bulkService.Do(context.Background())
|
|
|
if err != nil {
|
|
|
fmt.Printf("DeleteBulk success is %v\n", len(res.Succeeded()))
|
|
|
}
|