# -*- coding: utf-8 -*-
"""
Created on 2025-05-09
---------
@summary: Special-purpose bonds - documents - list page crawler
---------
@author: Dzr
"""
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

import requests
from log import logger
from parsel import Selector
from pymongo import MongoClient

import setting
from RedisDB import RedisFilter
from proxy import get_proxy

rdf = RedisFilter()  # Redis-backed de-duplication filter keyed by detail-page href
mgo = MongoClient(setting.MGO_HOST, setting.MGO_PORT)
lst_coll = mgo[setting.MGO_DB]['special_purpose_bond_files_list']

# Shared request headers for all list-page requests
HEADERS = {
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
    'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
    'Upgrade-Insecure-Requests': '1',
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36'
}


def _send(method, url, headers, max_retries=3, **kwargs):
    """Send an HTTP request through a rotating proxy, retrying on network errors.

    Returns the response on success, or None when all retries are exhausted.
    """
    for i in range(max_retries):
        proxies = get_proxy()
        try:
            response = requests.request(method, url, headers=headers,
                                        proxies=proxies,
                                        timeout=kwargs.pop('timeout', 10),
                                        **kwargs)
            response.raise_for_status()
            return response
        except IOError as e:  # requests exceptions derive from IOError/OSError
            logger.error(f'网络请求|{type(e).__name__}|{url}|重试..{i + 1}')
            time.sleep(.5)

    return None


def parse(page, html):
    """Extract list items from one page of HTML and insert unseen ones into MongoDB."""
    total = 0
    count = 0
    results = []
    select = Selector(html)
    for node in select.xpath('//div[@id="to-print1"]/li'):
        total += 1
        title = node.xpath('./a/@title').extract_first('').strip()
        href = node.xpath('./a/@href').extract_first('').strip()
        publishdate = node.xpath('./span/text()').extract_first('').strip()
        if not rdf.get(href):  # skip links already recorded in the Redis filter
            results.append({
                'title': title,
                'href': href,
                'publishdate': publishdate,
                'page': page,
                'isdownload': 0,
                'isfailed': 0,
                'retry': 0,
            })
            rdf.add(href)

    if len(results) > 0:
        ret = lst_coll.insert_many(results, ordered=False)
        count += len(ret.inserted_ids)

    logger.info(f'采集成功|第{page}页|共{total}条|入库{count}条')


def fetch_page_html(page, release_date=None):
    """Download one list page, optionally filtered by release date (YYYY-MM-DD).

    Returns a (page, html) tuple; html is None when the request failed.
    """
    url = f'https://www.celma.org.cn/zqsclb_{page}.jhtml?channelId=193&title=&province=&releaseDate='
    if release_date:
        url += release_date

    response = _send('get', url, headers=HEADERS, timeout=10, max_retries=10)
    if not response:
        logger.error(f'采集失败|第{page}页')
        return page, None

    html = response.content.decode()
    return page, html


def parse_max_page(html):
    """Parse the pagination text and return the maximum page number, or -1 if absent."""
    select = Selector(html)
    texts = select.xpath('//span[@style="margin-right: 40px;"]/text()').extract()
    page = next(filter(lambda x: str(x).isdigit(), ''.join(texts).strip().split(' ')), -1)
    return int(page)


def fetch_max_page(release_date=None, **kwargs):
    """Request the first list page and return the maximum page count."""
    url = 'https://www.celma.org.cn/zqsclb_1.jhtml?channelId=193&title=&province=&releaseDate='
    if release_date:
        url += release_date

    response = _send('get', url, headers=HEADERS, timeout=10, **kwargs)
    if not response:
        logger.warning('检索失败|暂无最大页')
        return -1

    html = response.content.decode()
    num = parse_max_page(html)
    logger.info(f'检索成功|最大{num}页')
    return num


def fetch_max_page_by_release_date(date=None, **kwargs):
    """Return the maximum page count for a given release date (defaults to today)."""
    if date is None:
        date = datetime.now().strftime('%Y-%m-%d')

    return fetch_max_page(release_date=date, **kwargs)


def download(page):
    # fetch_page_html returns a (page, html) tuple, so unpack it before parsing
    page, html = fetch_page_html(page)
    if html is not None:
        parse(page, html)

    return True


def spider(threads):
    """Full crawl: walk every list page concurrently and store new items."""
    logger.info("任务开始")
    try:
        max_page = max(1, fetch_max_page(max_retries=10))
        with ThreadPoolExecutor(max_workers=threads) as executor:
            fs = []
            for page in range(1, max_page + 1):
                fs.append(executor.submit(fetch_page_html, page, release_date=None))

            for f in as_completed(fs):
                page, html = f.result()
                if html is not None:
                    parse(page, html)
    finally:
        logger.info("任务结束")


def daily_spider(threads):
    """Incremental crawl: only list pages published on the current date."""
    logger.info("任务开始")
    try:
        release_date = datetime.now().strftime('%Y-%m-%d')
        max_page = max(1, fetch_max_page_by_release_date(release_date, max_retries=10))
        with ThreadPoolExecutor(max_workers=threads, thread_name_prefix='list') as executor:
            fs = []
            for page in range(1, max_page + 1):
                fs.append(executor.submit(fetch_page_html, page, release_date=release_date))

            for f in as_completed(fs):
                page, html = f.result()
                if html is not None:
                    parse(page, html)
    finally:
        logger.info("任务结束")


if __name__ == '__main__':
    # spider(threads=20)
    daily_spider(threads=1)