# -*- coding: utf-8 -*-
"""
Created on 2025-05-09
---------
@summary: special-purpose bonds - documents - list pages
---------
@author: Dzr
"""
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

import requests
from parsel import Selector
from pymongo import MongoClient

import setting
from log import logger
from proxy import get_proxy
from RedisDB import RedisFilter
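
# Shared resources: a Redis-backed de-duplication filter for list-page URLs and
# the MongoDB collection that stores the harvested list records. The modules
# imported above (log, setting, RedisDB, proxy) are project-local helpers.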
rdf = RedisFilter()
mgo = MongoClient(setting.MGO_HOST, setting.MGO_PORT)
lst_coll = mgo[setting.MGO_DB]['special_purpose_bond_files_list']


def _send(method, url, headers, max_retries=3, **kwargs):
    """Send an HTTP request through a proxy, retrying on network errors."""
    timeout = kwargs.pop('timeout', 10)  # pop once so every retry reuses the same timeout
    for i in range(max_retries):
        proxies = get_proxy()
        try:
            response = requests.request(method, url,
                                        headers=headers,
                                        proxies=proxies,
                                        timeout=timeout,
                                        **kwargs)
            response.raise_for_status()
            return response
        except IOError as e:
            # requests exceptions derive from IOError/OSError, so this also
            # catches timeouts, connection errors and non-2xx responses.
            logger.error(f'network request|{type(e).__name__}|{url}|retry..{i + 1}')
            time.sleep(.5)
    return None
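
# Note on the proxy helper used above: get_proxy() is assumed to return a
# requests-style proxies mapping, e.g. {'http': 'http://host:port',
# 'https': 'http://host:port'}, or None to connect directly. That contract is
# an assumption about the project-local module, not something defined here.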


def parse(page, html):
    """Parse one list page and insert unseen records into MongoDB."""
    total = 0
    count = 0
    results = []
    select = Selector(html)
    for node in select.xpath('//div[@id="to-print1"]/li'):
        total += 1
        title = node.xpath('./a/@title').extract_first('').strip()
        href = node.xpath('./a/@href').extract_first('').strip()
        publishdate = node.xpath('./span/text()').extract_first('').strip()
        if not rdf.get(href):
            results.append({
                'title': title,
                'href': href,
                'publishdate': publishdate,
                'page': page,
                'isdownload': 0,
                'isfailed': 0,
                'retry': 0,
            })
            rdf.add(href)  # mark the URL as seen only after queuing the record

    if results:
        ret = lst_coll.insert_many(results, ordered=False)
        count += len(ret.inserted_ids)

    logger.info(f'crawl ok|page {page}|{total} items|{count} inserted')


def fetch_page_html(page, release_date=None):
    """Download one list page; returns (page, html) with html=None on failure."""
    headers = {
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Upgrade-Insecure-Requests': '1',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36'
    }
    url = f'https://www.celma.org.cn/zqsclb_{page}.jhtml?channelId=193&title=&province=&releaseDate='
    if release_date:
        url += release_date  # the site filters the list by publication date
    response = _send('get', url, headers=headers, timeout=10, max_retries=10)
    if not response:
        logger.error(f'crawl failed|page {page}')
        return page, None
    html = response.content.decode()
    return page, html


def parse_max_page(html):
    """Extract the last page number from the pager text; -1 if not found."""
    select = Selector(html)
    texts = select.xpath('//span[@style="margin-right: 40px;"]/text()').extract()
    page = next(filter(lambda x: str(x).isdigit(), ''.join(texts).strip().split(' ')), -1)
    return int(page)


def fetch_max_page(release_date=None, **kwargs):
    """Fetch page 1 and read the maximum page number from its pager."""
    url = 'https://www.celma.org.cn/zqsclb_1.jhtml?channelId=193&title=&province=&releaseDate='
    if release_date:
        url += release_date
    headers = {
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Upgrade-Insecure-Requests': '1',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36'
    }
    response = _send('get', url, headers=headers, timeout=10, **kwargs)
    if not response:
        logger.warning('lookup failed|max page unavailable')
        return -1
    html = response.content.decode()
    num = parse_max_page(html)
    logger.info(f'lookup ok|max page {num}')
    return num


def fetch_max_page_by_release_date(date=None, **kwargs):
    """Max page count for a given release date (defaults to today)."""
    if date is None:
        date = datetime.now().strftime('%Y-%m-%d')
    return fetch_max_page(release_date=date, **kwargs)


def download(page):
    """Convenience helper: fetch and parse a single list page."""
    page, html = fetch_page_html(page)  # fetch_page_html returns a (page, html) tuple
    if html is not None:
        parse(page, html)
    return True


def spider(threads):
    """Full crawl: walk every list page and store unseen records."""
    logger.info('task started')
    try:
        max_page = max(1, fetch_max_page(max_retries=10))
        with ThreadPoolExecutor(max_workers=threads) as executor:
            fs = []
            for page in range(1, max_page + 1):
                fs.append(executor.submit(fetch_page_html, page, release_date=None))
            for f in as_completed(fs):
                page, html = f.result()
                if html is not None:
                    parse(page, html)
    finally:
        logger.info('task finished')


def daily_spider(threads):
    """Incremental crawl: only the list pages filtered by today's release date."""
    logger.info('task started')
    try:
        release_date = datetime.now().strftime('%Y-%m-%d')
        max_page = max(1, fetch_max_page_by_release_date(release_date, max_retries=10))
        with ThreadPoolExecutor(max_workers=threads, thread_name_prefix='list') as executor:
            fs = []
            for page in range(1, max_page + 1):
                fs.append(executor.submit(fetch_page_html, page, release_date=release_date))
            for f in as_completed(fs):
                page, html = f.result()
                if html is not None:
                    parse(page, html)
    finally:
        logger.info('task finished')
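

# Entry points: spider() backfills every list page on the site, while
# daily_spider() only walks the pages filtered by today's releaseDate.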
if __name__ == '__main__':
    # spider(threads=20)
    daily_spider(threads=1)