123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109 |
- package main
- import (
- "context"
- "encoding/json"
- "fmt"
- "github.com/olivere/elastic/v7"
- "io"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "log"
- "strings"
- "testing"
- )
- func TestEs(T *testing.T) {
- s := "aa"
- fmt.Println(strings.Contains(s, ","))
- }
- // syncEs 同步es 数据道信集群
- func TestSyncEs(T *testing.T) {
- //url := "http://172.17.4.184:19805"
- url := "http://127.0.0.1:19805"
- username := "es_all"
- password := "TopJkO2E_d1x"
- index := "bidding" //索引名称
- // 创建 Elasticsearch 客户端
- client, err := elastic.NewClient(
- elastic.SetURL(url),
- elastic.SetBasicAuth(username, password),
- elastic.SetSniff(false),
- )
- if err != nil {
- log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
- }
- url2 := "http://127.0.0.1:19905"
- username2 := "jybid"
- password2 := "Top2023_JEB01i@31"
- // 创建 Elasticsearch 客户端
- client2, err := elastic.NewClient(
- elastic.SetURL(url2),
- elastic.SetBasicAuth(username2, password2),
- elastic.SetSniff(false),
- )
- if err != nil {
- log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
- }
- rangeQuery := elastic.NewRangeQuery("id").Gte("65869b436977356f55a01b0b").Lt("6586a4196977356f55a02c79")
- query := elastic.NewBoolQuery().Must(rangeQuery)
- ctx := context.Background()
- //开始滚动搜索
- scrollID := ""
- scroll := "1m"
- searchSource := elastic.NewSearchSource().
- Query(query).
- Size(10000).
- Sort("_doc", true) //升序排序
- //Sort("_doc", false) //降序排序
- searchService := client.Scroll(index).
- Size(10000).
- Scroll(scroll).
- SearchSource(searchSource)
- res, err := searchService.Do(ctx)
- if err != nil {
- if err == io.EOF {
- fmt.Println("没有数据")
- } else {
- panic(err)
- }
- }
- defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
- fmt.Println("总数是:", res.TotalHits())
- total := 0
- for len(res.Hits.Hits) > 0 {
- for _, hit := range res.Hits.Hits {
- var doc map[string]interface{}
- err := json.Unmarshal(hit.Source, &doc)
- if err != nil {
- log.Printf("解析文档失败:%s", err)
- continue
- }
- id := util.ObjToString(doc["id"])
- client2.Index().Index(index).Id(id).BodyJson(doc).Do(ctx)
- }
- total = total + len(res.Hits.Hits)
- scrollID = res.ScrollId
- res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
- log.Println("current count:", total)
- if err != nil {
- if err == io.EOF {
- // 滚动到最后一批数据,退出循环
- break
- }
- log.Printf("滚动搜索失败:%s", err)
- break // 处理错误时退出循环
- }
- }
- fmt.Println("结束~~~~~~~~~~~~~~~")
- }
|