123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287 |
import hashlib
import io
import os
import traceback
import uuid

import requests
import tqdm
import urllib3

from untils.aliyun import AliYunService
from untils.execptions import AttachmentNullError
from untils.proxy_pool import ProxyPool

urllib3.disable_warnings()
# Default request headers; fetch_data falls back to these when the caller
# passes no (or empty) ``headers``.
headers = {
    'User-Agent': (
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
        'AppleWebKit/537.36 (KHTML, like Gecko) '
        'Chrome/93.0.4577.82 Safari/537.36'
    ),
    'Accept': '*/*',
}
class AttachmentDownloader:
    """Attachment download module.

    Downloads a remote attachment, either into memory or into a temporary
    local file, pushes it to OSS through ``AliYunService`` and returns a dict
    describing the attachment.
    """

    # Responses whose announced size exceeds this many MiB are discarded.
    max_size_mb = 50
    # Number of request attempts made by fetch_data before giving up.
    max_retries = 3
    # Number of bytes requested per iter_content chunk.
    chunk_size = 1024 * 20

    def __init__(self):
        # Directory that holds temporary downloads in "local" mode.
        self.dir_name = 'file'

    def create_dir(self):
        """Create the temporary download directory if it is missing."""
        # exist_ok already covers the "directory is there" case, so no
        # separate (race-prone) existence check is needed.
        os.makedirs(self.dir_name, mode=0o777, exist_ok=True)

    def create_file(self, filename, file_type):
        """Return a unique local path for a download.

        The name is the SHA-1 of ``filename`` plus a random UUID, so repeated
        downloads of the same attachment never collide.

        :param str filename: attachment name
        :param str file_type: extension, without the leading dot
        :return: path of the form ``<dir_name>/<sha1>.<file_type>``
        """
        self.create_dir()
        sign = self._hash("{}_{}".format(filename, uuid.uuid4()))
        local_file_name = "{}.{}".format(sign, file_type)
        return "{}/{}".format(self.dir_name, local_file_name)

    def create_fid(self, data: bytes):
        """Return the content-derived id (SHA-1 hex digest) for ``data``."""
        return self._hash(data)

    @staticmethod
    def _hash(val):
        """Return the SHA-1 hex digest of ``val``.

        Values that are neither ``bytes`` nor ``str`` are ignored, which
        yields the digest of empty input.
        """
        _sha1 = hashlib.sha1()
        if isinstance(val, bytes):
            # NOTE(review): this hashes the str() representation of the bytes
            # ("b'...'"), not the raw bytes. Left as-is because changing it
            # would change every fid produced so far.
            _sha1.update(str(val).encode("utf-8"))
        elif isinstance(val, str):
            _sha1.update(val.encode("utf-8"))
        return _sha1.hexdigest()

    @staticmethod
    def clean_attachment(file_path):
        """Delete a file; a file that is already gone is not an error.

        :param str file_path: path of the file to delete
        """
        try:
            os.remove(file_path)
        except FileNotFoundError:
            pass

    def remove(self, file):
        """Delete the local file ``file`` (see clean_attachment)."""
        self.clean_attachment(file)

    @staticmethod
    def get_mb(data):
        """Convert a byte count to MiB.

        :param int data: size in bytes
        :return: float
        """
        _kb = float(data / 1024.0)
        return float(_kb / 1024.0)

    @staticmethod
    def getsize(data):
        """Return a human-readable size for ``data``.

        :param data: a file path (``str``), raw content (``bytes`` or
            ``bytearray``) or a seekable file-like object
        :return: str such as ``"1.5 kb"``, ``"2.0 M"`` or ``"1.0 G"``;
            ``"0.0 kb"`` when the size cannot be determined
        """
        size = 0
        if isinstance(data, str):
            try:
                size = os.path.getsize(data)
            except FileNotFoundError:
                pass
        elif isinstance(data, (bytes, bytearray)):
            size = len(data)
        elif hasattr(data, 'seek') and hasattr(data, 'tell'):
            # File-like object: measure by seeking to the end, then restore
            # the caller's position.
            try:
                position = data.tell()
                data.seek(0, os.SEEK_END)
                size = data.tell()
                data.seek(position)
            except (OSError, ValueError):
                # Unseekable or closed stream: size stays unknown.
                size = 0

        _kb = float(size) / 1024
        if _kb < 1024:
            return "{:.1f} kb".format(_kb)
        _mb = _kb / 1024
        if _mb < 1024:
            return "{:.1f} M".format(_mb)
        return "{:.1f} G".format(_mb / 1024)

    @staticmethod
    def _content_length(value):
        """Parse a Content-Length header value; None when missing or invalid."""
        try:
            return int(value)
        except (TypeError, ValueError):
            return None

    def fetch_data(self, url, file=None, **kwargs):
        """Download ``url`` and return its body.

        Up to ``max_retries`` attempts are made. A non-200 status or a
        ``requests.RequestException`` consumes one attempt.

        :param str url: download address
        :param file: optional local path; when given, the body is also
            written to this file
        :param dict kwargs: request options. Recognised keys are
            ``method``, ``data``, ``cookies``, ``headers``, ``proxies``,
            ``timeout``, ``stream``, ``verify``, ``allow_redirects``,
            ``enable_proxy`` (fetch a new proxy from ``ProxyPool`` after a
            failed attempt) and ``allow_show_exception`` (print the traceback
            of request errors). Other keys are ignored.
        :return: the body as ``bytes``; ``b''`` when every attempt failed or
            the announced size exceeds ``max_size_mb``
        """
        enable_proxy = kwargs.pop('enable_proxy', False)
        allow_show_exception = kwargs.pop('allow_show_exception', False)
        method = kwargs.pop('method', 'get')
        request_params = {
            'data': kwargs.pop('data', None),
            'cookies': kwargs.pop('cookies', None),
            'headers': kwargs.get('headers') or headers,
            'proxies': kwargs.get('proxies'),
            'timeout': kwargs.pop('timeout', 60),
            'stream': kwargs.pop('stream', True),
            'verify': kwargs.pop('verify', False),
            'allow_redirects': kwargs.pop('allow_redirects', True),
        }

        for _ in range(self.max_retries):
            try:
                with requests.request(method, url, **request_params) as req:
                    # Check the status first so the body of an error reply is
                    # never read just to measure it.
                    if req.status_code != 200:
                        continue

                    # Content length; header lookup is case-insensitive.
                    total = self._content_length(
                        req.headers.get('content-length')
                    )
                    if total is None:
                        # No usable header: fall back to the body size
                        # (this reads the whole body).
                        total = len(req.content)

                    # Discard files larger than the size limit.
                    if self.get_mb(total) > self.max_size_mb:
                        return b''

                    stream = io.BytesIO()
                    chunks = req.iter_content(chunk_size=self.chunk_size)
                    with tqdm.tqdm(
                            total=total,
                            unit='B',
                            initial=0,
                            unit_scale=True,
                            unit_divisor=1024,  # unit conversion: 1M = 1024Kb
                            ascii=True,
                            desc=file) as bar:
                        if file is not None:
                            with open(file, 'wb') as f:
                                for chunk in chunks:
                                    stream.write(chunk)
                                    bar.update(f.write(chunk))
                        else:
                            for chunk in chunks:
                                bar.update(stream.write(chunk))
                    return stream.getvalue()
            except requests.RequestException:
                if allow_show_exception:
                    traceback.print_exc()
                if enable_proxy:
                    request_params.update({'proxies': ProxyPool().get()})
        return b''

    @staticmethod
    def _base_result(file_name, url):
        """Return the minimal attachment info used when download/upload fails."""
        return {
            'filename': file_name,
            'org_url': url,
        }

    @staticmethod
    def _oss_result(file_name, file_type, url, fid, size):
        """Return the full attachment info for a file destined for OSS."""
        return {
            'filename': file_name,
            'ftype': file_type,
            'fid': "{}.{}".format(fid, file_type),
            'org_url': url,
            'size': size,
            'url': 'oss',
        }

    def _push_oss_from_stream(self, file_name, file_type, url, **kw):
        """Download into memory and push the bytes to OSS.

        :param str file_name: attachment name
        :param str file_type: attachment type (extension)
        :param str url: download address
        :param dict kw: extra download options, forwarded to fetch_data
        :return: dict: attachment info; only ``filename`` and ``org_url``
            when the download or the upload failed
        """
        stream = self.fetch_data(url, None, **kw)
        if len(stream) == 0:
            return self._base_result(file_name, url)

        fid = self.create_fid(stream)
        try:
            result = self._oss_result(
                file_name, file_type, url, fid, self.getsize(stream)
            )
            AliYunService().push_oss_from_stream(result['fid'], stream)
        except Exception:
            # Best effort: an upload failure still returns the basic info,
            # but is no longer invisible when the caller asked for errors.
            if kw.get('allow_show_exception'):
                traceback.print_exc()
            result = self._base_result(file_name, url)
        return result

    def _push_oss_from_file(self, file_name, file_type, url, **kw):
        """Download to a temporary local file and push that file to OSS.

        The temporary file is always deleted, even when the download raises.

        :param str file_name: attachment name
        :param str file_type: attachment type (extension)
        :param str url: download address
        :param dict kw: extra download options, forwarded to fetch_data
        :return: dict: attachment info; only ``filename`` and ``org_url``
            when the download or the upload failed
        """
        file = self.create_file(file_name, file_type)
        try:
            stream = self.fetch_data(url, file, **kw)
            # Whether upload/download succeeds or fails, the basic file
            # information must be returned.
            if len(stream) == 0:
                return self._base_result(file_name, url)

            fid = self.create_fid(stream)
            try:
                result = self._oss_result(
                    file_name, file_type, url, fid, self.getsize(file)
                )
                AliYunService().push_oss_from_local(result['fid'], file)
            except Exception:
                if kw.get('allow_show_exception'):
                    traceback.print_exc()
                result = self._base_result(file_name, url)
            return result
        finally:
            # Delete the local temporary file.
            self.remove(file)

    def _fetch_attachment(self, file_name, file_type, download_url, **kwargs):
        """Download an attachment using the requested mode.

        :param str file_name: attachment name
        :param str file_type: attachment type (extension)
        :param str download_url: download address
        :param dict kwargs: extra download options; ``mode="stream"`` keeps
            the data in memory, any other value (default ``"local"``) goes
            through a temporary file
        :return: dict: attachment info
        """
        mode = kwargs.pop('mode', 'local')
        if mode == "stream":
            push = self._push_oss_from_stream
        else:
            push = self._push_oss_from_file
        return push(file_name, file_type, download_url, **kwargs)

    def fetch_attachment(
            self,
            file_name: str,
            file_type: str,
            download_url: str,
            **kw
    ):
        """Validate the arguments, then download and upload the attachment.

        :raises AttachmentNullError: when the name, type or url is empty
        :return: dict: attachment info
        """
        if not file_name or not file_type or not download_url:
            raise AttachmentNullError
        return self._fetch_attachment(file_name, file_type, download_url, **kw)
|