import oss2
from feapder.setting import ALI_BUCKET_CONFIG as oss_conf


class AliYunService:
    """Small wrapper around the Aliyun OSS SDK (``oss2``) for uploading objects.

    The access key id/secret, endpoint and bucket name are read from
    ``feapder.setting.ALI_BUCKET_CONFIG`` when the instance is created.
    """

    def __init__(self):
        # A missing key in ALI_BUCKET_CONFIG surfaces here as a KeyError,
        # before any upload is attempted.
        self.__acc_key_id = oss_conf['key_id']
        self.__acc_key_secret = oss_conf['key_secret']
        self.__endpoint = oss_conf['endpoint']
        self.__bucket_name = oss_conf['bucket_name']

    def __new_bucket(self):
        """Build an ``oss2.Bucket`` from the configured credentials.

        A fresh ``Auth``/``Bucket`` pair is created on every call, so no SDK
        object is shared between uploads.
        """
        auth = oss2.Auth(self.__acc_key_id, self.__acc_key_secret)
        return oss2.Bucket(auth, self.__endpoint, self.__bucket_name)

    def push_oss_from_local(self, key, filename):
        """Upload a local file to OSS as a regular object.

        :param str key: object name to store the file under in OSS
        :param str filename: path of the local file; it must be readable
        :return: whatever ``Bucket.put_object_from_file`` returns (the SDK's
            upload result object), so callers can inspect the outcome
        """
        return self.__new_bucket().put_object_from_file(key, filename)

    def push_oss_from_stream(self, key, data):
        """Upload in-memory or streamed content to OSS.

        :param str key: object name to store the content under in OSS
        :param data: content to upload
        :type data: bytes, str or file-like object
        :return: whatever ``Bucket.put_object`` returns (the SDK's upload
            result object), so callers can inspect the outcome
        """
        return self.__new_bucket().put_object(key, data)