# coding:utf-8
import json
import threading
import time
from math import ceil, floor

from pynats import NATSClient

from a2s.a2s_monitor import watch_monitor
from a2s.tools import json_serialize
from utils.ecs import EcsClient
from utils.send_message import send_weixin
from utils.tools import calculate_file_hash


# Load calculation / autoscaling of the worker pool.
class WorkerNodes(object):
    """Scale a pool of ECS instances up and down from the A2S request count.

    Settings come from the JSON file at ``config_path``; :meth:`update_config`
    re-reads it whenever its hash changes. ``self.instances`` holds
    ``(instance_id, ip)`` pairs that are currently serving;
    ``self.release_instances`` holds ``[instance_id, ip, queued_at]`` entries
    that :meth:`loop_release_instances` force-releases once they are older than
    ``worker_timeout`` seconds.
    """

    def __init__(self, config_path):
        self.config_path = config_path
        self.config_md5 = calculate_file_hash(self.config_path)
        config = self._read_config()
        self._apply_config(config)
        # The ECS client is built once, so ecs_config is only read at startup.
        self.ecs_config = config.get("ecs_config", {})
        self.ecs_config["instance_name"] = self.worker_name
        self.instances = []
        self.release_instances = []
        self._routine = []  # sliding window of recent load samples
        self.last_update_time = 0
        self.last_load_alert_time = 0
        self.last_refresh_instance_time = 0
        self.release_freeze_time = 0
        self.ecs = EcsClient(self.ecs_config)
        self.__initialize()

    def _read_config(self):
        """Parse and return the JSON config file, closing the file handle afterwards."""
        with open(self.config_path, "r", encoding="utf-8") as f:
            return json.load(f)

    def _apply_config(self, config):
        """Copy the reloadable settings from a parsed config dict onto ``self``.

        Shared by ``__init__`` and :meth:`update_config` so both paths read the
        same keys with the same defaults.
        """
        self.worker_name = config.get("name", "worker").replace(" ", "")
        self.one_instance_node_num = config.get("one_instance_node_num", 1)
        self.max_instance = config.get("max_instance", 5)
        self.min_instance = config.get("min_instance", 1)
        self.watch_interval = config.get("watch_interval", 5)
        self.add_watch_time = config.get("add_watch_time", 180)
        self.release_watch_time = config.get("release_watch_time", 1200)
        self.a2s_ip = config.get("a2s_ip", "")
        self.upper_threshold = config.get("upper_threshold", 0.8)
        self.down_threshold = config.get("down_threshold", 0.5)
        self.freeze_time = config.get("freeze_time", 60)
        # Setting "is_run" to false in the config file stops all worker loops.
        self._run = config.get("is_run", True)
        self.change_alert = config.get("change_alert", True)
        self.load_alert = config.get("load_alert", True)
        self.load_alert_interval = config.get("load_alert_interval", 300)
        self.release_freeze_interval = config.get("release_freeze_interval", 60 * 30)
        self.weixin_bot_url = config.get("weixin_bot_url", "")
        self.refresh_instance_interval = config.get("refresh_instance_interval", 300)
        self.worker_timeout = config.get("worker_timeout", 300)
        self.monitor_subject = config.get("monitor_subject", "monitorRelease")
        self.nats_url = config.get("nats_url", "nats://172.17.4.188:19090")

    def get_instance_num(self):
        """
        Return the number of serving instances.
        :return:
        """
        return len(self.instances)

    def get_node_num(self):
        """
        Return the node capacity: instances * ``one_instance_node_num``.
        :return:
        """
        return len(self.instances) * self.one_instance_node_num

    # Graceful logout: publish each host on NATS before its instance is released.
    def safe_logout(self, hosts):
        with NATSClient(self.nats_url) as nc:
            nc.connect()
            for host in hosts:
                nc.publish(subject=self.monitor_subject, payload=json_serialize({"host": host}))

    def refresh_instance(self):
        """Drop instances that ECS no longer reports and force-release them.

        Statuses are queried in pages of 6 ids (the original batching flushed a
        page once it held more than 5). If any page query returns ``None`` the
        refresh is aborted, so a failed query never causes healthy instances to
        be released.
        """
        if not self.instances:
            return
        ids = [instance_id for instance_id, _ip in self.instances]
        page_size = 6
        healthy = []
        for start in range(0, len(ids), page_size):
            page_status = self.ecs.instance_status(ids[start:start + page_size])
            if page_status is None:
                self.ecs_warning(f"[{self.worker_name}]实例状态查询失败,本次跳过实例刷新")
                return
            healthy.extend(page_status)
        # Nothing went missing.
        if len(healthy) == len(self.instances):
            self.ecs_warning("实例状态更新成功,没有设备被释放!")
            return
        # Some instances disappeared: make sure they are released.
        for instance in self.instances:
            if instance not in healthy:
                self.ecs.release_instance(instance[0], force=True)
        self.ecs_warning(f"实例状态更新成功,有{len(self.instances) - len(healthy)}台设备被释放!")
        self.instances = healthy

    def loop_monitor(self):
        """
        Sample the load every ``watch_interval`` seconds and scale up or down.

        Runs until ``_run`` becomes false. Shutdown cleanup is done once by
        :meth:`start` after all worker threads have joined.
        :return:
        """
        while self._run:
            try:
                this_time = time.time()
                add_watch_number = ceil(self.add_watch_time / self.watch_interval)
                release_watch_number = ceil(self.release_watch_time / self.watch_interval)
                max_number = max(add_watch_number, release_watch_number)
                # Current request count reported by A2S.
                current_nodes = watch_monitor(self.a2s_ip, self.worker_name)
                if not isinstance(current_nodes, int):
                    self.ecs_warning("警告:A2S服务监控出现异常,获取当前请求数量失败,当前服务停止增减设备。尽快解决当前问题,避免服务中断!!!")
                    if self._routine:
                        self._routine.pop(0)
                    time.sleep(60)
                    continue
                capacity = self.get_node_num()
                if capacity <= 0:
                    # Load is undefined without capacity; recover instead of dividing by zero.
                    self._restore_capacity(current_nodes)
                    time.sleep(self.watch_interval)
                    continue
                load = current_nodes / capacity
                self._routine.append(load)
                # Periodically drop instances that ECS released on its own.
                if this_time - self.last_refresh_instance_time > self.refresh_instance_interval:
                    self.refresh_instance()
                    self.last_refresh_instance_time = this_time
                # Keep only as many samples as the longest observation window needs.
                if len(self._routine) > max_number:
                    self._routine.pop(0)
                # Freeze period after the last successful scaling action.
                if this_time - self.last_update_time < self.freeze_time:
                    time.sleep(self.watch_interval)
                    continue
                if self.load_alert and this_time - self.last_load_alert_time > self.load_alert_interval:
                    self.ecs_warning(
                        f"[{self.worker_name}] 当前负载({current_nodes}::{load})为节点数::{self.get_node_num()},实例数::{self.get_instance_num()}")
                    self.last_load_alert_time = this_time
                # A scale-up in this cycle skips the release check.
                if not self._try_scale_up(add_watch_number):
                    self._try_scale_down(release_watch_number)
            except Exception as e:
                self.ecs_warning(f"警告:[{self.worker_name}]服务出现异常,尽快处理{e}")
            time.sleep(self.watch_interval)

    def _restore_capacity(self, current_nodes):
        """Bring instances back when the pool has zero node capacity.

        Targets ``min_instance`` instances, or at least one when there is
        demand (``current_nodes > 0``); honours ``freeze_time`` between attempts.
        """
        if time.time() - self.last_update_time < self.freeze_time:
            return
        wanted = max(self.min_instance, 1 if current_nodes > 0 else 0)
        missing = wanted - self.get_instance_num()
        if missing > 0:
            self.add_instances(missing)

    def _try_scale_up(self, add_watch_number):
        """Add instances when the mean of the last ``add_watch_number`` samples exceeds ``upper_threshold``.

        :return: True if a scale-up was attempted in this cycle.
        """
        if len(self._routine) < add_watch_number:
            return False
        add_start_index = len(self._routine) - add_watch_number
        add_load = sum(self._routine[add_start_index:]) / add_watch_number
        if add_load <= self.upper_threshold:
            return False
        num = ceil((add_load - self.upper_threshold) * len(self.instances))
        if self.change_alert:
            self.ecs_warning(f"[{self.worker_name}]新增,当前负载:{add_load},预期新增:{num}台")
        self.add_instances(num)
        self.release_freeze_time = time.time()
        return True

    def _try_scale_down(self, release_watch_number):
        """Release instances when the mean of the last ``release_watch_number`` samples is below ``down_threshold``.

        Skipped until ``release_freeze_interval`` seconds have passed since the
        last scaling action.
        """
        if len(self._routine) < release_watch_number:
            return
        if time.time() - self.release_freeze_time <= self.release_freeze_interval:
            return
        release_start_index = len(self._routine) - release_watch_number
        release_load = sum(self._routine[release_start_index:]) / release_watch_number
        if release_load >= self.down_threshold:
            return
        if release_load < 0.1:
            # Nearly idle: release more aggressively.
            num = 3 if len(self.instances) > 70 else 2
        else:
            # 0.1 <= load < down_threshold.
            num = 2 if len(self.instances) > 10 else 1
        if self.change_alert:
            self.ecs_warning(f"[{self.worker_name}]释放,当前负载:{release_load},预期释放:{num}台")
        self.reduce_instances(num)
        self.release_freeze_time = time.time()

    def add_instances(self, num):
        """
        Add up to ``num`` instances, clamped to [min_instance, max_instance] and at most 50 per call.

        Instances are requested from ECS in batches of 5; each failed batch is
        reported and the remaining batches are still attempted.
        :param num: requested number of new instances; values < 1 are ignored
        :return:
        """
        if num < 1:
            return
        if len(self.instances) >= self.max_instance:
            if self.change_alert:
                self.ecs_warning(f"[{self.worker_name}]ECS服务实例达到最大值:{self.max_instance},预期新增{num}台, 扩容失败。")
            self.last_update_time = time.time()
            return
        if self.max_instance < len(self.instances) + num:
            num = self.max_instance - len(self.instances)
        if len(self.instances) + num < self.min_instance:
            num = self.min_instance - len(self.instances)
        # At most 50 new instances per call.
        num = min(num, 50)
        batches = [5] * floor(num / 5)
        if num % 5 > 0:
            batches.append(num % 5)
        total_instance_ids = []
        for batch in batches:
            instance_ids = self.ecs.add_instance(batch)
            if instance_ids:
                total_instance_ids.extend(instance_ids)
                self.instances.extend(instance_ids)
                self.last_update_time = time.time()
            else:
                self.ecs_warning(f"[{self.worker_name}]预期新增{num}设备,扩容失败!当前ECS:{len(self.instances)}台")
        if self.change_alert:
            self.ecs_warning(
                f"[{self.worker_name}]新增{len(total_instance_ids)}(预期{num})台设备成功,当前ECS:{len(self.instances)}台")
        print(f"新增完成,结果::{self.worker_name}::释放列表:{len(self.release_instances)}, 当前ECS列表:{self.instances}")

    def reduce_instances(self, num):
        """
        Move up to ``num`` instances (never below ``min_instance``) to the release queue.

        The hosts are logged out via NATS; the actual ECS release happens later
        in :meth:`loop_release_instances` after ``worker_timeout`` seconds.
        :param num: requested number of instances to release
        :return:
        """
        if len(self.instances) <= self.min_instance:
            return
        if len(self.instances) < self.min_instance + num:
            num = len(self.instances) - self.min_instance
        release_instance_ids = self.instances[:num]
        self.instances = self.instances[num:]
        hosts = []
        for instance_id, ip in release_instance_ids:
            hosts.append(ip)
            self.release_instances.append([instance_id, ip, int(time.time())])
        self.safe_logout(hosts)
        print(f"{self.worker_name}::释放列表:{self.release_instances}, 当前ECS列表:{self.instances}")
        self.ecs_warning(f"[{self.worker_name}]预计释放:{num}台实例,当前释放队列{len(self.release_instances)},预计剩余{len(self.instances)}")

    def loop_release_instances(self):
        """
        Every 60 s, force-release queued instances older than ``worker_timeout``.

        The queue is appended in time order, so the scan stops at the first
        entry that has not yet timed out. Failed releases stay queued and are
        retried on the next pass.
        :return:
        """
        while self._run:
            time.sleep(60)
            delete_ids = []
            # Iterate a snapshot: reduce_instances appends from another thread.
            for instance_detail in list(self.release_instances):
                instance_id, ip, t = instance_detail
                if int(time.time()) - t > self.worker_timeout:
                    try:
                        print(f"时间够了,开始释放{instance_id},时间IP{ip}:{t}")
                        result = self.ecs.release_instance(instance_id, force=True)
                        if result:
                            delete_ids.append(instance_detail)
                    except Exception as e:
                        print(e)
                    time.sleep(2)
                else:
                    break
            try:
                for instance_detail in delete_ids:
                    self.release_instances.remove(instance_detail)
            except Exception as e:
                self.ecs_warning(f"释放失败,请检查{e}")
            print(f"释放完成,结果::{self.worker_name}::释放列表:{self.release_instances}, 当前ECS列表:{self.instances}")

    def ecs_warning(self, text):
        """Send ``text`` to the WeChat bot; a delivery failure is printed so it cannot kill a worker thread."""
        try:
            send_weixin(text, self.weixin_bot_url)
        except Exception as e:
            print(f"发送告警失败:{e}::{text}")

    def __initialize(self):
        """Request ``min_instance`` instances at startup, retrying every 60 s only if the request raises."""
        if len(self.instances) < self.min_instance:
            while True:
                try:
                    self.add_instances(self.min_instance - len(self.instances))
                    break
                except Exception as e:
                    print(e)
                    print("申请设备失败,等待60秒后重试")
                    time.sleep(60)

    def start(self):
        """Run the monitor, config-reload and release loops; release everything once all have stopped."""
        t1 = threading.Thread(target=self.loop_monitor)
        t1.start()
        t2 = threading.Thread(target=self.update_config)
        t2.start()
        t3 = threading.Thread(target=self.loop_release_instances)
        t3.start()
        t1.join()
        t2.join()
        t3.join()
        self.close_server_instances()

    def update_config(self):
        """
        Re-apply the config file every 10 s when its hash changes.

        The new hash is recorded only after the file parses. A half-written or
        invalid file is therefore retried on the next pass instead of killing
        this thread.
        :return:
        """
        while self._run:
            try:
                config_md5 = calculate_file_hash(self.config_path)
                if config_md5 != self.config_md5:
                    config = self._read_config()
                    self._apply_config(config)
                    self.config_md5 = config_md5
            except Exception as e:
                print(f"配置文件读取失败,稍后重试:{e}")
            time.sleep(10)

    def close_server_instances(self):
        """Delete every serving and queued instance; report any that could not be deleted."""
        release_ids = [instance_id for instance_id, _ip in self.instances]
        release_ids.extend(instance_id for instance_id, _ip, _t in self.release_instances)
        print("释放实例", release_ids)
        release_false = self.ecs.delete_instance(release_ids)
        if release_false:
            # Always alert: these instances need manual cleanup.
            self.ecs_warning(f"[{self.worker_name}]服务关闭,部分实例释放失败,请手动释放,问题实例:{release_false}")
        else:
            self.ecs_warning(f"[{self.worker_name}]服务关闭,所有实例释放完成")