# -*- coding: utf-8 -*-
"""
Created on 2023-11-02
---------
@summary: Heartbeat manager. Buffers items bound for the database and writes
    them through this single manager, preventing concurrent database access
    from multiple threads.
---------
@author: dzr
"""
import threading
from queue import Queue

import feapder.setting as setting
import feapder.utils.tools as tools
from feapder.network.item import HeartBeatItem
from feapder.pipelines import BasePipeline
from feapder.utils.log import log

MAX_ITEM_COUNT = 5000  # maximum number of items held in the buffer
UPLOAD_BATCH_MAX_SIZE = 1000  # flush to the database in batches of this size


class HeartBeatBuffer(threading.Thread):
    # Aggregation totals. A thread switch can leave the aggregated result
    # inconsistent, so each flush aggregates the current batch and records
    # the totals it pushed, for use in the next calculation.
    _prev_success_task_count = 0
    _prev_failed_task_count = 0

    def __init__(self, redis_key=None):
        # Only initialize state on the first call, so repeated construction
        # of a shared instance does not reset the queue.
        if not hasattr(self, "_items_queue"):
            super(HeartBeatBuffer, self).__init__()
            self._thread_stop = False
            self._is_adding_to_db = False
            self._redis_key = redis_key
            self._items_queue = Queue(maxsize=MAX_ITEM_COUNT)
            self._item_tables = {
                # 'item_name': 'table_name'  # cached item-name-to-table-name mapping
            }
            self._pipelines = self.load_pipelines()

    def load_pipelines(self):
        pipelines = []
        for pipeline_path in setting.ITEM_PIPELINES:
            pipeline = tools.import_cls(pipeline_path)()
            if not isinstance(pipeline, BasePipeline):
                raise ValueError(
                    f"{pipeline_path} must inherit from feapder.pipelines.BasePipeline"
                )
            pipelines.append(pipeline)
        return pipelines
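
    # For reference, ITEM_PIPELINES is a list of dotted class paths, e.g.:
    #   ITEM_PIPELINES = ["feapder.pipelines.mysql_pipeline.MysqlPipeline"]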

    def run(self):
        self._thread_stop = False
        while not self._thread_stop:
            self.flush()
            tools.delay_time(1)
        self.close()

    def stop(self):
        self._thread_stop = True
        # Clear the Thread's internal "started" event so this instance can
        # be started again later.
        self._started.clear()

    def put_item(self, item):
        if isinstance(item, HeartBeatItem):
            self._items_queue.put(item)

    def flush(self):
        try:
            heartbeat_items = []
            need_aggregate_items = []
            data_count = 0
            while not self._items_queue.empty():
                data = self._items_queue.get_nowait()
                data_count += 1
                business_type = data.business_type
                # Heartbeats whose business_type ends with "Detail" carry
                # running task counters and are aggregated before export;
                # everything else is exported as-is.
                if business_type and str(business_type).endswith("Detail"):
                    need_aggregate_items.append(data)
                else:
                    heartbeat_items.append(data)
                if data_count >= UPLOAD_BATCH_MAX_SIZE:
                    self.__add_item_to_db(heartbeat_items, need_aggregate_items)
                    heartbeat_items = []
                    need_aggregate_items = []
                    data_count = 0
            if data_count:
                self.__add_item_to_db(heartbeat_items, need_aggregate_items)
        except Exception as e:
            log.exception(e)

    def get_items_count(self):
        return self._items_queue.qsize()

    def is_adding_to_db(self):
        return self._is_adding_to_db
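
    # Worked example of the aggregation in __pick_items below, with made-up
    # numbers: if the previous flush recorded success_task_count=10 and this
    # batch's running counter peaks at 14, the exported record carries the
    # delta 14 - 10 = 4, and 14 is stored for the next flush.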

    def __pick_items(self, items, is_aggregate=False):
        """
        Split the data by target table; the original items list is emptied
        as a side effect.
        @param items:
        @param is_aggregate: whether the data should be aggregated
        @return:
        """
        datas_dict = {
            # 'table_name': [{}, {}]
        }
        while items:
            item = items.pop(0)
            # Resolve the item's underscore-format name: look it up in the
            # cache first; if missing, compute it and store it to speed up
            # the next lookup.
            item_name = item.item_name
            table_name = self._item_tables.get(item_name)
            if not table_name:
                table_name = item.table_name
                self._item_tables[item_name] = table_name

            if table_name not in datas_dict:
                datas_dict[table_name] = []

            datas_dict[table_name].append(item.to_dict)

        if is_aggregate:
            aggregate_data_dict = {
                # 'table_name': [{}, {}]
            }
            for table_name, datas in datas_dict.items():
                # Keep only the latest record per table, with its counters
                # rewritten as deltas since the previous flush.
                latest = datas[-1]
                latest["rel_count"] = sum(item["rel_count"] for item in datas)
                # failed request count
                max_failed_data_dict = max(datas, key=lambda x: x.get("failed_task_count", 0))
                failed_task_count = (
                    max_failed_data_dict["failed_task_count"] - self._prev_failed_task_count
                )
                self._prev_failed_task_count = max_failed_data_dict["failed_task_count"]
                latest["failed_task_count"] = failed_task_count
                # successful request count
                max_success_data_dict = max(datas, key=lambda x: x.get("success_task_count", 0))
                success_task_count = (
                    max_success_data_dict["success_task_count"] - self._prev_success_task_count
                )
                self._prev_success_task_count = max_success_data_dict["success_task_count"]
                latest["success_task_count"] = success_task_count
                # total request count
                latest["count"] = failed_task_count + success_task_count
                if table_name not in aggregate_data_dict:
                    aggregate_data_dict[table_name] = [latest]
            datas_dict = aggregate_data_dict
        return datas_dict

    def __export_to_db(self, table, datas):
        for pipeline in self._pipelines:
            if not pipeline.save_items(table, datas):
                log.error(
                    f"{pipeline.__class__.__name__} failed to save heartbeat. "
                    f"table: {table} items: {datas}"
                )
                return False
        return True

    def __add_item_to_db(self, items, aggregate_items):
        self._is_adding_to_db = True

        # sort the items into per-table batches
        items_dict = self.__pick_items(items)
        aggregate_dict = self.__pick_items(aggregate_items, is_aggregate=True)

        # bulk-insert heartbeat items
        while items_dict:
            table, datas = items_dict.popitem()
            log.debug(
                """
                -------------- item bulk insert --------------
                table: %s
                datas: %s
                """
                % (table, tools.dumps_json(datas, indent=16))
            )
            self.__export_to_db(table, datas)

        # bulk-insert the aggregated items
        while aggregate_dict:
            table, datas = aggregate_dict.popitem()
            log.debug(
                """
                -------------- item bulk insert --------------
                table: %s
                datas: %s
                """
                % (table, tools.dumps_json(datas, indent=16))
            )
            self.__export_to_db(table, datas)

        self._is_adding_to_db = False

    def close(self):
        # Call each pipeline's close method; one failing pipeline should not
        # keep the others from closing.
        for pipeline in self._pipelines:
            try:
                pipeline.close()
            except Exception as e:
                log.exception(e)
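

if __name__ == "__main__":
    # Minimal usage sketch. Assumes HeartBeatItem accepts keyword fields
    # (feapder items generally do) and that setting.ITEM_PIPELINES names at
    # least one BasePipeline subclass whose save_items(table, items) returns
    # True on success.
    buffer = HeartBeatBuffer()
    buffer.start()  # run() flushes the queue roughly once per second

    # Queue a heartbeat. put_item() silently ignores anything that is not a
    # HeartBeatItem. A business_type ending in "Detail" would additionally
    # need rel_count / success_task_count / failed_task_count fields, since
    # those records go through the aggregation path.
    buffer.put_item(HeartBeatItem(business_type="List"))

    tools.delay_time(2)  # give the background thread a chance to flush
    buffer.stop()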