# -*- coding: utf-8 -*-
"""
Created on 2025-05-09
---------
@summary: download detail pages and attachments for special purpose bond files
---------
@author: Dzr
"""
import hashlib
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed, wait
from pathlib import Path

import requests
from parsel import Selector
from pymongo import MongoClient

import setting
from log import logger
from proxy import get_proxy

_root_path = Path(setting.LOCAL_FILE_ROOT_DIR)
if not _root_path.exists():
    _root_path.mkdir(exist_ok=True)

mgo = MongoClient(setting.MGO_HOST, setting.MGO_PORT)
lst_coll = mgo[setting.MGO_DB]['special_purpose_bond_files_list']
detail_coll = mgo[setting.MGO_DB]['special_purpose_bond_files_detail']
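
# Data flow: documents in the list collection describe detail pages to crawl;
# each successfully crawled page becomes one document in the detail collection,
# and its attachments are saved on disk under setting.LOCAL_FILE_ROOT_DIR.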


def _send(method, url, headers, max_retries=3, **kwargs):
    """Send an HTTP request with simple retries; return the response or None."""
    # Pop the timeout once so retries keep the caller-specified value.
    timeout = kwargs.pop('timeout', 10)
    for i in range(max_retries):
        # proxies = get_proxy()
        proxies = None
        try:
            response = requests.request(method, url,
                                        headers=headers,
                                        proxies=proxies,
                                        timeout=timeout,
                                        **kwargs)
            response.raise_for_status()
            return response
        except IOError as e:
            logger.error(f'HTTP request|{type(e).__name__}|{url}|retry..{i + 1}')
            time.sleep(0.5)
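
# NOTE: requests' RequestException subclasses IOError, so the handler in _send
# also catches connection errors, timeouts and raise_for_status() HTTP errors.
# When every retry fails, _send falls through and returns None, which every
# caller below checks for.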


def get_tasks(query, skip=0, limit=100):
    documents = []
    with lst_coll.find(query, skip=skip, limit=limit) as cursor:
        for item in cursor:
            documents.append(item)
    yield from documents


def fetch_page_html(url):
    headers = {
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Upgrade-Insecure-Requests': '1',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36'
    }
    response = _send('get', url, headers, max_retries=5)
    if not response:
        return None
    return response.content.decode()


def touch(file: Path):
    """
    Create the file and make it world-writable.

    @param file: target file path
    @return: None
    """
    # Make sure the parent directory exists
    file.parent.mkdir(parents=True, exist_ok=True)
    # Create the file and set its permissions
    file.touch(exist_ok=True, mode=0o777)
    # Make sure the permissions take effect (some systems need the extra chmod)
    file.chmod(0o777)


def download_file(filepath, link, timeout=180, chunk_size=1024 * 64) -> Path:
    file = _root_path.absolute() / filepath
    # logger.info(f"Preparing to download attachment|{file.name}")
    try:
        touch(file)
    except Exception as e:
        logger.error(f"Failed to create file|{type(e).__name__}")
        return file

    # Download the attachment
    headers = {
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Referer': 'https://www.celma.org.cn/',
        'Upgrade-Insecure-Requests': '1',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36'
    }
    response = _send('get', link, headers, stream=True, timeout=timeout)
    if not response:
        logger.info(f'Attachment download failed|{file.name}')
        return file

    # Write the body to disk in binary chunks; chunk_size is in bytes (64 KB)
    with file.open('wb') as f:
        for chunk in response.iter_content(chunk_size=chunk_size):
            f.write(chunk)

    logger.info(f'Attachment download succeeded|{file.name}')
    return file
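

# parse() returns a dict keyed by "1", "2", ... where each value describes one
# downloaded attachment: filename, filehref, filetype, publishdate and the
# absolute filepath on disk. Entries that fail to download (empty file) or that
# duplicate an earlier title are skipped.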
def parse(directory, html):
    selector = Selector(text=html)
    nodes = selector.xpath('//div[@class="content-fj"]/ul/li')
    dedup = set()
    attachments = {}
    for node in nodes:
        title = node.xpath('./a/@title').extract_first('').strip()
        if not title or title in dedup:
            continue

        href = node.xpath('./a/@href').extract_first('').strip()
        date_str = node.xpath('./span/text()').extract_first('').strip()
        filepath = directory + os.sep + title
        file = download_file(filepath, href, timeout=300)
        if file.exists() and file.stat().st_size > 0:
            attachments[str(len(attachments) + 1)] = {
                'filename': title,
                'filehref': href,
                'filetype': file.suffix[1:],  # file extension without the dot
                'publishdate': date_str,
                'filepath': str(file.absolute()),
            }

        dedup.add(title)

    return attachments
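

# Each task document is expected to carry at least 'title', 'href' and
# 'publishdate' (plus the MongoDB '_id' used when marking progress).
# Attachments are saved under '<publishdate>/<md5(title)>/<attachment title>'
# relative to the local file root.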
def download(task):
    href = task['href']
    title = task['title']
    publish_date = task['publishdate']
    md5_str = hashlib.md5(title.encode()).hexdigest()
    directory = ''.join((publish_date, os.sep, md5_str))
    try:
        html = fetch_page_html(href)
        assert html is not None
        attachments = parse(directory, html)
        logger.info(f'Crawl succeeded|{title}|attachments: {len(attachments)}')
        return task, attachments
    except Exception as e:
        logger.error(f'Crawl failed|{title}|{type(e).__name__}')
        return task, None
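

# _spider keeps pulling batches of up to 1000 matching list documents, crawls
# them concurrently, flushes parsed detail documents to MongoDB in small
# batches, and marks each list document as downloaded or failed. The loop only
# exits once no document matches the query any more.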
def _spider(executor, query):
    while True:
        if lst_coll.count_documents(query) == 0:
            break

        fs = []
        for task in get_tasks(query, limit=1000):
            fs.append(executor.submit(download, task))

        inserts = []
        for f in as_completed(fs):
            task, files = f.result()
            if files is None:
                if not query.get('retry'):
                    lst_coll.update_one({'_id': task['_id']}, {'$set': {'isfailed': 1}})
                else:
                    task['retry'] = task.get('retry', 0) + 1
                    lst_coll.update_one({'_id': task['_id']}, {'$set': {'isfailed': 1, 'retry': task['retry']}})
            else:
                item = {
                    'title': task['title'],
                    'href': task['href'],
                    'publishdate': task['publishdate'],
                }
                if len(files) > 0:
                    item['attachments'] = files

                inserts.append(item)
                if len(inserts) > 10:
                    detail_coll.insert_many(inserts, ordered=True)
                    inserts = []

                lst_coll.update_one({'_id': task['_id']}, {'$set': {'isdownload': 1, 'isfailed': 0}})

        wait(fs)
        if len(inserts) > 0:
            detail_coll.insert_many(inserts, ordered=True)


def main(threads):
    logger.info("Job started")
    executor = ThreadPoolExecutor(max_workers=threads, thread_name_prefix='detail')
    try:
        _spider(executor, query={'isdownload': 0, 'isfailed': 0})
        _spider(executor, query={'isdownload': 0, 'isfailed': 1})
    finally:
        executor.shutdown()

    logger.info("Job finished")


if __name__ == '__main__':
    main(threads=20)
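
# Illustration only (inferred from the fields this spider reads and writes, not
# from the upstream list spider): a document in special_purpose_bond_files_list
# is assumed to look roughly like
#
#     {
#         "_id": ObjectId("..."),
#         "title": "...",
#         "href": "https://www.celma.org.cn/...",
#         "publishdate": "...",
#         "isdownload": 0,
#         "isfailed": 0,
#     }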