# -*-coding:utf-8 -*-
"""
Task publisher: prepares images and dispatches them to the remote OCR service.
"""
import cv2 as cv
import grpc
from loguru import logger

from util.picture_zooming import PictureZooming
from servicerd.client import RdClient
from proto import ocr_pb2_grpc
from proto import ocr_pb2
from docs.config import picture_config
from file_processing.tr_ocr import tr_ocr_patch
from docs.config import error_number

# Images narrower / shorter than these pixel sizes are skipped by read_image.
_MIN_IMAGE_WIDTH = 600
_MIN_IMAGE_HEIGHT = 500

zoom = PictureZooming()
cd = RdClient(
    service_name=picture_config["service_name"],
    rd_server=picture_config["rd_server"],
    balance_type=picture_config["balance_type"],
)


def run(param, **kwargs) -> tuple:
    """Send one encoded image to an OCR server and return its recognised text.

    :param param: pair whose first item is the caller's key for the image
        and whose second item is the encoded image payload.
    :param kwargs: must contain ``ip`` and ``port`` of the OCR server.
    :return: ``(param[0], response.message)``.
    """
    address = f"{kwargs['ip']}:{kwargs['port']}"
    # Was a bare print(); routed through the logger so it can be filtered.
    logger.debug(address)
    with grpc.insecure_channel(address) as channel:
        stub = ocr_pb2_grpc.OcrStub(channel)
        response = stub.Ocr(ocr_pb2.OcrRequest(image=param[1]))
    return param[0], response.message


def execute_ocr(image_list):
    """Run OCR over a batch of images via the service client.

    :param image_list: iterable of ``(position, image_path)`` pairs.
    :return: dict mapping ``position`` to the recognised text. Images that
        cannot be read, are too small, or yield empty text are omitted.
        An empty dict is returned if anything raises (best effort).
    """
    try:
        # Load, size-check and encode every image; skip the unusable ones.
        images = []
        for position, image_path in image_list:
            img_file = read_image(image_path)
            # read_image signals "skip" with "" or {}; only bytes are usable.
            if (not img_file) or isinstance(img_file, str):
                continue
            images.append((position, img_file))

        image_text = {}
        if images:
            # Batch recognition. NOTE(review): 20 is passed straight through
            # to RdClient.parallel_computing; presumably a concurrency or
            # batch size -- confirm against the client.
            ocr_results = cd.parallel_computing(images, 20, run)
            for values in ocr_results:
                if len(values) >= 2 and values[1]:
                    image_text[values[0]] = values[1]
        return image_text
    except Exception as e:
        # The original built and discarded a tuple with an error code here;
        # only the log call had any effect, so only that is kept.
        logger.warning(f"ocr is fail->{e}")
        return {}


def read_image(filepath):
    """Load an image, validate its size, rescale it and encode it as JPEG.

    :param filepath: path handed to ``cv.imread``.
    :return: JPEG bytes on success; ``""`` when the image is smaller than
        the minimum size; ``{}`` when the image cannot be read or processed.
        Both failure values are falsy and are kept for caller compatibility.
    """
    try:
        load_image = cv.imread(filepath)
        if load_image is None:
            # cv.imread returns None for missing or undecodable files.
            logger.warning(f"read image is fail->{filepath}")
            return {}

        # shape is (rows, cols[, channels]); slicing tolerates any depth.
        height, width = load_image.shape[:2]
        # Image size check.
        if width < _MIN_IMAGE_WIDTH or height < _MIN_IMAGE_HEIGHT:
            return ""

        # Image scaling.
        load_image = zoom.patch(load_image)
        # Format conversion: encode to JPEG and take the raw bytes.
        return cv.imencode(".jpg", load_image)[1].tobytes()
    except Exception as e:
        # Previously swallowed silently; keep the best-effort result but log.
        logger.warning(f"read image is fail->{e}")
        return {}


OcrType = "common"
ocr_select = {"tr": tr_ocr_patch, "common": execute_ocr}


def ocr(images_path):
    """Recognise the text of a single image with the configured backend.

    :param images_path: path of the image to recognise.
    :return: ``(text, code)``. For a falsy path this is ``({}, parse-error
        code)``; otherwise the text stored under key ``0`` by the backend
        (``""`` when absent) together with the success code.
    """
    if not images_path:
        return {}, error_number["解析错误"]
    # The backends take (position, path) pairs; use position 0 for the single image.
    result = ocr_select[OcrType]([(0, images_path)])
    return result.get(0, ""), error_number["成功"]


def ocr_patch(images):
    """Recognise a batch of images with the configured backend.

    :param images: batch passed unchanged to the selected backend.
    :return: the backend's result, or ``{}`` when ``images`` is empty.
    """
    if not images:
        return {}
    return ocr_select[OcrType](images)