# -*- coding: utf-8 -*-
"""OCR helpers combining a local DdddOcr model with remote OCR services.

Three remote backends are wrapped here: the DuGuang ("dg") OCR and the
"jy" OCR, both called through ``a2s_execute``, and the "jy" HTTP API.
"""
import base64
import io
import pathlib
import time

import requests
from a2s.a2s_client import a2s_execute
from a2s.tools import grpc_deserialize
from a2s.tools import json_serialize, json_deserialize
from ddddocr import DdddOcr

import setting
from proto import ocr_pb2
from utils import (
    is_specific_number_chinese_character,
    is_en,
    is_zero_or_o,
    is_digit
)


def dg_ocr_client(image, retry=5, a2s_ip=None, topic=None, timeout=None):
    """Call the DuGuang OCR service and return its recognition output.

    :param bytes image: raw image bytes
    :param int retry: maximum number of attempts
    :param str a2s_ip: service host; defaults to ``setting.DG_OCR_A2S_ADDRESS``
    :param str topic: topic name of the invoked function; defaults to
        ``setting.DG_OCR_TOPIC``
    :param int timeout: request timeout; defaults to ``setting.DG_OCR_TIMEOUT``
    :return: the ``"output"`` entry of the response (``[]`` when absent) if
        the response state is 200; ``None`` if the service reports another
        state or every attempt came back empty.
    """
    if a2s_ip is None:
        a2s_ip = setting.DG_OCR_A2S_ADDRESS
    if topic is None:
        topic = setting.DG_OCR_TOPIC
    if timeout is None:
        timeout = setting.DG_OCR_TIMEOUT

    # The payload carries the image as a base64 string.
    image_str = base64.b64encode(image).decode('utf-8')
    data = json_serialize({'image': image_str})

    for attempt in range(retry):
        resp = a2s_execute(a2s_ip, topic, bytes_data=data, timeout=timeout)
        if resp is None:
            # Timeout or dropped connection: back off before the next
            # attempt, but do not sleep once the attempts are used up.
            if attempt < retry - 1:
                time.sleep(5)
            continue

        resp_json = json_deserialize(resp)
        if resp_json.get("state", 0) != 200:
            # The service answered but reported a failure: stop retrying.
            return None
        return resp_json.get("output", [])
    return None


def jy_ocr_client(data, retry=5, a2s_ip=None, topic=None, timeout=None,
                  missing_ok=False):
    """Call the jy OCR service and return the recognised text.

    :param bytes data: raw image bytes
    :param int retry: maximum number of attempts
    :param str a2s_ip: service host; defaults to ``setting.JY_OCR_A2S_ADDRESS``
    :param str topic: topic name; defaults to ``setting.JY_OCR_TOPIC``
    :param int timeout: request timeout; defaults to
        ``setting.JY_OCR_A2S_TIMEOUT``
    :param bool missing_ok: when true, exceptions are swallowed and the next
        attempt is made instead of propagating the error
    :return: the stripped response message as ``str``, or ``None`` when no
        attempt produced a response.
    :raises Exception: whatever an attempt raised, unless ``missing_ok``.
    """
    if a2s_ip is None:
        a2s_ip = setting.JY_OCR_A2S_ADDRESS
    if topic is None:
        topic = setting.JY_OCR_TOPIC
    if timeout is None:
        timeout = setting.JY_OCR_A2S_TIMEOUT

    for _ in range(retry):
        try:
            payload = ocr_pb2.OcrRequest(image=data).SerializeToString()
            # NOTE(review): arguments are positional here while dg_ocr_client
            # passes ``bytes_data=``/``timeout=`` by keyword -- confirm the
            # order against the a2s_execute signature.
            resp_data = a2s_execute(a2s_ip, topic, timeout, payload)
            if resp_data is None:
                continue
            resp = grpc_deserialize(ocr_pb2.OcrResponse(), resp_data)
            return str(resp.message).strip()
        except Exception:
            if not missing_ok:
                raise
    return None


def jy_api_ocr_client(data, retry=5, timeout=None, missing_ok=False):
    """Upload an image to the jy HTTP OCR API and return the first result.

    :param data: raw image ``bytes``, or a ``str`` path of an image file
    :param int retry: maximum number of attempts
    :param int timeout: request timeout; defaults to
        ``setting.JY_API_OCR_TIMEOUT``
    :param bool missing_ok: when true, ``requests`` errors are swallowed and
        the next attempt is made instead of propagating the error
    :return: a one-element list holding ``raw_out[0][1]`` from the response
        (``''`` when ``raw_out`` is empty), or ``None`` when no attempt got
        an HTTP 200 response.
    :raises ValueError: if ``data`` is neither ``str`` nor ``bytes``.
    :raises requests.exceptions.RequestException: unless ``missing_ok``.
    """
    if not isinstance(data, (str, bytes)):
        raise ValueError('未知数据类型')
    if isinstance(data, str):
        with open(data, 'rb') as f:
            data = f.read()

    url = f'http://{setting.JY_API_OCR_ADDRESS}/api/tr-run/'
    if timeout is None:
        timeout = setting.JY_API_OCR_TIMEOUT

    # Send the raw bytes rather than a stream: a file-like object is drained
    # by the first attempt, so later retries would upload an empty file.
    files = {'file': data}
    for _ in range(retry):
        try:
            response = requests.post(
                url, timeout=timeout, files=files, data={'compress': 0}
            )
            if response.status_code != 200:
                continue
            raw_out = response.json()['data']['raw_out']
            text = raw_out[0][1] if raw_out else ''
            return [text]
        except requests.exceptions.RequestException:
            if not missing_ok:
                raise
    return None


def _first_or_default(result, default):
    """Return the first item of ``result``, or ``default`` if it is empty/None."""
    return next(iter(result or ()), default)


def dg_ocr(image, default=''):
    """Return the first DuGuang OCR result for ``image``, else ``default``."""
    return _first_or_default(dg_ocr_client(image=image), default)


def jy_ocr(image, default=''):
    """Return the first item of the jy OCR result for ``image``, else ``default``.

    ``jy_ocr_client`` returns a string, so the first item is its first
    character.
    """
    return _first_or_default(jy_ocr_client(data=image), default)


def jy_api_ocr(image, default=''):
    """Return the first jy HTTP API OCR result for ``image``, else ``default``."""
    return _first_or_default(jy_api_ocr_client(data=image), default)


ocr = DdddOcr(show_ad=False)


def _read_image(files):
    """Return image bytes from a ``pathlib.Path`` or from index 1 of ``files``."""
    if isinstance(files, pathlib.Path):
        return files.read_bytes()
    return files[1]


def mix_ocr_image_extract(files):
    """Recognise an image with DdddOcr first, falling back to remote OCR.

    :param files: a ``pathlib.Path`` of the image, or an indexable whose
        element 1 is the image bytes
    :return: the recognised text
    """
    image = _read_image(files)
    file_content = ocr.classification(image)

    # Distrust the local model when it returns nothing, more than one
    # character, or a value matched by the helper predicates below.
    if (
        not file_content
        or len(file_content) > 1
        or is_specific_number_chinese_character(file_content)
        or is_en(file_content)
    ):
        file_content = dg_ocr(image)

    # NOTE(review): the source had lost its indentation; this check is
    # assumed to sit at function level rather than inside the block above.
    if is_zero_or_o(file_content):
        file_content = jy_ocr(image)
    return file_content


def dg_ocr_image_extract(files):
    """Recognise an image with DuGuang OCR, falling back to the jy OCR.

    :param files: a ``pathlib.Path`` of the image, or an indexable whose
        element 1 is the image bytes
    :return: the recognised text
    """
    image = _read_image(files)
    file_content = dg_ocr(image)

    if (
        not file_content
        or len(file_content) > 1
        or is_zero_or_o(file_content)
        or is_en(file_content)
        or is_digit(file_content)
    ):
        # jy_api_ocr(image) was left commented out here as an alternative.
        file_content = jy_ocr(image)
    return file_content


if __name__ == '__main__':
    image_path = "./cache/image/0x100c3.jpg"
    with open(image_path, "rb") as rp:
        texts = dg_ocr_client(rp.read())
    print(texts)