# -*- coding: utf-8 -*-
"""
Created on 2023-11-02
---------
@summary: Heartbeat manager. Buffers items bound for the database and writes
    them through this single manager, preventing concurrent database access
    from multiple threads.
---------
@author: dzr
"""
import threading
from queue import Queue

import feapder.setting as setting
import feapder.utils.tools as tools
from feapder.network.item import HeartBeatItem
from feapder.pipelines import BasePipeline
from feapder.utils.log import log

MAX_ITEM_COUNT = 5000  # maximum number of items held in the buffer
UPLOAD_BATCH_MAX_SIZE = 1000  # maximum number of items written per batch
class HeartBeatBuffer(threading.Thread):
    # Thread switching can make aggregated totals inconsistent, so each round
    # we aggregate the current batch and remember the values already pushed,
    # which serve as the baseline for the next round's calculation.
    _prev_success_task_count = 0
    _prev_failed_task_count = 0

    def __init__(self, redis_key=None):
        # Skip re-initialization if this instance has already been set up.
        if not hasattr(self, "_items_queue"):
            super(HeartBeatBuffer, self).__init__()
            self._thread_stop = False
            self._is_adding_to_db = False
            self._redis_key = redis_key

            self._items_queue = Queue(maxsize=MAX_ITEM_COUNT)

            self._item_tables = {
                # 'item_name': 'table_name'  # cached mapping of item names to table names
            }

            self._pipelines = self.load_pipelines()
    def load_pipelines(self):
        pipelines = []
        for pipeline_path in setting.ITEM_PIPELINES:
            pipeline = tools.import_cls(pipeline_path)()
            if not isinstance(pipeline, BasePipeline):
                raise ValueError(
                    f"{pipeline_path} must inherit feapder.pipelines.BasePipeline"
                )
            pipelines.append(pipeline)

        return pipelines
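
    # Pipeline contract assumed by the calls below (a sketch inferred from
    # this module, not authoritative API docs): every BasePipeline subclass
    # provides
    #   save_items(table, items) -> bool  # falsy return marks the save as failed
    #   close()                           # invoked when the buffer shuts down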
    def run(self):
        self._thread_stop = False
        while not self._thread_stop:
            self.flush()
            tools.delay_time(1)

        self.close()

    def stop(self):
        self._thread_stop = True
        # Clear threading.Thread's internal started flag so the
        # buffer thread can be started again after being stopped.
        self._started.clear()

    def put_item(self, item):
        # Only HeartBeatItem instances are buffered; anything else is ignored.
        if isinstance(item, HeartBeatItem):
            self._items_queue.put(item)
    def flush(self):
        try:
            heartbeat_items = []
            need_aggregate_items = []
            data_count = 0
            while not self._items_queue.empty():
                data = self._items_queue.get_nowait()
                data_count += 1

                # "...Detail" business types carry per-task counters and are
                # aggregated before export; everything else is exported as-is.
                business_type = data.business_type
                if business_type and str(business_type).endswith("Detail"):
                    need_aggregate_items.append(data)
                else:
                    heartbeat_items.append(data)

                if data_count >= UPLOAD_BATCH_MAX_SIZE:
                    self.__add_item_to_db(heartbeat_items, need_aggregate_items)

                    heartbeat_items = []
                    need_aggregate_items = []
                    data_count = 0

            if data_count:
                self.__add_item_to_db(heartbeat_items, need_aggregate_items)

        except Exception as e:
            log.exception(e)
    def get_items_count(self):
        return self._items_queue.qsize()

    def is_adding_to_db(self):
        return self._is_adding_to_db
    def __pick_items(self, items, is_aggregate=False):
        """
        Split the data by target table. The original items list is emptied
        in the process.
        @param items:
        @param is_aggregate: whether the data needs to be aggregated
        @return:
        """
        datas_dict = {
            # 'table_name': [{}, {}]
        }
        while items:
            item = items.pop(0)

            # Resolve the item's snake_case table name. Look it up in the
            # cache first; on a miss, compute it and store it in the cache
            # to speed up subsequent lookups.
            item_name = item.item_name
            table_name = self._item_tables.get(item_name)
            if not table_name:
                table_name = item.table_name
                self._item_tables[item_name] = table_name

            if table_name not in datas_dict:
                datas_dict[table_name] = []

            datas_dict[table_name].append(item.to_dict)

        if is_aggregate:
            aggregate_data_dict = {
                # 'table_name': [{}]
            }
            for table_name, datas in datas_dict.items():
                latest = datas[-1]
                latest["rel_count"] = sum(item.get("rel_count", 0) for item in datas)

                # failed request count: push the delta since the last flush
                max_failed_data_dict = max(
                    datas, key=lambda x: x.get("failed_task_count", 0)
                )
                max_failed = max_failed_data_dict.get("failed_task_count", 0)
                failed_task_count = max_failed - self._prev_failed_task_count
                self._prev_failed_task_count = max_failed
                latest["failed_task_count"] = failed_task_count

                # successful request count: push the delta since the last flush
                max_success_data_dict = max(
                    datas, key=lambda x: x.get("success_task_count", 0)
                )
                max_success = max_success_data_dict.get("success_task_count", 0)
                success_task_count = max_success - self._prev_success_task_count
                self._prev_success_task_count = max_success
                latest["success_task_count"] = success_task_count

                # total request count
                latest["count"] = failed_task_count + success_task_count

                # keep only the latest, aggregated record per table
                aggregate_data_dict[table_name] = [latest]

            datas_dict = aggregate_data_dict

        return datas_dict
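
    # Worked example of the delta bookkeeping above (illustrative numbers):
    # if a batch holds success_task_count values [10, 25] and the previous
    # flush recorded _prev_success_task_count = 5, this round pushes
    # 25 - 5 = 20 and stores 25 as the baseline for the next flush.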
    def __export_to_db(self, table, datas):
        for pipeline in self._pipelines:
            if not pipeline.save_items(table, datas):
                log.error(
                    f"{pipeline.__class__.__name__} failed to save heartbeat. "
                    f"table: {table} items: {datas}"
                )
                return False

        return True
    def __add_item_to_db(self, items, aggregate_items):
        self._is_adding_to_db = True

        # sort the items by target table
        items_dict = self.__pick_items(items)
        aggregate_dict = self.__pick_items(aggregate_items, is_aggregate=True)

        # batch-insert heartbeat items
        while items_dict:
            table, datas = items_dict.popitem()

            log.debug(
                """
                -------------- item batch insert --------------
                table: %s
                datas: %s
                """
                % (table, tools.dumps_json(datas, indent=16))
            )

            self.__export_to_db(table, datas)

        # batch-insert aggregated items
        while aggregate_dict:
            table, datas = aggregate_dict.popitem()

            log.debug(
                """
                -------------- item batch insert --------------
                table: %s
                datas: %s
                """
                % (table, tools.dumps_json(datas, indent=16))
            )

            self.__export_to_db(table, datas)

        self._is_adding_to_db = False
    def close(self):
        # Call each pipeline's close method; a failing pipeline
        # must not prevent the others from closing.
        for pipeline in self._pipelines:
            try:
                pipeline.close()
            except Exception as e:
                log.exception(e)
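
if __name__ == "__main__":
    # Minimal usage sketch, not part of the original module. It assumes a
    # HeartBeatItem whose fields can be assigned as attributes and that
    # setting.ITEM_PIPELINES points at reachable pipelines; all field values
    # below are illustrative.
    buffer = HeartBeatBuffer(redis_key="test:heartbeat")
    buffer.start()  # flushes the queue roughly once per second

    item = HeartBeatItem()
    item.business_type = "ListDetail"  # "...Detail" types go through aggregation
    item.success_task_count = 1
    item.failed_task_count = 0
    item.rel_count = 1
    buffer.put_item(item)

    tools.delay_time(2)  # give the flush loop a chance to export the item
    buffer.stop()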