"""Attachment download helpers and OSS (object storage) upload clients."""
import io
import os
import traceback
import uuid
from io import BytesIO

import oss2
import requests
import urllib3

from config.load import headers, ali_oss
from utils.clean_file import (
    clean_file_name,
    judge_file_url,
    modify_file_url_list,
    remove,
    limit_file_size
)
from utils.clean_file import sha1, getsize
from utils.log import logger
from utils.socks5 import Proxy

# Attachment requests default to verify=False (see
# AttachmentDownloader._fetch_file), so urllib3 warnings are silenced.
urllib3.disable_warnings()


class OssClient(object):
    """Client for an HTTP OSS gateway exposing ``/ossservice/*`` endpoints."""

    def __init__(self, domain: str):
        """Store the base URL that the endpoint paths are appended to."""
        self.domain = domain

    def upload(self, args: dict, retries=3, err_show=False) -> dict:
        """Upload ``args['stream']`` as ``args['object_name']``.

        :param dict args: requires ``bucket_id``, ``object_name`` and
            ``stream`` (bytes); ``gzip`` is optional and defaults to False.
        :param int retries: maximum number of attempts. Only attempts that
            raise are retried; any HTTP response ends the loop.
        :param bool err_show: re-raise the last exception when every
            attempt raised.
        :return: dict starting as ``{"error_code": -1}``; merged with the
            service's JSON reply on HTTP 200, otherwise carrying
            ``error_msg``.
        """
        reply = {"error_code": -1}
        raise_err = None
        for _ in range(retries):
            # Start each attempt from a clean reply so an error message from
            # a failed attempt cannot leak into a later successful reply.
            reply = {"error_code": -1}
            try:
                object_name = args['object_name']
                # A fresh buffer per attempt: an earlier attempt may already
                # have consumed the previous one.
                files = {
                    'file': (object_name, BytesIO(args['stream'])),
                }
                data = {
                    'bucket_id': args['bucket_id'],
                    'object_name': object_name,
                    'gzip': args.get('gzip', False),
                }
                response = requests.post(
                    f"{self.domain}/ossservice/upload",
                    files=files,
                    data=data,
                    timeout=300,
                )
                if response.status_code == 200:
                    reply.update(response.json())
                else:
                    reply['error_msg'] = f"HTTP error: {response.status_code}"
                # A response was received: stop retrying.
                raise_err = None
                break
            except Exception as e:
                reply['error_msg'] = str(e)
                raise_err = e

        if err_show and raise_err is not None:
            raise raise_err
        return reply

    def download(self, args: dict):
        """Download an object.

        :param dict args: requires ``bucket_id`` and ``object_name``.
        :return: ``{"error_code": 0, "error_msg": ..., "data": bytes}`` on
            success, ``{"error_code": -1, "error_msg": str}`` on any error.
        """
        reply = {}
        try:
            data = {
                "bucket_id": args["bucket_id"],
                "object_name": args["object_name"]
            }
            response = requests.post(
                f"{self.domain}/ossservice/download", data=data, timeout=300
            )
            response.raise_for_status()
            reply["error_code"] = 0
            reply["error_msg"] = "下载成功"
            reply["data"] = response.content
        except Exception as e:
            reply["error_code"] = -1
            reply["error_msg"] = str(e)
        return reply

    def delete(self, args: dict):
        """Delete an object.

        :param dict args: requires ``bucket_id`` and ``object_name``.
        :return: the service's JSON object with ``error_code`` forced to 0
            on success, ``{"error_code": -1, "error_msg": str}`` on error.
        """
        reply = {}
        try:
            data = {
                "bucket_id": args["bucket_id"],
                "object_name": args["object_name"]
            }
            response = requests.post(
                f"{self.domain}/ossservice/delete", data=data, timeout=10
            )
            response.raise_for_status()
            payload = response.json()
            # Only adopt the payload when it is a JSON object; rebinding
            # ``reply`` to a list/str would make the error handler below
            # fail with TypeError instead of returning an error reply.
            if not isinstance(payload, dict):
                raise ValueError(
                    "unexpected response body: expected a JSON object"
                )
            reply = payload
            reply["error_code"] = 0
        except Exception as e:
            reply["error_code"] = -1
            reply["error_msg"] = str(e)
        return reply


class OssBucketClient:
    """Thin wrapper around an ``oss2.Bucket`` built from ``ali_oss`` config."""

    def __init__(self):
        """Create the bucket handle from the ``ali_oss`` settings."""
        auth = oss2.Auth(ali_oss['key_id'], ali_oss['key_secret'])
        self._bucket = oss2.Bucket(
            auth, ali_oss['endpoint'], ali_oss['bucket_name']
        )

    def push_oss_from_local(self, key, filename):
        """
        Upload a local file to OSS as a regular object.

        :param str key: object name to store the file under in OSS
        :param str filename: local file path; must be readable
        """
        return self._bucket.put_object_from_file(key, filename)

    def push_oss_from_stream(self, key, data):
        """
        Upload in-memory or streamed content to OSS.

        :param str key: object name to store the content under in OSS
        :param data: content to upload
        :type data: bytes, str or file-like object
        """
        return self._bucket.put_object(key, data)


class AttachmentDownloader:
    """Download an attachment to a temp file and push it to the OSS bucket."""

    def __init__(self, address=None, mode=None):
        """
        :param address: gateway address; defaults to
            ``ali_oss['jy']['address']``.
        :param mode: ``'test'`` selects ``ali_oss['jy']['address_test']``.
        """
        self.dir_name = 'file'
        # NOTE: the HTTP gateway backend (OssClient(domain=address)) is
        # currently disabled, so ``address`` is resolved but not used.
        # Uploads go through OssBucketClient.
        if address is None:
            address = ali_oss['jy']['address']
        if mode == 'test':
            address = ali_oss['jy']['address_test']
        self._bucket = OssBucketClient()

    def _create_file(self, filename, filetype):
        """Return a unique temp path under ``self.dir_name``, creating the dir."""
        os.makedirs(self.dir_name, mode=0o777, exist_ok=True)
        # A random uuid is mixed in so repeated names get distinct paths.
        digest = sha1("{}_{}".format(filename, uuid.uuid4()))
        return "{}/{}.{}".format(self.dir_name, digest, filetype)

    @staticmethod
    def _create_fid(file_stream: bytes):
        """Return the ``sha1`` helper's result for the file content."""
        return sha1(file_stream)

    @staticmethod
    def _origin_filename(fid: str, filetype: str):
        """Return ``"<fid>.<filetype>"``, used as the OSS object key."""
        return "{}.{}".format(fid, filetype)

    @staticmethod
    def _calc_size(size: float):
        """Format a size given in KB as ``"x.x kb"``, ``"x.x M"`` or ``"x.x G"``."""
        if size < 1024:
            return "{:.1f} kb".format(size)
        megabytes = size / 1024
        if megabytes < 1024:
            return "{:.1f} M".format(megabytes)
        return "{:.1f} G".format(megabytes / 1024)

    def _file_size(self, file):
        """Return the formatted size of ``file`` (``getsize`` result / 1024 as KB)."""
        return self._calc_size(float(getsize(file)) / 1024)

    def stream_download(self, file_path, response):
        """Stream ``response`` into ``file_path`` and return the body as bytes.

        Bodies larger than 50 MB are discarded: an empty ``bytes`` is
        returned either up front (declared ``Content-Length`` too large) or
        as soon as the received data crosses the limit.

        :param file_path: local path the body is written to
        :param response: object exposing ``headers`` and ``iter_content``
        :return: downloaded bytes, or ``b''`` when the body is over the limit
        """
        bytes_per_mb = 1024 * 1024
        limit_mb = 50
        stream = io.BytesIO()

        lowered = {k.lower(): v for k, v in response.headers.items()}
        raw_length = lowered.get('content-length')
        try:
            content_size = int(raw_length) if raw_length is not None else None
        except (TypeError, ValueError):
            # A malformed Content-Length is treated as "size unknown"
            # instead of aborting the download.
            content_size = None

        if content_size is not None \
                and float(content_size) / bytes_per_mb > limit_mb:
            # Declared body exceeds the limit: skip it entirely.
            return stream.getvalue()

        size_is_known = content_size is not None
        chunk_size = 1024  # upper bound for a single chunk read
        received = 0  # bytes received so far
        with open(file_path, 'wb') as f:
            for data in response.iter_content(chunk_size=chunk_size):
                size = stream.write(data)
                f.write(data)
                received += size
                if received / bytes_per_mb > limit_mb:
                    stream.truncate(0)  # drop everything buffered so far
                    stream.seek(0)  # rewind to the start of the stream
                    logger.warning("超过50M的附件|丢弃下载")
                    break

                if not size_is_known:
                    # No usable Content-Length: re-estimate on every chunk.
                    # A full chunk suggests more data follows (guess twice
                    # the current amount); a short chunk is taken as the end.
                    if size == chunk_size:
                        content_size = received * 2
                    else:
                        content_size = received

                if content_size:
                    percent = min(100, int((received / content_size) * 100))
                else:
                    percent = 0
                print("\r文件下载进度:%d%%(%d/%d) - %s" % (percent, received, content_size, file_path), end=" ")

        return stream.getvalue()

    def _fetch_file(
            self,
            method: str,
            url: str,
            file: str,
            enable_proxy: bool,
            allow_show_exception: bool,
            **kwargs
    ):
        """Request ``url`` (up to 3 attempts) and stream the body to ``file``.

        Recognised ``kwargs``: ``headers``, ``proxies``, ``timeout`` and
        ``verify``; other keys are ignored.

        :return: downloaded bytes, or ``b''`` when every attempt raised a
            ``requests.RequestException``.
        """
        request_params = {
            'headers': kwargs.get('headers') or headers,
            'proxies': kwargs.get('proxies'),
            'timeout': kwargs.get('timeout') or 60,
            # Always streamed: the original ``kwargs.get('stream') or True``
            # could never be False, and stream_download's size guard relies
            # on reading the body incrementally.
            'stream': True,
            'verify': kwargs.get('verify') or False,
        }
        # NOTE(review): the first attempt uses only caller-supplied proxies;
        # the Proxy helper is consulted only after a failure — confirm this
        # is intended.
        proxy = Proxy(enable_proxy)
        for _ in range(3):
            try:
                with requests.request(method, url, **request_params) as req:
                    req.raise_for_status()
                    return self.stream_download(file, req)
            except requests.RequestException:
                if allow_show_exception:
                    traceback.print_exc()
                if enable_proxy:
                    proxy.switch()
                    request_params.update({'proxies': proxy.proxies})
        return b''

    def download(
            self,
            file_name: str,
            file_type: str,
            download_url: str,
            enable_proxy=False,
            allow_request_exception=False,
            **kwargs
    ):
        """Download an attachment and upload it to the OSS bucket.

        :param str file_name: attachment name (cleaned via ``clean_file_name``)
        :param str file_type: extension; surrounding whitespace is stripped
        :param str download_url: source URL (passed through ``judge_file_url``
            and every callable in ``modify_file_url_list``)
        :param bool enable_proxy: switch proxies between failed attempts
        :param bool allow_request_exception: print request tracebacks
        :param kwargs: ``method`` (default ``'get'``) plus the request
            options accepted by ``_fetch_file``
        :return: always contains ``filename`` and ``org_url``; ``fid``,
            ``ftype``, ``size`` and ``url`` are added only once the upload
            succeeded. ``{}`` when ``limit_file_size`` rejects the size.
        """
        request_method = kwargs.pop('method', 'get')
        file_type = file_type.strip()
        file_name = clean_file_name(file_name, file_type)
        download_url = judge_file_url(download_url)
        for app_param in modify_file_url_list:
            download_url = app_param(download_url)

        local_tmp_file = self._create_file(file_name, file_type)
        result = {
            'filename': '{}.{}'.format(file_name, file_type),
            'org_url': download_url
        }
        try:
            file_stream = self._fetch_file(
                request_method,
                download_url,
                local_tmp_file,
                enable_proxy,
                allow_request_exception,
                **kwargs
            )
            if len(file_stream) > 0:
                try:
                    fid = self._create_fid(file_stream)
                    key = self._origin_filename(fid, file_type)
                    size = self._file_size(local_tmp_file)
                    self._bucket.push_oss_from_local(key, local_tmp_file)
                except Exception as e:
                    logger.warning(
                        "[{}]下载异常,原因:{}".format(file_name, type(e).__name__)
                    )
                else:
                    # Record storage metadata only after the upload succeeded,
                    # so a failed upload is not reported as stored in OSS.
                    result['fid'] = key
                    result['ftype'] = file_type
                    result['size'] = size
                    result['url'] = 'oss'
        finally:
            # Clean up the temp file even when the fetch raised unexpectedly.
            remove(local_tmp_file)

        # Attachment info is returned whether the transfer failed or not;
        # only a size rejected by limit_file_size yields an empty result.
        if "size" not in result or limit_file_size(result.get('size')):
            return result
        return {}