expert_server.py 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. # coding:utf-8
  2. '''
  3. 微服务服务端
  4. '''
  5. from proto import exists_expert_pb2
  6. from servicerd.service import ServiceManage
  7. from proto import exists_expert_pb2_grpc
  8. from concurrent import futures
  9. from loguru import logger
  10. from predict import extract_expert
  11. import grpc
  12. import argparse
  13. import time
  14. import json
  15. logger.add('./logs/runtime_{time}.log', rotation='00:00')
  16. parser = argparse.ArgumentParser()
  17. parser.add_argument('-host', '--serve_host', type=str, default="192.168.20.69", help='提供服务的主机地址')
  18. parser.add_argument('-port', '--serve_port', type=int, default=14001, help='提供>服务的主机端口')
  19. parser.add_argument('-chost', '--consul_serve_host', type=str, default='192.168.3.12', help='consul主机地址')
  20. parser.add_argument('-cport', '--consul_serve_port', type=int, default=10021, help='consul主机端口')
  21. args = parser.parse_args()
  22. _ONE_DAY_IN_SECONDS = 60 * 60 * 24
  23. _SERVICE_NAME = 'extract_expert_service'
  24. _HOST = args.serve_host
  25. _PORT = args.serve_port
  26. RD_SERVER = "{}:{}".format(args.consul_serve_host, args.consul_serve_port)
  27. WORKERS = 10
  28. class ExtractExpertServices(exists_expert_pb2_grpc.ExistsExpertServicer):
  29. def extract(self, request, context):
  30. # TODO 调业务
  31. try:
  32. request_params = json.loads(request.contents)
  33. print(request_params)
  34. predict_result = extract_expert(request_params)
  35. result = json.dumps({"result": predict_result, "status": 200})
  36. return exists_expert_pb2.ExpertResponse(results=result)
  37. except Exception as e:
  38. logger.warning(e)
  39. result = json.dumps({"status": 300})
  40. return exists_expert_pb2.ExpertResponse(results=result)
  41. def start_server(destory_fn: any):
  42. """
  43. 启动服务
  44. :param destory_fn:
  45. :return:
  46. """
  47. server = grpc.server(futures.ThreadPoolExecutor(max_workers=WORKERS), options=[
  48. ('grpc.max_send_message_length', 100 * 1024 * 1024),
  49. ('grpc.max_receive_message_length', 100 * 1024 * 1024)])
  50. exists_expert_pb2_grpc.add_ExistsExpertServicer_to_server(ExtractExpertServices(), server)
  51. server.add_insecure_port('[::]:%d' % (_PORT))
  52. server.start()
  53. try:
  54. while True:
  55. time.sleep(1)
  56. except KeyboardInterrupt:
  57. destory_fn()
  58. server.stop(0)
  59. if __name__ == '__main__':
  60. sm = ServiceManage(rd_server=RD_SERVER,
  61. name=_SERVICE_NAME,
  62. local_ip=_HOST,
  63. local_port=_PORT,
  64. workers=WORKERS,
  65. balance_type=2)
  66. start_server(sm.destory)