|
@@ -443,19 +443,23 @@ class RabbitMQ:
|
|
|
return self._mq.is_open
|
|
|
|
|
|
def __get_message_count(self, correlation_id, **kwargs):
|
|
|
+ channel = self._mq.channel() # 启用检查数量的临时信道
|
|
|
recv_count = 0 # 接收消息计数
|
|
|
|
|
|
- # 注册消费者并获取消费者标签
|
|
|
- consumer_tag = self._channel.basic.consume(queue=kwargs["queue"])
|
|
|
- for message in self._channel.build_inbound_messages(break_on_empty=True):
|
|
|
- # 指定应用程序标识,只会在特定的程序中被消费
|
|
|
- if correlation_id == message.correlation_id:
|
|
|
- recv_count += 1
|
|
|
- self.stop_consuming(consumer_tag)
|
|
|
+ try:
|
|
|
+ channel.basic.consume(queue=kwargs["queue"]) # 指定查询的队列
|
|
|
+ for message in channel.build_inbound_messages(break_on_empty=True):
|
|
|
+ # 指定应用程序标识,只会在特定的程序中被消费
|
|
|
+ if correlation_id == message.correlation_id:
|
|
|
+ recv_count += 1
|
|
|
+
|
|
|
+ except amqpstorm.exception.AMQPChannelError as why:
|
|
|
+ log.exception(why)
|
|
|
+ finally:
|
|
|
+ channel.close() # 关闭查询的临时信道
|
|
|
|
|
|
- # 重置消息数量
|
|
|
result = kwargs.copy()
|
|
|
- result["message_count"] = recv_count
|
|
|
+ result["message_count"] = recv_count # 重置消息数量
|
|
|
return result
|
|
|
|
|
|
def get_message_count(self, queue, user=None):
|