Browse Source

更新事项:1、删除ack时指定队列,检查队列的逻辑;2、更新信道创建方式;3、添加队列声明检查;4、添加交换机队列声明检查;5、优化断线重连流程

dongzhaorui 1 year ago
parent
commit
2c869fc7d7
1 changed files with 65 additions and 54 deletions
  1. 65 54
      FworkSpider/feapder/db/rabbitMq.py

+ 65 - 54
FworkSpider/feapder/db/rabbitMq.py

@@ -38,6 +38,8 @@ class RabbitMQ:
         'client_properties',
     }
 
+    __cache = {}  # 使用缓存机制来实现仅在需要创建新队列或绑定新关系时进行相关操作
+
     def __init__(
         self,
         user=None,
@@ -86,6 +88,7 @@ class RabbitMQ:
         self._durable = durable
         self._exchange = exchange
         self._exchange_type = exchange_type
+        self._stop_server = False
 
         self.mq_kwargs = {
             'virtual_host': setting.RABBITMQ_VIRTUAL_HOST,
@@ -100,10 +103,6 @@ class RabbitMQ:
         self.get_connect()
         # 创建信道
         self.get_channel()
-        # 使用缓存机制来实现仅在需要创建新队列或绑定新关系时进行相关操作
-        self.__cache = {}
-
-        self._stop_server = False
 
     @property
     def _mq(self) -> AmqpStormConnection:
@@ -164,18 +163,21 @@ class RabbitMQ:
         try:
             # 建立信道
             self._channel = self._mq.channel()
-            # 声明交换机
-            self._channel.exchange.declare(
-                exchange=self._exchange,
-                exchange_type=self._exchange_type,
-                durable=self._durable
-            )
+            # 队列重新绑定交换机
+            for binding_key in self.__cache.copy():
+                if isinstance(binding_key, tuple):
+                    queue, exchange, routing_key = binding_key
+                    # 清除缓存
+                    del self.__cache[queue]
+                    del self.__cache[binding_key]
+                    # 重新声明
+                    self.declare(queue, exchange, routing_key)
         except Exception as e:
             raise
 
         return self.__channel.is_open
 
-    def _reconnect_channel(self):
+    def _re0channel(self):
         retry_count = 0
         while True:
             try:
@@ -183,15 +185,6 @@ class RabbitMQ:
                 log.error(f"Channel 连接断开, 重新连接 {retry_count}")
                 if self.get_channel():
                     log.info(f"Channel 连接成功")
-                    # 队列重新绑定交换机
-                    for binding_key in self.__cache.copy():
-                        if isinstance(binding_key, tuple):
-                            queue, exchange, routing_key = binding_key
-                            # 清除缓存
-                            del self.__cache[queue]
-                            del self.__cache[binding_key]
-                            # 重新声明
-                            self.declare(queue, exchange, routing_key)
                     return True
             except (ConnectionError,) as e:
                 log.error(f"连接失败 e: {e}")
@@ -205,7 +198,7 @@ class RabbitMQ:
                 raise ConnectionError("unable to connect to Channel")
         except:
             if not self._stop_server:
-                self._reconnect_channel()
+                self._re0channel()
 
         return self.__channel
 
@@ -236,59 +229,77 @@ class RabbitMQ:
                 properties=properties
             )
 
-    def ack(self, queue, delivery_tag=0, multiple=False):
+    def ack(self, delivery_tag=0, multiple=False):
         """
         手动回复队列消息确认
 
-        @param str queue: 队列名称
-        @param int delivery_tag: 消息标签
+        @param delivery_tag: 消息标签
         @param bool multiple: 开启多个回复消息确认
         """
-        self._channel.queue.declare(queue=queue, passive=self._durable)
         self._channel.basic.ack(delivery_tag=delivery_tag, multiple=multiple)
 
-    def _bind_exchange(self, queue, exchange, routing_key):
-        """
-        将队列绑定到指定的交换机
-
-        @param queue: 队列名称
-        @param exchange: 交换机名称
-        @param routing_key: 路由键
-        """
-        binding_key = (queue, exchange, routing_key)
-        if binding_key in self.__cache:
-            return
-        else:
-            # 队列绑定一个交换机
-            self._channel.queue.bind(queue, exchange, routing_key)
-            self.__cache[binding_key] = True
+    def declare_exchange(self, exchange, exchange_type=None, auto_delete=False,
+                         arguments=None):
+        """声明交换机"""
+        shares = dict(
+            exchange_type=exchange_type or self._exchange_type,
+            auto_delete=auto_delete,
+            arguments=arguments
+        )
+        try:
+            # 检查交换机是否存在
+            params = dict(passive=True, **shares)
+            return self._channel.exchange.declare(exchange, **params)
+        except AMQPChannelError as why:
+            if why.error_code == 404:
+                self.get_channel()
+                # 创建一个直连交换机
+                params = dict(durable=True, **shares)
+                return self._channel.exchange.declare(exchange, **params)
+            else:
+                raise why
 
-    def _declare_queue(self, queue):
+    def declare_queue(self, queue, auto_delete=False, arguments=None):
         """
-        声明持久化rabbitmq队列
+        声明队列
 
-        @param queue: 队列名称
+        @param queue:
+        @param auto_delete:
+        @param arguments:
         """
-        if queue in self.__cache:
-            return self.__cache[queue]
-        else:
-            # 声明队列
-            result = self._channel.queue.declare(queue, durable=True)
-            self.__cache[queue] = result
-            return result
+        shares = dict(auto_delete=auto_delete, arguments=arguments)
+        try:
+            params = dict(passive=True, **shares)
+            return self._channel.queue.declare(queue, **params)
+        except AMQPChannelError as why:
+            if why.error_code == 404:
+                self.get_channel()
+                # 声明持久化队列
+                params = dict(durable=True, **shares)
+                return self._channel.queue.declare(queue, **params)
+            else:
+                raise why
 
     def declare(self, queue, exchange=None, routing_key=None):
         """
-        声明rabbitmq队列并与一个交换机进行绑定
+        声明队列、交换机并将队列绑定到交换机
 
         @param queue: 队列名称
         @param exchange: 交换机名称
         @param routing_key: 路由键
         """
-        self._declare_queue(queue)
-
         routing_key = routing_key or queue
-        self._bind_exchange(queue, exchange or self._exchange, routing_key)
+        binding_key = (queue, exchange or self._exchange, routing_key)
+        if queue in self.__cache and binding_key in self.__cache:
+            return self.__cache[queue]
+
+        self.declare_exchange(exchange or self._exchange)
+        result = self.declare_queue(queue)
+        self.__cache[queue] = result
+        # 队列绑定一个交换机
+        self._channel.queue.bind(queue, exchange or self._exchange, routing_key)
+        self.__cache[binding_key] = True
+        return result
 
     def start_consuming(self, limit=None, to_tuple=False, auto_decode=True):
         """