Sfoglia il codice sorgente

添加mq任务队列领取任务方法;删除MongoDB实例

dongzhaorui 1 anno fa
parent
commit
fc1b3a5649
1 file modificato con 23 aggiunte e 60 eliminazioni
  1. FworkSpider/feapder/core/spiders/spider.py (+23 −60)

+ 23 - 60
FworkSpider/feapder/core/spiders/spider.py

@@ -15,7 +15,6 @@ import feapder.setting as setting
 import feapder.utils.tools as tools
 from feapder.core.base_parser import BaseParser
 from feapder.core.scheduler import Scheduler
-from feapder.db.mongodb import MongoDB
 from feapder.network.item import Item
 from feapder.network.request import Request
 from feapder.utils.log import log
@@ -296,9 +295,6 @@ class BaseBusinessDetailSpider(Spider):
         ITEM_FILTER_ENABLE=False
     )
 
-    err_coll_name = "listdata_err"  # 详情采集失败时存放的详情任务数据的表
-    _to_db = None
-
     def __init__(
             self,
             redis_key=None,
@@ -325,53 +321,34 @@ class BaseBusinessDetailSpider(Spider):
             **kwargs
         )
 
-    def get_tasks(self, query, limit=10, is_delete=True, **kwargs):
-        """
-        领取采集任务
-
-        :param dict query: 查询条件
-        :param limit: 结果数量
-        :param is_delete: 取走的任务是否删除
-        :param kwargs
-            更多参数 https://docs.mongodb.com/manual/reference/command/find/#command-fields
-
-        :return: dict
-        """
-        if "sort" not in kwargs:
-            kwargs.setdefault("sort", {"_id": -1})
-
-        cursor = self.to_db.find(coll_name=self.db_name, condition=query, limit=limit, **kwargs)
-        for task in cursor:
-            yield task
-
-            if is_delete:
-                self.to_db.delete(coll_name=self.db_name, condition=task)
-
-    def create_failed_item(self, request, response, **kwargs):
-        result = kwargs.pop("origin_data")
-        failed_times = int(result.pop("failed_times", 0)) + 1
-        item = Item(origin_data=result, failed_times=failed_times)
-        item.table_name = self.err_coll_name
-        item.err_reason = getattr(request, "error_msg", "")
-        item.status_code = getattr(response, "status_code", -1)
-        item.create_at = tools.ensure_int64(tools.get_current_timestamp())
-        item.origin = self.db_name
-        item.spidercode = result["spidercode"]
-        return item
-
-    @property
-    def to_db(self):
-        if not self._to_db:
-            self._to_db = MongoDB()
-        return self._to_db
+    def failed_request(self, request, response):
+        """请求、解析错误次数超过上限后,添加failed字段,将错误信息重新保存至数据库"""
+        failed_info = request.list_info
+        failed_times = int(failed_info.pop("failed_times", 0)) + 1
+        failed_item = Item(
+            **failed_info,
+            failed_times=failed_times,
+            err_reason=getattr(request, "error_msg", ""),
+            status_code=getattr(response, "status_code", -1),
+            create_at=tools.ensure_int64(tools.get_current_timestamp())
+        )
+        failed_item.table_name = setting.TASK_FAILED
+        yield failed_item
 
     def consume_tasks(self, limit=100):
+        task_lst = []
         queue_name = setting.TAB_ITEMS.format(
             redis_key=self._redis_key.replace('_detailc', '')
         )
-        sizes = limit or setting.COLLECTOR_TASK_COUNT
-        messages = self._rabbitmq.get(queue_name, sizes, True, to_str=False)
-        return [message.body.to_dict for message in messages]
+        limit = limit or setting.COLLECTOR_TASK_COUNT
+        messages = self._rabbitmq.get(queue_name, limit, False, to_str=False)
+        for message in messages:
+            body = message.body
+            if isinstance(body, Item):
+                task_lst.append(body.to_dict)
+            else:
+                task_lst.append(body)
+        return task_lst
 
 
 class BiddingListSpider(BaseBusinessListSpider):
@@ -384,13 +361,6 @@ class BiddingDetailSpider(BaseBusinessDetailSpider):
     """标讯详情页采集业务类"""
 
     __business_type__ = "BiddingDetail"
-    db_name = "biding_list"
-
-    def failed_request(self, request, response):
-        """请求、解析错误次数超过上限后,将原信息重新保存至数据库,并修改failed字段"""
-        item = request.base_info if isinstance(request.base_info, dict) else request.base_info.to_dict
-        failed_item = self.create_failed_item(request, response, origin_data=item)
-        yield failed_item
 
 
 class PlanToBuildListSpider(BaseBusinessListSpider):
@@ -403,10 +373,3 @@ class PlanToBuildDetailSpider(BaseBusinessDetailSpider):
     """拟建详情页采集业务类"""
 
     __business_type__ = "PlanToBuildDetail"
-    db_name = "njpc_list"
-
-    def failed_request(self, request, response):
-        """请求、解析错误次数超过上限后,将原信息重新保存至数据库,并修改failed字段"""
-        item = request.item if isinstance(request.item, dict) else request.item.to_dict
-        failed_item = self.create_failed_item(request, response, origin_data=item)
-        yield failed_item