// Command main compares the document IDs stored in the same index on two
// Elasticsearch clusters and prints the IDs that exist on only one side.
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"sort"
	"strings"
	"time"

	"github.com/elastic/go-elasticsearch/v7"
)

// EsHit is the part of a search hit this tool reads: the document "_id".
type EsHit struct {
	ID string `json:"_id"`
}

// EsResponse is the part of a search/scroll response body this tool decodes:
// the scroll cursor and the hits of the current page.
type EsResponse struct {
	ScrollID string `json:"_scroll_id"`
	Hits     struct {
		Hits []EsHit `json:"hits"`
	} `json:"hits"`
}

// decodeAndClose decodes the JSON document read from body into v and closes
// body before returning, so callers inside a loop do not accumulate open
// response bodies.
func decodeAndClose(body io.ReadCloser, v *EsResponse) error {
	defer body.Close()
	return json.NewDecoder(body).Decode(v)
}

// diffIDs returns, in ascending order, the IDs that are keys of a but not
// keys of b. The result is nil when there are none.
func diffIDs(a, b map[string]struct{}) []string {
	var only []string
	for id := range a {
		if _, ok := b[id]; !ok {
			only = append(only, id)
		}
	}
	sort.Strings(only)
	return only
}

// fetchIDs runs query against index with the scroll API (pages of 1000 hits,
// one-minute keep-alive) and returns the set of "_id" values of every hit.
//
// All IDs are held in memory at once. On any transport, HTTP-level, or decode
// error it returns a nil map and the error. The scroll context is cleared on
// every return path on a best-effort basis; a failure to clear is logged and
// does not affect the result.
func fetchIDs(es *elasticsearch.Client, index string, query string) (map[string]struct{}, error) {
	ids := make(map[string]struct{})
	ctx := context.Background()

	// Open the scroll with the initial search request.
	res, err := es.Search(
		es.Search.WithContext(ctx),
		es.Search.WithIndex(index),
		es.Search.WithScroll(time.Minute),
		es.Search.WithSize(1000),
		es.Search.WithBody(strings.NewReader(query)),
		es.Search.WithTrackTotalHits(true),
	)
	if err != nil {
		return nil, fmt.Errorf("init scroll on index %q: %w", index, err)
	}
	if res.IsError() {
		defer res.Body.Close()
		return nil, fmt.Errorf("init scroll error: %s", res.String())
	}

	var page EsResponse
	if err := decodeAndClose(res.Body, &page); err != nil {
		return nil, fmt.Errorf("decode init scroll response: %w", err)
	}

	scrollID := page.ScrollID

	// Release the server-side scroll context however we leave the function,
	// including the error returns below.
	defer func() {
		if scrollID == "" {
			return
		}
		clearRes, err := es.ClearScroll(
			es.ClearScroll.WithContext(ctx),
			es.ClearScroll.WithScrollID(scrollID),
		)
		if err != nil {
			log.Printf("clear scroll: %v", err)
			return
		}
		defer clearRes.Body.Close()
		if clearRes.IsError() {
			log.Printf("clear scroll: %s", clearRes.String())
		}
	}()

	// An empty page means the scroll is exhausted.
	for len(page.Hits.Hits) > 0 {
		// Collect the IDs of the current page.
		for _, hit := range page.Hits.Hits {
			ids[hit.ID] = struct{}{}
		}

		// Fetch the next page.
		scrollRes, err := es.Scroll(
			es.Scroll.WithContext(ctx),
			es.Scroll.WithScrollID(scrollID),
			es.Scroll.WithScroll(time.Minute),
		)
		if err != nil {
			return nil, fmt.Errorf("scroll on index %q: %w", index, err)
		}
		if scrollRes.IsError() {
			defer scrollRes.Body.Close()
			return nil, fmt.Errorf("scroll error: %s", scrollRes.String())
		}

		// Decode into a fresh value: reusing the previous page would keep its
		// hits if the response omitted the "hits" field, and the loop would
		// never terminate.
		var next EsResponse
		if err := decodeAndClose(scrollRes.Body, &next); err != nil {
			return nil, fmt.Errorf("decode scroll response: %w", err)
		}
		if next.ScrollID != "" {
			scrollID = next.ScrollID
		}
		page = next
	}

	return ids, nil
}

// main fetches the IDs matching a fixed range query from the "bidding" index
// on two clusters and prints, for each direction, the number of IDs found
// only on that side followed by the IDs themselves in ascending order.
func main() {
	// Configure the two ES clients (also works with ES 7).
	// NOTE(review): addresses and credentials are hard-coded in source; they
	// should be moved to configuration or the environment and the password
	// rotated.
	es1, err := elasticsearch.NewClient(elasticsearch.Config{
		Addresses: []string{"http://172.17.4.184:19908"},
		Username:  "jybid",
		Password:  "Top2023_JEB01i@31",
	})
	if err != nil {
		log.Fatalf("Cluster1 client error: %v", err)
	}

	es2, err := elasticsearch.NewClient(elasticsearch.Config{
		Addresses: []string{"http://172.17.4.184:19905"},
		Username:  "jybid",
		Password:  "Top2023_JEB01i@31",
	})
	if err != nil {
		log.Fatalf("Cluster2 client error: %v", err)
	}

	indexName := "bidding"
	// Range query on the "id" field; "_source" is disabled and
	// "stored_fields" is empty because only the hit's "_id" is read.
	queryDSL := `{ "query": { "range": { "id": { "gte": 
"689681700000000000000000", "lt": "6896aba00000000000000000" } } }, "_source": false, "stored_fields": [] }`

	fmt.Println("Fetching from cluster 1...")
	ids1, err := fetchIDs(es1, indexName, queryDSL)
	if err != nil {
		log.Fatalf("Cluster1 error: %v", err)
	}

	fmt.Println("Fetching from cluster 2...")
	ids2, err := fetchIDs(es2, indexName, queryDSL)
	if err != nil {
		log.Fatalf("Cluster2 error: %v", err)
	}

	// Set difference: only in cluster 1. The count is the size of the actual
	// difference, not len(ids1)-len(ids2), which can be zero or negative even
	// when the sets differ.
	onlyIn1 := diffIDs(ids1, ids2)
	fmt.Printf("Diff (cluster1 - cluster2): %d docs\n", len(onlyIn1))
	for _, id := range onlyIn1 {
		fmt.Println(id)
	}

	// Set difference: only in cluster 2.
	onlyIn2 := diffIDs(ids2, ids1)
	fmt.Printf("Diff (cluster2 - cluster1): %d docs\n", len(onlyIn2))
	for _, id := range onlyIn2 {
		fmt.Println(id)
	}
}