浏览代码

添加RabbitMQ消息队列(基于amqpstorm封装)

dongzhaorui 1 年之前
父节点
当前提交
bc1d103716
共有 1 个文件被更改,包括 385 次插入0 次删除
  1. 385 0
      FworkSpider/feapder/db/rabbitMq.py

+ 385 - 0
FworkSpider/feapder/db/rabbitMq.py

@@ -0,0 +1,385 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2023-09-25
+---------
+@summary:  rabbitMq消息队列(基于amqpstorm封装)
+---------
+@author: Dzr
+"""
+import time
+
+import amqpstorm
+from amqpstorm.channel import Channel as AmqpStormChannel
+from amqpstorm.connection import Connection as AmqpStormConnection
+from amqpstorm.exception import AMQPChannelError
+
+import feapder.setting as setting
+import feapder.utils.tools as tools
+from feapder.utils.log import log
+
+
+class RabbitMQMessage:
+
+    def __init__(self, delivery_tag, body):
+        self.delivery_tag = delivery_tag
+        self.body = body
+
+    def __str__(self):
+        return f"RabbitMQMessage(delivery_tag={self.delivery_tag}, body={self.body})"
+
+
+class RabbitMQ:
+    __RABBITMQ_ATTRS__ = {
+        'timeout',
+        'virtual_host',
+        'heartbeat',
+        'ssl',
+        'ssl_options',
+        'client_properties',
+    }
+
+    def __init__(
+        self,
+        user=None,
+        user_pass=None,
+        ip_port=None,
+        url=None,
+        exchange=None,
+        exchange_type=None,
+        durable=True,
+        **kwargs
+    ):
+
+        """
+        @param str user: 用户名
+        @param str user_pass: 密码
+        @param ip_port: ip:port
+        @param str url:
+        @param str exchange: 交换机名称
+        @param str exchange_type: 交换机类型
+            RabbitMQ支持以下几种exchange_type类型:
+                1. **direct(直连交换机)**:它将消息通过路由键直接发送到与之匹配的队列。使用direct交换机时,消息的路由键需要与绑定到队列上的绑定键完全匹配。
+                2. **topic(主题交换机)**:它将消息通过路由键的模式匹配发送到一个或多个队列,这是一种灵活的交换机类型。使用主题交换机时,可以使用通配符进行模糊匹配,例如使用*表示一个单词,#表示零个或多个单词。
+                3. **fanout(扇型交换机)**:它将消息广播到所有绑定到它的队列。它忽略了路由键的概念,只需简单地将消息发送给所有队列即可。
+                4. **headers(头交换机)**:该交换机根据消息的头部属性进行匹配,而不是路由键。它的匹配规则非常灵活,但在实际应用中使用较少。
+        @param durable: 是否定义队列或者交换机持久化(服务器重启后,队列是否能够恢复到原来的状态)
+        @param kwargs: 自定义键值参数
+        """
+
+        if ip_port is None:
+            ip_port = setting.RABBITMQ_IP_PORT
+        if user is None:
+            user = setting.RABBITMQ_USER
+        if user_pass is None:
+            user_pass = setting.RABBITMQ_USER_PASS
+        if exchange is None:
+            exchange = setting.RABBITMQ_EXCHANGE
+        if exchange_type is None:
+            exchange_type = setting.RABBITMQ_EXCHANGE_TYPE
+
+        self.__mq = None
+        self.__channel = None
+        self._url = url
+        self._ip_port = ip_port
+        self._user = user
+        self._user_pass = user_pass
+        self._durable = durable
+        self._exchange = exchange
+        self._exchange_type = exchange_type
+
+        self.mq_kwargs = {
+            'virtual_host': setting.RABBITMQ_VIRTUAL_HOST,
+            'heartbeat': setting.RABBITMQ_HEARTBEAT,
+            'timeout': setting.RABBITMQ_SOCKET_TIMEOUT
+        }
+        for key, val in kwargs.copy().items():
+            if key in self.__RABBITMQ_ATTRS__:
+                self.mq_kwargs[key] = val
+
+        # 创建连接
+        self.get_connect()
+        # 创建信道
+        self.get_channel()
+        # 使用缓存机制来实现仅在需要创建新队列或绑定新关系时进行相关操作
+        self.__cache = {}
+
+    @property
+    def _mq(self) -> AmqpStormConnection:
+        try:
+            if not self.__mq.is_open:
+                raise ConnectionError("unable to connect to RabbitMQ")
+        except:
+            self._reconnect()
+
+        return self.__mq
+
+    @_mq.setter
+    def _mq(self, connection: AmqpStormConnection):
+        self.__mq = connection
+
+    def _reconnect(self):
+        # 检测连接状态,当RabbitMQ重启或者因网络波动导致断开连接时自动重连
+        retry_count = 0
+        while True:
+            try:
+                retry_count += 1
+                log.error(f"RabbitMQ 连接断开, 重新连接 {retry_count}")
+                if self.get_connect():
+                    log.info(f"RabbitMQ 连接成功")
+                    return True
+            except (ConnectionError,) as e:
+                log.error(f"连接失败 e: {e}")
+
+            time.sleep(1)
+
+    def get_connect(self, lazy=False):
+        try:
+            if not self._url:
+                if not self._ip_port:
+                    raise Exception("未设置 RabbitMQ 连接信息")
+
+                ip, port = self._ip_port.split(":")
+                node = {
+                    "hostname": ip,
+                    "port": int(port),
+                    **self.mq_kwargs
+                }
+                if self._user and self._user_pass:
+                    node['username'] = self._user
+                    node['password'] = self._user_pass
+                # 创建连接
+                self._mq = amqpstorm.Connection(**node, lazy=lazy)
+            else:
+                # 创建连接
+                self._mq = amqpstorm.UriConnection(self._url, lazy=lazy)
+        except Exception as e:
+            raise
+
+        return self.__mq.is_open
+
+    def get_channel(self):
+        try:
+            # 建立信道
+            self._channel = self._mq.channel()
+            # 声明交换机
+            self._channel.exchange.declare(
+                exchange=self._exchange,
+                exchange_type=self._exchange_type,
+                durable=self._durable
+            )
+        except Exception as e:
+            raise
+
+        return self.__channel.is_open
+
+    def _reconnect_channel(self):
+        retry_count = 0
+        while True:
+            try:
+                retry_count += 1
+                log.error(f"Channel 连接断开, 重新连接 {retry_count}")
+                if self.get_channel():
+                    log.info(f"Channel 连接成功")
+                    # 队列重新绑定交换机
+                    for binding_key in self.__cache.copy():
+                        if isinstance(binding_key, tuple):
+                            queue, exchange, routing_key = binding_key
+                            # 清除缓存
+                            del self.__cache[queue]
+                            del self.__cache[binding_key]
+                            # 重新声明
+                            self.declare(queue, exchange, routing_key)
+                    return True
+            except (ConnectionError,) as e:
+                log.error(f"连接失败 e: {e}")
+
+            time.sleep(1)
+
+    @property
+    def _channel(self) -> AmqpStormChannel:
+        try:
+            if not self.__channel.is_open:
+                raise ConnectionError("unable to connect to Channel")
+        except:
+            self._reconnect_channel()
+
+        return self.__channel
+
+    @_channel.setter
+    def _channel(self, channel: AmqpStormChannel):
+        self.__channel = channel
+
+    def add(self, queue, data, exchange=None):
+        """
+        推送数据到rabbitmq消息队列
+
+        @param str queue: 队列名称
+        @param data: 推送数据
+        @param str exchange: 交换机名称
+        """
+        data_lst = data if isinstance(data, list) else [data]
+        for item in data_lst:
+            log.debug(f"【{queue}】Mq Push >>> {item}")
+            data_bytes = tools.dumps_obj(item)  # 对象序列化
+            # RabbitMQ 的 delivery_mode 属性用于设置消息的持久性。它有两种取值:
+            #   delivery_mode=1:表示消息被标记为持久化,但是仍然可能在服务器重启之前丢失。
+            #   delivery_mode=2:表示消息被标记为持久化,并且会存储在磁盘上,确保消息不会丢失
+            properties = dict(delivery_mode=2)
+            self._channel.basic.publish(
+                exchange=exchange or self._exchange,
+                routing_key=queue,
+                body=data_bytes,
+                properties=properties
+            )
+
+    def ack(self, queue, delivery_tag=0, multiple=False):
+        """
+        手动回复队列消息确认
+
+        @param str queue: 队列名称
+        @param int delivery_tag: 消息标签
+        @param bool multiple: 开启多个回复消息确认
+        """
+        self._channel.queue.declare(queue=queue, passive=self._durable)
+        self._channel.basic.ack(delivery_tag=delivery_tag, multiple=multiple)
+
+    def _bind_exchange(self, queue, exchange, routing_key):
+        """
+        将队列绑定到指定的交换机
+
+        @param queue: 队列名称
+        @param exchange: 交换机名称
+        @param routing_key: 路由键
+        """
+        binding_key = (queue, exchange, routing_key)
+        if binding_key in self.__cache:
+            return
+        else:
+            # 队列绑定一个交换机
+            self._channel.queue.bind(queue, exchange, routing_key)
+            self.__cache[binding_key] = True
+
+    def _declare_queue(self, queue):
+        """
+        声明持久化rabbitmq队列
+
+        @param queue: 队列名称
+        """
+        if queue in self.__cache:
+            return self.__cache[queue]
+        else:
+            # 声明队列
+            result = self._channel.queue.declare(queue, durable=True)
+            self.__cache[queue] = result
+            return result
+
+    def declare(self, queue, exchange=None, routing_key=None):
+        """
+        声明rabbitmq队列并与一个交换机进行绑定
+
+        @param queue: 队列名称
+        @param exchange: 交换机名称
+        @param routing_key: 路由键
+        """
+        self._declare_queue(queue)
+
+        routing_key = routing_key or queue
+        self._bind_exchange(queue, exchange or self._exchange, routing_key)
+
+    def start_consuming(self, limit=None, to_tuple=False, auto_decode=True):
+        """
+        @param int limit: 消费数据上限
+        @param to_tuple:
+        @param auto_decode:
+        """
+        if not self._channel._consumer_callbacks:
+            raise AMQPChannelError('no consumer callback defined')
+
+        count = 0
+        params = dict(break_on_empty=True, auto_decode=auto_decode)
+        for message in self._channel.build_inbound_messages(**params):
+            consumer_tag = message._method.get('consumer_tag')
+            if to_tuple:
+                # noinspection PyCallingNonCallable
+                self._channel._consumer_callbacks[consumer_tag](*message.to_tuple())
+                continue
+            # noinspection PyCallingNonCallable
+            self._channel._consumer_callbacks[consumer_tag](message)
+            count += 1
+            if limit is not None and count == limit:
+                break
+
+    def stop_consuming(self, consumer_tag=None):
+        """
+        @param str consumer_tag: 消费者标签
+        """
+        self._channel.basic.cancel(consumer_tag)
+
+    def get(self, queue, limit, no_ack=False):
+        """
+        获取rabbitmq消息队列中的信道数据
+
+        @param str queue: 队列名称
+        @param int limit: 获取消息数量
+        @param no_ack: 自动回复消息确认
+        """
+        message_lst = []
+
+        # 设置预取上限数量
+        self._channel.basic.qos(prefetch_count=limit)
+
+        def callback(message):
+            body = message.body
+            # 处理接收到的消息
+            impl = tools.loads_obj(body)
+            log.debug(f"【{queue}】Mq Received>>> {impl}")
+            delivery_tag = message.delivery_tag
+            if no_ack:
+                self._channel.basic.ack(delivery_tag)
+                delivery_tag = 0
+
+            message_lst.append(str(RabbitMQMessage(delivery_tag, impl)))
+
+        # 注册消费者并获取消费者标签
+        consumer_tag = self._channel.basic.consume(callback, queue=queue)
+        # 开始消费
+        self.start_consuming(limit)
+        # 停止消费并关闭消费者
+        self.stop_consuming(consumer_tag)
+
+        return message_lst
+
+    def get_message_count(self, queue):
+        message = self._channel.queue.declare(queue, passive=True)
+        return message.get('message_count')
+
+    def get_mq_obj(self):
+        return self._mq
+
+    def close(self):
+        if self._mq.is_open:
+            self._mq.close()
+
+    def __getattr__(self, name):
+        return getattr(self._mq, name)
+
+    def __repr__(self):
+        if self._url:
+            return "<RabbitMQ url:{}>".format(self._url)
+
+        return "<RabbitMQ ip_port: {} username:{} password:{}>".format(
+            self._ip_port, self._user, self._user_pass
+        )
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exception_type, exception_value, _):
+        if exception_type:
+            log.warning(
+                'Closing RabbitMQ to an unhandled exception: %s',
+                exception_value
+            )
+        if not self._mq.is_open:
+            return
+        self.close()