dongzhaorui 1 год назад
Родитель
Сommit
cfbeb4b29c
2 измененных файлов с 190 добавлено и 242 удалено
  1. 100 127
      FworkSpider/untils/attachment.py
  2. 90 115
      FworkSpider/untils/attachment_res.py

+ 100 - 127
FworkSpider/untils/attachment.py

@@ -1,52 +1,46 @@
-import hashlib
+# -*- coding: utf-8 -*-
+"""
+Created on 2020-09-06
+---------
+@summary: 附件下载模块
+---------
+@author: Dzr
+"""
 import io
 import os
-import traceback
 import uuid
 
 import requests
 import tqdm
 import urllib3
 
+import feapder.utils.tools as tools
+from feapder.utils.log import log as logger
 from untils.aliyun import AliYunService
 from untils.execptions import AttachmentNullError
-from untils.proxy_pool import ProxyPool
 
 urllib3.disable_warnings()
 
 headers = {
-    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36',
-    'Accept': '*/*'
+    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36",
+    "Accept": "*/*"
 }
 
 
 class AttachmentDownloader:
-    """附件下载模块"""
 
     def __init__(self):
-        self.dir_name = 'file'
+        self.dir_name = "file"
 
     def create_dir(self):
         if not os.path.exists(self.dir_name):
             os.makedirs(self.dir_name, mode=0o777, exist_ok=True)
 
-    def create_file(self, filename, file_type):
+    def create_file(self, filename, filetype):
         self.create_dir()
-        sign = self._hash("{}_{}".format(filename, uuid.uuid4()))
-        local_file_name = "{}.{}".format(sign, file_type)
-        return "{}/{}".format(self.dir_name, local_file_name)
-
-    def create_fid(self, data: bytes):
-        return self._hash(data)
-
-    @staticmethod
-    def _hash(val):
-        _sha1 = hashlib.sha1()
-        if isinstance(val, bytes):
-            _sha1.update(str(val).encode("utf-8"))
-        elif isinstance(val, str):
-            _sha1.update(val.encode("utf-8"))
-        return _sha1.hexdigest()
+        sign = tools.get_sha1("{}_{}".format(filename, uuid.uuid4()))
+        file_name = "{}.{}".format(sign, filetype)
+        return "{}/{}".format(self.dir_name, file_name)
 
     @staticmethod
     def clean_attachment(file_path):
@@ -105,36 +99,36 @@ class AttachmentDownloader:
                 result = "{:.1f} M".format(_M)
         return result
 
-    def fetch_data(self, url, file=None, **kwargs):
+    def fetch_data(self, url, proxies=None, file=None, show_error_log=False, **kwargs):
         """
         数据下载
 
         :param str url: 下载地址
         :param file: 本地文件
         :param dict kwargs: requests请求参数
-        :return:
+        :param dict proxies: 代理ip
+        :param show_error_log: 展示错误堆栈信息日志
         """
-        enable_proxy = kwargs.pop('enable_proxy', False)
-        allow_show_exception = kwargs.pop('allow_show_exception', False)
-        method = kwargs.pop('method', 'get')
-        request_params = {}
-        request_params.setdefault('data', kwargs.pop('data', None))
-        request_params.setdefault('cookies', kwargs.pop('cookies', None))
-        request_params.setdefault('headers', kwargs.get('headers') or headers)
-        request_params.setdefault('proxies', kwargs.get('proxies'))
-        request_params.setdefault('timeout', kwargs.pop('timeout', 60))
-        request_params.setdefault('stream', kwargs.pop('stream', True))
-        request_params.setdefault('verify', kwargs.pop('verify', False))
-        request_params.setdefault('allow_redirects', kwargs.pop('allow_redirects', True))
+        method = kwargs.pop("method", "get")
+
+        request_kwargs = {}
+        request_kwargs.setdefault("proxies", proxies)
+        request_kwargs.setdefault("headers", kwargs.get("headers") or headers)
+        request_kwargs.setdefault("data", kwargs.pop("data", None))
+        request_kwargs.setdefault("cookies", kwargs.pop("cookies", None))
+        request_kwargs.setdefault("timeout", kwargs.pop("timeout", 60))
+        request_kwargs.setdefault("stream", kwargs.pop("stream", True))
+        request_kwargs.setdefault("verify", kwargs.pop("verify", False))
+        request_kwargs.setdefault("allow_redirects", kwargs.pop("allow_redirects", True))
 
         retries = 0
         while retries < 3:
             try:
-                with requests.request(method, url, **request_params) as req:
+                with requests.request(method, url, **request_kwargs) as req:
                     stream = io.BytesIO()
                     lh = {k.lower(): v for k, v in req.headers.items()}
                     '''内容长度'''
-                    cl = lh.get('content-length') or len(req.content)
+                    cl = lh.get("content-length") or len(req.content)
                     icl = int(cl)
                     content_length = self.get_mb(icl)
                     if content_length > 50:
@@ -148,14 +142,14 @@ class AttachmentDownloader:
                     iter_content = req.iter_content(chunk_size=1024 * 20)
                     with tqdm.tqdm(
                             total=icl,
-                            unit='B',
+                            unit="B",
                             initial=0,
                             unit_scale=True,
                             unit_divisor=1024,  # 1M=1024Kb,单位换算
                             ascii=True,
                             desc=file) as bar:
                         if file is not None:
-                            with open(file, 'wb') as f:
+                            with open(file, "wb") as f:
                                 for chunk in iter_content:
                                     stream.write(chunk)
                                     size = f.write(chunk)
@@ -165,123 +159,102 @@ class AttachmentDownloader:
                                 size = stream.write(chunk)
                                 bar.update(size)
                     return stream.getvalue()
-            except requests.RequestException:
-                if allow_show_exception:
-                    traceback.print_exc()
-                if enable_proxy:
-                    request_params.update({'proxies': ProxyPool().get()})
+            except requests.RequestException as why:
                 retries += 1
+                if show_error_log:
+                    logger.exception(why)
+
         return b''
 
-    def _push_oss_from_stream(self, file_name, file_type, url, **kw):
+    def _push_oss_from_stream(self, filename, filetype, url, **kwargs):
         """
         将数据流推送oss
 
-        :param str file_name: 文件名称
-        :param str file_type: 文件类型
+        :param str filename: 文件名称
+        :param str filetype: 文件类型
         :param str url: 下载地址
-        :param dict kw: 额外下载信息
-        :return: dict: 附件信息
         """
-        stream = self.fetch_data(url, None, **kw)
+        stream = self.fetch_data(url, file=None, **kwargs)
+        attachment = {
+            "filename": "{}.{}".format(filename, filetype),
+            "org_url": url
+        }
         if len(stream) > 0:
-            fid = self.create_fid(stream)
+            fid = tools.get_sha1(stream)
             try:
-                result = {
-                    'filename': file_name,
-                    'ftype': file_type,
-                    'fid': "{}.{}".format(fid, file_type),
-                    'org_url': url,
-                    'size': self.getsize(stream),
-                    'url': 'oss',
-                }
-                AliYunService().push_oss_from_stream(result['fid'], stream)
+                attachment["ftype"] = filetype
+                attachment["fid"] = "{}.{}".format(fid, filetype)
+                attachment["size"] = self.getsize(stream)
+                attachment["url"] = "oss"
+                AliYunService().push_oss_from_stream(attachment["fid"], stream)
             except Exception:
-                result = {
-                    'filename': file_name,
-                    'org_url': url,
-                }
-        else:
-            result = {
-                'filename': file_name,
-                'org_url': url,
-            }
-        return result
+                pass
+
+        return attachment
 
-    def _push_oss_from_file(self, file_name, file_type, url, **kw):
+    def _push_oss_from_local(self, filename, filetype, url, **kwargs):
         """
         将本地文件推送oss
 
-        :param str file_name: 文件名称
-        :param str file_type: 文件类型
+        :param str filename: 文件名称
+        :param str filetype: 文件类型
         :param str url: 下载地址
-        :param dict kw: 额外下载信息
-        :return: dict: 附件信息
         """
-        file = self.create_file(file_name, file_type)
-        stream = self.fetch_data(url, file, **kw)
+        file = self.create_file(filename, filetype)
+        stream = self.fetch_data(url, file=file, **kwargs)
         '''上传/下载,无论失败成功都需要返回文件基础信息'''
+        attachment = {
+            "filename": "{}.{}".format(filename, filetype),
+            "org_url": url
+        }
         if len(stream) > 0:
-            fid = self.create_fid(stream)
+            content_hash = tools.get_sha1(stream)
             try:
-                result = {
-                    'filename': file_name,
-                    'ftype': file_type,
-                    'fid': "{}.{}".format(fid, file_type),
-                    'org_url': url,
-                    'size': self.getsize(file),
-                    'url': 'oss',
-                }
-                AliYunService().push_oss_from_local(result['fid'], file)
-            except Exception:
-                result = {
-                    'filename': file_name,
-                    'org_url': url,
-                }
-        else:
-            result = {
-                'filename': file_name,
-                'org_url': url,
-            }
-        '''删除本地临时文件'''
-        self.remove(file)
-        return result
-
-    def _fetch_attachment(self, file_name, file_type, download_url, **kwargs):
+                attachment["fid"] = "{}.{}".format(content_hash, filetype)
+                attachment["size"] = self.getsize(file)
+                attachment["ftype"] = filetype
+                attachment["url"] = "oss"
+                AliYunService().push_oss_from_local(attachment["fid"], file)
+            except Exception as e:
+                logger.error(
+                    "[{}]上传失败,原因:{}".format(filename, e.__class__.__name__)
+                )
+
+        self.remove(file)  # 删除本地临时文件
+        return attachment
+
+    def _fetch_attachment(self, filename, filetype, download_url, mode, **kwargs):
         """
         下载附件
 
-        :param str file_name: 文件名称
-        :param str file_type: 文件类型
+        :param str filename: 文件名称
+        :param str filetype: 文件类型
         :param str download_url: 下载地址
-        :param dict kwargs: 额外的附件下载配置
-        :return: dict: 附件
+        :param str mode: 附件上传模式 "local" or "stream"
         """
-        mode = kwargs.pop('mode', 'local')
+        file_kwargs = dict(
+            filename=filename,
+            filetype=filetype,
+            url=download_url,
+            **kwargs
+        )
         if mode == "stream":
-            res = self._push_oss_from_stream(
-                file_name,
-                file_type,
-                download_url,
-                **kwargs
-            )
+            res = self._push_oss_from_stream(**file_kwargs)
         else:
-            res = self._push_oss_from_file(
-                file_name,
-                file_type,
-                download_url,
-                **kwargs
-            )
+            res = self._push_oss_from_local(**file_kwargs)
         return res
 
     def fetch_attachment(
-            self,
-            file_name: str,
-            file_type: str,
-            download_url: str,
-            **kw
+        self,
+        file_name: str,
+        file_type: str,
+        download_url: str,
+        proxies=None,
+        mode="local",
+        **kwargs
     ):
         if not file_name or not file_type or not download_url:
             raise AttachmentNullError
 
-        return self._fetch_attachment(file_name, file_type, download_url, **kw)
+        file_kwargs = dict(proxies=proxies, **kwargs)
+        return self._fetch_attachment(file_name, file_type, download_url, mode, **file_kwargs)

+ 90 - 115
FworkSpider/untils/attachment_res.py

@@ -1,45 +1,52 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2022-03-06
+---------
+@summary: 附件下载模块res
+---------
+@author: Lzz
+"""
 import hashlib
 import os
 import re
-import traceback
 import uuid
 from urllib.parse import urlparse, unquote
 
 import requests
 import urllib3
 
+import feapder.utils.tools as tools
 from feapder.utils.log import log as logger
 from untils.aliyun import AliYunService
 from untils.execptions import AttachmentNullError
-from untils.proxy_pool import ProxyPool
 
 urllib3.disable_warnings()
 # 文件文档类型
 DOCTYPE = {
-    'txt', 'rtf', 'dps', 'et', 'ett', 'xls',
-    'xlsx', 'xlsb', 'xlsm', 'xlt', 'ods', 'pmd', 'pmdx',
-    'doc', 'docm', 'docx', 'dot', 'dotm', 'dotx',
-    'odt', 'wps', 'csv', 'xml', 'xps'
+    "txt", "rtf", "dps", "et", "ett", "xls",
+    "xlsx", "xlsb", "xlsm", "xlt", "ods", "pmd", "pmdx",
+    "doc", "docm", "docx", "dot", "dotm", "dotx",
+    "odt", "wps", "csv", "xml", "xps"
 }
 # 压缩类型
 COMPRESSION_TYPE = {
-    'rar', 'zip', 'gzzb', '7z', 'tar', 'gz', 'bz2', 'jar', 'iso', 'cab',
-    'arj', 'lzh', 'ace', 'uue', 'edxz',
+    "rar", "zip", "gzzb", "7z", "tar", "gz", "bz2", "jar", "iso", "cab",
+    "arj", "lzh", "ace", "uue", "edxz",
 }
 # 图片类型
 IMAGE_TYPE = {
-    'jpg', 'png', 'jpeg', 'tiff', 'gif', 'psd', 'raw', 'eps', 'svg', 'bmp',
-    'pdf'
+    "jpg", "png", "jpeg", "tiff", "gif", "psd", "raw", "eps", "svg", "bmp",
+    "pdf"
 }
 # 其他类型
 OTHER_TYPE = {
-    'swf', 'nxzf', 'xezf', 'nxcf'
+    "swf", "nxzf", "xezf", "nxcf"
 }
 
 
 headers = {
-    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36',
-    'Accept': '*/*'
+    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36",
+    "Accept": "*/*"
 }
 
 
@@ -66,12 +73,12 @@ def getsize(file):
         return 0
 
 
-def discern_file_format(text, allow_show_waring=False):
+def discern_file_format(text, show_warn_log=False):
     """
     识别文件格式
 
     @param text: 识别文本
-    @param allow_show_waring: 是否打印警告信息
+    @param show_warn_log: 是否打印警告信息
     @return: 文件格式
     """
     file_types = {
@@ -83,13 +90,13 @@ def discern_file_format(text, allow_show_waring=False):
     for file_type in file_types:
         all_file_format = [file_type, file_type.upper()]
         for t in all_file_format:
-            result = re.match(f'.*{t}$', text, re.S)
+            result = re.match(f".*{t}$", text, re.S)
             if result is not None:
                 return t
     else:
         unknown_type = re.findall('[^.\\/:*?"<>|\r\n]+$', text, re.S)
-        if allow_show_waring:
-            logger.warning(f'[未识别文件类型]{unknown_type}')
+        if show_warn_log:
+            logger.warning(f"[未识别文件类型]{unknown_type}")
         return None
 
 
@@ -103,10 +110,10 @@ def extract_file_name_by_href(href: str, file_type: str):
     """从url中抽取文件名称"""
     # 中文标点符号:[\u3002\uff1b\uff0c\uff1a\u201c\u201d\uff08\uff09\u3001\uff1f\u300a\u300b]
     # 中文字符:[\u4e00 -\u9fa5]
-    zh_char_pattern = '[\u3002\uff1b\uff0c\uff1a\u201c\u201d\uff08\uff09\u3001\uff1f\u300a\u300b\u4e00-\u9fa5]+'
+    zh_char_pattern = "[\u3002\uff1b\uff0c\uff1a\u201c\u201d\uff08\uff09\u3001\uff1f\u300a\u300b\u4e00-\u9fa5]+"
     parser = urlparse(href)
     query = (parser.query or parser.path)
-    result = re.search(f'.*\\.{file_type}', query, re.S)
+    result = re.search(f".*\\.{file_type}", query, re.S)
     if result is not None:
         encode_str = unquote(result.group())
         name = re.search(zh_char_pattern, encode_str)
@@ -118,7 +125,7 @@ def extract_file_name_by_href(href: str, file_type: str):
 def extract_file_name(text):
     file_type = discern_file_format(text)
     if file_type is not None:
-        repl = '.{}'.format(file_type)
+        repl = ".{}".format(file_type)
         text = text.replace(repl, '')
     return text
 
@@ -129,16 +136,16 @@ def verify_file_name(name):
 
 
 # 去除附件名空格、两个后缀
-def clean_file_name(file_name: str, file_type: str):
+def clear_file_type_suffix(file_name: str, file_type: str):
     file_name = file_name.strip()
     if file_type in file_name:
-        file_name = file_name.replace(f'.{file_type}', '')
+        file_name = file_name.replace(f".{file_type}", '')
     return file_name
 
 
 # 限制附件大小:size < 5 kb 不存入数据库
 def limit_file_size(file_size: str):
-    _pattern = '^[0-9]\d*\.\d*|[1-9]\d*'
+    _pattern = "^[0-9]\d*\.\d*|[1-9]\d*"
     if "M" in file_size or "m" in file_size:
         file_size = float("".join(re.findall(_pattern, file_size))) * 1000
     else:
@@ -161,7 +168,7 @@ class AttachmentDownloader(AliYunService):
 
     def __init__(self):
         super(AttachmentDownloader, self).__init__()
-        self.dir_name = 'file'
+        self.dir_name = "file"
 
     def _create_file(self, filename, filetype):
         os.makedirs(self.dir_name, mode=0o777, exist_ok=True)
@@ -171,14 +178,6 @@ class AttachmentDownloader(AliYunService):
         )
         return "{}/{}".format(self.dir_name, file)
 
-    @staticmethod
-    def _create_fid(file_stream: bytes):
-        return sha1(file_stream)
-
-    @staticmethod
-    def _origin_filename(fid: str, filetype: str):
-        return "{}.{}".format(fid, filetype)
-
     @staticmethod
     def _file_size(file: str):
         _kb = float(getsize(file)) / 1024
@@ -194,110 +193,86 @@ class AttachmentDownloader(AliYunService):
 
     @staticmethod
     def _fetch_attachment(
-            get_file_type:str,
-            file_type_name:str,
-            url: str,
-            enable_proxy=False,
-            proxy={},
-            allow_show_exception=False,
-            **kwargs
+        callback,
+        url: str,
+        proxies=None,
+        show_error_log=False,
+        **kwargs
     ):
         request_params = {}
-        request_params.setdefault('headers', kwargs.get('headers') or headers)
-        request_params.setdefault('proxies', kwargs.get('proxies'))
-        request_params.setdefault('timeout', kwargs.get('timeout') or 60)
-        request_params.setdefault('stream', kwargs.get('stream') or True)
-        request_params.setdefault('verify', kwargs.get('verify') or False)
-        if enable_proxy:
-            proxy = ProxyPool()
-        else:
-            proxy = proxy
+        request_params.setdefault("proxies", proxies)
+        request_params.setdefault("headers", kwargs.get("headers") or headers)
+        request_params.setdefault("timeout", kwargs.get("timeout") or 60)
+        request_params.setdefault("stream", kwargs.get("stream") or True)
+        request_params.setdefault("verify", kwargs.get("verify") or False)
+
         retries = 0
         while retries < 3:
             try:
-                with requests.get(url, **request_params) as req:
-                    if req.status_code == 200:
-                        stream = req.content
-                        '''
-                        file_type_name 响应头中附件后缀所对应的键
-                        get_file_type  取附件后缀的规则
-                        file_type_txt  附件响应头
-                        '''
-                        if len(get_file_type) > 10:
-                            file_types = []
-                            file_type_txt = req.headers.get(file_type_name)
-                            exec(get_file_type)
-                            if file_types:
-                                file_type = file_types[0]
-                            else:
-                                file_type = ''
-                            return stream,file_type
-                        else:
-                            return stream, get_file_type
+                with requests.get(url, **request_params) as response:
+                    if response.status_code == 200:
+                        stream = response.content
+                        filetype_lst = []  # 文件类型列表
+                        if callable(callback):
+                            # 通过自定义的解析函数获取响应体头部属性中的文件类型
+                            callback(response, filetype_lst)
+
+                        filetype = filetype_lst[0] if filetype_lst else ""
+                        return stream, filetype
                     else:
                         retries += 1
-            except requests.RequestException:
-                if allow_show_exception:
-                    traceback.print_exc()
-                if enable_proxy:
-                    request_params.update({'proxies': proxy.get()})
+            except requests.RequestException as why:
                 retries += 1
+                if show_error_log:
+                    logger.exception(why)
+
         return b''
 
     def fetch_attachment(
-            self,
-            get_file_type:str,
-            file_name: str,
-            file_type_name: str,
-            download_url: str,
-            enable_proxy=False,
-            allow_show_exception=False,
-            **kwargs
+        self,
+        file_name: str,
+        download_url: str,
+        callback,
+        **kwargs
     ):
-        if not file_name  or not download_url:
+        if not file_name or not download_url:
             raise AttachmentNullError
 
-        file_stream = self._fetch_attachment(
-            get_file_type,
-            file_type_name,
-            download_url,
-            enable_proxy,
-            allow_show_exception=allow_show_exception,
-            **kwargs
-        )
-
-        if len(file_stream) == 2:
-            file_type = file_stream[-1]
+        results = self._fetch_attachment(callback, download_url, **kwargs)
+        if len(results) == 2:
+            filetype = results[-1]
         else:
-            file_type = ''
+            filetype = ""
 
-        file_name = clean_file_name(file_name,file_type)
+        filename = clear_file_type_suffix(file_name, filetype)
         download_url = judge_file_url(download_url)
 
-        local_tmp_file = self._create_file(file_name, file_type)
-        with open(local_tmp_file, 'wb') as f:
-            f.write(file_stream[0])
+        # 保存本地临时文件
+        file_stream = results[0]
+        local_temp_file = self._create_file(filename, filetype)
+        with open(local_temp_file, "wb") as f:
+            f.write(file_stream)
 
-        result = {
-            'filename': '{}.{}'.format(file_name, file_type),
-            'org_url': download_url
+        '''上传/下载,无论失败/成功必须返回附件信息'''
+        attachment = {
+            "filename": "{}.{}".format(filename, filetype),
+            "org_url": download_url
         }
-        if len(file_stream[0]) > 0:
+        if len(file_stream) > 0:
+            content_hash = tools.get_sha1(file_stream)
             try:
-                fid = self._create_fid(file_stream[0])
-                key = self._origin_filename(fid, file_type)
-                result.setdefault('fid', key)
-                result.setdefault('ftype', file_type)
-                result.setdefault('size', self._file_size(local_tmp_file))
-                result.setdefault('url', 'oss')
-                super().push_oss_from_local(key, local_tmp_file)
+                attachment["fid"] = "{}.{}".format(content_hash, filetype)
+                attachment["size"] = self._file_size(local_temp_file)
+                attachment["ftype"] = filetype
+                attachment["url"] = "oss"
+                super().push_oss_from_local(attachment["fid"], local_temp_file)
             except Exception as e:
-                logger.warning(
-                    "[{}]下载异常,原因:{}".format(file_name, e.__class__.__name__)
+                logger.error(
+                    "[{}]上传失败,原因:{}".format(file_name, e.__class__.__name__)
                 )
-        remove(local_tmp_file)
-        '''上传/下载,无论失败/成功必须返回附件信息'''
-        if "size" not in result or limit_file_size(result.get('size')):
-            return result
+
+        remove(local_temp_file)
+        if "size" not in attachment or limit_file_size(attachment.get("size")):
+            return attachment
         else:
             return {}