|
@@ -3,7 +3,9 @@ package es
|
|
|
import (
|
|
|
"context"
|
|
|
"encoding/json"
|
|
|
+ "errors"
|
|
|
"fmt"
|
|
|
+ "io"
|
|
|
"log"
|
|
|
"net/http"
|
|
|
"runtime"
|
|
@@ -15,6 +17,19 @@ import (
|
|
|
es "github.com/olivere/elastic/v7"
|
|
|
)
|
|
|
|
|
|
+type MySource struct {
|
|
|
+ Query string
|
|
|
+}
|
|
|
+
|
|
|
+func (m *MySource) Source() (interface{}, error) {
|
|
|
+ mp := make(map[string]interface{})
|
|
|
+ err := json.Unmarshal([]byte(m.Query), &mp)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ return mp, nil
|
|
|
+}
|
|
|
+
|
|
|
type EsV7 struct {
|
|
|
Address string
|
|
|
UserName string
|
|
@@ -989,3 +1004,38 @@ func (e *EsV7) analyzeResp(text, analyzer, u string) (*http.Request, error) {
|
|
|
req.Header.Set("Content-Type", "application/json")
|
|
|
return req, err
|
|
|
}
|
|
|
+
|
|
|
+func (e *EsV7) Scroll(indexName, scrollTime, query string, f func(fv map[string]interface{})) {
|
|
|
+ client := e.GetEsConn()
|
|
|
+ defer e.DestoryEsConn(client)
|
|
|
+ // 开始滚动查询
|
|
|
+ scrollService := client.Scroll(indexName).Scroll(scrollTime).Body(query)
|
|
|
+ // 滚动 ID 用于追踪滚动上下文
|
|
|
+ var scrollID string
|
|
|
+ for {
|
|
|
+ // 执行滚动查询
|
|
|
+ searchResult, err := scrollService.Do(context.Background())
|
|
|
+ if errors.Is(err, io.EOF) {
|
|
|
+ break
|
|
|
+ } else if err != nil {
|
|
|
+ log.Println("Error executing scroll query: ", err)
|
|
|
+ break
|
|
|
+ }
|
|
|
+ // 获取滚动 ID
|
|
|
+ scrollID = searchResult.ScrollId
|
|
|
+ // 处理查询结果
|
|
|
+ for _, hit := range searchResult.Hits.Hits {
|
|
|
+ // 这里假设文档的源数据是 JSON 字符串
|
|
|
+ var source map[string]interface{}
|
|
|
+ json.Unmarshal(hit.Source, &source)
|
|
|
+ f(source)
|
|
|
+ }
|
|
|
+ // 更新滚动服务以使用新的滚动 ID
|
|
|
+ scrollService = client.Scroll(indexName).ScrollId(scrollID).Scroll(scrollTime)
|
|
|
+ }
|
|
|
+ // 清理滚动上下文
|
|
|
+ _, err := client.ClearScroll().ScrollId(scrollID).Do(context.Background())
|
|
|
+ if err != nil {
|
|
|
+ log.Println("Error clearing scroll: ", err)
|
|
|
+ }
|
|
|
+}
|