attachment.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305
  1. # -*- coding: utf-8 -*-
  2. """
  3. Created on 2024-02-26
  4. ---------
  5. @summary: 附件下载模块
  6. ---------
  7. @author: Dzr
  8. """
  9. import io
  10. import os
  11. import uuid
  12. import requests
  13. import tqdm
  14. import urllib3
  15. import feapder.utils.tools as tools
  16. from feapder.utils.log import log as logger
  17. from feapder.utils.oss import JyOssClient
# Silence urllib3 warnings process-wide; Downloader.fetch_data sends requests
# with verify=False unless the caller overrides it.
urllib3.disable_warnings()

# Default request headers, used by Downloader.fetch_data when the caller does
# not pass its own "headers" keyword argument.
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36",
    "Accept": "*/*"
}
  23. def clear_file_type_suffix(filename: str, filetype: str):
  24. filename = filename.strip()
  25. if filetype in filename:
  26. filename = filename.replace(f".{filetype}", '')
  27. return filename
  28. class Downloader:
  29. def __init__(self):
  30. self.dir_name = "file"
  31. self._oss = JyOssClient()
  32. def create_file(self, filename, filetype):
  33. os.makedirs(self.dir_name, mode=0o777, exist_ok=True)
  34. file = "{filename}.{filetype}".format(
  35. filename=tools.get_sha1("{}_{}".format(filename, uuid.uuid4())),
  36. filetype=filetype
  37. )
  38. return "{}/{}".format(self.dir_name, file)
  39. @staticmethod
  40. def clean_attachment(filepath):
  41. """
  42. 删除文件
  43. :param str filepath: 文件路径
  44. """
  45. try:
  46. os.remove(filepath)
  47. except FileNotFoundError:
  48. pass
  49. def remove(self, file):
  50. self.clean_attachment(file)
  51. @staticmethod
  52. def calculate_size(data):
  53. """
  54. 计算数据大小
  55. :param int data: 准备计算大小的内容
  56. :return: float
  57. """
  58. _kb = float(data / 1024.0)
  59. return float(_kb / 1024.0)
  60. @staticmethod
  61. def getsize(data):
  62. """
  63. 计算数据大小
  64. :param data: 待上传的内容。
  65. :type data: bytes,str或file-like object
  66. :return str
  67. """
  68. size = 0
  69. if isinstance(data, str):
  70. try:
  71. size = os.path.getsize(data)
  72. except FileNotFoundError:
  73. pass
  74. elif isinstance(data, bytes):
  75. size = len(data)
  76. else:
  77. pass
  78. _kb = float(size) / 1024
  79. result = "{:.1f} kb".format(_kb)
  80. if _kb >= 1024:
  81. _M = _kb / 1024
  82. if _M >= 1024:
  83. _G = _M / 1024
  84. result = "{:.1f} G".format(_G)
  85. else:
  86. result = "{:.1f} M".format(_M)
  87. return result
  88. @staticmethod
  89. def read_pdf_by_chunks(f, chunk_size=1024):
  90. try:
  91. with open(f, 'rb') as file:
  92. chunk = file.read(chunk_size)
  93. if "<</Names <</Dests 4 0 R>>" in str(chunk) and "SourceModified" in str(chunk):
  94. return False
  95. elif "doctypehtml" not in str(chunk):
  96. return True
  97. elif "%PDF" in str(chunk):
  98. return True
  99. else:
  100. return False
  101. except Exception as e:
  102. return False
  103. def fetch_data(self, url, proxies=None, file=None, show_error_log=False, **kwargs):
  104. """
  105. 下载数据
  106. :param str url: 文件下载地址
  107. :param proxies: 代理 {"http":"http://xxx", "https":"https://xxx"}
  108. :param file: 本地文件
  109. :param show_error_log: 展示错误堆栈信息日志
  110. """
  111. method = kwargs.pop("method", "get")
  112. request_kwargs = {}
  113. request_kwargs.setdefault("proxies", proxies)
  114. request_kwargs.setdefault("headers", kwargs.get("headers") or headers)
  115. request_kwargs.setdefault("params", kwargs.pop("params", None))
  116. request_kwargs.setdefault("data", kwargs.pop("data", None))
  117. request_kwargs.setdefault("json", kwargs.pop("json", None))
  118. request_kwargs.setdefault("cookies", kwargs.pop("cookies", None))
  119. request_kwargs.setdefault("timeout", kwargs.pop("timeout", (60,120)))
  120. request_kwargs.setdefault("stream", kwargs.pop("stream", True))
  121. request_kwargs.setdefault("verify", kwargs.pop("verify", False))
  122. request_kwargs.setdefault("allow_redirects", kwargs.pop("allow_redirects", True))
  123. stream = io.BytesIO()
  124. retries = 0
  125. while retries < 3:
  126. try:
  127. with requests.request(method, url, **request_kwargs) as req:
  128. req.raise_for_status()
  129. lower_headers = {k.lower(): v for k, v in req.headers.items()}
  130. content_length = lower_headers.get('content-length')
  131. if content_length is not None:
  132. content_length = self.calculate_size(int(content_length))
  133. if content_length > 50:
  134. # 丢弃超过50Mb内容长度的文件
  135. return stream.getvalue()
  136. else:
  137. content_length = None
  138. chunk_size = 1024 * 20 # 20KB chunks
  139. downloaded_size = 0
  140. with tqdm.tqdm(
  141. total=content_length,
  142. unit="B",
  143. initial=0,
  144. unit_scale=True,
  145. unit_divisor=1024, # 1M=1024Kb,单位换算
  146. ascii=True,
  147. desc=file) as bar:
  148. iter_content = req.iter_content(chunk_size=chunk_size)
  149. if file is not None:
  150. with open(file, "wb") as f:
  151. for chunk in iter_content:
  152. size = stream.write(chunk)
  153. f.write(chunk)
  154. bar.update(size)
  155. downloaded_size += size
  156. content_length = self.calculate_size(downloaded_size)
  157. if content_length > 50:
  158. stream.truncate(0) # 截断流,保留前0个字节,即清空流
  159. stream.seek(0) # 将位置指针移回流的开始处
  160. break
  161. else:
  162. for chunk in iter_content:
  163. size = stream.write(chunk)
  164. bar.update(size)
  165. downloaded_size += size
  166. content_length = self.calculate_size(downloaded_size)
  167. if content_length > 50:
  168. stream.truncate(0) # 截断流,保留前0个字节,即清空流
  169. stream.seek(0) # 将位置指针移回流的开始处
  170. break
  171. return stream.getvalue()
  172. except requests.RequestException as why:
  173. stream.truncate(0) # 截断流,保留前0个字节,即清空流
  174. stream.seek(0) # 将位置指针移回流的开始处
  175. retries += 1
  176. if show_error_log:
  177. logger.exception(why)
  178. return stream.getvalue()
  179. def _push_oss_from_stream(self, filename, filetype, url, **kwargs):
  180. """
  181. 推送数据流到oss
  182. :param str filename: 文件名称
  183. :param str filetype: 文件类型
  184. :param str url: 文件下载地址
  185. """
  186. gzip = kwargs.pop("gzip", False)
  187. stream = self.fetch_data(url, file=None, **kwargs)
  188. attachment = {
  189. "filename": "{}.{}".format(filename, filetype),
  190. "org_url": url
  191. }
  192. if len(stream) > 0:
  193. fid = tools.get_sha1(stream)
  194. try:
  195. attachment["ftype"] = filetype
  196. attachment["fid"] = "{}.{}".format(fid, filetype)
  197. attachment["size"] = self.getsize(stream)
  198. attachment["url"] = "oss"
  199. self._oss.upload("file", attachment["fid"], stream, gzip=gzip)
  200. except Exception as e:
  201. logger.error(
  202. "[{}]上传失败,原因:{}".format(filename, e.__class__.__name__)
  203. )
  204. return attachment
  205. def _push_oss_from_local(self, filename, filetype, url, **kwargs):
  206. """
  207. 上传本地文件到oss
  208. :param str filename: 文件名称
  209. :param str filetype: 文件类型
  210. :param str url: 文件下载地址
  211. """
  212. gzip = kwargs.pop("gzip", False)
  213. file = self.create_file(filename, filetype)
  214. stream = self.fetch_data(url, file=file, **kwargs)
  215. '''上传/下载,无论失败成功都需要返回文件基础信息'''
  216. attachment = {
  217. "filename": "{}.{}".format(filename, filetype),
  218. "org_url": url
  219. }
  220. if kwargs.get('is_check', None):
  221. if not self.read_pdf_by_chunks(file):
  222. self.remove(file)
  223. return attachment
  224. if len(stream) > 0:
  225. content_hash = tools.get_sha1(stream)
  226. try:
  227. attachment["fid"] = "{}.{}".format(content_hash, filetype)
  228. attachment["size"] = self.getsize(file)
  229. attachment["ftype"] = filetype
  230. attachment["url"] = "oss"
  231. self._oss.upload("file", attachment["fid"], stream, gzip=gzip)
  232. except Exception as e:
  233. logger.error(
  234. "[{}]上传失败,原因:{}".format(filename, e.__class__.__name__)
  235. )
  236. self.remove(file) # 删除本地临时文件
  237. return attachment
  238. def fetch_attachment(self, file_name, file_type, download_url, mode="local", proxies=None, gzip=False, **kwargs):
  239. """
  240. 下载附件
  241. @param str file_name: 文件名称
  242. @param str file_type: 文件类型
  243. @param str download_url: 文件下载地址
  244. @param str mode: 附件上传模式; "local" = 本地文件; "stream" = 数据流
  245. @param bool gzip: 是否压缩
  246. @param dict proxies: 代理 {"http":"http://xxx", "https":"https://xxx"}
  247. @return: {"fid":"", "filename":"", "url":"oss", "size":"", "ftype":"", "org_url":""}
  248. """
  249. file_name = clear_file_type_suffix(file_name, file_type) # 防止文件后缀重复
  250. file_kwargs = dict(
  251. filename=file_name,
  252. filetype=file_type,
  253. url=download_url,
  254. proxies=proxies,
  255. gzip=gzip,
  256. **kwargs
  257. )
  258. if mode == "stream":
  259. attachment = self._push_oss_from_stream(**file_kwargs)
  260. else:
  261. attachment = self._push_oss_from_local(**file_kwargs)
  262. return attachment
# Alias: the Downloader class is also available under the name AttachmentDownloader.
AttachmentDownloader = Downloader