|
@@ -3,6 +3,7 @@ package elastic
|
|
|
import (
|
|
|
util "app.yhyue.com/data_processing/common_utils"
|
|
|
"context"
|
|
|
+ "encoding/json"
|
|
|
"errors"
|
|
|
"fmt"
|
|
|
es "github.com/olivere/elastic/v7"
|
|
@@ -82,48 +83,47 @@ func (e *Elastic) GetEsConn() *es.Client {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-//
|
|
|
-//func (e *Elastic) Get(index, itype, query string) *[]map[string]interface{} {
|
|
|
-// client := e.GetEsConn()
|
|
|
-// defer func() {
|
|
|
-// go e.DestoryEsConn(client)
|
|
|
-// }()
|
|
|
-// var res []map[string]interface{}
|
|
|
-// if client != nil {
|
|
|
-// defer func() {
|
|
|
-// if r := recover(); r != nil {
|
|
|
-// log.Println("[E]", r)
|
|
|
-// for skip := 1; ; skip++ {
|
|
|
-// _, file, line, ok := runtime.Caller(skip)
|
|
|
-// if !ok {
|
|
|
-// break
|
|
|
-// }
|
|
|
-// go log.Printf("%v,%v\n", file, line)
|
|
|
-// }
|
|
|
-// }
|
|
|
-// }()
|
|
|
-// searchResult, err := client.Search().Index(index).Source(query).Do(context.Background())
|
|
|
-// if err != nil {
|
|
|
-// log.Println("从ES查询出错", err.Error())
|
|
|
-// return nil
|
|
|
-// }
|
|
|
-// if searchResult.Hits != nil {
|
|
|
-// resNum := len(searchResult.Hits.Hits)
|
|
|
-// if resNum < 5000 {
|
|
|
-// res = make([]map[string]interface{}, resNum)
|
|
|
-// for i, hit := range searchResult.Hits.Hits {
|
|
|
-// parseErr := json.Unmarshal(*hit.Source, &res[i])
|
|
|
-// if parseErr == nil && hit.Highlight != nil && res[i] != nil {
|
|
|
-// res[i]["highlight"] = map[string][]string(hit.Highlight)
|
|
|
-// }
|
|
|
-// }
|
|
|
-// } else {
|
|
|
-// log.Println("查询结果太多,查询到:", resNum, "条")
|
|
|
-// }
|
|
|
-// }
|
|
|
-// }
|
|
|
-// return &res
|
|
|
-//}
|
|
|
+func (e *Elastic) Get(index, query string) *[]map[string]interface{} {
|
|
|
+ client := e.GetEsConn()
|
|
|
+ defer func() {
|
|
|
+ go e.DestoryEsConn(client)
|
|
|
+ }()
|
|
|
+ var res []map[string]interface{}
|
|
|
+ if client != nil {
|
|
|
+ defer func() {
|
|
|
+ if r := recover(); r != nil {
|
|
|
+ log.Println("[E]", r)
|
|
|
+ for skip := 1; ; skip++ {
|
|
|
+ _, file, line, ok := runtime.Caller(skip)
|
|
|
+ if !ok {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ go log.Printf("%v,%v\n", file, line)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }()
|
|
|
+ searchResult, err := client.Search().Index(index).Source(query).Do(context.Background())
|
|
|
+ if err != nil {
|
|
|
+ log.Println("从ES查询出错", err.Error())
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ if searchResult.Hits != nil {
|
|
|
+ resNum := len(searchResult.Hits.Hits)
|
|
|
+ if resNum < 5000 {
|
|
|
+ res = make([]map[string]interface{}, resNum)
|
|
|
+ for i, hit := range searchResult.Hits.Hits {
|
|
|
+ parseErr := json.Unmarshal(hit.Source, &res[i])
|
|
|
+ if parseErr == nil && hit.Highlight != nil && res[i] != nil {
|
|
|
+ res[i]["highlight"] = map[string][]string(hit.Highlight)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ log.Println("查询结果太多,查询到:", resNum, "条")
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return &res
|
|
|
+}
|
|
|
|
|
|
//关闭elastic
|
|
|
func (e *Elastic) Close() {
|
|
@@ -169,8 +169,7 @@ func (e *Elastic) BulkSave(index string, obj []map[string]interface{}) {
|
|
|
//}
|
|
|
id := util.ObjToString(v["_id"])
|
|
|
delete(v, "_id")
|
|
|
- util.Debug(id, v)
|
|
|
- req = req.Add(es.NewBulkIndexRequest().Index(index).Type("_doc").Id(id).Doc(v))
|
|
|
+ req = req.Add(es.NewBulkIndexRequest().Index(index).Id(id).Doc(v))
|
|
|
}
|
|
|
_, err := req.Do(context.Background())
|
|
|
if err != nil {
|