
Add multi-service-instance task query methods to the spider

dongzhaorui committed 782961b36d (1 year ago)
1 changed file with 24 additions and 19 deletions

FworkSpider/feapder/core/spiders/spider.py (+24 -19)

@@ -15,7 +15,7 @@ import feapder.setting as setting
 import feapder.utils.tools as tools
 from feapder.core.base_parser import BaseParser
 from feapder.core.scheduler import Scheduler
-from feapder.network.item import Item
+from feapder.network.item import Item, FailedTaskItem
 from feapder.network.request import Request
 from feapder.utils.log import log
 
 
@@ -292,28 +292,15 @@ class BaseBusinessDetailSpider(Spider):

     def failed_request(self, request, response):
         """Record error details once request/parse failures exceed the retry limit"""
-        failed_task_items = request.item  # task info for the failed collection
-        failed_times = int(failed_task_items.pop("failed_times", 0)) + 1
-        failed_items = dict(
-            state=3,  # collection state of a pending task [3 = collection failed]
-            failed_times=failed_times,
+        failed_item = FailedTaskItem(
             reason=getattr(request, "error_msg", ""),
             status_code=getattr(response, "status_code", -1),
-            create_at=tools.ensure_int64(tools.get_current_timestamp()),
-            **failed_task_items,
+            **request.item,  # details of the task whose request failed
         )
-        if 'queue_name' not in failed_items:
-            failed_items['queue_name'] = setting.TAB_ITEMS.format(
-                redis_key=self._redis_key.replace('_detailc', '')
-            )
-
-        def callback():
-            # push the failed collection task info
-            self._rabbitmq.add(setting.TASK_CRAWL_STATE, failed_items)
-
-        yield callback
+        failed_item.table_name = setting.TASK_REQUEST_FAILED
+        yield failed_item

-    def consume_tasks(self, limit=None, auto_ack=True):
+    def get_tasks_by_rabbitmq(self, limit=None, auto_ack=True):
         """

         @param limit: number of messages to fetch
@@ -333,6 +320,24 @@ class BaseBusinessDetailSpider(Spider):
                 task_lst.append(body)
         return task_lst

+    def get_tasks_by_mongodb(self, table=None, query=None, limit=None):
+        pipeline_path = "feapder.pipelines.mongo_pipeline.TaskPipeline"
+        pipeline = tools.import_cls(pipeline_path)()
+        table = table or setting.TASK_REQUEST_PRODUCE
+        queue_name = setting.TAB_ITEMS.format(
+            redis_key=self._redis_key.replace('_detailc', '')
+        )
+        conditions = query or {
+            'state': {'$in': [1, 3]},
+            'queue_name': queue_name,
+            'update_at': {'$lt': tools.get_current_timestamp()}
+        }
+        limit = limit or setting.COLLECTOR_TASK_COUNT
+        results = pipeline.find_items(table, conditions, limit)
+        ignore = {'_id', 'state', 'update_at', 'queue_name'}
+        task_lst = [{k: v for k, v in items.items() if k not in ignore} for items in results]
+        return task_lst
+

 class BiddingListSpider(BaseBusinessListSpider):
     """Bidding-announcement list-page spider"""