123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205 |
- # -*- coding: utf-8 -*-
- """
- Created on 2024-02-26
- ---------
- @summary: oss附件服务
- ---------
- """
- from functools import partial
- from io import BytesIO
- import requests
- JY_OSS_URL = "http://172.17.162.27:18011"
- JY_OSS_TEST_URL = "http://172.31.31.203:1111"
- class AttachmentError(Exception):
- def __init__(self, *args, **kwargs):
- if 'code' not in kwargs and 'reason' not in kwargs:
- kwargs['code'] = 0
- kwargs['reason'] = '附件错误'
- if 'reason' in kwargs and kwargs['reason'] is None:
- kwargs['reason'] = '附件错误'
- for key, val in kwargs.items():
- setattr(self, key, val)
- super(AttachmentError, self).__init__(*args, kwargs)
- class OssClient(object):
- def __init__(self, domain: str):
- # 初始化函数,用于创建类的实例
- self.domain = domain
- def upload(self, args: dict) -> dict:
- reply = {"error_code": -1}
- try:
- files = {
- 'file': (args['object_name'], BytesIO(args['stream'])),
- }
- data = {
- 'bucket_id': args['bucket_id'],
- 'object_name': args['object_name'],
- 'gzip': args.get('gzip', False),
- }
- response = requests.post(
- f"{self.domain}/ossservice/upload",
- files=files,
- data=data,
- timeout=300,
- )
- if response.status_code == 200:
- reply.update(response.json())
- else:
- reply['error_msg'] = f"HTTP error: {response.status_code}"
- except Exception as e:
- reply['error_msg'] = str(e)
- return reply
- def download(self, args: dict):
- reply = {}
- try:
- data = {
- "bucket_id": args["bucket_id"],
- "object_name": args["object_name"]
- }
- url = f"{self.domain}/ossservice/download"
- response = requests.post(url, data=data, timeout=300)
- response.raise_for_status()
- reply["error_code"] = 0
- reply["error_msg"] = "下载成功"
- reply["data"] = response.content
- except Exception as e:
- reply["error_code"] = -1
- reply["error_msg"] = str(e)
- return reply
- def delete(self, args: dict):
- reply = {}
- try:
- data = {
- "bucket_id": args["bucket_id"],
- "object_name": args["object_name"]
- }
- url = f"{self.domain}/ossservice/delete"
- response = requests.post(url, data=data, timeout=10)
- response.raise_for_status()
- reply = response.json()
- reply["error_code"] = 0
- except Exception as e:
- reply["error_code"] = -1
- reply["error_msg"] = str(e)
- return reply
- class JyOssClient:
- def __init__(self, domain=None, mode="test"):
- if domain is None:
- domain = JY_OSS_URL
- if mode == "test":
- domain = JY_OSS_TEST_URL
- self._oss_client = OssClient(domain=domain)
- def upload(self, bucket_id, object_name, stream, gzip=False, retries=3, err_show=True):
- """
- 上传附件
- :param str bucket_id: 文件名
- :param str object_name: 对象名称
- :param bytes stream: 文件流
- :param bool gzip: 是否压缩
- :param int retries: 上传最大重试次数
- :param bool err_show: 是否展示错误
- """
- args = {
- "bucket_id": bucket_id,
- "object_name": object_name,
- "gzip": gzip,
- "stream": stream
- }
- ret = {"error_msg": "附件上传错误", "error_code": -1}
- for _ in range(retries):
- ret = self._oss_client.upload(args)
- if ret["error_code"] == 0:
- return ret
- if err_show:
- raise AttachmentError(reason=ret.get("error_msg") or "附件上传错误")
- return ret
- def download(self, bucket_id, object_name, retries=3, err_show=False):
- """
- 下载附件
- :param str bucket_id: 文件名
- :param str object_name: 对象名称
- :param int retries: 下载最大重试次数
- :param bool err_show: 是否展示错误
- """
- args = {
- "bucket_id": bucket_id,
- "object_name": object_name,
- }
- ret = {"error_msg": "附件下载失败", "error_code": -1}
- for _ in range(retries):
- ret = self._oss_client.download(args)
- if ret["error_code"] == 0 or ret["error_code"] == -1:
- return ret
- if err_show:
- raise AttachmentError(reason=ret.get("error_msg") or "附件下载失败")
- return ret
- def delete(self, bucket_id, object_name, retries=3, err_show=False):
- """
- 删除附件
- :param str bucket_id: 文件名
- :param str object_name: 对象名称
- :param int retries: 删除最大重试次数
- :param bool err_show: 是否展示错误
- """
- args = {
- "bucket_id": bucket_id,
- "object_name": object_name,
- }
- ret = {"error_msg": "附件删除失败", "error_code": -1}
- for _ in range(retries):
- ret = self._oss_client.delete(args)
- if ret["error_code"] == 0:
- return ret
- if err_show:
- raise AttachmentError(reason=ret.get("error_msg") or "附件删除失败")
- return ret
- _upload = partial(upload, "file")
- push_oss_from_local = push_oss_from_stream = _upload
- AliYunService = JyOssClient
|