# coding:utf-8
from a2s.a2s_monitor import watch_monitor
import time
from math import ceil, floor
from file_server import add_file_process
from picture_server import add_picture_process
from utils.tools import calculate_file_hash
import threading
import json
import multiprocessing

# Maps "<worker name>_add" to the target function each spawned worker process runs.
func_dict = {
    "test2txt_add": add_file_process,
    "test2img_add": add_picture_process
}


# Load-based scaling of worker processes
class WorkerNodes(object):
    """Scale a pool of worker processes up and down according to observed load.

    Every ``watch_interval`` seconds the request count is read from
    ``watch_monitor(a2s_ip, worker_name)`` and turned into a load sample
    (requests per node). When the average over the last ``add_watch_time``
    seconds exceeds ``upper_threshold`` instances are added; when the average
    over the last ``release_watch_time`` seconds drops below
    ``down_threshold`` instances are released. After any scaling action no
    further action is taken for ``freeze_time`` seconds. The JSON config file
    is re-read whenever its hash changes.
    """

    # Seconds to wait for a terminated worker process to exit before moving on.
    _join_timeout = 5

    def __init__(self, config_path):
        """
        :param config_path: path to the JSON configuration file
        """
        self.config_path = config_path
        self.config_md5 = calculate_file_hash(self.config_path)
        self._apply_config(self._read_config())
        self.instances = []        # running multiprocessing.Process objects, oldest first
        self._routine = []         # recent load samples, oldest first
        self.last_update_time = 0  # time.time() of the last scaling action
        self.__initialize()

    def _read_config(self):
        """Parse the JSON config file, closing the file handle afterwards.

        :return: the parsed configuration dict
        """
        with open(self.config_path, "r", encoding="utf-8") as f:
            return json.load(f)

    def _apply_config(self, config):
        """Copy settings from a parsed config dict onto the instance (with defaults).

        :param config: parsed configuration dict
        """
        self.worker_name = config.get("name", "worker")
        self.one_instance_node_num = config.get("one_instance_node_num", 1)
        self.max_instance = config.get("max_instance", 5)
        self.min_instance = config.get("min_instance", 1)
        self.watch_interval = config.get("watch_interval", 5)
        self.add_watch_time = config.get("add_watch_time", 180)
        self.release_watch_time = config.get("release_watch_time", 1200)
        self.a2s_ip = config.get("a2s_ip", "")
        self.upper_threshold = config.get("upper_threshold", 0.8)
        self.down_threshold = config.get("down_threshold", 0.5)
        self.ecs_config = config.get("ecs_config", {})
        self.freeze_time = config.get("freeze_time", 60)
        # Setting "is_run" to false in the config file stops the service.
        self._run = config.get("is_run", True)

    def get_instance_num(self):
        """
        Return the number of running instances.
        :return: int
        """
        return len(self.instances)

    def get_node_num(self):
        """
        Return the total number of nodes (instances * nodes per instance).
        :return: int
        """
        return len(self.instances) * self.one_instance_node_num

    def _window_size(self, watch_time):
        """Number of samples that cover ``watch_time`` seconds (never less than 1).

        :param watch_time: observation window length in seconds
        :return: int
        """
        return max(1, ceil(watch_time / self.watch_interval))

    def loop_monitor(self):
        """
        Sample the load periodically and scale instances until the service is stopped,
        then terminate all instances.
        :return:
        """
        while self._run:
            delay = self.watch_interval
            try:
                delay = self._monitor_step()
            except Exception as e:
                # Top-level boundary of the monitor thread: log and keep monitoring.
                print("baocuole==>", e)
            time.sleep(delay)
        self.close_server_instances()

    def _monitor_step(self):
        """Take one load sample and scale up or down if the thresholds are crossed.

        :return: seconds to wait before the next sample
        """
        # Window sizes are recomputed every pass because update_config may change the timings.
        add_watch_number = self._window_size(self.add_watch_time)
        release_watch_number = self._window_size(self.release_watch_time)
        max_number = max(add_watch_number, release_watch_number)

        current_nodes = watch_monitor(self.a2s_ip, self.worker_name)
        if not isinstance(current_nodes, int):
            self.ecs_warning("警告:A2S服务监控出现异常,获取当前请求数量失败,当前服务停止增减设备。尽快解决当前问题,避免服务中断!!!")
            # Drop the oldest sample (by position) so stale data ages out while the monitor is down.
            if self._routine:
                self._routine.pop(0)
            return 60

        node_num = self.get_node_num()
        if node_num <= 0:
            # With no capacity the load ratio (requests / nodes) is undefined.
            if self.instances:
                self.ecs_warning(f"{self.worker_name}: one_instance_node_num={self.one_instance_node_num}, load cannot be computed")
            elif current_nodes > 0:
                # The pool was scaled to zero (min_instance == 0) and traffic arrived:
                # start one instance so load-based scaling can resume.
                self.add_instances(1)
                self.last_update_time = time.time()
            return self.watch_interval

        # Current load: requests per available node.
        load = current_nodes / node_num
        self._routine.append(load)
        # Keep only as many samples as the longest window needs; a while loop because
        # the windows can shrink after a config reload.
        while len(self._routine) > max_number:
            self._routine.pop(0)

        if time.time() - self.last_update_time < self.freeze_time:
            print(f"{self.worker_name}::冻结时间未到,不进行增减设备")
            return self.watch_interval

        # A scale-up in this pass takes precedence; only check for release otherwise.
        if not self._check_scale_up(add_watch_number):
            self._check_scale_down(release_watch_number)

        print(
            f"{self.worker_name} 当前负载({current_nodes}::{load})为节点数::{self.get_node_num()},实例数::{self.get_instance_num()}")
        return self.watch_interval

    def _check_scale_up(self, window):
        """Add instances if the average load over the last ``window`` samples exceeds upper_threshold.

        :param window: number of samples to average
        :return: True if instances were added (a scaling action was taken)
        """
        if len(self._routine) < window:
            return False
        recent = self._routine[-window:]
        print("长度比:::", len(self._routine), len(recent), window)
        add_load = sum(recent) / window
        if add_load <= self.upper_threshold:
            return False
        num = ceil((add_load - self.upper_threshold) * len(self.instances))
        print(f"{self.worker_name}新增add_load:::", add_load, self.upper_threshold, num)
        self.add_instances(num)
        self.last_update_time = time.time()
        return True

    def _check_scale_down(self, window):
        """Release instances if the average load over the last ``window`` samples is below down_threshold.

        :param window: number of samples to average
        :return: True if a release was attempted
        """
        if len(self._routine) < window:
            return False
        release_load = sum(self._routine[-window:]) / window
        if release_load >= self.down_threshold:
            return False
        num = floor((1 - self.down_threshold) * len(self.instances))
        print(f"{self.worker_name}释放release_load:::", release_load, self.down_threshold, num)
        self.reduce_instances(num)
        self.last_update_time = time.time()
        return True

    def add_instances(self, num):
        """
        Start up to ``num`` new worker processes, clamped to [min_instance, max_instance].
        :param num: requested number of new instances
        :return:
        """
        if len(self.instances) < self.max_instance:
            if self.max_instance < len(self.instances) + num:
                num = self.max_instance - len(self.instances)
            if len(self.instances) + num < self.min_instance:
                num = self.min_instance - len(self.instances)
            print(f"ECS新增{num}设备,当前ECS数量为{len(self.instances)}")
            target = func_dict[f"{self.worker_name}_add"]
            for _ in range(num):
                p = multiprocessing.Process(target=target)
                p.start()
                self.instances.append(p)
        else:
            self.ecs_warning(f"ECS 扩容失败,{self.worker_name}服务已达到最大实例数量:{self.max_instance}")

    def reduce_instances(self, num):
        """
        Terminate up to ``num`` of the oldest worker processes without going below min_instance.
        :param num: requested number of instances to release
        :return:
        """
        if len(self.instances) > self.min_instance:
            # Never go below min_instance; a non-positive request releases nothing.
            num = max(0, min(num, len(self.instances) - self.min_instance))
            released = self.instances[:num]
            del self.instances[:num]
            for p in released:
                print("ECS释放进程:", p, p.pid)
                p.terminate()
            for p in released:
                # Reap the terminated child so it does not linger as a zombie.
                p.join(timeout=self._join_timeout)
        else:
            self.ecs_warning(f"ECS 缩减失败,{self.worker_name}服务已达到最小实例数量:{self.min_instance}")

    def ecs_warning(self, text):
        """Emit a warning message (currently printed to stdout)."""
        print("警告:::⚠️", text)

    def __initialize(self):
        """Start enough instances to reach min_instance."""
        if len(self.instances) < self.min_instance:
            self.add_instances(self.min_instance - len(self.instances))

    def start(self):
        """Run the load monitor and the config watcher in two threads and wait for both."""
        t1 = threading.Thread(target=self.loop_monitor)
        t1.start()
        t2 = threading.Thread(target=self.update_config)
        t2.start()
        t1.join()
        t2.join()

    def update_config(self):
        """
        Re-read the config file whenever its hash changes, polling every 10 seconds
        while the service is running.
        :return:
        """
        while self._run:
            try:
                config_md5 = calculate_file_hash(self.config_path)
                if config_md5 != self.config_md5:
                    self._apply_config(self._read_config())
                    # Remember the applied version so the file is not re-parsed on every poll.
                    self.config_md5 = config_md5
            except (OSError, ValueError) as e:
                # e.g. the file is missing or half-written; keep the old settings and retry next poll.
                print("failed to reload config==>", e)
            # Check the config file state at a fixed interval.
            time.sleep(10)

    def close_server_instances(self):
        """Terminate every worker process, wait briefly for each to exit, and forget them."""
        for p in self.instances:
            p.terminate()
        for p in self.instances:
            p.join(timeout=self._join_timeout)
        self.instances = []