浏览代码

更新rabbitmq声明绑定方法

dongzhaorui 1 年之前
父节点
当前提交
cf1debb066

+ 8 - 13
FworkSpider/feapder/buffer/request_buffer.py

@@ -37,10 +37,10 @@ class RequestBuffer(threading.Thread):
             self._rabbitmq = rabbitmq or RabbitMQ()
             # 任务队列
             self._tab_requests = setting.TAB_REQUESTS.format(redis_key=redis_key)
-            self._rabbitmq.declare(queue=self._tab_requests)
+            self._rabbitmq.declare_bind(queue=self._tab_requests)
             # 失败任务队列
             self._tab_failed_requests = setting.TAB_FAILED_REQUESTS
-            self._rabbitmq.declare(queue=self._tab_failed_requests)
+            self._rabbitmq.declare_bind(queue=self._tab_failed_requests)
 
             if not self.__class__.dedup and setting.REQUEST_FILTER_ENABLE:
                 self.__class__.dedup = Dedup(
@@ -73,17 +73,12 @@ class RequestBuffer(threading.Thread):
     def put_failed_request(self, request, table=None):
         try:
             request_dict = request.to_dict
+            request_dict[self._redis_key] = ''  # 添加任务标识
             if table is not None:
-                # 声明额外的队列
-                self._rabbitmq.declare(queue=table)
+                self._rabbitmq.declare_bind(queue=table)  # 声明额外的队列
 
-            # 添加任务标识
-            request_dict[self._redis_key] = ''
-
-            self._rabbitmq.add(
-                table or self._tab_failed_requests,
-                request_dict
-            )
+            queue = table or self._tab_failed_requests
+            self._rabbitmq.add(request_dict, queue=queue)
         except Exception as e:
             log.exception(e)
 
@@ -133,13 +128,13 @@ class RequestBuffer(threading.Thread):
 
             # 入库(超过上限[MAX_URL_COUNT]执行)
             if len(request_list) > MAX_URL_COUNT:
-                self._rabbitmq.add(self._tab_requests, request_list)
+                self._rabbitmq.add_batch(self._tab_requests, request_list)
                 request_list = []
                 prioritys = []
 
         # 入库(小于上限[MAX_URL_COUNT]执行)
         if request_list:
-            self._rabbitmq.add(self._tab_requests, request_list)
+            self._rabbitmq.add_batch(self._tab_requests, request_list)
 
         # 执行回调
         for callback in callbacks:

+ 1 - 1
FworkSpider/feapder/core/collector.py

@@ -39,7 +39,7 @@ class Collector(threading.Thread):
         self._todo_requests = Queue(maxsize=setting.COLLECTOR_TASK_COUNT)
         # 任务队列
         self._tab_requests = setting.TAB_REQUESTS.format(redis_key=redis_key)
-        self._rabbitmq.declare(queue=self._tab_requests)
+        self._rabbitmq.declare_bind(queue=self._tab_requests)
 
         self._interval = setting.COLLECTOR_SLEEP_TIME
         self._request_count = setting.COLLECTOR_TASK_COUNT

+ 1 - 1
FworkSpider/feapder/core/handle_failed_items.py

@@ -32,7 +32,7 @@ class HandleFailedItems:
 
         # 数据保存失败队列
         self._tab_failed_items = setting.TAB_FAILED_ITEMS
-        self._rabbitmq.declare(queue=self._tab_failed_items)
+        self._rabbitmq.declare_bind(queue=self._tab_failed_items)
 
     def get_failed_items(self, count=10000):
         failed_items = self._rabbitmq.find(

+ 1 - 1
FworkSpider/feapder/core/handle_failed_requests.py

@@ -30,7 +30,7 @@ class HandleFailedRequests(object):
 
         # 失败任务队列
         self._tab_failed_requests = setting.TAB_FAILED_REQUESTS
-        self._rabbitmq.declare(queue=self._tab_failed_requests)
+        self._rabbitmq.declare_bind(queue=self._tab_failed_requests)
 
     def get_failed_messages(self, count=10000):
         failed_messages = self._rabbitmq.find(

+ 61 - 31
FworkSpider/feapder/db/rabbitMq.py

@@ -170,8 +170,8 @@ class RabbitMQ:
                     # 清除缓存
                     self.__cache.pop(queue, None)
                     self.__cache.pop(binding_key, None)
-                    # 重新声明
-                    self.declare(queue, exchange, routing_key)
+                    # 重新声明绑定
+                    self.declare_bind(queue, exchange, routing_key)
         except Exception as e:
             raise
 
@@ -206,27 +206,58 @@ class RabbitMQ:
     def _channel(self, channel: AmqpStormChannel):
         self.__channel = channel
 
-    def add(self, queue, data, exchange=None):
+    def add_batch(self, queue, datas, exchange='', routing_key=''):
         """
-        推送数据到rabbitmq消息队列
+        批量发布消息
 
         @param str queue: 队列名称
-        @param data: 推送数据
+        @param datas: 消息内容
         @param str exchange: 交换机名称
+        @param str routing_key: 路由键
         """
-        data_lst = data if isinstance(data, list) else [data]
-        for item in data_lst:
-            data_bytes = tools.dumps_obj(item)  # 对象序列化
-            # RabbitMQ 的 delivery_mode 属性用于设置消息的持久性。它有两种取值:
-            #   delivery_mode=1:表示消息被标记为持久化,但是仍然可能在服务器重启之前丢失。
-            #   delivery_mode=2:表示消息被标记为持久化,并且会存储在磁盘上,确保消息不会丢失
-            properties = dict(delivery_mode=2)
-            self._channel.basic.publish(
-                exchange=exchange or self._exchange,
-                routing_key=queue,
-                body=data_bytes,
-                properties=properties
-            )
+        data_lst = datas if isinstance(datas, list) else [datas]
+        for data in data_lst:
+            self.add(data, queue, exchange, routing_key)
+
+    def add(self, data, queue='', exchange='', routing_key=''):
+        """
+        发布消息
+
+        @param str queue: 队列名称
+        @param data: 消息内容
+        @param exchange: 交换机名称
+        @param routing_key: 路由键
+        """
+        if not routing_key and not queue:
+            raise AttributeError('请设置 routing_key or queue')
+
+        # 不指定交换机发送消息,routing_key 表示消息队列名称
+        # 指定交换机发送消息,routing_key 表示路由键
+        if not exchange:
+            routing_key = queue
+
+        # RabbitMQ 的 delivery_mode 属性用于设置消息的持久性。它有两种取值:
+        #   delivery_mode=1:表示消息为非持久化(transient),只保存在内存中,服务器重启后会丢失。
+        #   delivery_mode=2:表示消息为持久化,会写入磁盘;需配合持久化队列(durable)使用,服务器重启后消息才能恢复
+        properties = dict(delivery_mode=2)
+        body = tools.dumps_obj(data)  # 对象序列化
+        self._channel.basic.publish(
+            body=body,
+            routing_key=routing_key,
+            exchange=exchange,
+            properties=properties
+        )
+
+    def add_dlx(self, exchange, routing_key, data):
+        """
+        发布延时消息
+
+        @param data: 消息内容
+        @param exchange: 交换机名称
+        @param routing_key: 路由键
+        """
+        queue = routing_key
+        self.add(data, queue, exchange, routing_key)
 
     def ack(self, delivery_tag=0, multiple=False):
         """
@@ -279,24 +310,24 @@ class RabbitMQ:
             else:
                 raise why
 
-    def declare(self, queue, exchange=None, routing_key=None):
+    def declare_bind(self, queue='', exchange='', routing_key=''):
         """
-        声明队列、交换机并将队列绑定到交换机
+        声明队列和交换机,同时将队列绑定交换机
 
         @param queue: 队列名称
         @param exchange: 交换机名称
         @param routing_key: 路由键
         """
-        routing_key = routing_key or queue
-        binding_key = (queue, exchange or self._exchange, routing_key)
+        exchange = exchange or self._exchange
+        binding_key = (queue, exchange, routing_key)
         if queue in self.__cache and binding_key in self.__cache:
             return self.__cache[queue]
 
-        self.declare_exchange(exchange or self._exchange)
+        self.declare_exchange(exchange)
         result = self.declare_queue(queue)
         self.__cache[queue] = result
         # 队列绑定一个交换机
-        self._channel.queue.bind(queue, exchange or self._exchange, routing_key)
+        self._channel.queue.bind(queue, exchange, routing_key)
         self.__cache[binding_key] = True
         return result
 
@@ -339,24 +370,22 @@ class RabbitMQ:
         @param to_str: 消息是否转成字符串
         """
         message_lst = []
-        # 设置预取上限数量
-        self._channel.basic.qos(prefetch_count=limit)
 
         def callback(message):
-            body = message.body
-            # 处理接收到的消息
-            impl = tools.loads_obj(body)
+            body = tools.loads_obj(message.body)  # 反序列化消息对象
             delivery_tag = message.delivery_tag
             if no_ack:
                 self._channel.basic.ack(delivery_tag)
                 delivery_tag = 0
 
             if to_str:
-                message_lst.append(str(RabbitMQMessage(delivery_tag, impl)))
+                message_lst.append(str(RabbitMQMessage(delivery_tag, body)))
             else:
-                message_lst.append(RabbitMQMessage(delivery_tag, impl))
+                message_lst.append(RabbitMQMessage(delivery_tag, body))
 
         try:
+            # 设置预取上限数量
+            self._channel.basic.qos(prefetch_count=limit)
             # 注册消费者并获取消费者标签
             consumer_tag = self._channel.basic.consume(callback, queue=queue)
             # 开始消费
@@ -415,6 +444,7 @@ class RabbitMQ:
 
     def get_message_count(self, queue):
         message = self._channel.queue.declare(queue, passive=True)
+        # message_count 只统计队列中处于就绪(ready)状态的消息:不包含已投递给消费者但尚未 ack 的消息,也不包含事务信道上尚未提交的消息
         return message.get('message_count')
 
     def get_mq_obj(self):

+ 1 - 2
FworkSpider/feapder/pipelines/rabbitmq_pipeline.py

@@ -34,8 +34,7 @@ class RabbitMqPipeline(BasePipeline):
                  若False,不会将本批数据入到去重库,以便再次入库
         """
         try:
-            self.to_db.declare(queue=table)
-            self.to_db.add(table, items)
+            self.to_db.add_batch(table, items)
             datas_size = len(items)
             log.info("共导出 %s 条数据到 %s" % (datas_size, table))
             return True