lijunliang 8 mesi fa
commit
8e8ddbcc3b

+ 8 - 0
.idea/.gitignore

@@ -0,0 +1,8 @@
+# Default ignored files
+/shelf/
+/workspace.xml
+# Editor-based HTTP Client requests
+/httpRequests/
+# Datasource local storage ignored files
+/dataSources/
+/dataSources.local.xml

+ 11 - 0
.idea/inspectionProfiles/Project_Default.xml

@@ -0,0 +1,11 @@
+<component name="InspectionProjectProfileManager">
+  <profile version="1.0">
+    <option name="myName" value="Project Default" />
+    <inspection_tool class="DuplicatedCode" enabled="true" level="WEAK WARNING" enabled_by_default="true">
+      <Languages>
+        <language minSize="161" name="Python" />
+      </Languages>
+    </inspection_tool>
+    <inspection_tool class="Eslint" enabled="true" level="WARNING" enabled_by_default="true" />
+  </profile>
+</component>

+ 6 - 0
.idea/inspectionProfiles/profiles_settings.xml

@@ -0,0 +1,6 @@
+<component name="InspectionProjectProfileManager">
+  <settings>
+    <option name="USE_PROJECT_PROFILE" value="false" />
+    <version value="1.0" />
+  </settings>
+</component>

+ 4 - 0
.idea/misc.xml

@@ -0,0 +1,4 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="ProjectRootManager" version="2" project-jdk-name="Python 3.7 (awesome_extract)" project-jdk-type="Python SDK" />
+</project>

+ 8 - 0
.idea/modules.xml

@@ -0,0 +1,8 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="ProjectModuleManager">
+    <modules>
+      <module fileurl="file://$PROJECT_DIR$/.idea/monitor.iml" filepath="$PROJECT_DIR$/.idea/monitor.iml" />
+    </modules>
+  </component>
+</project>

+ 10 - 0
.idea/monitor.iml

@@ -0,0 +1,10 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<module type="PYTHON_MODULE" version="4">
+  <component name="NewModuleRootManager">
+    <content url="file://$MODULE_DIR$">
+      <excludeFolder url="file://$MODULE_DIR$/venv" />
+    </content>
+    <orderEntry type="jdk" jdkName="Python 3.7 (awesome_extract)" jdkType="Python SDK" />
+    <orderEntry type="sourceFolder" forTests="false" />
+  </component>
+</module>

BIN
a2s-0.0.1-py3-none-any.whl


+ 66 - 0
docs/config.py

@@ -0,0 +1,66 @@
+# Per-topic ECS settings template, keyed by topic name.
+# Every topic entry has the same shape:
+#   "config"                - dict of instance settings; the string fields are
+#                             left empty here as placeholders
+#   "max_instance_num"      - upper bound on the instance count
+#   "min_instance_num"      - lower bound on the instance count
+#   "one_instance_node_num" - presumably worker nodes per instance; confirm
+#                             against the code that consumes this dict
+config_topic_ecs = {
+    "goods_extract":
+        {
+            "config": {
+                "image_id": "",
+                "instance_type": "",
+                "vswitch_id": "",
+                "security_group_id": "",
+                "io_optimized": "",
+                "system_disk_category": "",
+                "system_disk_size": "",
+                "spot_price_limit": 0.56
+            },
+            "max_instance_num": 50,
+            "min_instance_num": 5,
+            "one_instance_node_num": 4,
+        },
+    "goods_field_ner":
+        {
+            "config": {
+                "image_id": "",
+                "instance_type": "",
+                "vswitch_id": "",
+                "security_group_id": "",
+                "io_optimized": "",
+                "system_disk_category": "",
+                "system_disk_size": "",
+                "spot_price_limit": 0.56
+            },
+            "max_instance_num": 50,
+            "min_instance_num": 5,
+            "one_instance_node_num": 4,
+        },
+    "title_ner":
+        {
+            "config": {
+                "image_id": "",
+                "instance_type": "",
+                "vswitch_id": "",
+                "security_group_id": "",
+                "io_optimized": "",
+                "system_disk_category": "",
+                "system_disk_size": "",
+                "spot_price_limit": 0.56
+            },
+            "max_instance_num": 50,
+            "min_instance_num": 5,
+            "one_instance_node_num": 4,
+        },
+    "goods_ner":
+        {
+            "config": {
+                "image_id": "",
+                "instance_type": "",
+                "vswitch_id": "",
+                "security_group_id": "",
+                "io_optimized": "",
+                "system_disk_category": "",
+                "system_disk_size": "",
+                "spot_price_limit": 0.56
+            },
+            "max_instance_num": 50,
+            "min_instance_num": 5,
+            "one_instance_node_num": 4,
+        },
+}

+ 28 - 0
docs/file2txt_nats.json

@@ -0,0 +1,28 @@
+{
+    "name": "file2txt",
+    "ecs_config": {
+        "ak_id": "LTAI4G5x9aoZx8dDamQ7vfZi",
+        "region_id": "cn-beijing",
+        "ak_secret": "Bk98FsbPYXcJe72n1bG3Ssf73acuNh",
+        "image_id": "m-2ze4tlghfobrgbp6koac",
+        "instance_type": "ecs.g5.xlarge",
+        "security_group_id": "sg-2ze6zhxrqy9vueae27xy",
+        "vswitch_id": "vsw-2ze1n1k3mo3fv2irsfdps"
+    },
+    "max_instance": 100,
+    "min_instance": 1,
+    "one_instance_node_num": 4,
+    "watch_interval": 5,
+    "add_watch_time": 120,
+    "release_watch_time": 1200,
+    "a2s_ip": "172.17.4.188:9090",
+    "upper_threshold": 0.8,
+    "down_threshold": 0.5,
+    "is_run": false,
+    "freeze_time": 120,
+    "change_alert": true,
+    "load_alert": true,
+    "load_alert_interval": 3600,
+    "weixin_bot_url": "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=60d08d19-ea5d-4f91-b6c9-65a133561aa1",
+    "refresh_instance_interval": 3600
+}

+ 28 - 0
docs/goods_field_nats.json

@@ -0,0 +1,28 @@
+{
+    "name": "goods_field",
+    "ecs_config": {
+        "ak_id": "LTAI4G5x9aoZx8dDamQ7vfZi",
+        "region_id": "cn-beijing",
+        "ak_secret": "Bk98FsbPYXcJe72n1bG3Ssf73acuNh",
+        "image_id": "m-2zeadgyogifc5fzqadcd",
+        "instance_type": "ecs.c6.xlarge",
+        "security_group_id": "sg-2ze6zhxrqy9vueae27xy",
+        "vswitch_id": "vsw-2ze586sxfwsaov4s5w88d"
+    },
+    "max_instance": 100,
+    "min_instance": 1,
+    "one_instance_node_num": 4,
+    "watch_interval": 5,
+    "add_watch_time": 120,
+    "release_watch_time": 1200,
+    "a2s_ip": "172.17.4.188:9090",
+    "upper_threshold": 0.8,
+    "down_threshold": 0.5,
+    "is_run": false,
+    "freeze_time": 120,
+    "change_alert": true,
+    "load_alert": true,
+    "load_alert_interval": 3600,
+    "weixin_bot_url": "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=60d08d19-ea5d-4f91-b6c9-65a133561aa1",
+    "refresh_instance_interval": 3600
+}

+ 28 - 0
docs/goods_service_nats.json

@@ -0,0 +1,28 @@
+{
+    "name": "goods_service",
+    "ecs_config": {
+        "ak_id": "LTAI4G5x9aoZx8dDamQ7vfZi",
+        "region_id": "cn-beijing",
+        "ak_secret": "Bk98FsbPYXcJe72n1bG3Ssf73acuNh",
+        "image_id": "m-2zeihn2b3vxj7s5z1zxp",
+        "instance_type": "ecs.g5.xlarge",
+        "security_group_id": "sg-2ze6zhxrqy9vueae27xy",
+        "vswitch_id": "vsw-2ze1n1k3mo3fv2irsfdps"
+    },
+    "max_instance": 80,
+    "min_instance": 1,
+    "one_instance_node_num": 5,
+    "watch_interval": 5,
+    "add_watch_time": 120,
+    "release_watch_time": 1200,
+    "a2s_ip": "172.17.4.188:9090",
+    "upper_threshold": 0.8,
+    "down_threshold": 0.5,
+    "is_run": false,
+    "freeze_time": 120,
+    "change_alert": true,
+    "load_alert": true,
+    "load_alert_interval": 3600,
+    "weixin_bot_url": "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=60d08d19-ea5d-4f91-b6c9-65a133561aa1",
+    "refresh_instance_interval": 3600
+}

+ 28 - 0
docs/img2txt_nats.json

@@ -0,0 +1,28 @@
+{
+    "name": "img2txt",
+    "ecs_config": {
+        "ak_id": "LTAI4G5x9aoZx8dDamQ7vfZi",
+        "region_id": "cn-beijing",
+        "ak_secret": "Bk98FsbPYXcJe72n1bG3Ssf73acuNh",
+        "image_id": "m-2ze3uv2gz5dsy0ol2i4x",
+        "instance_type": "ecs.c6.xlarge",
+        "security_group_id": "sg-2ze6zhxrqy9vueae27xy",
+        "vswitch_id": "vsw-2ze586sxfwsaov4s5w88d"
+    },
+    "max_instance": 300,
+    "min_instance": 1,
+    "one_instance_node_num": 2,
+    "watch_interval": 5,
+    "add_watch_time": 120,
+    "release_watch_time": 1200,
+    "a2s_ip": "172.17.4.188:9090",
+    "upper_threshold": 0.8,
+    "down_threshold": 0.5,
+    "is_run": true,
+    "freeze_time": 120,
+    "change_alert": true,
+    "load_alert": true,
+    "load_alert_interval": 3600,
+    "weixin_bot_url": "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=60d08d19-ea5d-4f91-b6c9-65a133561aa1",
+    "refresh_instance_interval": 3600
+}

+ 133 - 0
docs/test.py

@@ -0,0 +1,133 @@
+# coding:utf-8
+
+import json
+
+picture = {
+    "name": "img2txt",
+    "ecs_config": {
+        "ak_id": "LTAI4G5x9aoZx8dDamQ7vfZi",
+        "region_id": "cn-beijing",
+        "ak_secret": "Bk98FsbPYXcJe72n1bG3Ssf73acuNh",
+        "image_id": "m-2ze3uv2gz5dsy0ol2i4x",
+        "instance_type": "ecs.c6.xlarge",
+        "security_group_id": "sg-2ze6zhxrqy9vueae27xy",
+        "vswitch_id": "vsw-2ze586sxfwsaov4s5w88d"
+    },
+    "max_instance": 300,
+    "min_instance": 1,
+    "one_instance_node_num": 2,
+    "watch_interval": 5,
+    "add_watch_time": 120,
+    "release_watch_time": 1200,
+    "a2s_ip": "172.17.4.188:9090",
+    "upper_threshold": 0.8,
+    "down_threshold": 0.5,
+    "is_run": True,
+    "freeze_time": 120,
+    "change_alert": True,
+    "load_alert": True,
+    "load_alert_interval": 60 * 60,
+    "weixin_bot_url": "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=e7d4459d-4b6e-49a0-8d1b-a1996a1b3dd6",
+    "refresh_instance_interval": 60 * 60,
+}
+json.dump(picture, open(f'{picture["name"]}_nats.json', 'w'), indent=4)
+
+# ret = json.load(open(f'{picture["name"]}.json'))
+# print(ret)
+
+file = {
+    "name": "file2txt",
+    "ecs_config": {
+        "ak_id": "LTAI4G5x9aoZx8dDamQ7vfZi",
+        "region_id": "cn-beijing",
+        "ak_secret": "Bk98FsbPYXcJe72n1bG3Ssf73acuNh",
+        "image_id": "m-2ze4tlghfobrgbp6koac",
+        "instance_type": "ecs.g5.xlarge",
+        "security_group_id": "sg-2ze6zhxrqy9vueae27xy",
+        "vswitch_id": "vsw-2ze1n1k3mo3fv2irsfdps"
+    },
+
+    "max_instance": 100,
+    "min_instance": 1,
+    "one_instance_node_num": 4,
+    "watch_interval": 5,
+    "add_watch_time": 120,
+    "release_watch_time": 1200,
+    "a2s_ip": "172.17.4.188:9090",
+    "upper_threshold": 0.8,
+    "down_threshold": 0.5,
+    "is_run": False,
+    "freeze_time": 120,
+    "change_alert": True,
+    "load_alert": True,
+    "load_alert_interval": 60 * 60,
+    "weixin_bot_url": "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=e7d4459d-4b6e-49a0-8d1b-a1996a1b3dd6",
+    "refresh_instance_interval": 60 * 60,
+}
+
+json.dump(file, open(f'{file["name"]}_nats.json', 'w'), indent=4)
+
+
+goods = {
+    "name": "goods_service",
+    "ecs_config": {
+        "ak_id": "LTAI4G5x9aoZx8dDamQ7vfZi",
+        "region_id": "cn-beijing",
+        "ak_secret": "Bk98FsbPYXcJe72n1bG3Ssf73acuNh",
+        "image_id": "m-2zeihn2b3vxj7s5z1zxp",
+        "instance_type": "ecs.g5.xlarge",
+        "security_group_id": "sg-2ze6zhxrqy9vueae27xy",
+        "vswitch_id": "vsw-2ze1n1k3mo3fv2irsfdps"
+    },
+    "max_instance": 80,
+    "min_instance": 1,
+    "one_instance_node_num": 5,
+    "watch_interval": 5,
+    "add_watch_time": 120,
+    "release_watch_time": 1200,
+    "a2s_ip": "172.17.4.188:9090",
+    "upper_threshold": 0.8,
+    "down_threshold": 0.5,
+    "is_run": False,
+    "freeze_time": 120,
+    "change_alert": True,
+    "load_alert": True,
+    "load_alert_interval": 60 * 60,
+    "weixin_bot_url": "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=e7d4459d-4b6e-49a0-8d1b-a1996a1b3dd6",
+    "refresh_instance_interval": 60 * 60,
+}
+
+json.dump(goods, open(f'{goods["name"]}_nats.json', 'w'), indent=4)
+
+
+
+field = {
+    "name": "goods_field",
+    "ecs_config": {
+        "ak_id": "LTAI4G5x9aoZx8dDamQ7vfZi",
+        "region_id": "cn-beijing",
+        "ak_secret": "Bk98FsbPYXcJe72n1bG3Ssf73acuNh",
+        "image_id": "m-2zeadgyogifc5fzqadcd",
+        "instance_type": "ecs.c6.xlarge",
+        "security_group_id": "sg-2ze6zhxrqy9vueae27xy",
+        "vswitch_id": "vsw-2ze586sxfwsaov4s5w88d"
+    },
+    "max_instance": 100,
+    "min_instance": 1,
+    "one_instance_node_num": 4,
+    "watch_interval": 5,
+    "add_watch_time": 120,
+    "release_watch_time": 1200,
+    "a2s_ip": "172.17.4.188:9090",
+    "upper_threshold": 0.8,
+    "down_threshold": 0.5,
+    "is_run": False,
+    "freeze_time": 120,
+    "change_alert": True,
+    "load_alert": True,
+    "load_alert_interval": 60 * 60,
+    "weixin_bot_url": "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=e7d4459d-4b6e-49a0-8d1b-a1996a1b3dd6",
+    "refresh_instance_interval": 60 * 60,
+}
+
+json.dump(field, open(f'{field["name"]}_nats.json', 'w'), indent=4)

+ 80 - 0
monitor_server.py

@@ -0,0 +1,80 @@
+# coding:utf-8
+import json
+import os
+import time
+from utils.watch_load import WorkerNodes
+from loguru import logger
+from multiprocessing import Process
+import psutil
+from utils.send_message import send_weixin
+
+# Registry of started monitor processes: service name -> (Process, pid).
+ServerMonitor = {}
+
+
+def add_server(config_path):
+    """
+    添加服务器
+    :return:
+    """
+    worker = WorkerNodes(config_path)
+    worker.start()
+    return None
+
+
+def config_changed(config_dir):
+    """
+    检查配置文件的变动
+    """
+    for file in os.listdir(config_dir):
+        file_path = os.path.join(config_dir, file)
+        if os.path.isfile(file_path):
+            if file_path.endswith("_nats.json"):
+                config_value = json.load(open(file_path, "r"))
+                name = config_value["name"]
+                is_run = config_value.get("is_run", True)
+
+                # 关闭服务
+                if name in ServerMonitor:
+                    if not is_run:
+                        ServerMonitor[name][0].join()
+                        del ServerMonitor[name]
+                        send_weixin("关闭{}服务成功!!".format(name), config_value.get("weixin_bot_url", ""))
+                        logger.warning("关闭{}服务成功".format(name))
+                    continue
+
+                # 启动服务过滤关闭的服务
+                if not is_run:
+                    continue
+                try:
+                    p = Process(target=add_server, args=(file_path,))
+                    p.start()
+                    ServerMonitor[name] = (p, p.pid)
+                    send_weixin("启动{}服务成功!!".format(name), config_value.get("weixin_bot_url", ""))
+                    logger.warning("启动{}服务成功".format(name))
+                except Exception as e:
+                    send_weixin("启动{}服务失败!!".format(name), config_value.get("weixin_bot_url", ""))
+                    print(e)
+
+
+def check_server():
+    # 意外退出检查
+    global ServerMonitor
+    exists_pid = []
+    for name, (_, p) in ServerMonitor.items():
+        if not psutil.pid_exists(p):
+            logger.warning(f"{name}::a2s监控服务意外退出,请尽快处理或重启")
+            exists_pid.append(name)
+    for name in exists_pid:
+        del ServerMonitor[name]
+
+
+def run_server(config_dir_path):
+    while True:
+        config_changed(config_dir_path)
+        time.sleep(10)
+        check_server()
+        time.sleep(20)
+
+
+# Script entry point: supervise the configs found in ./docs (relative to the
+# current working directory).
+if __name__ == '__main__':
+    run_server("./docs")

BIN
test_dir/__pycache__/file_server.cpython-37.pyc


BIN
test_dir/__pycache__/mon.cpython-37.pyc


BIN
test_dir/__pycache__/picture_server.cpython-37.pyc


+ 47 - 0
test_dir/file_server.py

@@ -0,0 +1,47 @@
+# coding:utf-8
+
+from a2s.a2s_server import watch
+from a2s.a2s_client import a2s_execute
+import multiprocessing
+import random
+from concurrent.futures import ThreadPoolExecutor
+import time
+
+
+def start_test2img(data):
+    # 本次不使用SSL,所以channel是不安全的
+    try:
+        for t in range(5):
+            result = a2s_execute("192.168.3.240:9090", "test2img", 240, b"test")
+            result = result.decode("utf-8")
+            return result
+    except Exception as e:
+        print("test2img异常::",e)
+        return ""
+
+
+@watch
+def run(data: bytes, *args, **kwargs):
+    # 是否有图片
+    a = random.randint(1, 2)
+    print("picture:::>", a)
+    if a == 1:
+        # 有几张图片
+        picture_num = random.randint(2, 40)
+        pictures = [i for i in range(picture_num)]
+        with ThreadPoolExecutor(max_workers=30) as executor:
+            result = executor.map(start_test2img, pictures)
+        list(result)
+    a = random.randint(5, 20)
+    time.sleep(a)
+    return data
+
+
+def worker():
+    run(nats_host="192.168.3.240", nats_port=19090, subject="test2txt", queue="main")
+
+
+def add_file_process():
+    new_process = multiprocessing.Process(target=worker)
+    new_process.start()
+    return new_process

+ 195 - 0
test_dir/mon.py

@@ -0,0 +1,195 @@
+# coding:utf-8
+from a2s.a2s_monitor import watch_monitor
+import time
+from math import ceil, floor
+from file_server import add_file_process
+from picture_server import add_picture_process
+from utils.tools import calculate_file_hash
+import threading
+import json
+import multiprocessing
+
+# Spawner lookup: "<worker_name>_add" -> function that starts one worker
+# process for that service.
+func_dict = {
+    "test2txt_add": add_file_process,
+    "test2img_add": add_picture_process
+}
+
+
+# 负载计算
+class WorkerNodes(object):
+    def __init__(self, config_path):
+        self.config_path = config_path
+        self.config_md5 = calculate_file_hash(self.config_path)
+        config = json.load(open(self.config_path, "r"))
+        self.worker_name = config.get("name", "worker")
+        self.one_instance_node_num = config.get("one_instance_node_num", 1)
+        self.max_instance = config.get("max_instance", 5)
+        self.min_instance = config.get("min_instance", 1)
+        self.watch_interval = config.get("watch_interval", 5)
+        self.add_watch_time = config.get("add_watch_time", 180)
+        self.release_watch_time = config.get("release_watch_time", 1200)
+        self.a2s_ip = config.get("a2s_ip", "")
+        self.upper_threshold = config.get("upper_threshold", 0.8)
+        self.down_threshold = config.get("down_threshold", 0.5)
+        self.ecs_config = config.get("ecs_config", {})
+        self.freeze_time = config.get("freeze_time", 60)
+        self._run = config.get("is_run", True)  # 调整配置文件可以关闭服务
+        self.instances = []
+        self._routine = []
+        self.last_update_time = 0
+        self.__initialize()
+
+    def get_instance_num(self):
+        """
+        获取实例数量
+        :return:
+        """
+        return len(self.instances)
+
+    def get_node_num(self):
+        """
+        获取节点数量
+        :return:
+        """
+        return len(self.instances) * self.one_instance_node_num
+
+    def loop_monitor(self):
+        """
+        循环检查
+        :return:
+        """
+        while self._run:
+            try:
+                add_watch_number = ceil(self.add_watch_time / self.watch_interval)
+                release_watch_number = ceil(self.release_watch_time / self.watch_interval)
+                max_number = max([add_watch_number, release_watch_number])
+                current_nodes = watch_monitor(self.a2s_ip, self.worker_name)
+                if not isinstance(current_nodes, int):
+                    self.ecs_warning("警告:A2S服务监控出现异常,获取当前请求数量失败,当前服务停止增减设备。尽快解决当前问题,避免服务中断!!!")
+                    if self._routine:
+                        self._routine.remove(0)
+                    time.sleep(60)
+                    continue
+
+                # 获取当前负载
+                load = current_nodes / len(self.instances) * self.one_instance_node_num
+                self._routine.append(load)
+
+                # 清理大于"最大观察时间"的负载
+                if len(self._routine) > max_number:
+                    self._routine.pop(0)
+                if time.time() - self.last_update_time < self.freeze_time:
+                    print(f"{self.worker_name}::冻结时间未到,不进行增减设备")
+                    time.sleep(self.watch_interval)
+                    continue
+                # 检查是否新增
+                if len(self._routine) >= add_watch_number:
+                    add_start_index = len(self._routine) - add_watch_number
+                    print("长度比:::", len(self._routine), len(self._routine[add_start_index:]), add_watch_number)
+                    add_load = sum(self._routine[add_start_index:]) / add_watch_number
+                    if add_load > self.upper_threshold:
+                        num = ceil((add_load - self.upper_threshold) * len(self.instances))
+                        print(f"{self.worker_name}新增add_load:::", add_load, self.upper_threshold, num)
+                        self.add_instances(num)
+                        self.last_update_time = time.time()
+                        continue
+
+                # 检查是否释放
+                if len(self._routine) >= release_watch_number:
+                    release_start_index = len(self._routine) - release_watch_number
+                    release_load = sum(self._routine[release_start_index:]) / release_watch_number
+                    if release_load < self.down_threshold:
+                        num = floor((1 - self.down_threshold) * len(self.instances))
+                        print(f"{self.worker_name}释放release_load:::", release_load, self.down_threshold, num)
+                        self.reduce_instances(num)
+                        self.last_update_time = time.time()
+
+                # 检查间隔
+                print(
+                    f"{self.worker_name} 当前负载({current_nodes}::{load})为节点数::{self.get_node_num()},实例数::{self.get_instance_num()}")
+            except Exception as e:
+                print("baocuole==>", e)
+            time.sleep(self.watch_interval)
+        self.close_server_instances()
+
+    def add_instances(self, num):
+        """
+        新增设备
+        :param num:
+        :return:
+        """
+        if len(self.instances) < self.max_instance:
+            if self.max_instance < len(self.instances) + num:
+                num = self.max_instance - len(self.instances)
+            if len(self.instances) + num < self.min_instance:
+                num = self.min_instance - len(self.instances)
+            print(f"ECS新增{num}设备,当前ECS数量为{len(self.instances)}")
+            for i in range(num):
+                p = multiprocessing.Process(target=func_dict[f"{self.worker_name}_add"])
+                p.start()
+                self.instances.append(p)
+            # 开始新增
+        else:
+            self.ecs_warning(f"ECS 扩容失败,{self.worker_name}服务已达到最大实例数量:{self.max_instance}")
+
+    def reduce_instances(self, num):
+        """
+        减少设备
+        :param num:
+        :return:
+        """
+        if len(self.instances) > self.min_instance:
+            if len(self.instances) < self.min_instance + num:
+                num = len(self.instances) - self.min_instance
+            # 开始释放操作
+            for ind in range(num):
+                if ind < num:
+                    p = self.instances.pop(0)
+                    print("ECS释放进程:", p, p.pid)
+                    p.terminate()
+        else:
+            self.ecs_warning(f"ECS 缩减失败,{self.worker_name}服务已达到最小实例数量:{self.min_instance}")
+
+    def ecs_warning(self, text):
+        print("警告:::⚠️", text)
+
+    def __initialize(self):
+        if len(self.instances) < self.min_instance:
+            self.add_instances(self.min_instance - len(self.instances))
+
+    def start(self):
+        t1 = threading.Thread(target=self.loop_monitor)
+        t1.start()
+        t2 = threading.Thread(target=self.update_config)
+        t2.start()
+        t1.join()
+        t2.join()
+
+    def update_config(self):
+        """
+        更新配置
+        :param config:
+        :return:
+        """
+        while self._run:
+            config_md5 = calculate_file_hash(self.config_path)
+            if config_md5 != self.config_md5:
+                config = json.load(open(self.config_path, "r"))
+                self.worker_name = config.get("name", "worker")
+                self.one_instance_node_num = config.get("one_instance_node_num", 1)
+                self.max_instance = config.get("max_instance", 5)
+                self.min_instance = config.get("min_instance", 1)
+                self.watch_interval = config.get("watch_interval", 5)
+                self.add_watch_time = config.get("add_watch_time", 180)
+                self.release_watch_time = config.get("release_watch_time", 1200)
+                self.a2s_ip = config.get("a2s_ip", "")
+                self.upper_threshold = config.get("upper_threshold", 0.8)
+                self.down_threshold = config.get("down_threshold", 0.5)
+                self.ecs_config = config.get("ecs_config", {})
+                self._run = config.get("is_run", True)  # 调整配置文件可以关闭服务
+            # 每间隔一段时间,查看一下配置文件的状态
+            time.sleep(10)
+
+    def close_server_instances(self):
+        for p in self.instances:
+            p.terminate()

+ 76 - 0
test_dir/monitor_server.py

@@ -0,0 +1,76 @@
+# coding:utf-8
+import json
+import os
+import time
+from mon import WorkerNodes
+from loguru import logger
+from multiprocessing import Process
+import psutil
+
+# Registry of started monitor processes: service name -> (Process, pid).
+ServerMonitor = {}
+
+
+def add_server(config_path):
+    """
+    添加服务器
+    :return:
+    """
+    try:
+        worker = WorkerNodes(config_path)
+        worker.start()
+        return None
+    except Exception as e:
+        logger.warning(f"{config_path}::启动失败", e)
+        return False
+
+
+def config_changed(config_dir):
+    """
+    检查配置文件的变动
+    """
+    for file in os.listdir(config_dir):
+        file_path = os.path.join(config_dir, file)
+        if os.path.isfile(file_path):
+            if file_path.endswith("_nats.json"):
+                config_value = json.load(open(file_path, "r"))
+                name = config_value["name"]
+                is_run = config_value.get("is_run", True)
+
+                # 关闭服务
+                if name in ServerMonitor:
+                    if not is_run:
+                        ServerMonitor[name][0].terminate()
+                        del ServerMonitor[name]
+                        logger.warning("关闭{}服务成功".format(name))
+                    continue
+
+                # 启动服务过滤关闭的服务
+                if not is_run:
+                    continue
+                p = Process(target=add_server, args=(file_path,))
+                p.start()
+                ServerMonitor[name] = (p, p.pid)
+
+
+def check_server():
+    # 意外退出检查
+    global ServerMonitor
+    exists_pid = []
+    for name, (_, p) in ServerMonitor.items():
+        if not psutil.pid_exists(p):
+            logger.warning(f"{name}::a2s监控服务意外退出,请尽快处理或重启")
+            exists_pid.append(name)
+    for name in exists_pid:
+        del ServerMonitor[name]
+
+
+def run_server(config_dir_path):
+    while True:
+        config_changed(config_dir_path)
+        time.sleep(10)
+        check_server()
+        time.sleep(20)
+
+
+# Script entry point: supervise the configs found in ./docs (relative to the
+# current working directory).
+if __name__ == '__main__':
+    run_server("./docs")

+ 26 - 0
test_dir/picture_server.py

@@ -0,0 +1,26 @@
+# coding:utf-8
+
+from a2s.a2s_server import watch
+import multiprocessing
+import random
+import  time
+
+
+@watch
+def run(data: bytes, *args, **kwargs):
+    t=random.randint(4,30)
+    time.sleep(t)
+    return data
+
+
+def worker():
+    run(nats_host="192.168.3.240", nats_port=19090, subject="test2img", queue="main")
+
+
+def add_picture_process():
+    new_process = multiprocessing.Process(target=worker)
+    new_process.start()
+    return new_process
+
+# Running this file directly serves "test2img" in the current process.
+if __name__ == '__main__':
+    worker()

+ 32 - 0
test_dir/test_client.py

@@ -0,0 +1,32 @@
+## coding:utf-8
+from a2s.a2s_client import a2s_execute
+import time
+
+def start_file2txt(data: list):
+    # 本次不使用SSL,所以channel是不安全的
+    try:
+        for t in range(5):
+            result = a2s_execute("192.168.3.240:9090", "test2txt", 240, b"test")
+            result = result.decode("utf-8")
+            return result
+    except Exception as e:
+        print(e)
+        return ""
+
+def start_test2img(data: list):
+    # 本次不使用SSL,所以channel是不安全的
+    try:
+        for t in range(5):
+            result = a2s_execute("192.168.3.240:9090", "test2img", 60, b"test")
+            result = result.decode("utf-8")
+            return result
+    except Exception as e:
+        print("test2img异常::",e)
+        return ""
+
+# Manual smoke test: send one test2txt request and print the reply followed
+# by the elapsed wall-clock seconds.
+if __name__ == '__main__':
+    start_time=time.time()
+    result=start_file2txt([])
+    # result=start_test2img([])
+    print(result)
+    print(time.time()-start_time)

+ 9 - 0
update_data_now.py

@@ -0,0 +1,9 @@
+from pynats import NATSClient
+from a2s.tools import json_serialize
+
+def safe_logout():
+    with NATSClient("nats://172.17.4.188:19090") as nc:
+        nc.connect()
+        nc.publish(subject="monitorRelease", payload=json_serialize({"host": "ai-01"}))
+
+safe_logout()

BIN
utils/__pycache__/tools.cpython-37.pyc


+ 180 - 0
utils/ecs.py

@@ -0,0 +1,180 @@
+# coding:utf-8
+import json
+import logging
+import time
+from aliyunsdkcore import client
+from aliyunsdkecs.request.v20140526.DescribeInstancesRequest import DescribeInstancesRequest
+from aliyunsdkecs.request.v20140526.RunInstancesRequest import RunInstancesRequest
+from aliyunsdkecs.request.v20140526.DeleteInstanceRequest import DeleteInstanceRequest
+
+
+class EcsClient(object):
+    """Helper around the Alibaba Cloud ECS SDK: create, query and release instances.
+
+    ``config`` is a dict. ``ak_id``, ``ak_secret`` and ``region_id`` are used to
+    build the SDK client; the remaining keys are read by :meth:`build_request`
+    when instances are created.
+    """
+
+    def __init__(self, config):
+        # Region setting; falls back to "cn-beijing" when the key is absent.
+        self.region_id = config.get("region_id", "cn-beijing")
+        self.ak_id = config.get("ak_id")
+        self.ak_secret = config.get("ak_secret")
+        self.config = config
+        self.clt = client.AcsClient(self.ak_id, self.ak_secret, self.region_id)
+
+    # Build the creation request
+    @staticmethod
+    def build_request(config):
+        """Build a ``RunInstancesRequest`` from ``config``.
+
+        Optional keys are only applied when present and truthy. The spot
+        strategy is always "SpotAsPriceGo" and the image's own user name and
+        password are inherited.
+
+        :param config: dict of instance settings
+        :return: the populated request
+        """
+        request = RunInstancesRequest()
+        # (config key, setter) pairs; a setter is skipped when its key is unset.
+        optional_setters = (
+            ("zone_id", request.set_ZoneId),                  # zone of the instance
+            ("vswitch_id", request.set_VSwitchId),            # vSwitch id
+            ("image_id", request.set_ImageId),                # image id
+            ("security_group_id", request.set_SecurityGroupId),  # security group id
+            ("instance_type", request.set_InstanceType),      # instance type
+            ("io_optimized", request.set_IoOptimized),        # I/O optimisation
+            ("system_disk_category", request.set_SystemDiskCategory),  # disk category
+            ("system_disk_size", request.set_SystemDiskSize),  # system disk size
+        )
+        for key, setter in optional_setters:
+            value = config.get(key)
+            if value:
+                setter(value)
+        request.set_SpotStrategy("SpotAsPriceGo")  # spot mode with automatic bidding
+        request.set_PasswordInherit(True)  # use the user name/password of the image
+        # Instance name. The original conditional expression evaluated the
+        # default string without ever sending it; apply it for real.
+        request.set_InstanceName(config.get("instance_name") or "auto_create_instance")
+        # Protection period.
+        # NOTE(review): the original had a dead "else 0" branch here. No default
+        # is sent, so the service-side default applies - confirm whether an
+        # explicit 0 was intended before changing this.
+        spot_duration = config.get("spot_duration")
+        if spot_duration:
+            request.set_SpotDuration(spot_duration)
+        return request
+
+    # Create instances and start them
+    def create_multiple_instances(self, amount):
+        """Create ``amount`` instances from ``self.config`` and wait until they run.
+
+        :param amount: number of instances to create
+        :return: list of ``(instance_id, intranet_ip)`` tuples, or None when the
+                 creation request failed
+        """
+        request = self.build_request(self.config)
+        request.set_Amount(amount)
+        return self._execute_request(request, amount)
+
+    # Send an API request
+    def _send_request(self, request):
+        """Send ``request`` and return the decoded JSON response.
+
+        :return: the response dict, or None when the call or decoding raised
+                 (the exception is logged, not propagated)
+        """
+        request.set_accept_format('json')
+        try:
+            response_str = self.clt.do_action_with_exception(request)
+            logging.info(response_str)
+            response_detail = json.loads(response_str)
+            return response_detail
+        except Exception as e:
+            logging.error(e)
+            return None
+
+    def _execute_request(self, request, amount):
+        """Send a creation request, then poll every 10 s until ``amount`` instances run.
+
+        :return: list of ``(instance_id, intranet_ip)`` tuples, or None when the
+                 request failed or the response carries an error ``Code``
+        """
+        response = self._send_request(request)
+        if response is None or response.get('Code') is not None:
+            return None
+        instance_ids = response.get('InstanceIdSets').get('InstanceIdSet')
+        instances = []
+        while len(instances) < amount:
+            time.sleep(10)
+            # A failed status query returns None. Treat it as "nothing running
+            # yet" and keep polling: raising here would make the caller create
+            # a second batch while the first one is left untracked.
+            instances = self.check_instance_running(instance_ids) or []
+        return instances
+
+    def _describe_instances(self, instance_ids):
+        """Return the raw instance detail dicts for ``instance_ids``.
+
+        :return: list of detail dicts, or None when the request failed or the
+                 response carries an error ``Code``
+        """
+        request = DescribeInstancesRequest()
+        request.set_InstanceIds(json.dumps(instance_ids))
+        response = self._send_request(request)
+        if response is None or response.get('Code') is not None:
+            return None
+        return response.get('Instances').get('Instance')
+
+    @staticmethod
+    def _intranet_ip(instance_detail):
+        """Return the first VPC private IP of an instance detail dict, or None."""
+        addresses = instance_detail.get('VpcAttributes', {}).get('PrivateIpAddress', {}).get(
+            'IpAddress', [])
+        return addresses[0] if len(addresses) > 0 else None
+
+    # Check which instances are running
+    def check_instance_running(self, instance_ids):
+        """Return ``(instance_id, intranet_ip)`` for every instance whose status is "Running".
+
+        :return: list of tuples, or None when the status query failed
+        """
+        details = self._describe_instances(instance_ids)
+        if details is None:
+            return None
+        return [
+            (detail.get('InstanceId'), self._intranet_ip(detail))
+            for detail in details
+            if detail.get('Status') == "Running"
+        ]
+
+    def release_instance(self, instance_id, force=False):
+        """
+        Delete an instance by id.
+        :param instance_id: instance id
+        :param force: when True, a running instance is released as well
+        :return: the response dict, or None when the request failed
+        """
+        request = DeleteInstanceRequest()
+        request.set_InstanceId(instance_id)
+        request.set_Force(force)
+        return self._send_request(request)
+
+    # Check the state of instances
+    def check_instance_status(self, instance_ids):
+        """Return ``(instance_id, intranet_ip)`` for every instance the API reports.
+
+        Unlike :meth:`check_instance_running`, the status is not filtered.
+
+        :return: list of tuples, or None when the status query failed
+        """
+        details = self._describe_instances(instance_ids)
+        if details is None:
+            return None
+        return [(detail.get('InstanceId'), self._intranet_ip(detail)) for detail in details]
+
+    def add_instance(self, num):
+        """Create ``num`` instances, trying up to five times.
+
+        :return: list of ``(instance_id, intranet_ip)`` tuples, or None when
+                 every attempt failed
+        """
+        attempts = 5
+        for attempt in range(attempts):
+            try:
+                instance_ids = self.create_multiple_instances(num)
+                if instance_ids:
+                    return instance_ids
+            except Exception as e:
+                print(e)
+            # Back off after any failed attempt, not only after an exception.
+            if attempt < attempts - 1:
+                time.sleep(5)
+        return None
+
+    def delete_instance(self, instance_ids):
+        """Force-release every instance in ``instance_ids``.
+
+        :return: list of the instance ids that could not be released
+        """
+        delete_false = []
+        for instance_id in instance_ids:
+            try:
+                result = self.release_instance(instance_id, force=True)
+            except Exception as e:
+                print(e)
+                time.sleep(2)
+                delete_false.append(instance_id)
+                continue
+            if not result:
+                delete_false.append(instance_id)
+        return delete_false
+
+    def instance_status(self, instance_ids):
+        """Query instance status, trying up to five times.
+
+        :return: non-empty list of ``(instance_id, intranet_ip)`` tuples, or
+                 None when no attempt returned any instance
+        """
+        attempts = 5
+        for attempt in range(attempts):
+            try:
+                result = self.check_instance_status(instance_ids)
+                if result:
+                    return result
+            except Exception as e:
+                print(e)
+            # Back off after any failed attempt, not only after an exception.
+            if attempt < attempts - 1:
+                time.sleep(2)
+        return None
+
+
+if __name__ == '__main__':
+    # Manual smoke test. The access key is read from the environment so that
+    # no credential is stored in the repository; a key that was committed
+    # earlier should be treated as leaked and rotated.
+    import os
+
+    config = {
+        "ak_id": os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_ID"),
+        "region_id": "cn-beijing",
+        "ak_secret": os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_SECRET"),
+        "image_id": "m-2zeihn2b3vxj7s5z1zxp",
+        "instance_type": "ecs.g5.xlarge",
+        "security_group_id": "sg-2ze6zhxrqy9vueae27xy",
+        "vswitch_id": "vsw-2ze1n1k3mo3fv2irsfdps"
+    }
+    ecs = EcsClient(config)
+    # Example calls; uncomment the one to try.
+    # result=ecs.create_multiple_instances(1)
+    # print(result)
+
+    # result = ecs.check_instance_status(["i-2ze4j0syekh1oq9017ok"])
+    # print(result)
+    # result = ecs.release_instance("i-2ze4j0syekh1oq9017ok", force=True)
+    # print(result)

+ 28 - 0
utils/send_message.py

@@ -0,0 +1,28 @@
+# coding:utf-8
+import requests
+from loguru import logger
+
+
+def send_weixin(content, url, timeout=10):
+    """Post a text message to a WeChat Work group-bot webhook.
+
+    Never raises: every failure is logged and swallowed, so that alerting
+    cannot take the caller down.
+
+    :param content: message text
+    :param url: webhook URL; when empty or None nothing is sent
+    :param timeout: seconds to wait for the HTTP request; without a timeout a
+                    stalled proxy would block the caller indefinitely
+    :return: None
+    """
+    if not url:
+        return
+    try:
+        headers = {"Content-Type": "application/json"}  # HTTP headers; the body is JSON
+        data = {
+            "msgtype": "text",
+            "text": {
+                "content": content,  # message text the group bot will post
+                "mentioned_list": [],
+            }
+        }
+        # The request goes through this fixed HTTP proxy.
+        proxy = {
+            "http": "http://172.17.221.150:10991",
+            "https": "http://172.17.221.150:10991",
+        }
+        r = requests.post(url, headers=headers, json=data, proxies=proxy, timeout=timeout)
+        if r.status_code == 200:
+            logger.warning("告警发送成功")
+            return
+        logger.warning("告警发送失败")
+    except Exception as e:
+        # The original wrapped this f-string in braces, which logged a set
+        # literal instead of the message itself.
+        logger.warning(f"告警程序错误::{e}")

+ 97 - 0
utils/test.py

@@ -0,0 +1,97 @@
+# Check the running state of instances (offline sample)
+def check_instance_running(instance_ids):
+    """Summarise a canned DescribeInstances sample.
+
+    ``instance_ids`` is not used; the data below is a fixed sample of three
+    instance detail dicts.
+
+    :return: list of ``(instance_id, status, first_private_ip)`` tuples, where
+             the IP is None when the instance has no VPC private address
+    """
+    sample_instances = [
+        {'ResourceGroupId': '', 'Memory': 8192, 'InstanceChargeType': 'PostPaid', 'Cpu': 4, 'OSName': 'CentOS  7.8 64位',
+         'InstanceNetworkType': 'vpc', 'InnerIpAddress': {'IpAddress': []}, 'ExpiredTime': '2099-12-31T15:59Z',
+         'ImageId': 'm-2ze3uv2gz5dsy0ol2i4x',
+         'EipAddress': {'AllocationId': '', 'IpAddress': '', 'InternetChargeType': ''}, 'ImageOptions': {},
+         'HostName': 'iZ2zegj32en19tn3f09u4fZ', 'Tags': {'Tag': [{'TagKey': 'acs:ecs:payType', 'TagValue': 'spot'}]},
+         'VlanId': '', 'Status': 'Running', 'HibernationOptions': {'Configured': False},
+         'MetadataOptions': {'HttpTokens': '', 'HttpEndpoint': ''}, 'InstanceId': 'i-2zegj32en19tn3f09u4f',
+         'StoppedMode': 'Not-applicable', 'CpuOptions': {'ThreadsPerCore': 2, 'Numa': 'N/A', 'CoreCount': 2},
+         'StartTime': '2023-10-16T02:57Z', 'DeletionProtection': False,
+         'SecurityGroupIds': {'SecurityGroupId': ['sg-2ze6zhxrqy9vueae27xy']},
+         'VpcAttributes': {'PrivateIpAddress': {'IpAddress': ['172.17.4.192']}, 'VpcId': 'vpc-2zee8yqxobahsyfg9d11h',
+                           'VSwitchId': 'vsw-2ze586sxfwsaov4s5w88d', 'NatIpAddress': ''},
+         'InternetChargeType': 'PayByTraffic', 'InstanceName': 'img2txt', 'DeploymentSetId': '',
+         'InternetMaxBandwidthOut': 0, 'SerialNumber': '6428aa21-5136-44ce-952d-cb20983ff2ef', 'OSType': 'linux',
+         'CreationTime': '2023-10-16T02:57Z', 'AutoReleaseTime': '', 'Description': '', 'InstanceTypeFamily': 'ecs.c6',
+         'DedicatedInstanceAttribute': {'Tenancy': '', 'Affinity': ''}, 'SpotDuration': 1,
+         'PublicIpAddress': {'IpAddress': []}, 'GPUSpec': '', 'NetworkInterfaces': {'NetworkInterface': [
+            {'Type': 'Primary', 'PrimaryIpAddress': '172.17.4.192', 'MacAddress': '00:16:3e:0a:b6:5f',
+             'NetworkInterfaceId': 'eni-2ze6xmbgo5yki9613pxa',
+             'PrivateIpSets': {'PrivateIpSet': [{'PrivateIpAddress': '172.17.4.192', 'Primary': True}]}}]},
+         'SpotPriceLimit': 0.0, 'DeviceAvailable': True, 'SaleCycle': '', 'InstanceType': 'ecs.c6.xlarge',
+         'SpotStrategy': 'SpotAsPriceGo', 'OSNameEn': 'CentOS  7.8 64 bit', 'IoOptimized': True,
+         'ZoneId': 'cn-beijing-g', 'ClusterId': '',
+         'EcsCapacityReservationAttr': {'CapacityReservationPreference': '', 'CapacityReservationId': ''},
+         'DedicatedHostAttribute': {'DedicatedHostId': '', 'DedicatedHostName': '', 'DedicatedHostClusterId': ''},
+         'GPUAmount': 0, 'OperationLocks': {'LockReason': []}, 'InternetMaxBandwidthIn': -1, 'Recyclable': False,
+         'RegionId': 'cn-beijing', 'CreditSpecification': ''},
+        {'ResourceGroupId': '', 'Memory': 8192, 'InstanceChargeType': 'PostPaid', 'Cpu': 4, 'OSName': 'CentOS  7.8 64位',
+         'InstanceNetworkType': 'vpc', 'InnerIpAddress': {'IpAddress': []}, 'ExpiredTime': '2099-12-31T15:59Z',
+         'ImageId': 'm-2ze3uv2gz5dsy0ol2i4x',
+         'EipAddress': {'AllocationId': '', 'IpAddress': '', 'InternetChargeType': ''}, 'ImageOptions': {},
+         'HostName': 'iZ2ze8cqacj7opo8vu6l4lZ', 'Tags': {'Tag': [{'TagKey': 'acs:ecs:payType', 'TagValue': 'spot'}]},
+         'VlanId': '', 'Status': 'Running', 'HibernationOptions': {'Configured': False},
+         'MetadataOptions': {'HttpTokens': '', 'HttpEndpoint': ''}, 'InstanceId': 'i-2ze8cqacj7opo8vu6l4l',
+         'StoppedMode': 'Not-applicable', 'CpuOptions': {'ThreadsPerCore': 2, 'Numa': 'N/A', 'CoreCount': 2},
+         'StartTime': '2023-10-16T02:58Z', 'DeletionProtection': False,
+         'SecurityGroupIds': {'SecurityGroupId': ['sg-2ze6zhxrqy9vueae27xy']},
+         'VpcAttributes': {'PrivateIpAddress': {'IpAddress': ['172.17.4.208']}, 'VpcId': 'vpc-2zee8yqxobahsyfg9d11h',
+                           'VSwitchId': 'vsw-2ze586sxfwsaov4s5w88d', 'NatIpAddress': ''},
+         'InternetChargeType': 'PayByTraffic', 'InstanceName': 'img2txt', 'DeploymentSetId': '',
+         'InternetMaxBandwidthOut': 0, 'SerialNumber': '36983a92-e480-4c66-832f-8085b8f26625', 'OSType': 'linux',
+         'CreationTime': '2023-10-16T02:57Z', 'AutoReleaseTime': '', 'Description': '', 'InstanceTypeFamily': 'ecs.c6',
+         'DedicatedInstanceAttribute': {'Tenancy': '', 'Affinity': ''}, 'SpotDuration': 1,
+         'PublicIpAddress': {'IpAddress': []}, 'GPUSpec': '', 'NetworkInterfaces': {'NetworkInterface': [
+            {'Type': 'Primary', 'PrimaryIpAddress': '172.17.4.208', 'MacAddress': '00:16:3e:0c:5f:2f',
+             'NetworkInterfaceId': 'eni-2ze8uw6bxge799csyuun',
+             'PrivateIpSets': {'PrivateIpSet': [{'PrivateIpAddress': '172.17.4.208', 'Primary': True}]}}]},
+         'SpotPriceLimit': 0.0, 'DeviceAvailable': True, 'SaleCycle': '', 'InstanceType': 'ecs.c6.xlarge',
+         'SpotStrategy': 'SpotAsPriceGo', 'OSNameEn': 'CentOS  7.8 64 bit', 'IoOptimized': True,
+         'ZoneId': 'cn-beijing-g', 'ClusterId': '',
+         'EcsCapacityReservationAttr': {'CapacityReservationPreference': '', 'CapacityReservationId': ''},
+         'DedicatedHostAttribute': {'DedicatedHostId': '', 'DedicatedHostName': '', 'DedicatedHostClusterId': ''},
+         'GPUAmount': 0, 'OperationLocks': {'LockReason': []}, 'InternetMaxBandwidthIn': -1, 'Recyclable': False,
+         'RegionId': 'cn-beijing', 'CreditSpecification': ''},
+        {'ResourceGroupId': '', 'Memory': 8192, 'InstanceChargeType': 'PostPaid', 'Cpu': 4, 'OSName': 'CentOS  7.8 64位',
+         'InstanceNetworkType': 'vpc', 'InnerIpAddress': {'IpAddress': []}, 'ExpiredTime': '2099-12-31T15:59Z',
+         'ImageId': 'm-2ze3uv2gz5dsy0ol2i4x',
+         'EipAddress': {'AllocationId': '', 'IpAddress': '', 'InternetChargeType': ''}, 'ImageOptions': {},
+         'HostName': 'iZ2zegj32en19tn3f09u4jZ', 'Tags': {'Tag': [{'TagKey': 'acs:ecs:payType', 'TagValue': 'spot'}]},
+         'VlanId': '', 'Status': 'Running', 'HibernationOptions': {'Configured': False},
+         'MetadataOptions': {'HttpTokens': '', 'HttpEndpoint': ''}, 'InstanceId': 'i-2zegj32en19tn3f09u4j',
+         'StoppedMode': 'Not-applicable', 'CpuOptions': {'ThreadsPerCore': 2, 'Numa': 'N/A', 'CoreCount': 2},
+         'StartTime': '2023-10-16T02:57Z', 'DeletionProtection': False,
+         'SecurityGroupIds': {'SecurityGroupId': ['sg-2ze6zhxrqy9vueae27xy']},
+         'VpcAttributes': {'PrivateIpAddress': {'IpAddress': ['172.17.4.194']}, 'VpcId': 'vpc-2zee8yqxobahsyfg9d11h',
+                           'VSwitchId': 'vsw-2ze586sxfwsaov4s5w88d', 'NatIpAddress': ''},
+         'InternetChargeType': 'PayByTraffic', 'InstanceName': 'img2txt', 'DeploymentSetId': '',
+         'InternetMaxBandwidthOut': 0, 'SerialNumber': '6591e447-686e-48bb-82e9-e66977310d7d', 'OSType': 'linux',
+         'CreationTime': '2023-10-16T02:57Z', 'AutoReleaseTime': '', 'Description': '', 'InstanceTypeFamily': 'ecs.c6',
+         'DedicatedInstanceAttribute': {'Tenancy': '', 'Affinity': ''}, 'SpotDuration': 1,
+         'PublicIpAddress': {'IpAddress': []}, 'GPUSpec': '', 'NetworkInterfaces': {'NetworkInterface': [
+            {'Type': 'Primary', 'PrimaryIpAddress': '172.17.4.194', 'MacAddress': '00:16:3e:34:4d:c6',
+             'NetworkInterfaceId': 'eni-2zefr02rdalqltlrbxk2',
+             'PrivateIpSets': {'PrivateIpSet': [{'PrivateIpAddress': '172.17.4.194', 'Primary': True}]}}]},
+         'SpotPriceLimit': 0.0, 'DeviceAvailable': True, 'SaleCycle': '', 'InstanceType': 'ecs.c6.xlarge',
+         'SpotStrategy': 'SpotAsPriceGo', 'OSNameEn': 'CentOS  7.8 64 bit', 'IoOptimized': True,
+         'ZoneId': 'cn-beijing-g', 'ClusterId': '',
+         'EcsCapacityReservationAttr': {'CapacityReservationPreference': '', 'CapacityReservationId': ''},
+         'DedicatedHostAttribute': {'DedicatedHostId': '', 'DedicatedHostName': '', 'DedicatedHostClusterId': ''},
+         'GPUAmount': 0, 'OperationLocks': {'LockReason': []}, 'InternetMaxBandwidthIn': -1, 'Recyclable': False,
+         'RegionId': 'cn-beijing', 'CreditSpecification': ''}]
+
+    def _first_private_ip(detail):
+        # First VPC private address of one detail dict, or None when absent.
+        private_ips = detail.get('VpcAttributes', {}).get('PrivateIpAddress', {}).get('IpAddress', [])
+        return private_ips[0] if private_ips else None
+
+    return [
+        (detail.get('InstanceId'), detail.get('Status'), _first_private_ip(detail))
+        for detail in sample_instances
+    ]
+
+
+# Scratch check: a tuple key can be unpacked directly while iterating
+# dict.items(), and pop() accepts the tuple key.
+d={(1,2):3}
+for (k,v),v2 in d.items():
+  print(k,v,v2)
+d.pop((1,2))
+print(d)

+ 24 - 0
utils/tools.py

@@ -0,0 +1,24 @@
+# coding:utf-8
+import hashlib
+
+
+def calculate_file_hash(file_path, hash_algorithm='sha256'):
+    """
+    Compute the hash of a file.
+    :param file_path: path of the file
+    :param hash_algorithm: name of the hash algorithm, sha256 by default
+    :return: hexadecimal digest of the file content
+    """
+    digest = hashlib.new(hash_algorithm)
+
+    # Read the file in binary mode, 4096 bytes at a time, so that large files
+    # are never held in memory as a whole.
+    with open(file_path, 'rb') as stream:
+        while True:
+            block = stream.read(4096)
+            if block == b'':
+                break
+            digest.update(block)
+
+    return digest.hexdigest()

+ 360 - 0
utils/watch_load.py

@@ -0,0 +1,360 @@
+# coding:utf-8
+from a2s.a2s_monitor import watch_monitor
+from utils.send_message import send_weixin
+import time
+from math import ceil, floor
+from utils.tools import calculate_file_hash
+import threading
+import json
+from utils.ecs import EcsClient
+from pynats import NATSClient
+from a2s.tools import json_serialize
+
+
+# Load-based scaling
+class WorkerNodes(object):
+    """Grow and shrink a pool of ECS instances according to the observed load.
+
+    Settings come from the JSON file at ``config_path``; the file is re-read
+    whenever its hash changes (see :meth:`update_config`).
+
+    State:
+      * ``instances``: live pool, a list of ``(instance_id, intranet_ip)`` tuples.
+      * ``release_instances``: instances told to log out and waiting to be
+        released, a list of ``[instance_id, ip, queued_at_epoch_seconds]``.
+      * ``_routine``: recent load samples, oldest first.
+    """
+
+    def __init__(self, config_path):
+        self.config_path = config_path
+        # Named "md5" historically; calculate_file_hash is called with its default algorithm.
+        self.config_md5 = calculate_file_hash(self.config_path)
+        config = self._load_config()
+        self._apply_config(config)
+        # The settings below are only read at start-up; update_config does not refresh them.
+        self.ecs_config = config.get("ecs_config", {})
+        self.ecs_config["instance_name"] = self.worker_name
+        self.nats_url = config.get("nats_url", "nats://172.17.4.188:19090")
+        self.instances = []
+        self.release_instances = []
+        self._routine = []
+        self.last_update_time = 0
+        self.last_load_alert_time = 0
+        self.last_refresh_instance_time = 0
+        self.release_freeze_time = 0
+        self.ecs = EcsClient(self.ecs_config)
+        self.__initialize()
+
+    def _load_config(self):
+        """Read and parse the JSON configuration file, closing it afterwards."""
+        with open(self.config_path, "r") as config_file:
+            return json.load(config_file)
+
+    def _apply_config(self, config):
+        """Copy every runtime-tunable setting from ``config`` onto ``self``."""
+        self.worker_name = config.get("name", "worker").replace(" ", "")
+        self.one_instance_node_num = config.get("one_instance_node_num", 1)
+        self.max_instance = config.get("max_instance", 5)
+        self.min_instance = config.get("min_instance", 1)
+        self.watch_interval = config.get("watch_interval", 5)
+        self.add_watch_time = config.get("add_watch_time", 180)
+        self.release_watch_time = config.get("release_watch_time", 1200)
+        self.a2s_ip = config.get("a2s_ip", "")
+        self.upper_threshold = config.get("upper_threshold", 0.8)
+        self.down_threshold = config.get("down_threshold", 0.5)
+        self.freeze_time = config.get("freeze_time", 60)
+        self._run = config.get("is_run", True)  # editing the config file can stop the service
+        self.change_alert = config.get("change_alert", True)
+        self.load_alert = config.get("load_alert", True)
+        self.load_alert_interval = config.get("load_alert_interval", 300)
+        self.release_freeze_interval = config.get("release_freeze_interval", 60 * 30)
+        self.weixin_bot_url = config.get("weixin_bot_url", "")
+        self.refresh_instance_interval = config.get("refresh_instance_interval", 300)
+        self.worker_timeout = config.get("worker_timeout", 300)
+        self.monitor_subject = config.get("monitor_subject", "monitorRelease")
+
+    def get_instance_num(self):
+        """
+        Number of instances in the live pool.
+        :return: int
+        """
+        return len(self.instances)
+
+    def get_node_num(self):
+        """
+        Number of worker nodes: instances times nodes per instance.
+        :return: int
+        """
+        return len(self.instances) * self.one_instance_node_num
+
+    # Ask the applications to exit gracefully
+    def safe_logout(self, hosts):
+        """Publish one ``{"host": host}`` message per host on the monitor subject."""
+        with NATSClient(self.nats_url) as nc:
+            nc.connect()
+            for host in hosts:
+                nc.publish(subject=self.monitor_subject, payload=json_serialize({"host": host}))
+
+    def refresh_instance(self):
+        """Drop pool entries that the ECS API no longer reports.
+
+        Status is queried in batches of six ids. Entries missing from the
+        answer are force-released and removed from ``self.instances``.
+        NOTE(review): ``instance_status`` returns None both when a query failed
+        and when a batch has no live instance; both are treated as "none alive"
+        here, as the original batch loop did - confirm this is acceptable.
+        """
+        if not self.instances:
+            return
+        batch_size = 6
+        known_ids = [instance_id for instance_id, _ip in self.instances]
+        alive = []  # instances the API still reports
+        for start in range(0, len(known_ids), batch_size):
+            batch_status = self.ecs.instance_status(known_ids[start:start + batch_size])
+            # The last batch used to be extended unconditionally and raised
+            # TypeError on a None result.
+            if batch_status:
+                alive.extend(batch_status)
+
+        # Nothing missing
+        if len(alive) == len(self.instances):
+            self.ecs_warning("实例状态更新成功,没有设备被释放!")
+            return
+
+        # Something is missing
+        alive_set = set(alive)
+        for instance in self.instances:
+            if instance not in alive_set:
+                self.ecs.release_instance(instance[0], force=True)
+        self.ecs_warning(f"实例状态更新成功,有{len(self.instances) - len(alive)}台设备被释放!")
+
+        # Update the stored pool
+        self.instances = alive
+
+    def loop_monitor(self):
+        """
+        Main loop: sample the load every ``watch_interval`` seconds and scale
+        the pool up or down. Releases every instance when the loop ends.
+        :return:
+        """
+        while self._run:
+            try:
+                this_time = time.time()
+                add_watch_number = ceil(self.add_watch_time / self.watch_interval)
+                release_watch_number = ceil(self.release_watch_time / self.watch_interval)
+                max_number = max(add_watch_number, release_watch_number)
+
+                # Fetch the current request count
+                current_nodes = watch_monitor(self.a2s_ip, self.worker_name)
+                if not isinstance(current_nodes, int):
+                    self.ecs_warning("警告:A2S服务监控出现异常,获取当前请求数量失败,当前服务停止增减设备。尽快解决当前问题,避免服务中断!!!")
+                    if self._routine:
+                        self._routine.pop(0)
+                    time.sleep(60)
+                    continue
+
+                # With no node the load cannot be computed (it used to raise
+                # ZeroDivisionError on every pass and never recover). Try to
+                # bring the pool back to min_instance instead.
+                node_num = self.get_node_num()
+                if node_num <= 0:
+                    self.ecs_warning(f"警告:[{self.worker_name}]当前可用节点数为0,无法计算负载,尝试补充至最小实例数")
+                    self.add_instances(self.min_instance - len(self.instances))
+                    time.sleep(self.watch_interval)
+                    continue
+
+                # Compute the current load
+                load = current_nodes / node_num
+                self._routine.append(load)
+
+                # Check ECS state and drop instances the platform released itself
+                if this_time - self.last_refresh_instance_time > self.refresh_instance_interval:
+                    self.refresh_instance()
+                    self.last_refresh_instance_time = this_time
+
+                # Drop the sample that is older than the longest observation window
+                if len(self._routine) > max_number:
+                    self._routine.pop(0)
+
+                # Freeze period after a pool change
+                if this_time - self.last_update_time < self.freeze_time:
+                    time.sleep(self.watch_interval)
+                    continue
+
+                # Periodic load report; load_alert was read from the config but never consulted
+                if self.load_alert and this_time - self.last_load_alert_time > self.load_alert_interval:
+                    self.ecs_warning(
+                        f"[{self.worker_name}] 当前负载({current_nodes}::{load})为节点数::{self.get_node_num()},实例数::{self.get_instance_num()}")
+                    self.last_load_alert_time = this_time
+
+                # Scale up?
+                if len(self._routine) >= add_watch_number:
+                    add_start_index = len(self._routine) - add_watch_number
+                    add_load = sum(self._routine[add_start_index:]) / add_watch_number
+
+                    if add_load > self.upper_threshold:
+                        num = ceil((add_load - self.upper_threshold) * len(self.instances))
+                        if self.change_alert:
+                            self.ecs_warning(f"[{self.worker_name}]新增,当前负载:{add_load},预期新增:{num}台")
+                        self.add_instances(num)
+                        self.release_freeze_time = time.time()
+                        # Sleep before the next pass; a bare continue re-entered
+                        # the loop immediately when nothing could be added.
+                        time.sleep(self.watch_interval)
+                        continue
+
+                # Scale down?
+                if len(self._routine) >= release_watch_number and time.time() - self.release_freeze_time > self.release_freeze_interval:
+                    release_start_index = len(self._routine) - release_watch_number
+                    release_load = sum(self._routine[release_start_index:]) / release_watch_number
+
+                    if release_load < self.down_threshold:
+                        # Number of instances to release
+                        if release_load < 0.1:
+                            # Load below 0.1
+                            num = 3 if len(self.instances) > 70 else 2
+                        else:
+                            # Load between 0.1 and down_threshold
+                            num = 2 if len(self.instances) > 10 else 1
+
+                        # Alert
+                        if self.change_alert:
+                            self.ecs_warning(f"[{self.worker_name}]释放,当前负载:{release_load},预期释放:{num}台")
+                        self.reduce_instances(num)
+                        self.release_freeze_time = time.time()
+            except Exception as e:
+                self.ecs_warning(f"警告:[{self.worker_name}]服务出现异常,尽快处理{e}")
+            time.sleep(self.watch_interval)
+        self.close_server_instances()
+
+    def add_instances(self, num):
+        """
+        Add instances to the pool.
+        The request is clamped to ``max_instance``, raised to reach
+        ``min_instance``, capped at 50 per call and created in batches of five.
+        :param num: number of instances wanted
+        :return:
+        """
+        if num < 1:
+            return
+        if len(self.instances) >= self.max_instance:
+            # Start the freeze period whether or not alerts are enabled; it used
+            # to be set only when change_alert was true.
+            self.last_update_time = time.time()
+            if self.change_alert:
+                self.ecs_warning(f"[{self.worker_name}]ECS服务实例达到最大值:{self.max_instance},预期新增{num}台, 扩容失败。")
+            return
+
+        if self.max_instance < len(self.instances) + num:
+            num = self.max_instance - len(self.instances)
+        if len(self.instances) + num < self.min_instance:
+            num = self.min_instance - len(self.instances)
+        # At most 50 instances per call
+        if num > 50:
+            num = 50
+
+        # Full batches of five, then the remainder
+        batch_sizes = [5] * (num // 5)
+        if num % 5 > 0:
+            batch_sizes.append(num % 5)
+
+        total_instance_ids = []
+        for batch_size in batch_sizes:
+            instance_ids = self.ecs.add_instance(batch_size)
+            if instance_ids:
+                total_instance_ids.extend(instance_ids)
+                self.instances.extend(instance_ids)
+                self.last_update_time = time.time()
+            else:
+                self.ecs_warning(f"[{self.worker_name}]预期新增{num}设备,扩容失败!当前ECS:{len(self.instances)}台")
+
+        # Notify
+        if self.change_alert:
+            self.ecs_warning(
+                f"[{self.worker_name}]新增{len(total_instance_ids)}(预期{num})台设备成功,当前ECS:{len(self.instances)}台")
+        print(f"新增完成,结果::{self.worker_name}::释放列表:{len(self.release_instances)}, 当前ECS列表:{self.instances}")
+
+    def reduce_instances(self, num):
+        """
+        Remove instances from the pool.
+        The oldest entries are moved to ``release_instances`` and their hosts
+        are asked to log out; the pool never shrinks below ``min_instance``.
+        :param num: number of instances to remove
+        :return:
+        """
+        if len(self.instances) <= self.min_instance:
+            return
+        if len(self.instances) < self.min_instance + num:
+            num = len(self.instances) - self.min_instance
+        # Start the release
+        release_instance_ids = self.instances[:num]
+        self.instances = self.instances[num:]
+        hosts = []
+        for instance_id, ip in release_instance_ids:
+            hosts.append(ip)
+            self.release_instances.append([instance_id, ip, int(time.time())])
+        self.safe_logout(hosts)
+        print(f"{self.worker_name}::释放列表:{self.release_instances}, 当前ECS列表:{self.instances}")
+        self.ecs_warning(f"[{self.worker_name}]预计释放:{num}台实例,当前释放队列{len(self.release_instances)},预计剩余{len(self.instances)}")
+
+    def loop_release_instances(self):
+        """
+        Every 60 seconds, release queued instances whose wait exceeds
+        ``worker_timeout`` seconds.
+        :return:
+        """
+        while self._run:
+            time.sleep(60)
+            delete_ids = []
+            # Iterate over a snapshot: reduce_instances appends from another thread.
+            for instance_detail in list(self.release_instances):
+                instance_id, ip, t = instance_detail
+                if int(time.time()) - t > self.worker_timeout:
+                    try:
+                        print(f"时间够了,开始释放{instance_id},时间IP{ip}:{t}")
+                        result = self.ecs.release_instance(instance_id, force=True)
+                        if result:
+                            delete_ids.append(instance_detail)
+                    except Exception as e:
+                        print(e)
+                        time.sleep(2)
+                else:
+                    # Entries are appended in time order, so later ones are younger.
+                    break
+            try:
+                for instance_detail in delete_ids:
+                    self.release_instances.remove(instance_detail)
+            except Exception as e:
+                self.ecs_warning(f"释放失败,请检查{e}")
+            print(f"释放完成,结果::{self.worker_name}::释放列表:{self.release_instances}, 当前ECS列表:{self.instances}")
+
+    def ecs_warning(self, text):
+        """Send ``text`` to the configured WeChat bot webhook."""
+        send_weixin(text, self.weixin_bot_url)
+
+    def __initialize(self):
+        """Bring the pool up to ``min_instance``, retrying every 60 s while the call raises."""
+        if len(self.instances) < self.min_instance:
+            while True:
+                try:
+                    self.add_instances(self.min_instance - len(self.instances))
+                    break
+                except Exception as e:
+                    print(e)
+                print("申请设备失败,等待60秒后重试")
+                time.sleep(60)
+
+    def start(self):
+        """Run the monitor, config-watch and release loops in threads and wait for them."""
+        t1 = threading.Thread(target=self.loop_monitor)
+        t1.start()
+        t2 = threading.Thread(target=self.update_config)
+        t2.start()
+        t3 = threading.Thread(target=self.loop_release_instances)
+        t3.start()
+        t1.join()
+        t2.join()
+        t3.join()
+        self.close_server_instances()
+
+    def update_config(self):
+        """
+        Every 10 seconds, reload the settings when the config file hash changed.
+        A read or parse failure keeps the previous settings; it used to end
+        this thread.
+        :return:
+        """
+        while self._run:
+            try:
+                config_md5 = calculate_file_hash(self.config_path)
+                if config_md5 != self.config_md5:
+                    # Remember the hash first so that a broken version is
+                    # reported once, not on every pass.
+                    self.config_md5 = config_md5
+                    self._apply_config(self._load_config())
+            except Exception as e:
+                self.ecs_warning(f"警告:[{self.worker_name}]配置文件更新失败,继续使用旧配置:{e}")
+            # Look at the config file again after a short pause
+            time.sleep(10)
+
+    def close_server_instances(self):
+        """Release every instance in the pool and in the release queue.
+
+        Both lists are cleared afterwards, so that a second call does not
+        release the same ids again (the method runs at the end of
+        ``loop_monitor`` and again at the end of ``start``). Does nothing when
+        there is no instance left.
+        """
+        release_ids = [instance_id for instance_id, _ip in self.instances]
+        release_ids.extend(instance_id for instance_id, _ip, _queued_at in self.release_instances)
+        if not release_ids:
+            return
+        print("释放实例", release_ids)
+        release_false = self.ecs.delete_instance(release_ids)
+        self.instances = []
+        self.release_instances = []
+        # Release finished
+        if release_false:
+            if self.change_alert:
+                self.ecs_warning(f"[{self.worker_name}]服务关闭,部分实例释放失败,请手动释放,问题实例:{release_false}")
+        else:
+            self.ecs_warning(f"[{self.worker_name}]服务关闭,所有实例释放完成")