123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307 |
- # -*- coding: utf-8 -*-
- """
- Created on 2024-02-26
- ---------
- @summary: 附件下载模块
- ---------
- @author: Dzr
- """
- import io
- import os
- import uuid
- import requests
- import tqdm
- import urllib3
- import feapder.utils.tools as tools
- from feapder.utils.log import log as logger
- from untils.aliyun import AliYunService
- from untils.execptions import AttachmentNullError
# Downloads are made with certificate verification turned off by default
# (``verify=False`` in AttachmentDownloader.fetch_data), so silence urllib3's warnings.
urllib3.disable_warnings()

# Default request headers, used whenever a caller does not pass its own.
headers = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/93.0.4577.82 Safari/537.36"
    ),
    "Accept": "*/*",
}
def clear_file_type_suffix(filename: str, filetype: str):
    """
    Strip a trailing ".<filetype>" from ``filename`` so the extension is not
    duplicated when it is appended again later.

    Leading/trailing whitespace is removed first. The suffix match is
    case-insensitive and repeated trailing suffixes ("a.pdf.pdf") are all
    removed; occurrences elsewhere in the name are left untouched.

    :param str filename: file name, possibly already ending in the extension
    :param str filetype: extension without the leading dot, e.g. "pdf"
    :return: str
    """
    filename = filename.strip()
    if not filetype:
        # An empty type would otherwise turn into the suffix "." and eat dots.
        return filename
    suffix = ".{}".format(filetype)
    width = len(suffix)
    target = suffix.lower()
    # Only look at the end of the name (the old str.replace removed the
    # pattern anywhere, e.g. "x.docx" with "doc" became "xx").
    while len(filename) >= width and filename[-width:].lower() == target:
        filename = filename[:-width]
    return filename
class AttachmentDownloader:
    """Download attachments over HTTP(S) and upload them to OSS via AliYunService."""

    def __init__(self):
        # Directory (relative to the working directory) for temporary downloads.
        self.dir_name = "file"

    def create_file(self, filename, filetype):
        """
        Build a unique temporary file path for a download.

        ``self.dir_name`` is created if needed; the file itself is not created.

        :param str filename: original file name, combined with a random UUID and
            hashed with ``tools.get_sha1`` to form the new name
        :param str filetype: file extension (without the dot)
        :return: path of the form "<dir_name>/<hash>.<filetype>"
        """
        os.makedirs(self.dir_name, mode=0o777, exist_ok=True)
        file = "{filename}.{filetype}".format(
            filename=tools.get_sha1("{}_{}".format(filename, uuid.uuid4())),
            filetype=filetype
        )
        return "{}/{}".format(self.dir_name, file)

    @staticmethod
    def clean_attachment(filepath):
        """
        Delete a file; a missing file is silently ignored.

        :param str filepath: path of the file to delete
        """
        try:
            os.remove(filepath)
        except FileNotFoundError:
            pass

    def remove(self, file):
        """Delete ``file`` if it exists (same as ``clean_attachment``)."""
        self.clean_attachment(file)

    @staticmethod
    def calculate_size(data):
        """
        Convert a byte count to megabytes (1 MB = 1024 * 1024 bytes).

        :param int data: size in bytes
        :return: float
        """
        return float(data / 1024.0 / 1024.0)

    @staticmethod
    def getsize(data):
        """
        Return a human-readable size string for ``data``.

        :param data: a file path (str), bytes/bytearray, or a seekable
            file-like object (its position is restored afterwards)
        :return: str such as "12.3 kb", "4.5 M" or "1.2 G"; anything that
            cannot be measured (missing path, unsupported type) counts as 0
        """
        size = 0
        if isinstance(data, str):
            try:
                size = os.path.getsize(data)
            except FileNotFoundError:
                pass
        elif isinstance(data, (bytes, bytearray)):
            size = len(data)
        elif hasattr(data, "seek") and hasattr(data, "tell"):
            # Measure a file-like object by seeking to its end, then put the
            # position back so the caller can keep reading/uploading it.
            try:
                position = data.tell()
                data.seek(0, os.SEEK_END)
                size = data.tell()
                data.seek(position)
            except (OSError, ValueError):
                size = 0

        _kb = float(size) / 1024
        result = "{:.1f} kb".format(_kb)
        if _kb >= 1024:
            _M = _kb / 1024
            if _M >= 1024:
                _G = _M / 1024
                result = "{:.1f} G".format(_G)
            else:
                result = "{:.1f} M".format(_M)
        return result

    @staticmethod
    def _parse_content_length(response_headers):
        """
        Read the Content-Length response header as a byte count.

        :param response_headers: mapping of header names to values
        :return: int, or None when the header is absent, malformed or negative
        """
        lowered = {k.lower(): v for k, v in response_headers.items()}
        value = lowered.get("content-length")
        if value is None:
            return None
        try:
            length = int(value)
        except (TypeError, ValueError):
            # e.g. "abc" or a repeated header joined as "123, 123"
            return None
        return length if length >= 0 else None

    @staticmethod
    def _reset_buffer(stream):
        """Empty an in-memory buffer and move its position back to the start."""
        stream.truncate(0)
        stream.seek(0)

    def _copy_chunks(self, chunks, stream, bar, max_size_mb, sink=None):
        """
        Copy downloaded chunks into ``stream`` (and into ``sink`` when given).

        :param chunks: iterable of bytes chunks
        :param io.BytesIO stream: in-memory buffer receiving the data
        :param bar: progress bar; ``bar.update(n)`` is called with each chunk's size
        :param max_size_mb: stop once more than this many MB have been read
        :param sink: optional writable file object that also receives each chunk
        :return: True when every chunk was copied, False when the size limit was
            exceeded (``stream`` is emptied then; ``sink`` keeps what it received)
        """
        downloaded = 0
        for chunk in chunks:
            size = stream.write(chunk)
            if sink is not None:
                sink.write(chunk)
            bar.update(size)
            downloaded += size
            if self.calculate_size(downloaded) > max_size_mb:
                self._reset_buffer(stream)
                return False
        return True

    def fetch_data(self, url, proxies=None, file=None, show_error_log=False,
                   max_size_mb=50, **kwargs):
        """
        Download ``url`` and return the response body.

        Up to 3 attempts are made; an attempt is retried only when it raises
        ``requests.RequestException`` (which includes HTTP error statuses raised
        by ``raise_for_status``).

        :param str url: download URL
        :param proxies: proxies, e.g. {"http": "http://xxx", "https": "https://xxx"}
        :param file: optional local path; the body is also written there
        :param show_error_log: log the traceback of every failed attempt
        :param max_size_mb: bodies larger than this many MB are discarded
        :param kwargs: optional ``method`` (default "get"), ``headers``,
            ``params``, ``data``, ``json``, ``cookies``, ``timeout``
            (default (60, 120)), ``stream`` (default True), ``verify``
            (default False) and ``allow_redirects`` (default True); any other
            keys are ignored
        :return: bytes; b"" when every attempt failed or the body exceeded
            ``max_size_mb`` -- ``file`` may then still hold partial data
        """
        method = kwargs.pop("method", "get")
        request_kwargs = {
            "proxies": proxies,
            # Fall back to the module-level default headers.
            "headers": kwargs.get("headers") or headers,
            "params": kwargs.pop("params", None),
            "data": kwargs.pop("data", None),
            "json": kwargs.pop("json", None),
            "cookies": kwargs.pop("cookies", None),
            "timeout": kwargs.pop("timeout", (60, 120)),
            "stream": kwargs.pop("stream", True),
            "verify": kwargs.pop("verify", False),
            "allow_redirects": kwargs.pop("allow_redirects", True),
        }

        stream = io.BytesIO()
        for _ in range(3):
            try:
                with requests.request(method, url, **request_kwargs) as req:
                    req.raise_for_status()
                    total = self._parse_content_length(req.headers)
                    if total is not None and self.calculate_size(total) > max_size_mb:
                        # Declared size is over the limit: skip without downloading.
                        return stream.getvalue()

                    with tqdm.tqdm(
                            total=total,  # in bytes, to match unit="B" and bar.update()
                            unit="B",
                            initial=0,
                            unit_scale=True,
                            unit_divisor=1024,  # 1M = 1024K unit conversion
                            ascii=True,
                            desc=file) as bar:
                        chunks = req.iter_content(chunk_size=1024 * 20)  # 20KB chunks
                        if file is not None:
                            with open(file, "wb") as fh:
                                self._copy_chunks(chunks, stream, bar, max_size_mb, fh)
                        else:
                            self._copy_chunks(chunks, stream, bar, max_size_mb)
                    return stream.getvalue()
            except requests.RequestException as why:
                # Drop any partial data before the next attempt.
                self._reset_buffer(stream)
                if show_error_log:
                    logger.exception(why)
        return stream.getvalue()

    def _push_oss_from_stream(self, filename, filetype, url, **kwargs):
        """
        Download ``url`` into memory and upload the bytes to OSS.

        :param str filename: file name without extension
        :param str filetype: file extension
        :param str url: download URL
        :param kwargs: forwarded to ``fetch_data`` (e.g. ``proxies``)
        :return: dict with "filename" and "org_url"; "ftype", "fid", "size" and
            "url" are added only after a successful upload
        """
        stream = self.fetch_data(url, file=None, **kwargs)
        attachment = {
            "filename": "{}.{}".format(filename, filetype),
            "org_url": url
        }
        if len(stream) > 0:
            fid = "{}.{}".format(tools.get_sha1(stream), filetype)
            try:
                size = self.getsize(stream)
                AliYunService().push_oss_from_stream(fid, stream)
            except Exception as e:
                logger.error(
                    "[{}]上传失败,原因:{}".format(filename, e.__class__.__name__)
                )
            else:
                # Only advertise an OSS location once the upload has succeeded.
                attachment["ftype"] = filetype
                attachment["fid"] = fid
                attachment["size"] = size
                attachment["url"] = "oss"
        return attachment

    def read_pdf_in_chunks(self, pdf_path, chunk_size=1024):
        """
        Heuristically check the first ``chunk_size`` bytes of a downloaded file.

        :param pdf_path: path of the file to inspect
        :param int chunk_size: number of leading bytes to read
        :return: False when the file cannot be read, when it contains both
            "<</Names <</Dests 4 0 R>>" and "SourceModified", or when it
            contains "doctypehtml" but not "%PDF"; True otherwise
        """
        try:
            with open(pdf_path, "rb") as fh:
                head = fh.read(chunk_size)
        except (OSError, TypeError, ValueError):
            return False

        if b"<</Names <</Dests 4 0 R>>" in head and b"SourceModified" in head:
            return False
        # NOTE(review): the HTML marker is matched literally (case-sensitive,
        # no space), so a typical "<!DOCTYPE html>" header is NOT caught -- confirm intent.
        if b"doctypehtml" not in head:
            return True
        return b"%PDF" in head

    def _push_oss_from_local(self, filename, filetype, url, **kwargs):
        """
        Download ``url`` to a temporary local file and upload that file to OSS.

        The temporary file is always deleted before returning, even when the
        download raises.

        :param str filename: file name without extension
        :param str filetype: file extension
        :param str url: download URL
        :param kwargs: forwarded to ``fetch_data``; a truthy ``is_check`` also
            runs ``read_pdf_in_chunks`` on the download and skips the upload
            when that check fails
        :return: dict with "filename" and "org_url"; "fid", "size", "ftype" and
            "url" are added only after a successful upload
        """
        file = self.create_file(filename, filetype)
        # Basic info is returned whether the download/upload succeeds or fails.
        attachment = {
            "filename": "{}.{}".format(filename, filetype),
            "org_url": url
        }
        try:
            stream = self.fetch_data(url, file=file, **kwargs)
            if kwargs.get("is_check", None) and not self.read_pdf_in_chunks(file):
                return attachment
            if len(stream) > 0:
                fid = "{}.{}".format(tools.get_sha1(stream), filetype)
                try:
                    size = self.getsize(file)
                    AliYunService().push_oss_from_local(fid, file)
                except Exception as e:
                    logger.error(
                        "[{}]上传失败,原因:{}".format(filename, e.__class__.__name__)
                    )
                else:
                    attachment["fid"] = fid
                    attachment["size"] = size
                    attachment["ftype"] = filetype
                    attachment["url"] = "oss"
        finally:
            self.remove(file)  # delete the local temporary file
        return attachment

    def fetch_attachment(
            self,
            file_name: str,
            file_type: str,
            download_url: str,
            mode="local",
            proxies=None,
            **kwargs
    ):
        """
        Download an attachment and upload it to OSS.

        @param file_name: file name; passed through clear_file_type_suffix so the
            extension is not duplicated
        @param file_type: file extension
        @param download_url: download URL
        @param mode: "stream" uploads from memory; any other value (default
            "local") goes through a temporary local file
        @param proxies: proxies, e.g. {"http": "http://xxx", "https": "https://xxx"}
        @param kwargs: forwarded to the upload helper and on to fetch_data
        @return: attachment info dict
        @raise AttachmentNullError: when file_name, file_type or download_url is empty
        """
        if not file_name or not file_type or not download_url:
            raise AttachmentNullError

        file_name = clear_file_type_suffix(file_name, file_type)  # avoid a duplicated suffix
        file_kwargs = dict(
            filename=file_name,
            filetype=file_type,
            url=download_url,
            proxies=proxies,
            **kwargs
        )
        if mode == "stream":
            attachment = self._push_oss_from_stream(**file_kwargs)
        else:
            attachment = self._push_oss_from_local(**file_kwargs)
        return attachment
|