attachment.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307
# -*- coding: utf-8 -*-
"""
Created on 2024-02-26
---------
@summary: Attachment download module
---------
@author: Dzr
"""
  9. import io
  10. import os
  11. import uuid
  12. import requests
  13. import tqdm
  14. import urllib3
  15. import feapder.utils.tools as tools
  16. from feapder.utils.log import log as logger
  17. from untils.aliyun import AliYunService
  18. from untils.execptions import AttachmentNullError
  19. urllib3.disable_warnings()
  20. headers = {
  21. "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36",
  22. "Accept": "*/*"
  23. }
  24. def clear_file_type_suffix(filename: str, filetype: str):
  25. filename = filename.strip()
  26. if filetype in filename:
  27. filename = filename.replace(f".{filetype}", '')
  28. return filename
  29. class AttachmentDownloader:
  30. def __init__(self):
  31. self.dir_name = "file"
  32. def create_file(self, filename, filetype):
  33. os.makedirs(self.dir_name, mode=0o777, exist_ok=True)
  34. file = "{filename}.{filetype}".format(
  35. filename=tools.get_sha1("{}_{}".format(filename, uuid.uuid4())),
  36. filetype=filetype
  37. )
  38. return "{}/{}".format(self.dir_name, file)
  39. @staticmethod
  40. def clean_attachment(filepath):
  41. """
  42. 删除文件
  43. :param str filepath: 文件路径
  44. """
  45. try:
  46. os.remove(filepath)
  47. except FileNotFoundError:
  48. pass
  49. def remove(self, file):
  50. self.clean_attachment(file)
  51. @staticmethod
  52. def calculate_size(data):
  53. """
  54. 计算数据大小
  55. :param int data: 准备计算大小的内容
  56. :return: float
  57. """
  58. _kb = float(data / 1024.0)
  59. return float(_kb / 1024.0)
  60. @staticmethod
  61. def getsize(data):
  62. """
  63. 计算数据大小
  64. :param data: 待上传的内容。
  65. :type data: bytes,str或file-like object
  66. :return str
  67. """
  68. size = 0
  69. if isinstance(data, str):
  70. try:
  71. size = os.path.getsize(data)
  72. except FileNotFoundError:
  73. pass
  74. elif isinstance(data, bytes):
  75. size = len(data)
  76. else:
  77. pass
  78. _kb = float(size) / 1024
  79. result = "{:.1f} kb".format(_kb)
  80. if _kb >= 1024:
  81. _M = _kb / 1024
  82. if _M >= 1024:
  83. _G = _M / 1024
  84. result = "{:.1f} G".format(_G)
  85. else:
  86. result = "{:.1f} M".format(_M)
  87. return result
  88. def fetch_data(self, url, proxies=None, file=None, show_error_log=False, **kwargs):
  89. """
  90. 下载数据
  91. :param str url: 文件下载地址
  92. :param proxies: 代理 {"http":"http://xxx", "https":"https://xxx"}
  93. :param file: 本地文件
  94. :param show_error_log: 展示错误堆栈信息日志
  95. """
  96. method = kwargs.pop("method", "get")
  97. request_kwargs = {}
  98. request_kwargs.setdefault("proxies", proxies)
  99. request_kwargs.setdefault("headers", kwargs.get("headers") or headers)
  100. request_kwargs.setdefault("params", kwargs.pop("params", None))
  101. request_kwargs.setdefault("data", kwargs.pop("data", None))
  102. request_kwargs.setdefault("json", kwargs.pop("json", None))
  103. request_kwargs.setdefault("cookies", kwargs.pop("cookies", None))
  104. request_kwargs.setdefault("timeout", kwargs.pop("timeout", (60,120)))
  105. request_kwargs.setdefault("stream", kwargs.pop("stream", True))
  106. request_kwargs.setdefault("verify", kwargs.pop("verify", False))
  107. request_kwargs.setdefault("allow_redirects", kwargs.pop("allow_redirects", True))
  108. stream = io.BytesIO()
  109. retries = 0
  110. while retries < 3:
  111. try:
  112. with requests.request(method, url, **request_kwargs) as req:
  113. req.raise_for_status()
  114. lower_headers = {k.lower(): v for k, v in req.headers.items()}
  115. content_length = lower_headers.get('content-length')
  116. if content_length is not None:
  117. content_length = self.calculate_size(int(content_length))
  118. if content_length > 50:
  119. # 丢弃超过50Mb内容长度的文件
  120. return stream.getvalue()
  121. else:
  122. content_length = None
  123. chunk_size = 1024 * 20 # 20KB chunks
  124. downloaded_size = 0
  125. with tqdm.tqdm(
  126. total=content_length,
  127. unit="B",
  128. initial=0,
  129. unit_scale=True,
  130. unit_divisor=1024, # 1M=1024Kb,单位换算
  131. ascii=True,
  132. desc=file) as bar:
  133. iter_content = req.iter_content(chunk_size=chunk_size)
  134. if file is not None:
  135. with open(file, "wb") as f:
  136. for chunk in iter_content:
  137. size = stream.write(chunk)
  138. f.write(chunk)
  139. bar.update(size)
  140. downloaded_size += size
  141. content_length = self.calculate_size(downloaded_size)
  142. if content_length > 50:
  143. stream.truncate(0) # 截断流,保留前0个字节,即清空流
  144. stream.seek(0) # 将位置指针移回流的开始处
  145. break
  146. else:
  147. for chunk in iter_content:
  148. size = stream.write(chunk)
  149. bar.update(size)
  150. downloaded_size += size
  151. content_length = self.calculate_size(downloaded_size)
  152. if content_length > 50:
  153. stream.truncate(0) # 截断流,保留前0个字节,即清空流
  154. stream.seek(0) # 将位置指针移回流的开始处
  155. break
  156. return stream.getvalue()
  157. except requests.RequestException as why:
  158. stream.truncate(0) # 截断流,保留前0个字节,即清空流
  159. stream.seek(0) # 将位置指针移回流的开始处
  160. retries += 1
  161. if show_error_log:
  162. logger.exception(why)
  163. return stream.getvalue()
  164. def _push_oss_from_stream(self, filename, filetype, url, **kwargs):
  165. """
  166. 推送数据流到oss
  167. :param str filename: 文件名称
  168. :param str filetype: 文件类型
  169. :param str url: 文件下载地址
  170. """
  171. stream = self.fetch_data(url, file=None, **kwargs)
  172. attachment = {
  173. "filename": "{}.{}".format(filename, filetype),
  174. "org_url": url
  175. }
  176. if len(stream) > 0:
  177. fid = tools.get_sha1(stream)
  178. try:
  179. attachment["ftype"] = filetype
  180. attachment["fid"] = "{}.{}".format(fid, filetype)
  181. attachment["size"] = self.getsize(stream)
  182. attachment["url"] = "oss"
  183. AliYunService().push_oss_from_stream(attachment["fid"], stream)
  184. except Exception as e:
  185. logger.error(
  186. "[{}]上传失败,原因:{}".format(filename, e.__class__.__name__)
  187. )
  188. return attachment
  189. def read_pdf_in_chunks(self, pdf_path, chunk_size=1024):
  190. try:
  191. with open(pdf_path, 'rb') as file:
  192. chunk = file.read(chunk_size)
  193. if "<</Names <</Dests 4 0 R>>" in str(chunk) and "SourceModified" in str(chunk):
  194. return False
  195. elif "doctypehtml" not in str(chunk):
  196. return True
  197. elif "%PDF" in str(chunk):
  198. return True
  199. else:
  200. return False
  201. except Exception as e:
  202. return False
  203. def _push_oss_from_local(self, filename, filetype, url, **kwargs):
  204. """
  205. 上传本地文件到oss
  206. :param str filename: 文件名称
  207. :param str filetype: 文件类型
  208. :param str url: 文件下载地址
  209. """
  210. file = self.create_file(filename, filetype)
  211. stream = self.fetch_data(url, file=file, **kwargs)
  212. '''上传/下载,无论失败成功都需要返回文件基础信息'''
  213. attachment = {
  214. "filename": "{}.{}".format(filename, filetype),
  215. "org_url": url
  216. }
  217. if kwargs.get('is_check', None):
  218. if not self.read_pdf_in_chunks(file):
  219. self.remove(file)
  220. return attachment
  221. if len(stream) > 0:
  222. content_hash = tools.get_sha1(stream)
  223. try:
  224. attachment["fid"] = "{}.{}".format(content_hash, filetype)
  225. attachment["size"] = self.getsize(file)
  226. attachment["ftype"] = filetype
  227. attachment["url"] = "oss"
  228. AliYunService().push_oss_from_local(attachment["fid"], file)
  229. except Exception as e:
  230. logger.error(
  231. "[{}]上传失败,原因:{}".format(filename, e.__class__.__name__)
  232. )
  233. self.remove(file) # 删除本地临时文件
  234. return attachment
  235. def fetch_attachment(
  236. self,
  237. file_name: str,
  238. file_type: str,
  239. download_url: str,
  240. mode="local",
  241. proxies=None,
  242. **kwargs
  243. ):
  244. """
  245. 下载附件
  246. @param file_name: 文件名称
  247. @param file_type: 文件类型
  248. @param download_url: 文件下载地址
  249. @param mode: 附件上传模式 "local" = 本地文件 or "stream" = 数据流
  250. @param proxies: 代理 {"http":"http://xxx", "https":"https://xxx"}
  251. @return:
  252. """
  253. if not file_name or not file_type or not download_url:
  254. raise AttachmentNullError
  255. file_name = clear_file_type_suffix(file_name, file_type) # 防止文件后缀重复
  256. file_kwargs = dict(
  257. filename=file_name,
  258. filetype=file_type,
  259. url=download_url,
  260. proxies=proxies,
  261. **kwargs
  262. )
  263. if mode == "stream":
  264. attachment = self._push_oss_from_stream(**file_kwargs)
  265. else:
  266. attachment = self._push_oss_from_local(**file_kwargs)
  267. return attachment