crawl_list_spider.py
# -*- coding: utf-8 -*-
"""
Created on 2025-05-09
---------
@summary: special-purpose bonds - documents - list crawler
---------
@author: Dzr
"""
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

import requests
from parsel import Selector
from pymongo import MongoClient

import setting
from log import logger
from proxy import get_proxy
from RedisDB import RedisFilter

rdf = RedisFilter()
mgo = MongoClient(setting.MGO_HOST, setting.MGO_PORT)
lst_coll = mgo[setting.MGO_DB]['special_purpose_bond_files_list']
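
# NOTE (assumption): the local modules imported above are project-specific and not
# shown in this file. From the way they are used below, the expected interfaces are:
#   - setting: constants MGO_HOST, MGO_PORT, MGO_DB (MongoDB connection config)
#   - log: a module-level `logger` with the usual .info/.warning/.error methods
#   - RedisDB.RedisFilter: a Redis-backed dedup filter exposing .get(key) / .add(key)
#   - proxy.get_proxy: returns a proxies mapping usable by requests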

def _send(method, url, headers, max_retries=3, **kwargs):
    """Send an HTTP request through a rotating proxy, retrying on network errors."""
    timeout = kwargs.pop('timeout', 10)  # pop once so every retry reuses the same timeout
    for i in range(max_retries):
        proxies = get_proxy()  # fresh proxy for each attempt
        try:
            response = requests.request(method, url,
                                        headers=headers,
                                        proxies=proxies,
                                        timeout=timeout,
                                        **kwargs)
            response.raise_for_status()
            return response
        except IOError as e:  # requests exceptions subclass IOError
            logger.error(f'request error|{type(e).__name__}|{url}|retry..{i + 1}')
            time.sleep(.5)
    return None  # all retries exhausted
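
# NOTE (assumption): requests expects `proxies` to be a mapping such as
# {'http': 'http://host:port', 'https': 'http://host:port'}; get_proxy() is
# assumed to return a dict in that shape (or None/{} to go direct).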

def parse(page, html):
    """Extract list entries from one page of HTML and insert new ones into MongoDB."""
    total = 0
    count = 0
    results = []
    select = Selector(html)
    for node in select.xpath('//div[@id="to-print1"]/li'):
        total += 1
        title = node.xpath('./a/@title').extract_first('').strip()
        href = node.xpath('./a/@href').extract_first('').strip()
        publishdate = node.xpath('./span/text()').extract_first('').strip()
        if not rdf.get(href):  # skip URLs already seen by the Redis dedup filter
            results.append({
                'title': title,
                'href': href,
                'publishdate': publishdate,
                'page': page,
                'isdownload': 0,
                'isfailed': 0,
                'retry': 0,
            })
            rdf.add(href)

    if len(results) > 0:
        ret = lst_coll.insert_many(results, ordered=False)
        count += len(ret.inserted_ids)

    logger.info(f'crawl ok|page {page}|{total} listed|{count} inserted')

def fetch_page_html(page, release_date=None):
    """Download one list page; returns (page, html), with html set to None on failure."""
    headers = {
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Upgrade-Insecure-Requests': '1',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36'
    }
    url = f'https://www.celma.org.cn/zqsclb_{page}.jhtml?channelId=193&title=&province=&releaseDate='
    if release_date:
        url += release_date  # the site filters by an exact 'YYYY-MM-DD' releaseDate value
    response = _send('get', url, headers=headers, timeout=10, max_retries=10)
    if not response:
        logger.error(f'crawl failed|page {page}')
        return page, None

    html = response.content.decode()
    return page, html

def parse_max_page(html):
    """Read the total page count from the pager text; returns -1 if no digit is found."""
    select = Selector(html)
    texts = select.xpath('//span[@style="margin-right: 40px;"]/text()').extract()
    page = next(filter(lambda x: str(x).isdigit(), ''.join(texts).strip().split(' ')), -1)
    return int(page)
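
# NOTE (assumption): the XPath in parse_max_page() targets the pager <span> on
# celma.org.cn whose text contains the total page count; if the site changes its
# inline styles the selector stops matching and the function falls back to -1.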

def fetch_max_page(release_date=None, **kwargs):
    """Request the first list page and read the maximum page number from its pager."""
    url = 'https://www.celma.org.cn/zqsclb_1.jhtml?channelId=193&title=&province=&releaseDate='
    if release_date:
        url += release_date
    headers = {
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Upgrade-Insecure-Requests': '1',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36'
    }
    response = _send('get', url, headers=headers, timeout=10, **kwargs)
    if not response:
        logger.warning('lookup failed|max page unavailable')
        return -1

    html = response.content.decode()
    num = parse_max_page(html)
    logger.info(f'lookup ok|max page {num}')
    return num

def fetch_max_page_by_release_date(date=None, **kwargs):
    """Maximum page count for a single release date (defaults to today)."""
    if date is None:
        date = datetime.now().strftime('%Y-%m-%d')
    return fetch_max_page(release_date=date, **kwargs)

def download(page):
    """Standalone helper: fetch and parse a single list page."""
    _, html = fetch_page_html(page)  # fetch_page_html returns a (page, html) tuple
    if html is not None:
        parse(page, html)
    return True

def spider(threads):
    """Full crawl: fetch every list page with a thread pool, then parse the results."""
    logger.info('job started')
    try:
        max_page = max(1, fetch_max_page(max_retries=10))
        with ThreadPoolExecutor(max_workers=threads) as executor:
            fs = []
            for page in range(1, max_page + 1):
                fs.append(executor.submit(fetch_page_html, page, release_date=None))

            for f in as_completed(fs):
                page, html = f.result()
                if html is not None:
                    parse(page, html)
    finally:
        logger.info('job finished')

def daily_spider(threads):
    """Incremental crawl: only the pages filtered by today's release date."""
    logger.info('job started')
    try:
        release_date = datetime.now().strftime('%Y-%m-%d')
        max_page = max(1, fetch_max_page_by_release_date(release_date, max_retries=10))
        with ThreadPoolExecutor(max_workers=threads, thread_name_prefix='list') as executor:
            fs = []
            for page in range(1, max_page + 1):
                fs.append(executor.submit(fetch_page_html, page, release_date=release_date))

            for f in as_completed(fs):
                page, html = f.result()
                if html is not None:
                    parse(page, html)
    finally:
        logger.info('job finished')

if __name__ == '__main__':
    # spider(threads=20)     # full historical crawl
    daily_spider(threads=1)  # incremental crawl for today's release date