123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778 |
- # coding:utf-8
- import nsq
- from app import FileApps
- import argparse
- import json
- from util.nsq_client import Nsq
- from loguru import logger
- logger.add('./logs/runtime_{time}.log', rotation='00:00')
- parser = argparse.ArgumentParser()
- parser.add_argument('-host', '--serve_host', type=str, default="192.168.21.183", help='客户端主机地址')
- parser.add_argument('-port', '--serve_port', type=str, default="10010", help='客户端服务端口')
- parser.add_argument('-cport', '--nsq_port', type=str, default="27017", help='服务端口')
- parser.add_argument('-chost', '--nsq_host', type=str, default="192.168.3.13", help='服务主机地址')
- parser.add_argument('-sname', '--topic', type=str, default="attachment", help='监听主题名称')
- args = parser.parse_args()
- NSQ_IP = args.nsq_host
- nsq_obj = Nsq(NSQ_IP)
- Channel = f"extract{args.serve_host}{args.serve_port}".replace(".", "")
- print(NSQ_IP, Channel, args.topic)
- def object_to_json(response_objects: list):
- """
- 类型转换
- :param response_objects:
- :return:
- """
- response_list = []
- for file_object in response_objects:
- response_list.append({"fileName": file_object.fileName, "textContent": file_object.textContent,
- "textUrl": file_object.textUrl, "filePath": file_object.filePath,
- "errorState": file_object.errorState})
- return response_list
- def handler(message):
- body = message.body
- body_json = {}
- topic = ""
- try:
- response_result = []
- body_json = json.loads(body)
- topic = body_json.get("topic", "")
- print(f"-->{body}")
- for file in body_json.get("message", []):
- request_attr = {"file_name": file.get("fileName", ""),
- "file_url": file.get("fileUrl", ""),
- "file_bytes": file.get("fileBytes"),
- "file_type": file.get("fileType", ""),
- "return_type": file.get("returnType", 0),
- "extract_type": file.get("extractType", 0)}
- file_factory = FileApps(request_attr)
- parse_detail = file_factory.start()
- response_result.extend(parse_detail)
- response_result = object_to_json(response_result)
- body_json["result"] = response_result
- if "message" in body_json:
- del body_json["message"]
- result = nsq_obj.pub(topic, msg=body_json)
- print(result)
- print(f"上传成功-->{body_json}")
- except Exception as e:
- print(e)
- logger.warning("warning not channel-->")
- if topic:
- body_json["errorState"] = 600
- nsq_obj.pub(topic, msg=body_json)
- return True
- nsq.Reader(message_handler=handler, nsqd_tcp_addresses=[f'{NSQ_IP}:4150'], topic=f'{args.topic}',
- channel=Channel,
- lookupd_poll_interval=3)
- if __name__ == '__main__':
- nsq.run()
|