@@ -7,12 +7,9 @@ Created on 2017-01-09 10:38
 @author: Boris
 @email: boris_liu@foxmail.com
 """
-import json
-import sys
 import threading
 import time
 from collections import Iterable
-from types import SimpleNamespace

 import feapder.setting as setting
 import feapder.utils.tools as tools
@@ -118,10 +115,6 @@ class Scheduler(threading.Thread):
         self._spider_id = tools.get_uuid(redis_key, tools.get_current_date())
         self._spider_name = redis_key

-        # Declare the spider heartbeat queue
-        self._tab_spider_heartbeat = setting.TAB_SPIDER_HEARTBEAT
-        self._rabbitmq.declare(queue=self._tab_spider_heartbeat)
-
         self._is_notify_end = False  # whether the end notification has already been sent

         # Request cache settings
@@ -143,7 +136,7 @@ class Scheduler(threading.Thread):
             if isinstance(parser, BaseParser):
                 self._parsers.append(parser)
             else:
-                raise ValueError("Type error: the spider must inherit feapder.BaseParser or feapder.BatchParser")
+                raise ValueError("Type error: the spider must inherit feapder.BaseParser")

     def _start(self):
         self.spider_begin()  # start the spider -- start_callback
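For reference, the isinstance check above means anything passed to add_parser must subclass feapder.BaseParser. A minimal conforming parser might look like the following sketch (the class name and URL are illustrative, not part of this change):

import feapder

class DemoParser(feapder.BaseParser):
    """Minimal sketch: subclassing BaseParser satisfies the
    isinstance(parser, BaseParser) check in Scheduler.add_parser."""

    def start_requests(self):
        # Seed request; the URL is a placeholder
        yield feapder.Request("https://httpbin.org/get")

    def parse(self, request, response):
        # Handle the response; here we only print the status code
        print(response.status_code)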
@@ -157,7 +150,7 @@ class Scheduler(threading.Thread):
             )
             handle_failed_items.reput_failed_items_to_db()

-        # Start task_buffer -- the task status manager, which buffers task status updates
+        # Start task_buffer -- the task and status manager, which buffers collected tasks and task status updates
         self._task_buffer.start()
         # STEP 3.1 Start request_buffer -- the task manager, which buffers requests to be written to the database
         self._request_buffer.start()
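Both buffers follow the same pattern: a daemon thread accumulates items in memory and flushes them to storage in batches, so the crawl loop never blocks on individual writes. A rough sketch of that pattern (hypothetical names, not the feapder implementation):

import threading
import time

class BatchBuffer(threading.Thread):
    """Sketch of the buffer pattern: collect items, flush in batches."""

    def __init__(self, flush_interval=1.0, max_batch=100):
        super().__init__(daemon=True)
        self._items = []
        self._lock = threading.Lock()
        self._flush_interval = flush_interval
        self._max_batch = max_batch
        self._stopped = threading.Event()

    def add(self, item):
        with self._lock:
            self._items.append(item)

    def run(self):
        while not self._stopped.is_set():
            time.sleep(self._flush_interval)
            self._flush()

    def _flush(self):
        with self._lock:
            batch = self._items[: self._max_batch]
            self._items = self._items[self._max_batch :]
        if batch:
            # Stand-in for the real storage write (e.g. a bulk DB insert)
            print("flushing %d items" % len(batch))

    def stop(self):
        self._stopped.set()
        while self._items:  # drain whatever is left
            self._flush()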
@@ -311,7 +304,6 @@ class Scheduler(threading.Thread):

         # Check the failed task count; alert if it exceeds 1000
         failed_count = self._request_buffer.get_failed_requests_count()
-        log.debug(f'《{self._spider_name}》spider failed task count: {failed_count}')
         if failed_count > setting.WARNING_FAILED_COUNT:
             # Send an alert
             msg = "《%s》spider currently has %s failed tasks, please check whether the spider is working properly" % (self._spider_name, failed_count)
@@ -368,24 +360,12 @@ class Scheduler(threading.Thread):
             parser_control.stop()

         # Record the spider stop time
-        self.report_node_heartbeat('close')
         self._started.clear()

     def send_msg(self, msg, level="debug", message_prefix=""):
         # log.debug("Send alert level:{} msg:{}".format(level, msg))
         tools.send_msg(msg=msg, level=level, message_prefix=message_prefix)

-    def get_argvs(self):
-        argvs = {"next_page": False, "max_page": 10}
-        for item in sys.argv[1:]:
-            # print(item)
-            if item.startswith("--"):
-                key = item.replace("--", "").split('=')[0]
-                val = item.split('=')[-1]
-                if key != 'purpose':
-                    argvs[key] = eval(val)  # eval is used here to convert strings to bool or int
-        return json.loads(json.dumps(argvs), object_hook=lambda d: SimpleNamespace(**d))
-
     def spider_begin(self):
         """
         @summary: when started via start_monitor_task, this function and spider_end do not run in the same process, so variables cannot be shared
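The deleted get_argvs helper leaned on eval to turn flag strings such as --max_page=10 into ints and bools, which executes arbitrary input. Were the helper kept, ast.literal_eval would do the same coercion safely; a minimal sketch under that assumption:

import ast
import sys
from types import SimpleNamespace

def get_argvs(defaults=None):
    """Parse --key=value flags from sys.argv without eval.

    ast.literal_eval coerces Python literals such as 'True' or '10';
    anything that is not a literal is kept as the raw string.
    """
    argvs = dict(defaults or {"next_page": False, "max_page": 10})
    for item in sys.argv[1:]:
        if item.startswith("--") and "=" in item:
            key, _, val = item[2:].partition("=")
            if key != "purpose":  # 'purpose' is reserved, as in the original
                try:
                    argvs[key] = ast.literal_eval(val)
                except (ValueError, SyntaxError):
                    argvs[key] = val
    return SimpleNamespace(**argvs)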
@@ -396,19 +376,10 @@ class Scheduler(threading.Thread):
         if self._begin_callback:
             self._begin_callback()

-        parameter = self.get_argvs()
         for parser in self._parsers:
-            parser.platform_next_page = parameter.next_page
-            parser.platform_max_page = parameter.max_page
             parser.start_callback()

-        # Record the spider start time
-        self.report_node_heartbeat('start')
-
     def spider_end(self):
-        # Spider end time
-        self.report_node_heartbeat('end')
-
         if self._end_callback:  # task-end callback
             self._end_callback()

@@ -433,19 +404,6 @@ class Scheduler(threading.Thread):
         else:
             log.info("《%s》spider finished" % (self._spider_name,))

-    def report_node_heartbeat(self, status):
-        """
-        Spider heartbeat
-        """
-        message = {
-            'ip': tools.get_localhost_ip(),
-            'spider_id': self._spider_id,
-            'spider_name': self._spider_name,
-            'ts': tools.get_current_timestamp(),
-            'status': status
-        }
-        self._rabbitmq.add(self._tab_spider_heartbeat, message)
-
     def join(self, timeout=None):
         """
         Override Thread.join