# -*- coding: utf-8 -*-
"""
Created on 2025-05-09
---------
@summary: Crawl special-purpose bond detail pages, download their attachments,
          and record the results in MongoDB.
---------
@author: Dzr
"""
import hashlib
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path

import requests
from parsel import Selector
from pymongo import MongoClient

import setting
from log import logger
from proxy import get_proxy

_root_path = Path(setting.LOCAL_FILE_ROOT_DIR)
_root_path.mkdir(parents=True, exist_ok=True)  # create missing parents as well

mgo = MongoClient(setting.MGO_HOST, setting.MGO_PORT)
lst_coll = mgo[setting.MGO_DB]['special_purpose_bond_files_list']
detail_coll = mgo[setting.MGO_DB]['special_purpose_bond_files_detail']
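# Data flow (inferred from the usage below): lst_coll holds list-page tasks
# with title/href/publishdate plus isdownload/isfailed progress flags;
# detail_coll receives one document per successfully crawled detail page,
# including its downloaded attachments.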


def _send(method, url, headers, max_retries=3, **kwargs):
    """Send an HTTP request with retries; return the response, or None if all retries fail."""
    # Pop the timeout once, outside the loop, so retries keep the caller's value
    timeout = kwargs.pop('timeout', 10)
    for i in range(max_retries):
        # proxies = get_proxy()
        proxies = None
        try:
            response = requests.request(method, url,
                                        headers=headers,
                                        proxies=proxies,
                                        timeout=timeout,
                                        **kwargs)
            response.raise_for_status()
            return response
        except IOError as e:  # requests exceptions subclass IOError
            logger.error(f'HTTP request|{type(e).__name__}|{url}|retry..{i + 1}')
            time.sleep(.5)

    return None  # all retries exhausted


def get_tasks(query, skip=0, limit=100):
    """Yield task documents matching `query` from the list collection."""
    with lst_coll.find(query, skip=skip, limit=limit) as cursor:
        yield from cursor


def fetch_page_html(url):
    """Fetch a detail page and return its decoded HTML, or None on failure."""
    headers = {
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Upgrade-Insecure-Requests': '1',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36'
    }
    response = _send('get', url, headers, max_retries=5)
    if not response:
        return None

    return response.content.decode()


def touch(file: Path):
    """
    Create the file, along with any missing parent directories.

    @param file: target path
    @return: None
    """
    # Make sure the parent directory exists
    file.parent.mkdir(parents=True, exist_ok=True)
    # Create the file with open permissions
    file.touch(exist_ok=True, mode=0o777)
    # Re-apply the mode explicitly (the touch() mode is masked by the umask on some systems)
    file.chmod(0o777)


def download_file(filepath, link, timeout=180, chunk_size=1024 * 64) -> Path:
    """Download `link` to `filepath` under the local root; return the target path."""
    file = _root_path.absolute() / filepath
    # logger.info(f'Preparing to download attachment|{file.name}')
    try:
        touch(file)
    except Exception as e:
        logger.error(f'Failed to create file|{type(e).__name__}')
        return file

    # Download the attachment
    headers = {
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Referer': 'https://www.celma.org.cn/',
        'Upgrade-Insecure-Requests': '1',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36'
    }
    response = _send('get', link, headers, stream=True, timeout=timeout)
    if not response:
        logger.info(f'Attachment download failed|{file.name}')
        return file

    # Stream the body to disk in binary chunks; chunk_size is in bytes (64 KiB by default)
    with file.open('wb') as f:
        for chunk in response.iter_content(chunk_size=chunk_size):
            f.write(chunk)

    logger.info(f'Attachment downloaded|{file.name}')
    return file


def parse(directory, html):
    """Extract attachment entries from a detail page and download each file."""
    selector = Selector(text=html)
    nodes = selector.xpath('//div[@class="content-fj"]/ul/li')
    dedup = set()
    attachments = {}
    for node in nodes:
        title = node.xpath('./a/@title').extract_first('').strip()
        if not title or title in dedup:
            continue

        href = node.xpath('./a/@href').extract_first('').strip()
        date_str = node.xpath('./span/text()').extract_first('').strip()
        filepath = directory + os.sep + title
        file = download_file(filepath, href, timeout=300)
        if file.exists() and file.stat().st_size > 0:
            attachments[str(len(attachments) + 1)] = {
                'filename': title,
                'filehref': href,
                'filetype': file.suffix[1:],  # last extension only; safe when the title contains dots
                'publishdate': date_str,
                'filepath': str(file.absolute()),
            }

        dedup.add(title)

    return attachments
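

# Illustrative shape of the mapping parse() returns (values are made up):
# {'1': {'filename': 'notice.pdf', 'filehref': 'https://...', 'filetype': 'pdf',
#        'publishdate': '2025-05-09',
#        'filepath': '<LOCAL_FILE_ROOT_DIR>/<publishdate>/<md5>/notice.pdf'}}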


def download(task):
    """Crawl one detail page and download its attachments."""
    href = task['href']
    title = task['title']
    publish_date = task['publishdate']
    # Bucket files under <publishdate>/<md5-of-title> to keep directory names unique
    md5_str = hashlib.md5(title.encode()).hexdigest()
    directory = publish_date + os.sep + md5_str
    try:
        html = fetch_page_html(href)
        assert html is not None
        attachments = parse(directory, html)
        logger.info(f'Crawl succeeded|{title}|attachments: {len(attachments)}')
        return task, attachments
    except Exception as e:
        logger.error(f'Crawl failed|{title}|{type(e).__name__}')
        return task, None


def _spider(executor, query):
    while True:
        if lst_coll.count_documents(query) == 0:
            break

        fs = []
        for task in get_tasks(query, limit=1000):
            fs.append(executor.submit(download, task))

        inserts = []
        for f in as_completed(fs):
            task, files = f.result()
            if files is None:
                # Count the failure and stop matching the query after three
                # attempts (the cap of 3 is an assumption; the original
                # bookkeeping let permanently failing tasks loop forever
                # in the retry pass)
                retries = task.get('retry', 0) + 1
                isfailed = 1 if retries < 3 else 2
                lst_coll.update_one({'_id': task['_id']},
                                    {'$set': {'isfailed': isfailed, 'retry': retries}})
            else:
                item = {
                    'title': task['title'],
                    'href': task['href'],
                    'publishdate': task['publishdate'],
                }
                if len(files) > 0:
                    item['attachments'] = files

                inserts.append(item)
                if len(inserts) > 10:
                    detail_coll.insert_many(inserts, ordered=True)
                    inserts = []

                lst_coll.update_one({'_id': task['_id']}, {'$set': {'isdownload': 1, 'isfailed': 0}})

        # Flush whatever is left in the current batch
        if len(inserts) > 0:
            detail_coll.insert_many(inserts, ordered=True)
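

# The two passes below share one executor: the first handles fresh tasks, the
# second re-tries tasks the first pass marked isfailed=1.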


def main(threads):
    logger.info('Job started')
    executor = ThreadPoolExecutor(max_workers=threads, thread_name_prefix='detail')
    try:
        _spider(executor, query={'isdownload': 0, 'isfailed': 0})
        _spider(executor, query={'isdownload': 0, 'isfailed': 1})
    finally:
        executor.shutdown()

    logger.info('Job finished')


if __name__ == '__main__':
    main(threads=20)