Browse Source

增加滚动查询

wangchuanjin 1 year ago
parent
commit
42a82b37ae
2 changed files with 11 additions and 3 deletions
  1. 2 1
      es/es_test.go
  2. 9 2
      es/esv7.go

+ 2 - 1
es/es_test.go

@@ -61,7 +61,8 @@ func TestScroll(t *testing.T) {
 	    }
 	  },"_source":["id"]
 	}`
-	VarEs.(*EsV7).Scroll("projectset", "5m", q, func(fv map[string]interface{}) {
+	VarEs.(*EsV7).Scroll("projectset", "5m", q, func(fv map[string]interface{}) bool {
 		log.Println(fv)
+		return true
 	})
 }

+ 9 - 2
es/esv7.go

@@ -1005,7 +1005,7 @@ func (e *EsV7) analyzeResp(text, analyzer, u string) (*http.Request, error) {
 	return req, err
 }
 
-func (e *EsV7) Scroll(indexName, scrollTime, query string, f func(fv map[string]interface{})) {
+func (e *EsV7) Scroll(indexName, scrollTime, query string, f func(fv map[string]interface{}) bool) {
 	client := e.GetEsConn()
 	defer e.DestoryEsConn(client)
 	// 开始滚动查询
@@ -1024,11 +1024,18 @@ func (e *EsV7) Scroll(indexName, scrollTime, query string, f func(fv map[string]
 		// 获取滚动 ID
 		scrollID = searchResult.ScrollId
 		// 处理查询结果
+		isBreak := false
 		for _, hit := range searchResult.Hits.Hits {
 			// 这里假设文档的源数据是 JSON 字符串
 			var source map[string]interface{}
 			json.Unmarshal(hit.Source, &source)
-			f(source)
+			if !f(source) {
+				isBreak = true
+				break
+			}
+		}
+		if isBreak {
+			break
 		}
 		// 更新滚动服务以使用新的滚动 ID
 		scrollService = client.Scroll(indexName).ScrollId(scrollID).Scroll(scrollTime)