# nsq_server.py
# coding:utf-8
import argparse
import json

import nsq
from loguru import logger

from app import FileApps
from util.nsq_client import Nsq
  8. logger.add('./logs/runtime_{time}.log', rotation='00:00')
  9. parser = argparse.ArgumentParser()
  10. parser.add_argument('-host', '--serve_host', type=str, default="192.168.21.183", help='客户端主机地址')
  11. parser.add_argument('-port', '--serve_port', type=str, default="10010", help='客户端服务端口')
  12. parser.add_argument('-cport', '--nsq_port', type=str, default="27017", help='服务端口')
  13. parser.add_argument('-chost', '--nsq_host', type=str, default="192.168.3.13", help='服务主机地址')
  14. parser.add_argument('-sname', '--topic', type=str, default="attachment", help='监听主题名称')
  15. args = parser.parse_args()
  16. NSQ_IP = args.nsq_host
  17. nsq_obj = Nsq(NSQ_IP)
  18. Channel = f"extract{args.serve_host}{args.serve_port}".replace(".", "")
  19. print(NSQ_IP, Channel, args.topic)
  20. def object_to_json(response_objects: list):
  21. """
  22. 类型转换
  23. :param response_objects:
  24. :return:
  25. """
  26. response_list = []
  27. for file_object in response_objects:
  28. response_list.append({"fileName": file_object.fileName, "textContent": file_object.textContent,
  29. "textUrl": file_object.textUrl, "filePath": file_object.filePath,
  30. "errorState": file_object.errorState})
  31. return response_list
  32. def handler(message):
  33. body = message.body
  34. body_json = {}
  35. topic = ""
  36. try:
  37. response_result = []
  38. body_json = json.loads(body)
  39. topic = body_json.get("topic", "")
  40. print(f"-->{body}")
  41. for file in body_json.get("message", []):
  42. request_attr = {"file_name": file.get("fileName", ""),
  43. "file_url": file.get("fileUrl", ""),
  44. "file_bytes": file.get("fileBytes"),
  45. "file_type": file.get("fileType", ""),
  46. "return_type": file.get("returnType", 0),
  47. "extract_type": file.get("extractType", 0)}
  48. file_factory = FileApps(request_attr)
  49. parse_detail = file_factory.start()
  50. response_result.extend(parse_detail)
  51. response_result = object_to_json(response_result)
  52. body_json["result"] = response_result
  53. if "message" in body_json:
  54. del body_json["message"]
  55. result = nsq_obj.pub(topic, msg=body_json)
  56. print(result)
  57. print(f"上传成功-->{body_json}")
  58. except Exception as e:
  59. print(e)
  60. logger.warning("warning not channel-->")
  61. if topic:
  62. body_json["errorState"] = 600
  63. nsq_obj.pub(topic, msg=body_json)
  64. return True
  65. nsq.Reader(message_handler=handler, nsqd_tcp_addresses=[f'{NSQ_IP}:4150'], topic=f'{args.topic}',
  66. channel=Channel,
  67. lookupd_poll_interval=3)
  68. if __name__ == '__main__':
  69. nsq.run()