|
@@ -103,13 +103,16 @@ class RabbitMQ:
|
|
|
# 使用缓存机制来实现仅在需要创建新队列或绑定新关系时进行相关操作
|
|
|
self.__cache = {}
|
|
|
|
|
|
+ self._stop_server = False
|
|
|
+
|
|
|
@property
|
|
|
def _mq(self) -> AmqpStormConnection:
|
|
|
try:
|
|
|
if not self.__mq.is_open:
|
|
|
raise ConnectionError("unable to connect to RabbitMQ")
|
|
|
except:
|
|
|
- self._reconnect()
|
|
|
+ if not self._stop_server:
|
|
|
+ self._reconnect()
|
|
|
|
|
|
return self.__mq
|
|
|
|
|
@@ -201,7 +204,8 @@ class RabbitMQ:
|
|
|
if not self.__channel.is_open:
|
|
|
raise ConnectionError("unable to connect to Channel")
|
|
|
except:
|
|
|
- self._reconnect_channel()
|
|
|
+ if not self._stop_server:
|
|
|
+ self._reconnect_channel()
|
|
|
|
|
|
return self.__channel
|
|
|
|
|
@@ -324,7 +328,6 @@ class RabbitMQ:
|
|
|
@param no_ack: 自动回复消息确认
|
|
|
"""
|
|
|
message_lst = []
|
|
|
-
|
|
|
# 设置预取上限数量
|
|
|
self._channel.basic.qos(prefetch_count=limit)
|
|
|
|
|
@@ -340,15 +343,22 @@ class RabbitMQ:
|
|
|
|
|
|
message_lst.append(str(RabbitMQMessage(delivery_tag, impl)))
|
|
|
|
|
|
- # 注册消费者并获取消费者标签
|
|
|
- consumer_tag = self._channel.basic.consume(callback, queue=queue)
|
|
|
- # 开始消费
|
|
|
- self.start_consuming(limit)
|
|
|
- # 停止消费并关闭消费者
|
|
|
- self.stop_consuming(consumer_tag)
|
|
|
+ try:
|
|
|
+ # 注册消费者并获取消费者标签
|
|
|
+ consumer_tag = self._channel.basic.consume(callback, queue=queue)
|
|
|
+ # 开始消费
|
|
|
+ self.start_consuming(limit)
|
|
|
+ # 停止消费并关闭消费者
|
|
|
+ self.stop_consuming(consumer_tag)
|
|
|
+ except amqpstorm.exception.AMQPChannelError as why:
|
|
|
+ log.error(f"{why}")
|
|
|
|
|
|
return message_lst
|
|
|
|
|
|
+ @property
|
|
|
+ def is_open(self):
|
|
|
+ return self._mq.is_open
|
|
|
+
|
|
|
def get_message_count(self, queue):
|
|
|
message = self._channel.queue.declare(queue, passive=True)
|
|
|
return message.get('message_count')
|
|
@@ -357,8 +367,14 @@ class RabbitMQ:
|
|
|
return self._mq
|
|
|
|
|
|
def close(self):
|
|
|
+ log.debug('关闭 RabbitMQ')
|
|
|
+ if self._channel.is_open:
|
|
|
+ self._channel.close()
|
|
|
+
|
|
|
if self._mq.is_open:
|
|
|
- self._mq.close()
|
|
|
+ self.__mq.close()
|
|
|
+
|
|
|
+ self._stop_server = True
|
|
|
|
|
|
def __getattr__(self, name):
|
|
|
return getattr(self._mq, name)
|