attachment.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. # -*- coding: utf-8 -*-
  2. """
  3. Created on 2020-09-06
  4. ---------
  5. @summary: 附件下载模块
  6. ---------
  7. @author: Dzr
  8. """
  9. import io
  10. import os
  11. import uuid
  12. import requests
  13. import tqdm
  14. import urllib3
  15. import feapder.utils.tools as tools
  16. from feapder.utils.log import log as logger
  17. from untils.aliyun import AliYunService
  18. from untils.execptions import AttachmentNullError
  19. urllib3.disable_warnings()
  20. headers = {
  21. "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36",
  22. "Accept": "*/*"
  23. }
  24. def clear_file_type_suffix(filename: str, filetype: str):
  25. filename = filename.strip()
  26. if filetype in filename:
  27. filename = filename.replace(f".{filetype}", '')
  28. return filename
  29. class AttachmentDownloader:
  30. def __init__(self):
  31. self.dir_name = "file"
  32. def create_file(self, filename, filetype):
  33. os.makedirs(self.dir_name, mode=0o777, exist_ok=True)
  34. file = "{filename}.{filetype}".format(
  35. filename=tools.get_sha1("{}_{}".format(filename, uuid.uuid4())),
  36. filetype=filetype
  37. )
  38. return "{}/{}".format(self.dir_name, file)
  39. @staticmethod
  40. def clean_attachment(filepath):
  41. """
  42. 删除文件
  43. :param str filepath: 文件路径
  44. """
  45. try:
  46. os.remove(filepath)
  47. except FileNotFoundError:
  48. pass
  49. def remove(self, file):
  50. self.clean_attachment(file)
  51. @staticmethod
  52. def calculate_size(data):
  53. """
  54. 计算数据大小
  55. :param int data: 准备计算大小的内容
  56. :return: float
  57. """
  58. _kb = float(data / 1024.0)
  59. return float(_kb / 1024.0)
  60. @staticmethod
  61. def getsize(data):
  62. """
  63. 计算数据大小
  64. :param data: 待上传的内容。
  65. :type data: bytes,str或file-like object
  66. :return str
  67. """
  68. size = 0
  69. if isinstance(data, str):
  70. try:
  71. size = os.path.getsize(data)
  72. except FileNotFoundError:
  73. pass
  74. elif isinstance(data, bytes):
  75. size = len(data)
  76. else:
  77. pass
  78. _kb = float(size) / 1024
  79. result = "{:.1f} kb".format(_kb)
  80. if _kb >= 1024:
  81. _M = _kb / 1024
  82. if _M >= 1024:
  83. _G = _M / 1024
  84. result = "{:.1f} G".format(_G)
  85. else:
  86. result = "{:.1f} M".format(_M)
  87. return result
  88. def fetch_data(self, url, proxies=None, file=None, show_error_log=False, **kwargs):
  89. """
  90. 下载数据
  91. :param str url: 文件下载地址
  92. :param proxies: 代理 {"http":"http://xxx", "https":"https://xxx"}
  93. :param file: 本地文件
  94. :param show_error_log: 展示错误堆栈信息日志
  95. """
  96. method = kwargs.pop("method", "get")
  97. request_kwargs = {}
  98. request_kwargs.setdefault("proxies", proxies)
  99. request_kwargs.setdefault("headers", kwargs.get("headers") or headers)
  100. request_kwargs.setdefault("params", kwargs.pop("params", None))
  101. request_kwargs.setdefault("data", kwargs.pop("data", None))
  102. request_kwargs.setdefault("json", kwargs.pop("json", None))
  103. request_kwargs.setdefault("cookies", kwargs.pop("cookies", None))
  104. request_kwargs.setdefault("timeout", kwargs.pop("timeout", 60))
  105. request_kwargs.setdefault("stream", kwargs.pop("stream", True))
  106. request_kwargs.setdefault("verify", kwargs.pop("verify", False))
  107. request_kwargs.setdefault("allow_redirects", kwargs.pop("allow_redirects", True))
  108. retries = 0
  109. while retries < 3:
  110. try:
  111. with requests.request(method, url, **request_kwargs) as req:
  112. stream = io.BytesIO()
  113. lh = {k.lower(): v for k, v in req.headers.items()}
  114. cl = lh.get("content-length") or len(req.content) # 内容长度
  115. icl = int(cl)
  116. content_length = self.calculate_size(icl)
  117. if content_length > 50:
  118. # 丢弃超过50Mb内容长度的文件
  119. return stream.getvalue()
  120. if req.status_code != 200:
  121. retries += 1
  122. continue
  123. iter_content = req.iter_content(chunk_size=1024 * 20)
  124. with tqdm.tqdm(
  125. total=icl,
  126. unit="B",
  127. initial=0,
  128. unit_scale=True,
  129. unit_divisor=1024, # 1M=1024Kb,单位换算
  130. ascii=True,
  131. desc=file) as bar:
  132. if file is not None:
  133. with open(file, "wb") as f:
  134. for chunk in iter_content:
  135. stream.write(chunk)
  136. size = f.write(chunk)
  137. bar.update(size)
  138. else:
  139. for chunk in iter_content:
  140. size = stream.write(chunk)
  141. bar.update(size)
  142. return stream.getvalue()
  143. except requests.RequestException as why:
  144. retries += 1
  145. if show_error_log:
  146. logger.exception(why)
  147. return b''
  148. def _push_oss_from_stream(self, filename, filetype, url, **kwargs):
  149. """
  150. 推送数据流到oss
  151. :param str filename: 文件名称
  152. :param str filetype: 文件类型
  153. :param str url: 文件下载地址
  154. """
  155. stream = self.fetch_data(url, file=None, **kwargs)
  156. attachment = {
  157. "filename": "{}.{}".format(filename, filetype),
  158. "org_url": url
  159. }
  160. if len(stream) > 0:
  161. fid = tools.get_sha1(stream)
  162. try:
  163. attachment["ftype"] = filetype
  164. attachment["fid"] = "{}.{}".format(fid, filetype)
  165. attachment["size"] = self.getsize(stream)
  166. attachment["url"] = "oss"
  167. AliYunService().push_oss_from_stream(attachment["fid"], stream)
  168. except Exception as e:
  169. logger.error(
  170. "[{}]上传失败,原因:{}".format(filename, e.__class__.__name__)
  171. )
  172. return attachment
  173. def _push_oss_from_local(self, filename, filetype, url, **kwargs):
  174. """
  175. 上传本地文件到oss
  176. :param str filename: 文件名称
  177. :param str filetype: 文件类型
  178. :param str url: 文件下载地址
  179. """
  180. file = self.create_file(filename, filetype)
  181. stream = self.fetch_data(url, file=file, **kwargs)
  182. '''上传/下载,无论失败成功都需要返回文件基础信息'''
  183. attachment = {
  184. "filename": "{}.{}".format(filename, filetype),
  185. "org_url": url
  186. }
  187. if len(stream) > 0:
  188. content_hash = tools.get_sha1(stream)
  189. try:
  190. attachment["fid"] = "{}.{}".format(content_hash, filetype)
  191. attachment["size"] = self.getsize(file)
  192. attachment["ftype"] = filetype
  193. attachment["url"] = "oss"
  194. AliYunService().push_oss_from_local(attachment["fid"], file)
  195. except Exception as e:
  196. logger.error(
  197. "[{}]上传失败,原因:{}".format(filename, e.__class__.__name__)
  198. )
  199. self.remove(file) # 删除本地临时文件
  200. return attachment
  201. def fetch_attachment(
  202. self,
  203. file_name: str,
  204. file_type: str,
  205. download_url: str,
  206. mode="local",
  207. proxies=None,
  208. **kwargs
  209. ):
  210. """
  211. 下载附件
  212. @param file_name: 文件名称
  213. @param file_type: 文件类型
  214. @param download_url: 文件下载地址
  215. @param mode: 附件上传模式 "local" = 本地文件 or "stream" = 数据流
  216. @param proxies: 代理 {"http":"http://xxx", "https":"https://xxx"}
  217. @return:
  218. """
  219. if not file_name or not file_type or not download_url:
  220. raise AttachmentNullError
  221. file_name = clear_file_type_suffix(file_name, file_type) # 防止文件后缀重复
  222. file_kwargs = dict(
  223. filename=file_name,
  224. filetype=file_type,
  225. url=download_url,
  226. proxies=proxies,
  227. **kwargs
  228. )
  229. if mode == "stream":
  230. attachment = self._push_oss_from_stream(**file_kwargs)
  231. else:
  232. attachment = self._push_oss_from_local(**file_kwargs)
  233. return attachment