|
@@ -30,12 +30,12 @@ class RabbitMQMessage:
|
|
|
|
|
|
class RabbitMQ:
|
|
|
__RABBITMQ_ATTRS__ = {
|
|
|
- 'timeout',
|
|
|
- 'virtual_host',
|
|
|
- 'heartbeat',
|
|
|
- 'ssl',
|
|
|
- 'ssl_options',
|
|
|
- 'client_properties',
|
|
|
+ "timeout",
|
|
|
+ "virtual_host",
|
|
|
+ "heartbeat",
|
|
|
+ "ssl",
|
|
|
+ "ssl_options",
|
|
|
+ "client_properties",
|
|
|
}
|
|
|
|
|
|
__cache = {} # 使用缓存机制来实现仅在需要创建新队列或绑定新关系时进行相关操作
|
|
@@ -91,9 +91,9 @@ class RabbitMQ:
|
|
|
self._stop_server = False
|
|
|
|
|
|
self.mq_kwargs = {
|
|
|
- 'virtual_host': setting.RABBITMQ_VIRTUAL_HOST,
|
|
|
- 'heartbeat': setting.RABBITMQ_HEARTBEAT,
|
|
|
- 'timeout': setting.RABBITMQ_SOCKET_TIMEOUT
|
|
|
+ "virtual_host": setting.RABBITMQ_VIRTUAL_HOST,
|
|
|
+ "heartbeat": setting.RABBITMQ_HEARTBEAT,
|
|
|
+ "timeout": setting.RABBITMQ_SOCKET_TIMEOUT
|
|
|
}
|
|
|
for key, val in kwargs.copy().items():
|
|
|
if key in self.__RABBITMQ_ATTRS__:
|
|
@@ -147,8 +147,8 @@ class RabbitMQ:
|
|
|
**self.mq_kwargs
|
|
|
}
|
|
|
if self._user and self._user_pass:
|
|
|
- node['username'] = self._user
|
|
|
- node['password'] = self._user_pass
|
|
|
+ node["username"] = self._user
|
|
|
+ node["password"] = self._user_pass
|
|
|
# 创建连接
|
|
|
self._mq = amqpstorm.Connection(**node, lazy=lazy)
|
|
|
else:
|
|
@@ -355,13 +355,13 @@ class RabbitMQ:
|
|
|
@param use_filter: 使用缓存过滤重复数据
|
|
|
"""
|
|
|
if not self._channel._consumer_callbacks:
|
|
|
- raise AMQPChannelError('no consumer callback defined')
|
|
|
+ raise AMQPChannelError("no consumer callback defined")
|
|
|
|
|
|
cache = [] # 消息队列缓存,去除重复消息
|
|
|
recv_count = 0 # 接收消息计数
|
|
|
params = dict(break_on_empty=True, auto_decode=auto_decode)
|
|
|
for message in self._channel.build_inbound_messages(**params):
|
|
|
- consumer_tag = message._method.get('consumer_tag')
|
|
|
+ consumer_tag = message._method.get("consumer_tag")
|
|
|
if use_filter and message.message_id in cache:
|
|
|
continue
|
|
|
|
|
@@ -446,7 +446,7 @@ class RabbitMQ:
|
|
|
try:
|
|
|
message = self._channel.queue.declare(queue, passive=True)
|
|
|
# message_count 消息统计是消息发布确认之后的数量,未确认消息无法统计
|
|
|
- return message.get('message_count')
|
|
|
+ return message.get("message_count")
|
|
|
except amqpstorm.exception.AMQPChannelError:
|
|
|
return 0
|
|
|
|
|
@@ -454,7 +454,7 @@ class RabbitMQ:
|
|
|
return self._mq
|
|
|
|
|
|
def close(self, n=-1):
|
|
|
- log.debug(f'关闭 RabbitMQ {n}')
|
|
|
+ log.debug(f"关闭 RabbitMQ {n}")
|
|
|
if self._channel.is_open:
|
|
|
self._channel.close()
|
|
|
|
|
@@ -480,7 +480,7 @@ class RabbitMQ:
|
|
|
def __exit__(self, exception_type, exception_value, _):
|
|
|
if exception_type:
|
|
|
log.warning(
|
|
|
- 'Closing RabbitMQ to an unhandled exception: %s',
|
|
|
+ "Closing RabbitMQ to an unhandled exception: %s",
|
|
|
exception_value
|
|
|
)
|
|
|
if not self._mq.is_open:
|