# -*- coding: utf-8 -*- """ Created on 2024-02-26 --------- @summary: 附件下载模块 --------- @author: Dzr """ import io import os import uuid import requests import tqdm import urllib3 import feapder.utils.tools as tools from feapder.utils.log import log as logger from untils.aliyun import AliYunService from untils.execptions import AttachmentNullError urllib3.disable_warnings() headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36", "Accept": "*/*" } def clear_file_type_suffix(filename: str, filetype: str): filename = filename.strip() if filetype in filename: filename = filename.replace(f".{filetype}", '') return filename class AttachmentDownloader: def __init__(self): self.dir_name = "file" def create_file(self, filename, filetype): os.makedirs(self.dir_name, mode=0o777, exist_ok=True) file = "{filename}.{filetype}".format( filename=tools.get_sha1("{}_{}".format(filename, uuid.uuid4())), filetype=filetype ) return "{}/{}".format(self.dir_name, file) @staticmethod def clean_attachment(filepath): """ 删除文件 :param str filepath: 文件路径 """ try: os.remove(filepath) except FileNotFoundError: pass def remove(self, file): self.clean_attachment(file) @staticmethod def calculate_size(data): """ 计算数据大小 :param int data: 准备计算大小的内容 :return: float """ _kb = float(data / 1024.0) return float(_kb / 1024.0) @staticmethod def getsize(data): """ 计算数据大小 :param data: 待上传的内容。 :type data: bytes,str或file-like object :return str """ size = 0 if isinstance(data, str): try: size = os.path.getsize(data) except FileNotFoundError: pass elif isinstance(data, bytes): size = len(data) else: pass _kb = float(size) / 1024 result = "{:.1f} kb".format(_kb) if _kb >= 1024: _M = _kb / 1024 if _M >= 1024: _G = _M / 1024 result = "{:.1f} G".format(_G) else: result = "{:.1f} M".format(_M) return result def fetch_data(self, url, proxies=None, file=None, show_error_log=False, **kwargs): """ 下载数据 :param str url: 文件下载地址 :param proxies: 代理 {"http":"http://xxx", "https":"https://xxx"} :param file: 本地文件 :param show_error_log: 展示错误堆栈信息日志 """ method = kwargs.pop("method", "get") request_kwargs = {} request_kwargs.setdefault("proxies", proxies) request_kwargs.setdefault("headers", kwargs.get("headers") or headers) request_kwargs.setdefault("params", kwargs.pop("params", None)) request_kwargs.setdefault("data", kwargs.pop("data", None)) request_kwargs.setdefault("json", kwargs.pop("json", None)) request_kwargs.setdefault("cookies", kwargs.pop("cookies", None)) request_kwargs.setdefault("timeout", kwargs.pop("timeout", (60,120))) request_kwargs.setdefault("stream", kwargs.pop("stream", True)) request_kwargs.setdefault("verify", kwargs.pop("verify", False)) request_kwargs.setdefault("allow_redirects", kwargs.pop("allow_redirects", True)) stream = io.BytesIO() retries = 0 while retries < 3: try: with requests.request(method, url, **request_kwargs) as req: req.raise_for_status() lower_headers = {k.lower(): v for k, v in req.headers.items()} content_length = lower_headers.get('content-length') if content_length is not None: content_length = self.calculate_size(int(content_length)) if content_length > 50: # 丢弃超过50Mb内容长度的文件 return stream.getvalue() else: content_length = None chunk_size = 1024 * 20 # 20KB chunks downloaded_size = 0 with tqdm.tqdm( total=content_length, unit="B", initial=0, unit_scale=True, unit_divisor=1024, # 1M=1024Kb,单位换算 ascii=True, desc=file) as bar: iter_content = req.iter_content(chunk_size=chunk_size) if file is not None: with open(file, "wb") as f: for chunk in iter_content: size = stream.write(chunk) f.write(chunk) bar.update(size) downloaded_size += size content_length = self.calculate_size(downloaded_size) if content_length > 50: stream.truncate(0) # 截断流,保留前0个字节,即清空流 stream.seek(0) # 将位置指针移回流的开始处 break else: for chunk in iter_content: size = stream.write(chunk) bar.update(size) downloaded_size += size content_length = self.calculate_size(downloaded_size) if content_length > 50: stream.truncate(0) # 截断流,保留前0个字节,即清空流 stream.seek(0) # 将位置指针移回流的开始处 break return stream.getvalue() except requests.RequestException as why: stream.truncate(0) # 截断流,保留前0个字节,即清空流 stream.seek(0) # 将位置指针移回流的开始处 retries += 1 if show_error_log: logger.exception(why) return stream.getvalue() def _push_oss_from_stream(self, filename, filetype, url, **kwargs): """ 推送数据流到oss :param str filename: 文件名称 :param str filetype: 文件类型 :param str url: 文件下载地址 """ stream = self.fetch_data(url, file=None, **kwargs) attachment = { "filename": "{}.{}".format(filename, filetype), "org_url": url } if len(stream) > 0: fid = tools.get_sha1(stream) try: attachment["ftype"] = filetype attachment["fid"] = "{}.{}".format(fid, filetype) attachment["size"] = self.getsize(stream) attachment["url"] = "oss" AliYunService().push_oss_from_stream(attachment["fid"], stream) except Exception as e: logger.error( "[{}]上传失败,原因:{}".format(filename, e.__class__.__name__) ) return attachment def read_pdf_in_chunks(self, pdf_path, chunk_size=1024): try: with open(pdf_path, 'rb') as file: chunk = file.read(chunk_size) if "<>" in str(chunk) and "SourceModified" in str(chunk): return False elif "doctypehtml" not in str(chunk): return True elif "%PDF" in str(chunk): return True else: return False except Exception as e: return False def _push_oss_from_local(self, filename, filetype, url, **kwargs): """ 上传本地文件到oss :param str filename: 文件名称 :param str filetype: 文件类型 :param str url: 文件下载地址 """ file = self.create_file(filename, filetype) stream = self.fetch_data(url, file=file, **kwargs) '''上传/下载,无论失败成功都需要返回文件基础信息''' attachment = { "filename": "{}.{}".format(filename, filetype), "org_url": url } if kwargs.get('is_check', None): if not self.read_pdf_in_chunks(file): self.remove(file) return attachment if len(stream) > 0: content_hash = tools.get_sha1(stream) try: attachment["fid"] = "{}.{}".format(content_hash, filetype) attachment["size"] = self.getsize(file) attachment["ftype"] = filetype attachment["url"] = "oss" AliYunService().push_oss_from_local(attachment["fid"], file) except Exception as e: logger.error( "[{}]上传失败,原因:{}".format(filename, e.__class__.__name__) ) self.remove(file) # 删除本地临时文件 return attachment def fetch_attachment( self, file_name: str, file_type: str, download_url: str, mode="local", proxies=None, **kwargs ): """ 下载附件 @param file_name: 文件名称 @param file_type: 文件类型 @param download_url: 文件下载地址 @param mode: 附件上传模式 "local" = 本地文件 or "stream" = 数据流 @param proxies: 代理 {"http":"http://xxx", "https":"https://xxx"} @return: """ if not file_name or not file_type or not download_url: raise AttachmentNullError file_name = clear_file_type_suffix(file_name, file_type) # 防止文件后缀重复 file_kwargs = dict( filename=file_name, filetype=file_type, url=download_url, proxies=proxies, **kwargs ) if mode == "stream": attachment = self._push_oss_from_stream(**file_kwargs) else: attachment = self._push_oss_from_local(**file_kwargs) return attachment