attachment.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308
  1. # -*- coding: utf-8 -*-
  2. """
  3. Created on 2024-02-26
  4. ---------
  5. @summary: 附件下载模块
  6. ---------
  7. @author: Dzr
  8. """
  9. import io
  10. import os
  11. import uuid
  12. import requests
  13. import tqdm
  14. import urllib3
  15. import feapder.utils.tools as tools
  16. from feapder.utils.log import log as logger
  17. from feapder.utils.oss import JyOssClient, OssBucketClient
  18. urllib3.disable_warnings()
  19. headers = {
  20. "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36",
  21. "Accept": "*/*"
  22. }
  23. def clear_file_type_suffix(filename: str, filetype: str):
  24. filename = filename.strip()
  25. if filetype in filename:
  26. filename = filename.replace(f".{filetype}", '')
  27. return filename
  28. class Downloader:
  29. def __init__(self):
  30. self.dir_name = "file"
  31. # self._oss = JyOssClient()
  32. self._bucket = OssBucketClient()
  33. def create_file(self, filename, filetype):
  34. os.makedirs(self.dir_name, mode=0o777, exist_ok=True)
  35. file = "{filename}.{filetype}".format(
  36. filename=tools.get_sha1("{}_{}".format(filename, uuid.uuid4())),
  37. filetype=filetype
  38. )
  39. return "{}/{}".format(self.dir_name, file)
  40. @staticmethod
  41. def clean_attachment(filepath):
  42. """
  43. 删除文件
  44. :param str filepath: 文件路径
  45. """
  46. try:
  47. os.remove(filepath)
  48. except FileNotFoundError:
  49. pass
  50. def remove(self, file):
  51. self.clean_attachment(file)
  52. @staticmethod
  53. def calculate_size(data):
  54. """
  55. 计算数据大小
  56. :param int data: 准备计算大小的内容
  57. :return: float
  58. """
  59. _kb = float(data / 1024.0)
  60. return float(_kb / 1024.0)
  61. @staticmethod
  62. def getsize(data):
  63. """
  64. 计算数据大小
  65. :param data: 待上传的内容。
  66. :type data: bytes,str或file-like object
  67. :return str
  68. """
  69. size = 0
  70. if isinstance(data, str):
  71. try:
  72. size = os.path.getsize(data)
  73. except FileNotFoundError:
  74. pass
  75. elif isinstance(data, bytes):
  76. size = len(data)
  77. else:
  78. pass
  79. _kb = float(size) / 1024
  80. result = "{:.1f} kb".format(_kb)
  81. if _kb >= 1024:
  82. _M = _kb / 1024
  83. if _M >= 1024:
  84. _G = _M / 1024
  85. result = "{:.1f} G".format(_G)
  86. else:
  87. result = "{:.1f} M".format(_M)
  88. return result
  89. @staticmethod
  90. def read_pdf_by_chunks(f, chunk_size=1024):
  91. try:
  92. with open(f, 'rb') as file:
  93. chunk = file.read(chunk_size)
  94. if "<</Names <</Dests 4 0 R>>" in str(chunk) and "SourceModified" in str(chunk):
  95. return False
  96. elif "doctypehtml" not in str(chunk):
  97. return True
  98. elif "%PDF" in str(chunk):
  99. return True
  100. else:
  101. return False
  102. except Exception as e:
  103. return False
  104. def fetch_data(self, url, proxies=None, file=None, show_error_log=False, **kwargs):
  105. """
  106. 下载数据
  107. :param str url: 文件下载地址
  108. :param proxies: 代理 {"http":"http://xxx", "https":"https://xxx"}
  109. :param file: 本地文件
  110. :param show_error_log: 展示错误堆栈信息日志
  111. """
  112. method = kwargs.pop("method", "get")
  113. request_kwargs = {}
  114. request_kwargs.setdefault("proxies", proxies)
  115. request_kwargs.setdefault("headers", kwargs.get("headers") or headers)
  116. request_kwargs.setdefault("params", kwargs.pop("params", None))
  117. request_kwargs.setdefault("data", kwargs.pop("data", None))
  118. request_kwargs.setdefault("json", kwargs.pop("json", None))
  119. request_kwargs.setdefault("cookies", kwargs.pop("cookies", None))
  120. request_kwargs.setdefault("timeout", kwargs.pop("timeout", (60,120)))
  121. request_kwargs.setdefault("stream", kwargs.pop("stream", True))
  122. request_kwargs.setdefault("verify", kwargs.pop("verify", False))
  123. request_kwargs.setdefault("allow_redirects", kwargs.pop("allow_redirects", True))
  124. stream = io.BytesIO()
  125. retries = 0
  126. while retries < 3:
  127. try:
  128. with requests.request(method, url, **request_kwargs) as req:
  129. req.raise_for_status()
  130. lower_headers = {k.lower(): v for k, v in req.headers.items()}
  131. content_length = lower_headers.get('content-length')
  132. if content_length is not None:
  133. content_length = self.calculate_size(int(content_length))
  134. if content_length > 50:
  135. # 丢弃超过50Mb内容长度的文件
  136. return stream.getvalue()
  137. else:
  138. content_length = None
  139. chunk_size = 1024 * 20 # 20KB chunks
  140. downloaded_size = 0
  141. with tqdm.tqdm(
  142. total=content_length,
  143. unit="B",
  144. initial=0,
  145. unit_scale=True,
  146. unit_divisor=1024, # 1M=1024Kb,单位换算
  147. ascii=True,
  148. desc=file) as bar:
  149. iter_content = req.iter_content(chunk_size=chunk_size)
  150. if file is not None:
  151. with open(file, "wb") as f:
  152. for chunk in iter_content:
  153. size = stream.write(chunk)
  154. f.write(chunk)
  155. bar.update(size)
  156. downloaded_size += size
  157. content_length = self.calculate_size(downloaded_size)
  158. if content_length > 50:
  159. stream.truncate(0) # 截断流,保留前0个字节,即清空流
  160. stream.seek(0) # 将位置指针移回流的开始处
  161. break
  162. else:
  163. for chunk in iter_content:
  164. size = stream.write(chunk)
  165. bar.update(size)
  166. downloaded_size += size
  167. content_length = self.calculate_size(downloaded_size)
  168. if content_length > 50:
  169. stream.truncate(0) # 截断流,保留前0个字节,即清空流
  170. stream.seek(0) # 将位置指针移回流的开始处
  171. break
  172. return stream.getvalue()
  173. except requests.RequestException as why:
  174. stream.truncate(0) # 截断流,保留前0个字节,即清空流
  175. stream.seek(0) # 将位置指针移回流的开始处
  176. retries += 1
  177. if show_error_log:
  178. logger.exception(why)
  179. return stream.getvalue()
  180. def _push_oss_from_stream(self, filename, filetype, url, **kwargs):
  181. """
  182. 推送数据流到oss
  183. :param str filename: 文件名称
  184. :param str filetype: 文件类型
  185. :param str url: 文件下载地址
  186. """
  187. gzip = kwargs.pop("gzip", False)
  188. stream = self.fetch_data(url, file=None, **kwargs)
  189. attachment = {
  190. "filename": "{}.{}".format(filename, filetype),
  191. "org_url": url
  192. }
  193. if len(stream) > 0:
  194. fid = tools.get_sha1(stream)
  195. try:
  196. attachment["ftype"] = filetype
  197. attachment["fid"] = "{}.{}".format(fid, filetype)
  198. attachment["size"] = self.getsize(stream)
  199. attachment["url"] = "oss"
  200. # self._oss.upload("file", attachment["fid"], stream, gzip=gzip)
  201. self._bucket.put_object(attachment["fid"], stream)
  202. except Exception as e:
  203. logger.error(
  204. "[{}]上传失败,原因:{}".format(filename, e.__class__.__name__)
  205. )
  206. return attachment
  207. def _push_oss_from_local(self, filename, filetype, url, **kwargs):
  208. """
  209. 上传本地文件到oss
  210. :param str filename: 文件名称
  211. :param str filetype: 文件类型
  212. :param str url: 文件下载地址
  213. """
  214. gzip = kwargs.pop("gzip", False)
  215. file = self.create_file(filename, filetype)
  216. stream = self.fetch_data(url, file=file, **kwargs)
  217. '''上传/下载,无论失败成功都需要返回文件基础信息'''
  218. attachment = {
  219. "filename": "{}.{}".format(filename, filetype),
  220. "org_url": url
  221. }
  222. if kwargs.get('is_check', None):
  223. if not self.read_pdf_by_chunks(file):
  224. self.remove(file)
  225. return attachment
  226. if len(stream) > 0:
  227. content_hash = tools.get_sha1(stream)
  228. try:
  229. attachment["fid"] = "{}.{}".format(content_hash, filetype)
  230. attachment["size"] = self.getsize(file)
  231. attachment["ftype"] = filetype
  232. attachment["url"] = "oss"
  233. # self._oss.upload("file", attachment["fid"], stream, gzip=gzip)
  234. self._bucket.put_object_from_file(attachment["fid"], file)
  235. except Exception as e:
  236. logger.error(
  237. "[{}]上传失败,原因:{}".format(filename, e.__class__.__name__)
  238. )
  239. self.remove(file) # 删除本地临时文件
  240. return attachment
  241. def fetch_attachment(self, file_name, file_type, download_url, mode="local", proxies=None, gzip=False, **kwargs):
  242. """
  243. 下载附件
  244. @param str file_name: 文件名称
  245. @param str file_type: 文件类型
  246. @param str download_url: 文件下载地址
  247. @param str mode: 附件上传模式; "local" = 本地文件; "stream" = 数据流
  248. @param bool gzip: 是否压缩
  249. @param dict proxies: 代理 {"http":"http://xxx", "https":"https://xxx"}
  250. @return: {"fid":"", "filename":"", "url":"oss", "size":"", "ftype":"", "org_url":""}
  251. """
  252. file_name = clear_file_type_suffix(file_name, file_type) # 防止文件后缀重复
  253. file_kwargs = dict(
  254. filename=file_name,
  255. filetype=file_type,
  256. url=download_url,
  257. proxies=proxies,
  258. gzip=gzip,
  259. **kwargs
  260. )
  261. if mode == "stream":
  262. attachment = self._push_oss_from_stream(**file_kwargs)
  263. else:
  264. attachment = self._push_oss_from_local(**file_kwargs)
  265. return attachment
  266. AttachmentDownloader = Downloader