Ver código fonte

优化任务生产与消费策略

dongzhaorui 1 ano atrás
pai
commit
0750d4f6e6

+ 6 - 7
FworkSpider/feapder/buffer/item_buffer.py

@@ -33,13 +33,14 @@ UPLOAD_BATCH_MAX_SIZE = 1000
 class ItemBuffer(threading.Thread):
     dedup = None
 
-    def __init__(self, redis_key, rabbitmq=None):
+    def __init__(self, redis_key, rabbitmq=None, user=None):
         if not hasattr(self, "_items_queue"):
             super(ItemBuffer, self).__init__()
 
             self._thread_stop = False
             self._is_adding_to_db = False
             self._redis_key = redis_key
+            self._user = user
 
             self._items_queue = Queue(maxsize=MAX_ITEM_COUNT)
 
@@ -401,15 +402,13 @@ class ItemBuffer(threading.Thread):
                     self.__class__.dedup.add(items_fingerprints, skip_check=True)
         else:
             failed_items["requests"] = requests
+            # 设置mq访问者的唯一标识特性 correlation_id
+            properties = dict(correlation_id=self._user or self._redis_key)
 
             if self.export_retry_times > setting.EXPORT_DATA_MAX_RETRY_TIMES:
                 if self._redis_key != "air_spider":
-                    # 设置访问者的唯一标识
-                    properties = dict(correlation_id=self._redis_key)
                     # 记录失败的item
-                    self._rabbitmq.add_batch(self._tab_failed_items,
-                                             failed_items,
-                                             properties=properties)
+                    self._rabbitmq.add_batch(self._tab_failed_items, failed_items, properties=properties)
                     # 删除做过的request
                     if requests:
                         # self.redis_db.zrem(self._table_request, requests)
@@ -428,7 +427,7 @@ class ItemBuffer(threading.Thread):
                     tip.append("不执行回调")
                 if requests:
                     tip.append("不删除任务")
-                    self._rabbitmq.add_batch(self._tab_requests, requests)
+                    self._rabbitmq.add_batch(self._tab_requests, requests, properties=properties)
 
                 if setting.ITEM_FILTER_ENABLE:
                     tip.append("数据不入去重库")

+ 7 - 4
FworkSpider/feapder/buffer/request_buffer.py

@@ -23,13 +23,14 @@ MAX_URL_COUNT = 1000  # 缓存中最大request数
 class RequestBuffer(threading.Thread):
     dedup = None
 
-    def __init__(self, redis_key, rabbitmq=None):
+    def __init__(self, redis_key, rabbitmq=None, user=None):
         if not hasattr(self, "_requests_deque"):
             super(RequestBuffer, self).__init__()
 
             self._thread_stop = False
             self._is_adding_to_db = False
             self._redis_key = redis_key
+            self._user = user
 
             self._requests_deque = collections.deque()
             self._del_requests_deque = collections.deque()
@@ -78,7 +79,7 @@ class RequestBuffer(threading.Thread):
 
             queue = table or self._tab_failed_requests
             # 设置访问者的唯一标识
-            properties = dict(correlation_id=self._redis_key)
+            properties = dict(correlation_id=self._user or self._redis_key)
             self._rabbitmq.add(request_dict, queue=queue, properties=properties)
         except Exception as e:
             log.exception(e)
@@ -96,6 +97,8 @@ class RequestBuffer(threading.Thread):
         return self._is_adding_to_db
 
     def __add_request_to_db(self):
+        kw = {"properties": dict(correlation_id=self._user) if self._user else None}
+
         request_list = []
         prioritys = []
         callbacks = []
@@ -129,13 +132,13 @@ class RequestBuffer(threading.Thread):
 
             # 入库(超过上限[MAX_URL_COUNT]执行)
             if len(request_list) > MAX_URL_COUNT:
-                self._rabbitmq.add_batch(self._tab_requests, request_list)
+                self._rabbitmq.add_batch(self._tab_requests, request_list, **kw)
                 request_list = []
                 prioritys = []
 
         # 入库(小于上限[MAX_URL_COUNT]执行)
         if request_list:
-            self._rabbitmq.add_batch(self._tab_requests, request_list)
+            self._rabbitmq.add_batch(self._tab_requests, request_list, **kw)
 
         # 执行回调
         for callback in callbacks:

+ 9 - 5
FworkSpider/feapder/core/collector.py

@@ -22,7 +22,7 @@ tools.load_globals(RabbitMQMessage)
 
 class Collector(threading.Thread):
 
-    def __init__(self, redis_key, rabbitmq=None):
+    def __init__(self, redis_key, rabbitmq=None, user=None):
         """
         @summary:
         ---------
@@ -30,13 +30,15 @@ class Collector(threading.Thread):
         ---------
         @result:
         """
-
         super(Collector, self).__init__()
 
         self._thread_stop = False
+        self._user = user
 
-        self._rabbitmq = rabbitmq or RabbitMQ()
         self._todo_requests = Queue(maxsize=setting.COLLECTOR_TASK_COUNT)
+
+        self._rabbitmq = rabbitmq or RabbitMQ()
+
         # 任务队列
         self._tab_requests = setting.TAB_REQUESTS.format(redis_key=redis_key)
         self._rabbitmq.declare_bind(queue=self._tab_requests)
@@ -62,7 +64,8 @@ class Collector(threading.Thread):
         self._started.clear()
 
     def __get_messages(self, request_count):
-        messages = self._rabbitmq.get(self._tab_requests, request_count)
+        kwargs = dict(correlation_id=self._user)
+        messages = self._rabbitmq.get(self._tab_requests, request_count, **kwargs)
         messages = [eval(message) for message in messages]
         return messages
 
@@ -113,9 +116,10 @@ class Collector(threading.Thread):
             return None
 
     def get_requests_count(self):
+        arguments = dict(queue=self._tab_requests, user=self._user)
         return (
             self._todo_requests.qsize()
-            or self._rabbitmq.get_message_count(queue=self._tab_requests)
+            or self._rabbitmq.get_message_count(**arguments)
             or 0
         )
 

+ 5 - 4
FworkSpider/feapder/core/handle_failed_items.py

@@ -22,21 +22,22 @@ tools.load_globals(RabbitMQMessage, ObjectId)
 
 
 class HandleFailedItems:
-    def __init__(self, redis_key, rabbitmq=None, item_buffer=None):
+    def __init__(self, redis_key, rabbitmq=None, item_buffer=None, user=None):
         if redis_key.endswith(":s_failed_items"):
             redis_key = redis_key.replace(":s_failed_items", "")
 
         self._redis_key = redis_key
+        self._user = user
         self._rabbitmq = rabbitmq or RabbitMQ()
-        self._item_buffer = item_buffer or ItemBuffer(redis_key)
+        self._item_buffer = item_buffer or ItemBuffer(redis_key, user=user)
 
         # 数据保存失败队列
         self._tab_failed_items = setting.TAB_FAILED_ITEMS
         self._rabbitmq.declare_bind(queue=self._tab_failed_items)
 
     def get_failed_items(self, count=10000):
-        failed_items = self._rabbitmq.get(self._tab_failed_items, count,
-                                          correlation_id=self._redis_key)
+        kwargs = dict(correlation_id=self._user or self._redis_key)
+        failed_items = self._rabbitmq.get(self._tab_failed_items, count, **kwargs)
         failed_items = [eval(message) for message in failed_items]
         return failed_items
 

+ 5 - 4
FworkSpider/feapder/core/handle_failed_requests.py

@@ -21,9 +21,11 @@ tools.load_globals(RabbitMQMessage)
 class HandleFailedRequests(object):
     """docstring for HandleFailedRequests"""
 
-    def __init__(self, redis_key, rabbitmq=None):
+    def __init__(self, redis_key, rabbitmq=None, user=None):
         super(HandleFailedRequests, self).__init__()
+
         self._redis_key = redis_key
+        self._user = user
 
         self._rabbitmq = rabbitmq or RabbitMQ()
         self._request_buffer = RequestBuffer(self._redis_key, rabbitmq)
@@ -33,8 +35,8 @@ class HandleFailedRequests(object):
         self._rabbitmq.declare_bind(queue=self._tab_failed_requests)
 
     def get_failed_messages(self, count=10000):
-        failed_messages = self._rabbitmq.get(self._tab_failed_requests, count,
-                                             correlation_id=self._redis_key)
+        kwargs = dict(correlation_id=self._user or self._redis_key)
+        failed_messages = self._rabbitmq.get(self._tab_failed_requests, count, **kwargs)
         failed_messages = [eval(message) for message in failed_messages]
         return failed_messages
 
@@ -50,7 +52,6 @@ class HandleFailedRequests(object):
                 for message in failed_messages:
                     delivery_tag = message.delivery_tag
                     request = message.body
-                    # del request[self._redis_key]  # 删除任务标识
                     request["retry_times"] = 0
                     request_obj = Request.from_dict(request)
                     self._request_buffer.put_request(request_obj)

+ 9 - 5
FworkSpider/feapder/core/scheduler.py

@@ -34,6 +34,7 @@ class Scheduler(threading.Thread):
     def __init__(
         self,
         redis_key=None,
+        user=None,
         thread_count=None,
         begin_callback=None,
         end_callback=None,
@@ -45,6 +46,7 @@ class Scheduler(threading.Thread):
         @summary: 调度器
         ---------
         @param redis_key: 爬虫request及item存放redis中的文件夹
+        @param user: 指定mq特定的程序消费用户标识
         @param thread_count: 线程数,默认为配置文件中的线程数
         @param begin_callback: 爬虫开始回调函数
         @param end_callback: 爬虫结束回调函数
@@ -73,10 +75,9 @@ class Scheduler(threading.Thread):
             )
 
         self._rabbitmq = RabbitMQ()
-
-        self._request_buffer = RequestBuffer(redis_key)
-        self._item_buffer = ItemBuffer(redis_key)
-        self._collector = Collector(redis_key)
+        self._request_buffer = RequestBuffer(redis_key, user=user)
+        self._item_buffer = ItemBuffer(redis_key, user=user)
+        self._collector = Collector(redis_key, user=user)
         self._heartbeat_buffer = HeartBeatBuffer(redis_key)
 
         self._parsers = []
@@ -122,6 +123,7 @@ class Scheduler(threading.Thread):
         Request.cached_expire_time = setting.RESPONSE_CACHED_EXPIRE_TIME
 
         self._last_check_task_status_time = 0
+        self._user = user
 
         self.init_metrics()
 
@@ -147,6 +149,7 @@ class Scheduler(threading.Thread):
                 redis_key=self._redis_key,
                 item_buffer=self._item_buffer,
                 rabbitmq=self._rabbitmq,
+                user=self._user
             )
             handle_failed_items.reput_failed_items_to_db()
 
@@ -181,7 +184,8 @@ class Scheduler(threading.Thread):
             # 重设失败的任务
             handle_failed_requests = HandleFailedRequests(
                 redis_key=self._redis_key,
-                rabbitmq=self._rabbitmq
+                rabbitmq=self._rabbitmq,
+                user=self._user
             )
             handle_failed_requests.reput_failed_requests_to_requests()
 

+ 4 - 1
FworkSpider/feapder/core/spiders/spider.py

@@ -33,6 +33,7 @@ class Spider(
     def __init__(
         self,
         redis_key=None,
+        user=None,
         check_task_interval=5,
         thread_count=None,
         begin_callback=None,
@@ -45,6 +46,7 @@ class Spider(
         @summary: 爬虫
         ---------
         @param redis_key: 任务等数据存放在redis中的key前缀
+        @param user: 指定mq特定的程序消费用户标识,在多个生产者对应单一消费者时生效
         @param check_task_interval: 检查是否还有任务的时间间隔;默认5秒
         @param thread_count: 线程数,默认为配置文件中的线程数
         @param begin_callback: 爬虫开始回调函数
@@ -56,6 +58,7 @@ class Spider(
         """
         super(Spider, self).__init__(
             redis_key=redis_key,
+            user=user,
             thread_count=thread_count,
             begin_callback=begin_callback,
             end_callback=end_callback,
@@ -68,7 +71,7 @@ class Spider(
         self._is_distributed_task = False
         self._is_show_not_task = False
 
-    def run(self):  # 调度控制流程
+    def run(self):  # 调度控制流程起始
         if not self._parsers:
             self._parsers.append(self)
 

+ 21 - 2
FworkSpider/feapder/db/rabbitMq.py

@@ -434,7 +434,7 @@ class RabbitMQ:
             # 停止消费并关闭消费者
             self.stop_consuming(consumer_tag)
         except (AMQPChannelError, AMQPConnectionError) as why:
-            log.error(f"{why}")
+            log.exception(why)
 
         return message_lst
 
@@ -442,9 +442,28 @@ class RabbitMQ:
     def is_open(self):
         return self._mq.is_open
 
-    def get_message_count(self, queue):
+    def __get_message_count(self, correlation_id, **kwargs):
+        recv_count = 0  # 接收消息计数
+
+        # 注册消费者并获取消费者标签
+        consumer_tag = self._channel.basic.consume(queue=kwargs["queue"])
+        for message in self._channel.build_inbound_messages(break_on_empty=True):
+            # 指定应用程序标识,只会在特定的程序中被消费
+            if correlation_id == message.correlation_id:
+                recv_count += 1
+        self.stop_consuming(consumer_tag)
+
+        # 重置消息数量
+        result = kwargs.copy()
+        result["message_count"] = recv_count
+        return result
+
+    def get_message_count(self, queue, user=None):
         try:
             message = self._channel.queue.declare(queue, passive=True)
+            if user is not None:
+                message = self.__get_message_count(user, **message)
+
             # message_count 消息统计是消息发布确认之后的数量,未确认消息无法统计
             return message.get("message_count")
         except amqpstorm.exception.AMQPChannelError: