# -*- coding: utf-8 -*-
"""
Created on 2024-02-26
---------
@summary: OSS attachment service
---------
"""
from functools import partial, partialmethod
from io import BytesIO

import oss2
import requests

JY_OSS_URL = "http://172.17.162.27:18011"
JY_OSS_TEST_URL = "http://172.31.31.203:1111"

# Remote bucket configuration.
# SECURITY NOTE(review): the access key id/secret are hard-coded in source.
# Consider loading them from the environment or a secret store and rotating
# the key; the values are left untouched here so existing behavior is kept.
oss_conf = {
    "key_id": "LTAI4G5x9aoZx8dDamQ7vfZi",
    "key_secret": "Bk98FsbPYXcJe72n1bG3Ssf73acuNh",
    "endpoint": "oss-cn-beijing-internal.aliyuncs.com",
    # "endpoint": "oss-cn-beijing.aliyuncs.com",
    "bucket_name": "jy-datafile"
}


class AttachmentError(Exception):
    """Error raised when an attachment operation fails.

    Every keyword argument is stored as an attribute on the exception
    instance, and the keyword dict is appended to the positional arguments
    handed to ``Exception`` (so it shows up in ``err.args``).

    When neither ``code`` nor ``reason`` is supplied, both get default values.
    A ``reason`` explicitly passed as ``None`` is replaced by the default
    reason text.
    """

    # Class-level fallbacks: callers in this module raise the error with only
    # ``reason=...``, so without these ``err.code`` would raise AttributeError.
    code = 0
    reason = '附件错误'

    def __init__(self, *args, **kwargs):
        if 'code' not in kwargs and 'reason' not in kwargs:
            kwargs['code'] = 0
            kwargs['reason'] = '附件错误'

        if 'reason' in kwargs and kwargs['reason'] is None:
            kwargs['reason'] = '附件错误'

        for key, val in kwargs.items():
            setattr(self, key, val)

        super().__init__(*args, kwargs)


class OssClient:
    """HTTP client for the ``/ossservice`` upload/download/delete endpoints.

    Every method takes a plain ``dict`` of arguments and returns a ``dict``
    that always contains ``error_code`` (``0`` on success, ``-1`` on a
    client-side failure). Exceptions are caught and reported via
    ``error_msg`` instead of being propagated.
    """

    def __init__(self, domain: str):
        """Store the base URL (scheme + host + port) of the OSS service."""
        self.domain = domain

    def upload(self, args: dict) -> dict:
        """Upload a byte stream as a multipart form to ``/ossservice/upload``.

        :param dict args: must contain ``bucket_id``, ``object_name`` and
            ``stream`` (bytes); may contain ``gzip`` (defaults to ``False``).
        :return: ``{"error_code": -1, ...}`` updated with the JSON body of
            the response when the HTTP status is 200; otherwise
            ``error_msg`` describes the HTTP status or the exception.
        """
        reply = {"error_code": -1}
        try:
            files = {
                'file': (args['object_name'], BytesIO(args['stream'])),
            }
            data = {
                'bucket_id': args['bucket_id'],
                'object_name': args['object_name'],
                'gzip': args.get('gzip', False),
            }
            response = requests.post(
                f"{self.domain}/ossservice/upload",
                files=files,
                data=data,
                timeout=300,
            )
            if response.status_code == 200:
                # The service's own error_code/error_msg override the default.
                reply.update(response.json())
            else:
                reply['error_msg'] = f"HTTP error: {response.status_code}"
        except Exception as e:
            reply['error_msg'] = str(e)

        return reply

    def download(self, args: dict):
        """Download an object via ``/ossservice/download``.

        :param dict args: must contain ``bucket_id`` and ``object_name``.
        :return: on success ``error_code`` is ``0`` and ``data`` holds the
            raw response body (bytes); on any exception (including a non-2xx
            status) ``error_code`` is ``-1`` and ``error_msg`` is the
            exception text.
        """
        reply = {}
        try:
            data = {
                "bucket_id": args["bucket_id"],
                "object_name": args["object_name"]
            }
            url = f"{self.domain}/ossservice/download"
            response = requests.post(url, data=data, timeout=300)
            response.raise_for_status()

            reply["error_code"] = 0
            reply["error_msg"] = "下载成功"
            reply["data"] = response.content
        except Exception as e:
            reply["error_code"] = -1
            reply["error_msg"] = str(e)

        return reply

    def delete(self, args: dict):
        """Delete an object via ``/ossservice/delete``.

        :param dict args: must contain ``bucket_id`` and ``object_name``.
        :return: the JSON body of the response with ``error_code`` forced to
            ``0`` on a 2xx status; ``error_code`` ``-1`` plus ``error_msg``
            on any exception.
        """
        reply = {}
        try:
            data = {
                "bucket_id": args["bucket_id"],
                "object_name": args["object_name"]
            }
            url = f"{self.domain}/ossservice/delete"
            response = requests.post(url, data=data, timeout=10)
            response.raise_for_status()

            reply = response.json()
            # NOTE(review): this overwrites any error_code the service put in
            # its JSON body, so a service-level failure delivered with a 2xx
            # status is reported as success - confirm this is intended.
            reply["error_code"] = 0
        except Exception as e:
            reply["error_code"] = -1
            reply["error_msg"] = str(e)

        return reply


class JyOssClient:
    """Retrying wrapper around :class:`OssClient`."""

    def __init__(self, domain=None, mode=None):
        """Create the underlying client.

        :param domain: base URL of the service; ``JY_OSS_URL`` when ``None``.
        :param mode: when equal to ``"test"``, ``JY_OSS_TEST_URL`` is used
            regardless of ``domain``.
        """
        if domain is None:
            domain = JY_OSS_URL

        if mode == "test":
            domain = JY_OSS_TEST_URL

        self._oss_client = OssClient(domain=domain)

    @staticmethod
    def _run_with_retries(operation, args, retries, err_show, default_msg):
        """Call ``operation(args)`` up to ``retries`` times until it succeeds.

        Returns the first reply whose ``error_code`` is ``0``. If no attempt
        succeeds, raises :class:`AttachmentError` when ``err_show`` is true,
        otherwise returns the last reply (or a default error reply when
        ``retries`` is not positive and nothing was attempted).
        """
        ret = {"error_msg": default_msg, "error_code": -1}
        for _ in range(retries):
            ret = operation(args)
            if ret["error_code"] == 0:
                return ret

        if err_show:
            raise AttachmentError(reason=ret.get("error_msg") or default_msg)

        return ret

    def upload(self, bucket_id, object_name, stream, gzip=False,
               retries=3, err_show=True):
        """
        Upload an attachment.

        :param str bucket_id: bucket identifier sent to the service
        :param str object_name: object name
        :param bytes stream: file content
        :param bool gzip: whether the service should compress the content
        :param int retries: maximum number of upload attempts
        :param bool err_show: raise AttachmentError when all attempts fail
        :return: reply dict of the last attempt
        :raises AttachmentError: when all attempts fail and err_show is true
        """
        args = {
            "bucket_id": bucket_id,
            "object_name": object_name,
            "gzip": gzip,
            "stream": stream
        }
        return self._run_with_retries(
            self._oss_client.upload, args, retries, err_show, "附件上传错误"
        )

    def download(self, bucket_id, object_name, retries=3, err_show=False):
        """
        Download an attachment.

        :param str bucket_id: bucket identifier sent to the service
        :param str object_name: object name
        :param int retries: maximum number of download attempts
        :param bool err_show: raise AttachmentError when all attempts fail
        :return: reply dict of the last attempt (``data`` holds the bytes on
            success)
        :raises AttachmentError: when all attempts fail and err_show is true
        """
        args = {
            "bucket_id": bucket_id,
            "object_name": object_name,
        }
        # Only a successful reply ends the loop early. Previously a reply
        # with error_code -1 was returned immediately as well, which made
        # both ``retries`` and ``err_show`` ineffective.
        return self._run_with_retries(
            self._oss_client.download, args, retries, err_show, "附件下载失败"
        )

    def delete(self, bucket_id, object_name, retries=3, err_show=False):
        """
        Delete an attachment.

        :param str bucket_id: bucket identifier sent to the service
        :param str object_name: object name
        :param int retries: maximum number of delete attempts
        :param bool err_show: raise AttachmentError when all attempts fail
        :return: reply dict of the last attempt
        :raises AttachmentError: when all attempts fail and err_show is true
        """
        args = {
            "bucket_id": bucket_id,
            "object_name": object_name,
        }
        return self._run_with_retries(
            self._oss_client.delete, args, retries, err_show, "附件删除失败"
        )

    # ``partialmethod`` (not ``partial``) is required here: a plain partial
    # is not a descriptor, so "file" would be bound as ``self`` and every
    # call through these aliases would fail. With partialmethod, "file" is
    # pre-bound as ``bucket_id`` and the call becomes
    # ``self.upload("file", key, data, ...)``.
    # NOTE(review): both aliases forward the second argument as ``stream``,
    # so it is expected to be bytes - a local file path is not read here.
    _upload = partialmethod(upload, "file")
    push_oss_from_local = push_oss_from_stream = _upload


class OssBucketClient:
    """Direct client for the bucket described by ``oss_conf`` (via ``oss2``)."""

    def __init__(self):
        """Authenticate with the configured key pair and open the bucket."""
        key_id = oss_conf['key_id']
        key_secret = oss_conf['key_secret']
        endpoint = oss_conf['endpoint']
        bucket_name = oss_conf['bucket_name']

        auth = oss2.Auth(key_id, key_secret)
        self._bucket = oss2.Bucket(auth, endpoint, bucket_name)

    def push_oss_from_local(self, key, filename):
        """
        Upload a local file to OSS as a regular object.

        :param str key: object name to store the file under in OSS
        :param str filename: local file path; must be readable
        :return: the result of ``Bucket.put_object_from_file``
        """
        return self._bucket.put_object_from_file(key, filename)

    def push_oss_from_stream(self, key, data):
        """
        Upload in-memory or streamed content to OSS.

        :param str key: object name to store the content under in OSS
        :param data: content to upload
        :type data: bytes, str or file-like object
        :return: the result of ``Bucket.put_object``
        """
        return self._bucket.put_object(key, data)


AliYunService = OssBucketClient