package main import ( "context" "encoding/json" "fmt" elasticsearch7 "github.com/elastic/go-elasticsearch/v7" "log" "time" ) // getDiff 获取es差异数据 func getDiff() { cfg1 := elasticsearch7.Config{ Addresses: []string{ "127.0.0.1:19805", }, Username: "es_all", Password: "TopJkO2E_d1x", } es, err := elasticsearch7.NewClient(cfg1) if err != nil { log.Fatal(err) } // 执行滚动搜索 scrollID := "" scrollSize := 1000 // 每次滚动返回的文档数量 for { res, err := es.Search( es.Search.WithContext(context.Background()), es.Search.WithIndex("buyer_v2"), es.Search.WithSize(scrollSize), es.Search.WithScroll(time.Minute), // 设置滚动过期时间 ) if err != nil { log.Fatalf("Error searching: %s", err) } defer res.Body.Close() var result map[string]interface{} if err := json.NewDecoder(res.Body).Decode(&result); err != nil { log.Fatalf("Error parsing the response body: %s", err) } // 处理结果,比较数据 hits, found := result["hits"].(map[string]interface{})["hits"].([]interface{}) if !found || len(hits) == 0 { break // 没有更多结果,退出循环 } for _, hit := range hits { doc := hit.(map[string]interface{})["_source"] // 在这里,您可以根据需要进行数据对比 // 例如,检查该文档是否存在于 buyer_v3 中 // 如果不存在,则输出相关信息 fmt.Println(doc) } // 获取滚动 ID if scrollID == "" { scrollID, _ = result["_scroll_id"].(string) } } // 清除滚动 if scrollID != "" { res, err := es.ClearScroll( es.ClearScroll.WithContext(context.Background()), es.ClearScroll.WithScrollID(scrollID), ) if err != nil { log.Printf("Error clearing scroll: %s", err) } else { defer res.Body.Close() var response map[string]interface{} if err := json.NewDecoder(res.Body).Decode(&response); err != nil { log.Printf("Error parsing the response body: %s", err) } else { fmt.Println("Scroll cleared successfully") } } } }