dzr · 2 months ago
parent commit 9c8305e797
10 changed files with 627 additions and 0 deletions
  1. spbf/README.md (+1, -0)
  2. spbf/RedisDB.py (+105, -0)
  3. spbf/crawl_detail_spider.py (+221, -0)
  4. spbf/crawl_list_spider.py (+178, -0)
  5. spbf/downloadFileDaemon.sh (+19, -0)
  6. spbf/log.py (+33, -0)
  7. spbf/proxy.py (+40, -0)
  8. spbf/requirements.txt (+5, -0)
  9. spbf/setting.py (+20, -0)
  10. spbf/spider_list.sh (+5, -0)

+ 1 - 0
spbf/README.md

@@ -0,0 +1 @@
+#### Special-purpose bond attachment collection

+ 105 - 0
spbf/RedisDB.py

@@ -0,0 +1,105 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2024-02-27
+---------
+@summary: Redis-based deduplication
+---------
+@author: dzr
+"""
+import hashlib
+
+import redis
+import setting
+
+REDIS_URL = setting.REDIS_URL
+REDIS_EXPIRE_TIME = setting.REDIS_EXPIRE_TIME
+
+
+class RedisFilter:
+
+    def __init__(self, url=None, expire_time=None):
+        if not url:
+            url = REDIS_URL
+
+        if not expire_time:
+            expire_time = REDIS_EXPIRE_TIME
+
+        self.redis_db = redis.StrictRedis.from_url(url)
+        self._ex = expire_time
+
+    def __repr__(self):
+        return "<RedisFilter: {}>".format(self.redis_db)
+
+    def exists(self, key):
+        """全量检索"""
+        if self.redis_db.exists(key) > 0:
+            return True
+        return False
+
+    def add(self, keys):
+        """
+        Add keys to the filter.
+
+        @param keys: a single key or a list of keys to mark as seen
+        @return: list / single value (False if the key was already present, True if it was added)
+        """
+        is_list = isinstance(keys, list)
+        keys = keys if is_list else [keys]
+
+        is_added = []
+        for key in keys:
+            pkey = "pylist_" + self.fingerprint(key)
+            if not self.exists(pkey):
+                is_added.append(self.redis_db.set(pkey, 1, ex=self._ex))
+            else:
+                is_added.append(False)
+
+        return is_added if is_list else is_added[0]
+
+    def delete(self, keys):
+        is_list = isinstance(keys, list)
+        keys = keys if is_list else [keys]
+
+        for key in keys:
+            pkey = "pylist_" + self.fingerprint(key)
+            self.redis_db.delete(pkey)
+
+        return True
+
+    def get(self, keys):
+        """
+        Check whether keys have been seen before.
+        @param keys: a single key or a list of keys
+        @return: list / single value (True if present, False otherwise)
+        """
+        is_list = isinstance(keys, list)
+        keys = keys if is_list else [keys]
+
+        is_exist = []
+        for key in keys:
+            pkey = "pylist_" + self.fingerprint(key)
+            is_exist.append(self.exists(pkey))
+
+        # also flag duplicates within the input batch itself
+        temp_set = set()
+        for i, key in enumerate(keys):
+            if key in temp_set:
+                is_exist[i] = True
+            else:
+                temp_set.add(key)
+
+        return is_exist if is_list else is_exist[0]
+
+    def fingerprint(self, *args):
+        """
+        @summary: build a unique data fingerprint (64-character SHA-256 hex digest)
+        ---------
+        @param args: values to deduplicate on
+        ---------
+        @result: 5580c91ea29bf5bd963f4c08dfcacd983566e44ecea1735102bc380576fd6f30
+        """
+        args = sorted(args)
+        sha256 = hashlib.sha256()
+        for arg in args:
+            sha256.update(str(arg).encode())
+        return sha256.hexdigest()
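
A minimal usage sketch of the filter above, assuming the Redis instance configured in `setting.REDIS_URL` is reachable (the URLs are made-up examples):

```python
# Hypothetical example: deduplicate detail-page URLs before queueing them.
from RedisDB import RedisFilter

rdf = RedisFilter()  # falls back to setting.REDIS_URL / REDIS_EXPIRE_TIME

urls = [
    'https://www.celma.org.cn/a.jhtml',  # assumed example URL
    'https://www.celma.org.cn/b.jhtml',
    'https://www.celma.org.cn/a.jhtml',  # in-batch duplicate
]

seen = rdf.get(urls)   # [False, False, True] on a clean database
added = rdf.add(urls)  # marks unseen keys; returns False for keys already present
fresh = [u for u, s in zip(urls, seen) if not s]
```

Keys are stored as `pylist_<sha256>` with a TTL of `REDIS_EXPIRE_TIME` seconds, so entries expire after a day unless re-added.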

+ 221 - 0
spbf/crawl_detail_spider.py

@@ -0,0 +1,221 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2025-05-09 
+---------
+@summary: special-purpose bonds - documents - detail pages and attachment download
+---------
+@author: Dzr
+"""
+import hashlib
+import os
+import time
+from concurrent.futures import ThreadPoolExecutor, wait, as_completed
+
+import requests
+from log import logger
+from parsel import Selector
+from pymongo import MongoClient
+from pathlib import Path
+
+from proxy import get_proxy
+import setting
+
+_root_path = Path(setting.LOCAL_FILE_ROOT_DIR)
+_root_path.mkdir(parents=True, exist_ok=True)  # create the download root (and parents) if missing
+
+
+mgo = MongoClient(setting.MGO_HOST, setting.MGO_PORT)
+lst_coll = mgo[setting.MGO_DB]['special_purpose_bond_files_list']
+detail_coll = mgo[setting.MGO_DB]['special_purpose_bond_files_detail']
+
+
+def _send(method, url, headers, max_retries=3, **kwargs):
+    # pop the timeout once, outside the loop, so a caller-supplied value
+    # is honoured on every retry (not just the first attempt)
+    timeout = kwargs.pop('timeout', 10)
+    for i in range(max_retries):
+        # proxies = get_proxy()
+        proxies = None
+        try:
+            response = requests.request(method, url,
+                                        headers=headers,
+                                        proxies=proxies,
+                                        timeout=timeout,
+                                        **kwargs)
+            response.raise_for_status()
+            return response
+        except requests.RequestException as e:
+            logger.error(f'网络请求|{type(e).__name__}|{url}|重试..{i + 1}')
+            time.sleep(.5)
+    return None
+
+
+def get_tasks(query, skip=0, limit=100):
+    # iterate the cursor lazily and close it explicitly when done
+    cursor = lst_coll.find(query, skip=skip, limit=limit)
+    try:
+        yield from cursor
+    finally:
+        cursor.close()
+
+
+def fetch_page_html(url):
+    headers = {
+        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
+        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
+        'Upgrade-Insecure-Requests': '1',
+        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36'
+    }
+    response = _send('get', url, headers, max_retries=5)
+    if not response:
+        return None
+
+    return response.content.decode()
+
+
+def touch(file: Path):
+    """
+    Create the file, including any missing parent directories.
+
+    @param file: Path instance to create
+    @return: None
+    """
+    # make sure the parent directory exists
+    file.parent.mkdir(parents=True, exist_ok=True)
+    # create the file and set permissions
+    file.touch(exist_ok=True, mode=0o777)
+    # make sure the permissions actually take effect (some systems need an extra chmod)
+    file.chmod(0o777)
+
+
+def download_file(filepath, link, timeout=180, chunk_size=1024 * 64) -> Path:
+    file = _root_path.absolute() / filepath
+    # logger.info(f"准备下载附件|{file.name}")
+    try:
+        touch(file)
+    except Exception as e:
+        logger.error(f"创建文件失败|{type(e).__name__}")
+        return file
+
+    # download the attachment
+    headers = {
+        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
+        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
+        'Referer': 'https://www.celma.org.cn/',
+        'Upgrade-Insecure-Requests': '1',
+        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36'
+    }
+    response = _send('get', link, headers, stream=True, timeout=timeout)
+    if not response:
+        logger.info(f'附件下载失败|{file.name}')
+        return file
+
+    # stream the body to disk in binary chunks (chunk_size is in bytes; 64 KB here)
+    with file.open('wb') as f:
+        for chunk in response.iter_content(chunk_size=chunk_size):
+            f.write(chunk)
+
+    logger.info(f'附件下载成功|{file.name}')
+    return file
+
+
+def parse(directory, html):
+    selector = Selector(text=html)
+    nodes = selector.xpath('//div[@class="content-fj"]/ul/li')
+
+    dedup = set()
+    attachments = {}
+    for node in nodes:
+        title = node.xpath('./a/@title').extract_first('').strip()
+        if not title or title in dedup:
+            continue
+
+        href = node.xpath('./a/@href').extract_first('').strip()
+        date_str = node.xpath('./span/text()').extract_first('').strip()
+
+        filepath = directory + os.sep + title
+        file = download_file(filepath, href, timeout=300)
+        if file.exists() and file.stat().st_size > 0:
+            attachments[str(len(attachments) + 1)] = {
+                'filename': title,
+                'filehref': href,
+                'filetype': file.suffix[1:],  # last suffix only; safe for names without an extension
+                'publishdate': date_str,
+                'filepath': str(file.absolute()),
+            }
+            dedup.add(title)
+
+    return attachments
+
+
+def download(task):
+    href = task['href']
+    title = task['title']
+    publish_date = task['publishdate']
+    md5_str = hashlib.md5(title.encode()).hexdigest()
+    directory = ''.join((publish_date, os.sep, md5_str))
+
+    try:
+        html = fetch_page_html(href)
+        assert html is not None
+        attachments = parse(directory, html)
+        logger.info(f'采集成功|{title}|附件数量:{len(attachments)}')
+        return task, attachments
+    except Exception as e:
+        logger.error(f'采集失败|{title}|{type(e).__name__}')
+        return task, None
+
+
+def _spider(executor, query):
+    while True:
+        if lst_coll.count_documents(query) == 0:
+            break
+
+        fs = []
+        for task in get_tasks(query, limit=1000):
+            fs.append(executor.submit(download, task))
+
+        inserts = []
+        for f in as_completed(fs):
+            task, files = f.result()
+            if files is None:
+                if not query.get('retry'):
+                    # first pass: just flag the failure for the retry pass
+                    lst_coll.update_one({'_id': task['_id']}, {'$set': {'isfailed': 1}})
+                else:
+                    # retry pass (query carries a retry condition): also bump the counter
+                    task['retry'] += 1
+                    lst_coll.update_one({'_id': task['_id']}, {'$set': {'isfailed': 1, 'retry': task['retry']}})
+
+            else:
+                item = {
+                    'title': task['title'],
+                    'href': task['href'],
+                    'publishdate': task['publishdate'],
+                }
+                if len(files) > 0:
+                    item['attachments'] = files
+
+                inserts.append(item)
+                if len(inserts) > 10:
+                    detail_coll.insert_many(inserts, ordered=True)
+                    inserts = []
+
+                lst_coll.update_one({'_id': task['_id']}, {'$set': {'isdownload': 1, 'isfailed': 0}})
+
+        wait(fs)
+
+        if len(inserts) > 0:
+            detail_coll.insert_many(inserts, ordered=True)
+
+
+def main(threads):
+    logger.info("任务开始")
+
+    executor = ThreadPoolExecutor(max_workers=threads, thread_name_prefix='detail')
+    try:
+        _spider(executor, query={'isdownload': 0, 'isfailed': 0})
+        # retry pass: include a retry cap so documents that keep failing
+        # eventually drop out of the query instead of looping forever
+        # (the cap of 3 is an assumed value; tune as needed)
+        _spider(executor, query={'isdownload': 0, 'isfailed': 1, 'retry': {'$lt': 3}})
+
+    finally:
+        executor.shutdown()
+        logger.info("任务结束")
+
+
+if __name__ == '__main__':
+    main(threads=20)
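
For reference, a sketch of the document shape that `_spider` writes to `special_purpose_bond_files_detail` (all field values below are hypothetical):

```python
# Hypothetical shape of one detail document assembled from a list task plus
# the attachments dict returned by parse() above.
detail_doc = {
    'title': '某省2025年专项债券发行文件',            # copied from the list task
    'href': 'https://www.celma.org.cn/xxxx.jhtml',    # assumed example URL
    'publishdate': '2025-05-09',
    'attachments': {                                  # omitted when no files were saved
        '1': {
            'filename': '发行公告.pdf',
            'filehref': 'https://www.celma.org.cn/files/xxxx.pdf',
            'filetype': 'pdf',
            'publishdate': '2025-05-09',
            'filepath': '/nas/special_purpose_bond_files/2025-05-09/<md5-of-title>/发行公告.pdf',
        },
    },
}
```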

+ 178 - 0
spbf/crawl_list_spider.py

@@ -0,0 +1,178 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2025-05-09 
+---------
+@summary: special-purpose bonds - documents - list pages
+---------
+@author: Dzr
+"""
+import time
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from datetime import datetime
+
+import requests
+from log import logger
+from parsel import Selector
+from pymongo import MongoClient
+
+import setting
+from RedisDB import RedisFilter
+from proxy import get_proxy
+
+rdf = RedisFilter()
+mgo = MongoClient(setting.MGO_HOST, setting.MGO_PORT)
+lst_coll = mgo[setting.MGO_DB]['special_purpose_bond_files_list']
+
+
+def _send(method, url, headers, max_retries=3, **kwargs):
+    # pop the timeout once, outside the loop, so a caller-supplied value
+    # is honoured on every retry (not just the first attempt)
+    timeout = kwargs.pop('timeout', 10)
+    for i in range(max_retries):
+        proxies = get_proxy()
+        try:
+            response = requests.request(method, url,
+                                        headers=headers,
+                                        proxies=proxies,
+                                        timeout=timeout,
+                                        **kwargs)
+            response.raise_for_status()
+            return response
+        except requests.RequestException as e:
+            logger.error(f'网络请求|{type(e).__name__}|{url}|重试..{i + 1}')
+            time.sleep(.5)
+    return None
+
+
+def parse(page, html):
+    total = 0
+    count = 0
+
+    results = []
+    select = Selector(text=html)
+    for node in select.xpath('//div[@id="to-print1"]/li'):
+        total += 1
+        title = node.xpath('./a/@title').extract_first('').strip()
+        href = node.xpath('./a/@href').extract_first('').strip()
+        publishdate = node.xpath('./span/text()').extract_first('').strip()
+
+        if not rdf.get(href):
+            results.append({
+                'title': title,
+                'href': href,
+                'publishdate': publishdate,
+                'page': page,
+                'isdownload': 0,
+                'isfailed': 0,
+                'retry': 0,
+            })
+            rdf.add(href)
+
+    if len(results) > 0:
+        ret = lst_coll.insert_many(results, ordered=False)
+        count += len(ret.inserted_ids)
+
+    logger.info(f'采集成功|第{page}页|共{total}条|入库{count}条')
+
+
+def fetch_page_html(page, release_date=None):
+    headers = {
+        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
+        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
+        'Upgrade-Insecure-Requests': '1',
+        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36'
+    }
+    url = f'https://www.celma.org.cn/zqsclb_{page}.jhtml?channelId=193&title=&province=&releaseDate='
+    if release_date:
+        url += release_date
+
+    response = _send('get', url, headers=headers, timeout=10, max_retries=10)
+    if not response:
+        logger.error(f'采集失败|第{page}页')
+        return page, None
+
+    html = response.content.decode()
+    return page, html
+
+
+def parse_max_page(html):
+    select = Selector(html)
+    texts = select.xpath('//span[@style="margin-right: 40px;"]/text()').extract()
+    page = next(filter(lambda x: str(x).isdigit(), ''.join(texts).strip().split(' ')), -1)
+    return int(page)
+
+
+def fetch_max_page(release_date=None, **kwargs):
+    url = 'https://www.celma.org.cn/zqsclb_1.jhtml?channelId=193&title=&province=&releaseDate='
+    if release_date:
+        url += release_date
+
+    headers = {
+        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
+        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
+        'Upgrade-Insecure-Requests': '1',
+        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36'
+    }
+    response = _send('get', url, headers=headers, timeout=10, **kwargs)
+    if not response:
+        logger.warning('检索失败|暂无最大页')
+        return -1
+
+    html = response.content.decode()
+    num = parse_max_page(html)
+    logger.info(f'检索成功|最大{num}页')
+    return num
+
+
+def fetch_max_page_by_release_date(date=None, **kwargs):
+    if date is None:
+        date = datetime.now().strftime('%Y-%m-%d')
+
+    return fetch_max_page(release_date=date, **kwargs)
+
+
+def download(page):
+    page, html = fetch_page_html(page)  # fetch_page_html returns a (page, html) tuple
+    if html is not None:
+        parse(page, html)
+    return True
+
+
+def spider(threads):
+    logger.info("任务开始")
+    try:
+        max_page = max(1, fetch_max_page(max_retries=10))
+
+        with ThreadPoolExecutor(max_workers=threads) as executor:
+            fs = []
+            for page in range(1, max_page + 1):
+                fs.append(executor.submit(fetch_page_html, page, release_date=None))
+
+            for f in as_completed(fs):
+                page, html = f.result()
+                if html is not None:
+                    parse(page, html)
+
+    finally:
+        logger.info("任务结束")
+
+
+def daily_spider(threads):
+    logger.info("任务开始")
+
+    try:
+        release_date = datetime.now().strftime('%Y-%m-%d')
+        max_page = max(1, fetch_max_page_by_release_date(release_date, max_retries=10))
+
+        with ThreadPoolExecutor(max_workers=threads, thread_name_prefix='list') as executor:
+            fs = []
+            for page in range(1, max_page + 1):
+                fs.append(executor.submit(fetch_page_html, page, release_date=release_date))
+
+            for f in as_completed(fs):
+                page, html = f.result()
+                if html is not None:
+                    parse(page, html)
+
+    finally:
+        logger.info("任务结束")
+
+
+if __name__ == '__main__':
+    # spider(threads=20)
+    daily_spider(threads=1)
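
The list spider and the detail spider hand off through the `special_purpose_bond_files_list` collection. A sketch of one list document as written by `parse()` above (values are hypothetical):

```python
# Hypothetical shape of one list document; the isdownload / isfailed / retry
# flags drive the hand-off to crawl_detail_spider.
list_doc = {
    'title': '某省2025年专项债券发行文件',           # anchor title from the list page
    'href': 'https://www.celma.org.cn/xxxx.jhtml',   # assumed example URL
    'publishdate': '2025-05-09',
    'page': 3,         # list page the entry was found on
    'isdownload': 0,   # set to 1 once the attachments have been fetched
    'isfailed': 0,     # set to 1 when a fetch attempt fails
    'retry': 0,        # incremented on the retry pass
}
```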

+ 19 - 0
spbf/downloadFileDaemon.sh

@@ -0,0 +1,19 @@
+#!/bin/bash
+
+source /opt/py312/bin/activate
+
+# change to the script's directory so relative paths resolve correctly
+cd "$(dirname "$0")" || exit 1
+
+# on SIGINT/SIGTERM, stop the child spider and exit
+trap 'kill $SPIDER_PID; exit' SIGINT SIGTERM
+
+while true; do
+  echo "$(date): 启动专项债附件下载器..."
+  nohup python3 /mnt/download_files/crawl_detail_spider.py > /dev/null 2>&1 &
+
+  SPIDER_PID=$!
+  wait $SPIDER_PID
+  echo "$(date): 专项债附件下载器进程已终止,1秒后重启..."
+  sleep 1
+done

+ 33 - 0
spbf/log.py

@@ -0,0 +1,33 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2024-08-22
+---------
+@summary: logging module (loguru configuration)
+---------
+@author: Dzr
+"""
+import sys
+from pathlib import Path
+
+from loguru import logger
+
+logger.remove()  # drop loguru's default stderr handler
+
+_absolute = Path(__file__).absolute().parent
+_log_path = (_absolute / 'logs/log_{time:YYYYMMDD}.log').resolve()
+loguru_format = (
+    "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | "
+    "<level>{level: <4}</level> | "
+    "<cyan>{thread.name}</cyan> | "
+    "<cyan>{file.name}</cyan>:<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>"
+)
+logru_level = 'INFO'
+logger.add(
+    sink=_log_path,
+    format=loguru_format,
+    level=logru_level,
+    rotation='00:00',
+    retention='1 week',
+    encoding='utf-8',
+)
+logger.add(sys.stdout, format=loguru_format, colorize=True, level=logru_level)
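
A minimal usage sketch of the shared logger (messages are examples):

```python
# Hypothetical usage from any module in spbf/: import the pre-configured logger.
from log import logger

logger.info('list spider started')   # written to stdout and logs/log_YYYYMMDD.log
logger.error('download failed')      # files rotate at 00:00 and are kept for one week
```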

+ 40 - 0
spbf/proxy.py

@@ -0,0 +1,40 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2024-02-27
+---------
+@summary: proxy fetching
+---------
+@author: dzr
+"""
+import requests
+
+import setting
+from log import logger
+
+__all__ = ['get_proxy']
+
+
+def get_proxy(scheme=None, default=None, socks5h=False):
+    url = setting.JY_PROXY_HOST
+    headers = setting.JY_PROXY_AUTH
+
+    try:
+        proxy = requests.get(url, headers=headers, timeout=15).json()
+    except requests.RequestException:
+        return default
+
+    if not proxy:
+        logger.debug('暂无代理...')
+        return default
+
+    proxies = proxy.get('data')
+    if not proxies:
+        # response carried no usable proxy payload
+        return default
+
+    if socks5h:
+        # switch to socks5h so DNS resolution happens on the proxy side
+        proxy_url = proxies.get('http')
+        proxies = {
+            'http': proxy_url.replace('socks5', 'socks5h'),
+            'https': proxy_url.replace('socks5', 'socks5h'),
+        }
+
+    return proxies if not scheme else proxies.get(scheme, default)
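
A sketch of how the spiders use it (the target URL is an example only). Note that SOCKS5 proxies require the PySocks extra (`requests[socks]`), which requirements.txt does not pin:

```python
# Hypothetical usage: route a request through the fetched proxy, if any.
import requests

from proxy import get_proxy

proxies = get_proxy()  # None when no proxy is available; requests then connects directly
resp = requests.get('https://www.celma.org.cn/', proxies=proxies, timeout=10)
print(resp.status_code)
```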

+ 5 - 0
spbf/requirements.txt

@@ -0,0 +1,5 @@
+loguru==0.5.3
+parsel==1.9.1
+pymongo==3.12.0
+redis==5.2.1
+requests==2.32.3

+ 20 - 0
spbf/setting.py

@@ -0,0 +1,20 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2025-05-12 
+---------
+@summary: project settings (paths, Redis, MongoDB, proxy)
+---------
+@author: Dzr
+"""
+
+LOCAL_FILE_ROOT_DIR = '/nas/special_purpose_bond_files'
+
+REDIS_URL = "redis://default:k5ZJR5KV4q7DRZ92DQ@172.17.162.34:8361/15"
+REDIS_EXPIRE_TIME = 3600 * 24
+
+MGO_HOST = '172.17.4.87'
+MGO_PORT = 27080
+MGO_DB = 'py_theme'
+
+JY_PROXY_HOST = 'http://cc.spdata.jianyu360.com/crawl/proxy/socks5/fetch'
+JY_PROXY_AUTH = {'Authorization': 'Basic amlhbnl1MDAxOjEyM3F3ZSFB'}

+ 5 - 0
spbf/spider_list.sh

@@ -0,0 +1,5 @@
+#!/bin/bash
+
+source /opt/py312/bin/activate
+ps -ef | grep 'crawl_list_spider' | grep -v grep | awk '{print $2}' | xargs kill -9 2>/dev/null
+nohup python3 /mnt/download_files/crawl_list_spider.py > /dev/null 2>&1 &