
Replace Redis with the RabbitMQ message queue

dongzhaorui · 1 year ago · commit b18c426b43

+ 17 - 59
FworkSpider/feapder/buffer/item_buffer.py

@@ -8,45 +8,42 @@ Created on 2018-06-19 17:17
 @email: boris_liu@foxmail.com
 """
 
-import importlib
 import threading
 from queue import Queue
 
 import feapder.setting as setting
 import feapder.utils.tools as tools
-from feapder.db.redisdb import RedisDB
+from feapder.db.rabbitMq import RabbitMQ
 from feapder.dedup import Dedup
 from feapder.network.item import Item, UpdateItem
 from feapder.pipelines import BasePipeline
-from feapder.pipelines.mysql_pipeline import MysqlPipeline
 from feapder.utils import metrics
 from feapder.utils.log import log
 
 MAX_ITEM_COUNT = 5000  # 缓存中最大item数
 UPLOAD_BATCH_MAX_SIZE = 1000
 
-MYSQL_PIPELINE_PATH = "feapder.pipelines.mysql_pipeline.MysqlPipeline"
-
 
 class ItemBuffer(threading.Thread):
     dedup = None
-    __redis_db = None
 
-    def __init__(self, redis_key, task_table=None):
+    def __init__(self, redis_key, rabbitmq=None):
         if not hasattr(self, "_table_item"):
             super(ItemBuffer, self).__init__()
 
             self._thread_stop = False
             self._is_adding_to_db = False
             self._redis_key = redis_key
-            self._task_table = task_table
 
             self._items_queue = Queue(maxsize=MAX_ITEM_COUNT)
 
-            self._table_request = setting.TAB_REQUESTS.format(redis_key=redis_key)
-            self._table_failed_items = setting.TAB_FAILED_ITEMS.format(
-                redis_key=redis_key
-            )
+            self._rabbitmq = rabbitmq or RabbitMQ()
+            # 任务队列
+            self._tab_requests = setting.TAB_REQUESTS.format(redis_key=redis_key)
+            self._rabbitmq.declare(queue=self._tab_requests)
+            # 数据保存失败队列
+            self._tab_failed_items = setting.TAB_FAILED_ITEMS.format(redis_key=redis_key)
+            self._rabbitmq.declare(queue=self._tab_failed_items)
 
             self._item_tables = {
                 # 'item_name': 'table_name' # 缓存item名与表名对应关系
@@ -57,10 +54,6 @@ class ItemBuffer(threading.Thread):
             }
 
             self._pipelines = self.load_pipelines()
-
-            self._have_mysql_pipeline = MYSQL_PIPELINE_PATH in setting.ITEM_PIPELINES
-            self._mysql_pipeline = None
-
             if setting.ITEM_FILTER_ENABLE and not self.__class__.dedup:
                 self.__class__.dedup = Dedup(
                     to_md5=False, **setting.ITEM_FILTER_SETTING
@@ -71,34 +64,16 @@ class ItemBuffer(threading.Thread):
             # 导出失败的次数 TODO 非air爬虫使用redis统计
             self.export_falied_times = 0
 
-    @property
-    def redis_db(self):
-        if self.__class__.__redis_db is None:
-            self.__class__.__redis_db = RedisDB()
-
-        return self.__class__.__redis_db
-
     def load_pipelines(self):
         pipelines = []
         for pipeline_path in setting.ITEM_PIPELINES:
-            module, class_name = pipeline_path.rsplit(".", 1)
-            pipeline_cls = importlib.import_module(module).__getattribute__(class_name)
-            pipeline = pipeline_cls()
+            pipeline = tools.import_cls(pipeline_path)()
             if not isinstance(pipeline, BasePipeline):
                 raise ValueError(f"{pipeline_path} 需继承 feapder.pipelines.BasePipeline")
             pipelines.append(pipeline)
 
         return pipelines
 
-    @property
-    def mysql_pipeline(self):
-        if not self._mysql_pipeline:
-            module, class_name = MYSQL_PIPELINE_PATH.rsplit(".", 1)
-            pipeline_cls = importlib.import_module(module).__getattribute__(class_name)
-            self._mysql_pipeline = pipeline_cls()
-
-        return self._mysql_pipeline
-
     def run(self):
         self._thread_stop = False
         while not self._thread_stop:
@@ -246,17 +221,11 @@ class ItemBuffer(threading.Thread):
     def __export_to_db(self, table, datas, is_update=False, update_keys=()):
         for pipeline in self._pipelines:
             if is_update:
-                if table == self._task_table and not isinstance(
-                    pipeline, MysqlPipeline
-                ):
-                    continue
-
                 if not pipeline.update_items(table, datas, update_keys=update_keys):
                     log.error(
                         f"{pipeline.__class__.__name__} 更新数据失败. table: {table}  items: {datas}"
                     )
                     return False
-
             else:
                 if not pipeline.save_items(table, datas):
                     log.error(
@@ -264,16 +233,6 @@ class ItemBuffer(threading.Thread):
                     )
                     return False
 
-        # 若是任务表, 且上面的pipeline里没mysql,则需调用mysql更新任务
-        if not self._have_mysql_pipeline and is_update and table == self._task_table:
-            if not self.mysql_pipeline.update_items(
-                table, datas, update_keys=update_keys
-            ):
-                log.error(
-                    f"{self.mysql_pipeline.__class__.__name__} 更新数据失败. table: {table}  items: {datas}"
-                )
-                return False
-
         self.metric_datas(table=table, datas=datas)
         return True
 
@@ -342,7 +301,8 @@ class ItemBuffer(threading.Thread):
 
             # 删除做过的request
             if requests:
-                self.redis_db.zrem(self._table_request, requests)
+                # self._rabbitmq.add(self._tab_requests, requests)
+                pass
 
             # 去重入库
             if setting.ITEM_FILTER_ENABLE:
@@ -353,12 +313,13 @@ class ItemBuffer(threading.Thread):
 
             if self.export_retry_times > setting.EXPORT_DATA_MAX_RETRY_TIMES:
                 if self._redis_key != "air_spider":
-                    # 失败的item记录到redis
-                    self.redis_db.sadd(self._table_failed_items, failed_items)
+                    # 失败的item记录到rabbitmq
+                    self._rabbitmq.add(self._tab_failed_items, failed_items)
 
                     # 删除做过的request
                     if requests:
-                        self.redis_db.zrem(self._table_request, requests)
+                        # self.redis_db.zrem(self._table_request, requests)
+                        print(f'做过的requests数量: {len(requests)}')
 
                     log.error(
                         "入库超过最大重试次数,不再重试,数据记录到redis,items:\n {}".format(
@@ -373,10 +334,7 @@ class ItemBuffer(threading.Thread):
                     tip.append("不执行回调")
                 if requests:
                     tip.append("不删除任务")
-                    exists = self.redis_db.zexists(self._table_request, requests)
-                    for exist, request in zip(exists, requests):
-                        if exist:
-                            self.redis_db.zadd(self._table_request, requests, 300)
+                    self._rabbitmq.add(self._tab_requests, requests)
 
                 if setting.ITEM_FILTER_ENABLE:
                     tip.append("数据不入去重库")
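
The new buffer code goes through a `RabbitMQ` wrapper (`feapder.db.rabbitMq.RabbitMQ`) via `declare()` and `add()`, but the wrapper itself is not part of this diff. Below is a minimal publish-side sketch of what those two calls could look like on top of pika, assuming the `spider` direct exchange from `setting.py`; everything beyond the two method names mirrored from the buffers is an assumption.

```python
# Publish-side sketch of the RabbitMQ wrapper that ItemBuffer/RequestBuffer call.
# feapder.db.rabbitMq is not shown in this commit; the pika wiring below is an
# assumption, only declare()/add() mirror the calls made by the buffers.
import json

import pika


class RabbitMQSketch:
    def __init__(self, host="127.0.0.1", port=5672, user="guest", password="guest",
                 virtual_host="/", exchange="spider"):
        credentials = pika.PlainCredentials(user, password)
        self._connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=host, port=port,
                                      virtual_host=virtual_host,
                                      credentials=credentials)
        )
        self._channel = self._connection.channel()
        self._exchange = exchange
        self._channel.exchange_declare(exchange=exchange,
                                       exchange_type="direct", durable=True)

    def declare(self, queue):
        """Durable queue bound to the direct exchange, routing key = queue name."""
        self._channel.queue_declare(queue=queue, durable=True)
        self._channel.queue_bind(queue=queue, exchange=self._exchange,
                                 routing_key=queue)

    def add(self, queue, datas):
        """Publish a single message or a list of messages as persistent messages."""
        datas = datas if isinstance(datas, list) else [datas]
        for data in datas:
            body = data if isinstance(data, str) else json.dumps(data, default=str)
            self._channel.basic_publish(
                exchange=self._exchange, routing_key=queue, body=body,
                properties=pika.BasicProperties(delivery_mode=2),  # persistent
            )
```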

+ 25 - 29
FworkSpider/feapder/buffer/request_buffer.py

@@ -13,7 +13,7 @@ import threading
 
 import feapder.setting as setting
 import feapder.utils.tools as tools
-from feapder.db.redisdb import RedisDB
+from feapder.db.rabbitMq import RabbitMQ
 from feapder.dedup import Dedup
 from feapder.utils.log import log
 
@@ -23,7 +23,7 @@ MAX_URL_COUNT = 1000  # 缓存中最大request数
 class RequestBuffer(threading.Thread):
     dedup = None
 
-    def __init__(self, redis_key):
+    def __init__(self, redis_key, rabbitmq=None):
         if not hasattr(self, "_requests_deque"):
             super(RequestBuffer, self).__init__()
 
@@ -32,12 +32,14 @@ class RequestBuffer(threading.Thread):
 
             self._requests_deque = collections.deque()
             self._del_requests_deque = collections.deque()
-            self._db = RedisDB()
 
-            self._table_request = setting.TAB_REQUESTS.format(redis_key=redis_key)
-            self._table_failed_request = setting.TAB_FAILED_REQUESTS.format(
-                redis_key=redis_key
-            )
+            self._rabbitmq = rabbitmq or RabbitMQ()
+            # 任务队列
+            self._tab_requests = setting.TAB_REQUESTS.format(redis_key=redis_key)
+            self._rabbitmq.declare(queue=self._tab_requests)
+            # 失败任务队列
+            self._tab_failed_requests = setting.TAB_FAILED_REQUESTS.format(redis_key=redis_key)
+            self._rabbitmq.declare(queue=self._tab_failed_requests)
 
             if not self.__class__.dedup and setting.REQUEST_FILTER_ENABLE:
                 self.__class__.dedup = Dedup(
@@ -58,20 +60,25 @@ class RequestBuffer(threading.Thread):
         self._thread_stop = True
         self._started.clear()
 
+    def get_failed_requests_count(self):
+        return self._rabbitmq.get_message_count(self._tab_failed_requests)
+
     def put_request(self, request):
         self._requests_deque.append(request)
 
         if self.get_requests_count() > MAX_URL_COUNT:  # 超过最大缓存,主动调用
             self.flush()
 
-    def put_del_request(self, request):
-        self._del_requests_deque.append(request)
-
     def put_failed_request(self, request, table=None):
         try:
             request_dict = request.to_dict
-            self._db.zadd(
-                table or self._table_failed_request, request_dict, request.priority
+            if table is not None:
+                # 声明额外的队列
+                self._rabbitmq.declare(queue=table)
+
+            self._rabbitmq.add(
+                table or self._tab_failed_requests,
+                request_dict
             )
         except Exception as e:
             log.exception(e)
@@ -108,26 +115,27 @@ class RequestBuffer(threading.Thread):
 
             priority = request.priority
 
-            # 如果需要去重并且库中已重复 则continue
+            # 如果需要去重并且库中已重复,则continue
             if (
                 request.filter_repeat
                 and setting.REQUEST_FILTER_ENABLE
                 and not self.__class__.dedup.add(request.fingerprint)
             ):
-                log.debug("request已存在  url = %s" % request.url)
+                log.debug("request已存在 url = %s" % request.url)
                 continue
             else:
                 request_list.append(str(request.to_dict))
                 prioritys.append(priority)
 
+            # 入库(超过上限[MAX_URL_COUNT]执行)
             if len(request_list) > MAX_URL_COUNT:
-                self._db.zadd(self._table_request, request_list, prioritys)
+                self._rabbitmq.add(self._tab_requests, request_list)
                 request_list = []
                 prioritys = []
 
-        # 入库
+        # 入库(小于上限[MAX_URL_COUNT]执行)
         if request_list:
-            self._db.zadd(self._table_request, request_list, prioritys)
+            self._rabbitmq.add(self._tab_requests, request_list)
 
         # 执行回调
         for callback in callbacks:
@@ -136,16 +144,4 @@ class RequestBuffer(threading.Thread):
             except Exception as e:
                 log.exception(e)
 
-        # 删除已做任务
-        if self._del_requests_deque:
-            request_done_list = []
-            while self._del_requests_deque:
-                request_done_list.append(self._del_requests_deque.popleft())
-
-            # 去掉request_list中的requests, 否则可能会将刚添加的request删除
-            request_done_list = list(set(request_done_list) - set(request_list))
-
-            if request_done_list:
-                self._db.zrem(self._table_request, request_done_list)
-
         self._is_adding_to_db = False
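
One behavioural difference worth noting: `zadd` carried a per-request priority score, while the new `flush()` collects `prioritys` but never passes them to `add()`, so the broker serves requests strictly FIFO. If priority still matters, a RabbitMQ priority queue could restore it; a sketch under that assumption (the queue name is a placeholder for `setting.TAB_REQUESTS.format(redis_key=...)`, and none of this is part of the commit):

```python
# Sketch: preserving request priority with a RabbitMQ priority queue
# (assumption, not part of this commit). Queue name is a placeholder.
import pika

channel = pika.BlockingConnection(
    pika.ConnectionParameters(host="127.0.0.1")).channel()

channel.queue_declare(
    queue="myspider_requests",
    durable=True,
    arguments={"x-max-priority": 10},  # enable per-message priorities 0-10
)

def add_with_priority(request_dict, priority):
    # Higher priority values are delivered first when the queue has a backlog.
    channel.basic_publish(
        exchange="",                       # default exchange routes by queue name
        routing_key="myspider_requests",
        body=str(request_dict),
        properties=pika.BasicProperties(priority=min(int(priority), 10)),
    )
```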

+ 28 - 101
FworkSpider/feapder/core/collector.py

@@ -1,11 +1,10 @@
 # -*- coding: utf-8 -*-
 """
-Created on 2016-12-23 11:24
+Created on 2023-09-21 11:24
 ---------
 @summary: request 管理
 ---------
-@author: Boris
-@email: boris_liu@foxmail.com
+@author: dzr
 """
 import threading
 import time
@@ -13,14 +12,17 @@ from queue import Queue, Empty
 
 import feapder.setting as setting
 import feapder.utils.tools as tools
-from feapder.db.redisdb import RedisDB
+from feapder.db.rabbitMq import RabbitMQ, RabbitMQMessage
 from feapder.network.request import Request
 from feapder.utils.log import log
 
+# 执行 eval 需要的全局对象
+tools.load_globals(RabbitMQMessage)
+
 
 class Collector(threading.Thread):
 
-    def __init__(self, redis_key):
+    def __init__(self, redis_key, rabbitmq=None):
         """
         @summary:
         ---------
@@ -30,29 +32,23 @@ class Collector(threading.Thread):
         """
 
         super(Collector, self).__init__()
-        self._db = RedisDB()
 
         self._thread_stop = False
 
+        self._rabbitmq = rabbitmq or RabbitMQ()
         self._todo_requests = Queue(maxsize=setting.COLLECTOR_TASK_COUNT)
-
+        # 任务队列
         self._tab_requests = setting.TAB_REQUESTS.format(redis_key=redis_key)
-        self._tab_spider_status = setting.TAB_SPIDER_STATUS.format(redis_key=redis_key)
-
-        self._spider_mark = tools.get_localhost_ip() + f"-{time.time()}"
+        self._rabbitmq.declare(queue=self._tab_requests)
 
         self._interval = setting.COLLECTOR_SLEEP_TIME
         self._request_count = setting.COLLECTOR_TASK_COUNT
         self._is_collector_task = False
-        self._first_get_task = True
-
-        self.__delete_dead_node()
 
     def run(self):  # step 线程入口
         self._thread_stop = False
         while not self._thread_stop:
             try:
-                self.__report_node_heartbeat()  # step 汇报节点心跳
                 self.__input_data()
             except Exception as e:
                 log.exception(e)
@@ -65,6 +61,11 @@ class Collector(threading.Thread):
         self._thread_stop = True
         self._started.clear()
 
+    def __get_messages(self, request_count):
+        messages = self._rabbitmq.get(self._tab_requests, request_count)
+        messages = [eval(message) for message in messages]
+        return messages
+
     def __input_data(self):
         if self._request_count / setting.SPIDER_THREAD_COUNT > 1 and (
             self._todo_requests.qsize() > setting.SPIDER_THREAD_COUNT
@@ -73,88 +74,21 @@ class Collector(threading.Thread):
             time.sleep(0.1)
             return
 
-        current_timestamp = tools.get_current_timestamp()
-
-        request_count = self._request_count  # 先赋值
-        # 查询最近有心跳的节点数量
-        spider_count = self._db.zget_count(
-            self._tab_spider_status,
-            priority_min=current_timestamp - (self._interval + 10),
-            priority_max=current_timestamp,
-        )
-        # 根据等待节点数量,动态分配request
-        if spider_count:
-            # 任务数量
-            task_count = self._db.zget_count(self._tab_requests)
-            # 动态分配的数量 = 任务数量 / 休息的节点数量 + 1
-            request_count = task_count // spider_count + 1
-
-        request_count = (
-            request_count
-            if request_count <= self._request_count
-            else self._request_count
-        )
-
-        if not request_count:
-            return
-
-        # 当前无其他节点,并且是首次取任务,则重置丢失的任务
-        if self._first_get_task and spider_count <= 1:
-            datas = self._db.zrangebyscore_set_score(
-                self._tab_requests,
-                priority_min=current_timestamp,
-                priority_max=current_timestamp + setting.REQUEST_LOST_TIMEOUT,
-                score=300,
-                count=None,
-            )
-            self._first_get_task = False
-            lose_count = len(datas)
-            if lose_count:
-                log.info("重置丢失任务完毕,共{}条".format(len(datas)))
-
-        # 取任务,只取当前时间搓以内的任务,同时将取走的任务分数修改为 current_timestamp + setting.REQUEST_LOST_TIMEOUT
-        requests_list = self._db.zrangebyscore_set_score(
-            self._tab_requests,
-            priority_min="-inf",
-            priority_max=current_timestamp,
-            score=current_timestamp + setting.REQUEST_LOST_TIMEOUT,
-            count=request_count,
-        )
-
-        # 当任务Queue存在任务且其它节点再次启动爬虫,但爬虫无任务可执行
-        # 原因是爬虫调用间隔时间小于 REQUEST_LOST_TIMEOUT
-        # log.debug("领取新任务完毕,共{}条".format(len(requests_list)))
-
-        if requests_list:
+        # 取任务
+        message_list = self.__get_messages(self._request_count)
+        if message_list:
             self._is_collector_task = True
-            # 存request
-            self.__put_requests(requests_list)
+            self.__put_messages(message_list)
         else:
             time.sleep(0.1)
 
-    def __report_node_heartbeat(self):
-        """
-        汇报节点心跳,以便任务平均分配
-        """
-        self._db.zadd(
-            self._tab_spider_status, self._spider_mark, tools.get_current_timestamp()
-        )
-
-    def __delete_dead_node(self):
-        """
-        删除没有心跳的节点信息
-        """
-        self._db.zremrangebyscore(
-            self._tab_spider_status,
-            "-inf",
-            tools.get_current_timestamp() - (self._interval + 10),
-        )
-
-    def __put_requests(self, requests_list):
-        for request in requests_list:
+    def __put_messages(self, message_list):
+        for message in message_list:
+            delivery_tag = message.delivery_tag
+            request = message.body
             try:
                 request_dict = {
-                    "request_obj": Request.from_dict(eval(request)),
+                    "request_obj": Request.from_dict(request),
                     "request_redis": request,
                 }
             except Exception as e:
@@ -169,6 +103,7 @@ class Collector(threading.Thread):
 
             if request_dict:
                 self._todo_requests.put(request_dict)
+                self._rabbitmq.ack(self._tab_requests, delivery_tag)
 
     def get_request(self):
         try:
@@ -179,18 +114,10 @@ class Collector(threading.Thread):
 
     def get_requests_count(self):
         return (
-            self._todo_requests.qsize() or self._db.zget_count(self._tab_requests) or 0
+            self._todo_requests.qsize()
+            or self._rabbitmq.get_message_count(queue=self._tab_requests)
+            or 0
         )
 
     def is_collector_task(self):
         return self._is_collector_task
-
-    def get_spider_count(self):
-        return self._db.zget_count(
-            self._tab_spider_status,
-            priority_min=tools.get_current_timestamp() - (self._interval + 10),
-            priority_max=tools.get_current_timestamp(),
-        )
-
-    def delete_spider_node(self):
-        self._db.zrem(self._tab_spider_status, self._spider_mark)
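
`Collector.__get_messages()` expects the wrapper's `get()` to return strings that `eval()` back into `RabbitMQMessage` objects (hence `tools.load_globals(RabbitMQMessage)`), and later acks each message by its `delivery_tag`. Neither `RabbitMQMessage` nor the consume side of the wrapper is in this diff; the sketch below shows one way they could sit on top of `basic_get`, with all pika wiring being an assumption:

```python
# Consume-side sketch matching Collector's usage: get() returns reprs that eval
# back into RabbitMQMessage, ack() acknowledges by delivery_tag. The real
# feapder.db.rabbitMq implementation is not shown in this commit.
import pika


class RabbitMQMessage:
    def __init__(self, delivery_tag, body):
        self.delivery_tag = delivery_tag
        self.body = body  # e.g. the request dict originally published as str(dict)

    def __repr__(self):
        # Must round-trip through eval(); tools.load_globals(RabbitMQMessage)
        # makes the class name resolvable where Collector evals the string.
        return f"RabbitMQMessage({self.delivery_tag!r}, {self.body!r})"


class RabbitMQConsumerSketch:
    def __init__(self, host="127.0.0.1"):
        self._channel = pika.BlockingConnection(
            pika.ConnectionParameters(host=host)).channel()

    def get(self, queue, count):
        messages = []
        for _ in range(count):
            method, _properties, body = self._channel.basic_get(queue=queue,
                                                                auto_ack=False)
            if method is None:  # queue drained
                break
            # Assumes payloads were published as str(dict) by the buffers.
            messages.append(repr(RabbitMQMessage(method.delivery_tag,
                                                 eval(body.decode()))))
        return messages

    def ack(self, queue, delivery_tag):
        # queue is kept for signature parity with the calls in Collector;
        # basic_ack itself is channel-scoped.
        self._channel.basic_ack(delivery_tag=delivery_tag)
```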

+ 17 - 17
FworkSpider/feapder/core/handle_failed_items.py

@@ -8,35 +8,34 @@ Created on 2022/11/18 11:33 AM
 @email: boris_liu@foxmail.com
 """
 import bson
+from bson import ObjectId
 
 import feapder.setting as setting
 import feapder.utils.tools as tools
 from feapder.buffer.item_buffer import ItemBuffer
-from feapder.db.redisdb import RedisDB
+from feapder.db.rabbitMq import RabbitMQ, RabbitMQMessage
 from feapder.network.item import Item, UpdateItem
 from feapder.utils.log import log
 
-# 执行 eval 时动态变量
-try:
-    from bson import ObjectId
-except ImportError:
-    pass
+# 执行 eval 需要的变量属性
+tools.load_globals(RabbitMQMessage, ObjectId)
 
 
 class HandleFailedItems:
-    def __init__(self, redis_key, task_table=None, item_buffer=None):
+    def __init__(self, redis_key, rabbitmq=None, item_buffer=None):
         if redis_key.endswith(":s_failed_items"):
             redis_key = redis_key.replace(":s_failed_items", "")
 
-        self._redisdb = RedisDB()
-        self._item_buffer = item_buffer or ItemBuffer(redis_key, task_table=task_table)
+        self._rabbitmq = rabbitmq or RabbitMQ()
+        self._item_buffer = item_buffer or ItemBuffer(redis_key)
 
-        self._table_failed_items = setting.TAB_FAILED_ITEMS.format(redis_key=redis_key)
+        # 数据保存失败队列
+        self._tab_failed_items = setting.TAB_FAILED_ITEMS.format(redis_key=redis_key)
+        self._rabbitmq.declare(queue=self._tab_failed_items)
 
     def get_failed_items(self, count=1):
-        failed_items = self._redisdb.sget(
-            self._table_failed_items, count=count, is_pop=False
-        )
+        failed_items = self._rabbitmq.get(self._tab_failed_items, count)
+        failed_items = [eval(message) for message in failed_items]
         return failed_items
 
     def reput_failed_items_to_db(self):
@@ -48,8 +47,9 @@ class HandleFailedItems:
                 if not failed_items:
                     break
 
-                for data_str in failed_items:
-                    data = eval(data_str)
+                for message in failed_items:
+                    delivery_tag = message.delivery_tag
+                    data = message.body
 
                     for add in data.get("add"):
                         table = add.get("table")
@@ -78,7 +78,7 @@ class HandleFailedItems:
 
                     # 入库成功后删除
                     def delete_item():
-                        self._redisdb.srem(self._table_failed_items, data_str)
+                        self._rabbitmq.ack(self._tab_failed_items, delivery_tag)
 
                     self._item_buffer.put_item(delete_item)
                     self._item_buffer.flush()
@@ -87,7 +87,7 @@ class HandleFailedItems:
                 log.exception(e)
 
         if total_count:
-            log.debug("导入%s条失败item到数库" % total_count)
+            log.debug("导入%s条失败item到数据库" % total_count)
         else:
             log.debug("没有失败的item")
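
A short usage sketch for the reworked class; the scheduler already runs this on startup when `setting.RETRY_FAILED_ITEMS` is enabled, and `"myspider"` below is a placeholder `redis_key`:

```python
# Replay items that previously failed to export (placeholder redis_key).
from feapder.core.handle_failed_items import HandleFailedItems

handle_failed_items = HandleFailedItems(redis_key="myspider")
handle_failed_items.reput_failed_items_to_db()  # each message is acked via the
                                                # delete_item callback once the
                                                # item_buffer has flushed it
```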
 

+ 23 - 16
FworkSpider/feapder/core/handle_failed_requests.py

@@ -8,49 +8,56 @@ Created on 2018-08-13 11:43:01
 @email:  boris_liu@foxmail.com
 """
 import feapder.setting as setting
+import feapder.utils.tools as tools
 from feapder.buffer.request_buffer import RequestBuffer
-from feapder.db.redisdb import RedisDB
+from feapder.db.rabbitMq import RabbitMQ, RabbitMQMessage
 from feapder.network.request import Request
 from feapder.utils.log import log
 
+# 执行 eval 需要的变量属性
+tools.load_globals(RabbitMQMessage)
+
 
 class HandleFailedRequests(object):
     """docstring for HandleFailedRequests"""
 
-    def __init__(self, redis_key):
+    def __init__(self, redis_key, rabbitmq=None):
         super(HandleFailedRequests, self).__init__()
         self._redis_key = redis_key
 
-        self._redisdb = RedisDB()
-        self._request_buffer = RequestBuffer(self._redis_key)
+        self._rabbitmq = rabbitmq or RabbitMQ()
+        self._request_buffer = RequestBuffer(self._redis_key, rabbitmq)
 
-        self._table_failed_request = setting.TAB_FAILED_REQUESTS.format(
-            redis_key=redis_key
-        )
+        # 失败任务队列
+        self._tab_failed_requests = setting.TAB_FAILED_REQUESTS.format(redis_key=redis_key)
+        self._rabbitmq.declare(queue=self._tab_failed_requests)
 
-    def get_failed_requests(self, count=10000):
-        failed_requests = self._redisdb.zget(self._table_failed_request, count=count)
-        failed_requests = [eval(failed_request) for failed_request in failed_requests]
-        return failed_requests
+    def get_failed_messages(self, count=10000):
+        failed_messages = self._rabbitmq.get(self._tab_failed_requests, count)
+        failed_messages = [eval(message) for message in failed_messages]
+        return failed_messages
 
     def reput_failed_requests_to_requests(self):
         log.debug("正在重置失败的requests...")
         total_count = 0
         while True:
             try:
-                failed_requests = self.get_failed_requests()
-                if not failed_requests:
+                failed_messages = self.get_failed_messages()
+                if not failed_messages:
                     break
 
-                for request in failed_requests:
+                for message in failed_messages:
+                    delivery_tag = message.delivery_tag
+                    request = message.body
                     request["retry_times"] = 0
                     request_obj = Request.from_dict(request)
                     self._request_buffer.put_request(request_obj)
-
+                    self._rabbitmq.ack(self._tab_failed_requests, delivery_tag)
                     total_count += 1
+
             except Exception as e:
                 log.exception(e)
 
         self._request_buffer.flush()
-
+        self._request_buffer.stop()
         log.debug("重置%s条失败requests为待抓取requests" % total_count)
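
Note that each failed message is acked immediately after `put_request()`, which only appends to the in-memory deque; the actual re-publish happens on the later `flush()`, so a crash in between would drop those requests. Usage mirrors the items handler; the scheduler calls this when `setting.RETRY_FAILED_REQUESTS` is enabled, and `"myspider"` is a placeholder:

```python
# Move previously failed requests back onto the task queue (placeholder redis_key).
from feapder.core.handle_failed_requests import HandleFailedRequests

handler = HandleFailedRequests(redis_key="myspider")
handler.reput_failed_requests_to_requests()  # flushes, then stops its RequestBuffer
```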

+ 103 - 125
FworkSpider/feapder/core/parser_control.py

@@ -42,9 +42,111 @@ class PaserControl(threading.Thread):
         self._redis_key = redis_key
         self._request_buffer = request_buffer
         self._item_buffer = item_buffer
-
         self._thread_stop = False
 
+    def is_filter(self, item):
+        """item入库前是否会被过滤"""
+        if setting.ITEM_FILTER_ENABLE:
+            if self._item_buffer.__class__.dedup.get(item.fingerprint):
+                return True
+        return False
+
+    def sent_heartbeat(self, items, table=None):
+        """发送心跳数据"""
+        send_success = True
+        is_list = isinstance(items, list)
+        items = items if is_list else [items]
+        log.debug("发送心跳")
+        table = table or setting.RECORD_SPIDER_HEARTBEAT
+        if not self._item_buffer.export_to_db(table, items):
+            send_success = False
+            log.error("失败心跳:\n {}".format(tools.dumps_json(items)))
+        return send_success
+
+    @staticmethod
+    def get_spider_attribute(name, *args):
+        """获取对象属性"""
+        obj1, obj2 = args or (None, None)
+
+        val = None
+        if obj1 is not None:
+            if isinstance(obj1, dict):
+                val = obj1.get(name)
+                if not val and name == "spidercode":
+                    val = obj1.get("code")
+            else:
+                val = getattr(obj1, name, None)
+
+        if not val and obj2 is not None:
+            val = getattr(obj2, name, None)
+
+        return val if val is not None else ""
+
+    def spider_heartbeat(self, request, response, **kwargs):
+        """爬虫心跳"""
+        parser = kwargs["parser"]
+        now_page = kwargs["now_page"]
+        extract_count = kwargs["extract_count"]
+        task_count = kwargs["task_count"]
+        rel_count = kwargs["rel_count"]
+        filepath = kwargs["filepath"]
+        status_code = getattr(response, "status_code", -1)
+
+        item = getattr(request, "item", {})
+        site = self.get_spider_attribute("site", item, parser)
+        channel = self.get_spider_attribute("channel", item, parser)
+        code = self.get_spider_attribute("spidercode", item, parser)
+        business_type: str = parser.__business_type__  # 爬虫业务类型
+        run_time = tools.get_current_date(date_format="%Y-%m-%d")  # 运行时间,单位:天
+        spider_id = tools.get_md5(code + business_type + run_time)
+        heartbeat_content = dict(
+            node_ip=tools.os.environ.get("CRAWLAB_SERVER_REGISTER_IP"),  # crawlab节点名称
+            crawlab_taskid=tools.os.environ.get("CRAWLAB_TASK_ID"),  # crawlab平台爬虫的任务id
+            site=site,
+            channel=channel,
+            spidercode=code,
+            url=request.url,  # 访问地址
+            status_code=status_code,  # 响应状态码
+            runtime=run_time,
+            business_type=business_type,
+            spider_id=spider_id,
+            filepath=filepath,  # 文件路径
+            create_at=tools.ensure_int64(tools.get_current_timestamp()),  # 执行时间, 单位:秒
+        )
+
+        if hasattr(request, "error_msg") and status_code != 200:
+            error = getattr(request, "error_msg")
+            feature = dict(
+                err_type=str(error.split(": ")[0]),
+                err_msg=getattr(request, "error_msg"),
+            )
+            feature.setdefault("request_success", False)
+            if business_type.endswith("List"):
+                feature.update(dict(nowpage=now_page, ))
+            else:
+                feature.update(dict(count=task_count, ))
+        else:
+            if business_type.endswith("List"):
+                # 列表页
+                list_feature = dict(
+                    nowpage=now_page,  # 当前页码
+                    count=extract_count,  # 列表提取总数
+                    rel_count=rel_count,  # 实际入库总数
+                )
+                feature = list_feature
+            else:
+                # 详情页
+                detail_feature = dict(
+                    count=task_count,  # 发起请求的总数
+                    rel_count=rel_count,  # 实际入库总数
+                )
+                feature = detail_feature
+            feature.setdefault("request_success", True)
+
+        feature['expire_at'] = tools.get_utcnow()  # 设置utc时间,定期删除(5天)
+        heartbeat_content.update(feature)
+        return self.sent_heartbeat(heartbeat_content)
+
     def run(self):
         self._thread_stop = False
         while not self._thread_stop:
@@ -74,12 +176,8 @@ class PaserControl(threading.Thread):
         request_redis = request["request_redis"]
         request = request["request_obj"]
 
-        del_request_redis_after_item_to_db = False
-        del_request_redis_after_request_to_db = False
-
         is_sent_heartbeat = False  # 发送心跳的条件
         heartbeat_lst = []  # 待推送的心跳信息列表
-
         for parser in self._parsers:
             now_page = getattr(request, "page", -1)  # 当前访问页码
             extract_count = 0  # 列表抽取总数量
@@ -208,7 +306,6 @@ class PaserControl(threading.Thread):
                             else:  # 异步
                                 # 将next_request 入库
                                 self._request_buffer.put_request(result)
-                                del_request_redis_after_request_to_db = True
 
                         elif isinstance(result, Item):
 
@@ -233,17 +330,13 @@ class PaserControl(threading.Thread):
 
                             # 将item入库(异步)
                             self._item_buffer.put_item(result)
-                            # 需删除正在做的request
-                            del_request_redis_after_item_to_db = True
 
                         elif callable(result):  # result为可执行的无参函数
                             if result_type == 2:  # item 的 callback,buffer里的item均入库后再执行
                                 self._item_buffer.put_item(result)
-                                del_request_redis_after_item_to_db = True
 
                             else:  # result_type == 1: # request 的 callback,buffer里的request均入库后再执行。可能有的parser直接返回callback
                                 self._request_buffer.put_request(result)
-                                del_request_redis_after_request_to_db = True
 
                         elif result is not None:
                             function_name = "{}.{}".format(
@@ -382,7 +475,6 @@ class PaserControl(threading.Thread):
                                 elif isinstance(result, Item):
                                     self._item_buffer.put_item(result)
 
-                            del_request_redis_after_request_to_db = True
                             is_sent_heartbeat = True
                         else:
                             # 将 requests 重新入库 爬取
@@ -419,7 +511,6 @@ class PaserControl(threading.Thread):
                                 self._request_buffer.put_request(original_request)
                             else:
                                 self._request_buffer.put_request(request)
-                            del_request_redis_after_request_to_db = True
 
                 else:
                     # 记录下载成功的文档
@@ -461,17 +552,6 @@ class PaserControl(threading.Thread):
             for heartbeat in heartbeat_lst:
                 self.spider_heartbeat(**heartbeat)
 
-        # 删除正在做的request 跟随item优先
-        if request_redis:
-            if del_request_redis_after_item_to_db:
-                self._item_buffer.put_item(request_redis)
-
-            elif del_request_redis_after_request_to_db:
-                self._request_buffer.put_del_request(request_redis)
-
-            else:
-                self._request_buffer.put_del_request(request_redis)
-
         if setting.SPIDER_SLEEP_TIME:
             if (
                 isinstance(setting.SPIDER_SLEEP_TIME, (tuple, list))
@@ -499,108 +579,6 @@ class PaserControl(threading.Thread):
     def add_parser(self, parser):
         self._parsers.append(parser)
 
-    def sent_heartbeat(self, items, table=setting.HEARTBEAT_TABLE):
-        """发送心跳数据"""
-        send_success = True
-        is_list = isinstance(items, list)
-        items = items if is_list else [items]
-        log.debug("发送心跳")
-        if not self._item_buffer.export_to_db(table, items):
-            send_success = False
-            log.error("失败心跳:\n {}".format(tools.dumps_json(items)))
-        return send_success
-
-    @staticmethod
-    def get_spider_attribute(name, *args):
-        """获取对象属性"""
-        obj1, obj2 = args or (None, None)
-
-        val = None
-        if obj1 is not None:
-            if isinstance(obj1, dict):
-                val = obj1.get(name)
-                if not val and name == "spidercode":
-                    val = obj1.get("code")
-            else:
-                val = getattr(obj1, name, None)
-
-        if not val and obj2 is not None:
-            val = getattr(obj2, name, None)
-
-        return val if val is not None else ""
-
-    def spider_heartbeat(self, request, response, **kwargs):
-        """爬虫心跳"""
-        parser = kwargs["parser"]
-        now_page = kwargs["now_page"]
-        extract_count = kwargs["extract_count"]
-        task_count = kwargs["task_count"]
-        rel_count = kwargs["rel_count"]
-        filepath = kwargs["filepath"]
-        status_code = getattr(response, "status_code", -1)
-
-        item = getattr(request, "item", {})
-        site = self.get_spider_attribute("site", item, parser)
-        channel = self.get_spider_attribute("channel", item, parser)
-        code = self.get_spider_attribute("spidercode", item, parser)
-        business_type: str = parser.__business_type__  # 爬虫业务类型
-        run_time = tools.get_current_date(date_format="%Y-%m-%d")  # 运行时间,单位:天
-        spider_id = tools.get_md5(code + business_type + run_time)
-        heartbeat_content = dict(
-            node_ip=tools.os.environ.get("CRAWLAB_SERVER_REGISTER_IP"),  # crawlab节点名称
-            crawlab_taskid=tools.os.environ.get("CRAWLAB_TASK_ID"),  # crawlab平台爬虫的任务id
-            site=site,
-            channel=channel,
-            spidercode=code,
-            url=request.url,  # 访问地址
-            status_code=status_code,  # 响应状态码
-            runtime=run_time,
-            business_type=business_type,
-            spider_id=spider_id,
-            filepath=filepath,  # 文件路径
-            create_at=tools.ensure_int64(tools.get_current_timestamp()),  # 执行时间, 单位:秒
-        )
-
-        if hasattr(request, "error_msg") and status_code != 200:
-            error = getattr(request, "error_msg")
-            feature = dict(
-                err_type=str(error.split(": ")[0]),
-                err_msg=getattr(request, "error_msg"),
-            )
-            feature.setdefault("request_success", False)
-            if business_type.endswith("List"):
-                feature.update(dict(nowpage=now_page, ))
-            else:
-                feature.update(dict(count=task_count, ))
-        else:
-            if business_type.endswith("List"):
-                # 列表页
-                list_feature = dict(
-                    nowpage=now_page,  # 当前页码
-                    count=extract_count,  # 列表提取总数
-                    rel_count=rel_count,  # 实际入库总数
-                )
-                feature = list_feature
-            else:
-                # 详情页
-                detail_feature = dict(
-                    count=task_count,  # 发起请求的总数
-                    rel_count=rel_count,  # 实际入库总数
-                )
-                feature = detail_feature
-            feature.setdefault("request_success", True)
-
-        feature['expire_at'] = tools.get_utcnow()  # 设置utc时间,定期删除(5天)
-        heartbeat_content.update(feature)
-        return self.sent_heartbeat(heartbeat_content)
-
-    def is_filter(self, item):
-        """item入库前是否会被过滤"""
-        if setting.ITEM_FILTER_ENABLE:
-            if self._item_buffer.__class__.dedup.get(item.fingerprint):
-                return True
-        return False
-
 
 class AirSpiderParserControl(PaserControl):
     is_show_tip = False
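
The heartbeat helpers are only moved to the top of the class here, but `get_spider_attribute` is easy to misread: it resolves a field from `request.item` first (dict or object, with `code` accepted as a fallback key for `spidercode`) and only then from the parser. A small illustration with hypothetical values:

```python
# Illustration of get_spider_attribute's lookup order (values are hypothetical).
from feapder.core.parser_control import PaserControl

item = {"site": "Example Site", "code": "a_example_notice"}

class ExampleParser:
    site = "Fallback Site"
    channel = "notice"

parser = ExampleParser()
PaserControl.get_spider_attribute("site", item, parser)        # -> "Example Site"
PaserControl.get_spider_attribute("spidercode", item, parser)  # -> "a_example_notice" (via "code")
PaserControl.get_spider_attribute("channel", item, parser)     # -> "notice" (falls back to the parser)
```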

+ 78 - 251
FworkSpider/feapder/core/scheduler.py

@@ -12,6 +12,7 @@ import sys
 import threading
 import time
 from collections import Iterable
+from types import SimpleNamespace
 
 import feapder.setting as setting
 import feapder.utils.tools as tools
@@ -22,23 +23,11 @@ from feapder.core.collector import Collector
 from feapder.core.handle_failed_items import HandleFailedItems
 from feapder.core.handle_failed_requests import HandleFailedRequests
 from feapder.core.parser_control import PaserControl
-from feapder.db.redisdb import RedisDB
+from feapder.db.rabbitMq import RabbitMQ
 from feapder.network.item import Item
 from feapder.network.request import Request
 from feapder.utils import metrics
 from feapder.utils.log import log
-from feapder.utils.redis_lock import RedisLock
-
-SPIDER_UUID = tools.get_uuid()
-SPIDER_START_TIME = "spider_start_time"
-SPIDER_START_TIME_KEY = SPIDER_START_TIME + "#" + SPIDER_UUID
-SPIDER_END_TIME_KEY = "spider_end_time"
-SPIDER_LAST_TASK_COUNT_RECORD_TIME_KEY = "last_task_count_record_time"
-
-
-class Obj(object):
-    def __init__(self, dict_):
-        self.__dict__.update(dict_)
 
 
 class Scheduler(threading.Thread):
@@ -50,12 +39,8 @@ class Scheduler(threading.Thread):
         thread_count=None,
         begin_callback=None,
         end_callback=None,
-        delete_keys=(),
         keep_alive=None,
         auto_start_requests=None,
-        batch_interval=0,
-        wait_lock=True,
-        task_table=None,
         **kwargs
     ):
         """
@@ -65,12 +50,8 @@ class Scheduler(threading.Thread):
         @param thread_count: 线程数,默认为配置文件中的线程数
         @param begin_callback: 爬虫开始回调函数
         @param end_callback: 爬虫结束回调函数
-        @param delete_keys: 爬虫启动时删除的key,类型: 元组/bool/string。 支持正则
         @param keep_alive: 爬虫是否常驻,默认否
         @param auto_start_requests: 爬虫是否自动添加任务
-        @param batch_interval: 抓取时间间隔 默认为0 天为单位 多次启动时,只有当前时间与第一次抓取结束的时间间隔大于指定的时间间隔时,爬虫才启动
-        @param wait_lock: 下发任务时否等待锁,若不等待锁,可能会存在多进程同时在下发一样的任务,因此分布式环境下请将该值设置True
-        @param task_table: 任务表, 批次爬虫传递
         ---------
         @result:
         """
@@ -82,16 +63,7 @@ class Scheduler(threading.Thread):
                 setattr(setting, "KEEP_ALIVE", not value)
             else:
                 setattr(setting, key, value)
-        
-        # 历史爬虫[redis_key]
-        for item in sys.argv[1:]:
-            if item.startswith("--purpose"):
-                val = item.split('=')[-1]
-                if not redis_key.endswith(val):
-                    # 历史爬虫需要单独的redis_key,防止增量爬虫
-                    # 与历史爬虫共用同一个redis_key,出现增量爬虫断点续采的情况
-                    redis_key += f'_{val}'
-                    
+
         self._redis_key = redis_key or setting.REDIS_KEY
         if not self._redis_key:
             raise Exception(
@@ -102,10 +74,12 @@ class Scheduler(threading.Thread):
                 """
             )
 
-        self._request_buffer = RequestBuffer(redis_key)
-        self._item_buffer = ItemBuffer(redis_key, task_table)
+        self._rabbitmq = RabbitMQ()
 
+        self._request_buffer = RequestBuffer(redis_key)
+        self._item_buffer = ItemBuffer(redis_key)
         self._collector = Collector(redis_key)
+
         self._parsers = []
         self._parser_controls = []
         self._parser_control_obj = PaserControl
@@ -114,16 +88,15 @@ class Scheduler(threading.Thread):
         if "auto_stop_when_spider_done" in kwargs:
             self._keep_alive = not kwargs.get("auto_stop_when_spider_done")
         else:
-
             self._keep_alive = (
                 keep_alive if keep_alive is not None else setting.KEEP_ALIVE
             )
+
         self._auto_start_requests = (
             auto_start_requests
             if auto_start_requests is not None
             else setting.SPIDER_AUTO_START_REQUESTS
         )
-        self._batch_interval = batch_interval
 
         self._begin_callback = (
             begin_callback
@@ -140,34 +113,20 @@ class Scheduler(threading.Thread):
             setting.SPIDER_THREAD_COUNT if not thread_count else thread_count
         )
 
+        self._spider_id = tools.get_uuid(redis_key, tools.get_current_date())
         self._spider_name = redis_key
-        self._project_name = redis_key.split(":")[0]
-        self._task_table = task_table
-
-        self._tab_spider_time = setting.TAB_SPIDER_TIME.format(redis_key=redis_key)
-        self._tab_spider_status = setting.TAB_SPIDER_STATUS.format(redis_key=redis_key)
-        self._tab_requests = setting.TAB_REQUESTS.format(redis_key=redis_key)
-        self._tab_failed_requests = setting.TAB_FAILED_REQUESTS.format(
-            redis_key=redis_key
-        )
 
-        self._is_notify_end = False  # 是否已经通知结束
-        self._last_task_count = 0  # 最近一次任务数量
-        self._redisdb = RedisDB()
+        # 声明爬虫心跳队列
+        self._tab_spider_heartbeat = setting.SPIDER_HEARTBEAT
+        self._rabbitmq.declare(queue=self._tab_spider_heartbeat)
 
-        self._project_total_state_table = "{}_total_state".format(self._project_name)
-        self._is_exist_project_total_state_table = False
+        self._is_notify_end = False  # 是否已经通知结束
 
         # Request 缓存设置
         Request.cached_redis_key = redis_key
         Request.cached_expire_time = setting.RESPONSE_CACHED_EXPIRE_TIME
 
-        delete_keys = delete_keys or setting.DELETE_KEYS
-        if delete_keys:
-            self.delete_tables(delete_keys)
-
         self._last_check_task_status_time = 0
-        self.wait_lock = wait_lock
 
         self.init_metrics()
 
@@ -184,36 +143,32 @@ class Scheduler(threading.Thread):
         else:
             raise ValueError("类型错误,爬虫需继承feapder.BaseParser或feapder.BatchParser")
 
-    def run(self):  # STEP 1 爬虫框架入口
-        if not self.is_reach_next_spider_time():  # STEP 2 检测爬虫是否到达执行时间
-            return
-
-        self._start()  # STEP 3 开始运行爬虫
+    def run(self):
+        self._start()
 
-        while True:  # step 4 对爬虫状态的一个监控
+        while True:
+            self.__report_node_heartbeat('running')
             try:
-                if self.all_thread_is_done(): # Step 5 判断爬虫是否运行完成
+                if self.all_thread_is_done():
                     if not self._is_notify_end:
-                        self.spider_end()  # 跑完一轮
+                        self.spider_end()  # 爬虫运行结束
                         self._is_notify_end = True
 
-                    if not self._keep_alive: # step 7 如果不是常驻爬虫 停止所有线程
+                    if not self._keep_alive:  # 如果不是常驻爬虫 关闭所有线程
                         self._stop_all_thread()
                         break
 
                 else:
                     self._is_notify_end = False
 
-                self.check_task_status()  # step 8 检查任务状态,并进行告警通知
-
+                self.check_task_status()
             except Exception as e:
                 log.exception(e)
 
-            tools.delay_time(1)  # 1秒钟检查一次爬虫状态
+            tools.delay_time(1)
 
     def __add_task(self):
-        # 启动parser 的 start_requests
-        self.spider_begin()  # 不自动结束的爬虫此处只能执行一遍
+        self.spider_begin()  # 启动爬虫 start_requests
 
         # 判断任务池中属否还有任务,若有接着抓取,若无则生产新任务
         todo_task_count = self._collector.get_requests_count()
@@ -257,21 +212,21 @@ class Scheduler(threading.Thread):
         if setting.RETRY_FAILED_ITEMS:
             handle_failed_items = HandleFailedItems(
                 redis_key=self._redis_key,
-                task_table=self._task_table,
                 item_buffer=self._item_buffer,
+                rabbitmq=self._rabbitmq,
             )
             handle_failed_items.reput_failed_items_to_db()
 
-        # STEP 3.1 启动request_buffer -- 任务管理器, 负责缓冲添加到数据库中的request
+        # STEP 3.1 开启 request_buffer -- 任务管理器,负责缓冲添加到数据库中的request
         self._request_buffer.start()
-        # STEP 3.2 启动item_buffer -- 管道管理器 责缓冲添加到数据库中的item, 由该manager统一添加。防止多线程同时访问数据库
+        # STEP 3.2 开启 item_buffer -- 管道管理器 负责缓冲采集的数据添加到数据库
         self._item_buffer.start()
-        # STEP 3.3 启动collector  -- 任务管理 ,根据节点和任务,平均分配给每个节点
+        # STEP 3.3 开启 collector  -- 任务管理 分发任务
         self._collector.start()
 
         # 启动parser control
         for i in range(self._thread_count):
-            # STEP 3.4 根据 任务管理器、redis_key,下载器,数据管道创建一个线程池
+            # STEP 3.4 创建执行任务线程池
             parser_control = self._parser_control_obj(
                 self._collector,
                 self._redis_key,
@@ -279,27 +234,24 @@ class Scheduler(threading.Thread):
                 self._item_buffer,
             )
 
-            for parser in self._parsers:  # step 3.5 把所有任务放入线程池
+            for parser in self._parsers:  # step 3.5 把所有待执行任务添加到线程池
                 parser_control.add_parser(parser)
 
-            parser_control.start()  # STEP 3.6 根据线程池开辟一个线程
+            parser_control.start()  # STEP 3.6 开启采集线程
             self._parser_controls.append(parser_control)
 
         # STEP 3.7下发任务 有消费线程之后开始读取任务
         if setting.RETRY_FAILED_REQUESTS:
-            # 重设失败的任务, 不用加锁,原子性操作
-            handle_failed_requests = HandleFailedRequests(self._redis_key)
+            # 重设失败的任务
+            handle_failed_requests = HandleFailedRequests(
+                redis_key=self._redis_key,
+                rabbitmq=self._rabbitmq
+            )
             handle_failed_requests.reput_failed_requests_to_requests()
 
         # STEP 3.8下发新任务 ,生产新任务
-        if self._auto_start_requests:  # 自动下发
-            if self.wait_lock:
-                # Stress 将添加任务处加锁,防止多进程之间添加重复的任务
-                with RedisLock(key=self._spider_name) as lock:
-                    if lock.locked:
-                        self.__add_task()
-            else:
-                self.__add_task()
+        if self._auto_start_requests:
+            self.__add_task()
 
     def all_thread_is_done(self):
         # Stress 降低偶然性, 因为各个环节不是并发的,很有可能当时状态为假,但检测下一条时该状态为真。一次检测很有可能遇到这种偶然性
@@ -347,19 +299,19 @@ class Scheduler(threading.Thread):
             return
 
         # 检查失败任务数量 超过1000 报警,
-        failed_count = self._redisdb.zget_count(self._tab_failed_requests)
-        print('<<<<<<<<<<<<<<<<<<<<<<<<<<<< 失败次数:', failed_count)
+        failed_count = self._request_buffer.get_failed_requests_count()
+        log.debug(f'《{self._spider_name}》爬虫失败任务数量:{failed_count}')
         if failed_count > setting.WARNING_FAILED_COUNT:
             # 发送报警
             msg = "《%s》爬虫当前失败任务 %s, 请检查爬虫是否正常" % (self._spider_name, failed_count)
             log.error(msg)
-            self.send_msg(
-                msg,
+            tools.send_msg(**dict(
+                msg=msg,
                 level="error",
                 message_prefix="《%s》爬虫当前失败任务数报警" % (self._spider_name),
-            )
+            ))
 
-        # parser_control实时统计已做任务数及失败任务数,若成功率<0.5 则报警
+        # parser_control 实时统计已做任务数及失败任务数,若成功率<0.5 则报警
         failed_task_count, success_task_count = PaserControl.get_task_status_count()
         total_count = success_task_count + failed_task_count
         if total_count > 0:
@@ -373,68 +325,11 @@ class Scheduler(threading.Thread):
                     task_success_rate,
                 )
                 log.error(msg)
-                self.send_msg(
-                    msg,
+                tools.send_msg(**dict(
+                    msg=msg,
                     level="error",
                     message_prefix="《%s》爬虫当前任务成功率报警" % (self._spider_name),
-                )
-
-        # 判断任务数是否变化
-        # step 检查redis中任务状态,若连续20分钟内任务数量未发生变化(parser可能卡死),则发出报警信息
-        task_count = self._redisdb.zget_count(self._tab_requests)
-
-        if task_count:
-            if task_count != self._last_task_count:
-                self._last_task_count = task_count
-                self._redisdb.hset(
-                    self._tab_spider_time,
-                    SPIDER_LAST_TASK_COUNT_RECORD_TIME_KEY,
-                    tools.get_current_timestamp(),
-                )  # 多进程会重复发消息, 使用redis记录上次统计时间
-            else:
-                # step 判断时间间隔是否超过20分钟
-                lua = """
-                    -- local key = KEYS[1]
-                    local field = ARGV[1]
-                    local current_timestamp = ARGV[2]
-
-                    -- 取值
-                    local last_timestamp = redis.call('hget', KEYS[1], field)
-                    if last_timestamp and current_timestamp - last_timestamp >= 1200 then
-                        -- 返回任务停滞时间 秒
-                        return current_timestamp - last_timestamp 
-                    end
-
-                    if not last_timestamp then
-                        redis.call('hset', KEYS[1], field, current_timestamp)
-                    end
-
-                    return 0
-
-                """
-                redis_obj = self._redisdb.get_redis_obj()
-                cmd = redis_obj.register_script(lua)
-                overtime = cmd(
-                    keys=[self._tab_spider_time],
-                    args=[
-                        SPIDER_LAST_TASK_COUNT_RECORD_TIME_KEY,
-                        tools.get_current_timestamp(),
-                    ],
-                )
-
-                if overtime:
-                    # step 记录日志,并发送报警
-                    msg = "{}  爬虫任务停滞 {},请检查爬虫是否正常".format(
-                        self._spider_name, tools.format_seconds(overtime)
-                    )
-                    log.error(msg) # TODO 这一步可以加一个print,在平台的日志框里输出
-                    self.send_msg(
-                        msg,
-                        level="error",
-                        message_prefix="《{}》爬虫任务停滞".format(self._spider_name),
-                    )
-        else:
-            self._last_task_count = 0
+                ))
 
         # 检查入库失败次数
         if self._item_buffer.export_falied_times > setting.EXPORT_DATA_MAX_FAILED_TIMES:
@@ -442,49 +337,27 @@ class Scheduler(threading.Thread):
                 self._spider_name, self._item_buffer.export_falied_times
             )
             log.error(msg)
-            self.send_msg(
-                msg, level="error", message_prefix="《%s》爬虫导出数据失败" % (self._spider_name)
-            )
-
-    def delete_tables(self, delete_tables_list):
-        if isinstance(delete_tables_list, bool):
-            delete_tables_list = [self._redis_key + "*"]
-        elif not isinstance(delete_tables_list, (list, tuple)):
-            delete_tables_list = [delete_tables_list]
-
-        redis = RedisDB()
-        for delete_tab in delete_tables_list:
-            if not delete_tab.startswith(self._redis_key):
-                delete_tab = self._redis_key + delete_tab
-            tables = redis.getkeys(delete_tab)
-            for table in tables:
-                if table != self._tab_spider_time:
-                    log.info("正在删除key %s" % table)
-                    redis.clear(table)
-                else:
-                    keys = redis.hgetall(table)
-                    for key in keys:
-                        if key.startswith(SPIDER_START_TIME):
-                            redis.hdel(table, key)
+            tools.send_msg(**dict(
+                msg=msg,
+                level="error",
+                message_prefix="《%s》爬虫导出数据失败" % (self._spider_name)
+            ))
 
     def _stop_all_thread(self):
+        # 关闭任务管理器
         self._request_buffer.stop()
+        # 关闭数据管道
         self._item_buffer.stop()
-        # 停止 collector
+        # 关闭任务管理
         self._collector.stop()
         # 停止 parser_controls
         for parser_control in self._parser_controls:
             parser_control.stop()
 
+        # 记录爬虫停止时间
+        self.__report_node_heartbeat('close')
         self._started.clear()
 
-    def send_msg(self, msg, level="debug", message_prefix=""):
-        #TODO 这个方法是消息预警,但如果每次都发送,会造成消息轰炸,所以采集框架的消息预警没有开启,
-        # 后续优化方向,消息预警的内容可以通过接口,接受保存,并对内容紧急度进行分辨,紧急度高的消息,可以直接发送至微信群中,这里尽量不要直接存储,feapder
-        # 框架不进行mongo的直接存储,只做查询操作
-        # log.debug("发送报警 level:{} msg{}".format(level, msg))
-        tools.send_msg(msg=msg, level=level, message_prefix=message_prefix)
-
     def get_argvs(self):
         argvs = {"next_page": False, "max_page": 10}
         for item in sys.argv[1:]:
@@ -494,7 +367,7 @@ class Scheduler(threading.Thread):
                 val = item.split('=')[-1]
                 if key != 'purpose':
                     argvs[key] = eval(val)  # 此处使用eval的原因是字符串转bool或int
-        return json.loads(json.dumps(argvs), object_hook=Obj)
+        return json.loads(json.dumps(argvs), object_hook=lambda d: SimpleNamespace(**d))
 
     def spider_begin(self):
         """
@@ -503,7 +376,6 @@ class Scheduler(threading.Thread):
         ---------
         @result:
         """
-
         if self._begin_callback:
             self._begin_callback()
 
@@ -513,29 +385,23 @@ class Scheduler(threading.Thread):
             parser.platform_max_page = parameter.max_page
             parser.start_callback()
 
-        # 记录开始时间
-        if not self._redisdb.hexists(self._tab_spider_time, SPIDER_START_TIME_KEY):
-            current_timestamp = tools.get_current_timestamp()
-            self._redisdb.hset(
-                self._tab_spider_time, SPIDER_START_TIME_KEY, current_timestamp
-            )
-
-            # 发送消息
-            # self.send_msg("《%s》爬虫开始" % self._spider_name)
+        # 记录爬虫开始时间
+        self.__report_node_heartbeat('start')
 
-    def spider_end(self):  # step end 爬虫结束时的一些操作
-        self.record_end_time()
+    def spider_end(self):
+        # 爬虫结束时间
+        self.__report_node_heartbeat('end')
 
-        if self._end_callback:  # 系统自带的回调,如果自定义回调,则这个回调不会执行
+        if self._end_callback:  # 任务结束回调
             self._end_callback()
 
         for parser in self._parsers:
             if not self._keep_alive:
                 parser.close()  # 爬虫可自定义close
-            parser.end_callback()  # 调用结束回调函数,可在爬虫自定义
+            parser.end_callback()  # 调用结束回调函数
 
         if not self._keep_alive:
-            # 关闭webdirver
+            # 关闭 webdriver 管理池
             if Request.webdriver_pool:
                 Request.webdriver_pool.close()
 
@@ -544,62 +410,23 @@ class Scheduler(threading.Thread):
         else:
             metrics.flush()
 
-        # 计算抓取时长
-        data = self._redisdb.hget(
-            self._tab_spider_time, SPIDER_START_TIME_KEY, is_pop=True
-        )
-        if data:
-            begin_timestamp = int(data)
-            elapsed_time = tools.get_current_timestamp() - begin_timestamp
-            msg = "《%s》爬虫结束,耗时 %s" % (
-                self._spider_name,
-                tools.format_seconds(elapsed_time),
-            )
-            log.info(msg)
-
-            # self.send_msg(msg)
-
         if self._keep_alive:
             log.info("爬虫不自动结束,等待下一轮任务...")
         else:
-            if self._collector.get_spider_count() <= 1:
-                self.delete_tables(self._tab_spider_time)
-                self.delete_tables(self._tab_spider_status)
-            else:
-                # 清除关闭爬虫的心跳记录,防止删除任务共享表,造成爬虫异常僵死
-                self._collector.delete_spider_node()
-
-    def record_end_time(self):
-        # 记录结束时间
-        if self._batch_interval:
-            current_timestamp = tools.get_current_timestamp()
-            self._redisdb.hset(
-                self._tab_spider_time, SPIDER_END_TIME_KEY, current_timestamp
-            )
+            log.info("《%s》爬虫结束" % (self._spider_name))
 
-    def is_reach_next_spider_time(self): # 如果没有设置爬虫的启动时间,这一块儿不需要管的
-        if not self._batch_interval:
-            return True
-        # 下面是对上次执行完成的时间和当前时间的一个校验,不在规定范围内则不启动爬虫,阻塞等待时间到达后再运行爬虫
-        last_spider_end_time = self._redisdb.hget(
-            self._tab_spider_time, SPIDER_END_TIME_KEY
-        )
-        if last_spider_end_time:
-            last_spider_end_time = int(last_spider_end_time)
-            current_timestamp = tools.get_current_timestamp()
-            time_interval = current_timestamp - last_spider_end_time
-
-            if time_interval < self._batch_interval * 86400:
-                log.info(
-                    "上次运行结束时间为 {} 与当前时间间隔 为 {}, 小于规定的抓取时间间隔 {}。爬虫不执行,退出~".format(
-                        tools.timestamp_to_date(last_spider_end_time),
-                        tools.format_seconds(time_interval),
-                        tools.format_seconds(self._batch_interval * 86400),
-                    )
-                )
-                return False
-
-        return True
+    def __report_node_heartbeat(self, status):
+        """
+        爬虫心跳
+        """
+        message = {
+            'ip': tools.get_localhost_ip(),
+            'spider_id': self._spider_id,
+            'spider_name': self._spider_name,
+            'ts': tools.get_current_timestamp(),
+            'status': status
+        }
+        self._rabbitmq.add(self._tab_spider_heartbeat, message)
 
     def join(self, timeout=None):
         """

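The scheduler now publishes lifecycle heartbeats (`start`, `running`, `end`, `close`) to the `setting.SPIDER_HEARTBEAT` queue instead of keeping zset-based node bookkeeping in Redis. A small consumer sketch for watching those messages; the pika wiring and the assumption that the wrapper serializes dicts as JSON are not part of this commit:

```python
# Drain the heartbeat queue written by Scheduler.__report_node_heartbeat().
import json
import os

import pika

queue = os.getenv("SPIDER_HEARTBEAT", "spider_heartbeat")  # placeholder default
channel = pika.BlockingConnection(
    pika.ConnectionParameters(host="127.0.0.1")).channel()
channel.queue_declare(queue=queue, durable=True)

while True:
    method, _props, body = channel.basic_get(queue=queue, auto_ack=True)
    if method is None:
        break
    beat = json.loads(body)  # or eval(body) if the wrapper publishes str(dict)
    print(beat["spider_name"], beat["status"], beat["ip"], beat["ts"])
```
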
+ 14 - 3
FworkSpider/feapder/setting.py

@@ -36,10 +36,20 @@ REDISDB_DB = int(os.getenv("REDISDB_DB", 0))
 # 适用于redis哨兵模式
 REDISDB_SERVICE_NAME = os.getenv("REDISDB_SERVICE_NAME")
 
-# 数据入库的pipeline,可自定义,默认MysqlPipeline
+# rabbitMq
+RABBITMQ_IP_PORT = os.getenv("RABBITMQ_IP_PORT")
+RABBITMQ_USER = os.getenv("RABBITMQ_USER")
+RABBITMQ_USER_PASS = os.getenv("RABBITMQ_USER_PASS")
+RABBITMQ_VIRTUAL_HOST = os.getenv("RABBITMQ_VIRTUAL_HOST", "/")
+RABBITMQ_HEARTBEAT = int(os.getenv("RABBITMQ_HEARTBEAT", 1200))
+RABBITMQ_SOCKET_TIMEOUT = int(os.getenv("RABBITMQ_SOCKET_TIMEOUT", 10))
+RABBITMQ_EXCHANGE = os.getenv("RABBITMQ_EXCHANGE", "spider")
+RABBITMQ_EXCHANGE_TYPE = os.getenv("RABBITMQ_EXCHANGE_TYPE", "direct")
+
+# 数据入库的pipeline,可自定义,默认MongoPipeline
 ITEM_PIPELINES = [
-    "feapder.pipelines.mysql_pipeline.MysqlPipeline",
-    # "feapder.pipelines.mongo_pipeline.MongoPipeline",
+    # "feapder.pipelines.mysql_pipeline.MysqlPipeline",
+    "feapder.pipelines.mongo_pipeline.MongoPipeline",
 ]
 EXPORT_DATA_MAX_FAILED_TIMES = 10  # 导出数据时最大的失败次数,包括保存和更新,超过这个次数报警
 EXPORT_DATA_MAX_RETRY_TIMES = 10  # 导出数据时最大的重试次数,包括保存和更新,超过这个次数则放弃重试
@@ -50,6 +60,7 @@ COLLECTOR_SLEEP_TIME = 1  # 从任务队列中获取任务到内存队列的间
 COLLECTOR_TASK_COUNT = 10  # 每次获取任务数量
 
 # SPIDER
+SPIDER_HEARTBEAT = os.getenv("SPIDER_HEARTBEAT")  # 爬虫心跳
 SPIDER_THREAD_COUNT = 1  # 爬虫并发数
 SPIDER_SLEEP_TIME = (
     0
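
All of the new RabbitMQ settings are read from environment variables when `feapder.setting` is imported. A minimal way to wire them up in code, with placeholder values:

```python
# Placeholder values; set the real broker credentials in the deployment environment.
import os

os.environ.setdefault("RABBITMQ_IP_PORT", "127.0.0.1:5672")
os.environ.setdefault("RABBITMQ_USER", "guest")
os.environ.setdefault("RABBITMQ_USER_PASS", "guest")
os.environ.setdefault("RABBITMQ_VIRTUAL_HOST", "/")
os.environ.setdefault("RABBITMQ_EXCHANGE", "spider")
os.environ.setdefault("SPIDER_HEARTBEAT", "spider_heartbeat")  # heartbeat queue name

import feapder.setting as setting  # values are captured at import time
print(setting.RABBITMQ_IP_PORT, setting.SPIDER_HEARTBEAT)
```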