Forráskód Böngészése

删除直接添加列表采集任务推送rabbitmq;添加延时采集任务状态

dongzhaorui 1 éve
szülő
commit
6a353b3f04
1 módosított fájl, 12 hozzáadás és 17 törlés
  1. 12 17
      FworkSpider/feapder/buffer/item_buffer.py

+ 12 - 17
FworkSpider/feapder/buffer/item_buffer.py

@@ -47,17 +47,17 @@ class ItemBuffer(threading.Thread):
 
 
             # 任务队列
             # 任务队列
             self._tab_requests = setting.TAB_REQUESTS.format(redis_key=redis_key)
             self._tab_requests = setting.TAB_REQUESTS.format(redis_key=redis_key)
-            self._rabbitmq.declare(queue=self._tab_requests)
+            self._rabbitmq.declare_bind(queue=self._tab_requests)
 
 
             # 数据保存失败队列
             # 数据保存失败队列
             self._tab_failed_items = setting.TAB_FAILED_ITEMS
             self._tab_failed_items = setting.TAB_FAILED_ITEMS
-            self._rabbitmq.declare(queue=self._tab_failed_items)
+            self._rabbitmq.declare_bind(queue=self._tab_failed_items)
 
 
-            # 采集任务队列
+            # 采集任务队列(rabbitMq)
             self._tab_items = setting.TAB_ITEMS.format(
             self._tab_items = setting.TAB_ITEMS.format(
                 redis_key=redis_key.replace('_detailc', '')
                 redis_key=redis_key.replace('_detailc', '')
             )
             )
-            self._rabbitmq.declare(queue=self._tab_items)
+            self._rabbitmq.declare_bind(queue=self._tab_items)
 
 
             self._item_tables = {
             self._item_tables = {
                 # 'item_name': 'table_name' # 缓存item名与表名对应关系
                 # 'item_name': 'table_name' # 缓存item名与表名对应关系
@@ -164,9 +164,12 @@ class ItemBuffer(threading.Thread):
 
 
                 elif isinstance(data, Item):
                 elif isinstance(data, Item):
                     if isinstance(data, BaseListItem):
                     if isinstance(data, BaseListItem):
-                        data.state = 1  # 待采集任务等待采集状态[1=等待采集]
                         data.queue_name = self._tab_items
                         data.queue_name = self._tab_items
                         data.update_at = update_at
                         data.update_at = update_at
+                        if hasattr(data, 'is_delay') and data.is_delay:
+                            data.state = 5  # 待采集任务延时采集状态[5=延时采集]
+                        else:
+                            data.state = 1  # 待采集任务等待采集状态[1=等待采集]
 
 
                     elif isinstance(data, BaseDetailItem):
                     elif isinstance(data, BaseDetailItem):
                         update_item = UpdateItem(
                         update_item = UpdateItem(
@@ -312,9 +315,6 @@ class ItemBuffer(threading.Thread):
         if setting.ITEM_FILTER_ENABLE:
         if setting.ITEM_FILTER_ENABLE:
             items, items_fingerprints = self.__dedup_items(items, items_fingerprints)
             items, items_fingerprints = self.__dedup_items(items, items_fingerprints)
 
 
-        # 分捡采集任务
-        task_items = [item for item in items if isinstance(item, BaseListItem)]
-
         # 分捡
         # 分捡
         items_dict = self.__pick_items(items)
         items_dict = self.__pick_items(items)
         update_items_dict = self.__pick_items(update_items, is_update_item=True)
         update_items_dict = self.__pick_items(update_items, is_update_item=True)
@@ -338,10 +338,6 @@ class ItemBuffer(threading.Thread):
                 export_success = False
                 export_success = False
                 failed_items["add"].append({"table": table, "datas": datas})
                 failed_items["add"].append({"table": table, "datas": datas})
 
 
-        # 批量推送采集任务进入采集队列(优先级必须低于item批量入库)
-        if task_items:
-            self._rabbitmq.add(self._tab_items, task_items)
-
         # 执行批量update
         # 执行批量update
         while update_items_dict:
         while update_items_dict:
             table, datas = update_items_dict.popitem()
             table, datas = update_items_dict.popitem()
@@ -402,10 +398,9 @@ class ItemBuffer(threading.Thread):
 
 
             if self.export_retry_times > setting.EXPORT_DATA_MAX_RETRY_TIMES:
             if self.export_retry_times > setting.EXPORT_DATA_MAX_RETRY_TIMES:
                 if self._redis_key != "air_spider":
                 if self._redis_key != "air_spider":
-                    # 添加item失败标识
-                    failed_items[self._redis_key] = ''
-                    # 失败的item记录到rabbitmq
-                    self._rabbitmq.add(self._tab_failed_items, failed_items)
+                    failed_items[self._redis_key] = ''  # 添加item失败标识
+                    # 记录失败的item
+                    self._rabbitmq.add_batch(self._tab_failed_items, failed_items)
 
 
                     # 删除做过的request
                     # 删除做过的request
                     if requests:
                     if requests:
@@ -425,7 +420,7 @@ class ItemBuffer(threading.Thread):
                     tip.append("不执行回调")
                     tip.append("不执行回调")
                 if requests:
                 if requests:
                     tip.append("不删除任务")
                     tip.append("不删除任务")
-                    self._rabbitmq.add(self._tab_requests, requests)
+                    self._rabbitmq.add_batch(self._tab_requests, requests)
 
 
                 if setting.ITEM_FILTER_ENABLE:
                 if setting.ITEM_FILTER_ENABLE:
                     tip.append("数据不入去重库")
                     tip.append("数据不入去重库")