|
@@ -15,7 +15,6 @@ import feapder.setting as setting
|
|
|
import feapder.utils.tools as tools
|
|
|
from feapder.buffer.item_buffer import ItemBuffer
|
|
|
from feapder.buffer.request_buffer import RequestBuffer
|
|
|
-from feapder.buffer.task_buffer import TaskBuffer
|
|
|
from feapder.core.base_parser import BaseParser
|
|
|
from feapder.core.collector import Collector
|
|
|
from feapder.core.handle_failed_items import HandleFailedItems
|
|
@@ -77,7 +76,6 @@ class Scheduler(threading.Thread):
|
|
|
self._request_buffer = RequestBuffer(redis_key)
|
|
|
self._item_buffer = ItemBuffer(redis_key)
|
|
|
self._collector = Collector(redis_key)
|
|
|
- self._task_buffer = TaskBuffer(redis_key)
|
|
|
|
|
|
self._parsers = []
|
|
|
self._parser_controls = []
|
|
@@ -150,9 +148,7 @@ class Scheduler(threading.Thread):
|
|
|
)
|
|
|
handle_failed_items.reput_failed_items_to_db()
|
|
|
|
|
|
- # 开启 task_buffer -- 采集任务与状态管理器,负责缓冲采集任务和任务采集状态更新
|
|
|
- self._task_buffer.start()
|
|
|
- # STEP 3.1 开启 request_buffer -- 任务管理器,负责缓冲添加到数据库中的request
|
|
|
+ # STEP 3.1 开启 request_buffer -- 任务管理器 负责缓冲添加到数据库中的request
|
|
|
self._request_buffer.start()
|
|
|
# STEP 3.2 开启 item_buffer -- 管道管理器 负责缓冲采集的数据添加到数据库
|
|
|
self._item_buffer.start()
|
|
@@ -167,7 +163,6 @@ class Scheduler(threading.Thread):
|
|
|
self._redis_key,
|
|
|
self._request_buffer,
|
|
|
self._item_buffer,
|
|
|
- self._task_buffer
|
|
|
)
|
|
|
|
|
|
for parser in self._parsers: # step 3.5 把所有待执行任务添加到线程池
|
|
@@ -279,13 +274,6 @@ class Scheduler(threading.Thread):
|
|
|
):
|
|
|
return False
|
|
|
|
|
|
- # STEP 5.5 检测 task_buffer 状态
|
|
|
- if (
|
|
|
- self._task_buffer.get_tasks_count() > 0
|
|
|
- or self._task_buffer.is_adding_to_db()
|
|
|
- ):
|
|
|
- return False
|
|
|
-
|
|
|
tools.delay_time(1) # 休眠1秒
|
|
|
|
|
|
return True
|
|
@@ -353,8 +341,6 @@ class Scheduler(threading.Thread):
|
|
|
self._item_buffer.stop()
|
|
|
# 关闭任务管理
|
|
|
self._collector.stop()
|
|
|
- # 关闭采集状态管理器
|
|
|
- self._task_buffer.stop()
|
|
|
# 停止 parser_controls
|
|
|
for parser_control in self._parser_controls:
|
|
|
parser_control.stop()
|