#!/usr/bin/python3.6 # -*- coding: utf-8 -*- # @Author : lijunliang # @Email : lijunliang@topnet.net.cn # @File : law.py # @Software: PyCharm from itertools import islice import oss2 import base64 class OssServeClient(object): def __init__(self, access_key_id='LTAI4Fzf9BwiAxVm4NKnYai2', access_key_secret='qf4Ic6OItmQXoGyUgvIlQ2BojjrpaV', endpoint='oss-cn-beijing.aliyuncs.com', bucket_name='jydocs-previewimg'): """ 抽象出来的文件存储客户端 目前使用阿里云OSS对象存储服务 如果更换SeaWeedFs,需要重写此文件 注意:文件读写,都是以object-name为索引,请保存object-name """ self.__auth = None self.__bucket = None self.__access_key_id = access_key_id self.__access_key_secret = access_key_secret self.__endpoint = endpoint self.__bucket_name = bucket_name self.do_auth() def do_auth(self): """ 身份验证, 注意:如果auth有时间限制,请在每一批次调用do_auth方法。 """ auth = oss2.Auth(self.__access_key_id, self.__access_key_secret) bucket = oss2.Bucket(auth, self.__endpoint, self.__bucket_name) self.__auth = auth self.__bucket = bucket def file_list(self): """ 列举文件,仅测试用 """ for b in islice(oss2.ObjectIterator(self.__bucket), 10): obj = b.key print(obj) def upload_text_file(self, object_name: str, file_content: str, headers: dict = None) -> (any, any): """ 文本内容上传 :param object_name: url :param file_content: 文本内容 :param headers: 自定义请求头 :return: """ result = self.__bucket.put_object(object_name, bytes(file_content, encoding='utf-8'), headers=headers) status, request_id = result.status, result.request_id return status, request_id def download_text_content(self, object_name) -> (bool, str): """ 下载文本内容 """ object_stream = self.__bucket.get_object(object_name) content = object_stream.read() if object_stream.client_crc == object_stream.server_crc: return True, str(content, encoding='utf-8') else: return False, '' def delete_object(self, object_name: str) -> (any, any): """ 删除内容 """ result = self.__bucket.delete_object(object_name) status, request_id = result.status, result.request_id return status, request_id def upload_bytes_file(self, object_name: str, file_content: bytes, headers: dict = None): """ 文件上传 :param object_name: fid :param file_content: 文件流 :param headers: 自定义请求头 :return: """ if not headers: headers = None result = self.__bucket.put_object(object_name, file_content, headers=headers) status, request_id = result.status, result.request_id return status, request_id def download_file(self, object_name, save_path): """ 文件下载到本地 :param object_name: fid :param save_path: 保存路径 :return: """ object_stream = self.__bucket.get_object_to_file(object_name, save_path) if object_stream.status == 200: return True # @staticmethod # def create_oss_meta(request_field: dict): # ''' # 生成请求头 # :param request_field: # :return: # ''' # header = {} # request_meta = {} # for key, val in request_field.items(): # if not val: # continue # print(key,"------->",val) # request_meta += "{}:{},".format(key, base64.b64encode(val.encode("utf-8"))) # # request_meta += "{}:{},".format(base64.b64encode(key.encode("utf-8")), base64.b64encode(val.encode("utf-8"))) # # request_meta[key]= "{}".format(base64.b64encode(val.encode("utf-8"))) # if request_meta: # header["x-oss-persistent-headers"] = request_meta # return header # return header @staticmethod def create_oss_meta(request_field: dict): ''' 生成请求头 :param request_field: :return: ''' header = {} request_meta = "x-oss-meta-{}" for key, val in request_field.items(): if val and key: h_k = request_meta.format(key) header[h_k] = base64.b64encode(val.encode("utf-8")) return header if __name__ == '__main__': fsc = OssServeClient() # fsc.file_list() # st = fsc.download_file("09b21fa4-87ba-11eb-aa22-00006052fe9a.docx", # "./123.docx") # st = fsc.download_file("8de27c64-87bd-11eb-a585-000060530480.pdf", # "./123.pdf") # st = fsc.download_file("cb38111c-8c43-11eb-84e3-0000605a9bb4.png", # "./456.png") # print(st) fsc.delete_object("0a163c54c1865a1e6af9f7e09daefce7.pdf") # ret = fsc.download_text_content("09b222e4-87ba-11eb-bd90-00006052fe9a") # print(ret)