123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246 |
- # -*- coding: utf-8 -*-
- """
- Created on 2024-02-26
- ---------
- @summary: oss附件服务
- ---------
- """
- from functools import partial
- from io import BytesIO
- import oss2
- import requests
- JY_OSS_URL = "http://172.17.162.27:18011"
- JY_OSS_TEST_URL = "http://172.31.31.203:1111"
- # 远程bucket配置
- oss_conf = {
- "key_id": "LTAI4G5x9aoZx8dDamQ7vfZi",
- "key_secret": "Bk98FsbPYXcJe72n1bG3Ssf73acuNh",
- "endpoint": "oss-cn-beijing-internal.aliyuncs.com",
- # "endpoint": "oss-cn-beijing.aliyuncs.com",
- "bucket_name": "jy-datafile"
- }
- class AttachmentError(Exception):
- def __init__(self, *args, **kwargs):
- if 'code' not in kwargs and 'reason' not in kwargs:
- kwargs['code'] = 0
- kwargs['reason'] = '附件错误'
- if 'reason' in kwargs and kwargs['reason'] is None:
- kwargs['reason'] = '附件错误'
- for key, val in kwargs.items():
- setattr(self, key, val)
- super(AttachmentError, self).__init__(*args, kwargs)
- class OssClient(object):
- def __init__(self, domain: str):
- # 初始化函数,用于创建类的实例
- self.domain = domain
- def upload(self, args: dict) -> dict:
- reply = {"error_code": -1}
- try:
- files = {
- 'file': (args['object_name'], BytesIO(args['stream'])),
- }
- data = {
- 'bucket_id': args['bucket_id'],
- 'object_name': args['object_name'],
- 'gzip': args.get('gzip', False),
- }
- response = requests.post(
- f"{self.domain}/ossservice/upload",
- files=files,
- data=data,
- timeout=300,
- )
- if response.status_code == 200:
- reply.update(response.json())
- else:
- reply['error_msg'] = f"HTTP error: {response.status_code}"
- except Exception as e:
- reply['error_msg'] = str(e)
- return reply
- def download(self, args: dict):
- reply = {}
- try:
- data = {
- "bucket_id": args["bucket_id"],
- "object_name": args["object_name"]
- }
- url = f"{self.domain}/ossservice/download"
- response = requests.post(url, data=data, timeout=300)
- response.raise_for_status()
- reply["error_code"] = 0
- reply["error_msg"] = "下载成功"
- reply["data"] = response.content
- except Exception as e:
- reply["error_code"] = -1
- reply["error_msg"] = str(e)
- return reply
- def delete(self, args: dict):
- reply = {}
- try:
- data = {
- "bucket_id": args["bucket_id"],
- "object_name": args["object_name"]
- }
- url = f"{self.domain}/ossservice/delete"
- response = requests.post(url, data=data, timeout=10)
- response.raise_for_status()
- reply = response.json()
- reply["error_code"] = 0
- except Exception as e:
- reply["error_code"] = -1
- reply["error_msg"] = str(e)
- return reply
- class JyOssClient:
- def __init__(self, domain=None, mode=None):
- if domain is None:
- domain = JY_OSS_URL
- if mode == "test":
- domain = JY_OSS_TEST_URL
- self._oss_client = OssClient(domain=domain)
- def upload(self, bucket_id, object_name, stream, gzip=False, retries=3, err_show=True):
- """
- 上传附件
- :param str bucket_id: 文件名
- :param str object_name: 对象名称
- :param bytes stream: 文件流
- :param bool gzip: 是否压缩
- :param int retries: 上传最大重试次数
- :param bool err_show: 是否展示错误
- """
- args = {
- "bucket_id": bucket_id,
- "object_name": object_name,
- "gzip": gzip,
- "stream": stream
- }
- ret = {"error_msg": "附件上传错误", "error_code": -1}
- for _ in range(retries):
- ret = self._oss_client.upload(args)
- if ret["error_code"] == 0:
- return ret
- if err_show:
- raise AttachmentError(reason=ret.get("error_msg") or "附件上传错误")
- return ret
- def download(self, bucket_id, object_name, retries=3, err_show=False):
- """
- 下载附件
- :param str bucket_id: 文件名
- :param str object_name: 对象名称
- :param int retries: 下载最大重试次数
- :param bool err_show: 是否展示错误
- """
- args = {
- "bucket_id": bucket_id,
- "object_name": object_name,
- }
- ret = {"error_msg": "附件下载失败", "error_code": -1}
- for _ in range(retries):
- ret = self._oss_client.download(args)
- if ret["error_code"] == 0 or ret["error_code"] == -1:
- return ret
- if err_show:
- raise AttachmentError(reason=ret.get("error_msg") or "附件下载失败")
- return ret
- def delete(self, bucket_id, object_name, retries=3, err_show=False):
- """
- 删除附件
- :param str bucket_id: 文件名
- :param str object_name: 对象名称
- :param int retries: 删除最大重试次数
- :param bool err_show: 是否展示错误
- """
- args = {
- "bucket_id": bucket_id,
- "object_name": object_name,
- }
- ret = {"error_msg": "附件删除失败", "error_code": -1}
- for _ in range(retries):
- ret = self._oss_client.delete(args)
- if ret["error_code"] == 0:
- return ret
- if err_show:
- raise AttachmentError(reason=ret.get("error_msg") or "附件删除失败")
- return ret
- _upload = partial(upload, "file")
- push_oss_from_local = push_oss_from_stream = _upload
- class OssBucketClient:
- def __init__(self):
- key_id = oss_conf['key_id']
- key_secret = oss_conf['key_secret']
- endpoint = oss_conf['endpoint']
- bucket_name = oss_conf['bucket_name']
- auth = oss2.Auth(key_id, key_secret)
- self._bucket = oss2.Bucket(auth, endpoint, bucket_name)
- def push_oss_from_local(self, key, filename):
- """
- 上传一个本地文件到OSS的普通文件
- :param str key: 上传到OSS的文件名
- :param str filename: 本地文件名,需要有可读权限
- """
- return self._bucket.put_object_from_file(key, filename)
- def push_oss_from_stream(self, key, data):
- """
- 流式上传oss
- :param str key: 上传到OSS的文件名
- :param data: 待上传的内容。
- :type data: bytes,str或file-like object
- """
- return self._bucket.put_object(key, data)
- AliYunService = OssBucketClient
|