# -*- coding: utf-8 -*-
"""
Created on 2024-02-26
---------
@summary: Attachment download module
---------
@author: Dzr
"""
import io
import os
import uuid

import requests
import tqdm
import urllib3

import feapder.utils.tools as tools
from feapder.utils.log import log as logger
from feapder.utils.oss import JyOssClient, OssBucketClient

# fetch_data sends requests with verify=False by default; silence urllib3 warnings.
urllib3.disable_warnings()

headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36",
    "Accept": "*/*"
}


def clear_file_type_suffix(filename: str, filetype: str):
    """Strip a trailing ``.{filetype}`` extension from ``filename``.

    Leading/trailing whitespace is removed first. The extension is matched
    case-insensitively and only at the end of the name; repeated extensions
    (``"a.pdf.pdf"``) are all removed. Occurrences elsewhere in the name are
    kept. An empty ``filetype`` leaves the (stripped) name unchanged.

    :param str filename: original file name
    :param str filetype: extension without the leading dot, e.g. "pdf"
    :return: str
    """
    filename = filename.strip()
    if not filetype:
        return filename
    suffix = ".{}".format(filetype)
    suffix_lower = suffix.lower()
    n = len(suffix)
    # Compare by slicing the original so the removed length is always len(suffix).
    while filename[-n:].lower() == suffix_lower:
        filename = filename[:-n]
    return filename


class Downloader:
    """Download attachments over HTTP and upload them to the OSS bucket."""

    # Responses larger than this many MB are discarded.
    MAX_FILE_SIZE_MB = 50
    # Size of each chunk read from the response body (20 KiB).
    CHUNK_SIZE = 1024 * 20
    # Number of request attempts made by fetch_data before giving up.
    MAX_RETRIES = 3

    def __init__(self):
        self.dir_name = "file"  # directory that holds temporary local files
        self._bucket = OssBucketClient()

    def create_file(self, filename, filetype):
        """Return a unique path under ``self.dir_name`` for a temporary file.

        The directory is created if missing; the file itself is not created.
        The base name is ``tools.get_sha1`` of ``filename`` combined with a
        random UUID, so repeated downloads of the same name do not collide.

        :param str filename: file name used as hash input
        :param str filetype: extension appended to the hashed name
        :return: str path "<dir_name>/<hash>.<filetype>"
        """
        os.makedirs(self.dir_name, mode=0o777, exist_ok=True)
        file = "{filename}.{filetype}".format(
            filename=tools.get_sha1("{}_{}".format(filename, uuid.uuid4())),
            filetype=filetype
        )
        return "{}/{}".format(self.dir_name, file)

    @staticmethod
    def clean_attachment(filepath):
        """
        Delete a file; a missing file is silently ignored.

        :param str filepath: file path
        """
        try:
            os.remove(filepath)
        except FileNotFoundError:
            pass

    def remove(self, file):
        """Delete ``file`` (see ``clean_attachment``)."""
        self.clean_attachment(file)

    @staticmethod
    def calculate_size(data):
        """
        Convert a byte count to megabytes.

        :param int data: size in bytes
        :return: float size in MB
        """
        _kb = float(data / 1024.0)
        return float(_kb / 1024.0)

    @staticmethod
    def getsize(data):
        """
        Return a human-readable size string for ``data``.

        :param data: a local file path (str) or raw content (bytes). Any other
            type, or a path that does not exist, counts as 0 bytes.
        :return: str such as "12.3 kb", "4.5 M" or "1.2 G"
        """
        size = 0
        if isinstance(data, str):
            try:
                size = os.path.getsize(data)
            except FileNotFoundError:
                pass
        elif isinstance(data, bytes):
            size = len(data)

        _kb = float(size) / 1024
        if _kb < 1024:
            return "{:.1f} kb".format(_kb)
        _M = _kb / 1024
        if _M < 1024:
            return "{:.1f} M".format(_M)
        return "{:.1f} G".format(_M / 1024)

    @staticmethod
    def read_pdf_by_chunks(f, chunk_size=1024):
        """Heuristically check whether the file at ``f`` is an acceptable document.

        Only the first ``chunk_size`` bytes are inspected, via the ``str()``
        repr of the bytes object. Returns False when that chunk contains both
        "<>" and "SourceModified"; True when it does not contain "doctypehtml";
        True when it contains "%PDF"; False otherwise or if the file cannot be
        read.

        NOTE(review): "doctypehtml" only matches a minified "<!doctypehtml>";
        a regular "<!DOCTYPE html>" page is reported as True — confirm intended.
        """
        try:
            with open(f, 'rb') as file:
                head = str(file.read(chunk_size))
        except Exception:
            return False
        if "<>" in head and "SourceModified" in head:
            return False
        if "doctypehtml" not in head:
            return True
        return "%PDF" in head

    @staticmethod
    def _reset_stream(stream):
        """Empty a BytesIO buffer in place."""
        stream.truncate(0)  # drop all buffered bytes
        stream.seek(0)  # move the write position back to the start

    @staticmethod
    def _content_length(response_headers):
        """Return the Content-Length header as an int, or None if absent or malformed."""
        lower_headers = {k.lower(): v for k, v in response_headers.items()}
        value = lower_headers.get("content-length")
        if value is None:
            return None
        try:
            return int(value)
        except (TypeError, ValueError):
            return None

    def _copy_chunks(self, chunks, stream, bar, sink=None):
        """Write ``chunks`` into ``stream`` (and ``sink`` when given), updating ``bar``.

        Once the running total exceeds ``MAX_FILE_SIZE_MB`` the in-memory buffer
        is emptied and copying stops. Bytes already written to ``sink`` are left
        in place; the caller is responsible for discarding that file.
        """
        downloaded_size = 0
        for chunk in chunks:
            size = stream.write(chunk)
            if sink is not None:
                sink.write(chunk)
            bar.update(size)
            downloaded_size += size
            if self.calculate_size(downloaded_size) > self.MAX_FILE_SIZE_MB:
                self._reset_stream(stream)
                break

    def fetch_data(self, url, proxies=None, file=None, show_error_log=False, **kwargs):
        """
        Download the resource at ``url`` and return its content.

        The body is streamed in ``CHUNK_SIZE`` chunks. Content larger than
        ``MAX_FILE_SIZE_MB`` (judged up front by Content-Length, or by the
        running byte count while streaming) is discarded and b"" is returned.
        Requests raising ``requests.RequestException`` are retried up to
        ``MAX_RETRIES`` times in total; if all attempts fail, b"" is returned.

        :param str url: download url
        :param proxies: proxies {"http":"http://xxx", "https":"https://xxx"}
        :param file: optional local path; when given, the body is also written there
        :param show_error_log: log the traceback of each failed attempt
        :param kwargs: optional request options: method, headers, params, data,
            json, cookies, timeout, stream, verify, allow_redirects
        :return: bytes (empty on failure or when the size limit is exceeded)
        """
        method = kwargs.pop("method", "get")
        request_kwargs = {
            "proxies": proxies,
            "headers": kwargs.get("headers") or headers,
            "params": kwargs.pop("params", None),
            "data": kwargs.pop("data", None),
            "json": kwargs.pop("json", None),
            "cookies": kwargs.pop("cookies", None),
            "timeout": kwargs.pop("timeout", (60, 120)),  # (connect, read) seconds
            "stream": kwargs.pop("stream", True),
            "verify": kwargs.pop("verify", False),
            "allow_redirects": kwargs.pop("allow_redirects", True),
        }

        stream = io.BytesIO()
        for _ in range(self.MAX_RETRIES):
            try:
                with requests.request(method, url, **request_kwargs) as req:
                    req.raise_for_status()
                    total_bytes = self._content_length(req.headers)
                    if (
                        total_bytes is not None
                        and self.calculate_size(total_bytes) > self.MAX_FILE_SIZE_MB
                    ):
                        # Discard files whose declared length exceeds the limit.
                        return stream.getvalue()

                    # tqdm counts bytes here, so ``total`` must be a byte count too.
                    with tqdm.tqdm(
                            total=total_bytes,
                            unit="B",
                            initial=0,
                            unit_scale=True,
                            unit_divisor=1024,  # 1M = 1024K
                            ascii=True,
                            desc=file) as bar:
                        chunks = req.iter_content(chunk_size=self.CHUNK_SIZE)
                        if file is not None:
                            with open(file, "wb") as f:
                                self._copy_chunks(chunks, stream, bar, f)
                        else:
                            self._copy_chunks(chunks, stream, bar)
                    return stream.getvalue()
            except requests.RequestException as why:
                self._reset_stream(stream)  # drop partial data before retrying
                if show_error_log:
                    logger.exception(why)

        return stream.getvalue()

    def _push_oss_from_stream(self, filename, filetype, url, **kwargs):
        """
        Download into memory and upload the bytes to OSS.

        "fid", "size", "ftype" and "url" are added to the result only when the
        upload succeeds; download/upload failures still return the base info.

        :param str filename: file name
        :param str filetype: file type
        :param str url: download url
        :return: dict with at least "filename" and "org_url"
        """
        kwargs.pop("gzip", None)  # accepted for compatibility; not used by the upload
        stream = self.fetch_data(url, file=None, **kwargs)
        attachment = {
            "filename": "{}.{}".format(filename, filetype),
            "org_url": url
        }
        if not stream:
            return attachment

        fid = tools.get_sha1(stream)
        try:
            info = {
                "ftype": filetype,
                "fid": "{}.{}".format(fid, filetype),
                "size": self.getsize(stream),
                "url": "oss",
            }
            self._bucket.put_object(info["fid"], stream)
        except Exception as e:
            logger.error(
                "[{}]上传失败,原因:{}".format(filename, e.__class__.__name__)
            )
        else:
            attachment.update(info)
        return attachment

    def _push_oss_from_local(self, filename, filetype, url, **kwargs):
        """
        Download into a temporary local file and upload that file to OSS.

        "fid", "size", "ftype" and "url" are added to the result only when the
        upload succeeds. The temporary file is always deleted, even if the
        download raises.

        :param str filename: file name
        :param str filetype: file type
        :param str url: download url
        :param kwargs: may include ``is_check``; when truthy, files rejected by
            ``read_pdf_by_chunks`` are not uploaded
        :return: dict with at least "filename" and "org_url"
        """
        kwargs.pop("gzip", None)  # accepted for compatibility; not used by the upload
        file = self.create_file(filename, filetype)
        # Whether upload/download succeeds or fails, the basic file info is returned.
        attachment = {
            "filename": "{}.{}".format(filename, filetype),
            "org_url": url
        }
        try:
            stream = self.fetch_data(url, file=file, **kwargs)
            if kwargs.get('is_check', None) and not self.read_pdf_by_chunks(file):
                return attachment
            if not stream:
                return attachment

            content_hash = tools.get_sha1(stream)
            try:
                info = {
                    "fid": "{}.{}".format(content_hash, filetype),
                    "size": self.getsize(file),
                    "ftype": filetype,
                    "url": "oss",
                }
                self._bucket.put_object_from_file(info["fid"], file)
            except Exception as e:
                logger.error(
                    "[{}]上传失败,原因:{}".format(filename, e.__class__.__name__)
                )
            else:
                attachment.update(info)
        finally:
            self.remove(file)  # delete the local temporary file
        return attachment

    def fetch_attachment(self, file_name, file_type, download_url, mode="local",
                         proxies=None, gzip=False, **kwargs):
        """
        Download an attachment and upload it to OSS.

        @param str file_name: file name
        @param str file_type: file type
        @param str download_url: download url
        @param str mode: upload mode; "local" = via a temporary local file; "stream" = in-memory bytes
        @param bool gzip: compression flag (accepted for compatibility; not used by the upload path)
        @param dict proxies: proxies {"http":"http://xxx", "https":"https://xxx"}
        @return: {"fid":"", "filename":"", "url":"oss", "size":"", "ftype":"", "org_url":""}
        """
        file_name = clear_file_type_suffix(file_name, file_type)  # avoid a duplicated extension
        file_kwargs = dict(
            filename=file_name,
            filetype=file_type,
            url=download_url,
            proxies=proxies,
            gzip=gzip,
            **kwargs
        )
        if mode == "stream":
            return self._push_oss_from_stream(**file_kwargs)
        return self._push_oss_from_local(**file_kwargs)


# Backward-compatible alias.
AttachmentDownloader = Downloader