12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485 |
- package main
- import (
- "context"
- "encoding/json"
- "fmt"
- elasticsearch7 "github.com/elastic/go-elasticsearch/v7"
- "log"
- "time"
- )
- // getDiff 获取es差异数据
- func getDiff() {
- cfg1 := elasticsearch7.Config{
- Addresses: []string{
- "127.0.0.1:19805",
- },
- Username: "es_all",
- Password: "TopJkO2E_d1x",
- }
- es, err := elasticsearch7.NewClient(cfg1)
- if err != nil {
- log.Fatal(err)
- }
- // 执行滚动搜索
- scrollID := ""
- scrollSize := 1000 // 每次滚动返回的文档数量
- for {
- res, err := es.Search(
- es.Search.WithContext(context.Background()),
- es.Search.WithIndex("buyer_v2"),
- es.Search.WithSize(scrollSize),
- es.Search.WithScroll(time.Minute), // 设置滚动过期时间
- )
- if err != nil {
- log.Fatalf("Error searching: %s", err)
- }
- defer res.Body.Close()
- var result map[string]interface{}
- if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
- log.Fatalf("Error parsing the response body: %s", err)
- }
- // 处理结果,比较数据
- hits, found := result["hits"].(map[string]interface{})["hits"].([]interface{})
- if !found || len(hits) == 0 {
- break // 没有更多结果,退出循环
- }
- for _, hit := range hits {
- doc := hit.(map[string]interface{})["_source"]
- // 在这里,您可以根据需要进行数据对比
- // 例如,检查该文档是否存在于 buyer_v3 中
- // 如果不存在,则输出相关信息
- fmt.Println(doc)
- }
- // 获取滚动 ID
- if scrollID == "" {
- scrollID, _ = result["_scroll_id"].(string)
- }
- }
- // 清除滚动
- if scrollID != "" {
- res, err := es.ClearScroll(
- es.ClearScroll.WithContext(context.Background()),
- es.ClearScroll.WithScrollID(scrollID),
- )
- if err != nil {
- log.Printf("Error clearing scroll: %s", err)
- } else {
- defer res.Body.Close()
- var response map[string]interface{}
- if err := json.NewDecoder(res.Body).Decode(&response); err != nil {
- log.Printf("Error parsing the response body: %s", err)
- } else {
- fmt.Println("Scroll cleared successfully")
- }
- }
- }
- }
|