main.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/elastic/go-elasticsearch/v7"
  7. "log"
  8. "strings"
  9. "time"
  10. )
  // EsHit is a single entry of a search response's hits array. Only the
  // document "_id" is decoded; every other field of the hit is ignored.
  type EsHit struct {
  	// ID is populated from the hit's "_id" JSON field.
  	ID string `json:"_id"`
  }
  14. type EsResponse struct {
  15. ScrollID string `json:"_scroll_id"`
  16. Hits struct {
  17. Hits []EsHit `json:"hits"`
  18. } `json:"hits"`
  19. }
  20. func fetchIDs(es *elasticsearch.Client, index string, query string) (map[string]struct{}, error) {
  21. ids := make(map[string]struct{})
  22. ctx := context.Background()
  23. // 初始化 Scroll 查询
  24. res, err := es.Search(
  25. es.Search.WithContext(ctx),
  26. es.Search.WithIndex(index),
  27. es.Search.WithScroll(time.Minute),
  28. es.Search.WithSize(1000),
  29. es.Search.WithBody(strings.NewReader(query)),
  30. es.Search.WithTrackTotalHits(true),
  31. )
  32. if err != nil {
  33. return nil, err
  34. }
  35. defer res.Body.Close()
  36. if res.IsError() {
  37. return nil, fmt.Errorf("init scroll error: %s", res.String())
  38. }
  39. var r EsResponse
  40. if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
  41. return nil, err
  42. }
  43. scrollID := r.ScrollID
  44. for {
  45. // 收集当前批次的 ID
  46. for _, hit := range r.Hits.Hits {
  47. ids[hit.ID] = struct{}{}
  48. }
  49. // 如果这一批没有数据,结束
  50. if len(r.Hits.Hits) == 0 {
  51. break
  52. }
  53. // 继续滚动
  54. scrollRes, err := es.Scroll(
  55. es.Scroll.WithContext(ctx),
  56. es.Scroll.WithScrollID(scrollID),
  57. es.Scroll.WithScroll(time.Minute),
  58. )
  59. if err != nil {
  60. return nil, err
  61. }
  62. defer scrollRes.Body.Close()
  63. if scrollRes.IsError() {
  64. return nil, fmt.Errorf("scroll error: %s", scrollRes.String())
  65. }
  66. if err := json.NewDecoder(scrollRes.Body).Decode(&r); err != nil {
  67. return nil, err
  68. }
  69. scrollID = r.ScrollID
  70. }
  71. // 清理 Scroll
  72. es.ClearScroll(es.ClearScroll.WithScrollID(scrollID))
  73. return ids, nil
  74. }
  75. func main() {
  76. // 配置两个 ES 客户端(ES 7 也适用)
  77. es1, _ := elasticsearch.NewClient(elasticsearch.Config{
  78. Addresses: []string{"http://172.17.4.184:19908"},
  79. Username: "jybid",
  80. Password: "Top2023_JEB01i@31",
  81. })
  82. es2, _ := elasticsearch.NewClient(elasticsearch.Config{
  83. Addresses: []string{"http://172.17.4.184:19905"},
  84. Username: "jybid",
  85. Password: "Top2023_JEB01i@31",
  86. })
  87. indexName := "bidding"
  88. queryDSL := `{
  89. "query": {
  90. "range": {
  91. "id": {
  92. "gte": "689681700000000000000000",
  93. "lt": "6896aba00000000000000000"
  94. }
  95. }
  96. },
  97. "_source": false,
  98. "stored_fields": []
  99. }`
  100. fmt.Println("Fetching from cluster 1...")
  101. ids1, err := fetchIDs(es1, indexName, queryDSL)
  102. if err != nil {
  103. log.Fatalf("Cluster1 error: %v", err)
  104. }
  105. fmt.Println("Fetching from cluster 2...")
  106. ids2, err := fetchIDs(es2, indexName, queryDSL)
  107. if err != nil {
  108. log.Fatalf("Cluster2 error: %v", err)
  109. }
  110. // 差集:集群1独有
  111. fmt.Printf("Diff (cluster1 - cluster2): %d docs\n", len(ids1)-len(ids2))
  112. for id := range ids1 {
  113. if _, ok := ids2[id]; !ok {
  114. fmt.Println(id)
  115. }
  116. }
  117. // 差集:集群2独有
  118. fmt.Printf("Diff (cluster2 - cluster1): %d docs\n", len(ids2)-len(ids1))
  119. for id := range ids2 {
  120. if _, ok := ids1[id]; !ok {
  121. fmt.Println(id)
  122. }
  123. }
  124. }