123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280 |
- # -*- coding: utf-8 -*-
- """
- Created on 2022-03-06
- ---------
- @summary: 附件下载模块res
- ---------
- @author: Lzz
- """
- import os
- import re
- import uuid
- from urllib.parse import urlparse, unquote
- import requests
- import urllib3
- import feapder.utils.tools as tools
- from feapder.utils.log import log as logger
- from feapder.utils.oss import JyOssClient, OssBucketClient
- urllib3.disable_warnings()
- # 文件文档类型
- DOCTYPE = {
- "txt", "rtf", "dps", "et", "ett", "xls",
- "xlsx", "xlsb", "xlsm", "xlt", "ods", "pmd", "pmdx",
- "doc", "docm", "docx", "dot", "dotm", "dotx",
- "odt", "wps", "csv", "xml", "xps"
- }
- # 压缩类型
- COMPRESSION_TYPE = {
- "rar", "zip", "gzzb", "7z", "tar", "gz", "bz2", "jar", "iso", "cab",
- "arj", "lzh", "ace", "uue", "edxz",
- }
- # 图片类型
- IMAGE_TYPE = {
- "jpg", "png", "jpeg", "tiff", "gif", "psd", "raw", "eps", "svg", "bmp",
- "pdf"
- }
- # 其他类型
- OTHER_TYPE = {
- "swf", "nxzf", "xezf", "nxcf"
- }
- headers = {
- "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36",
- "Accept": "*/*"
- }
- def remove(file_path: str):
- try:
- os.remove(file_path)
- except FileNotFoundError:
- pass
- def getsize(file):
- try:
- return os.path.getsize(file)
- except FileNotFoundError:
- return 0
- def discern_file_format(text, show_warn_log=False):
- """
- 识别文件格式
- @param text: 识别文本
- @param show_warn_log: 是否打印警告信息
- @return: 文件格式
- """
- file_types = {
- *DOCTYPE,
- *COMPRESSION_TYPE,
- *IMAGE_TYPE,
- *OTHER_TYPE
- }
- for file_type in file_types:
- all_file_format = [file_type, file_type.upper()]
- for t in all_file_format:
- result = re.match(f".*{t}$", text, re.S)
- if result is not None:
- return t
- else:
- unknown_type = re.findall('[^.\\/:*?"<>|\r\n]+$', text, re.S)
- if show_warn_log:
- logger.warning(f"[未识别文件类型]{unknown_type}")
- return None
- def extract_file_type(text):
- if text is None:
- return None
- return discern_file_format(text)
- def extract_file_name_by_href(href: str, file_type: str):
- """从url中抽取文件名称"""
- # 中文标点符号:[\u3002\uff1b\uff0c\uff1a\u201c\u201d\uff08\uff09\u3001\uff1f\u300a\u300b]
- # 中文字符:[\u4e00 -\u9fa5]
- zh_char_pattern = "[\u3002\uff1b\uff0c\uff1a\u201c\u201d\uff08\uff09\u3001\uff1f\u300a\u300b\u4e00-\u9fa5]+"
- parser = urlparse(href)
- query = (parser.query or parser.path)
- result = re.search(f".*\\.{file_type}", query, re.S)
- if result is not None:
- encode_str = unquote(result.group())
- name = re.search(zh_char_pattern, encode_str)
- if name is not None:
- return unquote(name.group())
- return None
- def extract_file_name(text):
- file_type = discern_file_format(text)
- if file_type is not None:
- repl = ".{}".format(file_type)
- text = text.replace(repl, '')
- return text
- def verify_file_name(name):
- if extract_file_type(name) is None:
- raise ValueError
- # 去除附件名空格、两个后缀
- def clear_file_type_suffix(file_name: str, file_type: str):
- file_name = file_name.strip()
- if file_type in file_name:
- file_name = file_name.replace(f".{file_type}", '')
- return file_name
- # 限制附件大小:size < 5 kb 不存入数据库
- def limit_file_size(file_size: str):
- _pattern = "^[0-9]\d*\.\d*|[1-9]\d*"
- if "M" in file_size or "m" in file_size:
- file_size = float("".join(re.findall(_pattern, file_size))) * 1000
- else:
- file_size = "".join(re.findall(_pattern, file_size))
- if float(file_size) < 5:
- return False
- else:
- return True
- # 判断附件地址是否正确
- def judge_file_url(file_url: str):
- file_url = file_url.strip()
- if " " in file_url:
- file_url = file_url.split(" ")[0]
- return file_url
- class Downloader:
- def __init__(self):
- self.dir_name = "file"
- # self._oss = JyOssClient()
- self._bucket = OssBucketClient()
- def _create_file(self, filename, filetype):
- os.makedirs(self.dir_name, mode=0o777, exist_ok=True)
- file = "{filename}.{filetype}".format(
- filename=tools.get_sha1("{}_{}".format(filename, uuid.uuid4())),
- filetype=filetype
- )
- return "{}/{}".format(self.dir_name, file)
- @staticmethod
- def _file_size(file: str):
- _kb = float(getsize(file)) / 1024
- if _kb >= 1024:
- _M = _kb / 1024
- if _M >= 1024:
- _G = _M / 1024
- return "{:.1f} G".format(_G)
- else:
- return "{:.1f} M".format(_M)
- else:
- return "{:.1f} kb".format(_kb)
- @staticmethod
- def fetch_data(
- url: str,
- callback=None,
- proxies=None,
- show_error_log=False,
- **kwargs
- ):
- """
- 下载数据
- @param url: 文件下载地址
- @param callback: 回调函数 可以是函数 也可是函数名
- @param proxies: 代理 {"http":"http://xxx", "https":"https://xxx"}
- @param show_error_log: 展示错误堆栈信息日志
- """
- method = kwargs.pop("method", "get")
- request_params = {}
- request_params.setdefault("proxies", proxies)
- request_params.setdefault("headers", kwargs.get("headers") or headers)
- request_params.setdefault("timeout", kwargs.pop("timeout", 60))
- request_params.setdefault("params", kwargs.pop("params", None))
- request_params.setdefault("data", kwargs.pop("data", None))
- request_params.setdefault("json", kwargs.pop("json", None))
- request_params.setdefault("cookies", kwargs.pop("cookies", None))
- request_params.setdefault("verify", kwargs.pop("verify", False))
- request_params.setdefault("stream", kwargs.pop("stream", True))
- retries = 0
- while retries < 3:
- try:
- with requests.request(method, url, **request_params) as response:
- if response.status_code == 200:
- stream = response.content
- filetype_lst = [] # 文件类型列表
- if callable(callback):
- # 通过自定义的回调函数 获取 response.headers 文件类型添加到 filetype_lst
- callback(response, filetype_lst)
- filetype = filetype_lst[0] if filetype_lst else ""
- return stream, filetype
- else:
- retries += 1
- except requests.RequestException as why:
- retries += 1
- if show_error_log:
- logger.exception(why)
- return b"", ""
- def fetch_attachment(
- self,
- file_name: str,
- download_url: str,
- file_type=None,
- callback=None,
- **kwargs
- ):
- file_kwargs = dict(callback=callback, url=download_url, **kwargs)
- filestream, filetype = self.fetch_data(**file_kwargs)
- filetype = file_type or filetype
- filename = clear_file_type_suffix(file_name, filetype)
- download_url = judge_file_url(download_url)
- # 保存本地临时文件
- local_temp_file = self._create_file(filename, filetype)
- with open(local_temp_file, "wb") as f:
- f.write(filestream)
- '''上传/下载,无论失败/成功必须返回附件信息'''
- attachment = {
- "filename": "{}.{}".format(filename, filetype),
- "org_url": download_url
- }
- if len(filestream) > 0:
- content_hash = tools.get_sha1(filestream)
- try:
- attachment["fid"] = "{}.{}".format(content_hash, filetype)
- attachment["size"] = self._file_size(local_temp_file)
- attachment["ftype"] = filetype
- attachment["url"] = "oss"
- # self._oss.upload("file", attachment["fid"], filestream)
- self._bucket.put_object_from_file(attachment["fid"], local_temp_file)
- except Exception as e:
- logger.error(
- "[{}]上传失败,原因:{}".format(file_name, e.__class__.__name__)
- )
- remove(local_temp_file) # 删除本地临时文件
- if "size" not in attachment or limit_file_size(attachment.get("size")):
- return attachment
- else:
- return {}
- AttachmentDownloader = Downloader
|