123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195 |
- # coding:utf-8
- from a2s.a2s_monitor import watch_monitor
- import time
- from math import ceil, floor
- from file_server import add_file_process
- from picture_server import add_picture_process
- from utils.tools import calculate_file_hash
- import threading
- import json
- import multiprocessing
# Dispatch table keyed by "<worker name>_add"; each value is the callable that
# a newly spawned worker process runs as its target.
func_dict = dict(
    test2txt_add=add_file_process,
    test2img_add=add_picture_process,
)
- # 负载计算
class WorkerNodes(object):
    """Grow and shrink a pool of local worker processes according to load.

    Settings come from a JSON file at ``config_path`` and are reloaded when
    the file's hash changes. A monitoring loop polls ``watch_monitor`` for an
    integer value, turns it into a per-node load ratio, keeps a sliding
    window of recent ratios, and starts or terminates worker processes when
    the windowed average crosses the configured thresholds.
    """

    # Seconds to wait before polling again after the monitor returned
    # something other than an integer.
    MONITOR_FAILURE_DELAY = 60

    def __init__(self, config_path):
        """Load the configuration and start the minimum number of workers.

        :param config_path: path of the JSON configuration file.
        """
        self.config_path = config_path
        self.config_md5 = calculate_file_hash(self.config_path)
        self._apply_config(self._read_config())
        self.instances = []  # running worker processes, oldest first
        self._routine = []  # sliding window of load samples, oldest first
        self.last_update_time = 0  # time.time() of the last scale up/down
        self.__initialize()

    def _read_config(self):
        """Parse the JSON configuration file and return it as a dict."""
        # A context manager closes the handle even if parsing fails.
        with open(self.config_path, "r") as config_file:
            return json.load(config_file)

    def _apply_config(self, config):
        """Copy every supported setting from ``config`` onto this object.

        Missing keys fall back to the defaults below. Used by both the
        constructor and the reload loop so that they always agree on the
        set of settings (including ``freeze_time``).

        :param config: dict parsed from the configuration file.
        """
        self.worker_name = config.get("name", "worker")
        self.one_instance_node_num = config.get("one_instance_node_num", 1)
        self.max_instance = config.get("max_instance", 5)
        self.min_instance = config.get("min_instance", 1)
        self.watch_interval = config.get("watch_interval", 5)
        self.add_watch_time = config.get("add_watch_time", 180)
        self.release_watch_time = config.get("release_watch_time", 1200)
        self.a2s_ip = config.get("a2s_ip", "")
        self.upper_threshold = config.get("upper_threshold", 0.8)
        self.down_threshold = config.get("down_threshold", 0.5)
        self.ecs_config = config.get("ecs_config", {})
        self.freeze_time = config.get("freeze_time", 60)
        # Setting "is_run" to false in the config file stops the service.
        self._run = config.get("is_run", True)

    def get_instance_num(self):
        """Return the number of worker instances currently tracked."""
        return len(self.instances)

    def get_node_num(self):
        """Return the total node count: instances times nodes per instance."""
        return len(self.instances) * self.one_instance_node_num

    def _compute_load(self, current_nodes):
        """Return the monitored value divided by the total node count.

        :param current_nodes: integer reported by ``watch_monitor``.
        :raises ZeroDivisionError: when there are no nodes; the monitoring
            loop logs the error and retries on the next cycle.
        """
        # The divisor is the whole product. Without the grouping the value
        # would be divided by the instance count and then *multiplied* by
        # the nodes-per-instance figure.
        return current_nodes / self.get_node_num()

    def loop_monitor(self):
        """Poll the monitor until the service is switched off.

        Every cycle is followed by a sleep, including the cycle that scaled
        up, so the loop can never spin without pausing. When the loop ends
        all worker processes are terminated.
        """
        while self._run:
            try:
                delay = self._check_once()
            except Exception as e:
                # Top-level boundary of the monitoring thread: report the
                # problem and keep the loop alive.
                print("baocuole==>", e)
                delay = self.watch_interval
            time.sleep(delay)
        self.close_server_instances()

    def _check_once(self):
        """Run one monitoring cycle and return the seconds to sleep after it.

        :return: ``MONITOR_FAILURE_DELAY`` when the monitor result is not an
            integer, otherwise the configured ``watch_interval``.
        """
        # Number of samples covered by each observation window.
        add_watch_number = ceil(self.add_watch_time / self.watch_interval)
        release_watch_number = ceil(self.release_watch_time / self.watch_interval)
        max_number = max(add_watch_number, release_watch_number)

        current_nodes = watch_monitor(self.a2s_ip, self.worker_name)
        if not isinstance(current_nodes, int):
            self.ecs_warning("警告:A2S服务监控出现异常,获取当前请求数量失败,当前服务停止增减设备。尽快解决当前问题,避免服务中断!!!")
            # Age the window by dropping its oldest sample (by position,
            # not by value).
            if self._routine:
                self._routine.pop(0)
            return self.MONITOR_FAILURE_DELAY

        # Record the current load.
        load = self._compute_load(current_nodes)
        self._routine.append(load)
        # Drop samples older than the longest observation window; this also
        # shrinks the window after the watch times were lowered.
        overflow = len(self._routine) - max_number
        if overflow > 0:
            del self._routine[:overflow]

        if time.time() - self.last_update_time < self.freeze_time:
            print(f"{self.worker_name}::冻结时间未到,不进行增减设备")
            return self.watch_interval

        # Scale up when the recent average exceeds the upper threshold.
        if len(self._routine) >= add_watch_number:
            add_start_index = len(self._routine) - add_watch_number
            print("长度比:::", len(self._routine), len(self._routine[add_start_index:]), add_watch_number)
            add_load = sum(self._routine[add_start_index:]) / add_watch_number
            if add_load > self.upper_threshold:
                num = ceil((add_load - self.upper_threshold) * len(self.instances))
                print(f"{self.worker_name}新增add_load:::", add_load, self.upper_threshold, num)
                self.add_instances(num)
                self.last_update_time = time.time()
                # Skip the release check for this cycle but still sleep.
                return self.watch_interval

        # Scale down when the long-term average is below the lower threshold.
        if len(self._routine) >= release_watch_number:
            release_start_index = len(self._routine) - release_watch_number
            release_load = sum(self._routine[release_start_index:]) / release_watch_number
            if release_load < self.down_threshold:
                num = floor((1 - self.down_threshold) * len(self.instances))
                print(f"{self.worker_name}释放release_load:::", release_load, self.down_threshold, num)
                self.reduce_instances(num)
                self.last_update_time = time.time()

        print(
            f"{self.worker_name} 当前负载({current_nodes}::{load})为节点数::{self.get_node_num()},实例数::{self.get_instance_num()}")
        return self.watch_interval

    def add_instances(self, num):
        """Start up to ``num`` new worker processes.

        The request is clamped so the pool never exceeds ``max_instance`` and
        is raised, if needed, so the pool reaches ``min_instance``. When the
        pool is already at the maximum only a warning is printed.

        :param num: requested number of additional instances.
        :raises KeyError: when processes must be started and ``func_dict``
            has no ``"<worker_name>_add"`` entry.
        """
        current = len(self.instances)
        if current >= self.max_instance:
            self.ecs_warning(f"ECS 扩容失败,{self.worker_name}服务已达到最大实例数量:{self.max_instance}")
            return

        num = min(num, self.max_instance - current)
        if current + num < self.min_instance:
            num = self.min_instance - current
        print(f"ECS新增{num}设备,当前ECS数量为{len(self.instances)}")
        for _ in range(num):
            p = multiprocessing.Process(target=func_dict[f"{self.worker_name}_add"])
            p.start()
            self.instances.append(p)

    def reduce_instances(self, num):
        """Terminate up to ``num`` worker processes, oldest first.

        The request is clamped so the pool never drops below
        ``min_instance``. When the pool is already at the minimum only a
        warning is printed.

        :param num: requested number of instances to release.
        """
        current = len(self.instances)
        if current <= self.min_instance:
            self.ecs_warning(f"ECS 缩减失败,{self.worker_name}服务已达到最小实例数量:{self.min_instance}")
            return

        num = min(num, current - self.min_instance)
        for _ in range(num):
            p = self.instances.pop(0)
            print("ECS释放进程:", p, p.pid)
            p.terminate()

    def ecs_warning(self, text):
        """Print ``text`` as a warning message."""
        print("警告:::⚠️", text)

    def __initialize(self):
        """Start enough workers to reach ``min_instance``."""
        if len(self.instances) < self.min_instance:
            self.add_instances(self.min_instance - len(self.instances))

    def start(self):
        """Run the monitoring and config-reload loops in two threads and
        block until both have finished."""
        t1 = threading.Thread(target=self.loop_monitor)
        t1.start()
        t2 = threading.Thread(target=self.update_config)
        t2.start()
        t1.join()
        t2.join()

    def update_config(self):
        """Reload the configuration whenever the file's hash changes.

        The file is checked every 10 seconds while the service is running.
        The new hash is stored only after a successful reload, so an
        unchanged file is not parsed again and a failed reload is retried
        on the next check.
        """
        while self._run:
            try:
                config_md5 = calculate_file_hash(self.config_path)
                if config_md5 != self.config_md5:
                    self._apply_config(self._read_config())
                    self.config_md5 = config_md5
            except (OSError, ValueError) as e:
                # The file may be missing or half-written while it is being
                # edited; keep the current settings and try again later.
                self.ecs_warning(f"{self.worker_name} config reload failed: {e}")
            # Wait a while before looking at the config file again.
            time.sleep(10)

    def close_server_instances(self):
        """Terminate every tracked worker process."""
        for p in self.instances:
            p.terminate()
|