qyxy.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517
  1. package main
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "github.com/elastic/go-elasticsearch/v7"
  8. "github.com/olivere/elastic/v7"
  9. "github.com/xuri/excelize/v2"
  10. "io"
  11. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  12. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  13. "log"
  14. "strings"
  15. "time"
  16. )
  17. // exportQyxy 导出企业数据
  18. func exportQyxy() {
  19. type Hit struct {
  20. ID string `json:"_id"`
  21. Source map[string]interface{} `json:"_source"`
  22. }
  23. type ESResponse struct {
  24. ScrollID string `json:"_scroll_id"`
  25. Hits struct {
  26. Hits []Hit `json:"hits"`
  27. } `json:"hits"`
  28. }
  29. // 1. 配置 ES 连接信息
  30. url := "http://172.17.4.184:19908"
  31. username := "jybid"
  32. password := "Top2023_JEB01i@31"
  33. cfg := elasticsearch.Config{
  34. Addresses: []string{
  35. url, // 替换为你的 ES 地址
  36. },
  37. Username: username, // 替换为你的用户名
  38. Password: password, // 替换为你的密码
  39. }
  40. es, err := elasticsearch.NewClient(cfg)
  41. if err != nil {
  42. log.Fatalf("Error creating the client: %s", err)
  43. }
  44. scrollTime := 2 * time.Minute
  45. indexName := "qyxy"
  46. batchSize := 5000
  47. maxRowsPerFile := 500000
  48. headers := []string{
  49. "_id", "cancel_date", "company_name", "history_name", "cancel_reason",
  50. "company_type", "business_scope", "legal_person", "capital", "credit_no",
  51. "tax_code", "company_code", "org_code", "establish_date", "authority",
  52. "issue_date", "company_area", "company_city", "company_district",
  53. "company_phone", "company_address", "company_email", "employee_num",
  54. }
  55. query := map[string]interface{}{
  56. "size": batchSize,
  57. "track_total_hits": true,
  58. "query": map[string]interface{}{
  59. "bool": map[string]interface{}{
  60. "must": []interface{}{
  61. map[string]interface{}{
  62. "range": map[string]interface{}{
  63. "cancel_date_unix": map[string]interface{}{
  64. "gte": 1704038400,
  65. },
  66. },
  67. },
  68. map[string]interface{}{
  69. "terms": map[string]interface{}{
  70. "company_status": []string{"注销", "吊销"},
  71. },
  72. },
  73. map[string]interface{}{
  74. "term": map[string]interface{}{
  75. "company_area": "河南",
  76. },
  77. },
  78. },
  79. },
  80. },
  81. "_source": headers,
  82. }
  83. var buf bytes.Buffer
  84. if err := json.NewEncoder(&buf).Encode(query); err != nil {
  85. log.Fatalf("query encode failed: %v", err)
  86. }
  87. res, err := es.Search(
  88. es.Search.WithContext(context.Background()),
  89. es.Search.WithIndex(indexName),
  90. es.Search.WithBody(&buf),
  91. es.Search.WithScroll(scrollTime),
  92. )
  93. if err != nil {
  94. log.Fatalf("initial scroll search error: %v", err)
  95. }
  96. defer res.Body.Close()
  97. // 状态追踪变量
  98. scrollID := ""
  99. fileIndex := 1
  100. rowNum := 2
  101. totalCount := 0
  102. f := excelize.NewFile()
  103. sheet := "Sheet1"
  104. writeHeader := func(f *excelize.File) {
  105. for i, h := range headers {
  106. cell, _ := excelize.CoordinatesToCellName(i+1, 1)
  107. f.SetCellValue(sheet, cell, h)
  108. }
  109. }
  110. writeHeader(f)
  111. for {
  112. var r ESResponse
  113. if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
  114. log.Fatalf("decode error: %v", err)
  115. }
  116. if len(r.Hits.Hits) == 0 {
  117. break
  118. }
  119. scrollID = r.ScrollID
  120. for _, hit := range r.Hits.Hits {
  121. for col, h := range headers {
  122. cell, _ := excelize.CoordinatesToCellName(col+1, rowNum)
  123. val := hit.Source[h]
  124. if h == "_id" {
  125. val = hit.ID
  126. }
  127. f.SetCellValue(sheet, cell, val)
  128. }
  129. rowNum++
  130. totalCount++
  131. // 满 50 万行,保存并开启新文件
  132. if rowNum > maxRowsPerFile {
  133. filename := fmt.Sprintf("qyxy_export_%d.xlsx", fileIndex)
  134. if err := f.SaveAs(filename); err != nil {
  135. log.Fatalf("failed to save file %s: %v", filename, err)
  136. }
  137. fmt.Printf("✅ 导出文件 [%s] 完成,累计 %d 条\n", filename, totalCount)
  138. fileIndex++
  139. rowNum = 2
  140. f = excelize.NewFile()
  141. writeHeader(f)
  142. }
  143. }
  144. fmt.Printf("当前已处理 %d 条...\n", totalCount)
  145. // 拉下一页
  146. res, err = es.Scroll(
  147. es.Scroll.WithScrollID(scrollID),
  148. es.Scroll.WithScroll(scrollTime),
  149. )
  150. if err != nil {
  151. log.Fatalf("scroll next error: %v", err)
  152. }
  153. defer res.Body.Close()
  154. }
  155. // 保存最后一批文件
  156. if rowNum > 2 {
  157. filename := fmt.Sprintf("qyxy_export_%d.xlsx", fileIndex)
  158. if err := f.SaveAs(filename); err != nil {
  159. log.Fatalf("failed to save final file %s: %v", filename, err)
  160. }
  161. fmt.Printf("✅ 最后文件 [%s] 完成,累计 %d 条\n", filename, totalCount)
  162. }
  163. // 清理 scroll
  164. if scrollID != "" {
  165. _, _ = es.ClearScroll(es.ClearScroll.WithScrollID(scrollID))
  166. }
  167. fmt.Println("🎉 全部导出完成,总数:", totalCount)
  168. }
  169. // getGD 获取广东企业数据
  170. func getGD() {
  171. url := "http://172.17.4.184:19908"
  172. //url := "http://127.0.0.1:19908"
  173. username := "jybid"
  174. password := "Top2023_JEB01i@31"
  175. index := "qyxy" //索引名称
  176. // 创建 Elasticsearch 客户端
  177. client, err := elastic.NewClient(
  178. elastic.SetURL(url),
  179. elastic.SetBasicAuth(username, password),
  180. elastic.SetSniff(false),
  181. )
  182. if err != nil {
  183. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  184. }
  185. //85 抽取库
  186. //Mgo := &mongodb.MongodbSim{
  187. // //MongodbAddr: "127.0.0.1:27080",
  188. // MongodbAddr: "172.17.4.85:27080",
  189. // DbName: "top",
  190. // Size: 10,
  191. // //Direct: true,
  192. //}
  193. //Mgo.InitPool()
  194. MgoB := &mongodb.MongodbSim{
  195. MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080",
  196. //MongodbAddr: "127.0.0.1:27083",
  197. Size: 10,
  198. DbName: "qfw",
  199. UserName: "SJZY_RWbid_ES",
  200. Password: "SJZY@B4i4D5e6S",
  201. //Direct: true,
  202. }
  203. MgoB.InitPool()
  204. //2023年01-01 2023-10-01,,1-3季度
  205. //2024-1 - 2024-4;1704038400-1711900800
  206. //2023-10-1 2024-1-1;1696089600-1704038400
  207. //城市范围
  208. //areaTermsQuery := elastic.NewTermsQuery("company_city", "北京市")
  209. //rangeQuery := elastic.NewRangeQuery("establish_date").Gte(1704038400)
  210. //query := elastic.NewBoolQuery().
  211. // Must(areaTermsQuery).
  212. // Must(rangeQuery)
  213. //---------------------------//
  214. //query := elastic.NewBoolQuery()
  215. //query.Must(elastic.NewMatchQuery("company_area", "广东"))
  216. ////query.Must(elastic.NewTermQuery("company_type", "北京市"))
  217. // 构建查询条件
  218. query := elastic.NewBoolQuery().
  219. MustNot(elastic.NewTermQuery("company_type", "个体工商户")). // 排除 company_type 为 "个体工商户"
  220. Filter(elastic.NewTermQuery("company_area", "广东")) // 过滤 company_area 为 "广东"
  221. //rangeQuery := elastic.NewRangeQuery("comeintime").Gte("1640966400").Lt("1703952000")
  222. //query := elastic.NewBoolQuery().
  223. // //北京,天津,河北,上海,江苏,浙江,安徽
  224. // //Must(elastic.NewTermQuery("area", "北京市")).
  225. // Must(elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同")).
  226. // Must(elastic.NewTermsQuery("area", "北京", "上海", "江苏", "浙江", "广东")).
  227. // Must(rangeQuery)
  228. ctx := context.Background()
  229. //开始滚动搜索
  230. scrollID := ""
  231. scroll := "10m"
  232. searchSource := elastic.NewSearchSource().
  233. Query(query).
  234. Size(10000).
  235. Sort("_doc", true) //升序排序
  236. //Sort("_doc", false) //降序排序
  237. searchService := client.Scroll(index).
  238. Size(10000).
  239. Scroll(scroll).
  240. SearchSource(searchSource)
  241. res, err := searchService.Do(ctx)
  242. if err != nil {
  243. if err == io.EOF {
  244. fmt.Println("没有数据")
  245. } else {
  246. panic(err)
  247. }
  248. }
  249. //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  250. fmt.Println("总数是:", res.TotalHits())
  251. total := 0
  252. for len(res.Hits.Hits) > 0 {
  253. for _, hit := range res.Hits.Hits {
  254. var doc map[string]interface{}
  255. err := json.Unmarshal(hit.Source, &doc)
  256. if err != nil {
  257. log.Printf("解析文档失败:%s", err)
  258. continue
  259. }
  260. //存入新表
  261. insert := map[string]interface{}{
  262. "company_name": doc["company_name"],
  263. "id": doc["id"],
  264. "credit_no": doc["credit_no"],
  265. "company_code": doc["company_code"],
  266. }
  267. if strings.Contains(util.ObjToString(doc["company_name"]), "银行") || strings.Contains(util.ObjToString(doc["company_name"]), "保险") || strings.Contains(util.ObjToString(doc["company_name"]), "证券") {
  268. insert["wcc_type"] = 1
  269. }
  270. err = MgoB.InsertOrUpdate("qfw", "wcc_2025_guangdong_qyxy", insert)
  271. if err != nil {
  272. log.Println("error", doc["id"])
  273. }
  274. }
  275. total = total + len(res.Hits.Hits)
  276. scrollID = res.ScrollId
  277. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  278. log.Println("current count:", total)
  279. if err != nil {
  280. if err == io.EOF {
  281. // 滚动到最后一批数据,退出循环
  282. break
  283. }
  284. log.Println("滚动搜索失败:", err, res)
  285. break // 处理错误时退出循环
  286. }
  287. }
  288. // 在循环外调用 ClearScroll
  289. _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
  290. if err != nil {
  291. log.Printf("清理滚动搜索失败:%s", err)
  292. }
  293. fmt.Println("结束~~~~~~~~~~~~~~~")
  294. }
  295. // getNeqData 获取id _id 不相等数据
  296. func getNeqData() {
  297. // 本地
  298. //mgo := &mongodb.MongodbSim{
  299. // MongodbAddr: "127.0.0.1:27017",
  300. // DbName: "wcc",
  301. // Size: 10,
  302. //}
  303. //mgo.InitPool()
  304. mgo := &mongodb.MongodbSim{
  305. MongodbAddr: "172.17.189.140:27080",
  306. DbName: "qfw",
  307. Size: 10,
  308. UserName: "SJZY_RWbid_ES",
  309. Password: "SJZY@B4i4D5e6S",
  310. }
  311. mgo.InitPool()
  312. //url := "http://127.0.0.1:19908"
  313. url := "http://172.17.4.184:19908"
  314. username := "jybid"
  315. password := "Top2023_JEB01i@31"
  316. // 创建 Elasticsearch 客户端
  317. client, err := elastic.NewClient(
  318. elastic.SetURL(url),
  319. elastic.SetBasicAuth(username, password),
  320. elastic.SetSniff(false),
  321. )
  322. if err != nil {
  323. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  324. }
  325. //// 创建查询条件
  326. //q := elastic.NewBoolQuery().
  327. // //Must(elastic.NewMatchQuery("toptype", "预告")).
  328. // MustNot(elastic.NewTermQuery("_id", "id"))
  329. //Filter(elastic.NewRangeQuery("comeintime").Gte(1694707200).Lte(1695121200))
  330. // 构建查询
  331. query := elastic.NewBoolQuery().
  332. Must(
  333. elastic.NewScriptQuery(elastic.NewScript(`doc['_id'].value != doc['id'].value`)),
  334. //elastic.NewRangeQuery("comeintime").Gte(1694707200),
  335. )
  336. //query := elastic.NewBoolQuery().
  337. // Must(elastic.NewMatchQuery("title", "租公租房提取公积金")).
  338. // Must(elastic.NewTermQuery("toptype", "拟建")).Must(elastic.NewMatchPhraseQuery())
  339. count := 0
  340. // 执行Count请求来获取文档总数
  341. countResult, err := client.Count().Index("qyxy").Query(query).Do(context.Background())
  342. if err != nil {
  343. log.Fatalf("执行Count请求失败:%s", err)
  344. }
  345. // 获取符合条件的文档总数
  346. total := countResult
  347. fmt.Printf("符合条件的文档总数:%d\n", total)
  348. //
  349. //开始滚动搜索
  350. scrollService := client.Scroll("qyxy").Query(query).Size(10000).FetchSource(true)
  351. for {
  352. results, err := scrollService.Do(context.Background())
  353. if err != nil {
  354. log.Fatalf("滚动搜索失败:%s", err)
  355. }
  356. fmt.Println("current count:", count)
  357. if len(results.Hits.Hits) == 0 {
  358. // 没有更多的文档了,退出循环
  359. break
  360. }
  361. for _, hit := range results.Hits.Hits {
  362. // 处理每个文档
  363. // ...
  364. item := make(map[string]interface{})
  365. if err := json.Unmarshal(hit.Source, &item); err != nil {
  366. log.Printf("解码文档失败:%s\n", err)
  367. continue
  368. }
  369. save := map[string]interface{}{
  370. "id": item["id"],
  371. "company_name": item["company_name"],
  372. }
  373. mgo.Save("wcc_es_id_err_0428", save)
  374. }
  375. count += len(results.Hits.Hits)
  376. }
  377. fmt.Println("结束~~~~~~~~~~~~~~~")
  378. }
  379. // deleteEs 删除 es 数据
  380. func deleteEs() {
  381. //url := "http://127.0.0.1:19905"
  382. ////url := "http://172.17.4.184:19905"
  383. //username := "jybid"
  384. //password := "Top2023_JEB01i@31"
  385. url := "http://127.0.0.1:19805"
  386. //url := "http://172.17.4.184:19905"
  387. username := "es_all"
  388. password := "TopJkO2E_d1x"
  389. // 创建 Elasticsearch 客户端
  390. client, err := elastic.NewClient(
  391. elastic.SetURL(url),
  392. elastic.SetBasicAuth(username, password),
  393. elastic.SetSniff(false),
  394. )
  395. if err != nil {
  396. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  397. }
  398. // 构建查询
  399. query := elastic.NewBoolQuery().
  400. Must(
  401. elastic.NewScriptQuery(elastic.NewScript(`doc['_id'].value != doc['id'].value`)),
  402. elastic.NewRangeQuery("comeintime").Gte(1672416000),
  403. )
  404. // 执行Count请求来获取文档总数
  405. countResult, err := client.Count().Index("bidding").Query(query).Do(context.Background())
  406. if err != nil {
  407. log.Fatalf("执行Count请求失败:%s", err)
  408. }
  409. // 获取符合条件的文档总数
  410. total := countResult
  411. fmt.Printf("符合条件的文档总数:%d\n", total)
  412. // 创建删除请求
  413. deleteService := client.DeleteByQuery().Index("bidding").Query(query)
  414. // 执行删除操作
  415. response, err := deleteService.Do(context.Background())
  416. if err != nil {
  417. log.Fatalf("执行删除操作失败:%s", err)
  418. }
  419. // 检查删除操作的结果
  420. if response != nil {
  421. fmt.Printf("已删除文档数:%d\n", response.Deleted)
  422. } else {
  423. fmt.Println("删除操作没有返回结果。")
  424. }
  425. }
// CreditLabel is the decode target getZhiMa uses for a search hit's JSON
// source; it captures the three zhima_* string fields.
// NOTE(review): the query in getZhiMa filters on a zhima_labels field, so these
// keys may actually be nested under it — confirm against the index mapping.
type CreditLabel struct {
	ZhimaToptype string `json:"zhima_toptype"` // value of the "zhima_toptype" key
	ZhimaSubtype string `json:"zhima_subtype"` // value of the "zhima_subtype" key
	ZhimaName    string `json:"zhima_name"`    // value of the "zhima_name" key
}
  431. // getZhiMa 芝麻标签存在
  432. func getZhiMa() {
  433. client, err := elastic.NewClient(elastic.SetURL("http://192.168.3.149:9201"))
  434. if err != nil {
  435. panic(err)
  436. }
  437. // 查询 zhima_labels 字段存在的数据
  438. query := elastic.NewExistsQuery("zhima_labels")
  439. searchResult, err := client.Search().
  440. Index("qyxy").
  441. Query(query).
  442. Do(context.Background())
  443. if err != nil {
  444. panic(err)
  445. }
  446. for _, hit := range searchResult.Hits.Hits {
  447. var label CreditLabel
  448. err := json.Unmarshal(hit.Source, &label)
  449. if err != nil {
  450. panic(err)
  451. }
  452. fmt.Println(label)
  453. }
  454. }