Bläddra i källkod

修复因任务发布请求超时,导致进程异常阻塞问题

dongzhaorui 3 månader sedan
förälder
incheckning
2fbb16e0c6

+ 10 - 7
FworkSpider/feapder/core/spiders/air_spider.py

@@ -17,8 +17,8 @@ from feapder.core.base_parser import BaseParser
 from feapder.core.parser_control import AirSpiderParserControl
 from feapder.db.memory_db import MemoryDB
 from feapder.network.request import Request
-from feapder.utils.log import log
 from feapder.utils import metrics
+from feapder.utils.log import log
 
 
 class AirSpider(BaseParser, Thread):
@@ -45,12 +45,15 @@ class AirSpider(BaseParser, Thread):
         metrics.init(**setting.METRICS_OTHER_ARGS)
 
     def distribute_task(self):
-        for request in self.start_requests():
-            if not isinstance(request, Request):
-                raise ValueError("仅支持 yield Request")
-
-            request.parser_name = request.parser_name or self.name
-            self._memory_db.add(request)
+        try:
+            for request in self.start_requests():
+                if not isinstance(request, Request):
+                    raise ValueError("仅支持 yield Request")
+
+                request.parser_name = request.parser_name or self.name
+                self._memory_db.add(request)
+        except IOError:
+            log.error("distribute task failed")
 
     def all_thread_is_done(self):
         for i in range(3):  # 降低偶然性, 因为各个环节不是并发的,很有可能当时状态为假,但检测下一条时该状态为真。一次检测很有可能遇到这种偶然性

+ 10 - 6
FworkSpider/feapder/core/spiders/spider.py

@@ -53,12 +53,15 @@ class Spider(BaseParser, Thread):
         self.task_api_auth_token = None
 
     def distribute_task(self):
-        for request in self.start_requests():
-            if not isinstance(request, Request):
-                raise ValueError("仅支持 yield Request")
+        try:
+            for request in self.start_requests():
+                if not isinstance(request, Request):
+                    raise ValueError("仅支持 yield Request")
 
-            request.parser_name = request.parser_name or self.name
-            self._memory_db.add(request)
+                request.parser_name = request.parser_name or self.name
+                self._memory_db.add(request)
+        except IOError:
+            log.error("distribute task failed")
 
     def all_thread_is_done(self):
         for i in range(3):
@@ -211,12 +214,13 @@ class BaseBusinessDetailSpider(Spider):
         yield failed_item
 
     def get_tasks(self, limit=None, **kwargs):
+        timeout = kwargs.pop("timeout", 10)
         queue = setting.TAB_ITEMS.format(redis_key=self._redis_key.replace("_detailc", ""))
 
         # 获取任务
         url = f"{setting.JY_TASK_URL}/tasks/fd?qn={queue}&limit={limit}"
         headers = {"Authorization": self.task_api_auth_token}
-        params = dict(headers=headers, timeout=10, proxies=False)
+        params = dict(headers=headers, timeout=timeout, proxies=False)
         response = Request(method="GET", url=url, **params).get_response()
         ret = response.json["task"]
         self.tasks_dict = {