@@ -8,16 +8,13 @@ Created on 2020/4/22 12:05 AM
 @email: boris_liu@foxmail.com
 """

-import time
 import warnings
 from collections import Iterable

-import feapder.setting as setting
 import feapder.utils.tools as tools
 from feapder.core.base_parser import BaseParser
 from feapder.core.scheduler import Scheduler
 from feapder.db.mongodb import MongoDB
-from feapder.db.redisdb import RedisDB
 from feapder.network.item import Item
 from feapder.network.request import Request
 from feapder.utils.log import log
@@ -84,99 +81,7 @@ class Spider(
         self._is_distributed_task = False
         self._is_show_not_task = False

-    def start_monitor_task(self, *args, **kws):
-        if not self.is_reach_next_spider_time():
-            return
-
-        self._auto_start_requests = False
-        redisdb = RedisDB()
-
-        if not self._parsers:  # not in add_parser mode
-            self._parsers.append(self)
-
-        while True:
-            try:
-                # check whether there are pending tasks in redis
-                tab_requests = setting.TAB_REQUESTS.format(redis_key=self._redis_key)
-                todo_task_count = redisdb.zget_count(tab_requests)
-
-                if todo_task_count < self._min_task_count:  # add tasks
-                    # make start requests
-                    self.distribute_task(*args, **kws)
-
-                else:
-                    log.info("redis still has %s backlog tasks, not distributing new tasks for now" % todo_task_count)
-
-            except Exception as e:
-                log.exception(e)
-
-            if not self._keep_alive:
-                break
-
-            time.sleep(self._check_task_interval)
-
-    def distribute_task(self, *args, **kws):
-        """
-        @summary: distribute tasks and store the returned requests
-        ---------
-        @param tasks:
-        ---------
-        @result:
-        """
-        self._is_distributed_task = False
-
-        for parser in self._parsers:
-            requests = parser.start_requests(*args, **kws)
-            if requests and not isinstance(requests, Iterable):
-                raise Exception("the return value of %s.%s must be iterable" % (parser.name, "start_requests"))
-
-            result_type = 1
-            for request in requests or []:
-                if isinstance(request, Request):
-                    request.parser_name = request.parser_name or parser.name
-                    self._request_buffer.put_request(request)
-
-                    self._is_distributed_task = True
-                    result_type = 1
-
-                elif isinstance(request, Item):
-                    self._item_buffer.put_item(request)
-                    result_type = 2
-
-                elif callable(request):  # a callable request may be a function that updates the database
-                    if result_type == 1:
-                        self._request_buffer.put_request(request)
-                    else:
-                        self._item_buffer.put_item(request)
-                else:
-                    raise TypeError(
-                        "start_requests yield result type error, expect Request, Item or callback func, but got type: {}".format(
-                            type(request)
-                        )
-                    )
-
-        self._request_buffer.flush()
-        self._item_buffer.flush()
-
-        if self._is_distributed_task:  # only report spider start when there are tasks
-            # begin
-            self.spider_begin()
-            # reset the no-task-reported flag to False
-            self._is_show_not_task = False
-
-        elif not self._is_show_not_task:  # no tasks, and the no-task message has not been sent yet
-            # send the no-task message
-            msg = "《%s》start_requests added no tasks" % (self._spider_name)
-            log.info(msg)
-
-            # self.send_msg(msg)
-
-            self._is_show_not_task = True
-
     def run(self):
-        if not self.is_reach_next_spider_time():
-            return
-
         if not self._parsers:  # not in add_parser mode
             self._parsers.append(self)

@@ -258,98 +163,9 @@ class DebugSpider(Spider):
         self._request = request or Request.from_dict(request_dict)

-    def save_cached(self, request, response, table):
-        pass
-
-    def delete_tables(self, delete_tables_list):
-        if isinstance(delete_tables_list, bool):
-            delete_tables_list = [self._redis_key + "*"]
-        elif not isinstance(delete_tables_list, (list, tuple)):
-            delete_tables_list = [delete_tables_list]
-
-        redis = RedisDB()
-        for delete_tab in delete_tables_list:
-            if delete_tab == "*":
-                delete_tab = self._redis_key + "*"
-
-            tables = redis.getkeys(delete_tab)
-            for table in tables:
-                log.info("deleting table %s" % table)
-                redis.clear(table)
-
     def __start_requests(self):
         yield self._request

-    def distribute_task(self):
-        """
-        @summary: distribute tasks and store the returned requests
-        ---------
-        ---------
-        @result:
-        """
-        self._is_distributed_task = False
-
-        for parser in self._parsers:
-            requests = parser.__start_requests()
-            if requests and not isinstance(requests, Iterable):
-                raise Exception("the return value of %s.%s must be iterable" % (parser.name, "start_requests"))
-
-            result_type = 1
-            for request in requests or []:
-                if isinstance(request, Request):
-                    request.parser_name = request.parser_name or parser.name
-                    self._request_buffer.put_request(request)
-
-                    self._is_distributed_task = True
-                    result_type = 1
-
-                elif isinstance(request, Item):
-                    self._item_buffer.put_item(request)
-                    result_type = 2
-
-                elif callable(request):  # a callable request may be a function that updates the database
-                    if result_type == 1:
-                        self._request_buffer.put_request(request)
-                    else:
-                        self._item_buffer.put_item(request)
-
-        self._request_buffer.flush()
-        self._item_buffer.flush()
-
-        if self._is_distributed_task:  # only report spider start when there are tasks
-            # begin
-            self.spider_begin()
-            self.record_spider_state(
-                spider_type=1,
-                state=0,
-                batch_date=tools.get_current_date(),
-                spider_start_time=tools.get_current_date(),
-                batch_interval=self._batch_interval,
-            )
-
-            # reset the no-task-reported flag to False
-            self._is_show_not_task = False
-
-        elif not self._is_show_not_task:  # no tasks, and the no-task message has not been sent yet
-            # send the no-task message
-            msg = "《%s》start_requests added no tasks" % (self._spider_name)
-            log.info(msg)
-
-            # self.send_msg(msg)
-
-            self._is_show_not_task = True
-
-    def record_spider_state(
-        self,
-        spider_type,
-        state,
-        batch_date=None,
-        spider_start_time=None,
-        spider_end_time=None,
-        batch_interval=None,
-    ):
-        pass
-
     def _start(self):
         # start the parsers' start_requests
         self.spider_begin()  # for spiders that do not stop automatically, this runs only once
@@ -420,8 +236,6 @@ class DebugSpider(Spider):
             tools.delay_time(1)  # check the spider status once per second

-        self.delete_tables([self._redis_key + "*"])
-

 class BusinessBaseListSpider(Spider):
     """Business base class for list-page spiders"""