소스 검색

框架升级优化

dongzhaorui 2 년 전
부모
커밋
0f64d7c284

+ 222 - 259
zgztb_cookie/FworkSpider/feapder/core/parser_control.py

@@ -65,7 +65,7 @@ class PaserControl(threading.Thread):
                 self.is_show_tip = False
                 self.deal_requests(requests)
 
-            except Exception as e:
+            except (Exception, BaseException) as e:
                 log.exception(e)
                 time.sleep(3)
 
@@ -141,59 +141,33 @@ class PaserControl(threading.Thread):
                                     )
                                 used_download_midware_enable = True
                                 if not response:
-                                    try:
-                                        response = (
-                                            request_temp.get_response()
-                                            if not setting.RESPONSE_CACHED_USED
-                                            else request_temp.get_response_from_cached(
-                                                save_cached=False
-                                            )
-                                        )
-                                    except Exception as e:
-                                        response = None
-                                        log.info("requests", extra={"url": request.url or request_temp.url,"code": -1,"error_info":e})
-                                        raise Exception(
-                                            "155 连接超时 url: %s" % (request.url or request_temp.url)
-                                        )
-                                    except:
-                                        response = None
-                                        log.error("response 请求异常 url: %s" % (request.url or request_temp.url))
-                                        # raise Exception(
-                                        #     "response 请求异常 url: %s" % (request.url or request_temp.url))
-                            else:
-                                try:
                                     response = (
-                                        request.get_response()
+                                        request_temp.get_response()
                                         if not setting.RESPONSE_CACHED_USED
-                                        else request.get_response_from_cached(
+                                        else request_temp.get_response_from_cached(
                                             save_cached=False
                                         )
                                     )
-                                except Exception as e:
-                                    response = None
-                                    log.info("requests", extra={"url": request.url or request_temp.url, "code": -1, "error_info": e})
-                                    raise Exception(
-                                        "165 连接超时 url: %s" % (request.url or request_temp.url)
+                            else:
+                                response = (
+                                    request.get_response()
+                                    if not setting.RESPONSE_CACHED_USED
+                                    else request.get_response_from_cached(
+                                        save_cached=False
                                     )
-                                except:
-                                    response = None
-                                    log.error("response 请求异常 url: %s" % (request.url or request_temp.url))
-                                    # raise Exception(
-                                    #     "response 请求异常 url: %s" % (request.url or request_temp.url))
-
+                                )
 
                             if response == None:
-                                raise Exception(
-                                    "177 连接超时 url: %s" % (request.url or request_temp.url)
-                                )
+                                raise Exception("连接超时 url: %s" % (
+                                            request.url or request_temp.url))
+
+                            # 校验
+                            if parser.validate(request, response) == False:
+                                continue
 
                         else:
                             response = None
 
-                        # 校验
-                        if parser.validate(request, response) == False:
-                            continue
-
                         if request.callback:  # 如果有parser的回调函数,则用回调处理
                             callback_parser = (
                                 request.callback
@@ -235,7 +209,7 @@ class PaserControl(threading.Thread):
                                 result_type = 2
                                 # 将item入库
                                 self._item_buffer.put_item(result)
-                                # 需删除正在做的request
+                                # 需删除已完成的request
                                 del_request_redis_after_item_to_db = True
 
                             elif callable(result):  # result为可执行的无参函数
@@ -252,7 +226,7 @@ class PaserControl(threading.Thread):
                             # else:
                             #     raise TypeError('Expect Request、Item、callback func, bug get type: {}'.format(type(result)))
 
-                    except Exception as e:
+                    except (Exception, BaseException) as e:
                         exception_type = (
                             str(type(e)).replace("<class '", "").replace("'>", "")
                         )
@@ -483,8 +457,8 @@ class AirSpiderParserControl(PaserControl):
     def run(self):
         while not self._thread_stop:
             try:
-                requests = self._memory_db.get()
-                if not requests:
+                request = self._memory_db.get()
+                if not request:
                     if not self.is_show_tip:
                         log.debug("parser 等待任务...")
                         self.is_show_tip = True
@@ -494,256 +468,245 @@ class AirSpiderParserControl(PaserControl):
                     continue
 
                 self.is_show_tip = False
-                self.deal_requests([requests])
+                self.deal_request(request)
 
-            except Exception as e:
+            except (Exception, BaseException) as e:
                 log.exception(e)
                 time.sleep(3)
 
-    def deal_requests(self, requests):
-        for request in requests:
-
-            response = None
-
-            for parser in self._parsers:
-                if parser.name == request.parser_name:
-                    try:
-                        # 记录需下载的文档
-                        self.record_download_status(
-                            PaserControl.DOWNLOAD_TOTAL, parser.name
-                        )
-
-                        # 解析request
-                        if request.auto_request:
-                            request_temp = None
-                            response = None
-
-                            # 下载中间件
-                            if request.download_midware:
-                                if isinstance(request.download_midware, (list, tuple)):
-                                    request_temp = request
-                                    for download_midware in request.download_midware:
-                                        download_midware = (
-                                            download_midware
-                                            if callable(download_midware)
-                                            else tools.get_method(
-                                                parser, download_midware
-                                            )
-                                        )
-                                        request_temp = download_midware(request_temp)
-                                else:
+    def deal_request(self, request):
+        response = None
+
+        for parser in self._parsers:
+            if parser.name == request.parser_name:
+                try:
+                    # 记录需下载的文档
+                    self.record_download_status(
+                        PaserControl.DOWNLOAD_TOTAL, parser.name
+                    )
+
+                    # 解析request
+                    if request.auto_request:
+                        request_temp = None
+                        response = None
+
+                        # 下载中间件
+                        if request.download_midware:
+                            if isinstance(request.download_midware, (list, tuple)):
+                                request_temp = request
+                                for download_midware in request.download_midware:
                                     download_midware = (
-                                        request.download_midware
-                                        if callable(request.download_midware)
+                                        download_midware
+                                        if callable(download_midware)
                                         else tools.get_method(
-                                            parser, request.download_midware
+                                            parser, download_midware
                                         )
                                     )
-                                    request_temp = download_midware(request)
-                            elif request.download_midware != False:
-                                request_temp = parser.download_midware(request)
-
-                            # 请求
-                            if request_temp:
-                                if (
-                                    isinstance(request_temp, (tuple, list))
-                                    and len(request_temp) == 2
-                                ):
-                                    request_temp, response = request_temp
-
-                                if not isinstance(request_temp, Request):
-                                    raise Exception(
-                                        "download_midware need return a request, but received type: {}".format(
-                                            type(request_temp)
-                                        )
+                                    request_temp = download_midware(request_temp)
+                            else:
+                                download_midware = (
+                                    request.download_midware
+                                    if callable(request.download_midware)
+                                    else tools.get_method(
+                                        parser, request.download_midware
                                     )
-                                request = request_temp
+                                )
+                                request_temp = download_midware(request)
+                        elif request.download_midware != False:
+                            request_temp = parser.download_midware(request)
 
-                            if not response:
-                                try:
-                                    response = (
-                                        request.get_response()
-                                        if not setting.RESPONSE_CACHED_USED
-                                        else request.get_response_from_cached(
-                                            save_cached=False
-                                        )
-                                    )
-                                except Exception as e:
-                                    log.info("requests", extra={"url": request.url or request_temp.url, "code": -1, "error_info": e})
-                                    raise Exception(
-                                        "565 连接超时 url: %s" % (request.url or request_temp.url)
-                                    )
-                                except:
-                                    raise Exception(
-                                        "response 请求超时 url: %s" % (request.url or request_temp.url))
+                        # 请求
+                        if request_temp:
+                            if (
+                                isinstance(request_temp, (tuple, list))
+                                and len(request_temp) == 2
+                            ):
+                                request_temp, response = request_temp
 
-                        else:
-                            response = None
+                            if not isinstance(request_temp, Request):
+                                raise Exception(
+                                    "download_midware need return a request, but received type: {}".format(
+                                        type(request_temp)
+                                    )
+                                )
+                            request = request_temp
+
+                        if not response:
+                            response = (
+                                request.get_response()
+                                if not setting.RESPONSE_CACHED_USED
+                                else request.get_response_from_cached(
+                                    save_cached=False
+                                )
+                            )
 
                         # 校验
                         if parser.validate(request, response) == False:
-                            continue
+                            break
 
-                        if request.callback:  # 如果有parser的回调函数,则用回调处理
-                            callback_parser = (
-                                request.callback
-                                if callable(request.callback)
-                                else tools.get_method(parser, request.callback)
-                            )
-                            results = callback_parser(request, response)
-                        else:  # 否则默认用parser处理
-                            results = parser.parse(request, response)
-
-                        if results and not isinstance(results, Iterable):
-                            raise Exception(
-                                "%s.%s返回值必须可迭代"
-                                % (parser.name, request.callback or "parse")
-                            )
-
-                        # 此处判断是request 还是 item
-                        for result in results or []:
-                            if isinstance(result, Request):
-                                # 给request的 parser_name 赋值
-                                result.parser_name = result.parser_name or parser.name
-
-                                # 判断是同步的callback还是异步的
-                                if result.request_sync:  # 同步
-                                    requests.append(result)
-                                else:  # 异步
-                                    # 将next_request 入库
-                                    self._memory_db.add(result)
+                    else:
+                        response = None
 
-                            elif isinstance(result, Item):
-                                self._item_buffer.put_item(result)
+                    if request.callback:  # 如果有parser的回调函数,则用回调处理
+                        callback_parser = (
+                            request.callback
+                            if callable(request.callback)
+                            else tools.get_method(parser, request.callback)
+                        )
+                        results = callback_parser(request, response)
+                    else:  # 否则默认用parser处理
+                        results = parser.parse(request, response)
+
+                    if results and not isinstance(results, Iterable):
+                        raise Exception(
+                            "%s.%s返回值必须可迭代"
+                            % (parser.name, request.callback or "parse")
+                        )
 
-                    except Exception as e:
-                        exception_type = (
-                            str(type(e)).replace("<class '", "").replace("'>", "")
+                    # 此处判断是request 还是 item
+                    for result in results or []:
+                        if isinstance(result, Request):
+                            # 给request的 parser_name 赋值
+                            result.parser_name = result.parser_name or parser.name
+
+                            # 判断是同步的callback还是异步的
+                            if result.request_sync:  # 同步
+                                self.deal_request(result)
+                            else:  # 异步
+                                # 将next_request 入库
+                                self._memory_db.add(result)
+
+                        elif isinstance(result, Item):
+                            self._item_buffer.put_item(result)
+
+                except (Exception, BaseException) as e:
+                    exception_type = (
+                        str(type(e)).replace("<class '", "").replace("'>", "")
+                    )
+                    if exception_type.startswith("requests"):
+                        # 记录下载失败的文档
+                        self.record_download_status(
+                            PaserControl.DOWNLOAD_EXCEPTION, parser.name
                         )
-                        if exception_type.startswith("requests"):
-                            # 记录下载失败的文档
-                            self.record_download_status(
-                                PaserControl.DOWNLOAD_EXCEPTION, parser.name
-                            )
 
-                        else:
-                            # 记录解析程序异常
-                            self.record_download_status(
-                                PaserControl.PAESERS_EXCEPTION, parser.name
-                            )
+                    else:
+                        # 记录解析程序异常
+                        self.record_download_status(
+                            PaserControl.PAESERS_EXCEPTION, parser.name
+                        )
 
-                        if setting.LOG_LEVEL == "DEBUG":  # 只有debug模式下打印, 超时的异常篇幅太多
-                            log.exception(e)
+                    if setting.LOG_LEVEL == "DEBUG":  # 只有debug模式下打印, 超时的异常篇幅太多
+                        log.exception(e)
 
-                        log.error(
+                    log.error(
+                        """
+                            -------------- %s.%s error -------------
+                            error          %s
+                            response       %s
+                            deal request   %s
                             """
-                                -------------- %s.%s error -------------
-                                error          %s
-                                response       %s
-                                deal request   %s
-                                """
-                            % (
-                                parser.name,
-                                (
-                                    request.callback
-                                    and callable(request.callback)
-                                    and getattr(request.callback, "__name__")
-                                    or request.callback
-                                )
-                                or "parse",
-                                str(e),
-                                response,
-                                tools.dumps_json(request.to_dict, indent=28)
-                                if setting.LOG_LEVEL == "DEBUG"
-                                else request,
+                        % (
+                            parser.name,
+                            (
+                                request.callback
+                                and callable(request.callback)
+                                and getattr(request.callback, "__name__")
+                                or request.callback
                             )
+                            or "parse",
+                            str(e),
+                            response,
+                            tools.dumps_json(request.to_dict, indent=28)
+                            if setting.LOG_LEVEL == "DEBUG"
+                            else request,
                         )
+                    )
 
-                        request.error_msg = "%s: %s" % (exception_type, e)
-                        request.response = str(response)
+                    request.error_msg = "%s: %s" % (exception_type, e)
+                    request.response = str(response)
 
-                        if "Invalid URL" in str(e):
-                            request.is_abandoned = True
+                    if "Invalid URL" in str(e):
+                        request.is_abandoned = True
 
-                        requests = parser.exception_request(request, response) or [
-                            request
-                        ]
-                        if not isinstance(requests, Iterable):
-                            raise Exception(
-                                "%s.%s返回值必须可迭代" % (parser.name, "exception_request")
-                            )
-                        for request in requests:
-                            if not isinstance(request, Request):
-                                raise Exception("exception_request 需 yield request")
-
-                            if (
-                                request.retry_times + 1 > setting.SPIDER_MAX_RETRY_TIMES
-                                or request.is_abandoned
-                            ):
-                                self.__class__._failed_task_count += 1  # 记录失败任务数
+                    requests = parser.exception_request(request, response) or [
+                        request
+                    ]
+                    if not isinstance(requests, Iterable):
+                        raise Exception(
+                            "%s.%s返回值必须可迭代" % (parser.name, "exception_request")
+                        )
+                    for request in requests:
+                        if not isinstance(request, Request):
+                            raise Exception("exception_request 需 yield request")
+
+                        if (
+                            request.retry_times + 1 > setting.SPIDER_MAX_RETRY_TIMES
+                            or request.is_abandoned
+                        ):
+                            self.__class__._failed_task_count += 1  # 记录失败任务数
+
+                            # 处理failed_request的返回值 request 或 func
+                            results = parser.failed_request(request, response) or [
+                                request
+                            ]
+                            if not isinstance(results, Iterable):
+                                raise Exception(
+                                    "%s.%s返回值必须可迭代"
+                                    % (parser.name, "failed_request")
+                                )
 
-                                # 处理failed_request的返回值 request 或 func
-                                results = parser.failed_request(request, response) or [
-                                    request
-                                ]
-                                if not isinstance(results, Iterable):
-                                    raise Exception(
-                                        "%s.%s返回值必须可迭代"
-                                        % (parser.name, "failed_request")
-                                    )
+                            log.info(
+                                """
+                                任务超过最大重试次数,丢弃
+                                url     %s
+                                重试次数 %s
+                                最大允许重试次数 %s"""
+                                % (
+                                    request.url,
+                                    request.retry_times,
+                                    setting.SPIDER_MAX_RETRY_TIMES,
+                                )
+                            )
 
-                                log.info(
-                                    """
-                                    任务超过最大重试次数,丢弃
+                        else:
+                            # 将 requests 重新入库 爬取
+                            request.retry_times += 1
+                            request.filter_repeat = False
+                            log.info(
+                                """
+                                    入库 等待重试
                                     url     %s
                                     重试次数 %s
                                     最大允许重试次数 %s"""
-                                    % (
-                                        request.url,
-                                        request.retry_times,
-                                        setting.SPIDER_MAX_RETRY_TIMES,
-                                    )
-                                )
-
-                            else:
-                                # 将 requests 重新入库 爬取
-                                request.retry_times += 1
-                                request.filter_repeat = False
-                                log.info(
-                                    """
-                                        入库 等待重试
-                                        url     %s
-                                        重试次数 %s
-                                        最大允许重试次数 %s"""
-                                    % (
-                                        request.url,
-                                        request.retry_times,
-                                        setting.SPIDER_MAX_RETRY_TIMES,
-                                    )
+                                % (
+                                    request.url,
+                                    request.retry_times,
+                                    setting.SPIDER_MAX_RETRY_TIMES,
                                 )
-                                self._memory_db.add(request)
+                            )
+                            self._memory_db.add(request)
 
-                    else:
-                        # 记录下载成功的文档
-                        self.record_download_status(
-                            PaserControl.DOWNLOAD_SUCCESS, parser.name
+                else:
+                    # 记录下载成功的文档
+                    self.record_download_status(
+                        PaserControl.DOWNLOAD_SUCCESS, parser.name
+                    )
+                    # 记录成功任务数
+                    self.__class__._success_task_count += 1
+
+                    # 缓存下载成功的文档
+                    if setting.RESPONSE_CACHED_ENABLE:
+                        request.save_cached(
+                            response=response,
+                            expire_time=setting.RESPONSE_CACHED_EXPIRE_TIME,
                         )
-                        # 记录成功任务数
-                        self.__class__._success_task_count += 1
-
-                        # 缓存下载成功的文档
-                        if setting.RESPONSE_CACHED_ENABLE:
-                            request.save_cached(
-                                response=response,
-                                expire_time=setting.RESPONSE_CACHED_EXPIRE_TIME,
-                            )
 
-                    finally:
-                        # 释放浏览器
-                        if response and hasattr(response, "browser"):
-                            request._webdriver_pool.put(response.browser)
+                finally:
+                    # 释放浏览器
+                    if response and hasattr(response, "browser"):
+                        request._webdriver_pool.put(response.browser)
 
-                    break
+                break
 
         if setting.SPIDER_SLEEP_TIME:
             if (

+ 8 - 3
zgztb_cookie/FworkSpider/feapder/network/request.py

@@ -243,7 +243,7 @@ class Request(object):
             else self.callback
         )
 
-    @func_set_timeout(30, allowOverride=True)
+    # @func_set_timeout(30, allowOverride=True)
     def get_response(self, save_cached=False):
         """
         获取带有selector功能的response
@@ -256,7 +256,12 @@ class Request(object):
         )  # connect=22 read=22
 
         # 设置stream
-        # 默认情况下,当你进行网络请求后,响应体会立即被下载。你可以通过 stream 参数覆盖这个行为,推迟下载响应体直到访问 Response.content 属性。此时仅有响应头被下载下来了。缺点: stream 设为 True,Requests 无法将连接释放回连接池,除非你 消耗了所有的数据,或者调用了 Response.close。 这样会带来连接效率低下的问题。
+        # 默认情况下,当你进行网络请求后,响应体会立即被下载。
+        # 你可以通过 stream 参数覆盖这个行为,推迟下载响应体直到访问 Response.content 属性。
+        # 此时仅有响应头被下载下来了。
+        # 缺点: stream 设为 True,Requests 无法将连接释放回连接池,
+        # 除非你 消耗了所有的数据,或者调用了 Response.close。
+        # 这样会带来连接效率低下的问题。
         self.requests_kwargs.setdefault("stream", True)
 
         # 关闭证书验证
@@ -408,7 +413,7 @@ class Request(object):
 
         if save_cached:
             self.save_cached(response, expire_time=self.__class__.cached_expire_time)
-        log.info("requests",extra={"url":response.url,"code":response.status_code})
+
         return response
 
     def proxies(self):

+ 8 - 2
zgztb_cookie/FworkSpider/feapder/network/response.py

@@ -43,6 +43,7 @@ class Response(res):
         self._cached_selector = None
         self._cached_text = None
         self._cached_json = None
+        self._cached_content = None
 
         self._encoding = None
 
@@ -269,8 +270,13 @@ class Response(res):
 
     @property
     def content(self):
-        content = super(Response, self).content
-        return content
+        if self._cached_content is None:
+            self._cached_content = super(Response, self).content
+        return self._cached_content
+
+    @content.setter
+    def content(self, obj_bytes):
+        self._cached_content = obj_bytes
 
     @property
     def is_html(self):

+ 18 - 0
zgztb_cookie/FworkSpider/feapder/utils/tools.py

@@ -7,6 +7,7 @@ Created on 2018-09-06 14:21
 @author: Boris
 @email: boris_liu@foxmail.com
 """
+import ast
 import asyncio
 import calendar
 import codecs
@@ -32,6 +33,7 @@ import uuid
 import weakref
 from functools import partial, wraps
 from hashlib import md5
+from itertools import chain
 from pprint import pformat
 from pprint import pprint
 from urllib import request
@@ -2552,3 +2554,19 @@ def ensure_float(n):
     if not n:
         return 0.0
     return float(n)
+
+
+def literal_eval(node_or_string):
+    """
+    安全地计算表达式节点或包含Python表达式的字符串。
+    提供的字符串或节点只能由以下Python文字结构组成:字符串、字节、数字、元组、列表、字典、集、布尔值和None。
+
+    @param node_or_string:
+    @return:
+    """
+    return ast.literal_eval(node_or_string)
+
+
+def from_iterable(iterable):
+    """遍历展开一个一元列表,返回该列表中的元素"""
+    return chain.from_iterable(iterable)

+ 2 - 2
zgztb_cookie/FworkSpider/untils/tools.py

@@ -8,9 +8,9 @@ from .cleaner import cleaner
 SearchText = namedtuple('SearchText', ['total'])
 
 
-def substitute(html_str, special=None, completely=False):
+def substitute(html_str, **kwargs):
     """HTML 替换"""
-    html_str = cleaner(html=html_str, special=None, completely=False)
+    html_str = cleaner(html=html_str, **kwargs)
     return html_str