123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146 |
- package main
- import (
- "context"
- "encoding/json"
- "fmt"
- "github.com/elastic/go-elasticsearch/v7"
- "log"
- "strings"
- "time"
- )
// Response shapes for Elasticsearch search/scroll calls. Only the fields this
// program reads are declared; everything else in the payload is ignored by
// the JSON decoder.
type (
	// EsHit is a single entry of the "hits.hits" array. Only the document
	// identifier ("_id") is decoded.
	EsHit struct {
		ID string `json:"_id"`
	}

	// EsResponse is the subset of a search/scroll response body used here:
	// the scroll cursor ("_scroll_id") and the current page of hits.
	EsResponse struct {
		ScrollID string `json:"_scroll_id"`
		Hits     struct {
			Hits []EsHit `json:"hits"`
		} `json:"hits"`
	}
)
- func fetchIDs(es *elasticsearch.Client, index string, query string) (map[string]struct{}, error) {
- ids := make(map[string]struct{})
- ctx := context.Background()
- // 初始化 Scroll 查询
- res, err := es.Search(
- es.Search.WithContext(ctx),
- es.Search.WithIndex(index),
- es.Search.WithScroll(time.Minute),
- es.Search.WithSize(1000),
- es.Search.WithBody(strings.NewReader(query)),
- es.Search.WithTrackTotalHits(true),
- )
- if err != nil {
- return nil, err
- }
- defer res.Body.Close()
- if res.IsError() {
- return nil, fmt.Errorf("init scroll error: %s", res.String())
- }
- var r EsResponse
- if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
- return nil, err
- }
- scrollID := r.ScrollID
- for {
- // 收集当前批次的 ID
- for _, hit := range r.Hits.Hits {
- ids[hit.ID] = struct{}{}
- }
- // 如果这一批没有数据,结束
- if len(r.Hits.Hits) == 0 {
- break
- }
- // 继续滚动
- scrollRes, err := es.Scroll(
- es.Scroll.WithContext(ctx),
- es.Scroll.WithScrollID(scrollID),
- es.Scroll.WithScroll(time.Minute),
- )
- if err != nil {
- return nil, err
- }
- defer scrollRes.Body.Close()
- if scrollRes.IsError() {
- return nil, fmt.Errorf("scroll error: %s", scrollRes.String())
- }
- if err := json.NewDecoder(scrollRes.Body).Decode(&r); err != nil {
- return nil, err
- }
- scrollID = r.ScrollID
- }
- // 清理 Scroll
- es.ClearScroll(es.ClearScroll.WithScrollID(scrollID))
- return ids, nil
- }
- func main() {
- // 配置两个 ES 客户端(ES 7 也适用)
- es1, _ := elasticsearch.NewClient(elasticsearch.Config{
- Addresses: []string{"http://172.17.4.184:19908"},
- Username: "jybid",
- Password: "Top2023_JEB01i@31",
- })
- es2, _ := elasticsearch.NewClient(elasticsearch.Config{
- Addresses: []string{"http://172.17.4.184:19905"},
- Username: "jybid",
- Password: "Top2023_JEB01i@31",
- })
- indexName := "bidding"
- queryDSL := `{
- "query": {
- "range": {
- "id": {
- "gte": "689681700000000000000000",
- "lt": "6896aba00000000000000000"
- }
- }
- },
- "_source": false,
- "stored_fields": []
- }`
- fmt.Println("Fetching from cluster 1...")
- ids1, err := fetchIDs(es1, indexName, queryDSL)
- if err != nil {
- log.Fatalf("Cluster1 error: %v", err)
- }
- fmt.Println("Fetching from cluster 2...")
- ids2, err := fetchIDs(es2, indexName, queryDSL)
- if err != nil {
- log.Fatalf("Cluster2 error: %v", err)
- }
- // 差集:集群1独有
- fmt.Printf("Diff (cluster1 - cluster2): %d docs\n", len(ids1)-len(ids2))
- for id := range ids1 {
- if _, ok := ids2[id]; !ok {
- fmt.Println(id)
- }
- }
- // 差集:集群2独有
- fmt.Printf("Diff (cluster2 - cluster1): %d docs\n", len(ids2)-len(ids1))
- for id := range ids2 {
- if _, ok := ids1[id]; !ok {
- fmt.Println(id)
- }
- }
- }
|