1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071 |
- # coding:utf-8
- import argparse
- from functools import wraps
- from pynats import NATSClient
- from proto.service_pb2 import NatsRequest, NatsResponse
- import time
- import logging
def simple_params(func):
    """Decorate ``func`` so its keyword arguments come from the command line.

    The returned wrapper takes no arguments. When called it parses
    ``sys.argv`` for the NATS connection settings and invokes ``func`` with
    them as keyword arguments:

    * ``nats_host`` (``-host`` / ``--nats_host``, str, default ``"192.168.3.240"``)
    * ``nats_port`` (``-port`` / ``--nats_port``, int, default ``19090``)
    * ``subject``   (``-subject`` / ``--subject``, str, default ``"test"``)
    * ``queue``     (``-queue`` / ``--queue``, str, default ``"main"``)

    Returns:
        A zero-argument callable that returns whatever ``func`` returns.
    """

    @wraps(func)
    def wrapper():
        parser = argparse.ArgumentParser()
        parser.add_argument('-host', '--nats_host', type=str, default="192.168.3.240", help='nats主机')
        parser.add_argument('-port', '--nats_port', type=int, default=19090, help='nats端口')
        parser.add_argument('-subject', '--subject', type=str, default="test", help='监听主题名称')
        # The help text used to be a copy of the --subject one; describe the queue group instead.
        parser.add_argument('-queue', '--queue', type=str, default="main", help='队列组名称')
        args = parser.parse_args()
        # vars() is the documented way to view a Namespace as a dict
        # (replaces the private Namespace._get_kwargs()).
        return func(**vars(args))

    return wrapper
def watch(func):
    """Decorate ``func`` so it runs as a NATS request handler.

    The wrapper must be called with the keyword arguments ``nats_host``,
    ``nats_port``, ``subject`` and ``queue``. It connects to
    ``nats://<nats_host>:<nats_port>``, subscribes to ``<subject>_req`` in
    the given queue group and then blocks in ``nc.wait()``.

    For every message received the payload is parsed as a ``NatsRequest``.
    If the request is older than its own ``timeout`` it is dropped.
    Otherwise ``func`` is called with the wrapper's positional and keyword
    arguments plus ``data=<request data>``, and its return value is
    published to ``<subject>_resp`` inside a ``NatsResponse`` carrying the
    same ``msgId``.

    Raises:
        ValueError: if any of ``nats_host``, ``nats_port``, ``subject`` or
            ``queue`` is missing or falsy.
    """

    @wraps(func)
    def wrapper(*args, **kwargs):
        nats_host = kwargs.get("nats_host")
        nats_port = kwargs.get("nats_port")
        subject = kwargs.get("subject")
        queue = kwargs.get("queue")
        if not (nats_host and nats_port and subject and queue):
            # ValueError is still caught by callers that catch Exception.
            raise ValueError("""请检查参数,使用举例:
            1、使用方法
                @watch
                def say_hello(nats_host="192.168.3.109", nats_port=800, subject="text2vec", queue="main",data):
                    print(data)

            2、建议使用:
                @simple_params
                @watch
                def main(data: bytes, *args, **kwargs) -> bytes:
            """)

        nats_url = f"nats://{nats_host}:{nats_port}"
        listen_subject = subject + "_req"
        publish_subject = subject + "_resp"
        logging.warning("服务启动中>>>>listen::%s:,,publish:%s", listen_subject, publish_subject)

        with NATSClient(nats_url, name=subject) as nc:
            # NOTE(review): kept from the original; confirm whether
            # NATSClient.__enter__ already connects, in which case this
            # explicit call is redundant.
            nc.connect()

            def message_handler(msg):
                """Decode one request, run ``func`` on it and publish the reply."""
                try:
                    nats_req = NatsRequest()
                    nats_req.ParseFromString(msg.payload)
                    # Drop requests that are already older than their own timeout.
                    now = int(time.time())
                    if now - nats_req.timestamp > nats_req.timeout:
                        logging.debug("%s 这条消息超时了,舍弃", nats_req.msgId)
                        return
                    # Build per-message kwargs instead of mutating the shared dict.
                    call_kwargs = {**kwargs, "data": nats_req.data}
                    bytes_result = func(*args, **call_kwargs)
                    nats_resp = NatsResponse(msgId=nats_req.msgId, data=bytes_result)
                    nc.publish(subject=publish_subject, payload=nats_resp.SerializeToString())
                except Exception as e:
                    # Keep the service alive on a bad message, but record the
                    # traceback so the failure can be diagnosed.
                    logging.warning(e, exc_info=True)

            logging.warning("服务启动成功......")
            nc.subscribe(subject=listen_subject, callback=message_handler, queue=queue)
            nc.wait()

    return wrapper
|