"""Attachment download helpers.

Fetches remote attachment files, stores them in a local temp directory,
computes a content-hash id and pushes the file to Aliyun OSS.
"""

import hashlib
import os
import re
import traceback
import uuid
from urllib.parse import urlparse, unquote

import requests
import urllib3

from feapder.setting import headers
from untils.execptions import AttachmentNullError
from untils.aliyun import AliYunService
from untils.proxy_pool import ProxyPool

urllib3.disable_warnings()


def hex_sha1(val):
    """Return the hex SHA-1 digest of *val* (``str`` or ``bytes``).

    Any other type yields the digest of the empty message.
    """
    sha1 = hashlib.sha1()
    if isinstance(val, bytes):
        # BUGFIX: previously this hashed str(val) — the repr "b'...'" —
        # instead of the payload bytes themselves.
        sha1.update(val)
    elif isinstance(val, str):
        sha1.update(val.encode("utf-8"))
    return sha1.hexdigest()


def extract_file_type(text):
    """Return the extension *text* ends with (lower- or upper-case), or None.

    Only a fixed whitelist of extensions is recognised.
    """
    if text is None:
        return None
    file_types = {'pdf', 'doc', 'docx', 'rar', 'zip', 'gzzb', 'jpg', 'png'}
    for file_type in file_types:
        for suffix in (file_type, file_type.upper()):
            # BUGFIX: the original returned None after testing only the first
            # candidate; now every whitelisted extension is checked.
            # re.S keeps '.*' matching across embedded newlines.
            if re.match(f'.*{suffix}$', text, re.S) is not None:
                return suffix
    return None


def extract_file_name(href: str, file_type: str):
    """Best-effort extraction of a Chinese file name from a download URL.

    Looks for ``....<file_type>`` in the URL query (falling back to the
    path), URL-decodes it and returns the first run of Chinese characters /
    punctuation found, or None when nothing matches.
    """
    # Chinese punctuation: [\u3002\uff1b\uff0c\uff1a\u201c\u201d\uff08\uff09\u3001\uff1f\u300a\u300b]
    # Chinese characters:  [\u4e00-\u9fa5]
    zh_char_pattern = '[\u3002\uff1b\uff0c\uff1a\u201c\u201d\uff08\uff09\u3001\uff1f\u300a\u300b\u4e00-\u9fa5]+'
    parser = urlparse(href)
    query = (parser.query or parser.path)
    result = re.search(f'.*\\.{file_type}', query, re.S)
    if result is not None:
        encode_str = unquote(result.group())
        name = re.search(zh_char_pattern, encode_str)
        if name is not None:
            return unquote(name.group())
    return None


def verify_file_name(name):
    """Raise ``ValueError`` when *name* does not end in a known extension."""
    if extract_file_type(name) is None:
        raise ValueError


class AttachmentDownloader:
    """Download attachments to a local scratch dir and push them to OSS."""

    def __init__(self):
        # Relative directory used as local scratch space for downloads.
        self.dir_name = 'file'

    def create_dir(self):
        """Ensure the scratch directory exists (race-safe via exist_ok)."""
        os.makedirs(self.dir_name, mode=0o777, exist_ok=True)

    def create_file_path(self, filename, file_type):
        """Return a unique local path ``<dir>/<sha1>.<file_type>``."""
        self.create_dir()
        # uuid4 salt makes the path unique even for identical filenames.
        sign = hex_sha1("{}_{}".format(filename, uuid.uuid4()))
        tmp_name = "{}.{}".format(sign, file_type)
        return "{}/{}".format(self.dir_name, tmp_name)

    @staticmethod
    def create_fid(file_stream: bytes):
        """Content-addressed file id: SHA-1 of the raw bytes."""
        return hex_sha1(file_stream)

    @staticmethod
    def _fetch_attachment(
            url: str,
            file_path: str,
            enable_proxy=False,
            allow_show_exception=False,
            **kwargs
    ):
        """Download *url* into *file_path*.

        Retries up to 3 times (non-200 responses and request exceptions both
        consume a retry). Returns the response body as bytes, or ``b''``
        when all attempts fail.
        """
        request_params = {
            'headers': kwargs.get('headers') or headers,
            'proxies': kwargs.get('proxies'),
            'timeout': kwargs.get('timeout') or 60,
            # BUGFIX: `kwargs.get('stream') or True` forced stream=True even
            # when the caller explicitly passed stream=False; likewise
            # `... or False` forced verify=False. Respect explicit values.
            'stream': kwargs.get('stream', True),
            'verify': kwargs.get('verify', False),
        }
        proxy_pool = ProxyPool() if enable_proxy else None
        retries = 0
        while retries < 3:
            try:
                with requests.get(url, **request_params) as response:
                    if response.status_code == 200:
                        stream = response.content
                        with open(file_path, 'wb') as f:
                            f.write(stream)
                        return stream
                    retries += 1
            except requests.RequestException:
                if allow_show_exception:
                    traceback.print_exc()
                # NOTE(review): a pool proxy is only applied after the first
                # failure; the initial attempt uses kwargs['proxies'] (if any).
                if proxy_pool is not None:
                    request_params['proxies'] = proxy_pool.get()
                retries += 1
        return b''

    @staticmethod
    def clean_attachment(file_path):
        """Remove the local temp file; an already-missing file is ignored."""
        try:
            os.remove(file_path)
        except FileNotFoundError:
            pass

    @staticmethod
    def getsize(file_path: str):
        """Return the file size as a human-readable string (kb / M / G)."""

        def _getsize(filename):
            try:
                return os.path.getsize(filename)
            except OSError:  # narrowed from a bare `except:`
                return 0

        _kb = float(_getsize(file_path)) / 1024
        if _kb >= 1024:
            _m = _kb / 1024
            if _m >= 1024:
                return "{:.1f} G".format(_m / 1024)
            return "{:.1f} M".format(_m)
        return "{:.1f} kb".format(_kb)

    def fetch_attachment(
            self,
            file_name: str,
            file_type: str,
            download_url: str,
            enable_proxy=False,
            allow_request_exception=False,
            **kwargs
    ):
        """Download an attachment and push it to Aliyun OSS.

        Returns a dict with the file's basic information. On a successful
        upload it also carries 'ftype', 'fid', 'size' and 'url'; on any
        failure only 'filename' and 'org_url' are present.

        Raises ``AttachmentNullError`` when any required argument is falsy.
        """
        if not file_name or not file_type or not download_url:
            raise AttachmentNullError
        file_path = self.create_file_path(file_name, file_type)
        file_stream = self._fetch_attachment(
            download_url, file_path, enable_proxy,
            allow_request_exception, **kwargs
        )
        if len(file_stream) > 0:
            fid = self.create_fid(file_stream)
            # Whether the upload succeeds or fails, callers always receive
            # the basic file information.
            try:
                result = {
                    'filename': file_name,
                    'ftype': file_type,
                    'fid': "{}.{}".format(fid, file_type),
                    'org_url': download_url,
                    'size': self.getsize(file_path),
                    'url': 'oss',
                }
                AliYunService().push_oss_from_local(result['fid'], file_path)
            except Exception:
                # Best-effort: OSS failures degrade to the minimal record.
                result = {
                    'filename': file_name,
                    'org_url': download_url,
                }
            finally:
                # BUGFIX: the original deleted the temp file only on upload
                # failure, leaking one file per successful download.
                self.clean_attachment(file_path)
        else:
            result = {
                'filename': file_name,
                'org_url': download_url,
            }
        return result