import threading

import requests
import urllib3
from loguru import logger
from requests.adapters import HTTPAdapter
from requests.models import Response, REDIRECT_STATI
from requests.utils import get_encodings_from_content
from urllib3.util.retry import Retry

from config.load import headers

try:
    import chardet
except ImportError:
    import charset_normalizer as chardet

urllib3.disable_warnings()

# Encodings reported by the response that are re-checked against the body
# before being trusted (see Downloader.apparent_encoding).
SPECIAL_ENCODINGS = [
    'Windows-1254',
    'ISO-8859-1'
]

# Connection-pool size used for each Session adapter.
DEFAULT_POOLSIZE = 10


class Downloader:
    """HTTP GET downloader built on a ``requests.Session`` with retries.

    The session mounts one ``HTTPAdapter`` for both ``http://`` and
    ``https://`` with a urllib3 ``Retry`` policy and a fixed pool size.
    """

    def __init__(self, max_retries=3, retry_interval=0, **kwargs):
        """Create the session and mount the retrying adapter.

        :param max_retries: value passed as ``Retry(total=...)``
        :param retry_interval: value passed as ``Retry(backoff_factor=...)``
        :param kwargs: only ``disable_debug_log`` (default ``False``) is read
        """
        self._max_retries = max_retries  # max retries on request errors
        self._backoff_factor = retry_interval  # retry back-off factor
        self.disable_debug_log = kwargs.pop('disable_debug_log', False)
        self.session = requests.Session()
        # Retry policy used by the adapter.
        retry = Retry(
            total=self._max_retries,
            backoff_factor=self._backoff_factor
        )
        # One adapter shared by both schemes.
        adapter = HTTPAdapter(
            pool_connections=DEFAULT_POOLSIZE,
            pool_maxsize=DEFAULT_POOLSIZE,
            max_retries=retry
        )
        self.session.mount('http://', adapter)
        self.session.mount('https://', adapter)

    @staticmethod
    def prepare_params(**kw):
        """Build the keyword arguments for ``session.get``.

        Defaults are ``allow_redirects=False`` and ``timeout=10``; any other
        keyword given by the caller is passed through and overrides the
        defaults. A missing or falsy ``timeout`` becomes 10. Missing or falsy
        ``headers`` fall back to a copy of the configured default headers.

        :param kw: keyword arguments intended for ``requests`` GET
        :return: dict of request parameters, always containing ``headers``
        """
        timeout = kw.pop('timeout', None) or 10
        custom_headers = kw.pop('headers', None)

        request_params = {'allow_redirects': False, 'timeout': timeout}
        request_params.update(kw)
        # Copy the shared defaults so the module-level dict is never handed
        # out by reference.
        request_params['headers'] = custom_headers or dict(headers)
        return request_params

    @staticmethod
    def apparent_encoding(response):
        """Return the encoding to use for decoding ``response``.

        When ``response.encoding`` is one of ``SPECIAL_ENCODINGS`` the body is
        inspected: the first encoding declared in the content wins, otherwise
        the ``chardet`` detection result is used. Any other encoding is
        returned unchanged.
        """
        encoding = response.encoding
        if encoding in SPECIAL_ENCODINGS:
            # Look for the encoding declared inside the content itself.
            true_encoding = get_encodings_from_content(response.text)
            if true_encoding:
                encoding = true_encoding[0]
            else:
                encoding = chardet.detect(response.content)['encoding']
        return encoding

    def _requests_by_get(self, url, **kw):
        """Issue a GET request with redirect and SSL-error fallbacks.

        A redirect status triggers one re-issue with ``allow_redirects=True``.
        On ``SSLError`` the request is retried, first with ``verify=False``
        and then over plain ``http``. Any other ``RequestException`` stops
        the attempts. If no response was obtained at all, a synthetic
        ``Response`` with ``status_code=10001`` and ``reason`` set to the
        exception class name is returned.

        :param url: target address
        :param kw: keyword arguments forwarded through ``prepare_params``
        :return: ``requests.models.Response``
        """
        request_params = self.prepare_params(**kw)
        response = None  # response of the request
        reason = ""  # error reason
        ssl_retries = 2  # retries allowed for SSL verification errors
        while True:
            try:
                response = self.session.get(url, **request_params)
                # Follow redirects once; if redirects are already enabled
                # and the status is still a redirect, accept the response
                # instead of looping forever.
                if (response.status_code in REDIRECT_STATI
                        and not request_params.get('allow_redirects')):
                    request_params['allow_redirects'] = True
                    continue
                response.encoding = self.apparent_encoding(response)
                break
            except requests.exceptions.SSLError as e:
                reason = e.__class__.__name__
                if 'verify' not in request_params:
                    request_params['verify'] = False
                else:
                    del request_params['verify']
                    # Downgrade only the scheme, never text inside the
                    # path or query string.
                    if url.startswith('https://'):
                        url = 'http://' + url[len('https://'):]
                if ssl_retries <= 0:
                    break
                ssl_retries -= 1
            except requests.RequestException as e:
                reason = e.__class__.__name__
                break

        if response is None:
            response = Response()
            response.status_code = 10001
            response.encoding = 'utf-8'  # default encoding
            response._content = None  # default (empty) body
            response.reason = reason

        if not self.disable_debug_log:
            t_name = threading.current_thread().name
            logger.debug(f'<{t_name}-Response> {response.status_code} - {url}')
        return response

    def get(self, url, **kw):
        """
        Perform a network GET request.

        :param url: target address
        :param kw: keyword arguments for the requests GET call
        :return: response object
        """
        return self._requests_by_get(url, **kw)


class RenderDownloader(Downloader):
    """Downloader that fetches pages through a Splash ``render.json`` endpoint."""

    def get(self, url, **kw):
        """POST a render request for ``url`` and return the raw response.

        :param url: page address to render
        :param kw: optional ``timeout`` (default 2), ``wait`` (default 0.5)
            and ``viewport`` (default ``'full'``) forwarded in the JSON body
        :return: ``requests.Response`` of the POST call
        """
        splash_url = 'http://splash.spdata.jianyu360.com/render.json'
        args = {
            'url': url,
            'html': 1,
            'iframes': 1,
            'headers': headers,
            'timeout': kw.pop('timeout', 2),
            'wait': kw.pop('wait', 0.5),
            'viewport': kw.pop('viewport', 'full'),
        }
        # NOTE(review): this POST has no client-side timeout and bypasses
        # self.session (and its retry adapter) - confirm that is intended.
        resp = requests.post(splash_url,
                             json=args,
                             headers={'content-type': 'application/json'})
        return resp