a2s_server.py 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. # coding:utf-8
  2. import argparse
  3. from functools import wraps
  4. from pynats import NATSClient
  5. from proto.service_pb2 import NatsRequest, NatsResponse
  6. import time
  7. import logging
  8. def simple_params(func):
  9. @wraps(func)
  10. def wrapper():
  11. parser = argparse.ArgumentParser()
  12. parser.add_argument('-host', '--nats_host', type=str, default="192.168.3.240", help='nats主机')
  13. parser.add_argument('-port', '--nats_port', type=int, default=19090, help='nats端口')
  14. parser.add_argument('-subject', '--subject', type=str, default="test", help='监听主题名称')
  15. parser.add_argument('-queue', '--queue', type=str, default="main", help='监听主题名称')
  16. args = parser.parse_args()
  17. params = dict(args._get_kwargs())
  18. return func(**params)
  19. return wrapper
  20. def watch(func):
  21. @wraps(func)
  22. def wrapper(*args, **kwargs):
  23. nats_host = kwargs.get("nats_host")
  24. nats_port = kwargs.get("nats_port")
  25. subject = kwargs.get("subject")
  26. queue = kwargs.get("queue")
  27. if not (nats_host and nats_port and subject and queue):
  28. raise Exception("""请检查参数,使用举例:
  29. 1、使用方法
  30. @watch
  31. def say_hello(nats_host="192.168.3.109", nats_port=800, subject="text2vec", queue="main",data):
  32. print(data)
  33. 2、建议使用:
  34. @simple_params
  35. @watch
  36. def main(data: bytes, *args, **kwargs) -> bytes:
  37. """)
  38. NatsIp = f"nats://{nats_host}:{nats_port}"
  39. ListenSubject = subject + "_req"
  40. PublishSubject = subject + "_resp"
  41. logging.warning(f"服务启动中>>>>listen::{ListenSubject}:,,publish:{PublishSubject}")
  42. with NATSClient(NatsIp, name=subject) as nc:
  43. nc.connect()
  44. def message_handler(msg):
  45. try:
  46. nats_req = NatsRequest()
  47. nats_req.ParseFromString(msg.payload)
  48. # TODO 检查消息是否超时
  49. t = int(time.time())
  50. if t - nats_req.timestamp > nats_req.timeout:
  51. logging.debug("%s 这条消息超时了,舍弃" % (nats_req.msgId))
  52. return
  53. kwargs["data"] = nats_req.data
  54. bytes_result = func(*args, **kwargs)
  55. nats_resp = NatsResponse(msgId=nats_req.msgId, data=bytes_result)
  56. nc.publish(subject=PublishSubject, payload=nats_resp.SerializeToString())
  57. except Exception as e:
  58. logging.warning(e)
  59. logging.warning(f"服务启动成功......")
  60. nc.subscribe(subject=ListenSubject, callback=message_handler, queue=queue)
  61. nc.wait()
  62. return wrapper