diff.go 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. elasticsearch7 "github.com/elastic/go-elasticsearch/v7"
  7. "log"
  8. "time"
  9. )
  10. // getDiff 获取es差异数据
  11. func getDiff() {
  12. cfg1 := elasticsearch7.Config{
  13. Addresses: []string{
  14. "127.0.0.1:19805",
  15. },
  16. Username: "es_all",
  17. Password: "TopJkO2E_d1x",
  18. }
  19. es, err := elasticsearch7.NewClient(cfg1)
  20. if err != nil {
  21. log.Fatal(err)
  22. }
  23. // 执行滚动搜索
  24. scrollID := ""
  25. scrollSize := 1000 // 每次滚动返回的文档数量
  26. for {
  27. res, err := es.Search(
  28. es.Search.WithContext(context.Background()),
  29. es.Search.WithIndex("buyer_v2"),
  30. es.Search.WithSize(scrollSize),
  31. es.Search.WithScroll(time.Minute), // 设置滚动过期时间
  32. )
  33. if err != nil {
  34. log.Fatalf("Error searching: %s", err)
  35. }
  36. defer res.Body.Close()
  37. var result map[string]interface{}
  38. if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
  39. log.Fatalf("Error parsing the response body: %s", err)
  40. }
  41. // 处理结果,比较数据
  42. hits, found := result["hits"].(map[string]interface{})["hits"].([]interface{})
  43. if !found || len(hits) == 0 {
  44. break // 没有更多结果,退出循环
  45. }
  46. for _, hit := range hits {
  47. doc := hit.(map[string]interface{})["_source"]
  48. // 在这里,您可以根据需要进行数据对比
  49. // 例如,检查该文档是否存在于 buyer_v3 中
  50. // 如果不存在,则输出相关信息
  51. fmt.Println(doc)
  52. }
  53. // 获取滚动 ID
  54. if scrollID == "" {
  55. scrollID, _ = result["_scroll_id"].(string)
  56. }
  57. }
  58. // 清除滚动
  59. if scrollID != "" {
  60. res, err := es.ClearScroll(
  61. es.ClearScroll.WithContext(context.Background()),
  62. es.ClearScroll.WithScrollID(scrollID),
  63. )
  64. if err != nil {
  65. log.Printf("Error clearing scroll: %s", err)
  66. } else {
  67. defer res.Body.Close()
  68. var response map[string]interface{}
  69. if err := json.NewDecoder(res.Body).Decode(&response); err != nil {
  70. log.Printf("Error parsing the response body: %s", err)
  71. } else {
  72. fmt.Println("Scroll cleared successfully")
  73. }
  74. }
  75. }
  76. }