# -*- coding: utf-8 -*- """ Created on 2023-09-25 --------- @summary: rabbitMq消息队列(基于amqpstorm封装) --------- @author: Dzr """ import time import amqpstorm from amqpstorm.channel import Channel as AmqpStormChannel from amqpstorm.connection import Connection as AmqpStormConnection from amqpstorm.exception import AMQPChannelError, AMQPConnectionError import feapder.setting as setting import feapder.utils.tools as tools from feapder.utils.log import log class RabbitMQMessage: def __init__(self, delivery_tag, body): self.delivery_tag = delivery_tag self.body = body def __str__(self): return f"RabbitMQMessage(delivery_tag={self.delivery_tag}, body={self.body})" class RabbitMQ: __RABBITMQ_ATTRS__ = { 'timeout', 'virtual_host', 'heartbeat', 'ssl', 'ssl_options', 'client_properties', } __cache = {} # 使用缓存机制来实现仅在需要创建新队列或绑定新关系时进行相关操作 def __init__( self, user=None, user_pass=None, ip_port=None, url=None, exchange=None, exchange_type=None, durable=True, **kwargs ): """ @param str user: 用户名 @param str user_pass: 密码 @param ip_port: ip:port @param str url: @param str exchange: 交换机名称 @param str exchange_type: 交换机类型 RabbitMQ支持以下几种exchange_type类型: 1. **direct(直连交换机)**:它将消息通过路由键直接发送到与之匹配的队列。使用direct交换机时,消息的路由键需要与绑定到队列上的绑定键完全匹配。 2. **topic(主题交换机)**:它将消息通过路由键的模式匹配发送到一个或多个队列,这是一种灵活的交换机类型。使用主题交换机时,可以使用通配符进行模糊匹配,例如使用*表示一个单词,#表示零个或多个单词。 3. **fanout(扇型交换机)**:它将消息广播到所有绑定到它的队列。它忽略了路由键的概念,只需简单地将消息发送给所有队列即可。 4. **headers(头交换机)**:该交换机根据消息的头部属性进行匹配,而不是路由键。它的匹配规则非常灵活,但在实际应用中使用较少。 @param durable: 是否定义队列或者交换机持久化(服务器重启后,队列是否能够恢复到原来的状态) @param kwargs: 自定义键值参数 """ if ip_port is None: ip_port = setting.RABBITMQ_IP_PORT if user is None: user = setting.RABBITMQ_USER if user_pass is None: user_pass = setting.RABBITMQ_USER_PASS if exchange is None: exchange = setting.RABBITMQ_EXCHANGE if exchange_type is None: exchange_type = setting.RABBITMQ_EXCHANGE_TYPE self.__mq = None self.__channel = None self._url = url self._ip_port = ip_port self._user = user self._user_pass = user_pass self._durable = durable self._exchange = exchange self._exchange_type = exchange_type self._stop_server = False self.mq_kwargs = { 'virtual_host': setting.RABBITMQ_VIRTUAL_HOST, 'heartbeat': setting.RABBITMQ_HEARTBEAT, 'timeout': setting.RABBITMQ_SOCKET_TIMEOUT } for key, val in kwargs.copy().items(): if key in self.__RABBITMQ_ATTRS__: self.mq_kwargs[key] = val # 创建连接 self.get_connect() # 创建信道 self.get_channel() @property def _mq(self) -> AmqpStormConnection: try: if not self.__mq.is_open: raise ConnectionError("unable to connect to RabbitMQ") except: if not self._stop_server: self._reconnect() return self.__mq @_mq.setter def _mq(self, connection: AmqpStormConnection): self.__mq = connection def _reconnect(self): # 检测连接状态,当RabbitMQ重启或者因网络波动导致断开连接时自动重连 retry_count = 0 while True: try: retry_count += 1 log.error(f"RabbitMQ 连接断开, 重新连接 {retry_count}") if self.get_connect(): log.info(f"RabbitMQ 连接成功") return True except (ConnectionError,) as e: log.error(f"连接失败 e: {e}") time.sleep(1) def get_connect(self, lazy=False): try: if not self._url: if not self._ip_port: raise Exception("未设置 RabbitMQ 连接信息") ip, port = self._ip_port.split(":") node = { "hostname": ip, "port": int(port), **self.mq_kwargs } if self._user and self._user_pass: node['username'] = self._user node['password'] = self._user_pass # 创建连接 self._mq = amqpstorm.Connection(**node, lazy=lazy) else: # 创建连接 self._mq = amqpstorm.UriConnection(self._url, lazy=lazy) except Exception as e: raise return self.__mq.is_open def get_channel(self): try: # 建立信道 self._channel = self._mq.channel() # 队列重新绑定交换机 for binding_key in self.__cache.copy(): if isinstance(binding_key, tuple): queue, exchange, routing_key = binding_key # 清除缓存 self.__cache.pop(queue, None) self.__cache.pop(binding_key, None) # 重新声明绑定 self.declare_bind(queue, exchange, routing_key) except Exception as e: raise return self.__channel.is_open def _re0channel(self): retry_count = 0 while True: try: retry_count += 1 log.error(f"Channel 连接断开, 重新连接 {retry_count}") if self.get_channel(): log.info(f"Channel 连接成功") return True except (ConnectionError,) as e: log.error(f"连接失败 e: {e}") time.sleep(1) @property def _channel(self) -> AmqpStormChannel: try: if not self.__channel.is_open: raise ConnectionError("unable to connect to Channel") except: if not self._stop_server: self._re0channel() return self.__channel @_channel.setter def _channel(self, channel: AmqpStormChannel): self.__channel = channel def add_batch(self, queue, datas, exchange='', routing_key=''): """ 批量发布消息 @param str queue: 队列名称 @param datas: 消息内容 @param str exchange: 交换机名称 @param str routing_key: 路由键 """ data_lst = datas if isinstance(datas, list) else [datas] for data in data_lst: self.add(data, queue, exchange, routing_key) def add(self, data, queue='', exchange='', routing_key=''): """ 发布消息 @param str queue: 队列名称 @param data: 消息内容 @param exchange: 交换机名称 @param routing_key: 路由键 """ if not routing_key and not queue: raise AttributeError('请设置 routing_key or queue') # 不指定交换机发送消息,routing_key 表示消息队列名称 # 指定交换机发送消息,routing_key 表示路由键 if not exchange: routing_key = queue # RabbitMQ 的 delivery_mode 属性用于设置消息的持久性。它有两种取值: # delivery_mode=1:表示消息被标记为持久化,但是仍然可能在服务器重启之前丢失。 # delivery_mode=2:表示消息被标记为持久化,并且会存储在磁盘上,确保消息不会丢失 properties = dict(delivery_mode=2) body = tools.dumps_obj(data) # 对象序列化 self._channel.basic.publish( body=body, routing_key=routing_key, exchange=exchange, properties=properties ) def add_dlx(self, exchange, routing_key, data): """ 发布延时消息 @param data: 消息内容 @param exchange: 交换机名称 @param routing_key: 路由键 """ queue = routing_key self.add(data, queue, exchange, routing_key) def ack(self, delivery_tag=0, multiple=False): """ 手动回复队列消息确认 @param delivery_tag: 消息标签 @param bool multiple: 开启多个回复消息确认 """ self._channel.basic.ack(delivery_tag=delivery_tag, multiple=multiple) def declare_exchange(self, exchange, exchange_type=None, auto_delete=False, arguments=None): """声明交换机""" shares = dict( exchange_type=exchange_type or self._exchange_type, auto_delete=auto_delete, arguments=arguments ) try: # 检查交换机是否存在 params = dict(passive=True, **shares) return self._channel.exchange.declare(exchange, **params) except AMQPChannelError as why: if why.error_code == 404: self.get_channel() # 创建一个直连交换机 params = dict(durable=True, **shares) return self._channel.exchange.declare(exchange, **params) else: raise why def declare_queue(self, queue, auto_delete=False, arguments=None): """ 声明队列 @param queue: @param auto_delete: @param arguments: """ shares = dict(auto_delete=auto_delete, arguments=arguments) try: params = dict(passive=True, **shares) return self._channel.queue.declare(queue, **params) except AMQPChannelError as why: if why.error_code == 404: self.get_channel() # 声明持久化队列 params = dict(durable=True, **shares) return self._channel.queue.declare(queue, **params) else: raise why def declare_bind(self, queue='', exchange='', routing_key=''): """ 声明队列和交换机,同时将队列绑定交换机 @param queue: 队列名称 @param exchange: 交换机名称 @param routing_key: 路由键 """ exchange = exchange or self._exchange binding_key = (queue, exchange, routing_key) if queue in self.__cache and binding_key in self.__cache: return self.__cache[queue] self.declare_exchange(exchange) result = self.declare_queue(queue) self.__cache[queue] = result # 队列绑定一个交换机 self._channel.queue.bind(queue, exchange, routing_key) self.__cache[binding_key] = True return result def start_consuming(self, limit=None, to_tuple=False, auto_decode=True): """ @param int limit: 消费数据上限 @param to_tuple: @param auto_decode: """ if not self._channel._consumer_callbacks: raise AMQPChannelError('no consumer callback defined') count = 0 params = dict(break_on_empty=True, auto_decode=auto_decode) for message in self._channel.build_inbound_messages(**params): consumer_tag = message._method.get('consumer_tag') if to_tuple: # noinspection PyCallingNonCallable self._channel._consumer_callbacks[consumer_tag](*message.to_tuple()) continue # noinspection PyCallingNonCallable self._channel._consumer_callbacks[consumer_tag](message) count += 1 if limit is not None and count == limit: break def stop_consuming(self, consumer_tag=None): """ @param str consumer_tag: 消费者标签 """ self._channel.basic.cancel(consumer_tag) def get(self, queue, limit, no_ack=False, to_str=True): """ 获取rabbitmq消息队列中的信道数据 @param str queue: 队列名称 @param int limit: 获取消息数量 @param no_ack: 自动回复消息确认 @param to_str: 消息是否转成字符串 """ message_lst = [] def callback(message): body = tools.loads_obj(message.body) # 反序列化消息对象 delivery_tag = message.delivery_tag if no_ack: self._channel.basic.ack(delivery_tag) delivery_tag = 0 if to_str: message_lst.append(str(RabbitMQMessage(delivery_tag, body))) else: message_lst.append(RabbitMQMessage(delivery_tag, body)) try: # 设置预取上限数量 self._channel.basic.qos(prefetch_count=limit) # 注册消费者并获取消费者标签 consumer_tag = self._channel.basic.consume(callback, queue=queue) # 开始消费 self.start_consuming(limit) # 停止消费并关闭消费者 self.stop_consuming(consumer_tag) except (AMQPChannelError, AMQPConnectionError) as why: log.error(f"{why}") return message_lst def find(self, queue, limit, feature, no_ack=False, to_str=True): """ 通过检索关键词从rabbitmq消息队列中获取指定数据 @param str queue: 队列名称 @param int limit: 获取消息数量 @param str feature: 数据特征 @param no_ack: 自动回复消息确认 @param to_str: 消息是否转成字符串 """ message_lst = [] # 设置预取上限数量 self._channel.basic.qos(prefetch_count=limit) def callback(message): body = message.body # 处理接收到的消息 impl = tools.loads_obj(body) delivery_tag = message.delivery_tag if feature in impl: if no_ack: self._channel.basic.ack(delivery_tag) delivery_tag = 0 if to_str: message_lst.append(str(RabbitMQMessage(delivery_tag, impl))) else: message_lst.append(RabbitMQMessage(delivery_tag, impl)) try: # 注册消费者并获取消费者标签 consumer_tag = self._channel.basic.consume(callback, queue=queue) # 开始消费 self.start_consuming(limit) # 停止消费并关闭消费者 self.stop_consuming(consumer_tag) except (AMQPChannelError, AMQPConnectionError) as why: log.error(f"{why}") return message_lst @property def is_open(self): return self._mq.is_open def get_message_count(self, queue): message = self._channel.queue.declare(queue, passive=True) # message_count 消息统计是消息发布确认之后的数量,未确认消息无法统计 return message.get('message_count') def get_mq_obj(self): return self._mq def close(self, n=-1): log.debug(f'关闭 RabbitMQ {n}') if self._channel.is_open: self._channel.close() if self._mq.is_open: self.__mq.close() self._stop_server = True def __getattr__(self, name): return getattr(self._mq, name) def __repr__(self): if self._url: return "".format(self._url) return "".format( self._ip_port, self._user, self._user_pass ) def __enter__(self): return self def __exit__(self, exception_type, exception_value, _): if exception_type: log.warning( 'Closing RabbitMQ to an unhandled exception: %s', exception_value ) if not self._mq.is_open: return self.close()