Fix queue data loss caused by closing RabbitMQ improperly

dongzhaorui, 1 year ago
commit a647a6e9bb

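Why the fix works: previously each component's stop() closed the shared RabbitMQ connection immediately, while its run() loop could still be flushing buffered requests and items, so unpublished queue data was silently lost. This commit makes stop() only raise the stop flag; each run() now closes the connection itself once its loop has drained. A minimal sketch of that shutdown ordering (MQ, Worker, and flush are illustrative stand-ins, not feapder APIs):

    import threading
    import time

    class MQ:
        def close(self):
            print("mq closed")

    class Worker(threading.Thread):
        def __init__(self, mq):
            super().__init__()
            self._mq = mq              # connection shared with other components
            self._thread_stop = False

        def run(self):
            while not self._thread_stop:
                self.flush()           # may still be publishing buffered data
                time.sleep(0.1)
            self.close()               # close only after the loop has drained

        def stop(self):
            # only signal; closing the connection here (the old behavior)
            # races with a flush still running inside run()
            self._thread_stop = True

        def flush(self):
            pass

        def close(self):
            self._mq.close()

    w = Worker(MQ())
    w.start()
    w.stop()
    w.join()  # "mq closed" appears only after run() has exited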
+ 2 - 1
FworkSpider/feapder/buffer/item_buffer.py

@@ -83,7 +83,6 @@ class ItemBuffer(threading.Thread):
         self.close()
 
     def stop(self):
-        self._rabbitmq.close()
         self._thread_stop = True
         self._started.clear()
 
@@ -386,3 +385,5 @@ class ItemBuffer(threading.Thread):
                 pipeline.close()
             except:
                 pass
+        # close RabbitMQ
+        self._rabbitmq.close(1)

+ 6 - 1
FworkSpider/feapder/buffer/request_buffer.py

@@ -56,8 +56,9 @@ class RequestBuffer(threading.Thread):
 
             tools.delay_time(1)
 
+        self.close()
+
     def stop(self):
-        self._rabbitmq.close()
         self._thread_stop = True
         self._started.clear()
 
@@ -146,3 +147,7 @@ class RequestBuffer(threading.Thread):
                 log.exception(e)
 
         self._is_adding_to_db = False
+
+    def close(self):
+        # close RabbitMQ
+        self._rabbitmq.close(2)

+ 7 - 2
FworkSpider/feapder/core/collector.py

@@ -45,7 +45,7 @@ class Collector(threading.Thread):
         self._request_count = setting.COLLECTOR_TASK_COUNT
         self._is_collector_task = False
 
-    def run(self):  # step thread entry point
+    def run(self):
         self._thread_stop = False
         while not self._thread_stop:
             try:
@@ -57,8 +57,9 @@ class Collector(threading.Thread):
 
             time.sleep(self._interval)
 
+        self.close()
+
     def stop(self):
-        self._rabbitmq.close()
         self._thread_stop = True
         self._started.clear()
 
@@ -122,3 +123,7 @@ class Collector(threading.Thread):
 
     def is_collector_task(self):
         return self._is_collector_task
+
+    def close(self):
+        # close RabbitMQ
+        self._rabbitmq.close(3)

+ 73 - 67
FworkSpider/feapder/core/scheduler.py

@@ -143,15 +143,60 @@ class Scheduler(threading.Thread):
         else:
             raise ValueError("Type error: the spider must inherit feapder.BaseParser or feapder.BatchParser")
 
+    def _start(self):
+        # put failed items back into the database
+        if setting.RETRY_FAILED_ITEMS:
+            handle_failed_items = HandleFailedItems(
+                redis_key=self._redis_key,
+                item_buffer=self._item_buffer,
+                rabbitmq=self._rabbitmq,
+            )
+            handle_failed_items.reput_failed_items_to_db()
+
+        # STEP 3.1 start request_buffer -- the task manager that buffers requests added to the database
+        self._request_buffer.start()
+        # STEP 3.2 start item_buffer -- the pipeline manager that buffers scraped data before writing it to the database
+        self._item_buffer.start()
+        # STEP 3.3 start collector -- the task manager that dispatches tasks
+        self._collector.start()
+
+        # start the parser controls
+        for i in range(self._thread_count):
+            # STEP 3.4 create the task-execution thread pool
+            parser_control = self._parser_control_obj(
+                self._collector,
+                self._redis_key,
+                self._request_buffer,
+                self._item_buffer,
+            )
+
+            for parser in self._parsers:  # STEP 3.5 add all pending parsers to the thread pool
+                parser_control.add_parser(parser)
+
+            parser_control.start()  # STEP 3.6 start the collection thread
+            self._parser_controls.append(parser_control)
+
+        # STEP 3.7 dispatch tasks; start reading tasks once consumer threads exist
+        if setting.RETRY_FAILED_REQUESTS:
+            # reset failed tasks
+            handle_failed_requests = HandleFailedRequests(
+                redis_key=self._redis_key,
+                rabbitmq=self._rabbitmq
+            )
+            handle_failed_requests.reput_failed_requests_to_requests()
+
+        # STEP 3.8 dispatch new tasks, producing fresh work
+        if self._auto_start_requests:
+            self.__add_task()
+
     def run(self):
         self._start()
 
         while True:
-            self.__report_node_heartbeat('running')
             try:
                 if self.all_thread_is_done():
                     if not self._is_notify_end:
-                        self.spider_end()  # spider run finished
+                        self.spider_end()  # spider finished
                         self._is_notify_end = True
 
                     if not self._keep_alive:  # if not a resident spider, close all threads
@@ -162,13 +207,13 @@ class Scheduler(threading.Thread):
                     self._is_notify_end = False
 
                 self.check_task_status()
-            except Exception as e:
+            except (Exception, BaseException) as e:
                 log.exception(e)
 
             tools.delay_time(1)
 
     def __add_task(self):
-        self.spider_begin()  # start the spider start_requests
+        self.spider_begin()  # start the spider -- start_requests
 
         # check whether the task pool still has tasks; if so, keep crawling, otherwise produce new ones
         todo_task_count = self._collector.get_requests_count()
@@ -207,52 +252,6 @@ class Scheduler(threading.Thread):
                 self._request_buffer.flush()
                 self._item_buffer.flush()
 
-    def _start(self):
-        # put failed items back into the database
-        if setting.RETRY_FAILED_ITEMS:
-            handle_failed_items = HandleFailedItems(
-                redis_key=self._redis_key,
-                item_buffer=self._item_buffer,
-                rabbitmq=self._rabbitmq,
-            )
-            handle_failed_items.reput_failed_items_to_db()
-
-        # STEP 3.1 start request_buffer -- the task manager that buffers requests added to the database
-        self._request_buffer.start()
-        # STEP 3.2 start item_buffer -- the pipeline manager that buffers scraped data before writing it to the database
-        self._item_buffer.start()
-        # STEP 3.3 start collector -- the task manager that dispatches tasks
-        self._collector.start()
-
-        # start the parser controls
-        for i in range(self._thread_count):
-            # STEP 3.4 create the task-execution thread pool
-            parser_control = self._parser_control_obj(
-                self._collector,
-                self._redis_key,
-                self._request_buffer,
-                self._item_buffer,
-            )
-
-            for parser in self._parsers:  # STEP 3.5 add all pending parsers to the thread pool
-                parser_control.add_parser(parser)
-
-            parser_control.start()  # STEP 3.6 start the collection thread
-            self._parser_controls.append(parser_control)
-
-        # STEP 3.7 dispatch tasks; start reading tasks once consumer threads exist
-        if setting.RETRY_FAILED_REQUESTS:
-            # reset failed tasks
-            handle_failed_requests = HandleFailedRequests(
-                redis_key=self._redis_key,
-                rabbitmq=self._rabbitmq
-            )
-            handle_failed_requests.reput_failed_requests_to_requests()
-
-        # STEP 3.8 dispatch new tasks, producing fresh work
-        if self._auto_start_requests:
-            self.__add_task()
-
     def all_thread_is_done(self):
         # Stress: reduce flukes. The stages are not concurrent, so a state may read False at one check yet True at the next; a single check can easily hit such a transient.
         for i in range(3):
@@ -298,18 +297,18 @@ class Scheduler(threading.Thread):
         else:
             return
 
-        # check the failed-task count; alert if it exceeds 1000
+        # check the failed-task count; alert if it exceeds 1000
         failed_count = self._request_buffer.get_failed_requests_count()
         log.debug(f'《{self._spider_name}》spider failed-task count: {failed_count}')
         if failed_count > setting.WARNING_FAILED_COUNT:
             # 发送报警
             msg = "《%s》spider currently has %s failed tasks, please check whether the spider is running normally" % (self._spider_name, failed_count)
             log.error(msg)
-            tools.send_msg(**dict(
-                msg=msg,
+            self.send_msg(
+                msg,
                 level="error",
                 message_prefix="《%s》spider failed-task-count alert" % (self._spider_name),
-            ))
+            )
 
         # parser_control tracks completed and failed task counts in real time; alert if the success rate is < 0.5
         failed_task_count, success_task_count = PaserControl.get_task_status_count()
@@ -325,11 +324,11 @@ class Scheduler(threading.Thread):
                     task_success_rate,
                 )
                 log.error(msg)
-                tools.send_msg(**dict(
-                    msg=msg,
+                self.send_msg(
+                    msg,
                     level="error",
                     message_prefix="《%s》spider task-success-rate alert" % (self._spider_name),
-                ))
+                )
 
         # check the number of database-export failures
         if self._item_buffer.export_falied_times > setting.EXPORT_DATA_MAX_FAILED_TIMES:
@@ -337,11 +336,11 @@ class Scheduler(threading.Thread):
                 self._spider_name, self._item_buffer.export_falied_times
             )
             log.error(msg)
-            tools.send_msg(**dict(
-                msg=msg,
+            self.send_msg(
+                msg,
                 level="error",
                 message_prefix="《%s》spider failed to export data" % (self._spider_name)
-            ))
+            )
 
     def _stop_all_thread(self):
         # close the task manager
@@ -354,12 +353,14 @@ class Scheduler(threading.Thread):
         for parser_control in self._parser_controls:
             parser_control.stop()
 
-        # close RabbitMQ
-        self._rabbitmq.close()
         # record the spider stop time
-        self.__report_node_heartbeat('close')
+        self.report_node_heartbeat('close')
         self._started.clear()
 
+    def send_msg(self, msg, level="debug", message_prefix=""):
+        # log.debug("sending alert level:{} msg:{}".format(level, msg))
+        tools.send_msg(msg=msg, level=level, message_prefix=message_prefix)
+
     def get_argvs(self):
         argvs = {"next_page": False, "max_page": 10}
         for item in sys.argv[1:]:
@@ -388,11 +389,11 @@ class Scheduler(threading.Thread):
             parser.start_callback()
 
         # record the spider start time
-        self.__report_node_heartbeat('start')
+        self.report_node_heartbeat('start')
 
     def spider_end(self):
         # spider end time
-        self.__report_node_heartbeat('end')
+        self.report_node_heartbeat('end')
 
         if self._end_callback:  # task-end callback
             self._end_callback()
@@ -400,6 +401,7 @@ class Scheduler(threading.Thread):
         for parser in self._parsers:
             if not self._keep_alive:
                 parser.close()  # spiders may implement a custom close
+
             parser.end_callback()  # invoke the end callback
 
         if not self._keep_alive:
@@ -415,9 +417,9 @@ class Scheduler(threading.Thread):
         if self._keep_alive:
             log.info("The spider does not stop automatically; waiting for the next round of tasks...")
         else:
-            log.info("《%s》spider finished" % (self._spider_name))
+            log.info("《%s》spider finished" % (self._spider_name,))
 
-    def __report_node_heartbeat(self, status):
+    def report_node_heartbeat(self, status):
         """
         Spider heartbeat
         """
@@ -438,3 +440,7 @@ class Scheduler(threading.Thread):
             return
 
         super().join()
+
+    def stop(self):
+        # close RabbitMQ
+        self._rabbitmq.close(4)

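The rename from __report_node_heartbeat to report_node_heartbeat is more than cosmetic: Python name-mangles double-underscore attributes per class, so a Scheduler subclass such as Spider (see spider.py below, which now calls self.report_node_heartbeat('running')) could not reach the private name. A quick illustration of the mangling rule, with stand-in class names:

    class Scheduler:
        def __heartbeat(self):       # stored as _Scheduler__heartbeat
            return "ok"

        def report_heartbeat(self):  # public name, reachable anywhere
            return self.__heartbeat()

    class Spider(Scheduler):
        def run(self):
            # self.__heartbeat() here would look up _Spider__heartbeat
            # and raise AttributeError; the public method works fine
            return self.report_heartbeat()

    print(Spider().run())  # -> ok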
+ 6 - 4
FworkSpider/feapder/core/spiders/spider.py

@@ -68,8 +68,8 @@ class Spider(
         self._is_distributed_task = False
         self._is_show_not_task = False
 
-    def run(self):
-        if not self._parsers:  # not add_parser mode
+    def run(self):  # scheduling control flow
+        if not self._parsers:
             self._parsers.append(self)
 
         self._start()
@@ -84,16 +84,18 @@ class Spider(
                     if not self._keep_alive:
                         self._stop_all_thread()
                         break
-
                 else:
                     self._is_notify_end = False
+                    self.report_node_heartbeat('running')
 
                 self.check_task_status()
-            except Exception as e:
+            except (Exception, BaseException) as e:
                 log.exception(e)
 
             tools.delay_time(1)  # check spider status once per second
 
+        self.stop()
+
     @classmethod
     def to_DebugSpider(cls, *args, **kwargs):
         # DebugSpider 继承 cls

+ 4 - 4
FworkSpider/feapder/db/rabbitMq.py

@@ -11,7 +11,7 @@ import time
 import amqpstorm
 from amqpstorm.channel import Channel as AmqpStormChannel
 from amqpstorm.connection import Connection as AmqpStormConnection
-from amqpstorm.exception import AMQPChannelError
+from amqpstorm.exception import AMQPChannelError, AMQPConnectionError
 
 import feapder.setting as setting
 import feapder.utils.tools as tools
@@ -350,7 +350,7 @@ class RabbitMQ:
             self.start_consuming(limit)
             # 停止消费并关闭消费者
             self.stop_consuming(consumer_tag)
-        except amqpstorm.exception.AMQPChannelError as why:
+        except (AMQPChannelError, AMQPConnectionError) as why:
             log.error(f"{why}")
 
         return message_lst
@@ -366,8 +366,8 @@ class RabbitMQ:
     def get_mq_obj(self):
         return self._mq
 
-    def close(self):
-        log.debug('Closing RabbitMQ')
+    def close(self, n=-1):
+        log.debug(f'Closing RabbitMQ {n}')
         if self._channel.is_open:
             self._channel.close()
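
The integer passed to close() only tags the shutdown log with its caller (1 = item_buffer, 2 = request_buffer, 3 = collector, 4 = scheduler, per the changes above); the close logic itself is unchanged. If readable logs are wanted later, one hypothetical refinement (not part of this commit) would map the tags to component names:

    # hypothetical variant of RabbitMQ.close(), not in this commit
    CALLER_NAMES = {1: "item_buffer", 2: "request_buffer",
                    3: "collector", 4: "scheduler"}

    def close(self, n=-1):
        log.debug(f"Closing RabbitMQ ({CALLER_NAMES.get(n, 'unknown')})")
        if self._channel.is_open:
            self._channel.close()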