123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511 |
- # -*- coding: utf-8 -*-
- """
- Created on 2023-09-25
- ---------
- @summary: rabbitMq消息队列(基于amqpstorm封装)
- ---------
- @author: Dzr
- """
- import time
- import amqpstorm
- from amqpstorm.channel import Channel as AmqpStormChannel
- from amqpstorm.connection import Connection as AmqpStormConnection
- from amqpstorm.exception import AMQPChannelError, AMQPConnectionError
- import feapder.setting as setting
- import feapder.utils.tools as tools
- from feapder.utils.log import log
- class RabbitMQMessage:
- def __init__(self, delivery_tag, body):
- self.delivery_tag = delivery_tag
- self.body = body
- def __str__(self):
- return f"RabbitMQMessage(delivery_tag={self.delivery_tag}, body={self.body})"
- class RabbitMQ:
- __RABBITMQ_ATTRS__ = {
- "timeout",
- "virtual_host",
- "heartbeat",
- "ssl",
- "ssl_options",
- "client_properties",
- }
- __cache = {} # 使用缓存机制来实现仅在需要创建新队列或绑定新关系时进行相关操作
- def __init__(
- self,
- user=None,
- user_pass=None,
- ip_port=None,
- url=None,
- exchange=None,
- exchange_type=None,
- durable=True,
- **kwargs
- ):
- """
- @param str user: 用户名
- @param str user_pass: 密码
- @param ip_port: ip:port
- @param str url:
- @param str exchange: 交换机名称
- @param str exchange_type: 交换机类型
- RabbitMQ支持以下几种exchange_type类型:
- 1. **direct(直连交换机)**:它将消息通过路由键直接发送到与之匹配的队列。使用direct交换机时,消息的路由键需要与绑定到队列上的绑定键完全匹配。
- 2. **topic(主题交换机)**:它将消息通过路由键的模式匹配发送到一个或多个队列,这是一种灵活的交换机类型。使用主题交换机时,可以使用通配符进行模糊匹配,例如使用*表示一个单词,#表示零个或多个单词。
- 3. **fanout(扇型交换机)**:它将消息广播到所有绑定到它的队列。它忽略了路由键的概念,只需简单地将消息发送给所有队列即可。
- 4. **headers(头交换机)**:该交换机根据消息的头部属性进行匹配,而不是路由键。它的匹配规则非常灵活,但在实际应用中使用较少。
- @param durable: 是否定义队列或者交换机持久化(服务器重启后,队列是否能够恢复到原来的状态)
- @param kwargs: 自定义键值参数
- """
- if ip_port is None:
- ip_port = setting.RABBITMQ_IP_PORT
- if user is None:
- user = setting.RABBITMQ_USER
- if user_pass is None:
- user_pass = setting.RABBITMQ_USER_PASS
- if exchange is None:
- exchange = setting.RABBITMQ_EXCHANGE
- if exchange_type is None:
- exchange_type = setting.RABBITMQ_EXCHANGE_TYPE
- self.__mq = None
- self.__channel = None
- self._url = url
- self._ip_port = ip_port
- self._user = user
- self._user_pass = user_pass
- self._durable = durable
- self._exchange = exchange
- self._exchange_type = exchange_type
- self._stop_server = False
- self.mq_kwargs = {
- "virtual_host": setting.RABBITMQ_VIRTUAL_HOST,
- "heartbeat": setting.RABBITMQ_HEARTBEAT,
- "timeout": setting.RABBITMQ_SOCKET_TIMEOUT
- }
- for key, val in kwargs.copy().items():
- if key in self.__RABBITMQ_ATTRS__:
- self.mq_kwargs[key] = val
- # 创建连接
- self.get_connect()
- # 创建信道
- self.get_channel()
- @property
- def _mq(self) -> AmqpStormConnection:
- try:
- if not self.__mq.is_open:
- raise ConnectionError("unable to connect to RabbitMQ")
- except:
- if not self._stop_server:
- self._reconnect()
- return self.__mq
- @_mq.setter
- def _mq(self, connection: AmqpStormConnection):
- self.__mq = connection
- def _reconnect(self):
- # 检测连接状态,当RabbitMQ重启或者因网络波动导致断开连接时自动重连
- retry_count = 0
- while True:
- try:
- retry_count += 1
- log.error(f"RabbitMQ 连接断开, 重新连接 {retry_count}")
- if self.get_connect():
- log.info(f"RabbitMQ 连接成功")
- return True
- except (ConnectionError,) as e:
- log.error(f"连接失败 e: {e}")
- time.sleep(1)
- def get_connect(self, lazy=False):
- try:
- if not self._url:
- if not self._ip_port:
- raise Exception("未设置 RabbitMQ 连接信息")
- ip, port = self._ip_port.split(":")
- node = {
- "hostname": ip,
- "port": int(port),
- **self.mq_kwargs
- }
- if self._user and self._user_pass:
- node["username"] = self._user
- node["password"] = self._user_pass
- # 创建连接
- self._mq = amqpstorm.Connection(**node, lazy=lazy)
- else:
- # 创建连接
- self._mq = amqpstorm.UriConnection(self._url, lazy=lazy)
- except Exception as e:
- raise
- return self.__mq.is_open
- def get_channel(self):
- try:
- # 建立信道
- self._channel = self._mq.channel()
- # 队列重新绑定交换机
- for binding_key in self.__cache.copy():
- if isinstance(binding_key, tuple):
- queue, exchange, routing_key = binding_key
- # 清除缓存
- self.__cache.pop(queue, None)
- self.__cache.pop(binding_key, None)
- # 重新声明绑定
- self.declare_bind(queue, exchange, routing_key)
- except Exception as e:
- raise
- return self.__channel.is_open
- def _re0channel(self):
- retry_count = 0
- while True:
- try:
- retry_count += 1
- log.error(f"Channel 连接断开, 重新连接 {retry_count}")
- if self.get_channel():
- log.info(f"Channel 连接成功")
- return True
- except (ConnectionError,) as e:
- log.error(f"连接失败 e: {e}")
- time.sleep(1)
- @property
- def _channel(self) -> AmqpStormChannel:
- try:
- if not self.__channel.is_open:
- raise ConnectionError("unable to connect to Channel")
- except:
- if not self._stop_server:
- self._re0channel()
- return self.__channel
- @_channel.setter
- def _channel(self, channel: AmqpStormChannel):
- self.__channel = channel
- def add_batch(self, queue, datas, exchange="", routing_key="", **kwargs):
- """
- 批量发布消息
- @param str queue: 队列名称
- @param datas: 消息内容
- @param exchange: 交换机名称
- @param routing_key: 路由键
- """
- data_lst = datas if isinstance(datas, list) else [datas]
- for data in data_lst:
- self.add(data, queue, exchange, routing_key, **kwargs)
- def add(self, data, queue="", exchange="", routing_key="", properties=None):
- """
- 发布消息
- @param str queue: 队列名称
- @param data: 消息内容
- @param exchange: 交换机名称
- @param routing_key: 路由键
- @param properties: 消息属性
- """
- if not routing_key and not queue:
- raise AttributeError("请设置 routing_key or queue")
- # 不指定交换机发送消息,routing_key 表示消息队列名称
- # 指定交换机发送消息,routing_key 表示路由键
- if not exchange:
- routing_key = queue
- message_id = tools.get_uuid().replace("-", "")
- if isinstance(data, dict):
- message_id = data.get("pyuuid") or message_id
- # RabbitMQ 的 delivery_mode 属性用于设置消息的持久性。它有两种取值:
- # delivery_mode=1:表示消息被标记为持久化,但是仍然可能在服务器重启之前丢失。
- # delivery_mode=2:表示消息被标记为持久化,并且会存储在磁盘上,确保消息不会丢失
- properties = properties or {}
- properties = dict(delivery_mode=2, **properties)
- if "message_id" not in properties:
- properties["message_id"] = message_id
- self._channel.basic.publish(
- body=tools.dumps_obj(data), # 对象序列化
- routing_key=routing_key,
- exchange=exchange,
- properties=properties # specification.Basic.Properties
- )
- def add_dlx(self, exchange, routing_key, data, properties=None):
- """
- 发布延时消息
- @param exchange: 交换机名称
- @param routing_key: 路由键
- @param data: 消息内容
- @param properties: 消息属性
- """
- queue = routing_key
- self.add(data, queue, exchange, routing_key, properties)
- def ack(self, delivery_tag=0, multiple=False):
- """
- 手动回复队列消息确认
- @param delivery_tag: 消息标签
- @param bool multiple: 开启多个回复消息确认
- """
- self._channel.basic.ack(delivery_tag=delivery_tag, multiple=multiple)
- def declare_exchange(self, exchange, exchange_type=None, auto_delete=False, arguments=None):
- """声明交换机"""
- shares = dict(
- exchange_type=exchange_type or self._exchange_type,
- auto_delete=auto_delete,
- arguments=arguments
- )
- try:
- # 检查交换机是否存在
- params = dict(passive=True, **shares)
- return self._channel.exchange.declare(exchange, **params)
- except AMQPChannelError as why:
- if why.error_code == 404:
- self.get_channel()
- # 创建一个直连交换机
- params = dict(durable=True, **shares)
- return self._channel.exchange.declare(exchange, **params)
- else:
- raise why
- def declare_queue(self, queue, auto_delete=False, arguments=None):
- """
- 声明队列
- @param queue:
- @param auto_delete:
- @param arguments:
- """
- shares = dict(auto_delete=auto_delete, arguments=arguments)
- try:
- params = dict(passive=True, **shares)
- return self._channel.queue.declare(queue, **params)
- except AMQPChannelError as why:
- if why.error_code == 404:
- self.get_channel()
- # 声明持久化队列
- params = dict(durable=True, **shares)
- return self._channel.queue.declare(queue, **params)
- else:
- raise why
- def declare_bind(self, queue="", exchange="", routing_key=""):
- """
- 声明队列和交换机,同时将队列绑定交换机
- @param queue: 队列名称
- @param exchange: 交换机名称
- @param routing_key: 路由键
- """
- exchange = exchange or self._exchange
- binding_key = (queue, exchange, routing_key)
- if queue in self.__cache and binding_key in self.__cache:
- return self.__cache[queue]
- self.declare_exchange(exchange)
- result = self.declare_queue(queue)
- self.__cache[queue] = result
- # 队列绑定一个交换机
- self._channel.queue.bind(queue, exchange, routing_key)
- self.__cache[binding_key] = True
- return result
- def start_consuming(
- self,
- limit=None,
- to_tuple=False,
- auto_decode=True,
- correlation_id="",
- use_filter=False
- ):
- """
- @param int limit: 消费数据上限
- @param to_tuple: 消息结果返回元组形式
- @param auto_decode: 自动解码
- @param correlation_id: 应用程序关联标识符
- @param use_filter: 使用缓存过滤重复数据
- """
- if not self._channel._consumer_callbacks:
- raise AMQPChannelError("no consumer callback defined")
- cache = [] # 消息队列缓存,去除重复消息
- recv_count = 0 # 接收消息计数
- params = dict(break_on_empty=True, auto_decode=auto_decode)
- for message in self._channel.build_inbound_messages(**params):
- consumer_tag = message._method.get("consumer_tag")
- if use_filter and message.message_id in cache:
- continue
- cache.append(message.message_id) # 缓存消息id
- # 指定应用程序标识,只会在特定的程序中被消费
- if correlation_id:
- if correlation_id == message.correlation_id:
- self._channel._consumer_callbacks[consumer_tag](message)
- recv_count += 1
- if limit is not None and recv_count == limit:
- break
- else:
- continue
- recv_count += 1
- if to_tuple:
- self._channel._consumer_callbacks[consumer_tag](*message.to_tuple())
- if limit is not None and recv_count == limit:
- break
- else:
- continue
- self._channel._consumer_callbacks[consumer_tag](message)
- if limit is not None and recv_count == limit:
- break
- def stop_consuming(self, consumer_tag=None):
- """
- @param str consumer_tag: 消费者标签
- """
- self._channel.basic.cancel(consumer_tag)
- def get(self, queue, limit, auto_ack=False, to_str=True, **kwargs):
- """
- 获取rabbitmq消息队列中的信道数据
- @param str queue: 队列名称
- @param int limit: 获取消息数量
- @param auto_ack: 自动回复消息确认
- @param to_str: 消息是否转成字符串
- @param to_str: 消息是否转成字符串
- """
- message_lst = []
- if "use_filter" not in kwargs:
- kwargs["use_filter"] = True
- def callback(message):
- body = tools.loads_obj(message.body) # 反序列化消息对象
- delivery_tag = message.delivery_tag
- if auto_ack:
- self.ack(delivery_tag)
- delivery_tag = 0
- if to_str:
- message_lst.append(str(RabbitMQMessage(delivery_tag, body)))
- else:
- message_lst.append(RabbitMQMessage(delivery_tag, body))
- try:
- # 设置预取上限数量
- self._channel.basic.qos(prefetch_count=limit)
- # 注册消费者并获取消费者标签
- consumer_tag = self._channel.basic.consume(callback, queue=queue)
- # 开始消费
- self.start_consuming(limit, **kwargs)
- # 停止消费并关闭消费者
- self.stop_consuming(consumer_tag)
- except (AMQPChannelError, AMQPConnectionError) as why:
- log.exception(why)
- return message_lst
- @property
- def is_open(self):
- return self._mq.is_open
- def __get_message_count(self, correlation_id, **kwargs):
- channel = self._mq.channel() # 启用检查数量的临时信道
- recv_count = 0 # 接收消息计数
- try:
- channel.basic.consume(queue=kwargs["queue"]) # 指定查询的队列
- for message in channel.build_inbound_messages(break_on_empty=True):
- # 指定应用程序标识,只会在特定的程序中被消费
- if correlation_id == message.correlation_id:
- recv_count += 1
- except amqpstorm.exception.AMQPChannelError as why:
- log.exception(why)
- finally:
- channel.close() # 关闭查询的临时信道
- result = kwargs.copy()
- result["message_count"] = recv_count # 重置消息数量
- return result
- def get_message_count(self, queue, user=None):
- try:
- message = self._channel.queue.declare(queue, passive=True)
- if user is not None:
- message = self.__get_message_count(user, **message)
- # message_count 消息统计是消息发布确认之后的数量,未确认消息无法统计
- return message.get("message_count")
- except amqpstorm.exception.AMQPChannelError:
- return 0
- def get_mq_obj(self):
- return self._mq
- def close(self, n=-1):
- log.debug(f"关闭 RabbitMQ {n}")
- if self._channel.is_open:
- self._channel.close()
- if self._mq.is_open:
- self.__mq.close()
- self._stop_server = True
- def __getattr__(self, name):
- return getattr(self._mq, name)
- def __repr__(self):
- if self._url:
- return "<RabbitMQ url:{}>".format(self._url)
- return "<RabbitMQ ip_port: {} username:{} password:{}>".format(
- self._ip_port, self._user, self._user_pass
- )
- def __enter__(self):
- return self
- def __exit__(self, exception_type, exception_value, _):
- if exception_type:
- log.warning(
- "Closing RabbitMQ to an unhandled exception: %s",
- exception_value
- )
- if not self._mq.is_open:
- return
- self.close()
|