Browse Source

添加采集任务与状态管理器

dongzhaorui 1 year ago
parent
commit
01ed9c890f
1 changed files with 16 additions and 2 deletions
  1. 16 2
      FworkSpider/feapder/core/scheduler.py

+ 16 - 2
FworkSpider/feapder/core/scheduler.py

@@ -18,6 +18,7 @@ import feapder.setting as setting
 import feapder.utils.tools as tools
 from feapder.buffer.item_buffer import ItemBuffer
 from feapder.buffer.request_buffer import RequestBuffer
+from feapder.buffer.task_buffer import TaskBuffer
 from feapder.core.base_parser import BaseParser
 from feapder.core.collector import Collector
 from feapder.core.handle_failed_items import HandleFailedItems
@@ -79,6 +80,7 @@ class Scheduler(threading.Thread):
         self._request_buffer = RequestBuffer(redis_key)
         self._item_buffer = ItemBuffer(redis_key)
         self._collector = Collector(redis_key)
+        self._task_buffer = TaskBuffer(redis_key)
 
         self._parsers = []
         self._parser_controls = []
@@ -155,14 +157,16 @@ class Scheduler(threading.Thread):
             )
             handle_failed_items.reput_failed_items_to_db()
 
+        # 开启 task_buffer -- 任务状态管理器 负责缓冲采集任务状态更新
+        self._task_buffer.start()
         # STEP 3.1 开启 request_buffer -- 任务管理器,负责缓冲添加到数据库中的request
         self._request_buffer.start()
         # STEP 3.2 开启 item_buffer -- 管道管理器 负责缓冲采集的数据添加到数据库
         self._item_buffer.start()
-        # STEP 3.3 开启 collector  -- 任务管理 分发任务
+        # STEP 3.4 开启 collector  -- 任务管理 分发任务
         self._collector.start()
 
-        # 启动parser control
+        # 启动parser control 线程池
         for i in range(self._thread_count):
             # STEP 3.4 创建执行任务线程池
             parser_control = self._parser_control_obj(
@@ -170,6 +174,7 @@ class Scheduler(threading.Thread):
                 self._redis_key,
                 self._request_buffer,
                 self._item_buffer,
+                self._task_buffer
             )
 
             for parser in self._parsers:  # step 3.5 把所有待执行任务添加到线程池
@@ -281,6 +286,13 @@ class Scheduler(threading.Thread):
             ):
                 return False
 
+            # STEP 5.5 检测 task_buffer 状态
+            if (
+                self._task_buffer.get_tasks_count() > 0
+                or self._task_buffer.is_adding_to_db()
+            ):
+                return False
+
             tools.delay_time(1)  # 休眠1秒
 
         return True
@@ -349,6 +361,8 @@ class Scheduler(threading.Thread):
         self._item_buffer.stop()
         # 关闭任务管理
         self._collector.stop()
+        # 关闭采集状态管理器
+        self._task_buffer.stop()
         # 停止 parser_controls
         for parser_control in self._parser_controls:
             parser_control.stop()