// qyxy.go
  1. package main
import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log"

	"github.com/olivere/elastic/v7"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
)
  10. // getNeqData 获取id _id 不相等数据
  11. func getNeqData() {
  12. // 本地
  13. //mgo := &mongodb.MongodbSim{
  14. // MongodbAddr: "127.0.0.1:27017",
  15. // DbName: "wcc",
  16. // Size: 10,
  17. //}
  18. //mgo.InitPool()
  19. mgo := &mongodb.MongodbSim{
  20. MongodbAddr: "172.17.189.140:27080",
  21. DbName: "qfw",
  22. Size: 10,
  23. UserName: "SJZY_RWbid_ES",
  24. Password: "SJZY@B4i4D5e6S",
  25. }
  26. mgo.InitPool()
  27. //url := "http://127.0.0.1:19908"
  28. url := "http://172.17.4.184:19908"
  29. username := "jybid"
  30. password := "Top2023_JEB01i@31"
  31. // 创建 Elasticsearch 客户端
  32. client, err := elastic.NewClient(
  33. elastic.SetURL(url),
  34. elastic.SetBasicAuth(username, password),
  35. elastic.SetSniff(false),
  36. )
  37. if err != nil {
  38. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  39. }
  40. //// 创建查询条件
  41. //q := elastic.NewBoolQuery().
  42. // //Must(elastic.NewMatchQuery("toptype", "预告")).
  43. // MustNot(elastic.NewTermQuery("_id", "id"))
  44. //Filter(elastic.NewRangeQuery("comeintime").Gte(1694707200).Lte(1695121200))
  45. // 构建查询
  46. query := elastic.NewBoolQuery().
  47. Must(
  48. elastic.NewScriptQuery(elastic.NewScript(`doc['_id'].value != doc['id'].value`)),
  49. //elastic.NewRangeQuery("comeintime").Gte(1694707200),
  50. )
  51. //query := elastic.NewBoolQuery().
  52. // Must(elastic.NewMatchQuery("title", "租公租房提取公积金")).
  53. // Must(elastic.NewTermQuery("toptype", "拟建")).Must(elastic.NewMatchPhraseQuery())
  54. count := 0
  55. // 执行Count请求来获取文档总数
  56. countResult, err := client.Count().Index("qyxy").Query(query).Do(context.Background())
  57. if err != nil {
  58. log.Fatalf("执行Count请求失败:%s", err)
  59. }
  60. // 获取符合条件的文档总数
  61. total := countResult
  62. fmt.Printf("符合条件的文档总数:%d\n", total)
  63. //
  64. //开始滚动搜索
  65. scrollService := client.Scroll("qyxy").Query(query).Size(10000).FetchSource(true)
  66. for {
  67. results, err := scrollService.Do(context.Background())
  68. if err != nil {
  69. log.Fatalf("滚动搜索失败:%s", err)
  70. }
  71. fmt.Println("current count:", count)
  72. if len(results.Hits.Hits) == 0 {
  73. // 没有更多的文档了,退出循环
  74. break
  75. }
  76. for _, hit := range results.Hits.Hits {
  77. // 处理每个文档
  78. // ...
  79. item := make(map[string]interface{})
  80. if err := json.Unmarshal(hit.Source, &item); err != nil {
  81. log.Printf("解码文档失败:%s\n", err)
  82. continue
  83. }
  84. save := map[string]interface{}{
  85. "id": item["id"],
  86. "company_name": item["company_name"],
  87. }
  88. mgo.Save("wcc_es_id_err_0428", save)
  89. }
  90. count += len(results.Hits.Hits)
  91. }
  92. fmt.Println("结束~~~~~~~~~~~~~~~")
  93. }
  94. // deleteEs 删除 es 数据
  95. func deleteEs() {
  96. //url := "http://127.0.0.1:19905"
  97. ////url := "http://172.17.4.184:19905"
  98. //username := "jybid"
  99. //password := "Top2023_JEB01i@31"
  100. url := "http://127.0.0.1:19805"
  101. //url := "http://172.17.4.184:19905"
  102. username := "es_all"
  103. password := "TopJkO2E_d1x"
  104. // 创建 Elasticsearch 客户端
  105. client, err := elastic.NewClient(
  106. elastic.SetURL(url),
  107. elastic.SetBasicAuth(username, password),
  108. elastic.SetSniff(false),
  109. )
  110. if err != nil {
  111. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  112. }
  113. // 构建查询
  114. query := elastic.NewBoolQuery().
  115. Must(
  116. elastic.NewScriptQuery(elastic.NewScript(`doc['_id'].value != doc['id'].value`)),
  117. elastic.NewRangeQuery("comeintime").Gte(1672416000),
  118. )
  119. // 执行Count请求来获取文档总数
  120. countResult, err := client.Count().Index("bidding").Query(query).Do(context.Background())
  121. if err != nil {
  122. log.Fatalf("执行Count请求失败:%s", err)
  123. }
  124. // 获取符合条件的文档总数
  125. total := countResult
  126. fmt.Printf("符合条件的文档总数:%d\n", total)
  127. // 创建删除请求
  128. deleteService := client.DeleteByQuery().Index("bidding").Query(query)
  129. // 执行删除操作
  130. response, err := deleteService.Do(context.Background())
  131. if err != nil {
  132. log.Fatalf("执行删除操作失败:%s", err)
  133. }
  134. // 检查删除操作的结果
  135. if response != nil {
  136. fmt.Printf("已删除文档数:%d\n", response.Deleted)
  137. } else {
  138. fmt.Println("删除操作没有返回结果。")
  139. }
  140. }
  141. type CreditLabel struct {
  142. ZhimaToptype string `json:"zhima_toptype"`
  143. ZhimaSubtype string `json:"zhima_subtype"`
  144. ZhimaName string `json:"zhima_name"`
  145. }
  146. // getZhiMa 芝麻标签存在
  147. func getZhiMa() {
  148. client, err := elastic.NewClient(elastic.SetURL("http://192.168.3.149:9201"))
  149. if err != nil {
  150. panic(err)
  151. }
  152. // 查询 zhima_labels 字段存在的数据
  153. query := elastic.NewExistsQuery("zhima_labels")
  154. searchResult, err := client.Search().
  155. Index("qyxy").
  156. Query(query).
  157. Do(context.Background())
  158. if err != nil {
  159. panic(err)
  160. }
  161. for _, hit := range searchResult.Hits.Hits {
  162. var label CreditLabel
  163. err := json.Unmarshal(hit.Source, &label)
  164. if err != nil {
  165. panic(err)
  166. }
  167. fmt.Println(label)
  168. }
  169. }