qyxy.go 9.1 KB


  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/olivere/elastic/v7"
  7. "io"
  8. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  10. "log"
  11. "strings"
  12. )
  13. // getGD 获取广东企业数据
  14. func getGD() {
  15. url := "http://172.17.4.184:19908"
  16. //url := "http://127.0.0.1:19908"
  17. username := "jybid"
  18. password := "Top2023_JEB01i@31"
  19. index := "qyxy" //索引名称
  20. // 创建 Elasticsearch 客户端
  21. client, err := elastic.NewClient(
  22. elastic.SetURL(url),
  23. elastic.SetBasicAuth(username, password),
  24. elastic.SetSniff(false),
  25. )
  26. if err != nil {
  27. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  28. }
  29. //85 抽取库
  30. //Mgo := &mongodb.MongodbSim{
  31. // //MongodbAddr: "127.0.0.1:27080",
  32. // MongodbAddr: "172.17.4.85:27080",
  33. // DbName: "top",
  34. // Size: 10,
  35. // //Direct: true,
  36. //}
  37. //Mgo.InitPool()
  38. MgoB := &mongodb.MongodbSim{
  39. MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080",
  40. //MongodbAddr: "127.0.0.1:27083",
  41. Size: 10,
  42. DbName: "qfw",
  43. UserName: "SJZY_RWbid_ES",
  44. Password: "SJZY@B4i4D5e6S",
  45. //Direct: true,
  46. }
  47. MgoB.InitPool()
  48. //2023年01-01 2023-10-01,,1-3季度
  49. //2024-1 - 2024-4;1704038400-1711900800
  50. //2023-10-1 2024-1-1;1696089600-1704038400
  51. //城市范围
  52. //areaTermsQuery := elastic.NewTermsQuery("company_city", "北京市")
  53. //rangeQuery := elastic.NewRangeQuery("establish_date").Gte(1704038400)
  54. //query := elastic.NewBoolQuery().
  55. // Must(areaTermsQuery).
  56. // Must(rangeQuery)
  57. //---------------------------//
  58. //query := elastic.NewBoolQuery()
  59. //query.Must(elastic.NewMatchQuery("company_area", "广东"))
  60. ////query.Must(elastic.NewTermQuery("company_type", "北京市"))
  61. // 构建查询条件
  62. query := elastic.NewBoolQuery().
  63. MustNot(elastic.NewTermQuery("company_type", "个体工商户")). // 排除 company_type 为 "个体工商户"
  64. Filter(elastic.NewTermQuery("company_area", "广东")) // 过滤 company_area 为 "广东"
  65. //rangeQuery := elastic.NewRangeQuery("comeintime").Gte("1640966400").Lt("1703952000")
  66. //query := elastic.NewBoolQuery().
  67. // //北京,天津,河北,上海,江苏,浙江,安徽
  68. // //Must(elastic.NewTermQuery("area", "北京市")).
  69. // Must(elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同")).
  70. // Must(elastic.NewTermsQuery("area", "北京", "上海", "江苏", "浙江", "广东")).
  71. // Must(rangeQuery)
  72. ctx := context.Background()
  73. //开始滚动搜索
  74. scrollID := ""
  75. scroll := "10m"
  76. searchSource := elastic.NewSearchSource().
  77. Query(query).
  78. Size(10000).
  79. Sort("_doc", true) //升序排序
  80. //Sort("_doc", false) //降序排序
  81. searchService := client.Scroll(index).
  82. Size(10000).
  83. Scroll(scroll).
  84. SearchSource(searchSource)
  85. res, err := searchService.Do(ctx)
  86. if err != nil {
  87. if err == io.EOF {
  88. fmt.Println("没有数据")
  89. } else {
  90. panic(err)
  91. }
  92. }
  93. //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  94. fmt.Println("总数是:", res.TotalHits())
  95. total := 0
  96. for len(res.Hits.Hits) > 0 {
  97. for _, hit := range res.Hits.Hits {
  98. var doc map[string]interface{}
  99. err := json.Unmarshal(hit.Source, &doc)
  100. if err != nil {
  101. log.Printf("解析文档失败:%s", err)
  102. continue
  103. }
  104. //存入新表
  105. insert := map[string]interface{}{
  106. "company_name": doc["company_name"],
  107. "id": doc["id"],
  108. "credit_no": doc["credit_no"],
  109. "company_code": doc["company_code"],
  110. }
  111. if strings.Contains(util.ObjToString(doc["company_name"]), "银行") || strings.Contains(util.ObjToString(doc["company_name"]), "保险") || strings.Contains(util.ObjToString(doc["company_name"]), "证券") {
  112. insert["wcc_type"] = 1
  113. }
  114. err = MgoB.InsertOrUpdate("qfw", "wcc_2025_guangdong_qyxy", insert)
  115. if err != nil {
  116. log.Println("error", doc["id"])
  117. }
  118. }
  119. total = total + len(res.Hits.Hits)
  120. scrollID = res.ScrollId
  121. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  122. log.Println("current count:", total)
  123. if err != nil {
  124. if err == io.EOF {
  125. // 滚动到最后一批数据,退出循环
  126. break
  127. }
  128. log.Println("滚动搜索失败:", err, res)
  129. break // 处理错误时退出循环
  130. }
  131. }
  132. // 在循环外调用 ClearScroll
  133. _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
  134. if err != nil {
  135. log.Printf("清理滚动搜索失败:%s", err)
  136. }
  137. fmt.Println("结束~~~~~~~~~~~~~~~")
  138. }
  139. // getNeqData 获取id _id 不相等数据
  140. func getNeqData() {
  141. // 本地
  142. //mgo := &mongodb.MongodbSim{
  143. // MongodbAddr: "127.0.0.1:27017",
  144. // DbName: "wcc",
  145. // Size: 10,
  146. //}
  147. //mgo.InitPool()
  148. mgo := &mongodb.MongodbSim{
  149. MongodbAddr: "172.17.189.140:27080",
  150. DbName: "qfw",
  151. Size: 10,
  152. UserName: "SJZY_RWbid_ES",
  153. Password: "SJZY@B4i4D5e6S",
  154. }
  155. mgo.InitPool()
  156. //url := "http://127.0.0.1:19908"
  157. url := "http://172.17.4.184:19908"
  158. username := "jybid"
  159. password := "Top2023_JEB01i@31"
  160. // 创建 Elasticsearch 客户端
  161. client, err := elastic.NewClient(
  162. elastic.SetURL(url),
  163. elastic.SetBasicAuth(username, password),
  164. elastic.SetSniff(false),
  165. )
  166. if err != nil {
  167. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  168. }
  169. //// 创建查询条件
  170. //q := elastic.NewBoolQuery().
  171. // //Must(elastic.NewMatchQuery("toptype", "预告")).
  172. // MustNot(elastic.NewTermQuery("_id", "id"))
  173. //Filter(elastic.NewRangeQuery("comeintime").Gte(1694707200).Lte(1695121200))
  174. // 构建查询
  175. query := elastic.NewBoolQuery().
  176. Must(
  177. elastic.NewScriptQuery(elastic.NewScript(`doc['_id'].value != doc['id'].value`)),
  178. //elastic.NewRangeQuery("comeintime").Gte(1694707200),
  179. )
  180. //query := elastic.NewBoolQuery().
  181. // Must(elastic.NewMatchQuery("title", "租公租房提取公积金")).
  182. // Must(elastic.NewTermQuery("toptype", "拟建")).Must(elastic.NewMatchPhraseQuery())
  183. count := 0
  184. // 执行Count请求来获取文档总数
  185. countResult, err := client.Count().Index("qyxy").Query(query).Do(context.Background())
  186. if err != nil {
  187. log.Fatalf("执行Count请求失败:%s", err)
  188. }
  189. // 获取符合条件的文档总数
  190. total := countResult
  191. fmt.Printf("符合条件的文档总数:%d\n", total)
  192. //
  193. //开始滚动搜索
  194. scrollService := client.Scroll("qyxy").Query(query).Size(10000).FetchSource(true)
  195. for {
  196. results, err := scrollService.Do(context.Background())
  197. if err != nil {
  198. log.Fatalf("滚动搜索失败:%s", err)
  199. }
  200. fmt.Println("current count:", count)
  201. if len(results.Hits.Hits) == 0 {
  202. // 没有更多的文档了,退出循环
  203. break
  204. }
  205. for _, hit := range results.Hits.Hits {
  206. // 处理每个文档
  207. // ...
  208. item := make(map[string]interface{})
  209. if err := json.Unmarshal(hit.Source, &item); err != nil {
  210. log.Printf("解码文档失败:%s\n", err)
  211. continue
  212. }
  213. save := map[string]interface{}{
  214. "id": item["id"],
  215. "company_name": item["company_name"],
  216. }
  217. mgo.Save("wcc_es_id_err_0428", save)
  218. }
  219. count += len(results.Hits.Hits)
  220. }
  221. fmt.Println("结束~~~~~~~~~~~~~~~")
  222. }
  223. // deleteEs 删除 es 数据
  224. func deleteEs() {
  225. //url := "http://127.0.0.1:19905"
  226. ////url := "http://172.17.4.184:19905"
  227. //username := "jybid"
  228. //password := "Top2023_JEB01i@31"
  229. url := "http://127.0.0.1:19805"
  230. //url := "http://172.17.4.184:19905"
  231. username := "es_all"
  232. password := "TopJkO2E_d1x"
  233. // 创建 Elasticsearch 客户端
  234. client, err := elastic.NewClient(
  235. elastic.SetURL(url),
  236. elastic.SetBasicAuth(username, password),
  237. elastic.SetSniff(false),
  238. )
  239. if err != nil {
  240. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  241. }
  242. // 构建查询
  243. query := elastic.NewBoolQuery().
  244. Must(
  245. elastic.NewScriptQuery(elastic.NewScript(`doc['_id'].value != doc['id'].value`)),
  246. elastic.NewRangeQuery("comeintime").Gte(1672416000),
  247. )
  248. // 执行Count请求来获取文档总数
  249. countResult, err := client.Count().Index("bidding").Query(query).Do(context.Background())
  250. if err != nil {
  251. log.Fatalf("执行Count请求失败:%s", err)
  252. }
  253. // 获取符合条件的文档总数
  254. total := countResult
  255. fmt.Printf("符合条件的文档总数:%d\n", total)
  256. // 创建删除请求
  257. deleteService := client.DeleteByQuery().Index("bidding").Query(query)
  258. // 执行删除操作
  259. response, err := deleteService.Do(context.Background())
  260. if err != nil {
  261. log.Fatalf("执行删除操作失败:%s", err)
  262. }
  263. // 检查删除操作的结果
  264. if response != nil {
  265. fmt.Printf("已删除文档数:%d\n", response.Deleted)
  266. } else {
  267. fmt.Println("删除操作没有返回结果。")
  268. }
  269. }
  // CreditLabel holds the zhima_* label fields decoded from an Elasticsearch
// document's JSON source (used by getZhiMa).
//
// Field order matters: getZhiMa prints values with fmt.Println, which renders
// the fields positionally.
type CreditLabel struct {
	// ZhimaToptype is read from the JSON key "zhima_toptype".
	ZhimaToptype string `json:"zhima_toptype"`

	// ZhimaSubtype is read from the JSON key "zhima_subtype".
	ZhimaSubtype string `json:"zhima_subtype"`

	// ZhimaName is read from the JSON key "zhima_name".
	ZhimaName string `json:"zhima_name"`
}
  275. // getZhiMa 芝麻标签存在
  276. func getZhiMa() {
  277. client, err := elastic.NewClient(elastic.SetURL("http://192.168.3.149:9201"))
  278. if err != nil {
  279. panic(err)
  280. }
  281. // 查询 zhima_labels 字段存在的数据
  282. query := elastic.NewExistsQuery("zhima_labels")
  283. searchResult, err := client.Search().
  284. Index("qyxy").
  285. Query(query).
  286. Do(context.Background())
  287. if err != nil {
  288. panic(err)
  289. }
  290. for _, hit := range searchResult.Hits.Hits {
  291. var label CreditLabel
  292. err := json.Unmarshal(hit.Source, &label)
  293. if err != nil {
  294. panic(err)
  295. }
  296. fmt.Println(label)
  297. }
  298. }