lijunliang il y a 1 an
Parent
commit
91b955cf42
7 fichiers modifiés avec 95 ajouts et 44 suppressions
  1. 13 2
      README.md
  2. 33 0
      a2s/a2s_monitor.py
  3. 1 1
      a2s/demo/client.py
  4. 2 2
      a2s/demo/server.py
  5. 4 4
      a2s/proto/service.proto
  6. 36 29
      a2s/proto/service_pb2.py
  7. 6 6
      a2s/proto/service_pb2_grpc.py

+ 13 - 2
README.md

@@ -1,7 +1,7 @@
 ## **a2s服务的python工具包,更容易大规模应用**
 
-
 ## *启动服务*
+
 ```python
 # 使用前请安装工具包
 # pip install a2s
@@ -50,8 +50,11 @@ def main(data: bytes, *args, **kwargs) -> bytes:
 
 main(nats_host="192.168.3.240", nats_port=19090, subject="test", queue="main")
 ```
+
 #### 方式一 更简单方式启动
+
 #### 方法二 灵活性更高
+
 #### 推荐使用 *方法一*
 
 ## *调用服务*
@@ -70,11 +73,20 @@ if __name__ == '__main__':
 
 ```
 
+## *监控*
+
+```python
+from a2s.a2s_monitor import watch_monitor
+
+service_count = watch_monitor('192.168.3.240:9090', 'goods_service')
+```
+
 ## *工具*
 
 dict(字典) 序列化反序列化
 
 grpc 序列化反序列化
+
 ```python
 
 import json
@@ -90,7 +102,6 @@ def grpc_serialize(resp):
     return resp
 
 
-
 def grpc_deserialize(req, data):
     """
     grpc对象反序列化

+ 33 - 0
a2s/a2s_monitor.py

@@ -0,0 +1,33 @@
+# coding:utf-8
+from a2s.proto import service_pb2
+from a2s.proto import service_pb2_grpc
+import grpc
+
+
+def watch_monitor(a2s_ip, topic):
+    """
+    监控
+    :return:
+    """
+    try:
+        with grpc.insecure_channel(a2s_ip) as channel:
+            # 客户端实例
+            stub = service_pb2_grpc.CallerStub(channel)
+            # 调用服务端方法
+            response = stub.ViewState(service_pb2.StateReq(topic=topic))
+            # 反序列化
+            count = response.currentRequest
+            return count
+    except:
+        return None
+
+
+if __name__ == '__main__':
+    import time
+
+    for i in range(1000):
+        ner_count = watch_monitor('192.168.3.240:9090', 'goods_ner')
+        field_count = watch_monitor('192.168.3.240:9090', 'goods_field')
+        service_count = watch_monitor('192.168.3.240:9090', 'goods_service')
+        print("service_count:", service_count, "ner_count:", ner_count, "field_count:", field_count)
+        time.sleep(0.5)

+ 1 - 1
a2s/demo/client.py

@@ -2,5 +2,5 @@
 from a2s.a2s_client import a2s_execute
 
 if __name__ == '__main__':
-    result = a2s_execute(a2s_ip="192.168.3.240:19094", topic="test", timeout=10, bytes_data=b"hello")
+    result = a2s_execute(a2s_ip="192.168.3.240:9090", topic="test", timeout=10, bytes_data=b"hello")
     print(result.decode("utf-8"))

+ 2 - 2
a2s/demo/server.py

@@ -16,7 +16,7 @@ def main(data: bytes, *args, **kwargs) -> bytes:
     data += "服务端处理过了数据"
     return data.encode("utf-8")
 
-main()
+# main()
 # 方式二 使用简单方式启动   .py文件命令行参数   python server.py --h
 
 @watch
@@ -33,4 +33,4 @@ def main(data: bytes, *args, **kwargs) -> bytes:
     data += "服务端处理过了数据"
     return data.encode("utf-8")
 
-main(nats_host="192.168.3.240",nats_port=19090,subject="test",queue="main")
+#main(nats_host="192.168.3.240",nats_port=19090,subject="test",queue="main")

+ 4 - 4
a2s/proto/service.proto

@@ -6,7 +6,7 @@ service Caller {
   //远程调用
   rpc Call (Request) returns (Response) {}
   //状态查看
-  rpc ViewState(Empty)returns(State){}
+  rpc ViewState(StateReq)returns(StateResp){}
 }
 //请求
 message Request {
@@ -21,11 +21,11 @@ message Response {
   bytes data =3;//结构数据
 }
 //空消息体
-message Empty{
-	
+message StateReq{
+	string topic =1;
 }
 //状态消息体
-message State{
+message StateResp{
   int32 currentRequest =1 ; //当前处理的请求数
 }
 

+ 36 - 29
a2s/proto/service_pb2.py

@@ -18,7 +18,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
   package='main',
   syntax='proto3',
   serialized_options=b'Z\005.;a2s',
-  serialized_pb=b'\n\rservice.proto\x12\x04main\"7\n\x07Request\x12\r\n\x05topic\x18\x01 \x01(\t\x12\x0f\n\x07timeout\x18\x02 \x01(\x03\x12\x0c\n\x04\x64\x61ta\x18\x03 \x01(\x0c\"3\n\x08Response\x12\x0c\n\x04\x63ode\x18\x01 \x01(\x05\x12\x0b\n\x03msg\x18\x02 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x03 \x01(\x0c\"\x07\n\x05\x45mpty\"\x1f\n\x05State\x12\x16\n\x0e\x63urrentRequest\x18\x01 \x01(\x05\"N\n\x0bNatsRequest\x12\r\n\x05msgId\x18\x01 \x01(\t\x12\x11\n\ttimestamp\x18\x02 \x01(\x03\x12\x0f\n\x07timeout\x18\x03 \x01(\x03\x12\x0c\n\x04\x64\x61ta\x18\x04 \x01(\x0c\"+\n\x0cNatsResponse\x12\r\n\x05msgId\x18\x01 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x04 \x01(\x0c\x32Z\n\x06\x43\x61ller\x12\'\n\x04\x43\x61ll\x12\r.main.Request\x1a\x0e.main.Response\"\x00\x12\'\n\tViewState\x12\x0b.main.Empty\x1a\x0b.main.State\"\x00\x42\x07Z\x05.;a2sb\x06proto3'
+  serialized_pb=b'\n\rservice.proto\x12\x04main\"7\n\x07Request\x12\r\n\x05topic\x18\x01 \x01(\t\x12\x0f\n\x07timeout\x18\x02 \x01(\x03\x12\x0c\n\x04\x64\x61ta\x18\x03 \x01(\x0c\"3\n\x08Response\x12\x0c\n\x04\x63ode\x18\x01 \x01(\x05\x12\x0b\n\x03msg\x18\x02 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x03 \x01(\x0c\"\x19\n\x08StateReq\x12\r\n\x05topic\x18\x01 \x01(\t\"#\n\tStateResp\x12\x16\n\x0e\x63urrentRequest\x18\x01 \x01(\x05\"N\n\x0bNatsRequest\x12\r\n\x05msgId\x18\x01 \x01(\t\x12\x11\n\ttimestamp\x18\x02 \x01(\x03\x12\x0f\n\x07timeout\x18\x03 \x01(\x03\x12\x0c\n\x04\x64\x61ta\x18\x04 \x01(\x0c\"+\n\x0cNatsResponse\x12\r\n\x05msgId\x18\x01 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x04 \x01(\x0c\x32\x61\n\x06\x43\x61ller\x12\'\n\x04\x43\x61ll\x12\r.main.Request\x1a\x0e.main.Response\"\x00\x12.\n\tViewState\x12\x0e.main.StateReq\x1a\x0f.main.StateResp\"\x00\x42\x07Z\x05.;a2sb\x06proto3'
 )
 
 
@@ -114,13 +114,20 @@ _RESPONSE = _descriptor.Descriptor(
 )
 
 
-_EMPTY = _descriptor.Descriptor(
-  name='Empty',
-  full_name='main.Empty',
+_STATEREQ = _descriptor.Descriptor(
+  name='StateReq',
+  full_name='main.StateReq',
   filename=None,
   file=DESCRIPTOR,
   containing_type=None,
   fields=[
+    _descriptor.FieldDescriptor(
+      name='topic', full_name='main.StateReq.topic', index=0,
+      number=1, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=b"".decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR),
   ],
   extensions=[
   ],
@@ -134,19 +141,19 @@ _EMPTY = _descriptor.Descriptor(
   oneofs=[
   ],
   serialized_start=133,
-  serialized_end=140,
+  serialized_end=158,
 )
 
 
-_STATE = _descriptor.Descriptor(
-  name='State',
-  full_name='main.State',
+_STATERESP = _descriptor.Descriptor(
+  name='StateResp',
+  full_name='main.StateResp',
   filename=None,
   file=DESCRIPTOR,
   containing_type=None,
   fields=[
     _descriptor.FieldDescriptor(
-      name='currentRequest', full_name='main.State.currentRequest', index=0,
+      name='currentRequest', full_name='main.StateResp.currentRequest', index=0,
       number=1, type=5, cpp_type=1, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
@@ -164,8 +171,8 @@ _STATE = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=142,
-  serialized_end=173,
+  serialized_start=160,
+  serialized_end=195,
 )
 
 
@@ -216,8 +223,8 @@ _NATSREQUEST = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=175,
-  serialized_end=253,
+  serialized_start=197,
+  serialized_end=275,
 )
 
 
@@ -254,14 +261,14 @@ _NATSRESPONSE = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=255,
-  serialized_end=298,
+  serialized_start=277,
+  serialized_end=320,
 )
 
 DESCRIPTOR.message_types_by_name['Request'] = _REQUEST
 DESCRIPTOR.message_types_by_name['Response'] = _RESPONSE
-DESCRIPTOR.message_types_by_name['Empty'] = _EMPTY
-DESCRIPTOR.message_types_by_name['State'] = _STATE
+DESCRIPTOR.message_types_by_name['StateReq'] = _STATEREQ
+DESCRIPTOR.message_types_by_name['StateResp'] = _STATERESP
 DESCRIPTOR.message_types_by_name['NatsRequest'] = _NATSREQUEST
 DESCRIPTOR.message_types_by_name['NatsResponse'] = _NATSRESPONSE
 _sym_db.RegisterFileDescriptor(DESCRIPTOR)
@@ -280,19 +287,19 @@ Response = _reflection.GeneratedProtocolMessageType('Response', (_message.Messag
   })
 _sym_db.RegisterMessage(Response)
 
-Empty = _reflection.GeneratedProtocolMessageType('Empty', (_message.Message,), {
-  'DESCRIPTOR' : _EMPTY,
+StateReq = _reflection.GeneratedProtocolMessageType('StateReq', (_message.Message,), {
+  'DESCRIPTOR' : _STATEREQ,
   '__module__' : 'service_pb2'
-  # @@protoc_insertion_point(class_scope:main.Empty)
+  # @@protoc_insertion_point(class_scope:main.StateReq)
   })
-_sym_db.RegisterMessage(Empty)
+_sym_db.RegisterMessage(StateReq)
 
-State = _reflection.GeneratedProtocolMessageType('State', (_message.Message,), {
-  'DESCRIPTOR' : _STATE,
+StateResp = _reflection.GeneratedProtocolMessageType('StateResp', (_message.Message,), {
+  'DESCRIPTOR' : _STATERESP,
   '__module__' : 'service_pb2'
-  # @@protoc_insertion_point(class_scope:main.State)
+  # @@protoc_insertion_point(class_scope:main.StateResp)
   })
-_sym_db.RegisterMessage(State)
+_sym_db.RegisterMessage(StateResp)
 
 NatsRequest = _reflection.GeneratedProtocolMessageType('NatsRequest', (_message.Message,), {
   'DESCRIPTOR' : _NATSREQUEST,
@@ -317,8 +324,8 @@ _CALLER = _descriptor.ServiceDescriptor(
   file=DESCRIPTOR,
   index=0,
   serialized_options=None,
-  serialized_start=300,
-  serialized_end=390,
+  serialized_start=322,
+  serialized_end=419,
   methods=[
   _descriptor.MethodDescriptor(
     name='Call',
@@ -334,8 +341,8 @@ _CALLER = _descriptor.ServiceDescriptor(
     full_name='main.Caller.ViewState',
     index=1,
     containing_service=None,
-    input_type=_EMPTY,
-    output_type=_STATE,
+    input_type=_STATEREQ,
+    output_type=_STATERESP,
     serialized_options=None,
   ),
 ])

+ 6 - 6
a2s/proto/service_pb2_grpc.py

@@ -20,8 +20,8 @@ class CallerStub(object):
                 )
         self.ViewState = channel.unary_unary(
                 '/main.Caller/ViewState',
-                request_serializer=service__pb2.Empty.SerializeToString,
-                response_deserializer=service__pb2.State.FromString,
+                request_serializer=service__pb2.StateReq.SerializeToString,
+                response_deserializer=service__pb2.StateResp.FromString,
                 )
 
 
@@ -52,8 +52,8 @@ def add_CallerServicer_to_server(servicer, server):
             ),
             'ViewState': grpc.unary_unary_rpc_method_handler(
                     servicer.ViewState,
-                    request_deserializer=service__pb2.Empty.FromString,
-                    response_serializer=service__pb2.State.SerializeToString,
+                    request_deserializer=service__pb2.StateReq.FromString,
+                    response_serializer=service__pb2.StateResp.SerializeToString,
             ),
     }
     generic_handler = grpc.method_handlers_generic_handler(
@@ -92,7 +92,7 @@ class Caller(object):
             timeout=None,
             metadata=None):
         return grpc.experimental.unary_unary(request, target, '/main.Caller/ViewState',
-            service__pb2.Empty.SerializeToString,
-            service__pb2.State.FromString,
+            service__pb2.StateReq.SerializeToString,
+            service__pb2.StateResp.FromString,
             options, channel_credentials,
             call_credentials, compression, wait_for_ready, timeout, metadata)