# fs_client.py
# -*- coding: utf-8 -*-
'''
File storage service client.
author: honbbo
date: 2020-2-29
'''
import os
from itertools import islice

import oss2
  9. class FileServeClient(object):
  10. def __init__(self):
  11. '''
  12. 抽象出来的文件存储客户端
  13. 目前使用阿里云OSS对象存储服务
  14. 如果更换SeaWeedFs,需要重写此文件
  15. 注意:文件读写,都是以object-name为索引,请保存object-name
  16. '''
  17. self.auth = None
  18. self.bucket = None
  19. self.do_auth()
  20. def do_auth(self):
  21. '''
  22. 身份验证,
  23. 注意:如果auth有时间限制,请在每一批次调用do_auth方法。
  24. 为了提高效率,不必要再每次读写文件时调用。
  25. '''
  26. auth = oss2.Auth('LTAI4G5x9aoZx8dDamQ7vfZi', 'Bk98FsbPYXcJe72n1bG3Ssf73acuNh')
  27. bucket = oss2.Bucket(auth, 'http://oss-cn-beijing.aliyuncs.com', 'topjy')
  28. self.auth = auth
  29. self.bucket = bucket
  30. def list(self):
  31. '''
  32. 列举文件,仅测试用
  33. '''
  34. for b in islice(oss2.ObjectIterator(self.bucket), 10):
  35. print(b.key)
  36. def upload_text_file(self, object_name: str, file_content: str) -> (any, any):
  37. '''
  38. 文件上传
  39. '''
  40. result = self.bucket.put_object(object_name, bytes(file_content, encoding='utf-8'))
  41. status, request_id = result.status, result.request_id
  42. return status, request_id
  43. def download_text_content(self, object_name) -> (bool, str):
  44. '''
  45. 下载文本内容
  46. '''
  47. object_stream = self.bucket.get_object(object_name)
  48. content = object_stream.read()
  49. if object_stream.client_crc == object_stream.server_crc:
  50. return True, str(content, encoding='utf-8')
  51. else:
  52. return False, ''
  53. def delete_object(self, object_name: str) -> (any, any):
  54. '''
  55. 删除内容
  56. '''
  57. result = self.bucket.delete_object(object_name)
  58. status, request_id = result.status, result.request_id
  59. return status, request_id
  60. def upload_bytes_file(self, object_name: str, file_content: bytes):
  61. result = self.bucket.put_object(object_name, file_content)
  62. status, request_id = result.status, result.request_id
  63. return status, request_id
  64. def download_file(self, object_name, save_path):
  65. object_stream = self.bucket.get_object_to_file(object_name, save_path)
  66. if object_stream.status == 200:
  67. return True, save_path
  68. if __name__ == '__main__':
  69. fsc = FileServeClient()
  70. #fsc.list()
  71. #st, id = fsc.upload_text_file('my-object-2', '这是测试的文件内容')
  72. #print(st, id)
  73. st, content = fsc.download_text_content('af4477ae-705e-11ed-9389-0242ac120002')
  74. print(content)
  75. # with open("text01.txt","w") as f:
  76. # f.write(content)
  77. #st, id = fsc.delete_object('my-object-2')
  78. #print(st, id)