浏览代码

添加item状态更新维护

dongzhaorui 1 年之前
父节点
当前提交
20ea787b1e
共有 1 个文件被更改,包括 92 次插入,5 次删除
  1. +92 -5
      FworkSpider/feapder/buffer/item_buffer.py

+ 92 - 5
FworkSpider/feapder/buffer/item_buffer.py

@@ -2,7 +2,7 @@
 """
 Created on 2018-06-19 17:17
 ---------
-@summary: item 管理器 负责缓冲添加到数据库中的item, 由该manager统一添加。防止多线程同时访问数据库
+@summary: item 管理器 负责缓冲添加到数据库中的item, 由该manager统一添加。防止多线程同时访问数据库
 ---------
 @author: Boris
 @email: boris_liu@foxmail.com
@@ -15,7 +15,13 @@ import feapder.setting as setting
 import feapder.utils.tools as tools
 from feapder.db.rabbitMq import RabbitMQ
 from feapder.dedup import Dedup
-from feapder.network.item import Item, UpdateItem
+from feapder.network.item import (
+    Item,
+    UpdateItem,
+    BaseListItem,
+    BaseDetailItem,
+    FailedTaskItem,
+)
 from feapder.pipelines import BasePipeline
 from feapder.utils import metrics
 from feapder.utils.log import log
@@ -38,13 +44,21 @@ class ItemBuffer(threading.Thread):
             self._items_queue = Queue(maxsize=MAX_ITEM_COUNT)
 
             self._rabbitmq = rabbitmq or RabbitMQ()
+
             # 任务队列
             self._tab_requests = setting.TAB_REQUESTS.format(redis_key=redis_key)
             self._rabbitmq.declare(queue=self._tab_requests)
+
             # 数据保存失败队列
             self._tab_failed_items = setting.TAB_FAILED_ITEMS
             self._rabbitmq.declare(queue=self._tab_failed_items)
 
+            # 待采集任务队列
+            self._tab_items = setting.TAB_ITEMS.format(
+                redis_key=redis_key.replace('_detailc', '')
+            )
+            self._rabbitmq.declare(queue=self._tab_items)
+
             self._item_tables = {
                 # 'item_name': 'table_name' # 缓存item名与表名对应关系
             }
@@ -100,6 +114,7 @@ class ItemBuffer(threading.Thread):
         try:
             items = []
             update_items = []
+            failed_task_items = []
             requests = []
             callbacks = []
             items_fingerprints = []
@@ -108,6 +123,7 @@ class ItemBuffer(threading.Thread):
             while not self._items_queue.empty():
                 data = self._items_queue.get_nowait()
                 data_count += 1
+                update_at = tools.ensure_int64(tools.get_current_timestamp())
 
                 # data 分类
                 if callable(data):
@@ -116,7 +132,52 @@ class ItemBuffer(threading.Thread):
                 elif isinstance(data, UpdateItem):
                     update_items.append(data)
 
+                elif isinstance(data, FailedTaskItem):
+                    data.queue_name = self._tab_items  # 采集任务队列名称
+                    failed_times = data.to_dict.pop('failed_times', 0)
+                    if failed_times >= setting.SPIDER_FAILED_TASK_MAX_RETRY_TIMES:
+                        state = 4  # 待采集任务停止采集状态[4=停止采集]
+                        # 更新完成采集的任务状态
+                        update_item = UpdateItem(
+                            state=state,
+                            pyuuid=data.pyuuid,
+                            update_at=update_at,
+                        )
+                        update_item.update_key = ['state', 'update_at']
+                        update_item.table_name = setting.TASK_REQUEST_PRODUCE
+                        update_items.append(update_item)
+                        # 保存失败的采集任务详情
+                        data.state = state
+                        data.failed_times = failed_times
+                        data.create_at = update_at
+                        failed_task_items.append(data)
+                    else:
+                        update_item = UpdateItem(
+                            state=3,  # 待采集任务失败采集状态[3=采集失败]
+                            pyuuid=data.pyuuid,
+                            update_at=update_at,
+                            failed_times=failed_times + 1
+                        )
+                        update_item.update_key = ['state', 'update_at', 'failed_times']
+                        update_item.table_name = setting.TASK_REQUEST_PRODUCE
+                        update_items.append(update_item)
+
                 elif isinstance(data, Item):
+                    if isinstance(data, BaseListItem):
+                        data.state = 1  # 待采集任务等待采集状态[1=等待采集]
+                        data.queue_name = self._tab_items
+                        data.update_at = update_at
+
+                    elif isinstance(data, BaseDetailItem):
+                        update_item = UpdateItem(
+                            state=2,  # 待采集任务成功采集状态[2=完成采集]
+                            pyuuid=data.pyuuid,
+                            update_at=update_at,
+                        )
+                        update_item.update_key = ['state', 'update_at']
+                        update_item.table_name = setting.TASK_REQUEST_PRODUCE
+                        update_items.append(update_item)
+
                     items.append(data)
                     if setting.ITEM_FILTER_ENABLE:
                         items_fingerprints.append(data.fingerprint)
@@ -126,11 +187,12 @@ class ItemBuffer(threading.Thread):
 
                 if data_count >= UPLOAD_BATCH_MAX_SIZE:
                     self.__add_item_to_db(
-                        items, update_items, requests, callbacks, items_fingerprints
+                        items, update_items, failed_task_items, requests, callbacks, items_fingerprints
                     )
 
                     items = []
                     update_items = []
+                    failed_task_items = []
                     requests = []
                     callbacks = []
                     items_fingerprints = []
@@ -138,7 +200,7 @@ class ItemBuffer(threading.Thread):
 
             if data_count:
                 self.__add_item_to_db(
-                    items, update_items, requests, callbacks, items_fingerprints
+                    items, update_items, failed_task_items, requests, callbacks, items_fingerprints
                 )
 
         except Exception as e:
@@ -242,7 +304,7 @@ class ItemBuffer(threading.Thread):
         return self.__export_to_db(table, datas, **kwargs)
 
     def __add_item_to_db(
-        self, items, update_items, requests, callbacks, items_fingerprints
+        self, items, update_items, failed_task_items, requests, callbacks, items_fingerprints
     ):
         export_success = True
         self._is_adding_to_db = True
@@ -250,9 +312,13 @@ class ItemBuffer(threading.Thread):
         if setting.ITEM_FILTER_ENABLE:
             items, items_fingerprints = self.__dedup_items(items, items_fingerprints)
 
+        # 分捡采集任务
+        task_items = [item for item in items if isinstance(item, BaseListItem)]
+
         # 分捡
         items_dict = self.__pick_items(items)
         update_items_dict = self.__pick_items(update_items, is_update_item=True)
+        failed_task_items_dict = self.__pick_items(failed_task_items)
 
         # item批量入库
         failed_items = {"add": [], "update": [], "requests": []}
@@ -272,6 +338,10 @@ class ItemBuffer(threading.Thread):
                 export_success = False
                 failed_items["add"].append({"table": table, "datas": datas})
 
+        # 批量推送采集任务进入采集队列(优先级必须低于item批量入库)
+        if task_items:
+            self._rabbitmq.add(self._tab_items, task_items)
+
         # 执行批量update
         while update_items_dict:
             table, datas = update_items_dict.popitem()
@@ -292,6 +362,23 @@ class ItemBuffer(threading.Thread):
                 export_success = False
                 failed_items["update"].append({"table": table, "datas": datas})
 
+        # 采集失败 item批量入库
+        while failed_task_items_dict:
+            table, datas = failed_task_items_dict.popitem()
+
+            log.debug(
+                """
+                -------------- crawl failed item 批量入库 --------------
+                表名: %s
+                datas: %s
+                    """
+                % (table, tools.dumps_json(datas, indent=16))
+            )
+
+            if not self.__export_to_db(table, datas):
+                export_success = False
+                failed_items["add"].append({"table": table, "datas": datas})
+
         if export_success:
             # 执行回调
             while callbacks: