3
0

attachment.py 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. # -*- coding: utf-8 -*-
  2. """
  3. Created on 2020-09-06
  4. ---------
  5. @summary: 附件下载模块
  6. ---------
  7. @author: Dzr
  8. """
  9. import io
  10. import os
  11. import uuid
  12. import requests
  13. import tqdm
  14. import urllib3
  15. import feapder.utils.tools as tools
  16. from feapder.utils.log import log as logger
  17. from untils.aliyun import AliYunService
  18. from untils.execptions import AttachmentNullError
  # Requests in this module default to verify=False; unverified HTTPS requests
  # would otherwise make urllib3 emit InsecureRequestWarning on every call, so
  # urllib3's warnings are silenced process-wide at import time.
  urllib3.disable_warnings()

  # Default HTTP headers, used whenever a caller does not supply its own.
  headers = {
      "User-Agent": (
          "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
          "AppleWebKit/537.36 (KHTML, like Gecko) "
          "Chrome/93.0.4577.82 Safari/537.36"
      ),
      "Accept": "*/*",
  }
  class AttachmentDownloader:
      """Download attachments and upload them through ``AliYunService`` (OSS).

      Two upload modes exist: "stream" keeps the downloaded bytes in memory,
      while "local" first writes them to a temporary file under ``dir_name``
      and removes that file afterwards.
      """

      # Bodies larger than this many MB are discarded (b'' is returned).
      max_size_mb = 50
      # Number of bytes requested per iteration when reading a response body.
      chunk_size = 1024 * 20

      def __init__(self):
          # Directory that holds temporary files in "local" mode.
          self.dir_name = "file"

      def create_dir(self):
          """Create ``self.dir_name`` (and parents) if it does not exist yet."""
          # exist_ok makes a separate existence check unnecessary.
          os.makedirs(self.dir_name, mode=0o777, exist_ok=True)

      def create_file(self, filename, filetype):
          """Return a fresh temporary file path inside ``self.dir_name``.

          :param str filename: attachment name, mixed into the hashed file name
          :param str filetype: file extension (without the leading dot)
          :return: str path named ``<hash>.<filetype>``
          """
          self.create_dir()
          # The random UUID makes each generated name unique, even for
          # repeated downloads of the same attachment name.
          sign = tools.get_sha1("{}_{}".format(filename, uuid.uuid4()))
          return os.path.join(self.dir_name, "{}.{}".format(sign, filetype))

      @staticmethod
      def clean_attachment(file_path):
          """Delete ``file_path``; a file that does not exist is ignored.

          :param str file_path: path of the file to delete
          """
          try:
              os.remove(file_path)
          except FileNotFoundError:
              pass

      def remove(self, file):
          """Delete a local file (same as ``clean_attachment``)."""
          self.clean_attachment(file)

      @staticmethod
      def get_mb(data):
          """Convert a byte count to megabytes (1 MB = 1024 * 1024 bytes).

          :param int data: number of bytes
          :return: float
          """
          return float(data) / 1024.0 / 1024.0

      @staticmethod
      def getsize(data):
          """Return a human-readable size string for ``data``.

          :param data: a file path (str) or raw content (bytes / bytearray).
              A path that does not exist, or any other type, counts as 0 bytes.
          :return: str such as "12.3 kb", "4.5 M" or "1.2 G"
          """
          size = 0
          if isinstance(data, str):
              try:
                  size = os.path.getsize(data)
              except FileNotFoundError:
                  pass
          elif isinstance(data, (bytes, bytearray)):
              size = len(data)
          kb = float(size) / 1024
          if kb < 1024:
              return "{:.1f} kb".format(kb)
          mb = kb / 1024
          if mb < 1024:
              return "{:.1f} M".format(mb)
          return "{:.1f} G".format(mb / 1024)

      @staticmethod
      def _content_length(response):
          """Return the declared Content-Length in bytes, or None if absent/invalid.

          :param response: object exposing a ``headers`` mapping
          """
          # Lower-case the keys so the lookup works for any mapping type.
          lowered = {k.lower(): v for k, v in response.headers.items()}
          try:
              length = int(lowered["content-length"])
          except (KeyError, TypeError, ValueError):
              return None
          return length if length >= 0 else None

      def _read_body(self, response, total=None, desc=None):
          """Read a response body in chunks while showing a tqdm progress bar.

          :param response: an open ``requests`` response
          :param total: declared body size in bytes, or None when unknown
          :param desc: label shown on the progress bar
          :return: the body as bytes, or None when ``total`` is unknown and the
              body grew past ``max_size_mb`` (reading stops at that point)
          """
          buffer = io.BytesIO()
          with tqdm.tqdm(
              total=total,
              unit="B",
              initial=0,
              unit_scale=True,
              unit_divisor=1024,  # 1M = 1024 KB
              ascii=True,
              desc=desc,
          ) as bar:
              for chunk in response.iter_content(chunk_size=self.chunk_size):
                  bar.update(buffer.write(chunk))
                  # Without a declared length the cap has to be enforced while
                  # reading; otherwise an unbounded body lands in memory.
                  if total is None and self.get_mb(buffer.tell()) > self.max_size_mb:
                      return None
          return buffer.getvalue()

      def fetch_data(self, url, proxies=None, file=None, show_error_log=False, **kwargs):
          """Download ``url`` and return its body, making up to three attempts.

          :param str url: download address
          :param dict proxies: proxies passed to requests
          :param file: optional local path; after a complete download the body
              is also written there
          :param bool show_error_log: log the traceback of request errors
          :param kwargs: request options; only ``method``, ``headers``,
              ``data``, ``cookies``, ``timeout``, ``stream``, ``verify`` and
              ``allow_redirects`` are used, anything else is ignored
          :return: the body as bytes; ``b''`` when every attempt failed or the
              body is larger than ``max_size_mb``
          """
          method = kwargs.pop("method", "get")
          request_kwargs = {
              "proxies": proxies,
              "headers": kwargs.get("headers") or headers,
              "data": kwargs.pop("data", None),
              "cookies": kwargs.pop("cookies", None),
              "timeout": kwargs.pop("timeout", 60),
              "stream": kwargs.pop("stream", True),
              "verify": kwargs.pop("verify", False),
              "allow_redirects": kwargs.pop("allow_redirects", True),
          }
          for _ in range(3):
              try:
                  with requests.request(method, url, **request_kwargs) as req:
                      total = self._content_length(req)
                      if total is not None and self.get_mb(total) > self.max_size_mb:
                          # Declared too large: discard without retrying.
                          return b''
                      if req.status_code != 200:
                          continue
                      body = self._read_body(req, total=total, desc=file)
              except requests.RequestException as why:
                  if show_error_log:
                      logger.exception(why)
                  continue
              if body is None:
                  # Undeclared body outgrew the cap while streaming.
                  return b''
              if file is not None:
                  # Written only after a complete download, so a failed attempt
                  # never leaves a partial file behind.
                  with open(file, "wb") as f:
                      f.write(body)
              return body
          return b''

      def _push_oss_from_stream(self, filename, filetype, url, **kwargs):
          """Download ``url`` into memory and upload the bytes to OSS.

          :param str filename: attachment name (without extension)
          :param str filetype: file extension
          :param str url: download address
          :param kwargs: forwarded to ``fetch_data``
          :return: dict with ``filename`` and ``org_url``; ``ftype``, ``fid``,
              ``size`` and ``url`` are added only when the upload succeeded
          """
          stream = self.fetch_data(url, file=None, **kwargs)
          attachment = {
              "filename": "{}.{}".format(filename, filetype),
              "org_url": url,
          }
          if len(stream) > 0:
              # fid is the content hash plus the extension.
              info = {
                  "ftype": filetype,
                  "fid": "{}.{}".format(tools.get_sha1(stream), filetype),
                  "size": self.getsize(stream),
                  "url": "oss",
              }
              try:
                  AliYunService().push_oss_from_stream(info["fid"], stream)
              except Exception as e:
                  # Best effort: report the failure, still return basic info.
                  logger.error(
                      "[{}]上传失败,原因:{}".format(filename, e.__class__.__name__)
                  )
              else:
                  attachment.update(info)
          return attachment

      def _push_oss_from_local(self, filename, filetype, url, **kwargs):
          """Download ``url`` to a temporary file and upload that file to OSS.

          :param str filename: attachment name (without extension)
          :param str filetype: file extension
          :param str url: download address
          :param kwargs: forwarded to ``fetch_data``
          :return: dict with ``filename`` and ``org_url``; ``fid``, ``size``,
              ``ftype`` and ``url`` are added only when the upload succeeded
          """
          file = self.create_file(filename, filetype)
          # Whether download/upload succeed or fail, the basic file info is
          # always returned.
          attachment = {
              "filename": "{}.{}".format(filename, filetype),
              "org_url": url,
          }
          try:
              stream = self.fetch_data(url, file=file, **kwargs)
              if len(stream) > 0:
                  # Size is measured before the upload touches the file.
                  info = {
                      "fid": "{}.{}".format(tools.get_sha1(stream), filetype),
                      "size": self.getsize(file),
                      "ftype": filetype,
                      "url": "oss",
                  }
                  try:
                      AliYunService().push_oss_from_local(info["fid"], file)
                  except Exception as e:
                      logger.error(
                          "[{}]上传失败,原因:{}".format(filename, e.__class__.__name__)
                      )
                  else:
                      attachment.update(info)
          finally:
              self.remove(file)  # delete the local temporary file in every case
          return attachment

      def _fetch_attachment(self, filename, filetype, download_url, mode, **kwargs):
          """Download and upload one attachment using the given ``mode``.

          :param str filename: attachment name (without extension)
          :param str filetype: file extension
          :param str download_url: download address
          :param str mode: "stream" uploads from memory; any other value goes
              through a temporary local file
          """
          if mode == "stream":
              push = self._push_oss_from_stream
          else:
              push = self._push_oss_from_local
          return push(filename=filename, filetype=filetype, url=download_url, **kwargs)

      def fetch_attachment(
          self,
          file_name: str,
          file_type: str,
          download_url: str,
          proxies=None,
          mode="local",
          **kwargs
      ):
          """Download an attachment and upload it to OSS.

          :param file_name: attachment name (without extension)
          :param file_type: file extension
          :param download_url: download address
          :param proxies: proxies passed to requests
          :param mode: "stream" uploads from memory; any other value (default
              "local") downloads to a temporary file first
          :param kwargs: extra options forwarded to ``fetch_data``
          :return: attachment info dict
          :raises AttachmentNullError: if name, type or URL is empty
          """
          if not (file_name and file_type and download_url):
              raise AttachmentNullError
          return self._fetch_attachment(
              file_name, file_type, download_url, mode, proxies=proxies, **kwargs
          )