# coding:utf-8
"""Decorators that turn a bytes-in / bytes-out function into a NATS queue worker.

Typical usage::

    @simple_params
    @watch
    def main(data: bytes, *args, **kwargs) -> bytes:
        ...
"""
import argparse
import logging
import time
import uuid
from functools import wraps

from pynats import NATSClient

from a2s.proto.service_pb2 import NatsRequest, NatsResponse
from a2s.tools import json_serialize, json_deserialize, get_host_ip


def callback():
    """Default no-op hook used by ``watch`` when no ``callback`` kwarg is given."""
    pass


def simple_params(func):
    """Feed command-line options to ``func`` as keyword arguments.

    The wrapper parses ``sys.argv`` with argparse, converts the parsed
    namespace into a dict, overlays any keyword arguments given to the
    wrapper (explicit keywords win over command-line values) and calls
    ``func(**params)``.

    Positional arguments passed to the wrapper are not forwarded to ``func``.
    """

    @wraps(func)
    def wrapper(*args, **kwargs):
        parser = argparse.ArgumentParser()
        parser.add_argument('-host', '--nats_host', type=str, default="192.168.3.240", help='nats主机')
        parser.add_argument('-port', '--nats_port', type=int, default=19090, help='nats端口')
        parser.add_argument('-subject', '--subject', type=str, default="test", help='监听主题名称')
        parser.add_argument('-manager_subject', '--manager_subject', type=str, help='监听关闭服务的主题')
        parser.add_argument('-manager_queue', '--manager_queue', type=str,
                            help='关闭服务的队列名称,推荐使用默认的uuid')
        # The original help text here was a copy of the --manager_queue help;
        # server_host is the value compared with the "host" field of a close
        # message in ``watch``.
        parser.add_argument('-server_host', '--server_host', type=str,
                            help='当前服务节点的主机标识,用于匹配关闭服务消息中的host')
        parser.add_argument('-queue', '--queue', type=str, default="main", help='队列名称')
        parser.add_argument('-minWorktime', '--min_worktime', type=int, default=0,
                            help='用于修正超时时间,超时时间-min_worktime为真正的超时时间,减少无用的数据处理')
        namespace = parser.parse_args()
        # Copy so that overlaying kwargs never touches the Namespace itself.
        params = dict(vars(namespace))
        params.update(kwargs)
        return func(**params)

    return wrapper


def watch(func):
    """Run ``func`` as a NATS worker driven by the wrapper's keyword arguments.

    Keyword arguments read by the wrapper:

    * ``nats_host``, ``nats_port``, ``subject``, ``queue`` -- required; the
      worker listens on ``subject + "_req"`` within ``queue`` and publishes
      results on ``subject + "_resp"``.
    * ``manager_subject`` -- optional subject carrying close messages.
    * ``manager_queue`` -- queue for the manager subscription; a random uuid
      hex string is used when it is missing or empty.
    * ``server_host`` -- compared with the ``host`` field of a close message;
      on a match the worker subscription is removed.
    * ``min_worktime`` -- subtracted from the request timeout when deciding
      whether a message is already stale (default 0).
    * ``callback`` -- zero-argument callable invoked after each published
      response (default: the module-level no-op ``callback``).

    For every accepted request ``func(*args, **kwargs, data=<request data>)``
    is called and its return value is published as the response payload.

    Raises:
        ValueError: if any of the required keyword arguments is missing/empty.
    """

    @wraps(func)
    def wrapper(*args, **kwargs):
        nats_host = kwargs.get("nats_host")
        nats_port = kwargs.get("nats_port")
        subject = kwargs.get("subject")
        manager_subject = kwargs.get("manager_subject")
        manager_queue = kwargs.get("manager_queue") or uuid.uuid4().hex
        queue = kwargs.get("queue")
        server_host = kwargs.get("server_host", None)
        min_worktime = kwargs.get("min_worktime", 0)
        call_back = kwargs.get("callback", callback)

        if manager_subject and not server_host:
            logging.warning("不知道你是否需要启用监控系统,因为你的server_host没有设置,监控系统将不会启用")
        if not (nats_host and nats_port and subject and queue):
            # ValueError is still caught by callers using ``except Exception``.
            raise ValueError("""
            建议使用:
            @simple_params
            @watch
            def main(data: bytes, *args, **kwargs) -> bytes:
            """)

        nats_url = f"nats://{nats_host}:{nats_port}"
        listen_subject = subject + "_req"
        publish_subject = subject + "_resp"
        logging.warning(
            f"服务启动中>>>>listen::{listen_subject}:,,publish:{publish_subject},queue:{queue},manager:{manager_subject},manager_queue:{manager_queue},server_host:{server_host}")

        with NATSClient(nats_url, name=subject) as nc:
            nc.connect()
            worker_closed = False

            def message_handler(msg):
                """Decode one request, run ``func`` on it and publish the response."""
                try:
                    nats_req = NatsRequest()
                    nats_req.ParseFromString(msg.payload)
                    # Check whether the message has already timed out.
                    # NOTE(review): compared against int(time.time()), so
                    # timestamp/timeout are presumably in seconds -- confirm
                    # with the producer side.
                    now = int(time.time())
                    if now - nats_req.timestamp >= nats_req.timeout - min_worktime:
                        logging.debug("%s 这条消息超时了,舍弃", nats_req.msgId)
                        return
                    # Build the call arguments per message instead of
                    # rewriting the shared kwargs dict on every request.
                    call_kwargs = dict(kwargs, data=nats_req.data)
                    bytes_result = func(*args, **call_kwargs)
                    nats_resp = NatsResponse(msgId=nats_req.msgId, data=bytes_result)
                    nc.publish(subject=publish_subject, payload=nats_resp.SerializeToString())
                    call_back()  # Hook executed once the request has been processed.
                except Exception as e:
                    # Keep the worker alive, but record the traceback so the
                    # failure can actually be diagnosed.
                    logging.warning(e, exc_info=True)

            def nodes_worker_closed(msg):
                """Unsubscribe the worker when a close message targets this host."""
                nonlocal worker_closed
                try:
                    close_msg = json_deserialize(msg.payload)
                    host = close_msg.get("host", None)
                    logging.warning(f"关闭服务节点Subject: {msg.subject}:::{msg.payload}::{host}::{server_host}")
                    if not worker_closed and host and host == server_host:
                        nc.unsubscribe(worker)
                        worker_closed = True
                        logging.warning("订阅服务关闭成功......")
                except Exception as e:
                    # Same policy as message_handler: a malformed manager
                    # message must not take the whole worker down.
                    logging.warning(e, exc_info=True)

            logging.warning("服务启动成功......")
            worker = nc.subscribe(subject=listen_subject, callback=message_handler, queue=queue)
            if manager_subject:
                nc.subscribe(subject=manager_subject, callback=nodes_worker_closed, queue=manager_queue)
            nc.wait()

    return wrapper