# coding:utf-8 import nsq from app import FileApps import argparse import json from util.nsq_client import Nsq from loguru import logger logger.add('./logs/runtime_{time}.log', rotation='00:00') parser = argparse.ArgumentParser() parser.add_argument('-host', '--serve_host', type=str, default="192.168.21.183", help='客户端主机地址') parser.add_argument('-port', '--serve_port', type=str, default="10010", help='客户端服务端口') parser.add_argument('-cport', '--nsq_port', type=str, default="27017", help='服务端口') parser.add_argument('-chost', '--nsq_host', type=str, default="192.168.3.13", help='服务主机地址') parser.add_argument('-sname', '--topic', type=str, default="attachment", help='监听主题名称') args = parser.parse_args() NSQ_IP = args.nsq_host nsq_obj = Nsq(NSQ_IP) Channel = f"extract{args.serve_host}{args.serve_port}".replace(".", "") print(NSQ_IP, Channel, args.topic) def object_to_json(response_objects: list): """ 类型转换 :param response_objects: :return: """ response_list = [] for file_object in response_objects: response_list.append({"fileName": file_object.fileName, "textContent": file_object.textContent, "textUrl": file_object.textUrl, "filePath": file_object.filePath, "errorState": file_object.errorState}) return response_list def handler(message): body = message.body body_json = {} topic = "" try: response_result = [] body_json = json.loads(body) topic = body_json.get("topic", "") print(f"-->{body}") for file in body_json.get("message", []): request_attr = {"file_name": file.get("fileName", ""), "file_url": file.get("fileUrl", ""), "file_bytes": file.get("fileBytes"), "file_type": file.get("fileType", ""), "return_type": file.get("returnType", 0), "extract_type": file.get("extractType", 0)} file_factory = FileApps(request_attr) parse_detail = file_factory.start() response_result.extend(parse_detail) response_result = object_to_json(response_result) body_json["result"] = response_result if "message" in body_json: del body_json["message"] result = nsq_obj.pub(topic, msg=body_json) print(result) print(f"上传成功-->{body_json}") except Exception as e: print(e) logger.warning("warning not channel-->") if topic: body_json["errorState"] = 600 nsq_obj.pub(topic, msg=body_json) return True nsq.Reader(message_handler=handler, nsqd_tcp_addresses=[f'{NSQ_IP}:4150'], topic=f'{args.topic}', channel=Channel, lookupd_poll_interval=3) if __name__ == '__main__': nsq.run()