3
0
Эх сурвалжийг харах

更新采集心跳统计结果,添加成功任务数量、失败任务数量

dongzhaorui 1 жил өмнө
parent
commit
51829ba87d

+ 23 - 30
FworkSpider/feapder/core/parser_control.py

@@ -77,10 +77,10 @@ class PaserControl(threading.Thread):
         heartbeat_lst = []  # 待推送的心跳信息列表
         for parser in self._parsers:
             now_page = getattr(request, "page", -1)  # 当前访问页码
-            extract_count = 0  # 列表抽取总数量
-            task_count = 0  # 详情任务总数量
-            rel_count = 0  # 实际入库
-
+            counter = {
+                'realQuantity': 0,  # 去重后实际入库数量
+                'extractQuantity': 0,  # 列表页抽取的列表数
+            }
             if parser.name == request.parser_name:
                 used_download_midware_enable = False
                 try:
@@ -151,8 +151,6 @@ class PaserControl(threading.Thread):
                                 )
                             )
 
-                        task_count += 1  # 详情任务数量计数
-
                         if response == None:
                             raise Exception(
                                 "连接超时 url: %s" % (request.url or request_temp.url)
@@ -205,19 +203,17 @@ class PaserControl(threading.Thread):
                         elif isinstance(result, Item):
                             result_type = 2
 
-                            # 添加属性 - 混合采集
-                            result.is_mixed = parser.is_mix
-
-                            # 实际入库数量计数
+                            counter['extractQuantity'] += 1  # 统计抽取列表数
                             if not self.is_duplicate(result):
-                                rel_count += 1
+                                counter['realQuantity'] += 1  # 统计实际列表数
 
-                            request.rel_count = rel_count  # 实际入库数量
-
-                            # 添加属性 - 待采集任务队列名称(仅对详情采集添加队列名称,以便任务发布)
+                            # 添加属性 - 待采集任务队列名称(仅对采集列表生效,便以发布任务)
                             if isinstance(result, BaseListItem):
                                 result.queue_name = self._task_buffer._tab_items
 
+                            # 添加属性 - 混合采集
+                            result.is_mixed = parser.is_mix
+
                             # 将item入库(异步)
                             self._item_buffer.put_item(result)
                             # 推送任务到待采集队列(异步)
@@ -247,10 +243,6 @@ class PaserControl(threading.Thread):
 
                         # 发送心跳的条件
                         is_sent_heartbeat = True
-                        # 抽取列表数量计数
-                        if hasattr(parser, "__extract_count__"):
-                            extract_count += parser.get_extract_count()
-                            parser.reset_extract_count()
 
                 except (Exception, BaseException) as e:
                     exception_type = (
@@ -301,8 +293,6 @@ class PaserControl(threading.Thread):
                     if "Invalid URL" in str(e):
                         request.is_abandoned = True
 
-                    request.rel_count = 0  # 重置实际入库数
-
                     requests = parser.exception_request(request, response) or [request]
                     if not isinstance(requests, Iterable):
                         raise Exception(
@@ -411,9 +401,8 @@ class PaserControl(threading.Thread):
                         heartbeat_lst.append(dict(
                             parser=parser,
                             now_page=now_page,
-                            task_count=task_count,
-                            extract_count=extract_count,
-                            rel_count=rel_count,
+                            extract_count=counter['extractQuantity'],
+                            rel_count=counter['realQuantity'],
                             request=request,
                             response=response,
                             filepath=str(pathlib.Path(setting.sys.argv[0])),
@@ -495,15 +484,15 @@ class PaserControl(threading.Thread):
         parser = kwargs["parser"]
         now_page = kwargs["now_page"]
         extract_count = kwargs["extract_count"]
-        task_count = kwargs["task_count"]
+        request_count = sum(self.get_task_status_count())  # 采集任务总数(本次爬虫运行时发起的总请求数)
         rel_count = kwargs["rel_count"]
         filepath = kwargs["filepath"]
         status_code = getattr(response, "status_code", -1)
 
-        item = getattr(request, "list_info", {})
-        site = self.get_spider_attribute("site", item, parser)
-        channel = self.get_spider_attribute("channel", item, parser)
-        code = self.get_spider_attribute("spidercode", item, parser)
+        spider_info = getattr(request, "item", {})
+        site = self.get_spider_attribute("site", spider_info, parser)
+        channel = self.get_spider_attribute("channel", spider_info, parser)
+        code = self.get_spider_attribute("spidercode", spider_info, parser)
 
         business_type: str = parser.__business_type__  # 爬虫业务类型
         run_time = tools.get_current_date(date_format="%Y-%m-%d")  # 运行时间,单位:天
@@ -533,7 +522,7 @@ class PaserControl(threading.Thread):
             if business_type.endswith("List"):
                 feature.update(dict(nowpage=now_page, ))
             else:
-                feature.update(dict(count=task_count, ))
+                feature.update(dict(count=request_count, ))
         else:
             if business_type.endswith("List"):
                 # 列表页
@@ -546,12 +535,16 @@ class PaserControl(threading.Thread):
             else:
                 # 详情页
                 detail_feature = dict(
-                    count=task_count,  # 发起请求的总数
+                    count=request_count,  # 发起请求的总数
                     rel_count=rel_count,  # 实际入库总数
                 )
                 feature = detail_feature
             feature.setdefault("request_success", True)
 
+        feature.update({
+            'failed_task_count': self._failed_task_count,
+            'success_task_count': self._success_task_count
+        })
         feature['expire_at'] = tools.get_utcnow()  # 设置utc时间,定期删除(5天)
         heartbeat_content.update(feature)
         return self.sent_heartbeat(heartbeat_content)

+ 7 - 20
FworkSpider/feapder/core/spiders/spider.py

@@ -229,19 +229,13 @@ class BaseBusinessListSpider(Spider):
     """列表页采集基础爬虫"""
 
     __business_type__ = "List"
-    __extract_count__ = 0
 
-    @classmethod
-    def get_extract_count(cls):
-        return cls.__extract_count__
 
-    @classmethod
-    def reset_extract_count(cls):
-        cls.__extract_count__ = 0
+class MixBusinessSpider(BaseBusinessListSpider):
+    """混采(列表页+详情页)采集基础爬虫"""
 
-    @classmethod
-    def increment_extract_count(cls):
-        cls.__extract_count__ += 1
+    def is_mix(self):
+        return True
 
 
 class BaseBusinessDetailSpider(Spider):
@@ -280,15 +274,15 @@ class BaseBusinessDetailSpider(Spider):
 
     def failed_request(self, request, response):
         """请求、解析错误次数超过上限后,记录错误详情信息"""
-        failed_request_info = request.list_info
-        failed_times = int(failed_request_info.pop("failed_times", 0)) + 1
+        failed_task_items = request.item  # 采集失败的任务信息
+        failed_times = int(failed_task_items.pop("failed_times", 0)) + 1
         failed_items = dict(
             state=3,  # 待采集任务失败采集状态[3=采集失败]
             failed_times=failed_times,
             reason=getattr(request, "error_msg", ""),
             status_code=getattr(response, "status_code", -1),
             create_at=tools.ensure_int64(tools.get_current_timestamp()),
-            **failed_request_info,
+            **failed_task_items,
         )
         if 'queue_name' not in failed_items:
             failed_items['queue_name'] = setting.TAB_ITEMS.format(
@@ -322,13 +316,6 @@ class BaseBusinessDetailSpider(Spider):
         return task_lst
 
 
-class MixBusinessSpider(BaseBusinessListSpider):
-    """混采(列表页+详情页)采集基础爬虫"""
-
-    def is_mix(self):
-        return True
-
-
 class BiddingListSpider(BaseBusinessListSpider):
     """标讯列表页采集爬虫"""
 

+ 2 - 3
FworkSpider/feapder/network/request.py

@@ -92,7 +92,6 @@ class Request(object):
         render_time=0,
         splash=False,
         iframes=0,
-        rel_count=0,
         **kwargs,
     ):
         """
@@ -148,10 +147,10 @@ class Request(object):
         self.download_midware = download_midware
         self.is_abandoned = is_abandoned
         self.render = render
+        self.render_time = render_time or setting.WEBDRIVER.get("render_time", 0)
+
         self.splash = splash
         self.iframes = iframes
-        self.rel_count = rel_count
-        self.render_time = render_time or setting.WEBDRIVER.get("render_time", 0)
 
         self.requests_kwargs = {}
         for key, value in kwargs.items():