# coding:utf-8
# watch_load.py
import json
import threading
import time
from math import ceil, floor
from typing import Any, Dict, List

from pynats import NATSClient

from a2s.a2s_monitor import watch_monitor
from a2s.tools import json_serialize
from utils.ecs import EcsClient
from utils.send_message import send_weixin
from utils.tools import calculate_file_hash
  12. # 负载计算
  13. class WorkerNodes(object):
  14. def __init__(self, config_path):
  15. self.config_path = config_path
  16. self.config_md5 = calculate_file_hash(self.config_path)
  17. config = json.load(open(self.config_path, "r"))
  18. self.worker_name = config.get("name", "worker").replace(" ", "")
  19. self.one_instance_node_num = config.get("one_instance_node_num", 1)
  20. self.max_instance = config.get("max_instance", 5)
  21. self.min_instance = config.get("min_instance", 1)
  22. self.watch_interval = config.get("watch_interval", 5)
  23. self.add_watch_time = config.get("add_watch_time", 180)
  24. self.release_watch_time = config.get("release_watch_time", 1200)
  25. self.a2s_ip = config.get("a2s_ip", "")
  26. self.upper_threshold = config.get("upper_threshold", 0.8)
  27. self.down_threshold = config.get("down_threshold", 0.5)
  28. self.ecs_config = config.get("ecs_config", {})
  29. self.freeze_time = config.get("freeze_time", 60)
  30. self._run = config.get("is_run", True) # 调整配置文件可以关闭服务
  31. self.ecs_config["instance_name"] = self.worker_name
  32. self.change_alert = config.get("change_alert", True)
  33. self.load_alert = config.get("load_alert", True)
  34. self.load_alert_interval = config.get("load_alert_interval", 300)
  35. self.release_freeze_interval = config.get("release_freeze_interval", 60 * 30)
  36. self.weixin_bot_url = config.get("weixin_bot_url", "")
  37. self.refresh_instance_interval = config.get("refresh_instance_interval", 300)
  38. self.worker_timeout = config.get("worker_timeout", 300)
  39. self.monitor_subject = config.get("monitor_subject", "monitorRelease")
  40. self.nats_url = config.get("nats_url", "nats://172.17.4.188:19090")
  41. self.instances = []
  42. self.release_instances = []
  43. self._routine = []
  44. self.last_update_time = 0
  45. self.last_load_alert_time = 0
  46. self.last_refresh_instance_time = 0
  47. self.release_freeze_time = 0
  48. self.ecs = EcsClient(self.ecs_config)
  49. self.__initialize()
  50. def get_instance_num(self):
  51. """
  52. 获取实例数量
  53. :return:
  54. """
  55. return len(self.instances)
  56. def get_node_num(self):
  57. """
  58. 获取节点数量
  59. :return:
  60. """
  61. return len(self.instances) * self.one_instance_node_num
  62. # 安全退出应用
  63. def safe_logout(self, hosts):
  64. with NATSClient(self.nats_url) as nc:
  65. nc.connect()
  66. for host in hosts:
  67. nc.publish(subject=self.monitor_subject, payload=json_serialize({"host": host}))
  68. def refresh_instance(self):
  69. if not self.instances:
  70. return
  71. instances = [] # 分页查询状态
  72. _instances = [] # 正常的机器
  73. # 检查状态
  74. for ids, ip in self.instances:
  75. instances.append(ids)
  76. if len(instances) > 5:
  77. instances_ids = self.ecs.instance_status(instances)
  78. if instances_ids:
  79. _instances.extend(instances_ids)
  80. instances = []
  81. if instances:
  82. instances_ids = self.ecs.instance_status(instances)
  83. _instances.extend(instances_ids)
  84. # 无异常
  85. if len(_instances) == len(self.instances):
  86. self.ecs_warning(f"实例状态更新成功,没有设备被释放!")
  87. return
  88. # 有异常
  89. for instance in self.instances:
  90. if instance not in _instances:
  91. self.ecs.release_instance(instance[0], force=True)
  92. self.ecs_warning(f"实例状态更新成功,有{len(self.instances) - len(_instances)}台设备被释放!")
  93. # 更新存储
  94. self.instances = _instances
  95. def loop_monitor(self):
  96. """
  97. 循环检查
  98. :return:
  99. """
  100. while self._run:
  101. try:
  102. this_time = time.time()
  103. add_watch_number = ceil(self.add_watch_time / self.watch_interval)
  104. release_watch_number = ceil(self.release_watch_time / self.watch_interval)
  105. max_number = max([add_watch_number, release_watch_number])
  106. # 获取当前负载
  107. current_nodes = watch_monitor(self.a2s_ip, self.worker_name)
  108. if not isinstance(current_nodes, int):
  109. self.ecs_warning("警告:A2S服务监控出现异常,获取当前请求数量失败,当前服务停止增减设备。尽快解决当前问题,避免服务中断!!!")
  110. if self._routine:
  111. self._routine.pop(0)
  112. time.sleep(60)
  113. continue
  114. # 计算当前负载
  115. load = current_nodes / (len(self.instances) * self.one_instance_node_num)
  116. self._routine.append(load)
  117. # 检查ecs的状态,去除系统自己释放的实例
  118. if this_time - self.last_refresh_instance_time > self.refresh_instance_interval:
  119. self.refresh_instance()
  120. self.last_refresh_instance_time = this_time
  121. # 清理大于"最大观察时间"的负载
  122. if len(self._routine) > max_number:
  123. self._routine.pop(0)
  124. # 冻结时间
  125. if this_time - self.last_update_time < self.freeze_time:
  126. time.sleep(self.watch_interval)
  127. continue
  128. # 检查当前负载
  129. if this_time - self.last_load_alert_time > self.load_alert_interval:
  130. self.ecs_warning(
  131. f"[{self.worker_name}] 当前负载({current_nodes}::{load})为节点数::{self.get_node_num()},实例数::{self.get_instance_num()}")
  132. self.last_load_alert_time = this_time
  133. # 检查是否新增
  134. if len(self._routine) >= add_watch_number:
  135. add_start_index = len(self._routine) - add_watch_number
  136. add_load = sum(self._routine[add_start_index:]) / add_watch_number
  137. # 检查是否需要新增
  138. if add_load > self.upper_threshold:
  139. num = ceil((add_load - self.upper_threshold) * len(self.instances))
  140. if self.change_alert:
  141. self.ecs_warning(f"[{self.worker_name}]新增,当前负载:{add_load},预期新增:{num}台")
  142. self.add_instances(num)
  143. self.release_freeze_time = time.time()
  144. continue
  145. # 检查是否释放
  146. if len(self._routine) >= release_watch_number and time.time() - self.release_freeze_time > self.release_freeze_interval:
  147. release_start_index = len(self._routine) - release_watch_number
  148. release_load = sum(self._routine[release_start_index:]) / release_watch_number
  149. # 检查是否需要释放
  150. if release_load < self.down_threshold:
  151. # 释放机器数量
  152. # 负载小于 0.1
  153. if release_load < 0.1:
  154. num = 3 if len(self.instances) > 70 else 2
  155. # 负载大于 0.6
  156. else:
  157. num = 2 if len(self.instances) > 10 else 1
  158. # 告警
  159. if self.change_alert:
  160. self.ecs_warning(f"[{self.worker_name}]释放,当前负载:{release_load},预期释放:{num}台")
  161. self.reduce_instances(num)
  162. self.release_freeze_time = time.time()
  163. except Exception as e:
  164. self.ecs_warning(f"警告:[{self.worker_name}]服务出现异常,尽快处理{e}")
  165. time.sleep(self.watch_interval)
  166. self.close_server_instances()
  167. def add_instances(self, num):
  168. """
  169. 新增设备
  170. :param num:
  171. :return:
  172. """
  173. if num < 1:
  174. return
  175. if len(self.instances) < self.max_instance:
  176. if self.max_instance < len(self.instances) + num:
  177. num = self.max_instance - len(self.instances)
  178. if len(self.instances) + num < self.min_instance:
  179. num = self.min_instance - len(self.instances)
  180. # 开始新增,最大每次增加50台
  181. if num > 50:
  182. num = 50
  183. total_instance_ids = []
  184. # 新增实例,分页5台
  185. for i in range(floor(num / 5)):
  186. instance_ids = self.ecs.add_instance(5)
  187. if instance_ids:
  188. total_instance_ids.extend(instance_ids)
  189. self.instances.extend(instance_ids)
  190. self.last_update_time = time.time()
  191. else:
  192. self.ecs_warning(f"[{self.worker_name}]预期新增{num}设备,扩容失败!当前ECS:{len(self.instances)}台")
  193. # 不足5台
  194. yu_num = num % 5
  195. if yu_num > 0:
  196. instance_ids = self.ecs.add_instance(yu_num)
  197. if instance_ids:
  198. total_instance_ids.extend(instance_ids)
  199. self.instances.extend(instance_ids)
  200. self.last_update_time = time.time()
  201. else:
  202. self.ecs_warning(f"[{self.worker_name}]预期新增{num}设备,扩容失败!当前ECS:{len(self.instances)}台")
  203. # 通知
  204. if self.change_alert:
  205. self.ecs_warning(
  206. f"[{self.worker_name}]新增{len(total_instance_ids)}(预期{num})台设备成功,当前ECS:{len(self.instances)}台")
  207. print(f"新增完成,结果::{self.worker_name}::释放列表:{len(self.release_instances)}, 当前ECS列表:{self.instances}")
  208. else:
  209. # 通知
  210. if self.change_alert:
  211. self.ecs_warning(f"[{self.worker_name}]ECS服务实例达到最大值:{self.max_instance},预期新增{num}台, 扩容失败。")
  212. self.last_update_time = time.time()
  213. def reduce_instances(self, num):
  214. """
  215. 减少设备
  216. :param num:
  217. :return:
  218. """
  219. if len(self.instances) > self.min_instance:
  220. if len(self.instances) < self.min_instance + num:
  221. num = len(self.instances) - self.min_instance
  222. # 开始释放操作
  223. release_instance_ids = self.instances[:num]
  224. self.instances = self.instances[num:]
  225. hosts = []
  226. for instance_id, ip in release_instance_ids:
  227. hosts.append(ip)
  228. self.release_instances.append([instance_id, ip, int(time.time())])
  229. self.safe_logout(hosts)
  230. print(f"{self.worker_name}::释放列表:{self.release_instances}, 当前ECS列表:{self.instances}")
  231. self.ecs_warning(f"[{self.worker_name}]预计释放:{num}台实例,当前释放队列{len(self.release_instances)},预计剩余{len(self.instances)}")
  232. def loop_release_instances(self):
  233. """
  234. 释放实例
  235. :return:
  236. """
  237. while self._run:
  238. time.sleep(60)
  239. delete_ids = []
  240. for instance_detail in self.release_instances:
  241. instance_id, ip, t = instance_detail
  242. if int(time.time()) - t > self.worker_timeout:
  243. try:
  244. print(f"时间够了,开始释放{instance_id},时间IP{ip}:{t}")
  245. result = self.ecs.release_instance(instance_id, force=True)
  246. if result:
  247. delete_ids.append(instance_detail)
  248. except Exception as e:
  249. print(e)
  250. time.sleep(2)
  251. else:
  252. break
  253. try:
  254. for instance_detail in delete_ids:
  255. self.release_instances.remove(instance_detail)
  256. except Exception as e:
  257. self.ecs_warning(f"释放失败,请检查{e}")
  258. print(f"释放完成,结果::{self.worker_name}::释放列表:{self.release_instances}, 当前ECS列表:{self.instances}")
  259. def ecs_warning(self, text):
  260. send_weixin(text, self.weixin_bot_url)
  261. def __initialize(self):
  262. if len(self.instances) < self.min_instance:
  263. while True:
  264. try:
  265. self.add_instances(self.min_instance - len(self.instances))
  266. break
  267. except Exception as e:
  268. print(e)
  269. print("申请设备失败,等待60秒后重试")
  270. time.sleep(60)
  271. def start(self):
  272. t1 = threading.Thread(target=self.loop_monitor)
  273. t1.start()
  274. t2 = threading.Thread(target=self.update_config)
  275. t2.start()
  276. t3 = threading.Thread(target=self.loop_release_instances)
  277. t3.start()
  278. t1.join()
  279. t2.join()
  280. t3.join()
  281. self.close_server_instances()
  282. def update_config(self):
  283. """
  284. 更新配置
  285. :param config:
  286. :return:
  287. """
  288. while self._run:
  289. config_md5 = calculate_file_hash(self.config_path)
  290. if config_md5 != self.config_md5:
  291. self.config_md5 = config_md5
  292. config = json.load(open(self.config_path, "r"))
  293. self.worker_name = config.get("name", "worker").replace(" ", "")
  294. self.one_instance_node_num = config.get("one_instance_node_num", 1)
  295. self.max_instance = config.get("max_instance", 5)
  296. self.min_instance = config.get("min_instance", 1)
  297. self.watch_interval = config.get("watch_interval", 5)
  298. self.add_watch_time = config.get("add_watch_time", 180)
  299. self.release_watch_time = config.get("release_watch_time", 1200)
  300. self.a2s_ip = config.get("a2s_ip", "")
  301. self.upper_threshold = config.get("upper_threshold", 0.8)
  302. self.down_threshold = config.get("down_threshold", 0.5)
  303. self.freeze_time = config.get("freeze_time", 60)
  304. self._run = config.get("is_run", True) # 调整配置文件可以关闭服务
  305. self.change_alert = config.get("change_alert", True)
  306. self.load_alert = config.get("load_alert", True)
  307. self.load_alert_interval = config.get("load_alert_interval", 300)
  308. self.weixin_bot_url = config.get("weixin_bot_url", "")
  309. self.release_freeze_interval = config.get("release_freeze_interval", 60 * 30)
  310. self.refresh_instance_interval = config.get("refresh_instance_interval", 300)
  311. self.worker_timeout = config.get("worker_timeout", 300)
  312. self.monitor_subject = config.get("monitor_subject", "monitorRelease")
  313. # 每间隔一段时间,查看一下配置文件的状态
  314. time.sleep(10)
  315. def close_server_instances(self):
  316. release_ids = []
  317. for ids, ip in self.instances:
  318. release_ids.append(ids)
  319. if self.release_instances:
  320. for instance_id, ip, t in self.release_instances:
  321. release_ids.append(instance_id)
  322. print("释放实例", release_ids)
  323. release_false = self.ecs.delete_instance(release_ids)
  324. # 释放完成
  325. if release_false:
  326. if self.change_alert:
  327. self.ecs_warning(f"[{self.worker_name}]]服务关闭,部分实例释放失败,请手动释放,问题实例:{release_false}")
  328. else:
  329. self.ecs_warning(f"[{self.worker_name}]服务关闭,所有实例释放完成")