Add heartbeat manager

dongzhaorui 1 year ago
parent commit 102f00d195

+ 214 - 0
FworkSpider/feapder/buffer/heartbeat_buffer.py

@@ -0,0 +1,214 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2023-11-02
+---------
+@summary: Heartbeat manager. Buffers heartbeat items bound for the database and writes them through this single manager, preventing concurrent database access from multiple threads.
+---------
+@author: dzr
+"""
+
+import threading
+from queue import Queue
+
+import feapder.setting as setting
+import feapder.utils.tools as tools
+from feapder.network.item import HeartBeatItem
+from feapder.pipelines import BasePipeline
+from feapder.utils.log import log
+
+MAX_ITEM_COUNT = 5000  # maximum number of items held in the buffer
+UPLOAD_BATCH_MAX_SIZE = 1000
+
+
+class HeartBeatBuffer(threading.Thread):
+
+    # Aggregation baseline: thread switching can skew per-flush sums, so record the totals already pushed and use them to compute the next deltas
+    _prev_success_task_count = 0
+    _prev_failed_task_count = 0
+
+    def __init__(self, redis_key):
+        if not hasattr(self, "_items_queue"):
+            super(HeartBeatBuffer, self).__init__()
+
+            self._thread_stop = False
+            self._is_adding_to_db = False
+            self._redis_key = redis_key
+
+            self._items_queue = Queue(maxsize=MAX_ITEM_COUNT)
+
+            self._item_tables = {
+                # 'item_name': 'table_name'  # cached mapping of item names to table names
+            }
+
+            self._pipelines = self.load_pipelines()
+
+    def load_pipelines(self):
+        pipelines = []
+        for pipeline_path in setting.ITEM_PIPELINES:
+            pipeline = tools.import_cls(pipeline_path)()
+            if not isinstance(pipeline, BasePipeline):
+                raise ValueError(f"{pipeline_path} must inherit from feapder.pipelines.BasePipeline")
+            pipelines.append(pipeline)
+
+        return pipelines
+
+    def run(self):
+        self._thread_stop = False
+        while not self._thread_stop:
+            self.flush()
+            tools.delay_time(1)
+
+        self.close()
+
+    def stop(self):
+        self._thread_stop = True
+        self._started.clear()
+
+    def put_item(self, item):
+        if isinstance(item, HeartBeatItem):
+            self._items_queue.put(item)
+
+    def flush(self):
+        try:
+            heartbeat_items = []
+            need_aggregate_items = []
+            data_count = 0
+
+            while not self._items_queue.empty():
+                data = self._items_queue.get_nowait()
+                data_count += 1
+
+                business_type = data.business_type
+                if business_type and str(business_type).endswith("Detail"):
+                    need_aggregate_items.append(data)
+                else:
+                    heartbeat_items.append(data)
+
+                if data_count >= UPLOAD_BATCH_MAX_SIZE:
+                    self.__add_item_to_db(heartbeat_items, need_aggregate_items)
+
+                    heartbeat_items = []
+                    need_aggregate_items = []
+                    data_count = 0
+
+            if data_count:
+                self.__add_item_to_db(heartbeat_items, need_aggregate_items)
+
+        except Exception as e:
+            log.exception(e)
+
+    def get_items_count(self):
+        return self._items_queue.qsize()
+
+    def is_adding_to_db(self):
+        return self._is_adding_to_db
+
+    def __pick_items(self, items, is_aggregate=False):
+        """
+        将每个表之间的数据分开 拆分后 原items为空
+        @param items:
+        @param is_aggregate: 是否需要聚合汇总数据
+        @return:
+        """
+        datas_dict = {
+            # 'table_name': [{}, {}]
+        }
+
+        while items:
+            item = items.pop(0)
+            # Resolve the item's table name: look it up in the cache first; on a
+            # miss, read it from the item and cache it to speed up the next lookup.
+            item_name = item.item_name
+            table_name = self._item_tables.get(item_name)
+            if not table_name:
+                table_name = item.table_name
+                self._item_tables[item_name] = table_name
+
+            if table_name not in datas_dict:
+                datas_dict[table_name] = []
+
+            datas_dict[table_name].append(item.to_dict)
+
+        if is_aggregate:
+            aggregate_data_dict = {
+                # 'table_name': [{}, {}]
+            }
+
+            for table_name, datas in datas_dict.items():
+                latest = datas[-1]
+                latest['rel_count'] = sum([item['rel_count'] for item in datas])
+                # failed request count (cumulative total minus the last pushed total)
+                max_failed_data_dict = max(datas, key=lambda x: x.get("failed_task_count", 0))
+                failed_task_count = max_failed_data_dict["failed_task_count"] - self._prev_failed_task_count
+                log.debug(f'failed count: {max_failed_data_dict["failed_task_count"]}')
+                self._prev_failed_task_count = max_failed_data_dict["failed_task_count"]
+                latest['failed_task_count'] = failed_task_count
+                # successful request count (cumulative total minus the last pushed total)
+                max_success_data_dict = max(datas, key=lambda x: x.get("success_task_count", 0))
+                success_task_count = max_success_data_dict["success_task_count"] - self._prev_success_task_count
+                log.debug(f'success count: {max_success_data_dict["success_task_count"]}')
+                self._prev_success_task_count = max_success_data_dict["success_task_count"]
+                latest['success_task_count'] = success_task_count
+
+                if table_name not in aggregate_data_dict:
+                    aggregate_data_dict[table_name] = [latest]
+
+            datas_dict = aggregate_data_dict
+
+        return datas_dict
+
+    def __export_to_db(self, table, datas):
+        for pipeline in self._pipelines:
+            if not pipeline.save_items(table, datas):
+                log.error(
+                    f"{pipeline.__class__.__name__} 保存心跳失败. table: {table}  items: {datas}"
+                )
+                return False
+
+        return True
+
+    def __add_item_to_db(self, items, aggregate_items):
+        self._is_adding_to_db = True
+
+        # sort items by table
+        items_dict = self.__pick_items(items)
+        aggregate_dict = self.__pick_items(aggregate_items, is_aggregate=True)
+
+        # batch-insert heartbeat items
+        while items_dict:
+            table, datas = items_dict.popitem()
+
+            log.debug(
+                """
+                -------------- items batch insert --------------
+                table: %s
+                datas: %s
+                """
+                % (table, tools.dumps_json(datas, indent=16))
+            )
+
+            self.__export_to_db(table, datas)
+
+        while aggregate_dict:
+            table, datas = aggregate_dict.popitem()
+
+            log.debug(
+                """
+                -------------- aggregated items batch insert --------------
+                table: %s
+                datas: %s
+                """
+                % (table, tools.dumps_json(datas, indent=16))
+            )
+
+            self.__export_to_db(table, datas)
+
+        self._is_adding_to_db = False
+
+    def close(self):
+        # call each pipeline's close method
+        for pipeline in self._pipelines:
+            try:
+                pipeline.close()
+            except Exception as e:
+                log.exception(e)
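For reference, a minimal usage sketch of the new buffer (standalone; it assumes feapder settings provide ITEM_PIPELINES, and the redis key and table name below are placeholders, not part of this commit):

    from feapder.buffer.heartbeat_buffer import HeartBeatBuffer
    from feapder.network.item import HeartBeatItem

    buffer = HeartBeatBuffer(redis_key="example:spider")
    buffer.start()  # flush() runs roughly once per second until stop() is called

    item = HeartBeatItem(site="example", business_type="ExampleList", rel_count=1)
    item.table_name = "spider_heartbeat"  # placeholder table name
    buffer.put_item(item)  # objects that are not HeartBeatItem are silently dropped

    buffer.stop()  # ends the flush loop; close() then closes every pipeline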

+ 45 - 129
FworkSpider/feapder/core/parser_control.py

@@ -17,7 +17,7 @@ import feapder.setting as setting
 import feapder.utils.tools as tools
 from feapder.buffer.item_buffer import ItemBuffer
 from feapder.db.memory_db import MemoryDB
-from feapder.network.item import Item
+from feapder.network.item import Item, HeartBeatItem
 from feapder.network.request import Request
 from feapder.utils import metrics
 from feapder.utils.log import log
@@ -35,16 +35,24 @@ class PaserControl(threading.Thread):
     _success_task_count = 0
     _failed_task_count = 0
 
-    def __init__(self, collector, redis_key, request_buffer, item_buffer):
+    def __init__(self, collector, redis_key, request_buffer, item_buffer, heartbeat_buffer):
         super(PaserControl, self).__init__()
         self._parsers = []
         self._collector = collector
         self._redis_key = redis_key
         self._request_buffer = request_buffer
         self._item_buffer = item_buffer
+        self._heartbeat_buffer = heartbeat_buffer
 
         self._thread_stop = False
 
+    def is_not_task(self):
+        return self.is_show_tip
+
+    @classmethod
+    def get_task_status_count(cls):
+        return cls._failed_task_count, cls._success_task_count
+
     def run(self):
         self._thread_stop = False
         while not self._thread_stop:
@@ -61,25 +69,17 @@ class PaserControl(threading.Thread):
             except (Exception, BaseException) as e:
                 log.exception(e)
 
-    def is_not_task(self):
-        return self.is_show_tip
-
-    @classmethod
-    def get_task_status_count(cls):
-        return cls._failed_task_count, cls._success_task_count
-
     def deal_request(self, request):
         response = None
         request_redis = request["request_redis"]
         request = request["request_obj"]
+        now_page = request.page or -1
 
-        is_sent_heartbeat = False  # flag marking that a heartbeat should be sent
-        heartbeat_lst = []  # heartbeat payloads waiting to be pushed
         for parser in self._parsers:
-            now_page = request.page or -1
             counter = {
-                'realQuantity': 0,  # actual count stored after deduplication
-                'extractQuantity': 0,  # count of entries extracted from the list page
+                'now_page': now_page,
+                'extract_count': 0,  # count of entries extracted from the list page
+                'rel_count': 0,  # actual count stored after deduplication
             }
             if parser.name == request.parser_name:
                 used_download_midware_enable = False
@@ -210,9 +210,9 @@ class PaserControl(threading.Thread):
                             if "List" in parser.__business_type__ and hasattr(result, 'contenthtml'):
                                 result.is_mixed = True
 
-                            counter['extractQuantity'] += 1  # count extracted list entries
+                            counter['extract_count'] += 1  # count extracted list entries
                             if not self.is_duplicate(result):
-                                counter['realQuantity'] += 1  # count actually stored
+                                counter['rel_count'] += 1  # count actually stored
 
                             # store the item into the database (async)
                             self._item_buffer.put_item(result)
@@ -239,9 +239,6 @@ class PaserControl(threading.Thread):
                                 f"{function_name} result expect Request、Item or callback, but get type: {type(result)}"
                             )
 
-                        # condition for sending a heartbeat
-                        is_sent_heartbeat = True
-
                 except (Exception, BaseException) as e:
                     exception_type = (
                         str(type(e)).replace("<class '", "").replace("'>", "")
@@ -339,7 +336,6 @@ class PaserControl(threading.Thread):
                                 elif isinstance(result, Item):
                                     self._item_buffer.put_item(result)
 
-                            is_sent_heartbeat = True
                         else:
                             # re-queue the request for crawling
                             request.retry_times += 1
@@ -394,25 +390,10 @@ class PaserControl(threading.Thread):
                     if response and hasattr(response, "browser"):
                         request._webdriver_pool.put(response.browser)
 
-                    # collect spider heartbeats
-                    if hasattr(parser, "__business_type__"):
-                        heartbeat_lst.append(dict(
-                            parser=parser,
-                            now_page=now_page,
-                            extract_count=counter['extractQuantity'],
-                            rel_count=counter['realQuantity'],
-                            request=request,
-                            response=response,
-                            filepath=str(pathlib.Path(setting.sys.argv[0])),
-                        ))
-
+                    # publish heartbeat
+                    self.publish_heartbeat(parser, request, **counter)
                 break
 
-        # send heartbeats
-        if is_sent_heartbeat:
-            for heartbeat in heartbeat_lst:
-                self.spider_heartbeat(**heartbeat)
-
         if setting.SPIDER_SLEEP_TIME:
             if (
                 isinstance(setting.SPIDER_SLEEP_TIME, (tuple, list))
@@ -447,105 +428,40 @@ class PaserControl(threading.Thread):
                 return True
         return False
 
-    def sent_heartbeat(self, items, table=None):
-        """发送汇总采集详情"""
-        send_success = True
-        items = items if isinstance(items, list) else [items]
-        log.debug("发送心跳")
-        table = table or setting.SPIDER_HEARTBEAT_RECORD
-        if not self._item_buffer.export_to_db(table, items):
-            send_success = False
-            log.error("失败心跳:\n {}".format(tools.dumps_json(items)))
-        return send_success
-
-    @staticmethod
-    def get_spider_attribute(name, *args):
-        """获取对象属性"""
-        arg1, arg2 = args or (None, None)
-
-        val = None
-        if arg1 is not None:
-            if isinstance(arg1, dict):
-                val = arg1.get(name)
-                if not val and name == "spidercode":
-                    val = arg1.get("code")
-            else:
-                val = getattr(arg1, name, None)
-
-        if not val and arg2 is not None:
-            val = getattr(arg2, name, None)
-
-        return val if val is not None else ""
-
-    def spider_heartbeat(self, request, response, **kwargs):
-        """爬虫心跳"""
-        parser = kwargs["parser"]
-        now_page = kwargs["now_page"]
-        extract_count = kwargs["extract_count"]
-        request_count = sum(self.get_task_status_count())  # total tasks (all requests issued during this spider run)
-        rel_count = kwargs["rel_count"]
-        filepath = kwargs["filepath"]
-        status_code = getattr(response, "status_code", -1)
-
-        spider_info = getattr(request, "item", {})
-        site = self.get_spider_attribute("site", spider_info, parser)
-        channel = self.get_spider_attribute("channel", spider_info, parser)
-        code = self.get_spider_attribute("spidercode", spider_info, parser)
-
+    def publish_heartbeat(self, parser, request, **kwargs):
+        request_item = getattr(request, "item")
         business_type: str = parser.__business_type__  # spider business type
+        if business_type.endswith("List"):
+            site = getattr(parser, "site")
+            spidercode = request_item["code"]
+            count = kwargs["extract_count"]
+        else:
+            site = request_item["site"]
+            spidercode = request_item["spidercode"]
+            count = 0
+
         run_time = tools.get_current_date(date_format="%Y-%m-%d")  # run date, day granularity
-        spider_id = tools.get_md5(code + business_type + run_time)
-        heartbeat_content = dict(
+        heartbeat_item = HeartBeatItem(
             node_ip=tools.os.environ.get("CRAWLAB_SERVER_REGISTER_IP"),  # crawlab node name
             crawlab_taskid=tools.os.environ.get("CRAWLAB_TASK_ID"),  # crawlab task id for this spider
             site=site,
-            channel=channel,
-            spidercode=code,
-            url=request.url,  # requested URL
-            status_code=status_code,  # response status code
-            runtime=run_time,
+            channel=request_item["channel"],
+            spidercode=spidercode,
             business_type=business_type,
-            spider_id=spider_id,
-            filepath=filepath,  # file path
-            create_at=tools.ensure_int64(tools.get_current_timestamp()),  # execution time, in seconds
+            spider_id=tools.get_md5(spidercode + business_type + run_time),
+            filepath=str(pathlib.Path(setting.sys.argv[0])),  # file path
+            runtime=run_time,
+            nowpage=kwargs["now_page"],  # current list page number
+            count=count,  # number of list entries extracted
+            rel_count=kwargs["rel_count"],  # total actually stored
+            failed_task_count=self._failed_task_count,
+            success_task_count=self._success_task_count,
+            create_at=tools.ensure_int64(tools.get_current_timestamp()),  # creation time, in seconds
+            expire_at=tools.get_utcnow(),  # UTC time used for scheduled deletion (5 days)
         )
-
-        if hasattr(request, "error_msg") and status_code != 200:
-            error = getattr(request, "error_msg")
-            feature = dict(
-                err_type=str(error.split(": ")[0]),
-                err_msg=getattr(request, "error_msg"),
-            )
-            feature.setdefault("request_success", False)
-            if business_type.endswith("List"):
-                feature.update(dict(nowpage=now_page, ))
-            else:
-                feature.update(dict(count=request_count, ))
-        else:
-            if business_type.endswith("List"):
-                # list page
-                list_feature = dict(
-                    nowpage=now_page,  # current page number
-                    count=extract_count,  # total extracted from the list
-                    rel_count=rel_count,  # total actually stored
-                )
-                feature = list_feature
-            else:
-                # 详情页
-                detail_feature = dict(
-                    count=request_count,  # 发起请求的总数
-                    rel_count=rel_count,  # 实际入库总数
-                )
-                feature = detail_feature
-            feature.setdefault("request_success", True)
-
-        feature.update({
-            'failed_task_count': self._failed_task_count,
-            'success_task_count': self._success_task_count
-        })
-        feature['expire_at'] = tools.get_utcnow()  # UTC time used for scheduled deletion (5 days)
-        heartbeat_content.update(feature)
-        return self.sent_heartbeat(heartbeat_content)
+        # total task count for this run (all requests issued) = failed_task_count + success_task_count
+        heartbeat_item.table_name = setting.SPIDER_HEARTBEAT_RECORD  # set the table name
+        return self._heartbeat_buffer.put_item(heartbeat_item)
 
 
 class AirSpiderParserControl(PaserControl):
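Note that publish_heartbeat reports the cumulative class-level counters; for Detail-type items, HeartBeatBuffer later converts them into per-flush deltas. A worked sketch of that aggregation (standalone, with hypothetical numbers):

    # Cumulative totals carried by successive Detail heartbeats in one flush:
    datas = [
        {"rel_count": 2, "success_task_count": 10, "failed_task_count": 1},
        {"rel_count": 3, "success_task_count": 14, "failed_task_count": 2},
    ]
    prev_success, prev_failed = 8, 1  # totals recorded at the previous flush

    latest = dict(datas[-1])
    latest["rel_count"] = sum(d["rel_count"] for d in datas)  # 5
    latest["success_task_count"] = max(d["success_task_count"] for d in datas) - prev_success  # 14 - 8 = 6
    latest["failed_task_count"] = max(d["failed_task_count"] for d in datas) - prev_failed  # 2 - 1 = 1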

+ 14 - 0
FworkSpider/feapder/core/scheduler.py

@@ -15,6 +15,7 @@ import feapder.setting as setting
 import feapder.utils.tools as tools
 from feapder.buffer.item_buffer import ItemBuffer
 from feapder.buffer.request_buffer import RequestBuffer
+from feapder.buffer.heartbeat_buffer import HeartBeatBuffer
 from feapder.core.base_parser import BaseParser
 from feapder.core.collector import Collector
 from feapder.core.handle_failed_items import HandleFailedItems
@@ -76,6 +77,7 @@ class Scheduler(threading.Thread):
         self._request_buffer = RequestBuffer(redis_key)
         self._item_buffer = ItemBuffer(redis_key)
         self._collector = Collector(redis_key)
+        self._heartbeat_buffer = HeartBeatBuffer(redis_key)
 
         self._parsers = []
         self._parser_controls = []
@@ -148,6 +150,8 @@ class Scheduler(threading.Thread):
             )
             handle_failed_items.reput_failed_items_to_db()
 
+        self._heartbeat_buffer.start()  # heartbeat manager
+
         # STEP 3.1 start request_buffer -- the task manager that buffers requests bound for the database
         self._request_buffer.start()
         # STEP 3.2 start item_buffer -- the pipeline manager that buffers scraped data bound for the database
@@ -163,6 +167,7 @@ class Scheduler(threading.Thread):
                 self._redis_key,
                 self._request_buffer,
                 self._item_buffer,
+                self._heartbeat_buffer
             )
 
             for parser in self._parsers:  # step 3.5 add all pending tasks to the thread pool
@@ -274,6 +279,13 @@ class Scheduler(threading.Thread):
             ):
                 return False
 
+            # check heartbeat_buffer state
+            if (
+                self._heartbeat_buffer.get_items_count() > 0
+                or self._heartbeat_buffer.is_adding_to_db()
+            ):
+                return False
+
             tools.delay_time(1)  # sleep for 1 second
 
         return True
@@ -345,6 +357,8 @@ class Scheduler(threading.Thread):
         for parser_control in self._parser_controls:
             parser_control.stop()
 
+        # stop the heartbeat manager
+        self._heartbeat_buffer.stop()
         # record the spider stop time
         self._started.clear()
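Taken together, these changes give the heartbeat buffer the same lifecycle as the other buffers. A condensed sketch of that ordering (simplified, not the literal scheduler code):

    heartbeat_buffer = HeartBeatBuffer(redis_key)
    heartbeat_buffer.start()  # before the parser controls start

    def heartbeat_drained():
        # shutdown may proceed only when nothing is queued or mid-write
        return (
            heartbeat_buffer.get_items_count() == 0
            and not heartbeat_buffer.is_adding_to_db()
        )

    # ... parser controls run, publishing heartbeats ...
    heartbeat_buffer.stop()  # last: the flush loop exits and close() runs the pipelines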
 

+ 6 - 0
FworkSpider/feapder/network/item.py

@@ -264,3 +264,9 @@ class FailedTaskItem(Item):
     def __init__(self, **kwargs):
         super(FailedTaskItem, self).__init__(**kwargs)
         self.pyuuid = self.__dict__['pyuuid']
+
+
+class HeartBeatItem(Item):
+
+    def __init__(self, **kwargs):
+        super(HeartBeatItem, self).__init__(**kwargs)
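HeartBeatItem adds no fields of its own; the subclass exists so HeartBeatBuffer.put_item can filter by type, while table lookup and serialization come from Item. A small sketch of the inherited behavior the buffer relies on (field values are hypothetical):

    from feapder.network.item import HeartBeatItem

    item = HeartBeatItem(site="example", rel_count=3)
    item.table_name = "spider_heartbeat"  # placeholder; the commit uses setting.SPIDER_HEARTBEAT_RECORD

    item.item_name  # name the buffer caches to map items to their tables
    item.to_dict    # field dict, e.g. {"site": "example", "rel_count": 3}; what __pick_items stores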