123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318 |
- import io
- import os
- import traceback
- import uuid
- from io import BytesIO
- import oss2
- import requests
- import urllib3
- from config.load import headers, ali_oss
- from utils.clean_file import (
- clean_file_name,
- judge_file_url,
- modify_file_url_list,
- remove,
- limit_file_size
- )
- from utils.clean_file import sha1, getsize
- from utils.log import logger
- from utils.socks5 import Proxy
- urllib3.disable_warnings()
class OssClient:
    """Client for the HTTP OSS gateway exposed under ``{domain}/ossservice``.

    Every method takes an ``args`` dict and returns a reply dict. Client-side
    failures (missing keys, network errors, bad HTTP status) are reported as
    ``error_code == -1`` with the exception text in ``error_msg`` instead of
    being raised, unless stated otherwise.
    """

    def __init__(self, domain: str):
        """Remember the gateway base URL (scheme + host, no trailing path)."""
        self.domain = domain

    def upload(self, args: dict, retries=3, err_show=False) -> dict:
        """Upload ``args['stream']`` as ``args['object_name']``.

        :param args: requires ``bucket_id``, ``object_name`` and ``stream``
            (bytes); optional ``gzip`` (defaults to ``False``).
        :param retries: maximum number of attempts. Only attempts that raise
            are retried; a non-200 HTTP answer ends the loop immediately.
        :param err_show: when true, re-raise the exception of the last
            attempt instead of only reporting it in the reply.
        :return: ``{"error_code": -1, ...}`` merged with the JSON body the
            gateway returned on HTTP 200.
        """
        reply = {"error_code": -1}
        last_error = None
        for _ in range(retries):
            try:
                # A fresh BytesIO per attempt so a retry re-sends from offset 0.
                files = {
                    'file': (args['object_name'], BytesIO(args['stream'])),
                }
                data = {
                    'bucket_id': args['bucket_id'],
                    'object_name': args['object_name'],
                    'gzip': args.get('gzip', False),
                }
                response = requests.post(
                    f"{self.domain}/ossservice/upload",
                    files=files,
                    data=data,
                    timeout=300,
                )
                if response.status_code == 200:
                    payload = response.json()
                    # Drop the message left behind by an earlier failed
                    # attempt so a successful retry does not look like an
                    # error; the gateway's own fields still win below.
                    reply.pop('error_msg', None)
                    reply.update(payload)
                else:
                    reply['error_msg'] = f"HTTP error: {response.status_code}"
                last_error = None
                break
            except Exception as e:
                reply['error_msg'] = str(e)
                last_error = e

        if err_show and last_error is not None:
            raise last_error
        return reply

    def download(self, args: dict):
        """Download the object ``args['object_name']`` from ``args['bucket_id']``.

        :return: on success ``error_code`` 0 and the raw response body under
            ``data``; on any failure ``error_code`` -1 and ``error_msg``.
        """
        reply = {}
        try:
            data = {
                "bucket_id": args["bucket_id"],
                "object_name": args["object_name"]
            }
            url = f"{self.domain}/ossservice/download"
            response = requests.post(url, data=data, timeout=300)
            response.raise_for_status()
            reply["error_code"] = 0
            reply["error_msg"] = "下载成功"
            reply["data"] = response.content
        except Exception as e:
            reply["error_code"] = -1
            reply["error_msg"] = str(e)
        return reply

    def delete(self, args: dict):
        """Delete the object ``args['object_name']`` from ``args['bucket_id']``.

        :return: the gateway's JSON object with ``error_code`` forced to 0 on
            an HTTP success; ``error_code`` -1 and ``error_msg`` on failure.
        """
        reply = {}
        try:
            data = {
                "bucket_id": args["bucket_id"],
                "object_name": args["object_name"]
            }
            url = f"{self.domain}/ossservice/delete"
            response = requests.post(url, data=data, timeout=10)
            response.raise_for_status()
            payload = response.json()
            # Merge instead of rebinding: a non-object JSON body (list, string,
            # null) used to replace ``reply`` and make both the success path
            # and the error handler blow up on item assignment.
            if isinstance(payload, dict):
                reply.update(payload)
            reply["error_code"] = 0
        except Exception as e:
            reply["error_code"] = -1
            reply["error_msg"] = str(e)
        return reply
class OssBucketClient:
    """Wrapper around a single ``oss2.Bucket`` configured from ``ali_oss``."""

    def __init__(self):
        """Build the bucket handle from the ``ali_oss`` configuration mapping."""
        # Read every setting first (same order as before) so a missing key
        # fails before any oss2 object is constructed.
        access_id, access_secret, oss_endpoint, oss_bucket = (
            ali_oss[setting]
            for setting in ('key_id', 'key_secret', 'endpoint', 'bucket_name')
        )
        credentials = oss2.Auth(access_id, access_secret)
        self._bucket = oss2.Bucket(credentials, oss_endpoint, oss_bucket)

    def push_oss_from_local(self, key, filename):
        """
        Upload a local file to OSS as a regular object.

        :param str key: object name to store the file under in OSS
        :param str filename: path of the local file; it must be readable
        :return: whatever ``oss2.Bucket.put_object_from_file`` returns
        """
        outcome = self._bucket.put_object_from_file(key, filename)
        return outcome

    def push_oss_from_stream(self, key, data):
        """
        Upload in-memory or streamed content to OSS.

        :param str key: object name to store the content under in OSS
        :param data: content to upload
        :type data: bytes, str or file-like object
        :return: whatever ``oss2.Bucket.put_object`` returns
        """
        outcome = self._bucket.put_object(key, data)
        return outcome
class AttachmentDownloader:
    """Fetch a remote attachment into a temp file and push it to OSS.

    The public entry point is :meth:`download`; it returns a dict describing
    the attachment (``filename``, ``org_url`` and, when bytes were received,
    ``fid``, ``ftype``, ``size``, ``url``).
    """

    def __init__(self, address=None, mode=None):
        """
        :param address: OSS gateway address; defaults to
            ``ali_oss['jy']['address']``.
        :param mode: ``'test'`` switches to ``ali_oss['jy']['address_test']``
            (this also overrides an explicitly passed ``address``).
        """
        self.dir_name = 'file'
        if address is None:
            address = ali_oss['jy']['address']
        if mode == 'test':
            address = ali_oss['jy']['address_test']
        # The gateway address is only needed by the OssClient upload path,
        # which is currently disabled in favour of the direct bucket client;
        # it is kept on the instance so that path can be re-enabled.
        self._address = address
        self._bucket = OssBucketClient()

    def _create_file(self, filename, filetype):
        """Return a unique temp-file path under ``self.dir_name``.

        The directory is created on demand; the file itself is not. The name
        is the sha1 of ``"<filename>_<uuid4>"`` plus ``.<filetype>``.
        """
        os.makedirs(self.dir_name, mode=0o777, exist_ok=True)
        file = "{filename}.{filetype}".format(
            filename=sha1("{}_{}".format(filename, uuid.uuid4())),
            filetype=filetype
        )
        return "{}/{}".format(self.dir_name, file)

    @staticmethod
    def _create_fid(file_stream: bytes):
        """Return the sha1 of the downloaded bytes, used as the file id."""
        return sha1(file_stream)

    @staticmethod
    def _origin_filename(fid: str, filetype: str):
        """Return the OSS object name ``"<fid>.<filetype>"``."""
        return "{}.{}".format(fid, filetype)

    @staticmethod
    def _calc_size(size: float):
        """Format a size given in KiB as ``"x.x kb"``, ``"x.x M"`` or ``"x.x G"``."""
        if size < 1024:
            return "{:.1f} kb".format(size)
        megabytes = size / 1024
        if megabytes < 1024:
            return "{:.1f} M".format(megabytes)
        return "{:.1f} G".format(megabytes / 1024)

    def _file_size(self, file):
        """Return the human-readable size of the file at path ``file``."""
        _kb = float(getsize(file)) / 1024
        return self._calc_size(_kb)

    def stream_download(self, file_path, response):
        """Stream ``response`` to ``file_path`` and return the same bytes.

        Bodies larger than 50 MiB are discarded: when the Content-Length
        header already announces that, nothing is read or written; when the
        limit is crossed while streaming, reading stops and ``b''`` is
        returned (the partial file is left for the caller to remove).

        :param file_path: path of the file to (over)write.
        :param response: object exposing ``headers`` (mapping) and
            ``iter_content(chunk_size=...)``, e.g. a ``requests.Response``.
        :return: the downloaded bytes, or ``b''`` when the body was discarded.
        """
        mib = 1024 * 1024
        max_mib = 50
        chunk_size = 1024  # upper bound for a single data block

        lowered = {k.lower(): v for k, v in response.headers.items()}
        try:
            declared_size = int(lowered['content-length'])
        except (KeyError, TypeError, ValueError):
            # Header absent or malformed: treat the size as unknown instead
            # of letting ValueError escape (the fetch retry loop only handles
            # requests exceptions).
            declared_size = None

        if declared_size is not None and declared_size / mib > max_mib:
            # Drop files whose announced length exceeds the limit.
            return b''

        stream = io.BytesIO()
        downloaded = 0
        with open(file_path, 'wb') as f:
            for data in response.iter_content(chunk_size=chunk_size):
                size = stream.write(data)
                f.write(data)
                downloaded += size
                if downloaded / mib > max_mib:
                    stream.truncate(0)  # empty the in-memory copy
                    stream.seek(0)  # and rewind it
                    logger.warning("超过50M的附件|丢弃下载")
                    break

                if declared_size is not None:
                    total = declared_size
                elif size == chunk_size:
                    # Full block received: more data is probably coming, so
                    # guess the total as twice what we have. Re-estimated on
                    # every block rather than frozen after the first one.
                    total = downloaded * 2
                else:
                    # Short block: usually the last one.
                    total = downloaded

                percent = min(100, int((downloaded / total) * 100)) if total else 0
                print("\r文件下载进度:%d%%(%d/%d) - %s" % (percent, downloaded, total, file_path), end=" ")

        return stream.getvalue()

    def _fetch_file(
            self,
            method: str,
            url: str,
            file: str,
            enable_proxy: bool,
            allow_show_exception: bool,
            **kwargs
    ):
        """Request ``url`` (up to 3 attempts) and stream the body into ``file``.

        Only ``headers``, ``proxies``, ``timeout`` and ``verify`` are taken
        from ``kwargs``; other keys are ignored.

        :param enable_proxy: when true, switch to another proxy after each
            failed attempt. NOTE(review): the first attempt uses
            ``kwargs['proxies']`` as given, not ``Proxy.proxies`` — confirm
            this is intended.
        :param allow_show_exception: print the traceback of failed attempts.
        :return: the downloaded bytes, or ``b''`` when every attempt failed.
        """
        request_params = {
            'headers': kwargs.get('headers') or headers,
            'proxies': kwargs.get('proxies'),
            'timeout': kwargs.get('timeout') or 60,
            # Always streamed (the previous ``kwargs.get('stream') or True``
            # could never be False): the size guard in stream_download relies
            # on reading the body incrementally.
            'stream': True,
            'verify': kwargs.get('verify') or False,
        }
        proxy = Proxy(enable_proxy)
        for _ in range(3):
            try:
                with requests.request(method, url, **request_params) as req:
                    req.raise_for_status()
                    return self.stream_download(file, req)
            except requests.RequestException:
                if allow_show_exception:
                    traceback.print_exc()
                if enable_proxy:
                    proxy.switch()
                    request_params.update({'proxies': proxy.proxies})
        return b''

    def download(
            self,
            file_name: str,
            file_type: str,
            download_url: str,
            enable_proxy=False,
            allow_request_exception=False,
            **kwargs
    ):
        """Download one attachment and upload it to OSS.

        :param file_name: display name of the attachment (cleaned with
            ``clean_file_name``).
        :param file_type: file extension without the dot.
        :param download_url: source URL; normalised by ``judge_file_url`` and
            every callable in ``modify_file_url_list``.
        :param enable_proxy: rotate proxies on failed attempts.
        :param allow_request_exception: print tracebacks of failed attempts.
        :param kwargs: ``method`` (default ``'get'``) plus the request options
            understood by ``_fetch_file``.
        :return: the attachment info dict. It is returned even when the
            fetch or the upload failed; ``{}`` is returned only when a size
            was recorded and ``limit_file_size`` rejects it.
        """
        request_method = kwargs.pop('method', 'get')
        file_type = file_type.strip()
        file_name = clean_file_name(file_name, file_type)
        download_url = judge_file_url(download_url)
        for app_param in modify_file_url_list:
            download_url = app_param(download_url)

        local_tmp_file = self._create_file(file_name, file_type)
        result = {
            'filename': '{}.{}'.format(file_name, file_type),
            'org_url': download_url
        }
        try:
            file_stream = self._fetch_file(
                request_method,
                download_url,
                local_tmp_file,
                enable_proxy,
                allow_request_exception,
                **kwargs
            )
            if file_stream:
                try:
                    fid = self._create_fid(file_stream)
                    key = self._origin_filename(fid, file_type)
                    result.setdefault('fid', key)
                    result.setdefault('ftype', file_type)
                    result.setdefault('size', self._file_size(local_tmp_file))
                    result.setdefault('url', 'oss')
                    # Direct bucket upload; the OssClient gateway upload
                    # (bucket_id "file", object_name = key) is the disabled
                    # alternative.
                    self._bucket.push_oss_from_local(key, local_tmp_file)
                except Exception as e:
                    logger.warning(
                        "[{}]下载异常,原因:{}".format(file_name, type(e).__name__)
                    )
        finally:
            # Always clean up the temp file, including when the fetch raises
            # something other than a requests exception (e.g. OSError).
            remove(local_tmp_file)

        # Whether the transfer failed or succeeded, the attachment info must
        # be returned; only an over-limit size yields an empty dict.
        if "size" not in result or limit_file_size(result.get('size')):
            return result
        return {}
|