lijunliang 9 mesiacov pred
rodič
commit
48a9763abc
7 zmenil súbory, kde vykonal 132 pridanie a 16 odobranie
  1. 22 0
      README.md
  2. 20 0
      a2s/a2s_monitor.py
  3. 29 9
      a2s/a2s_server.py
  4. 6 2
      a2s/demo/client.py
  5. 5 1
      a2s/demo/server.py
  6. 43 2
      a2s/tools.py
  7. 7 2
      setup.py

+ 22 - 0
README.md

@@ -90,6 +90,7 @@ grpc 序列化反序列化
 ```python
 
 import json
+import base64
 
 
 def grpc_serialize(resp):
@@ -140,4 +141,25 @@ def json_deserialize(bytes_data):
     dict_data = json.loads(json_data)
     return dict_data
 
+# 二进制文件传输,json序列化解决方案:
+# 1. 将二进制文件转换为base64编码的字符串
+# 2. 将base64编码的字符串转换为二进制文件
+def binary_to_base64(binary_data):
+    """
+    二进制转化为base64字符串
+    :param binary_data:
+    :return:
+    """
+    base64_str = base64.b64encode(binary_data).decode('utf-8')  # 将bytes转换为base64字符串
+    return base64_str
+
+
+def base64_to_binary(base64_str):
+    """
+    base64字符串转化为二进制
+    :param base64_str:
+    :return:
+    """
+    binary_data = base64.b64decode(base64_str)  # 将base64字符串转换为bytes
+    return binary_data
 ```

+ 20 - 0
a2s/a2s_monitor.py

@@ -2,6 +2,8 @@
 from a2s.proto import service_pb2
 from a2s.proto import service_pb2_grpc
 import grpc
+from pynats import NATSClient
+from a2s.tools import json_serialize
 
 
 def watch_monitor(a2s_ip, topic):
@@ -21,3 +23,21 @@ def watch_monitor(a2s_ip, topic):
     except:
         return None
 
+
+def close_worker(ip_list, a2s_ip, topic):
+    """
+    关闭worker
+    :param ip_list:
+    :param a2s_ip:
+    :param topic:
+    :return:
+    """
+    with NATSClient(a2s_ip, name=topic) as nc:
+        nc.connect()
+        try:
+            for host in ip_list:
+                nc.publish(subject=topic, payload=json_serialize({"host": host}))
+            return True
+        except Exception as e:
+            print(e)
+            return False

+ 29 - 9
a2s/a2s_server.py

@@ -3,8 +3,10 @@ import argparse
 from functools import wraps
 from pynats import NATSClient
 from a2s.proto.service_pb2 import NatsRequest, NatsResponse
+from a2s.tools import json_serialize, json_deserialize, get_host_ip
 import time
 import logging
+import uuid
 
 
 def callback():
@@ -18,6 +20,9 @@ def simple_params(func):
         parser.add_argument('-host', '--nats_host', type=str, default="192.168.3.240", help='nats主机')
         parser.add_argument('-port', '--nats_port', type=int, default=19090, help='nats端口')
         parser.add_argument('-subject', '--subject', type=str, default="test", help='监听主题名称')
+        parser.add_argument('-manager_subject', '--manager_subject', type=str, help='监听关闭服务的主题')
+        parser.add_argument('-manager_queue', '--manager_queue', type=str, help='关闭服务的队列名称,推荐使用默认的uuid')
+        parser.add_argument('-server_host', '--server_host', type=str, help='关闭服务的队列名称,推荐使用默认的uuid')
         parser.add_argument('-queue', '--queue', type=str, default="main", help='队列名称')
         parser.add_argument('-minWorktime', '--min_worktime', type=int, default=0,
                             help='用于修正超时时间,超时时间-min_worktime为真正的超时时间,减少无用的数据处理')
@@ -35,17 +40,17 @@ def watch(func):
         nats_host = kwargs.get("nats_host")
         nats_port = kwargs.get("nats_port")
         subject = kwargs.get("subject")
+        manager_subject = kwargs.get("manager_subject")
+        manager_queue = uuid.uuid4().hex if not kwargs.get("manager_queue") else kwargs.get("manager_queue")
         queue = kwargs.get("queue")
+        server_host = kwargs.get("server_host", None)
         min_worktime = kwargs.get("min_worktime", 0)
         call_back = kwargs.get("callback", callback)
+        if manager_subject and not server_host:
+            logging.warning("不知道你是否需要启用监控系统,因为你的server_host没有设置,监控系统将不会启用")
         if not (nats_host and nats_port and subject and queue):
-            raise Exception("""请检查参数,使用举例:
-1、使用方法            
-@watch
-def say_hello(nats_host="192.168.3.109", nats_port=800, subject="text2vec", queue="main",data):
-    print(data)
-    
-2、建议使用:
+            raise Exception("""
+建议使用:
 @simple_params
 @watch
 def main(data: bytes, *args, **kwargs) -> bytes:
@@ -53,9 +58,11 @@ def main(data: bytes, *args, **kwargs) -> bytes:
         NatsIp = f"nats://{nats_host}:{nats_port}"
         ListenSubject = subject + "_req"
         PublishSubject = subject + "_resp"
-        logging.warning(f"服务启动中>>>>listen::{ListenSubject}:,,publish:{PublishSubject}")
+        logging.warning(
+            f"服务启动中>>>>listen::{ListenSubject}:,,publish:{PublishSubject},queue:{queue},manager:{manager_subject},manager_queue:{manager_queue},server_host:{server_host}")
         with NATSClient(NatsIp, name=subject) as nc:
             nc.connect()
+            worker_closed = False
 
             def message_handler(msg):
                 try:
@@ -74,8 +81,21 @@ def main(data: bytes, *args, **kwargs) -> bytes:
                 except Exception as e:
                     logging.warning(e)
 
+            def nodes_worker_closed(msg):
+                nonlocal worker
+                nonlocal worker_closed
+                close_msg = json_deserialize(msg.payload)
+                host = close_msg.get("host", None)
+                logging.warning(f"关闭服务节点Subject: {msg.subject}:::{msg.payload}::{host}::{server_host}")
+                if not worker_closed and host and host == server_host:
+                    nc.unsubscribe(worker)
+                    worker_closed = True
+                    logging.warning("订阅服务关闭成功......")
+
             logging.warning(f"服务启动成功......")
-            nc.subscribe(subject=ListenSubject, callback=message_handler, queue=queue)
+            worker = nc.subscribe(subject=ListenSubject, callback=message_handler, queue=queue)
+            if manager_subject:
+                work_manager = nc.subscribe(subject=manager_subject, callback=nodes_worker_closed, queue=manager_queue)
             nc.wait()
 
     return wrapper

+ 6 - 2
a2s/demo/client.py

@@ -1,6 +1,10 @@
 # coding:utf-8
 from a2s.a2s_client import a2s_execute
+import time
 
 if __name__ == '__main__':
-    result = a2s_execute(a2s_ip="192.168.3.240:9090", topic="test1", timeout=10, bytes_data=b"hello")
-    print(result.decode("utf-8"))
+    for i in range(20):
+        print("开始第%d次发送"%i)
+        result = a2s_execute(a2s_ip="192.168.3.240:9090", topic="test1", timeout=6, bytes_data=b"hello")
+        print(result.decode("utf-8"))
+        print("结束第%d次发送"%i)

+ 5 - 1
a2s/demo/server.py

@@ -1,4 +1,5 @@
 from a2s.a2s_server import watch, simple_params
+import time
 
 #方式一 使用简单方式启动   .py文件命令行参数   python server.py --h
 @simple_params
@@ -12,13 +13,16 @@ def main(data: bytes, *args, **kwargs) -> bytes:
     :return:
     '''
     # 内部处理程序
+    print("服务端运行中")
     data = data.decode("utf-8")
     data += "test1服务端处理过了数据"
+    time.sleep(10)
     return data.encode("utf-8")
+
 import sys
 def callback():
     print("运行结束")
     sys.exit(1)
 
 if __name__ == '__main__':
-    main(subject="test1", callback=callback)
+    main(subject="test1")

+ 43 - 2
a2s/tools.py

@@ -1,5 +1,22 @@
 # coding:utf-8
 import json
+import socket
+import base64
+
+
+def get_host_ip():
+    """
+    获取本机ip
+    :return:
+    """
+    try:
+        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+        s.connect(('8.8.8.8', 80))
+        ip = s.getsockname()[0]
+    finally:
+        s.close()
+
+    return ip
 
 
 def grpc_serialize(resp):
@@ -8,11 +25,10 @@ def grpc_serialize(resp):
     :param resp:resp = Text2VectorReq(text=data)
     :return:
     """
-    strint_resp=resp.SerializeToString()
+    strint_resp = resp.SerializeToString()
     return strint_resp
 
 
-
 def grpc_deserialize(req, data):
     """
     grpc对象反序列化
@@ -50,3 +66,28 @@ def json_deserialize(bytes_data):
     # 将 JSON 字符串转换为字典对象
     dict_data = json.loads(json_data)
     return dict_data
+
+
+def binary_to_base64(binary_data):
+    """
+    二进制转化为base64字符串
+    :param binary_data:
+    :return:
+    """
+    base64_str = base64.b64encode(binary_data).decode('utf-8')  # 将bytes转换为base64字符串
+    return base64_str
+
+
+def base64_to_binary(base64_str):
+    """
+    base64字符串转化为二进制
+    :param base64_str:
+    :return:
+    """
+    binary_data = base64.b64decode(base64_str)  # 将base64字符串转换为bytes
+    return binary_data
+
+
+if __name__ == '__main__':
+    a = binary_to_base64(open("./a2s_monitor.py", "rb").read())
+    print(a)

+ 7 - 2
setup.py

@@ -5,7 +5,7 @@ with open("README.md", "r") as fh:
 
 setuptools.setup(
     name="a2s",
-    version="0.0.1",
+    version="0.0.3",
     author="LJL",
     author_email="author@example.com",
     description="A small example package",
@@ -18,6 +18,11 @@ setuptools.setup(
         "License :: OSI Approved :: MIT License",
         "Operating System :: OS Independent",
     ],
-   install_requires=[]
+   install_requires=[
+       "nats-python",
+       "grpcio",
+       "grpcio-tools",
+       "protobuf==3.19",
+   ]
 
 )