123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303 |
- import hashlib
- import os
- import re
- import traceback
- import uuid
- from urllib.parse import urlparse, unquote
- import requests
- import urllib3
- from feapder.utils.log import log as logger
- from untils.aliyun import AliYunService
- from untils.execptions import AttachmentNullError
- from untils.proxy_pool import ProxyPool
- urllib3.disable_warnings()
# Document / office file extensions.
DOCTYPE = {
    'txt', 'rtf', 'csv', 'xml', 'xps',
    'dps', 'et', 'ett', 'wps',
    'xls', 'xlsx', 'xlsb', 'xlsm', 'xlt', 'ods',
    'pmd', 'pmdx',
    'doc', 'docm', 'docx', 'dot', 'dotm', 'dotx', 'odt',
}

# Archive / compressed file extensions.
COMPRESSION_TYPE = {
    'rar', 'zip', '7z', 'tar', 'gz', 'bz2',
    'jar', 'iso', 'cab', 'arj', 'lzh', 'ace', 'uue',
    'gzzb', 'edxz',
}

# Image file extensions (the original list also files 'pdf' here).
IMAGE_TYPE = {
    'jpg', 'jpeg', 'png', 'gif', 'bmp', 'tiff',
    'psd', 'raw', 'eps', 'svg',
    'pdf',
}

# Any other recognised extensions.
OTHER_TYPE = {'swf', 'nxzf', 'xezf', 'nxcf'}

# Default HTTP request headers used when the caller supplies none.
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36',
    'Accept': '*/*'
}
def sha1(val):
    """
    Return a hexadecimal SHA-1 digest derived from ``val``.

    @param val: ``str`` values are hashed as their UTF-8 encoding; ``bytes``
        values are hashed via their textual form (see note below); any other
        type contributes nothing, yielding the digest of empty input.
    @return: 40-character hexadecimal digest string
    """
    if isinstance(val, bytes):
        # NOTE(review): this hashes the UTF-8 encoding of ``str(val)``, i.e.
        # the ``b'...'`` representation, not the raw bytes themselves. Kept
        # as-is because the digest is used as a stored identifier.
        payload = str(val).encode("utf-8")
    elif isinstance(val, str):
        payload = val.encode("utf-8")
    else:
        payload = b""
    return hashlib.sha1(payload).hexdigest()
def remove(file_path: str):
    """
    Delete the file at ``file_path``; a missing file is silently ignored.

    @param file_path: path of the file to delete
    @return: None
    """
    try:
        os.unlink(file_path)  # same operation as os.remove
    except FileNotFoundError:
        return None
    return None
def getsize(file):
    """
    Return the size of ``file`` in bytes, or 0 when the file does not exist.

    @param file: path of the file to measure
    @return: size in bytes (0 for a missing file)
    """
    try:
        info = os.stat(file)
    except FileNotFoundError:
        return 0
    return info.st_size
def discern_file_format(text, allow_show_waring=False):
    """
    Identify a known file extension at the end of ``text``.

    Each known extension is tried in its lower-case spelling and in its
    upper-case spelling; the spelling that matched is returned. Note that the
    match only requires ``text`` to end with the extension letters -- a
    preceding dot is not required.

    @param text: text to inspect (typically a file name or link text)
    @param allow_show_waring: when true, log a warning if nothing matched
    @return: the matched extension, or None when no known extension matched
    """
    known_types = set().union(DOCTYPE, COMPRESSION_TYPE, IMAGE_TYPE, OTHER_TYPE)
    for suffix in known_types:
        for candidate in (suffix, suffix.upper()):
            if re.match(f'.*{candidate}$', text, re.S) is not None:
                return candidate

    # Nothing matched: optionally report the trailing token for diagnosis.
    if allow_show_waring:
        unknown_type = re.findall('[^.\\/:*?"<>|\r\n]+$', text, re.S)
        logger.warning(f'[未识别文件类型]{unknown_type}')
    return None
def extract_file_type(text):
    """
    None-tolerant wrapper around ``discern_file_format``.

    @param text: text to inspect, or None
    @return: the detected extension, or None when ``text`` is None or no
        known extension was found
    """
    return None if text is None else discern_file_format(text)
def extract_file_name_by_href(href: str, file_type: str):
    """
    Extract a Chinese file name from a download URL.

    The URL's query string is searched when it is non-empty, otherwise its
    path. The portion up to the last ``.<file_type>`` is percent-decoded and
    the first run of Chinese characters / Chinese punctuation in it is
    returned.

    @param href: download URL
    @param file_type: extension expected in the URL (without the dot)
    @return: the extracted name, or None when the extension or a Chinese
        run is not found
    """
    # Chinese punctuation followed by the CJK range \u4e00-\u9fa5.
    zh_char_pattern = '[\u3002\uff1b\uff0c\uff1a\u201c\u201d\uff08\uff09\u3001\uff1f\u300a\u300b\u4e00-\u9fa5]+'
    parts = urlparse(href)
    haystack = parts.query or parts.path

    located = re.search(f'.*\\.{file_type}', haystack, re.S)
    if located is None:
        return None

    decoded = unquote(located.group())
    zh_name = re.search(zh_char_pattern, decoded)
    if zh_name is None:
        return None
    return unquote(zh_name.group())
def extract_file_name(text):
    """
    Strip the detected extension (including its dot) from ``text``.

    @param text: file name, possibly carrying a known extension
    @return: ``text`` with every ``.<ext>`` occurrence removed, or ``text``
        unchanged when no known extension is detected
    """
    detected = discern_file_format(text)
    if detected is None:
        return text
    return text.replace('.{}'.format(detected), '')
def verify_file_name(name):
    """
    Validate that ``name`` ends with a known file extension.

    @param name: file name to check (None is treated as invalid)
    @raise ValueError: when no known extension is detected
    """
    detected = extract_file_type(name)
    if detected is None:
        raise ValueError
# Strip surrounding whitespace and a (possibly duplicated) extension from an
# attachment name.
def clean_file_name(file_name: str, file_type: str):
    """
    Normalise an attachment name.

    @param file_name: raw attachment name
    @param file_type: extension without the dot; may be empty when the
        extension is unknown
    @return: ``file_name`` stripped of surrounding whitespace and of every
        ``.<file_type>`` occurrence (so ``a.pdf.pdf`` becomes ``a``)
    """
    file_name = file_name.strip()
    # An empty extension must be a no-op: '' is a substring of every string,
    # and replacing '.' + '' would delete every dot in the name.
    if not file_type:
        return file_name
    return file_name.replace(f'.{file_type}', '')
# Size gate: attachments smaller than 5 kb are not stored in the database.
def limit_file_size(file_size: str):
    """
    Decide whether a human-readable size string is large enough to keep.

    @param file_size: size text such as ``"12.3 kb"``, ``"1.5 M"`` or
        ``"2.0 G"``; a value without an M/G unit is taken as kilobytes
    @return: True when the size is at least 5 kb, otherwise False
    @raise ValueError: when no number can be parsed from ``file_size``
    """
    _pattern = r'^[0-9]\d*\.\d*|[1-9]\d*'
    number = float("".join(re.findall(_pattern, file_size)))

    if "M" in file_size or "m" in file_size:
        size_kb = number * 1000
    elif re.search(r'[\d.]\s*[Gg]', file_size):
        # Gigabyte values used to fall through to the kilobyte branch, so
        # e.g. "1.5 G" was treated as 1.5 kb and rejected as too small.
        size_kb = number * 1000 * 1000
    else:
        size_kb = number
    return size_kb >= 5
# Sanitise an attachment URL: trim it and drop anything after the first space.
def judge_file_url(file_url: str):
    """
    Return the leading space-free portion of ``file_url``.

    @param file_url: raw attachment URL
    @return: the URL stripped of surrounding whitespace and cut at the first
        remaining space character
    """
    head, _sep, _rest = file_url.strip().partition(" ")
    return head
class AttachmentDownloader(AliYunService):
    """
    Download an attachment to a temporary local file and push it to OSS.

    The upload is delegated to ``push_oss_from_local`` on the parent class,
    which is defined outside this file.
    """

    def __init__(self):
        super(AttachmentDownloader, self).__init__()
        # Directory (relative to the working directory) for temporary files.
        self.dir_name = 'file'

    def _create_file(self, filename, filetype):
        """
        Build a unique temporary file path inside ``self.dir_name``.

        The directory is created when missing. The file itself is not
        created here.

        @param filename: attachment name, mixed with a random UUID and hashed
        @param filetype: extension appended to the hashed name
        @return: path of the form ``<dir_name>/<sha1>.<filetype>``
        """
        os.makedirs(self.dir_name, mode=0o777, exist_ok=True)
        file = "{filename}.{filetype}".format(
            filename=sha1("{}_{}".format(filename, uuid.uuid4())),
            filetype=filetype
        )
        return "{}/{}".format(self.dir_name, file)

    @staticmethod
    def _create_fid(file_stream: bytes):
        """Return the identifier derived from the downloaded content."""
        return sha1(file_stream)

    @staticmethod
    def _origin_filename(fid: str, filetype: str):
        """Return the storage key ``<fid>.<filetype>``."""
        return "{}.{}".format(fid, filetype)

    @staticmethod
    def _file_size(file: str):
        """
        Return the size of ``file`` as text with one decimal place.

        @param file: path of the file to measure (a missing file counts as 0)
        @return: ``"x.x kb"``, ``"x.x M"`` or ``"x.x G"`` (1024-based)
        """
        _kb = float(getsize(file)) / 1024
        if _kb < 1024:
            return "{:.1f} kb".format(_kb)
        _M = _kb / 1024
        if _M < 1024:
            return "{:.1f} M".format(_M)
        return "{:.1f} G".format(_M / 1024)

    @staticmethod
    def _fetch_attachment(
            get_file_type: str,
            file_type_name: str,
            url: str,
            enable_proxy=False,
            proxy=None,
            allow_show_exception=False,
            **kwargs
    ):
        """
        Download ``url`` with up to three attempts.

        @param get_file_type: either a plain extension (10 characters or
            fewer), returned as-is, or a longer snippet of Python code that
            derives the extension from a response header (see below)
        @param file_type_name: name of the response header handed to the
            snippet as ``file_type_txt``
        @param url: address to download
        @param enable_proxy: when true, a ``ProxyPool`` is created and a new
            proxy is taken from it after every request exception
        @param proxy: ignored unless replaced by the pool; kept for
            backward compatibility
        @param allow_show_exception: print the traceback of request errors
        @param kwargs: optional ``headers``, ``proxies``, ``timeout``,
            ``stream`` and ``verify`` overrides for ``requests.get``
        @return: ``(content, file_type)`` on HTTP 200, otherwise ``b''``
            after the attempts are exhausted
        """
        request_params = {
            'headers': kwargs.get('headers') or headers,
            'proxies': kwargs.get('proxies'),
            'timeout': kwargs.get('timeout') or 60,
            # NOTE(review): ``x or True`` is always truthy, so streaming
            # cannot be switched off through kwargs.
            'stream': kwargs.get('stream') or True,
            'verify': kwargs.get('verify') or False,
        }
        if enable_proxy:
            proxy = ProxyPool()

        retries = 0
        while retries < 3:
            try:
                with requests.get(url, **request_params) as req:
                    if req.status_code == 200:
                        stream = req.content
                        if len(get_file_type) > 10:
                            # The snippet communicates through these local
                            # names: it reads ``file_type_txt`` (the header
                            # value) and is expected to append its result to
                            # ``file_types``. Do not rename them.
                            file_types = []
                            file_type_txt = req.headers.get(file_type_name)
                            # SECURITY: executes caller-supplied code; the
                            # rule must never come from untrusted input.
                            exec(get_file_type)
                            file_type = file_types[0] if file_types else ''
                            return stream, file_type
                        return stream, get_file_type
                    retries += 1
            except requests.RequestException:
                if allow_show_exception:
                    traceback.print_exc()
                if enable_proxy:
                    request_params.update({'proxies': proxy.get()})
                retries += 1
        return b''

    def fetch_attachment(
            self,
            get_file_type: str,
            file_name: str,
            file_type_name: str,
            download_url: str,
            enable_proxy=False,
            allow_show_exception=False,
            **kwargs
    ):
        """
        Download an attachment, push it to OSS and describe the outcome.

        @param get_file_type: extension, or extension-extraction snippet,
            forwarded to ``_fetch_attachment``
        @param file_name: attachment name
        @param file_type_name: response header name forwarded to
            ``_fetch_attachment``
        @param download_url: attachment address
        @param enable_proxy: forwarded to ``_fetch_attachment``
        @param allow_show_exception: forwarded to ``_fetch_attachment``
        @param kwargs: request overrides forwarded to ``_fetch_attachment``
        @return: dict with ``filename`` and ``org_url``, plus ``fid``,
            ``ftype``, ``size`` and ``url`` when content was downloaded;
            an empty dict when the downloaded file is smaller than 5 kb
        @raise AttachmentNullError: when ``file_name`` or ``download_url``
            is empty
        """
        if not file_name or not download_url:
            raise AttachmentNullError

        fetched = self._fetch_attachment(
            get_file_type,
            file_type_name,
            download_url,
            enable_proxy,
            allow_show_exception=allow_show_exception,
            **kwargs
        )
        if isinstance(fetched, tuple) and len(fetched) == 2:
            stream, file_type = fetched
        else:
            # Failed download: ``_fetch_attachment`` returned bare ``b''``.
            # Indexing it used to raise IndexError instead of returning the
            # attachment info.
            stream, file_type = b'', ''

        file_name = clean_file_name(file_name, file_type)
        download_url = judge_file_url(download_url)
        result = {
            'filename': '{}.{}'.format(file_name, file_type),
            'org_url': download_url
        }

        local_tmp_file = self._create_file(file_name, file_type)
        try:
            with open(local_tmp_file, 'wb') as f:
                f.write(stream)
            if len(stream) > 0:
                try:
                    fid = self._create_fid(stream)
                    key = self._origin_filename(fid, file_type)
                    result.setdefault('fid', key)
                    result.setdefault('ftype', file_type)
                    result.setdefault('size', self._file_size(local_tmp_file))
                    result.setdefault('url', 'oss')
                    super().push_oss_from_local(key, local_tmp_file)
                except Exception as e:
                    logger.warning(
                        "[{}]下载异常,原因:{}".format(file_name, e.__class__.__name__)
                    )
        finally:
            # The temporary file is never kept, whatever happened above.
            remove(local_tmp_file)

        # Attachment info must be returned whether the download/upload
        # succeeded or failed; only undersized files are dropped.
        if "size" not in result or limit_file_size(result.get('size')):
            return result
        return {}
|