Преглед на файлове

修复远程浏览器会话丢失进程阻塞问题

dongzhaorui преди 3 месеца
родител
ревизия
a4d77ad6ca

+ 306 - 0
FworkSpider/feapder/core/parser_control/parser_control.py → FworkSpider/feapder/core/parser_control.py

@@ -16,6 +16,7 @@ from collections.abc import Iterable
 import feapder.setting as setting
 import feapder.utils.tools as tools
 from feapder.buffer.item_buffer import ItemBuffer
+from feapder.buffer.item_buffer import JyItemBuffer
 from feapder.db.memory_db import MemoryDB
 from feapder.network.item import Item, HeartBeatItem
 from feapder.network.request import Request
@@ -756,3 +757,308 @@ class AirSpiderParserControl(PaserControl):
                 time.sleep(sleep_time)
             else:
                 time.sleep(setting.SPIDER_SLEEP_TIME)
+
+
+class JySpiderParserControl(PaserControl):
+    is_show_tip = False
+
+    _success_task_count = 0
+    _failed_task_count = 0
+
+    def __init__(self, memory_db: MemoryDB, item_buffer: JyItemBuffer, heartbeat_buffer):
+        super(PaserControl, self).__init__()
+        self._parsers = []
+
+        self._memory_db = memory_db
+        self._item_buffer = item_buffer
+        self._heartbeat_buffer = heartbeat_buffer
+
+        self._thread_stop = False
+        self._selenium_stop = False
+
+    def run(self):
+        while not self._thread_stop:
+            try:
+                request = self._memory_db.get()
+                if not request:
+                    if not self.is_show_tip:
+                        log.debug("等待任务...")
+                        self.is_show_tip = True
+                    continue
+
+                self.is_show_tip = False
+                self.deal_request(request)
+            except (Exception, BaseException) as e:
+                log.exception(e)
+
+            finally:
+                if self._selenium_stop and not self.is_show_tip:
+                    log.debug("暂无可用浏览器,释放任务...")
+                    self._memory_db.clear()
+
+    def deal_request(self, request):
+        response = None
+        now_page = request.page or -1
+
+        for parser in self._parsers:
+            counter = {
+                "now_page": now_page,
+                "extract_count": 0,  # 列表页抽取的列表数量
+                "rel_count": 0,  # 去重后实际入库数量
+            }
+            if parser.name == request.parser_name:
+                try:
+                    # 解析request
+                    if request.auto_request:
+                        request_temp = None
+                        response = None
+
+                        # 下载中间件
+                        if request.download_midware:
+                            if isinstance(request.download_midware, (list, tuple)):
+                                request_temp = request
+                                for download_midware in request.download_midware:
+                                    download_midware = (
+                                        download_midware
+                                        if callable(download_midware)
+                                        else tools.get_method(
+                                            parser, download_midware
+                                        )
+                                    )
+                                    request_temp = download_midware(request_temp)
+                            else:
+                                download_midware = (
+                                    request.download_midware
+                                    if callable(request.download_midware)
+                                    else tools.get_method(
+                                        parser, request.download_midware
+                                    )
+                                )
+                                request_temp = download_midware(request)
+
+                        elif request.download_midware != False:
+                            request_temp = parser.download_midware(request)
+
+                        # 请求
+                        if request_temp:
+                            if (
+                                isinstance(request_temp, (tuple, list))
+                                and len(request_temp) == 2
+                            ):
+                                request_temp, response = request_temp
+
+                            if not isinstance(request_temp, Request):
+                                raise Exception(
+                                    "download_midware need return a request,"
+                                    "but received type: {}".format(
+                                        type(request_temp)
+                                    )
+                                )
+                            request = request_temp
+
+                        if not response:
+                            response = (
+                                request.get_response()
+                                if not setting.RESPONSE_CACHED_USED
+                                else request.get_response_from_cached(save_cached=False)
+                            )
+
+                        # 校验
+                        if parser.validate(request, response) == False:
+                            break
+
+                    else:
+                        response = None
+
+                    if request.callback:  # 如果有parser的回调函数,则用回调处理
+                        callback_parser = (
+                            request.callback
+                            if callable(request.callback)
+                            else tools.get_method(parser, request.callback)
+                        )
+                        results = callback_parser(request, response)
+                    else:  # 否则默认用parser处理
+                        results = parser.parse(request, response)
+
+                    if results and not isinstance(results, Iterable):
+                        raise Exception(
+                            "%s.%s返回值必须可迭代"
+                            % (parser.name, request.callback or "parse")
+                        )
+
+                    # 此处判断是 request 还是 item
+                    for result in results or []:
+                        if isinstance(result, Request):
+                            # 给request的 parser_name 赋值
+                            result.parser_name = result.parser_name or parser.name
+
+                            # 判断是同步的callback还是异步的
+                            if result.request_sync:  # 同步
+                                self.deal_request(result)
+                            else:  # 异步
+                                # 将next_request 入库
+                                self._memory_db.add(result)
+
+                        elif isinstance(result, Item):
+                            # 爬虫采集方式[True=混合采集(列表页+详情页); False=独立采集(列表页,详情页)]
+                            result.is_mixed = False
+                            if "List" in parser.__business_type__ and hasattr(result, "contenthtml"):
+                                result.is_mixed = True
+
+                            counter["extract_count"] += 1  # 统计抽取列表数
+                            if not self.is_duplicate(result):
+                                counter["rel_count"] += 1  # 统计实际列表数
+
+                            self._item_buffer.put_item(result)
+
+                        elif result is not None:
+                            function_name = "{}.{}".format(
+                                parser.name,
+                                (
+                                    request.callback
+                                    and callable(request.callback)
+                                    and getattr(request.callback, "__name__")
+                                    or request.callback
+                                ) or "parse",
+                            )
+                            raise TypeError(
+                                f"{function_name} result expect Request or Item, bug get type: {type(result)}"
+                            )
+
+                except (Exception, BaseException) as e:
+                    exception_type = (
+                        str(type(e)).replace("<class '", "").replace("'>", "")
+                    )
+
+                    if setting.LOG_LEVEL == "DEBUG":  # 只有debug模式下打印, 超时的异常篇幅太多
+                        log.exception(e)
+
+                    log.error(
+                        """
+                            -------------- %s.%s error -------------
+                            error          %s
+                            response       %s
+                            deal request   %s
+                        """
+                        % (
+                            parser.name,
+                            (
+                                request.callback
+                                and callable(request.callback)
+                                and getattr(request.callback, "__name__")
+                                or request.callback
+                            ) or "parse",
+                            str(e),
+                            response,
+                            tools.dumps_json(request.to_dict, indent=28)
+                            if setting.LOG_LEVEL == "DEBUG"
+                            else request,
+                        )
+                    )
+
+                    request.error_msg = "%s: %s" % (exception_type, e)
+                    request.response = str(response)
+
+                    if "Invalid URL" in str(e):
+                        request.is_abandoned = True
+
+                    if exception_type == "selenium.common.exceptions.InvalidSessionIdException":
+                        self._selenium_stop = True  # TODO 暂无解决方案,目前只能通过重建爬虫实例,建立新会话。参考文章:https://github.com/SeleniumHQ/docker-selenium/issues/2153
+                        _id = str(e.args[0]).split()[14] if len(str(e.args[0]).split()) > 14 else ""
+                        raise IOError(
+                            "%s 远程调用超时, session_id: %s"
+                            % ("selenium.driver.session", _id)
+                        )
+
+                    requests = parser.exception_request(request, response) or [
+                        request
+                    ]
+                    if not isinstance(requests, Iterable):
+                        raise Exception(
+                            "%s.%s返回值必须可迭代"
+                            % (parser.name, "exception_request")
+                        )
+
+                    for request in requests:
+                        if not isinstance(request, Request):
+                            raise Exception("exception_request 需 yield request")
+
+                        if (
+                            request.retry_times + 1 > setting.SPIDER_MAX_RETRY_TIMES
+                            or request.is_abandoned
+                        ):
+                            self.__class__._failed_task_count += 1  # 记录失败任务数
+
+                            # 处理failed_request的返回值 request 或 func
+                            results = parser.failed_request(request, response) or [
+                                request
+                            ]
+                            if not isinstance(results, Iterable):
+                                raise Exception(
+                                    "%s.%s返回值必须可迭代"
+                                    % (parser.name, "failed_request")
+                                )
+
+                            log.info(
+                                """
+                                任务超过最大重试次数,丢弃
+                                url     %s
+                                重试次数 %s
+                                最大允许重试次数 %s"""
+                                % (
+                                    request.url,
+                                    request.retry_times,
+                                    setting.SPIDER_MAX_RETRY_TIMES,
+                                )
+                            )
+
+                        else:
+                            # 将 requests 重新入库 爬取
+                            request.retry_times += 1
+                            request.filter_repeat = False
+                            log.info(
+                                """
+                                    入库 等待重试
+                                    url     %s
+                                    重试次数 %s
+                                    最大允许重试次数 %s"""
+                                % (
+                                    request.url,
+                                    request.retry_times,
+                                    setting.SPIDER_MAX_RETRY_TIMES,
+                                )
+                            )
+                            self._memory_db.add(request)
+
+                else:
+                    # 记录成功任务数
+                    self.__class__._success_task_count += 1
+
+                    # 缓存下载成功的文档
+                    if setting.RESPONSE_CACHED_ENABLE:
+                        request.save_cached(
+                            response=response,
+                            expire_time=setting.RESPONSE_CACHED_EXPIRE_TIME,
+                        )
+
+                finally:
+                    # 释放浏览器
+                    if response and getattr(response, "browser", None):
+                        request._webdriver_pool.put(response.browser)
+
+                    self.publish_heartbeat(parser, request, response, **counter)
+
+                break
+
+        if setting.SPIDER_SLEEP_TIME:
+            if (
+                isinstance(setting.SPIDER_SLEEP_TIME, (tuple, list))
+                and len(setting.SPIDER_SLEEP_TIME) == 2
+            ):
+                sleep_time = random.randint(
+                    int(setting.SPIDER_SLEEP_TIME[0]),
+                    int(setting.SPIDER_SLEEP_TIME[1])
+                )
+                time.sleep(sleep_time)
+            else:
+                time.sleep(setting.SPIDER_SLEEP_TIME)

+ 0 - 13
FworkSpider/feapder/core/parser_control/__init__.py

@@ -1,13 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
-Created on 2024-08-29 
----------
-@summary:  
----------
-@author: Dzr
-"""
-from feapder.core.parser_control.parser_control import (
-    PaserControl,
-    AirSpiderParserControl
-)
-from feapder.core.parser_control.jy_parser_control import JySpiderParserControl

+ 0 - 310
FworkSpider/feapder/core/parser_control/jy_parser_control.py

@@ -1,310 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
-Created on 2017-01-03 16:06
----------
-@summary: parser 控制类
----------
-@author: Boris
-@email: boris_liu@foxmail.com
-"""
-import random
-import time
-from collections.abc import Iterable
-
-import feapder.setting as setting
-import feapder.utils.tools as tools
-from feapder.buffer.item_buffer import JyItemBuffer
-from feapder.core.parser_control import PaserControl
-from feapder.db.memory_db import MemoryDB
-from feapder.network.item import Item
-from feapder.network.request import Request
-from feapder.utils.log import log
-
-
-class JySpiderParserControl(PaserControl):
-    is_show_tip = False
-
-    _success_task_count = 0
-    _failed_task_count = 0
-
-    def __init__(self, memory_db: MemoryDB, item_buffer: JyItemBuffer, heartbeat_buffer):
-        super(PaserControl, self).__init__()
-        self._parsers = []
-        self._thread_stop = False
-
-        self._memory_db = memory_db
-        self._item_buffer = item_buffer
-        self._heartbeat_buffer = heartbeat_buffer
-
-    def run(self):
-        while not self._thread_stop:
-            try:
-                request = self._memory_db.get()
-                if not request:
-                    if not self.is_show_tip:
-                        log.debug("等待任务...")
-                        self.is_show_tip = True
-                    continue
-
-                self.is_show_tip = False
-                self.deal_request(request)
-
-            except (Exception, BaseException) as e:
-                log.exception(e)
-
-    def deal_request(self, request):
-        response = None
-        now_page = request.page or -1
-
-        for parser in self._parsers:
-            counter = {
-                "now_page": now_page,
-                "extract_count": 0,  # 列表页抽取的列表数量
-                "rel_count": 0,  # 去重后实际入库数量
-            }
-            if parser.name == request.parser_name:
-                try:
-                    # 解析request
-                    if request.auto_request:
-                        request_temp = None
-                        response = None
-
-                        # 下载中间件
-                        if request.download_midware:
-                            if isinstance(request.download_midware, (list, tuple)):
-                                request_temp = request
-                                for download_midware in request.download_midware:
-                                    download_midware = (
-                                        download_midware
-                                        if callable(download_midware)
-                                        else tools.get_method(
-                                            parser, download_midware
-                                        )
-                                    )
-                                    request_temp = download_midware(request_temp)
-                            else:
-                                download_midware = (
-                                    request.download_midware
-                                    if callable(request.download_midware)
-                                    else tools.get_method(
-                                        parser, request.download_midware
-                                    )
-                                )
-                                request_temp = download_midware(request)
-
-                        elif request.download_midware != False:
-                            request_temp = parser.download_midware(request)
-
-                        # 请求
-                        if request_temp:
-                            if (
-                                isinstance(request_temp, (tuple, list))
-                                and len(request_temp) == 2
-                            ):
-                                request_temp, response = request_temp
-
-                            if not isinstance(request_temp, Request):
-                                raise Exception(
-                                    "download_midware need return a request,"
-                                    "but received type: {}".format(
-                                        type(request_temp)
-                                    )
-                                )
-                            request = request_temp
-
-                        if not response:
-                            response = (
-                                request.get_response()
-                                if not setting.RESPONSE_CACHED_USED
-                                else request.get_response_from_cached(save_cached=False)
-                            )
-
-                        # 校验
-                        if parser.validate(request, response) == False:
-                            break
-
-                    else:
-                        response = None
-
-                    if request.callback:  # 如果有parser的回调函数,则用回调处理
-                        callback_parser = (
-                            request.callback
-                            if callable(request.callback)
-                            else tools.get_method(parser, request.callback)
-                        )
-                        results = callback_parser(request, response)
-                    else:  # 否则默认用parser处理
-                        results = parser.parse(request, response)
-
-                    if results and not isinstance(results, Iterable):
-                        raise Exception(
-                            "%s.%s返回值必须可迭代"
-                            % (parser.name, request.callback or "parse")
-                        )
-
-                    # 此处判断是 request 还是 item
-                    for result in results or []:
-                        if isinstance(result, Request):
-                            # 给request的 parser_name 赋值
-                            result.parser_name = result.parser_name or parser.name
-
-                            # 判断是同步的callback还是异步的
-                            if result.request_sync:  # 同步
-                                self.deal_request(result)
-                            else:  # 异步
-                                # 将next_request 入库
-                                self._memory_db.add(result)
-
-                        elif isinstance(result, Item):
-                            # 爬虫采集方式[True=混合采集(列表页+详情页); False=独立采集(列表页,详情页)]
-                            result.is_mixed = False
-                            if "List" in parser.__business_type__ and hasattr(result, "contenthtml"):
-                                result.is_mixed = True
-
-                            counter["extract_count"] += 1  # 统计抽取列表数
-                            if not self.is_duplicate(result):
-                                counter["rel_count"] += 1  # 统计实际列表数
-
-                            self._item_buffer.put_item(result)
-
-                        elif result is not None:
-                            function_name = "{}.{}".format(
-                                parser.name,
-                                (
-                                    request.callback
-                                    and callable(request.callback)
-                                    and getattr(request.callback, "__name__")
-                                    or request.callback
-                                )
-                                or "parse",
-                            )
-                            raise TypeError(
-                                f"{function_name} result expect Request or Item, bug get type: {type(result)}"
-                            )
-
-                except (Exception, BaseException) as e:
-                    exception_type = (
-                        str(type(e)).replace("<class '", "").replace("'>", "")
-                    )
-
-                    if setting.LOG_LEVEL == "DEBUG":  # 只有debug模式下打印, 超时的异常篇幅太多
-                        log.exception(e)
-
-                    log.error(
-                        """
-                            -------------- %s.%s error -------------
-                            error          %s
-                            response       %s
-                            deal request   %s
-                        """
-                        % (
-                            parser.name,
-                            (
-                                request.callback
-                                and callable(request.callback)
-                                and getattr(request.callback, "__name__")
-                                or request.callback
-                            )
-                            or "parse",
-                            str(e),
-                            response,
-                            tools.dumps_json(request.to_dict, indent=28)
-                            if setting.LOG_LEVEL == "DEBUG"
-                            else request,
-                        )
-                    )
-
-                    request.error_msg = "%s: %s" % (exception_type, e)
-                    request.response = str(response)
-
-                    if "Invalid URL" in str(e):
-                        request.is_abandoned = True
-
-                    requests = parser.exception_request(request, response) or [
-                        request
-                    ]
-                    if not isinstance(requests, Iterable):
-                        raise Exception(
-                            "%s.%s返回值必须可迭代" % (parser.name, "exception_request")
-                        )
-                    for request in requests:
-                        if not isinstance(request, Request):
-                            raise Exception("exception_request 需 yield request")
-
-                        if (
-                            request.retry_times + 1 > setting.SPIDER_MAX_RETRY_TIMES
-                            or request.is_abandoned
-                        ):
-                            self.__class__._failed_task_count += 1  # 记录失败任务数
-
-                            # 处理failed_request的返回值 request 或 func
-                            results = parser.failed_request(request, response) or [
-                                request
-                            ]
-                            if not isinstance(results, Iterable):
-                                raise Exception(
-                                    "%s.%s返回值必须可迭代"
-                                    % (parser.name, "failed_request")
-                                )
-
-                            log.info(
-                                """
-                                任务超过最大重试次数,丢弃
-                                url     %s
-                                重试次数 %s
-                                最大允许重试次数 %s"""
-                                % (
-                                    request.url,
-                                    request.retry_times,
-                                    setting.SPIDER_MAX_RETRY_TIMES,
-                                )
-                            )
-
-                        else:
-                            # 将 requests 重新入库 爬取
-                            request.retry_times += 1
-                            request.filter_repeat = False
-                            log.info(
-                                """
-                                    入库 等待重试
-                                    url     %s
-                                    重试次数 %s
-                                    最大允许重试次数 %s"""
-                                % (
-                                    request.url,
-                                    request.retry_times,
-                                    setting.SPIDER_MAX_RETRY_TIMES,
-                                )
-                            )
-                            self._memory_db.add(request)
-
-                else:
-                    # 记录成功任务数
-                    self.__class__._success_task_count += 1
-
-                    # 缓存下载成功的文档
-                    if setting.RESPONSE_CACHED_ENABLE:
-                        request.save_cached(
-                            response=response,
-                            expire_time=setting.RESPONSE_CACHED_EXPIRE_TIME,
-                        )
-
-                finally:
-                    # 释放浏览器
-                    if response and getattr(response, "browser", None):
-                        request._webdriver_pool.put(response.browser)
-
-                    self.publish_heartbeat(parser, request, response, **counter)
-                break
-
-        if setting.SPIDER_SLEEP_TIME:
-            if (
-                isinstance(setting.SPIDER_SLEEP_TIME, (tuple, list))
-                and len(setting.SPIDER_SLEEP_TIME) == 2
-            ):
-                sleep_time = random.randint(
-                    int(setting.SPIDER_SLEEP_TIME[0]), int(setting.SPIDER_SLEEP_TIME[1])
-                )
-                time.sleep(sleep_time)
-            else:
-                time.sleep(setting.SPIDER_SLEEP_TIME)

+ 3 - 0
FworkSpider/feapder/db/memory_db.py

@@ -35,3 +35,6 @@ class MemoryDB:
 
     def empty(self):
         return self.priority_queue.empty()
+
+    def clear(self):
+        self.priority_queue = PriorityQueue()