123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101 |
- # coding:utf-8
- import argparse
- from functools import wraps
- from pynats import NATSClient
- from a2s.proto.service_pb2 import NatsRequest, NatsResponse
- from a2s.tools import json_serialize, json_deserialize, get_host_ip
- import time
- import logging
- import uuid
- def callback():
- pass
- def simple_params(func):
- @wraps(func)
- def wrapper(*args, **kwargs):
- parser = argparse.ArgumentParser()
- parser.add_argument('-host', '--nats_host', type=str, default="192.168.3.240", help='nats主机')
- parser.add_argument('-port', '--nats_port', type=int, default=19090, help='nats端口')
- parser.add_argument('-subject', '--subject', type=str, default="test", help='监听主题名称')
- parser.add_argument('-manager_subject', '--manager_subject', type=str, help='监听关闭服务的主题')
- parser.add_argument('-manager_queue', '--manager_queue', type=str, help='关闭服务的队列名称,推荐使用默认的uuid')
- parser.add_argument('-server_host', '--server_host', type=str, help='关闭服务的队列名称,推荐使用默认的uuid')
- parser.add_argument('-queue', '--queue', type=str, default="main", help='队列名称')
- parser.add_argument('-minWorktime', '--min_worktime', type=int, default=0,
- help='用于修正超时时间,超时时间-min_worktime为真正的超时时间,减少无用的数据处理')
- args = parser.parse_args()
- params = dict(args._get_kwargs())
- params.update(kwargs)
- return func(**params)
- return wrapper
- def watch(func):
- @wraps(func)
- def wrapper(*args, **kwargs):
- nats_host = kwargs.get("nats_host")
- nats_port = kwargs.get("nats_port")
- subject = kwargs.get("subject")
- manager_subject = kwargs.get("manager_subject")
- manager_queue = uuid.uuid4().hex if not kwargs.get("manager_queue") else kwargs.get("manager_queue")
- queue = kwargs.get("queue")
- server_host = kwargs.get("server_host", None)
- min_worktime = kwargs.get("min_worktime", 0)
- call_back = kwargs.get("callback", callback)
- if manager_subject and not server_host:
- logging.warning("不知道你是否需要启用监控系统,因为你的server_host没有设置,监控系统将不会启用")
- if not (nats_host and nats_port and subject and queue):
- raise Exception("""
- 建议使用:
- @simple_params
- @watch
- def main(data: bytes, *args, **kwargs) -> bytes:
- """)
- NatsIp = f"nats://{nats_host}:{nats_port}"
- ListenSubject = subject + "_req"
- PublishSubject = subject + "_resp"
- logging.warning(
- f"服务启动中>>>>listen::{ListenSubject}:,,publish:{PublishSubject},queue:{queue},manager:{manager_subject},manager_queue:{manager_queue},server_host:{server_host}")
- with NATSClient(NatsIp, name=subject) as nc:
- nc.connect()
- worker_closed = False
- def message_handler(msg):
- try:
- nats_req = NatsRequest()
- nats_req.ParseFromString(msg.payload)
- # 检查消息是否超时
- t = int(time.time())
- if t - nats_req.timestamp >= nats_req.timeout - min_worktime:
- logging.debug("%s 这条消息超时了,舍弃" % (nats_req.msgId))
- return
- kwargs["data"] = nats_req.data
- bytes_result = func(*args, **kwargs)
- nats_resp = NatsResponse(msgId=nats_req.msgId, data=bytes_result)
- nc.publish(subject=PublishSubject, payload=nats_resp.SerializeToString())
- call_back() # 回调函数,程序运行结束后执行的操作
- except Exception as e:
- logging.warning(e)
- def nodes_worker_closed(msg):
- nonlocal worker
- nonlocal worker_closed
- close_msg = json_deserialize(msg.payload)
- host = close_msg.get("host", None)
- logging.warning(f"关闭服务节点Subject: {msg.subject}:::{msg.payload}::{host}::{server_host}")
- if not worker_closed and host and host == server_host:
- nc.unsubscribe(worker)
- worker_closed = True
- logging.warning("订阅服务关闭成功......")
- logging.warning(f"服务启动成功......")
- worker = nc.subscribe(subject=ListenSubject, callback=message_handler, queue=queue)
- if manager_subject:
- work_manager = nc.subscribe(subject=manager_subject, callback=nodes_worker_closed, queue=manager_queue)
- nc.wait()
- return wrapper
|