123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102 |
- # -*-coding:utf-8 -*-
- """
- 任务发布者
- """
- import cv2 as cv
- import grpc
- from util.picture_zooming import PictureZooming
- from servicerd.client import RdClient
- from proto import ocr_pb2_grpc
- from proto import ocr_pb2
- from docs.config import picture_config
- from loguru import logger
- from file_processing.tr_ocr import tr_ocr_patch
- from docs.config import error_number
- zoom = PictureZooming()
- cd = RdClient(service_name=picture_config["service_name"],
- rd_server=picture_config["rd_server"],
- balance_type=picture_config["balance_type"])
- def run(param, **kwargs) -> (bool, any):
- address = '{}:{}'.format(kwargs['ip'], kwargs['port'])
- print(address)
- with grpc.insecure_channel(address) as channel:
- stub = ocr_pb2_grpc.OcrStub(channel)
- response = stub.Ocr(ocr_pb2.OcrRequest(image=param[1]))
- # print("Greeter client received: " + response.message)
- return param[0], response.message
- def execute_ocr(image_list):
- try:
- image_text = {}
- images = []
- # 图片缩放
- for position, image_path in image_list:
- img_file = read_image(image_path)
- if (not img_file) or isinstance(img_file, str):
- continue
- images.append((position, img_file))
- if images:
- # 批量识别
- ocr_results = cd.parallel_computing(images, 20, run)
- for values in ocr_results:
- if len(values) < 2:
- continue
- if values[1]:
- image_text[values[0]] = values[1]
- return image_text
- except Exception as e:
- logger.warning(f"ocr is fail->{e}"), error_number["解析错误"]
- return {}
- def read_image(filepath):
- try:
- load_image = cv.imread(filepath)
- x, y, z = load_image.shape
- # 图片大小判断
- if y < 600 or x < 500:
- return ""
- # 图片缩放
- load_image = zoom.patch(load_image)
- # 格式转换
- load_image = cv.imencode(".jpg", load_image)[1].tobytes()
- return load_image
- except Exception as e:
- return {}
- OcrType = "common"
- ocr_select = {"tr": tr_ocr_patch, "common": execute_ocr}
- def ocr(images_path):
- global OcrType
- if not images_path:
- return {}, error_number["解析错误"]
- images = [(0, images_path)]
- result = ocr_select[OcrType](images)
- return result.get(0, ""), error_number["成功"]
- def ocr_patch(images):
- '''
- :param images:
- :param ocr_type:
- :return:
- '''
- global OcrType
- if not images:
- return {}
- result = ocr_select[OcrType](images)
- return result
|