# -*- coding: utf-8 -*-
"""
Created on 2023-11-02
---------
@summary: Heartbeat manager.
    Buffers the items bound for the database and performs every insert itself,
    so multiple threads never access the database concurrently.
---------
@author: dzr
"""
import threading
from queue import Queue

import feapder.setting as setting
import feapder.utils.tools as tools
from feapder.network.item import HeartBeatItem
from feapder.pipelines import BasePipeline
from feapder.utils.log import log

MAX_ITEM_COUNT = 5000  # maximum number of items held in the buffer
UPLOAD_BATCH_MAX_SIZE = 1000


class HeartBeatBuffer(threading.Thread):
    # Aggregation state: thread switches can make a single flush's totals
    # inconsistent, so each flush aggregates its own batch and records the
    # counters it pushed, which the next flush uses to compute deltas.
    _prev_success_task_count = 0
    _prev_failed_task_count = 0

    def __init__(self, redis_key=None):
        # Guard against repeated __init__ calls (e.g. when the class is used
        # behind a singleton): only initialize once.
        if not hasattr(self, "_items_queue"):
            super(HeartBeatBuffer, self).__init__()

            self._thread_stop = False
            self._is_adding_to_db = False
            self._redis_key = redis_key

            self._items_queue = Queue(maxsize=MAX_ITEM_COUNT)

            self._item_tables = {
                # 'item_name': 'table_name'  # cached item-name -> table-name mapping
            }

            self._pipelines = self.load_pipelines()

    def load_pipelines(self):
        pipelines = []
        for pipeline_path in setting.ITEM_PIPELINES:
            pipeline = tools.import_cls(pipeline_path)()
            if not isinstance(pipeline, BasePipeline):
                raise ValueError(
                    f"{pipeline_path} must inherit from feapder.pipelines.BasePipeline"
                )
            pipelines.append(pipeline)

        return pipelines

    def run(self):
        self._thread_stop = False
        while not self._thread_stop:
            self.flush()
            tools.delay_time(1)

        self.close()

    def stop(self):
        self._thread_stop = True
        # Clear threading.Thread's internal "started" event so the buffer can
        # be start()-ed again later; Thread.start() refuses to run twice while
        # the flag is set.
        self._started.clear()

    def put_item(self, item):
        if isinstance(item, HeartBeatItem):
            self._items_queue.put(item)

    def flush(self):
        try:
            heartbeat_items = []
            need_aggregate_items = []
            data_count = 0
            while not self._items_queue.empty():
                data = self._items_queue.get_nowait()
                data_count += 1

                # Items whose business_type ends with "Detail" are aggregated
                # before export; everything else is exported as-is.
                business_type = data.business_type
                if business_type and str(business_type).endswith("Detail"):
                    need_aggregate_items.append(data)
                else:
                    heartbeat_items.append(data)

                if data_count >= UPLOAD_BATCH_MAX_SIZE:
                    self.__add_item_to_db(heartbeat_items, need_aggregate_items)

                    heartbeat_items = []
                    need_aggregate_items = []
                    data_count = 0

            if data_count:
                self.__add_item_to_db(heartbeat_items, need_aggregate_items)

        except Exception as e:
            log.exception(e)

    def get_items_count(self):
        return self._items_queue.qsize()

    def is_adding_to_db(self):
        return self._is_adding_to_db

    def __pick_items(self, items, is_aggregate=False):
        """
        Split the data by target table; the original items list is emptied.
        @param items:
        @param is_aggregate: whether the data needs to be aggregated
        @return:
        """
        datas_dict = {
            # 'table_name': [{}, {}]
        }

        while items:
            item = items.pop(0)

            # Resolve the item's snake_case table name via the cache first;
            # on a miss, compute it once and store it to speed up later lookups.
            item_name = item.item_name
            table_name = self._item_tables.get(item_name)
            if not table_name:
                table_name = item.table_name
                self._item_tables[item_name] = table_name

            if table_name not in datas_dict:
                datas_dict[table_name] = []

            datas_dict[table_name].append(item.to_dict)

        if is_aggregate:
            aggregate_data_dict = {
                # 'table_name': [{}, {}]
            }
            for table_name, datas in datas_dict.items():
                latest = datas[-1]
                # missing counters default to 0
                latest['rel_count'] = sum(
                    [item.get('rel_count', 0) for item in datas]
                )

                # failed request count
                max_failed_data_dict = max(
                    datas, key=lambda x: x.get("failed_task_count", 0)
                )
                failed_task_count = (
                    max_failed_data_dict.get("failed_task_count", 0)
                    - self._prev_failed_task_count
                )
                self._prev_failed_task_count = max_failed_data_dict.get(
                    "failed_task_count", 0
                )
                latest['failed_task_count'] = failed_task_count

                # successful request count
                max_success_data_dict = max(
                    datas, key=lambda x: x.get("success_task_count", 0)
                )
                success_task_count = (
                    max_success_data_dict.get("success_task_count", 0)
                    - self._prev_success_task_count
                )
                self._prev_success_task_count = max_success_data_dict.get(
                    "success_task_count", 0
                )
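                # The spider reports cumulative counters, so only the delta
                # since the previous flush (_prev_* above) is written out;
                # otherwise every flush would re-report old requests.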
                latest['success_task_count'] = success_task_count

                # total request count
                latest['count'] = failed_task_count + success_task_count
                if table_name not in aggregate_data_dict:
                    aggregate_data_dict[table_name] = [latest]

            datas_dict = aggregate_data_dict

        return datas_dict

    def __export_to_db(self, table, datas):
        for pipeline in self._pipelines:
            if not pipeline.save_items(table, datas):
                log.error(
                    f"{pipeline.__class__.__name__} failed to save heartbeat. "
                    f"table: {table} items: {datas}"
                )
                return False

        return True

    def __add_item_to_db(self, items, aggregate_items):
        self._is_adding_to_db = True

        # sort the items into per-table batches
        items_dict = self.__pick_items(items)
        aggregate_dict = self.__pick_items(aggregate_items, is_aggregate=True)

        # batch-insert the heartbeat items
        while items_dict:
            table, datas = items_dict.popitem()

            log.debug(
                """
                -------------- item batch insert --------------
                table: %s
                datas: %s
                """
                % (table, tools.dumps_json(datas, indent=16))
            )
            self.__export_to_db(table, datas)

        while aggregate_dict:
            table, datas = aggregate_dict.popitem()

            log.debug(
                """
                -------------- item batch insert --------------
                table: %s
                datas: %s
                """
                % (table, tools.dumps_json(datas, indent=16))
            )
            self.__export_to_db(table, datas)

        self._is_adding_to_db = False

    def close(self):
        # call each pipeline's close() method
        for pipeline in self._pipelines:
            try:
                pipeline.close()
            except Exception as e:
                log.exception(e)
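

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the manager itself). It assumes
# ITEM_PIPELINES is configured in feapder's settings and that HeartBeatItem,
# like other feapder items, accepts plain attribute assignment; the redis key
# and the counter values below are hypothetical.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import time

    buffer = HeartBeatBuffer(redis_key="demo:heartbeat")
    buffer.start()  # background loop flushes the queue roughly once per second

    item = HeartBeatItem()
    # fields read by flush() / __pick_items() above
    item.business_type = "ListDetail"  # suffix "Detail" -> aggregated path
    item.rel_count = 10
    item.success_task_count = 8
    item.failed_task_count = 2
    buffer.put_item(item)

    time.sleep(2)  # give the flush loop a chance to drain the queue
    buffer.stop()  # the run loop exits and close() shuts the pipelines down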