attachment.py

# -*- coding: utf-8 -*-
"""
Created on 2024-02-26
---------
@summary: attachment download module
---------
@author: Dzr
"""
import os
import sys

sys.path.append(os.path.dirname(os.getcwd()))

import io
import uuid

import requests
import tqdm
import urllib3

# get_sha1, logger and AttachmentNullError are presumably provided by the star import below
from utils.tools import *
from utils.aliyun import AliYunService

urllib3.disable_warnings()  # requests below are made with verify=False

headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36",
    "Accept": "*/*"
}


def clear_file_type_suffix(filename: str, filetype: str):
    """Strip an existing ".{filetype}" suffix from the filename to avoid a duplicated extension."""
    filename = filename.strip()
    suffix = f".{filetype}"
    if suffix in filename:
        filename = filename.replace(suffix, '')
    return filename
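
# Illustrative behaviour: a name that already carries the extension is trimmed so
# that fetch_attachment() does not build "report.pdf.pdf".
#   clear_file_type_suffix("report.pdf", "pdf")  -> "report"
#   clear_file_type_suffix("report", "pdf")      -> "report"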


class AttachmentDownloader:

    def __init__(self, max_retries=3):
        self.dir_name = "file"
        self._max_retries = max_retries
        # self._oss = JyOssClient()
        self._oss = AliYunService()

    def create_file(self, filename, filetype):
        """Create the local download directory and return a unique relative file path."""
        os.makedirs(self.dir_name, mode=0o777, exist_ok=True)
        file = "{filename}.{filetype}".format(
            filename=get_sha1("{}_{}".format(filename, uuid.uuid4())),
            filetype=filetype
        )
        return "{}/{}".format(self.dir_name, file)
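
    # The returned path has the form "file/<sha1 of name + uuid4>.<filetype>";
    # mixing in a fresh uuid4 keeps repeated downloads of the same attachment
    # from colliding on disk.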

    @staticmethod
    def clean_attachment(filepath):
        """
        Delete a file.

        :param str filepath: file path
        """
        try:
            os.remove(filepath)
        except FileNotFoundError:
            pass

    def remove(self, file):
        self.clean_attachment(file)

    @staticmethod
    def calculate_size(data):
        """
        Convert a size in bytes to megabytes.

        :param int data: size in bytes
        :return: float, size in MB
        """
        _kb = float(data / 1024.0)
        return float(_kb / 1024.0)
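
    # Illustrative arithmetic: 50 MB expressed in bytes maps back to 50.0, the
    # threshold fetch_data() checks against below.
    #   AttachmentDownloader.calculate_size(52428800)  # -> 50.0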

    @staticmethod
    def getsize(data):
        """
        Return a human-readable size for the given content.

        :param data: content to be uploaded
        :type data: bytes, str (local file path) or file-like object
        :return: str
        """
        size = 0
        if isinstance(data, str):
            try:
                size = os.path.getsize(data)
            except FileNotFoundError:
                pass
        elif isinstance(data, bytes):
            size = len(data)
        else:
            pass  # file-like objects are not measured and report "0.0 kb"

        _kb = float(size) / 1024
        result = "{:.1f} kb".format(_kb)
        if _kb >= 1024:
            _M = _kb / 1024
            if _M >= 1024:
                _G = _M / 1024
                result = "{:.1f} G".format(_G)
            else:
                result = "{:.1f} M".format(_M)
        return result
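
    # Illustrative outputs of the formatting rules above:
    #   AttachmentDownloader.getsize(b"x" * 2048)           -> "2.0 kb"
    #   AttachmentDownloader.getsize(b"x" * (3 * 1024**2))  -> "3.0 M"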

    def fetch_data(self, url, proxies=None, file=None, show_error_log=False, **kwargs):
        """
        Download data.

        :param str url: download url of the file
        :param dict proxies: proxies, e.g. {"http": "http://xxx", "https": "https://xxx"}
        :param file: local file path to also write the content to
        :param bool show_error_log: log the error traceback on failure
        """
        method = kwargs.pop("method", "get")
        request_kwargs = {}
        request_kwargs.setdefault("proxies", proxies)
        request_kwargs.setdefault("headers", kwargs.get("headers") or headers)
        request_kwargs.setdefault("params", kwargs.pop("params", None))
        request_kwargs.setdefault("data", kwargs.pop("data", None))
        request_kwargs.setdefault("json", kwargs.pop("json", None))
        request_kwargs.setdefault("cookies", kwargs.pop("cookies", None))
        request_kwargs.setdefault("timeout", kwargs.pop("timeout", (60, 120)))
        request_kwargs.setdefault("stream", kwargs.pop("stream", True))
        request_kwargs.setdefault("verify", kwargs.pop("verify", False))
        request_kwargs.setdefault("allow_redirects", kwargs.pop("allow_redirects", True))

        stream = io.BytesIO()
        for _ in range(self._max_retries):
            try:
                with requests.request(method, url, **request_kwargs) as response:
                    response.raise_for_status()
                    lower_headers = {k.lower(): v for k, v in response.headers.items()}

                    total_bytes = None
                    content_length = lower_headers.get('content-length')
                    if content_length is not None:
                        total_bytes = int(content_length)
                        if self.calculate_size(total_bytes) > 50:
                            # discard files whose declared length exceeds 50 MB
                            return stream.getvalue()

                    chunk_size = 1024 * 20  # 20 KB chunks
                    downloaded_size = 0
                    with tqdm.tqdm(
                            total=total_bytes,
                            unit="B",
                            initial=0,
                            unit_scale=True,
                            unit_divisor=1024,  # 1 M = 1024 KB, unit conversion
                            ascii=True,
                            desc=file
                    ) as bar:
                        iter_content = response.iter_content(chunk_size=chunk_size)
                        if file is not None:
                            with open(file, "wb") as f:
                                for chunk in iter_content:
                                    size = stream.write(chunk)
                                    f.write(chunk)
                                    bar.update(size)
                                    downloaded_size += size
                                    if self.calculate_size(downloaded_size) > 50:
                                        stream.truncate(0)  # truncate to 0 bytes, i.e. empty the stream
                                        stream.seek(0)  # move the position pointer back to the start
                                        break
                        else:
                            for chunk in iter_content:
                                size = stream.write(chunk)
                                bar.update(size)
                                downloaded_size += size
                                if self.calculate_size(downloaded_size) > 50:
                                    stream.truncate(0)  # truncate to 0 bytes, i.e. empty the stream
                                    stream.seek(0)  # move the position pointer back to the start
                                    break

                    return stream.getvalue()
            except requests.RequestException as why:
                stream.truncate(0)  # truncate to 0 bytes, i.e. empty the stream
                stream.seek(0)  # move the position pointer back to the start
                if show_error_log:
                    logger.exception(why)

        return stream.getvalue()
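
    # Minimal usage sketch (URL is a placeholder): download into memory only, or
    # mirror the bytes to a local file while downloading. The target directory must
    # already exist when "file" is given; create_file() handles that in normal use.
    #   downloader = AttachmentDownloader()
    #   data = downloader.fetch_data("http://example.com/notice.pdf")
    #   data = downloader.fetch_data("http://example.com/notice.pdf", file="file/notice.pdf")
    # An empty bytes object means every retry failed or the content exceeded 50 MB.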

    def _push_oss_from_stream(self, filename, filetype, url, **kwargs):
        """
        Push a data stream to oss.

        :param str filename: file name
        :param str filetype: file type
        :param str url: download url of the file
        """
        stream = self.fetch_data(url, file=None, **kwargs)
        attachment = {
            "filename": "{}.{}".format(filename, filetype),
            "org_url": url
        }
        if len(stream) > 0:
            fid = get_sha1(stream)
            try:
                attachment["ftype"] = filetype
                attachment["fid"] = "{}.{}".format(fid, filetype)
                attachment["size"] = self.getsize(stream)
                attachment["url"] = "oss"
                # self._oss.upload("file", attachment["fid"], stream)
                self._oss.push_oss_from_stream(attachment["fid"], stream)
            except Exception as e:
                logger.error(
                    "[{}] upload failed, reason: {}".format(filename, type(e).__name__)
                )
        return attachment
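
    # Shape of the returned attachment record (values are illustrative):
    #   {
    #       "filename": "notice.pdf",
    #       "org_url": "http://example.com/notice.pdf",
    #       "ftype": "pdf",
    #       "fid": "<sha1 of content>.pdf",
    #       "size": "532.0 kb",
    #       "url": "oss",
    #   }
    # On a failed download only "filename" and "org_url" are set.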

    def read_pdf_by_chunks(self, pdf_path, chunk_size=1024):
        """Read the first chunk of a downloaded file and guess whether it is a usable PDF."""
        try:
            with open(pdf_path, 'rb') as file:
                chunk = str(file.read(chunk_size))
                if "<</Names <</Dests 4 0 R>>" in chunk and "SourceModified" in chunk:
                    return False
                elif "doctypehtml" not in chunk:
                    return True
                elif "%PDF" in chunk:
                    return True
                else:
                    return False
        except Exception:
            return False

    def _push_oss_from_local(self, filename, filetype, url, **kwargs):
        """
        Upload a local file to oss.

        :param str filename: file name
        :param str filetype: file type
        :param str url: download url of the file
        """
        file = self.create_file(filename, filetype)
        stream = self.fetch_data(url, file=file, **kwargs)
        # whether the upload/download succeeds or fails, the basic file info is always returned
        attachment = {
            "filename": "{}.{}".format(filename, filetype),
            "org_url": url
        }
        if kwargs.get('is_check', None):
            if not self.read_pdf_by_chunks(file):
                self.remove(file)
                return attachment

        if len(stream) > 0:
            hash_str = get_sha1(stream)
            try:
                attachment["fid"] = "{}.{}".format(hash_str, filetype)
                attachment["size"] = self.getsize(file)
                attachment["ftype"] = filetype
                attachment["url"] = "oss"
                # self._oss.upload("file", attachment["fid"], stream)
                self._oss.push_oss_from_local(attachment["fid"], file)
            except Exception as e:
                logger.error(
                    "[{}] upload failed, reason: {}".format(filename, type(e).__name__)
                )
        self.remove(file)  # delete the local temporary file
        return attachment

    def fetch_attachment(
            self,
            file_name: str,
            file_type: str,
            download_url: str,
            mode="local",
            proxies=None,
            **kwargs
    ):
        """
        Download an attachment.

        @param file_name: file name
        @param file_type: file type
        @param download_url: download url of the file
        @param mode: upload mode, "local" = local file or "stream" = data stream
        @param proxies: proxies, e.g. {"http": "http://xxx", "https": "https://xxx"}
        @return:
        """
        if not file_name or not file_type or not download_url:
            raise AttachmentNullError

        file_name = clear_file_type_suffix(file_name, file_type)  # avoid a duplicated file suffix

        file_kwargs = dict(
            filename=file_name,
            filetype=file_type,
            url=download_url,
            proxies=proxies,
            **kwargs
        )
        if mode == "stream":
            attachment = self._push_oss_from_stream(**file_kwargs)
        else:
            attachment = self._push_oss_from_local(**file_kwargs)

        return attachment
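

# Minimal usage sketch, assuming utils.tools and the AliYunService credentials are
# configured in this environment; the URL and file name below are placeholders.
if __name__ == "__main__":
    downloader = AttachmentDownloader(max_retries=3)
    attachment = downloader.fetch_attachment(
        file_name="notice",
        file_type="pdf",
        download_url="http://example.com/notice.pdf",
        mode="local",  # or "stream" to upload from memory without a temp file
    )
    print(attachment)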