es_test.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/olivere/elastic/v7"
  7. "io"
  8. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  9. "log"
  10. "strings"
  11. "testing"
  12. )
  13. func TestEs(T *testing.T) {
  14. s := "aa"
  15. fmt.Println(strings.Contains(s, ","))
  16. }
  17. // syncEs 同步es 数据道信集群
  18. func TestSyncEs(T *testing.T) {
  19. //url := "http://172.17.4.184:19805"
  20. url := "http://127.0.0.1:19805"
  21. username := "es_all"
  22. password := "TopJkO2E_d1x"
  23. index := "bidding" //索引名称
  24. // 创建 Elasticsearch 客户端
  25. client, err := elastic.NewClient(
  26. elastic.SetURL(url),
  27. elastic.SetBasicAuth(username, password),
  28. elastic.SetSniff(false),
  29. )
  30. if err != nil {
  31. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  32. }
  33. url2 := "http://127.0.0.1:19905"
  34. username2 := "jybid"
  35. password2 := "Top2023_JEB01i@31"
  36. // 创建 Elasticsearch 客户端
  37. client2, err := elastic.NewClient(
  38. elastic.SetURL(url2),
  39. elastic.SetBasicAuth(username2, password2),
  40. elastic.SetSniff(false),
  41. )
  42. if err != nil {
  43. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  44. }
  45. rangeQuery := elastic.NewRangeQuery("id").Gte("65869b436977356f55a01b0b").Lt("6586a4196977356f55a02c79")
  46. query := elastic.NewBoolQuery().Must(rangeQuery)
  47. ctx := context.Background()
  48. //开始滚动搜索
  49. scrollID := ""
  50. scroll := "1m"
  51. searchSource := elastic.NewSearchSource().
  52. Query(query).
  53. Size(10000).
  54. Sort("_doc", true) //升序排序
  55. //Sort("_doc", false) //降序排序
  56. searchService := client.Scroll(index).
  57. Size(10000).
  58. Scroll(scroll).
  59. SearchSource(searchSource)
  60. res, err := searchService.Do(ctx)
  61. if err != nil {
  62. if err == io.EOF {
  63. fmt.Println("没有数据")
  64. } else {
  65. panic(err)
  66. }
  67. }
  68. defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  69. fmt.Println("总数是:", res.TotalHits())
  70. total := 0
  71. for len(res.Hits.Hits) > 0 {
  72. for _, hit := range res.Hits.Hits {
  73. var doc map[string]interface{}
  74. err := json.Unmarshal(hit.Source, &doc)
  75. if err != nil {
  76. log.Printf("解析文档失败:%s", err)
  77. continue
  78. }
  79. id := util.ObjToString(doc["id"])
  80. client2.Index().Index(index).Id(id).BodyJson(doc).Do(ctx)
  81. }
  82. total = total + len(res.Hits.Hits)
  83. scrollID = res.ScrollId
  84. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  85. log.Println("current count:", total)
  86. if err != nil {
  87. if err == io.EOF {
  88. // 滚动到最后一批数据,退出循环
  89. break
  90. }
  91. log.Printf("滚动搜索失败:%s", err)
  92. break // 处理错误时退出循环
  93. }
  94. }
  95. fmt.Println("结束~~~~~~~~~~~~~~~")
  96. }