attachment.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318
  1. import io
  2. import os
  3. import traceback
  4. import uuid
  5. from io import BytesIO
  6. import oss2
  7. import requests
  8. import urllib3
  9. from config.load import headers, ali_oss
  10. from utils.clean_file import (
  11. clean_file_name,
  12. judge_file_url,
  13. modify_file_url_list,
  14. remove,
  15. limit_file_size
  16. )
  17. from utils.clean_file import sha1, getsize
  18. from utils.log import logger
  19. from utils.socks5 import Proxy
  # Silence every urllib3 warning for the whole process (HTTPWarning is the
  # base category urllib3 uses by default). Requests in this module default to
  # verify=False, which would otherwise emit InsecureRequestWarning each time.
  urllib3.disable_warnings(urllib3.exceptions.HTTPWarning)
  class OssClient(object):
      """HTTP client for the ``/ossservice`` upload/download/delete endpoints.

      No method lets an exception escape unless asked to (``upload`` with
      ``err_show=True``); outcomes are reported through a reply dict that
      always carries ``error_code`` and, on failure, ``error_msg``.
      """

      def __init__(self, domain: str):
          # Base URL that the "/ossservice/..." endpoint paths are appended to.
          self.domain = domain

      def upload(self, args: dict, retries=3, err_show=False) -> dict:
          """Upload ``args['stream']`` as ``args['object_name']`` into ``args['bucket_id']``.

          :param args: needs ``bucket_id``, ``object_name`` and ``stream`` (bytes);
              ``gzip`` is optional and defaults to False.
          :param retries: maximum number of attempts; only attempts that raise are
              retried, an HTTP error status ends the loop immediately.
          :param err_show: if every attempt raised, re-raise the last exception.
          :return: the server's JSON reply merged over ``{"error_code": -1}``;
              on transport/HTTP failure ``error_msg`` describes the problem.
          """
          reply = {"error_code": -1}
          last_err = None
          for _ in range(retries):
              try:
                  files = {
                      'file': (args['object_name'], BytesIO(args['stream'])),
                  }
                  data = {
                      'bucket_id': args['bucket_id'],
                      'object_name': args['object_name'],
                      'gzip': args.get('gzip', False),
                  }
                  response = requests.post(
                      f"{self.domain}/ossservice/upload",
                      files=files,
                      data=data,
                      timeout=300,
                  )
                  if response.status_code == 200:
                      body = response.json()
                      # Drop any message left behind by an earlier failed attempt,
                      # otherwise a successful reply would still carry it.
                      reply.pop('error_msg', None)
                      reply.update(body)
                  else:
                      reply['error_msg'] = f"HTTP error: {response.status_code}"
                  last_err = None
                  break
              except Exception as e:
                  reply['error_msg'] = str(e)
                  last_err = e
          if err_show and last_err is not None:
              raise last_err
          return reply

      def download(self, args: dict):
          """Fetch ``args['object_name']`` from ``args['bucket_id']``.

          :return: ``error_code`` 0, a success ``error_msg`` and the raw bytes in
              ``data`` on success; ``error_code`` -1 and ``error_msg`` otherwise.
          """
          reply = {}
          try:
              data = {
                  "bucket_id": args["bucket_id"],
                  "object_name": args["object_name"],
              }
              url = f"{self.domain}/ossservice/download"
              response = requests.post(url, data=data, timeout=300)
              response.raise_for_status()
              reply["error_code"] = 0
              reply["error_msg"] = "下载成功"
              reply["data"] = response.content
          except Exception as e:
              reply["error_code"] = -1
              reply["error_msg"] = str(e)
          return reply

      def delete(self, args: dict):
          """Delete ``args['object_name']`` from ``args['bucket_id']``.

          :return: the server's JSON object with ``error_code`` forced to 0 on a
              2xx reply; ``{"error_code": -1, "error_msg": ...}`` on any failure,
              including a reply body that is not a JSON object.
          """
          try:
              data = {
                  "bucket_id": args["bucket_id"],
                  "object_name": args["object_name"],
              }
              url = f"{self.domain}/ossservice/delete"
              response = requests.post(url, data=data, timeout=10)
              response.raise_for_status()
              reply = response.json()
              if not isinstance(reply, dict):
                  # Previously a list/str/null body made the error handler itself
                  # raise TypeError; report it as an ordinary failure instead.
                  raise ValueError(f"unexpected delete reply: {reply!r}")
              reply["error_code"] = 0
          except Exception as e:
              # Build a fresh dict: ``reply`` may be unbound or not a dict here.
              reply = {"error_code": -1, "error_msg": str(e)}
          return reply
  class OssBucketClient:
      """Direct Aliyun OSS client for the bucket described by the ``ali_oss`` config."""

      def __init__(self):
          # The settings are read in the same order as before, so a missing
          # key raises the same KeyError.
          key_id, key_secret, endpoint, bucket_name = (
              ali_oss[setting]
              for setting in ('key_id', 'key_secret', 'endpoint', 'bucket_name')
          )
          self._bucket = oss2.Bucket(oss2.Auth(key_id, key_secret), endpoint, bucket_name)

      def push_oss_from_local(self, key, filename):
          """Upload a local file to OSS as a regular object.

          :param str key: object name to store in OSS
          :param str filename: path of the local file; it must be readable
          :return: the result of ``Bucket.put_object_from_file``
          """
          bucket = self._bucket
          return bucket.put_object_from_file(key, filename)

      def push_oss_from_stream(self, key, data):
          """Upload in-memory content to OSS.

          :param str key: object name to store in OSS
          :param data: content to upload
          :type data: bytes, str or file-like object
          :return: the result of ``Bucket.put_object``
          """
          bucket = self._bucket
          return bucket.put_object(key, data)
  class AttachmentDownloader:
      """Download an attachment to a local temp file and upload it to Aliyun OSS.

      Temp files are created under ``self.dir_name``. ``download`` removes its
      temp file when it finishes, whether the transfer succeeded or not.
      """

      # Bodies larger than this many MiB (announced or actually received) are dropped.
      _MAX_SIZE_MB = 50
      _MB = 1024 * 1024  # bytes per MiB

      def __init__(self, address=None, mode=None):
          """
          :param address: base address of the HTTP OSS service; defaults to
              ``ali_oss['jy']['address']``.
          :param mode: ``'test'`` selects ``ali_oss['jy']['address_test']``.

          NOTE(review): ``address`` is only needed by the disabled HTTP client
          below; uploads go through ``OssBucketClient``. The config lookups are
          kept so that missing keys still fail at construction time.
          """
          self.dir_name = 'file'
          if address is None:
              address = ali_oss['jy']['address']
          if mode == 'test':
              address = ali_oss['jy']['address_test']
          # self._oss = OssClient(domain=address)
          self._bucket = OssBucketClient()

      def _create_file(self, filename, filetype):
          """Return a unique temp path ``<dir_name>/<sha1(filename_uuid4)>.<filetype>``.

          The directory is created if missing; the file itself is not created.
          """
          os.makedirs(self.dir_name, mode=0o777, exist_ok=True)
          file = "{filename}.{filetype}".format(
              filename=sha1("{}_{}".format(filename, uuid.uuid4())),
              filetype=filetype
          )
          return "{}/{}".format(self.dir_name, file)

      @staticmethod
      def _create_fid(file_stream: bytes):
          """Return the file id: ``sha1`` of the downloaded bytes."""
          return sha1(file_stream)

      @staticmethod
      def _origin_filename(fid: str, filetype: str):
          """Return the OSS object key ``<fid>.<filetype>``."""
          return "{}.{}".format(fid, filetype)

      @staticmethod
      def _calc_size(size: float):
          """Format a size given in KB as ``'x.y kb'``, ``'x.y M'`` or ``'x.y G'``."""
          if size >= 1024:
              _M = size / 1024
              if _M >= 1024:
                  return "{:.1f} G".format(_M / 1024)
              return "{:.1f} M".format(_M)
          return "{:.1f} kb".format(size)

      def _file_size(self, file):
          """Return the human-readable size of ``file`` (format of ``_calc_size``)."""
          return self._calc_size(float(getsize(file)) / 1024)

      @staticmethod
      def _parse_content_length(value):
          """Return a Content-Length header value as an int.

          Returns None when the value is absent, not an integer or negative,
          meaning "size unknown".
          """
          if value is None:
              return None
          try:
              size = int(value)
          except (TypeError, ValueError):
              return None
          return size if size >= 0 else None

      def stream_download(self, file_path, response):
          """Stream ``response``'s body into ``file_path`` and return the bytes read.

          Returns ``b''`` when the announced Content-Length or the data actually
          received exceeds ``_MAX_SIZE_MB``. In the latter case the partial file
          stays on disk for the caller to remove. Progress is printed to stdout.
          """
          limit = self._MAX_SIZE_MB
          stream = io.BytesIO()
          h = {k.lower(): v for k, v in response.headers.items()}
          # A malformed header used to raise ValueError here and abort the whole
          # download; it is now treated as "size unknown".
          content_size = self._parse_content_length(h.get('content-length'))
          if content_size is not None and content_size / self._MB > limit:
              # Drop oversized files without downloading them.
              return stream.getvalue()
          chunk_size = 1024  # maximum bytes per iter_content chunk
          received = 0
          with open(file_path, 'wb') as f:
              for data in response.iter_content(chunk_size=chunk_size):
                  size = stream.write(data)
                  f.write(data)
                  received += size
                  if received / self._MB > limit:
                      stream.truncate(0)  # empty the buffer so b'' is returned
                      stream.seek(0)
                      logger.warning("超过{}M的附件|丢弃下载".format(limit))
                      break
                  # Without a Content-Length, re-estimate the total on every chunk:
                  # a full chunk suggests more data follows, a short one is the tail.
                  if content_size is not None:
                      total = content_size
                  elif size == chunk_size:
                      total = received * 2
                  else:
                      total = received
                  percent = min(100, int((received / total) * 100)) if total else 0
                  print("\r文件下载进度:%d%%(%d/%d) - %s" % (percent, received, total, file_path), end=" ")
          return stream.getvalue()

      def _fetch_file(
          self,
          method: str,
          url: str,
          file: str,
          enable_proxy: bool,
          allow_show_exception: bool,
          **kwargs
      ):
          """Request ``url`` (at most 3 attempts) and stream the body into ``file``.

          :param enable_proxy: after each failed attempt call ``proxy.switch()``
              and use its ``proxies`` for the next attempt.
          :param allow_show_exception: print the traceback of failed attempts.
          :param kwargs: ``headers``, ``proxies``, ``timeout`` and ``verify`` are
              used (falsy values fall back to defaults); other keys are ignored.
          :return: the body bytes; ``b''`` if every attempt failed, the body was
              dropped as oversized, or the local file could not be written.
          """
          # NOTE(review): the first attempt uses kwargs['proxies'] even when
          # enable_proxy is set; the Proxy pool is consulted only after a failure.
          request_params = {
              'headers': kwargs.get('headers') or headers,
              'proxies': kwargs.get('proxies'),
              'timeout': kwargs.get('timeout') or 60,
              'stream': True,  # stream_download consumes the body incrementally
              'verify': kwargs.get('verify') or False,
          }
          proxy = Proxy(enable_proxy)
          for _ in range(3):
              try:
                  with requests.request(method, url, **request_params) as req:
                      req.raise_for_status()
                      return self.stream_download(file, req)
              except requests.RequestException:
                  # Must precede ``except OSError``: RequestException subclasses IOError.
                  if allow_show_exception:
                      traceback.print_exc()
                  if enable_proxy:
                      proxy.switch()
                      request_params['proxies'] = proxy.proxies
              except OSError as e:
                  # A local write failure will not be fixed by retrying the request.
                  if allow_show_exception:
                      traceback.print_exc()
                  logger.warning("[{}]写入附件失败,原因:{}".format(file, type(e).__name__))
                  return b''
          return b''

      def _push_to_oss(self, file_stream, file_type, local_file):
          """Upload ``local_file`` under a key derived from ``file_stream``.

          Returns the OSS fields to merge into the ``download`` result.
          """
          key = self._origin_filename(self._create_fid(file_stream), file_type)
          size = self._file_size(local_file)
          self._bucket.push_oss_from_local(key, local_file)
          return {'fid': key, 'ftype': file_type, 'size': size, 'url': 'oss'}

      def download(
          self,
          file_name: str,
          file_type: str,
          download_url: str,
          enable_proxy=False,
          allow_request_exception=False,
          **kwargs
      ):
          """Download an attachment and upload it to OSS.

          :param file_name: display name; cleaned with ``clean_file_name``.
          :param file_type: extension; surrounding whitespace is stripped.
          :param download_url: source URL, normalised by ``judge_file_url`` and
              every function in ``modify_file_url_list``.
          :param enable_proxy: rotate proxies between failed attempts.
          :param allow_request_exception: print tracebacks of failed attempts.
          :param kwargs: ``method`` (default ``'get'``) plus the request options
              accepted by ``_fetch_file``.
          :return: ``{'filename', 'org_url'}``, extended with ``fid``, ``ftype``,
              ``size`` and ``url`` only after the upload succeeded; ``{}`` when
              ``limit_file_size(size)`` rejects the uploaded file.
          """
          request_method = kwargs.pop('method', 'get')
          file_type = file_type.strip()
          file_name = clean_file_name(file_name, file_type)
          download_url = judge_file_url(download_url)
          for app_param in modify_file_url_list:
              download_url = app_param(download_url)
          local_tmp_file = self._create_file(file_name, file_type)
          result = {
              'filename': '{}.{}'.format(file_name, file_type),
              'org_url': download_url,
          }
          try:
              file_stream = self._fetch_file(
                  request_method,
                  download_url,
                  local_tmp_file,
                  enable_proxy,
                  allow_request_exception,
                  **kwargs
              )
              if len(file_stream) > 0:
                  try:
                      # Only advertise an OSS object once it has actually been
                      # stored; previously a failed push still reported fid/url.
                      result.update(self._push_to_oss(file_stream, file_type, local_tmp_file))
                  except Exception as e:
                      logger.warning(
                          "[{}]下载异常,原因:{}".format(file_name, type(e).__name__)
                      )
          finally:
              # Always clean up, even if an unexpected error propagates.
              remove(local_tmp_file)
          # Upload/download: whether it failed or succeeded, attachment info must be returned.
          if 'size' not in result or limit_file_size(result['size']):
              return result
          return {}