mon.py 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. # coding:utf-8
  2. from a2s.a2s_monitor import watch_monitor
  3. import time
  4. from math import ceil, floor
  5. from file_server import add_file_process
  6. from picture_server import add_picture_process
  7. from utils.tools import calculate_file_hash
  8. import threading
  9. import json
  10. import multiprocessing
  11. func_dict = {
  12. "test2txt_add": add_file_process,
  13. "test2img_add": add_picture_process
  14. }
  15. # 负载计算
  16. class WorkerNodes(object):
  17. def __init__(self, config_path):
  18. self.config_path = config_path
  19. self.config_md5 = calculate_file_hash(self.config_path)
  20. config = json.load(open(self.config_path, "r"))
  21. self.worker_name = config.get("name", "worker")
  22. self.one_instance_node_num = config.get("one_instance_node_num", 1)
  23. self.max_instance = config.get("max_instance", 5)
  24. self.min_instance = config.get("min_instance", 1)
  25. self.watch_interval = config.get("watch_interval", 5)
  26. self.add_watch_time = config.get("add_watch_time", 180)
  27. self.release_watch_time = config.get("release_watch_time", 1200)
  28. self.a2s_ip = config.get("a2s_ip", "")
  29. self.upper_threshold = config.get("upper_threshold", 0.8)
  30. self.down_threshold = config.get("down_threshold", 0.5)
  31. self.ecs_config = config.get("ecs_config", {})
  32. self.freeze_time = config.get("freeze_time", 60)
  33. self._run = config.get("is_run", True) # 调整配置文件可以关闭服务
  34. self.instances = []
  35. self._routine = []
  36. self.last_update_time = 0
  37. self.__initialize()
  38. def get_instance_num(self):
  39. """
  40. 获取实例数量
  41. :return:
  42. """
  43. return len(self.instances)
  44. def get_node_num(self):
  45. """
  46. 获取节点数量
  47. :return:
  48. """
  49. return len(self.instances) * self.one_instance_node_num
  50. def loop_monitor(self):
  51. """
  52. 循环检查
  53. :return:
  54. """
  55. while self._run:
  56. try:
  57. add_watch_number = ceil(self.add_watch_time / self.watch_interval)
  58. release_watch_number = ceil(self.release_watch_time / self.watch_interval)
  59. max_number = max([add_watch_number, release_watch_number])
  60. current_nodes = watch_monitor(self.a2s_ip, self.worker_name)
  61. if not isinstance(current_nodes, int):
  62. self.ecs_warning("警告:A2S服务监控出现异常,获取当前请求数量失败,当前服务停止增减设备。尽快解决当前问题,避免服务中断!!!")
  63. if self._routine:
  64. self._routine.remove(0)
  65. time.sleep(60)
  66. continue
  67. # 获取当前负载
  68. load = current_nodes / len(self.instances) * self.one_instance_node_num
  69. self._routine.append(load)
  70. # 清理大于"最大观察时间"的负载
  71. if len(self._routine) > max_number:
  72. self._routine.pop(0)
  73. if time.time() - self.last_update_time < self.freeze_time:
  74. print(f"{self.worker_name}::冻结时间未到,不进行增减设备")
  75. time.sleep(self.watch_interval)
  76. continue
  77. # 检查是否新增
  78. if len(self._routine) >= add_watch_number:
  79. add_start_index = len(self._routine) - add_watch_number
  80. print("长度比:::", len(self._routine), len(self._routine[add_start_index:]), add_watch_number)
  81. add_load = sum(self._routine[add_start_index:]) / add_watch_number
  82. if add_load > self.upper_threshold:
  83. num = ceil((add_load - self.upper_threshold) * len(self.instances))
  84. print(f"{self.worker_name}新增add_load:::", add_load, self.upper_threshold, num)
  85. self.add_instances(num)
  86. self.last_update_time = time.time()
  87. continue
  88. # 检查是否释放
  89. if len(self._routine) >= release_watch_number:
  90. release_start_index = len(self._routine) - release_watch_number
  91. release_load = sum(self._routine[release_start_index:]) / release_watch_number
  92. if release_load < self.down_threshold:
  93. num = floor((1 - self.down_threshold) * len(self.instances))
  94. print(f"{self.worker_name}释放release_load:::", release_load, self.down_threshold, num)
  95. self.reduce_instances(num)
  96. self.last_update_time = time.time()
  97. # 检查间隔
  98. print(
  99. f"{self.worker_name} 当前负载({current_nodes}::{load})为节点数::{self.get_node_num()},实例数::{self.get_instance_num()}")
  100. except Exception as e:
  101. print("baocuole==>", e)
  102. time.sleep(self.watch_interval)
  103. self.close_server_instances()
  104. def add_instances(self, num):
  105. """
  106. 新增设备
  107. :param num:
  108. :return:
  109. """
  110. if len(self.instances) < self.max_instance:
  111. if self.max_instance < len(self.instances) + num:
  112. num = self.max_instance - len(self.instances)
  113. if len(self.instances) + num < self.min_instance:
  114. num = self.min_instance - len(self.instances)
  115. print(f"ECS新增{num}设备,当前ECS数量为{len(self.instances)}")
  116. for i in range(num):
  117. p = multiprocessing.Process(target=func_dict[f"{self.worker_name}_add"])
  118. p.start()
  119. self.instances.append(p)
  120. # 开始新增
  121. else:
  122. self.ecs_warning(f"ECS 扩容失败,{self.worker_name}服务已达到最大实例数量:{self.max_instance}")
  123. def reduce_instances(self, num):
  124. """
  125. 减少设备
  126. :param num:
  127. :return:
  128. """
  129. if len(self.instances) > self.min_instance:
  130. if len(self.instances) < self.min_instance + num:
  131. num = len(self.instances) - self.min_instance
  132. # 开始释放操作
  133. for ind in range(num):
  134. if ind < num:
  135. p = self.instances.pop(0)
  136. print("ECS释放进程:", p, p.pid)
  137. p.terminate()
  138. else:
  139. self.ecs_warning(f"ECS 缩减失败,{self.worker_name}服务已达到最小实例数量:{self.min_instance}")
  140. def ecs_warning(self, text):
  141. print("警告:::⚠️", text)
  142. def __initialize(self):
  143. if len(self.instances) < self.min_instance:
  144. self.add_instances(self.min_instance - len(self.instances))
  145. def start(self):
  146. t1 = threading.Thread(target=self.loop_monitor)
  147. t1.start()
  148. t2 = threading.Thread(target=self.update_config)
  149. t2.start()
  150. t1.join()
  151. t2.join()
  152. def update_config(self):
  153. """
  154. 更新配置
  155. :param config:
  156. :return:
  157. """
  158. while self._run:
  159. config_md5 = calculate_file_hash(self.config_path)
  160. if config_md5 != self.config_md5:
  161. config = json.load(open(self.config_path, "r"))
  162. self.worker_name = config.get("name", "worker")
  163. self.one_instance_node_num = config.get("one_instance_node_num", 1)
  164. self.max_instance = config.get("max_instance", 5)
  165. self.min_instance = config.get("min_instance", 1)
  166. self.watch_interval = config.get("watch_interval", 5)
  167. self.add_watch_time = config.get("add_watch_time", 180)
  168. self.release_watch_time = config.get("release_watch_time", 1200)
  169. self.a2s_ip = config.get("a2s_ip", "")
  170. self.upper_threshold = config.get("upper_threshold", 0.8)
  171. self.down_threshold = config.get("down_threshold", 0.5)
  172. self.ecs_config = config.get("ecs_config", {})
  173. self._run = config.get("is_run", True) # 调整配置文件可以关闭服务
  174. # 每间隔一段时间,查看一下配置文件的状态
  175. time.sleep(10)
  176. def close_server_instances(self):
  177. for p in self.instances:
  178. p.terminate()