Pārlūkot izejas kodu

新增检索函数

dongzhaorui 1 gadu atpakaļ
vecāks
revīzija
ea26a6df0d
1 mainītis faili ar 42 papildinājumiem un 0 dzēšanām
  1. 42 0
      FworkSpider/feapder/db/rabbitMq.py

+ 42 - 0
FworkSpider/feapder/db/rabbitMq.py

@@ -370,6 +370,48 @@ class RabbitMQ:
 
         return message_lst
 
+    def find(self, queue, limit, feature, no_ack=False, to_str=True):
+        """
+        通过检索关键词从rabbitmq消息队列中获取指定数据
+
+        @param str queue: 队列名称
+        @param int limit: 获取消息数量
+        @param str feature: 数据特征
+        @param no_ack: 自动回复消息确认
+        @param to_str: 消息是否转成字符串
+        """
+        message_lst = []
+        # 设置预取上限数量
+        self._channel.basic.qos(prefetch_count=limit)
+
+        def callback(message):
+            body = message.body
+            # 处理接收到的消息
+            impl = tools.loads_obj(body)
+            log.debug(f"【{queue}】Mq Received>>> {impl}")
+            delivery_tag = message.delivery_tag
+            if feature in impl:
+                if no_ack:
+                    self._channel.basic.ack(delivery_tag)
+                    delivery_tag = 0
+
+                if to_str:
+                    message_lst.append(str(RabbitMQMessage(delivery_tag, impl)))
+                else:
+                    message_lst.append(RabbitMQMessage(delivery_tag, impl))
+
+        try:
+            # 注册消费者并获取消费者标签
+            consumer_tag = self._channel.basic.consume(callback, queue=queue)
+            # 开始消费
+            self.start_consuming(limit)
+            # 停止消费并关闭消费者
+            self.stop_consuming(consumer_tag)
+        except (AMQPChannelError, AMQPConnectionError) as why:
+            log.error(f"{why}")
+
+        return message_lst
+
     @property
     def is_open(self):
         return self._mq.is_open