|
- # coding:utf-8
- from a2s.a2s_monitor import watch_monitor
- from utils.send_message import send_weixin
- import time
- from math import ceil, floor
- from utils.tools import calculate_file_hash
- import threading
- import json
- from utils.ecs import EcsClient
- from pynats import NATSClient
- from a2s.tools import json_serialize
# Load-based autoscaling of ECS worker instances.
class WorkerNodes(object):
    """Scale a pool of ECS worker instances up and down from observed load.

    The load is ``requests reported by watch_monitor / total node capacity``
    where capacity is ``len(instances) * one_instance_node_num``.  Samples are
    kept in a sliding window; the average over the most recent
    ``add_watch_time`` seconds triggers scale-up and the average over the most
    recent ``release_watch_time`` seconds triggers scale-down.

    Entries of ``self.instances`` are unpacked as ``(instance_id, ip)`` pairs
    and entries of ``self.release_instances`` as
    ``[instance_id, ip, queued_at_epoch_seconds]``.

    Settings are read from the JSON file at ``config_path`` and re-read by
    :meth:`update_config` whenever the file's hash changes.  ``ecs_config``
    and ``nats_url`` are only read once, at construction time.

    NOTE(review): the return shapes of ``EcsClient.add_instance`` /
    ``instance_status`` are not visible here; the code assumes both return a
    (possibly empty/None) list of ``(instance_id, ip)`` pairs — confirm.
    """

    # Number of instances handled per ECS API call (create / status query).
    _BATCH_SIZE = 5
    # Hard cap on instances requested by a single add_instances() call.
    _MAX_ADD_PER_CALL = 50
    # Seconds to wait after a monitoring failure before sampling again.
    _FAILURE_BACKOFF = 60

    def __init__(self, config_path):
        """Load the configuration, create the ECS client and start the pool.

        :param config_path: path of the JSON configuration file.
        """
        self.config_path = config_path
        self.config_md5 = calculate_file_hash(self.config_path)
        config = self._load_config()
        self._apply_config(config)
        # Settings that are only honoured at start-up.
        self.ecs_config = config.get("ecs_config", {})
        self.ecs_config["instance_name"] = self.worker_name
        self.nats_url = config.get("nats_url", "nats://172.17.4.188:19090")
        self.instances = []
        self.release_instances = []
        self._routine = []  # sliding window of load samples, oldest first
        self.last_update_time = 0
        self.last_load_alert_time = 0
        self.last_refresh_instance_time = 0
        self.release_freeze_time = 0
        self._closed = False  # set once close_server_instances() has run
        self.ecs = EcsClient(self.ecs_config)
        self.__initialize()

    def _load_config(self):
        """Parse and return the JSON configuration, closing the file afterwards."""
        with open(self.config_path, "r") as config_file:
            return json.load(config_file)

    def _apply_config(self, config):
        """Copy every hot-reloadable setting from ``config`` onto the instance."""
        self.worker_name = config.get("name", "worker").replace(" ", "")
        self.one_instance_node_num = config.get("one_instance_node_num", 1)
        self.max_instance = config.get("max_instance", 5)
        self.min_instance = config.get("min_instance", 1)
        self.watch_interval = config.get("watch_interval", 5)
        self.add_watch_time = config.get("add_watch_time", 180)
        self.release_watch_time = config.get("release_watch_time", 1200)
        self.a2s_ip = config.get("a2s_ip", "")
        self.upper_threshold = config.get("upper_threshold", 0.8)
        self.down_threshold = config.get("down_threshold", 0.5)
        self.freeze_time = config.get("freeze_time", 60)
        # Setting "is_run" to false in the config file shuts the service down.
        self._run = config.get("is_run", True)
        self.change_alert = config.get("change_alert", True)
        self.load_alert = config.get("load_alert", True)
        self.load_alert_interval = config.get("load_alert_interval", 300)
        self.release_freeze_interval = config.get("release_freeze_interval", 60 * 30)
        self.weixin_bot_url = config.get("weixin_bot_url", "")
        self.refresh_instance_interval = config.get("refresh_instance_interval", 300)
        self.worker_timeout = config.get("worker_timeout", 300)
        self.monitor_subject = config.get("monitor_subject", "monitorRelease")

    def get_instance_num(self):
        """Return the number of instances currently in the active pool."""
        return len(self.instances)

    def get_node_num(self):
        """Return the total node capacity of the active pool."""
        return len(self.instances) * self.one_instance_node_num

    def safe_logout(self, hosts):
        """Publish one ``{"host": ip}`` message per host on ``monitor_subject``.

        Called before instances are released so the workers on them can stop
        (the actual release happens ``worker_timeout`` seconds later).

        :param hosts: iterable of host addresses to notify.
        """
        with NATSClient(self.nats_url) as nc:
            # NOTE(review): the context manager may already connect on entry;
            # kept as in the original — confirm against the pynats version used.
            nc.connect()
            for host in hosts:
                nc.publish(subject=self.monitor_subject, payload=json_serialize({"host": host}))

    def refresh_instance(self):
        """Drop instances that ECS no longer reports and force-release them.

        Instance ids are queried in pages of ``_BATCH_SIZE``.  A page for which
        the API returns nothing contributes no healthy instances (previously a
        falsy result on the last page raised ``TypeError``).
        """
        if not self.instances:
            return
        instance_ids = [instance_id for instance_id, _ip in self.instances]
        healthy = []
        for start in range(0, len(instance_ids), self._BATCH_SIZE):
            page = instance_ids[start:start + self._BATCH_SIZE]
            reported = self.ecs.instance_status(page)
            if reported:
                healthy.extend(reported)
        # Nothing went missing.
        if len(healthy) == len(self.instances):
            self.ecs_warning("实例状态更新成功,没有设备被释放!")
            return
        # Something went missing: make sure it is really gone on the ECS side.
        for instance in self.instances:
            if instance not in healthy:
                self.ecs.release_instance(instance[0], force=True)
        self.ecs_warning(f"实例状态更新成功,有{len(self.instances) - len(healthy)}台设备被释放!")
        self.instances = healthy

    def loop_monitor(self):
        """Sample the load until the service is stopped, then release everything.

        Every iteration sleeps before the next sample, including the one that
        follows a scale-up attempt, so a failing ECS call cannot turn this
        into a busy loop.
        """
        while self._run:
            delay = self.watch_interval
            try:
                delay = self._monitor_step()
            except Exception as e:
                # Top-level boundary of the monitor thread: report and keep going.
                self.ecs_warning(f"警告:[{self.worker_name}]服务出现异常,尽快处理{e}")
            time.sleep(delay)
        self.close_server_instances()

    def _monitor_step(self):
        """Take one load sample, scale if needed, and return the seconds to sleep."""
        this_time = time.time()
        add_watch_number = ceil(self.add_watch_time / self.watch_interval)
        release_watch_number = ceil(self.release_watch_time / self.watch_interval)
        max_number = max(add_watch_number, release_watch_number)

        # Current number of requests as reported by the monitor.
        current_nodes = watch_monitor(self.a2s_ip, self.worker_name)
        if not isinstance(current_nodes, int):
            self.ecs_warning("警告:A2S服务监控出现异常,获取当前请求数量失败,当前服务停止增减设备。尽快解决当前问题,避免服务中断!!!")
            if self._routine:
                self._routine.pop(0)
            return self._FAILURE_BACKOFF

        # refresh_instance() can shrink the pool below the configured floor
        # (even to zero); restore the floor before computing a load.
        shortfall = self.min_instance - len(self.instances)
        if shortfall > 0:
            self.add_instances(shortfall)
        capacity = self.get_node_num()
        if capacity <= 0:
            # Without capacity the load is undefined (division by zero).
            self.ecs_warning(f"警告:[{self.worker_name}]当前没有可用节点,无法计算负载,等待实例恢复")
            return self._FAILURE_BACKOFF

        load = current_nodes / capacity
        self._routine.append(load)

        # Periodically reconcile the pool with what ECS actually reports.
        if this_time - self.last_refresh_instance_time > self.refresh_instance_interval:
            self.refresh_instance()
            self.last_refresh_instance_time = this_time

        # Keep the window no longer than the longest observation period.
        if len(self._routine) > max_number:
            self._routine.pop(0)

        # No scaling decisions while a recent change is still settling.
        if this_time - self.last_update_time < self.freeze_time:
            return self.watch_interval

        # Periodic load report, only when enabled in the configuration.
        if self.load_alert and this_time - self.last_load_alert_time > self.load_alert_interval:
            self.ecs_warning(
                f"[{self.worker_name}] 当前负载({current_nodes}::{load})为节点数::{self.get_node_num()},实例数::{self.get_instance_num()}")
            self.last_load_alert_time = this_time

        # Scale up?
        if len(self._routine) >= add_watch_number:
            add_start_index = len(self._routine) - add_watch_number
            add_load = sum(self._routine[add_start_index:]) / add_watch_number
            if add_load > self.upper_threshold:
                num = ceil((add_load - self.upper_threshold) * len(self.instances))
                if self.change_alert:
                    self.ecs_warning(f"[{self.worker_name}]新增,当前负载:{add_load},预期新增:{num}台")
                self.add_instances(num)
                # A scale-up postpones any scale-down.
                self.release_freeze_time = time.time()
                return self.watch_interval

        # Scale down?
        release_allowed = time.time() - self.release_freeze_time > self.release_freeze_interval
        if len(self._routine) >= release_watch_number and release_allowed:
            release_start_index = len(self._routine) - release_watch_number
            release_load = sum(self._routine[release_start_index:]) / release_watch_number
            if release_load < self.down_threshold:
                if release_load < 0.1:
                    # Almost idle: release faster on large pools.
                    num = 3 if len(self.instances) > 70 else 2
                else:
                    # Load in [0.1, down_threshold): release conservatively.
                    num = 2 if len(self.instances) > 10 else 1
                if self.change_alert:
                    self.ecs_warning(f"[{self.worker_name}]释放,当前负载:{release_load},预期释放:{num}台")
                self.reduce_instances(num)
                self.release_freeze_time = time.time()
        return self.watch_interval

    def add_instances(self, num):
        """Create up to ``num`` instances and add them to the active pool.

        The request is clamped to ``max_instance``, raised to reach
        ``min_instance`` if necessary, capped at ``_MAX_ADD_PER_CALL`` and sent
        to ECS in pages of ``_BATCH_SIZE``.  A failed page is reported and the
        remaining pages are still attempted.

        :param num: number of instances wanted; values below 1 are ignored.
        :return: None
        """
        if num < 1:
            return
        current = len(self.instances)
        if current >= self.max_instance:
            if self.change_alert:
                self.ecs_warning(f"[{self.worker_name}]ECS服务实例达到最大值:{self.max_instance},预期新增{num}台, 扩容失败。")
            # Start a freeze period so the same warning is not repeated every cycle.
            self.last_update_time = time.time()
            return

        if self.max_instance < current + num:
            num = self.max_instance - current
        if current + num < self.min_instance:
            num = self.min_instance - current
        if num > self._MAX_ADD_PER_CALL:
            num = self._MAX_ADD_PER_CALL

        total_instance_ids = []
        remaining = num
        while remaining > 0:
            page_size = min(self._BATCH_SIZE, remaining)
            remaining -= page_size
            instance_ids = self.ecs.add_instance(page_size)
            if instance_ids:
                total_instance_ids.extend(instance_ids)
                self.instances.extend(instance_ids)
                self.last_update_time = time.time()
            else:
                self.ecs_warning(f"[{self.worker_name}]预期新增{num}设备,扩容失败!当前ECS:{len(self.instances)}台")

        if self.change_alert:
            self.ecs_warning(
                f"[{self.worker_name}]新增{len(total_instance_ids)}(预期{num})台设备成功,当前ECS:{len(self.instances)}台")
        print(f"新增完成,结果::{self.worker_name}::释放列表:{len(self.release_instances)}, 当前ECS列表:{self.instances}")

    def reduce_instances(self, num):
        """Move up to ``num`` instances from the pool to the release queue.

        The pool never shrinks below ``min_instance``.  The selected hosts are
        notified through :meth:`safe_logout`; the instances themselves are
        released later by :meth:`loop_release_instances`.

        :param num: number of instances to retire; values below 1 are ignored.
        :return: None
        """
        if num < 1:
            return
        if len(self.instances) <= self.min_instance:
            return
        if len(self.instances) < self.min_instance + num:
            num = len(self.instances) - self.min_instance
        # Oldest instances (front of the list) are retired first.
        retiring = self.instances[:num]
        self.instances = self.instances[num:]
        hosts = []
        for instance_id, ip in retiring:
            hosts.append(ip)
            self.release_instances.append([instance_id, ip, int(time.time())])
        self.safe_logout(hosts)
        print(f"{self.worker_name}::释放列表:{self.release_instances}, 当前ECS列表:{self.instances}")
        self.ecs_warning(f"[{self.worker_name}]预计释放:{num}台实例,当前释放队列{len(self.release_instances)},预计剩余{len(self.instances)}")

    def loop_release_instances(self):
        """Once a minute, release queued instances older than ``worker_timeout``.

        The queue is ordered by queue time, so scanning stops at the first
        entry that is still within its grace period.
        """
        while self._run:
            time.sleep(60)
            released = []
            # Iterate over a snapshot: the monitor thread may append meanwhile.
            for instance_detail in list(self.release_instances):
                instance_id, ip, t = instance_detail
                if int(time.time()) - t <= self.worker_timeout:
                    break
                try:
                    print(f"时间够了,开始释放{instance_id},时间IP{ip}:{t}")
                    result = self.ecs.release_instance(instance_id, force=True)
                    if result:
                        released.append(instance_detail)
                except Exception as e:
                    # Best effort: the entry stays queued and is retried next minute.
                    print(e)
                # Short pause between consecutive release calls.
                time.sleep(2)
            try:
                for instance_detail in released:
                    self.release_instances.remove(instance_detail)
            except Exception as e:
                self.ecs_warning(f"释放失败,请检查{e}")
            print(f"释放完成,结果::{self.worker_name}::释放列表:{self.release_instances}, 当前ECS列表:{self.instances}")

    def ecs_warning(self, text):
        """Send ``text`` to the configured WeChat bot webhook."""
        send_weixin(text, self.weixin_bot_url)

    def __initialize(self):
        """Bring the pool up to ``min_instance``, retrying every 60 s on error."""
        if len(self.instances) >= self.min_instance:
            return
        while True:
            try:
                self.add_instances(self.min_instance - len(self.instances))
                break
            except Exception as e:
                print(e)
                print("申请设备失败,等待60秒后重试")
                time.sleep(60)

    def start(self):
        """Run the monitor, config-reload and release loops until shutdown.

        Blocks until all three threads finish, then makes sure every instance
        has been released (a no-op if the monitor thread already did it).
        """
        workers = [
            threading.Thread(target=self.loop_monitor),
            threading.Thread(target=self.update_config),
            threading.Thread(target=self.loop_release_instances),
        ]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()
        self.close_server_instances()

    def update_config(self):
        """Re-read the configuration file every 10 s when its hash changes.

        The stored hash is only advanced after the file was parsed and applied
        successfully.  A file that fails to load is reported once and retried
        as soon as its content changes again, so a half-written or invalid
        file can no longer kill this thread.
        """
        failed_md5 = None  # hash of the last content that failed to load
        while self._run:
            try:
                config_md5 = calculate_file_hash(self.config_path)
                if config_md5 not in (self.config_md5, failed_md5):
                    failed_md5 = config_md5  # assumed bad until applied
                    self._apply_config(self._load_config())
                    self.config_md5 = config_md5
                    failed_md5 = None
            except Exception as e:
                # Thread boundary: report and try again on the next cycle.
                self.ecs_warning(f"警告:[{self.worker_name}]配置文件更新失败,请检查{e}")
            # Poll interval for configuration changes.
            time.sleep(10)

    def close_server_instances(self):
        """Delete every active and queued instance; safe to call more than once.

        Both :meth:`loop_monitor` and :meth:`start` call this on shutdown; only
        the first successful call talks to ECS.
        """
        if getattr(self, "_closed", False):
            return
        release_ids = [instance_id for instance_id, _ip in self.instances]
        release_ids.extend(instance_id for instance_id, _ip, _t in self.release_instances)
        print("释放实例", release_ids)
        # delete_instance() is expected to return the ids that could not be deleted.
        release_false = self.ecs.delete_instance(release_ids)
        self._closed = True
        if release_false:
            if self.change_alert:
                self.ecs_warning(f"[{self.worker_name}]服务关闭,部分实例释放失败,请手动释放,问题实例:{release_false}")
        else:
            self.ecs_warning(f"[{self.worker_name}]服务关闭,所有实例释放完成")
|