Ver Fonte

添加监控

lijunliang há 1 ano atrás
pai
commit
d76dac6c28
5 ficheiros alterados com 23 adições e 35 exclusões
  1. 0 10
      a2s/a2s_monitor.py
  2. 13 3
      a2s/a2s_server.py
  3. 1 1
      a2s/demo/client.py
  4. 7 19
      a2s/demo/server.py
  5. 2 2
      a2s/tools.py

+ 0 - 10
a2s/a2s_monitor.py

@@ -21,13 +21,3 @@ def watch_monitor(a2s_ip, topic):
     except:
         return None
 
-
-if __name__ == '__main__':
-    import time
-
-    for i in range(1000):
-        ner_count = watch_monitor('192.168.3.240:9090', 'goods_ner')
-        field_count = watch_monitor('192.168.3.240:9090', 'goods_field')
-        service_count = watch_monitor('192.168.3.240:9090', 'goods_service')
-        print("service_count:", service_count, "ner_count:", ner_count, "field_count:", field_count)
-        time.sleep(0.5)

+ 13 - 3
a2s/a2s_server.py

@@ -7,16 +7,23 @@ import time
 import logging
 
 
+def callback():
+    pass
+
+
 def simple_params(func):
     @wraps(func)
-    def wrapper():
+    def wrapper(*args, **kwargs):
         parser = argparse.ArgumentParser()
         parser.add_argument('-host', '--nats_host', type=str, default="192.168.3.240", help='nats主机')
         parser.add_argument('-port', '--nats_port', type=int, default=19090, help='nats端口')
         parser.add_argument('-subject', '--subject', type=str, default="test", help='监听主题名称')
-        parser.add_argument('-queue', '--queue', type=str, default="main", help='监听主题名称')
+        parser.add_argument('-queue', '--queue', type=str, default="main", help='队列名称')
+        parser.add_argument('-minWorktime', '--min_worktime', type=int, default=0,
+                            help='用于修正超时时间,超时时间-min_worktime为真正的超时时间,减少无用的数据处理')
         args = parser.parse_args()
         params = dict(args._get_kwargs())
+        params.update(kwargs)
         return func(**params)
 
     return wrapper
@@ -29,6 +36,8 @@ def watch(func):
         nats_port = kwargs.get("nats_port")
         subject = kwargs.get("subject")
         queue = kwargs.get("queue")
+        min_worktime = kwargs.get("min_worktime", 0)
+        call_back = kwargs.get("callback", callback)
         if not (nats_host and nats_port and subject and queue):
             raise Exception("""请检查参数,使用举例:
 1、使用方法            
@@ -54,13 +63,14 @@ def main(data: bytes, *args, **kwargs) -> bytes:
                     nats_req.ParseFromString(msg.payload)
                     # TODO 检查消息是否超时
                     t = int(time.time())
-                    if t - nats_req.timestamp > nats_req.timeout:
+                    if t - nats_req.timestamp >= nats_req.timeout - min_worktime:
                         logging.debug("%s 这条消息超时了,舍弃" % (nats_req.msgId))
                         return
                     kwargs["data"] = nats_req.data
                     bytes_result = func(*args, **kwargs)
                     nats_resp = NatsResponse(msgId=nats_req.msgId, data=bytes_result)
                     nc.publish(subject=PublishSubject, payload=nats_resp.SerializeToString())
+                    call_back()  # 回调函数,程序运行结束后执行的操作
                 except Exception as e:
                     logging.warning(e)
 

+ 1 - 1
a2s/demo/client.py

@@ -2,5 +2,5 @@
 from a2s.a2s_client import a2s_execute
 
 if __name__ == '__main__':
-    result = a2s_execute(a2s_ip="192.168.3.240:9090", topic="test", timeout=10, bytes_data=b"hello")
+    result = a2s_execute(a2s_ip="192.168.3.240:9090", topic="test1", timeout=10, bytes_data=b"hello")
     print(result.decode("utf-8"))

+ 7 - 19
a2s/demo/server.py

@@ -13,24 +13,12 @@ def main(data: bytes, *args, **kwargs) -> bytes:
     '''
     # 内部处理程序
     data = data.decode("utf-8")
-    data += "服务端处理过了数据"
+    data += "test1服务端处理过了数据"
     return data.encode("utf-8")
+import sys
+def callback():
+    print("运行结束")
+    sys.exit(1)
 
-# main()
-# 方式二 使用简单方式启动   .py文件命令行参数   python server.py --h
-
-@watch
-def main(data: bytes, *args, **kwargs) -> bytes:
-    '''
-
-    :param data: 固定格式
-    :param args: 固定格式
-    :param kwargs: 固定格式
-    :return:
-    '''
-    # 内部处理程序
-    data = data.decode("utf-8")
-    data += "服务端处理过了数据"
-    return data.encode("utf-8")
-
-#main(nats_host="192.168.3.240",nats_port=19090,subject="test",queue="main")
+if __name__ == '__main__':
+    main(subject="test1", callback=callback)

+ 2 - 2
a2s/tools.py

@@ -8,8 +8,8 @@ def grpc_serialize(resp):
     :param resp:resp = Text2VectorReq(text=data)
     :return:
     """
-    resp.SerializeToString()
-    return resp
+    strint_resp=resp.SerializeToString()
+    return strint_resp