# -*- coding: utf-8 -*-
"""
Created on 2025-05-09
---------
@summary: special_purpose_bond attachment detail crawler
---------
@author: Dzr
"""
import hashlib
import os
import time
from concurrent.futures import ThreadPoolExecutor, wait, as_completed
from pathlib import Path

import requests
from parsel import Selector
from pymongo import MongoClient

import setting
from log import logger
from proxy import get_proxy

_root_path = Path(setting.LOCAL_FILE_ROOT_DIR)
if not _root_path.exists():
    _root_path.mkdir(parents=True, exist_ok=True)

mgo = MongoClient(setting.MGO_HOST, setting.MGO_PORT)
lst_coll = mgo[setting.MGO_DB]['special_purpose_bond_files_list']
detail_coll = mgo[setting.MGO_DB]['special_purpose_bond_files_detail']


def _send(method, url, headers, max_retries=3, **kwargs):
    """Send an HTTP request with retries; return the response, or None on failure."""
    timeout = kwargs.pop('timeout', 10)  # pop once, so retries reuse the same timeout
    for i in range(max_retries):
        # proxies = get_proxy()
        proxies = None
        try:
            response = requests.request(method, url,
                                        headers=headers,
                                        proxies=proxies,
                                        timeout=timeout,
                                        **kwargs)
            response.raise_for_status()
            return response
        except requests.RequestException as e:
            logger.error(f'request failed|{type(e).__name__}|{url}|retry..{i + 1}')
            time.sleep(.5)
    return None


def get_tasks(query, skip=0, limit=100):
    """Yield task documents from the list collection."""
    cursor = lst_coll.find(query, skip=skip, limit=limit)
    try:
        documents = list(cursor)
    finally:
        cursor.close()

    yield from documents


def fetch_page_html(url):
    """Fetch the detail page and return its decoded HTML, or None on failure."""
    headers = {
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Upgrade-Insecure-Requests': '1',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36'
    }
    response = _send('get', url, headers, max_retries=5)
    if not response:
        return None

    return response.content.decode()


def touch(file: Path):
    """
    Create the file with open permissions.

    @param file: target Path
    @return: None
    """
    # Make sure the parent directory exists
    file.parent.mkdir(parents=True, exist_ok=True)
    # Create the file and set permissions
    file.touch(mode=0o777, exist_ok=True)
    # Make sure the permissions take effect (some systems need the extra chmod)
    file.chmod(0o777)


def download_file(filepath, link, timeout=180, chunk_size=1024 * 64) -> Path:
    """Download `link` to `filepath` under the local root; return the Path."""
    file = _root_path.absolute() / filepath
    # logger.info(f"preparing to download attachment|{file.name}")
    try:
        touch(file)
    except Exception as e:
        logger.error(f'failed to create file|{type(e).__name__}')
        return file

    # Download the attachment
    headers = {
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Referer': 'https://www.celma.org.cn/',
        'Upgrade-Insecure-Requests': '1',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36'
    }
    response = _send('get', link, headers, stream=True, timeout=timeout)
    if not response:
        logger.info(f'attachment download failed|{file.name}')
        return file

    # Write the body to disk in binary chunks (chunk_size is in bytes, 64 KB here)
    with file.open('wb') as f:
        for chunk in response.iter_content(chunk_size=chunk_size):
            f.write(chunk)

    logger.info(f'attachment downloaded|{file.name}')
    return file


def parse(directory, html):
    """Parse attachment links from the detail page and download each file."""
    selector = Selector(text=html)
    nodes = selector.xpath('//div[@class="content-fj"]/ul/li')

    dedup = set()
    attachments = {}
    for node in nodes:
        title = node.xpath('./a/@title').extract_first('').strip()
        if not title or title in dedup:
            continue

        href = node.xpath('./a/@href').extract_first('').strip()
        date_str = node.xpath('./span/text()').extract_first('').strip()
        filepath = directory + os.sep + title
        file = download_file(filepath, href, timeout=300)
        if file.exists() and file.stat().st_size > 0:
            attachments[str(len(attachments) + 1)] = {
                'filename': title,
                'filehref': href,
                'filetype': file.suffix[1:],  # extension without the leading dot
                'publishdate': date_str,
                'filepath': str(file.absolute()),
            }

        dedup.add(title)

    return attachments


def download(task):
    """Fetch one list task, download its attachments, and return (task, attachments)."""
    href = task['href']
    title = task['title']
    publish_date = task['publishdate']
    md5_str = hashlib.md5(title.encode()).hexdigest()
    directory = ''.join((publish_date, os.sep, md5_str))
    try:
        html = fetch_page_html(href)
        assert html is not None
        attachments = parse(directory, html)
        logger.info(f'crawl succeeded|{title}|attachments:{len(attachments)}')
        return task, attachments
    except Exception as e:
        logger.error(f'crawl failed|{title}|{type(e).__name__}')
        return task, None


def _spider(executor, query):
    while True:
        if lst_coll.count_documents(query) == 0:
            break

        fs = []
        for task in get_tasks(query, limit=1000):
            fs.append(executor.submit(download, task))

        inserts = []
        for f in as_completed(fs):
            task, files = f.result()
            if files is None:
                if not query.get('isfailed'):
                    # first pass: flag the task so the retry pass picks it up
                    lst_coll.update_one({'_id': task['_id']},
                                        {'$set': {'isfailed': 1}})
                else:
                    # retry pass: keep the failed flag and bump the retry counter
                    retry = task.get('retry', 0) + 1
                    lst_coll.update_one({'_id': task['_id']},
                                        {'$set': {'isfailed': 1, 'retry': retry}})
            else:
                item = {
                    'title': task['title'],
                    'href': task['href'],
                    'publishdate': task['publishdate'],
                }
                if len(files) > 0:
                    item['attachments'] = files

                inserts.append(item)
                if len(inserts) > 10:
                    detail_coll.insert_many(inserts, ordered=True)
                    inserts = []

                lst_coll.update_one({'_id': task['_id']},
                                    {'$set': {'isdownload': 1, 'isfailed': 0}})

        wait(fs)
        if len(inserts) > 0:
            detail_coll.insert_many(inserts, ordered=True)


def main(threads):
    logger.info('task started')
    executor = ThreadPoolExecutor(max_workers=threads, thread_name_prefix='detail')
    try:
        _spider(executor, query={'isdownload': 0, 'isfailed': 0})
        _spider(executor, query={'isdownload': 0, 'isfailed': 1})
    finally:
        executor.shutdown()

    logger.info('task finished')


if __name__ == '__main__':
    main(threads=20)