package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log"
	"strings"
	"testing"

	"github.com/olivere/elastic/v7"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
)

// TestEs prints whether a fixed sample string contains a comma.
func TestEs(t *testing.T) {
	s := "aa"
	fmt.Println(strings.Contains(s, ","))
}

// mustNewEsClient builds an Elasticsearch client for the given URL using
// basic authentication and with sniffing disabled. It fails the calling test
// immediately if the client cannot be created.
func mustNewEsClient(t *testing.T, url, username, password string) *elastic.Client {
	t.Helper()

	client, err := elastic.NewClient(
		elastic.SetURL(url),
		elastic.SetBasicAuth(username, password),
		elastic.SetSniff(false),
	)
	if err != nil {
		t.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
	}
	return client
}

// TestSyncEs copies documents from one Elasticsearch cluster to another.
//
// It scrolls through the "bidding" index on the source cluster, restricted to
// a fixed range of the "id" field, and indexes every hit into the index of
// the same name on the target cluster, using the document's "id" field as the
// target document ID. Documents that fail to decode or to index are logged
// and skipped; a failure while fetching a later scroll page is logged and
// ends the copy.
func TestSyncEs(t *testing.T) {
	// NOTE(review): endpoints and credentials are hard-coded in source;
	// consider loading them from the environment or a config file.
	//url := "http://172.17.4.184:19805"
	url := "http://127.0.0.1:19805"
	username := "es_all"
	password := "TopJkO2E_d1x"
	index := "bidding" // index name

	// Source cluster client.
	client := mustNewEsClient(t, url, username, password)

	url2 := "http://127.0.0.1:19905"
	username2 := "jybid"
	password2 := "Top2023_JEB01i@31"

	// Target cluster client.
	client2 := mustNewEsClient(t, url2, username2, password2)

	rangeQuery := elastic.NewRangeQuery("id").Gte("65869b436977356f55a01b0b").Lt("6586a4196977356f55a02c79")
	query := elastic.NewBoolQuery().Must(rangeQuery)

	ctx := context.Background()

	// Start the scroll search.
	scroll := "1m"
	searchSource := elastic.NewSearchSource().
		Query(query).
		Size(10000).
		Sort("_doc", true) // ascending order
	//Sort("_doc", false) // descending order

	res, err := client.Scroll(index).
		Size(10000).
		Scroll(scroll).
		SearchSource(searchSource).
		Do(ctx)
	if err != nil {
		if !errors.Is(err, io.EOF) {
			t.Fatalf("滚动搜索失败:%s", err)
		}
		fmt.Println("没有数据")
	}

	scrollID := ""
	if res != nil {
		scrollID = res.ScrollId
	}
	// Release the server-side scroll context on exit. A closure is required
	// so that the scroll ID current at exit is used; a plain
	// `defer client.ClearScroll().ScrollId(scrollID).Do(ctx)` would capture
	// the value scrollID had when the defer statement executed.
	defer func() {
		if scrollID == "" {
			return
		}
		if _, err := client.ClearScroll().ScrollId(scrollID).Do(ctx); err != nil {
			log.Printf("clear scroll: %s", err)
		}
	}()

	var totalHits int64
	if res != nil {
		totalHits = res.TotalHits()
	}
	fmt.Println("总数是:", totalHits)

	total := 0
	// The nil guards cover the case where the initial search produced no
	// usable result (for example, it ended with io.EOF).
	for res != nil && res.Hits != nil && len(res.Hits.Hits) > 0 {
		for _, hit := range res.Hits.Hits {
			var doc map[string]interface{}
			if err := json.Unmarshal(hit.Source, &doc); err != nil {
				log.Printf("解析文档失败:%s", err)
				continue
			}

			id := util.ObjToString(doc["id"])
			// Best effort: report a failed write but keep copying the rest.
			if _, err := client2.Index().Index(index).Id(id).BodyJson(doc).Do(ctx); err != nil {
				log.Printf("index document %q into target cluster: %s", id, err)
			}
		}

		total += len(res.Hits.Hits)
		scrollID = res.ScrollId

		res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
		log.Println("current count:", total)
		if err != nil {
			// io.EOF means the last batch has been consumed; any other
			// error is logged. Either way the loop ends.
			if !errors.Is(err, io.EOF) {
				log.Printf("滚动搜索失败:%s", err)
			}
			break
		}
	}

	fmt.Println("结束~~~~~~~~~~~~~~~")
}