Jelajahi Sumber

爬虫模块优化:1、去重领取任务;2、新增应用程序关联标识符,可指定数据的消费对象

dongzhaorui 1 tahun lalu
induk
melakukan
cee8e108fb

+ 5 - 3
FworkSpider/feapder/buffer/item_buffer.py

@@ -396,10 +396,12 @@ class ItemBuffer(threading.Thread):
 
             if self.export_retry_times > setting.EXPORT_DATA_MAX_RETRY_TIMES:
                 if self._redis_key != "air_spider":
-                    failed_items[self._redis_key] = ''  # 添加item失败标识
+                    # 设置访问者的唯一标识
+                    properties = dict(correlation_id=self._redis_key)
                     # 记录失败的item
-                    self._rabbitmq.add_batch(self._tab_failed_items, failed_items)
-
+                    self._rabbitmq.add_batch(self._tab_failed_items,
+                                             failed_items,
+                                             properties=properties)
                     # 删除做过的request
                     if requests:
                         # self.redis_db.zrem(self._table_request, requests)

+ 3 - 2
FworkSpider/feapder/buffer/request_buffer.py

@@ -73,12 +73,13 @@ class RequestBuffer(threading.Thread):
     def put_failed_request(self, request, table=None):
         try:
             request_dict = request.to_dict
-            request_dict[self._redis_key] = ''  # 添加任务标识
             if table is not None:
                 self._rabbitmq.declare_bind(queue=table)  # 声明额外的队列
 
             queue = table or self._tab_failed_requests
-            self._rabbitmq.add(request_dict, queue=queue)
+            # 设置访问者的唯一标识
+            properties = dict(correlation_id=self._redis_key)
+            self._rabbitmq.add(request_dict, queue=queue, properties=properties)
         except Exception as e:
             log.exception(e)
 

+ 2 - 6
FworkSpider/feapder/core/handle_failed_items.py

@@ -35,11 +35,8 @@ class HandleFailedItems:
         self._rabbitmq.declare_bind(queue=self._tab_failed_items)
 
     def get_failed_items(self, count=10000):
-        failed_items = self._rabbitmq.find(
-            queue=self._tab_failed_items,
-            limit=count,
-            feature=self._redis_key
-        )
+        failed_items = self._rabbitmq.get(self._tab_failed_items, count,
+                                          correlation_id=self._redis_key)
         failed_items = [eval(message) for message in failed_items]
         return failed_items
 
@@ -55,7 +52,6 @@ class HandleFailedItems:
                 for message in failed_items:
                     delivery_tag = message.delivery_tag
                     data = message.body
-                    del data[self._redis_key]  # 删除item数据标识
                     for add in data.get("add"):
                         table = add.get("table")
                         datas = add.get("datas")

+ 3 - 6
FworkSpider/feapder/core/handle_failed_requests.py

@@ -33,11 +33,8 @@ class HandleFailedRequests(object):
         self._rabbitmq.declare_bind(queue=self._tab_failed_requests)
 
     def get_failed_messages(self, count=10000):
-        failed_messages = self._rabbitmq.find(
-            queue=self._tab_failed_requests,
-            limit=count,
-            feature=self._redis_key
-        )
+        failed_messages = self._rabbitmq.get(self._tab_failed_requests, count,
+                                             correlation_id=self._redis_key)
         failed_messages = [eval(message) for message in failed_messages]
         return failed_messages
 
@@ -53,7 +50,7 @@ class HandleFailedRequests(object):
                 for message in failed_messages:
                     delivery_tag = message.delivery_tag
                     request = message.body
-                    del request[self._redis_key]  # 删除任务标识
+                    # del request[self._redis_key]  # 删除任务标识
                     request["retry_times"] = 0
                     request_obj = Request.from_dict(request)
                     self._request_buffer.put_request(request_obj)

+ 66 - 70
FworkSpider/feapder/db/rabbitMq.py

@@ -206,20 +206,20 @@ class RabbitMQ:
     def _channel(self, channel: AmqpStormChannel):
         self.__channel = channel
 
-    def add_batch(self, queue, datas, exchange='', routing_key=''):
+    def add_batch(self, queue, datas, exchange='', routing_key='', **kwargs):
         """
         批量发布消息
 
         @param str queue: 队列名称
         @param datas: 消息内容
-        @param str exchange: 交换机名称
-        @param str routing_key: 路由键
+        @param exchange: 交换机名称
+        @param routing_key: 路由键
         """
         data_lst = datas if isinstance(datas, list) else [datas]
         for data in data_lst:
-            self.add(data, queue, exchange, routing_key)
+            self.add(data, queue, exchange, routing_key, **kwargs)
 
-    def add(self, data, queue='', exchange='', routing_key=''):
+    def add(self, data, queue='', exchange='', routing_key='', properties=None):
         """
         发布消息
 
@@ -227,41 +227,46 @@ class RabbitMQ:
         @param data: 消息内容
         @param exchange: 交换机名称
         @param routing_key: 路由键
+        @param properties: 消息属性
         """
         if not routing_key and not queue:
-            raise AttributeError('请设置 routing_key or queue')
+            raise AttributeError("请设置 routing_key or queue")
 
         # 不指定交换机发送消息,routing_key 表示消息队列名称
         # 指定交换机发送消息,routing_key 表示路由键
         if not exchange:
             routing_key = queue
 
-        message_id = tools.get_uuid().replace('-', '')
+        message_id = tools.get_uuid().replace("-", "")
         if isinstance(data, dict):
             message_id = data.get("pyuuid") or message_id
 
         # RabbitMQ 的 delivery_mode 属性用于设置消息的持久性。它有两种取值:
         #   delivery_mode=1:表示消息被标记为持久化,但是仍然可能在服务器重启之前丢失。
         #   delivery_mode=2:表示消息被标记为持久化,并且会存储在磁盘上,确保消息不会丢失
-        properties = dict(delivery_mode=2, message_id=message_id)
-        body = tools.dumps_obj(data)  # 对象序列化
+        properties = properties or {}
+        properties = dict(delivery_mode=2, **properties)
+        if "message_id" not in properties:
+            properties["message_id"] = message_id
+
         self._channel.basic.publish(
-            body=body,
+            body=tools.dumps_obj(data),  # 对象序列化
             routing_key=routing_key,
             exchange=exchange,
-            properties=properties
+            properties=properties  # specification.Basic.Properties
         )
 
-    def add_dlx(self, exchange, routing_key, data):
+    def add_dlx(self, exchange, routing_key, data, properties=None):
         """
         发布延时消息
 
-        @param data: 消息内容
         @param exchange: 交换机名称
         @param routing_key: 路由键
+        @param data: 消息内容
+        @param properties: 消息属性
         """
         queue = routing_key
-        self.add(data, queue, exchange, routing_key)
+        self.add(data, queue, exchange, routing_key, properties)
 
     def ack(self, delivery_tag=0, multiple=False):
         """
@@ -272,8 +277,7 @@ class RabbitMQ:
         """
         self._channel.basic.ack(delivery_tag=delivery_tag, multiple=multiple)
 
-    def declare_exchange(self, exchange, exchange_type=None, auto_delete=False,
-                         arguments=None):
+    def declare_exchange(self, exchange, exchange_type=None, auto_delete=False, arguments=None):
         """声明交换机"""
         shares = dict(
             exchange_type=exchange_type or self._exchange_type,
@@ -335,27 +339,56 @@ class RabbitMQ:
         self.__cache[binding_key] = True
         return result
 
-    def start_consuming(self, limit=None, to_tuple=False, auto_decode=True):
+    def start_consuming(
+        self,
+        limit=None,
+        to_tuple=False,
+        auto_decode=True,
+        correlation_id='',
+        use_filter=False
+    ):
         """
         @param int limit: 消费数据上限
-        @param to_tuple:
-        @param auto_decode:
+        @param to_tuple: 消息结果返回元组形式
+        @param auto_decode: 自动解码
+        @param correlation_id: 应用程序关联标识符
+        @param use_filter: 使用缓存过滤重复数据
         """
         if not self._channel._consumer_callbacks:
             raise AMQPChannelError('no consumer callback defined')
 
-        count = 0
+        cache = []  # 消息队列缓存,去除重复消息
+        recv_count = 0  # 接收消息计数
         params = dict(break_on_empty=True, auto_decode=auto_decode)
         for message in self._channel.build_inbound_messages(**params):
             consumer_tag = message._method.get('consumer_tag')
+            if use_filter and message.message_id in cache:
+                continue
+
+            cache.append(message.message_id)  # 缓存消息id
+
+            # 指定应用程序标识,只会在特定的程序中被消费
+            if correlation_id:
+                if correlation_id == message.correlation_id:
+                    self._channel._consumer_callbacks[consumer_tag](message)
+                    recv_count += 1
+
+                if limit is not None and recv_count == limit:
+                    break
+                else:
+                    continue
+
+            recv_count += 1
+
             if to_tuple:
-                # noinspection PyCallingNonCallable
                 self._channel._consumer_callbacks[consumer_tag](*message.to_tuple())
-                continue
-            # noinspection PyCallingNonCallable
+                if limit is not None and recv_count == limit:
+                    break
+                else:
+                    continue
+
             self._channel._consumer_callbacks[consumer_tag](message)
-            count += 1
-            if limit is not None and count == limit:
+            if limit is not None and recv_count == limit:
                 break
 
     def stop_consuming(self, consumer_tag=None):
@@ -364,21 +397,25 @@ class RabbitMQ:
         """
         self._channel.basic.cancel(consumer_tag)
 
-    def get(self, queue, limit, no_ack=False, to_str=True):
+    def get(self, queue, limit, auto_ack=False, to_str=True, **kwargs):
         """
         获取rabbitmq消息队列中的信道数据
 
         @param str queue: 队列名称
         @param int limit: 获取消息数量
-        @param no_ack: 自动回复消息确认
+        @param auto_ack: 自动回复消息确认
+        @param to_str: 消息是否转成字符串
         @param to_str: 消息是否转成字符串
         """
         message_lst = []
 
+        if "use_filter" not in kwargs:
+            kwargs["use_filter"] = True
+
         def callback(message):
             body = tools.loads_obj(message.body)  # 反序列化消息对象
             delivery_tag = message.delivery_tag
-            if no_ack:
+            if auto_ack:
                 self._channel.basic.ack(delivery_tag)
                 delivery_tag = 0
 
@@ -393,48 +430,7 @@ class RabbitMQ:
             # 注册消费者并获取消费者标签
             consumer_tag = self._channel.basic.consume(callback, queue=queue)
             # 开始消费
-            self.start_consuming(limit)
-            # 停止消费并关闭消费者
-            self.stop_consuming(consumer_tag)
-        except (AMQPChannelError, AMQPConnectionError) as why:
-            log.error(f"{why}")
-
-        return message_lst
-
-    def find(self, queue, limit, feature, no_ack=False, to_str=True):
-        """
-        通过检索关键词从rabbitmq消息队列中获取指定数据
-
-        @param str queue: 队列名称
-        @param int limit: 获取消息数量
-        @param str feature: 数据特征
-        @param no_ack: 自动回复消息确认
-        @param to_str: 消息是否转成字符串
-        """
-        message_lst = []
-        # 设置预取上限数量
-        self._channel.basic.qos(prefetch_count=limit)
-
-        def callback(message):
-            body = message.body
-            # 处理接收到的消息
-            impl = tools.loads_obj(body)
-            delivery_tag = message.delivery_tag
-            if feature in impl:
-                if no_ack:
-                    self._channel.basic.ack(delivery_tag)
-                    delivery_tag = 0
-
-                if to_str:
-                    message_lst.append(str(RabbitMQMessage(delivery_tag, impl)))
-                else:
-                    message_lst.append(RabbitMQMessage(delivery_tag, impl))
-
-        try:
-            # 注册消费者并获取消费者标签
-            consumer_tag = self._channel.basic.consume(callback, queue=queue)
-            # 开始消费
-            self.start_consuming(limit)
+            self.start_consuming(limit, **kwargs)
             # 停止消费并关闭消费者
             self.stop_consuming(consumer_tag)
         except (AMQPChannelError, AMQPConnectionError) as why: