import hashlib import os import re import traceback import uuid from urllib.parse import urlparse, unquote import requests import urllib3 from feapder.utils.log import log as logger from untils.aliyun import AliYunService from untils.execptions import AttachmentNullError from untils.proxy_pool import ProxyPool urllib3.disable_warnings() # 文件文档类型 DOCTYPE = { 'txt', 'rtf', 'dps', 'et', 'ett', 'xls', 'xlsx', 'xlsb', 'xlsm', 'xlt', 'ods', 'pmd', 'pmdx', 'doc', 'docm', 'docx', 'dot', 'dotm', 'dotx', 'odt', 'wps', 'csv', 'xml', 'xps' } # 压缩类型 COMPRESSION_TYPE = { 'rar', 'zip', 'gzzb', '7z', 'tar', 'gz', 'bz2', 'jar', 'iso', 'cab', 'arj', 'lzh', 'ace', 'uue', 'edxz', } # 图片类型 IMAGE_TYPE = { 'jpg', 'png', 'jpeg', 'tiff', 'gif', 'psd', 'raw', 'eps', 'svg', 'bmp', 'pdf' } # 其他类型 OTHER_TYPE = { 'swf', 'nxzf', 'xezf', 'nxcf' } headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36', 'Accept': '*/*' } def sha1(val): _sha1 = hashlib.sha1() if isinstance(val, bytes): _sha1.update(str(val).encode("utf-8")) elif isinstance(val, str): _sha1.update(val.encode("utf-8")) return _sha1.hexdigest() def remove(file_path: str): try: os.remove(file_path) except FileNotFoundError: pass def getsize(file): try: return os.path.getsize(file) except FileNotFoundError: return 0 def discern_file_format(text, allow_show_waring=False): """ 识别文件格式 @param text: 识别文本 @param allow_show_waring: 是否打印警告信息 @return: 文件格式 """ file_types = { *DOCTYPE, *COMPRESSION_TYPE, *IMAGE_TYPE, *OTHER_TYPE } for file_type in file_types: all_file_format = [file_type, file_type.upper()] for t in all_file_format: result = re.match(f'.*{t}$', text, re.S) if result is not None: return t else: unknown_type = re.findall('[^.\\/:*?"<>|\r\n]+$', text, re.S) if allow_show_waring: logger.warning(f'[未识别文件类型]{unknown_type}') return None def extract_file_type(text): if text is None: return None return discern_file_format(text) def extract_file_name_by_href(href: str, file_type: str): """从url中抽取文件名称""" # 中文标点符号:[\u3002\uff1b\uff0c\uff1a\u201c\u201d\uff08\uff09\u3001\uff1f\u300a\u300b] # 中文字符:[\u4e00 -\u9fa5] zh_char_pattern = '[\u3002\uff1b\uff0c\uff1a\u201c\u201d\uff08\uff09\u3001\uff1f\u300a\u300b\u4e00-\u9fa5]+' parser = urlparse(href) query = (parser.query or parser.path) result = re.search(f'.*\\.{file_type}', query, re.S) if result is not None: encode_str = unquote(result.group()) name = re.search(zh_char_pattern, encode_str) if name is not None: return unquote(name.group()) return None def extract_file_name(text): file_type = discern_file_format(text) if file_type is not None: repl = '.{}'.format(file_type) text = text.replace(repl, '') return text def verify_file_name(name): if extract_file_type(name) is None: raise ValueError # 去除附件名空格、两个后缀 def clean_file_name(file_name: str, file_type: str): file_name = file_name.strip() if file_type in file_name: file_name = file_name.replace(f'.{file_type}', '') return file_name # 限制附件大小:size < 5 kb 不存入数据库 def limit_file_size(file_size: str): _pattern = '^[0-9]\d*\.\d*|[1-9]\d*' if "M" in file_size or "m" in file_size: file_size = float("".join(re.findall(_pattern, file_size))) * 1000 else: file_size = "".join(re.findall(_pattern, file_size)) if float(file_size) < 5: return False else: return True # 判断附件地址是否正确 def judge_file_url(file_url: str): file_url = file_url.strip() if " " in file_url: file_url = file_url.split(" ")[0] return file_url class AttachmentDownloader(AliYunService): def __init__(self): super(AttachmentDownloader, self).__init__() self.dir_name = 'file' def _create_file(self, filename, filetype): os.makedirs(self.dir_name, mode=0o777, exist_ok=True) file = "{filename}.{filetype}".format( filename=sha1("{}_{}".format(filename, uuid.uuid4())), filetype=filetype ) return "{}/{}".format(self.dir_name, file) @staticmethod def _create_fid(file_stream: bytes): return sha1(file_stream) @staticmethod def _origin_filename(fid: str, filetype: str): return "{}.{}".format(fid, filetype) @staticmethod def _file_size(file: str): _kb = float(getsize(file)) / 1024 if _kb >= 1024: _M = _kb / 1024 if _M >= 1024: _G = _M / 1024 return "{:.1f} G".format(_G) else: return "{:.1f} M".format(_M) else: return "{:.1f} kb".format(_kb) @staticmethod def _fetch_attachment( get_file_type:str, file_type_name:str, url: str, enable_proxy=False, proxy={}, allow_show_exception=False, **kwargs ): request_params = {} request_params.setdefault('headers', kwargs.get('headers') or headers) request_params.setdefault('proxies', kwargs.get('proxies')) request_params.setdefault('timeout', kwargs.get('timeout') or 60) request_params.setdefault('stream', kwargs.get('stream') or True) request_params.setdefault('verify', kwargs.get('verify') or False) if enable_proxy: proxy = ProxyPool() else: proxy = proxy retries = 0 while retries < 3: try: with requests.get(url, **request_params) as req: if req.status_code == 200: stream = req.content ''' file_type_name 响应头中附件后缀所对应的键 get_file_type 取附件后缀的规则 file_type_txt 附件响应头 ''' if len(get_file_type) > 10: file_types = [] file_type_txt = req.headers.get(file_type_name) exec(get_file_type) if file_types: file_type = file_types[0] else: file_type = '' return stream,file_type else: return stream, get_file_type else: retries += 1 except requests.RequestException: if allow_show_exception: traceback.print_exc() if enable_proxy: request_params.update({'proxies': proxy.get()}) retries += 1 return b'' def fetch_attachment( self, get_file_type:str, file_name: str, file_type_name: str, download_url: str, enable_proxy=False, allow_show_exception=False, **kwargs ): if not file_name or not download_url: raise AttachmentNullError file_stream = self._fetch_attachment( get_file_type, file_type_name, download_url, enable_proxy, allow_show_exception=allow_show_exception, **kwargs ) if len(file_stream) == 2: file_type = file_stream[-1] else: file_type = '' file_name = clean_file_name(file_name,file_type) download_url = judge_file_url(download_url) local_tmp_file = self._create_file(file_name, file_type) with open(local_tmp_file, 'wb') as f: f.write(file_stream[0]) result = { 'filename': '{}.{}'.format(file_name, file_type), 'org_url': download_url } if len(file_stream[0]) > 0: try: fid = self._create_fid(file_stream[0]) key = self._origin_filename(fid, file_type) result.setdefault('fid', key) result.setdefault('ftype', file_type) result.setdefault('size', self._file_size(local_tmp_file)) result.setdefault('url', 'oss') super().push_oss_from_local(key, local_tmp_file) except Exception as e: logger.warning( "[{}]下载异常,原因:{}".format(file_name, e.__class__.__name__) ) remove(local_tmp_file) '''上传/下载,无论失败/成功必须返回附件信息''' if "size" not in result or limit_file_size(result.get('size')): return result else: return {}