buyer.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/olivere/elastic/v7"
  7. "io"
  8. "log"
  9. )
  10. func getBuyer() {
  11. //url := "http://172.17.4.184:19908"
  12. url := "http://127.0.0.1:19908"
  13. username := "jybid"
  14. password := "Top2023_JEB01i@31"
  15. index := "bidding" //索引名
  16. // 创建 Elasticsearch 客户端
  17. client, err := elastic.NewClient(
  18. elastic.SetURL(url),
  19. elastic.SetBasicAuth(username, password),
  20. elastic.SetSniff(false),
  21. )
  22. if err != nil {
  23. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  24. }
  25. query := elastic.NewBoolQuery()
  26. query.Must(elastic.NewTermQuery("buyer", "海南警察学院(筹)"))
  27. //query.Must(elastic.NewTermQuery("company_type", "北京市"))
  28. ctx := context.Background()
  29. //开始滚动搜索
  30. scrollID := ""
  31. scroll := "10m"
  32. searchSource := elastic.NewSearchSource().
  33. Query(query).
  34. Size(10000).
  35. Sort("_doc", true) //升序排序
  36. //Sort("_doc", false) //降序排序
  37. searchService := client.Scroll(index).
  38. Size(10000).
  39. Scroll(scroll).
  40. SearchSource(searchSource)
  41. res, err := searchService.Do(ctx)
  42. if err != nil {
  43. if err == io.EOF {
  44. fmt.Println("没有数据")
  45. } else {
  46. panic(err)
  47. }
  48. }
  49. //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  50. fmt.Println("总数是:", res.TotalHits())
  51. total := 0
  52. for len(res.Hits.Hits) > 0 {
  53. for _, hit := range res.Hits.Hits {
  54. var doc map[string]interface{}
  55. err := json.Unmarshal(hit.Source, &doc)
  56. if err != nil {
  57. log.Printf("解析文档失败:%s", err)
  58. continue
  59. }
  60. //存入新表
  61. insert := map[string]interface{}{
  62. "buyer": "海南警察学院",
  63. "id": doc["id"],
  64. }
  65. err = MgoB.InsertOrUpdate("qfw", "wcc_buyer_test", insert)
  66. if err != nil {
  67. log.Println("error", doc["id"])
  68. }
  69. }
  70. total = total + len(res.Hits.Hits)
  71. scrollID = res.ScrollId
  72. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  73. log.Println("current count:", total)
  74. if err != nil {
  75. if err == io.EOF {
  76. // 滚动到最后一批数据,退出循环
  77. break
  78. }
  79. log.Println("滚动搜索失败:", err, res)
  80. break // 处理错误时退出循环
  81. }
  82. }
  83. // 在循环外调用 ClearScroll
  84. _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
  85. if err != nil {
  86. log.Printf("清理滚动搜索失败:%s", err)
  87. }
  88. fmt.Println("结束~~~~~~~~~~~~~~~")
  89. }