123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183 |
- # -*- coding: utf-8 -*-
- import base64
- import io
- import pathlib
- import time
- import requests
- from a2s.a2s_client import a2s_execute
- from a2s.tools import grpc_deserialize
- from a2s.tools import json_serialize, json_deserialize
- from ddddocr import DdddOcr
- import setting
- from proto import ocr_pb2
- from utils import (
- is_specific_number_chinese_character,
- is_en,
- is_zero_or_o,
- is_digit
- )
def dg_ocr_client(image, retry=5, a2s_ip=None, topic=None, timeout=None):
    """
    Send an image to the "读光" (DG) OCR service through a2s_execute.

    :param bytes image: raw image bytes
    :param int retry: maximum number of attempts
    :param str a2s_ip: service host; defaults to setting.DG_OCR_A2S_ADDRESS
    :param str topic: topic name of the function to invoke; defaults to
        setting.DG_OCR_TOPIC
    :param int timeout: timeout handed to a2s_execute; defaults to
        setting.DG_OCR_TIMEOUT
    :return: the "output" field of the reply ([] when the field is absent);
        None when the reply state is not 200 or no attempt produced a reply
    """
    a2s_ip = setting.DG_OCR_A2S_ADDRESS if a2s_ip is None else a2s_ip
    topic = setting.DG_OCR_TOPIC if topic is None else topic
    timeout = setting.DG_OCR_TIMEOUT if timeout is None else timeout

    # The image travels as a base64 string inside a serialized JSON payload.
    payload = json_serialize({'image': base64.b64encode(image).decode('utf-8')})

    for _attempt in range(retry):
        raw_reply = a2s_execute(a2s_ip, topic, bytes_data=payload, timeout=timeout)
        if raw_reply is not None:
            reply = json_deserialize(raw_reply)
            # A non-200 state ends the call immediately; it is not retried.
            if reply.get("state", 0) != 200:
                return None
            return reply.get("output", [])
        # No reply at all (treated as a timeout / dropped connection):
        # pause before the next attempt.
        time.sleep(5)
    return None
def jy_ocr_client(data, retry=5, a2s_ip=None, topic=None, timeout=None, missing_ok=False):
    """
    Send an image to the jy-ocr service as a protobuf OcrRequest via a2s_execute.

    @param bytes data: image bytes placed in the request's ``image`` field
    @param int retry: maximum number of attempts
    @param str a2s_ip: service host; defaults to setting.JY_OCR_A2S_ADDRESS
    @param str topic: topic name; defaults to setting.JY_OCR_TOPIC
    @param int timeout: timeout handed to a2s_execute; defaults to
        setting.JY_OCR_A2S_TIMEOUT
    @param bool missing_ok: when True, an exception raised during an attempt
        is swallowed and the next attempt is made; when False it propagates
    @return: the stripped string form of the response's ``message`` field, or
        None when no attempt produced a reply
    """
    a2s_ip = setting.JY_OCR_A2S_ADDRESS if a2s_ip is None else a2s_ip
    topic = setting.JY_OCR_TOPIC if topic is None else topic
    timeout = setting.JY_OCR_A2S_TIMEOUT if timeout is None else timeout

    for _attempt in range(retry):
        try:
            request_bytes = ocr_pb2.OcrRequest(image=data).SerializeToString()
            reply_bytes = a2s_execute(a2s_ip, topic, timeout, request_bytes)
            if reply_bytes is not None:
                parsed = grpc_deserialize(ocr_pb2.OcrResponse(), reply_bytes)
                return str(parsed.message).strip()
            # No reply: fall through to the next attempt.
        except Exception as exc:
            if missing_ok:
                continue
            raise exc
    return None
def jy_api_ocr_client(data, retry=5, timeout=None, missing_ok=False):
    """
    Upload an image to the JY OCR HTTP API and return its first result.

    @param data: image content as ``bytes``, or a ``str`` path of a file
        whose content is read and uploaded
    @param int retry: maximum number of POST attempts
    @param timeout: timeout passed to ``requests.post``; defaults to
        setting.JY_API_OCR_TIMEOUT
    @param bool missing_ok: when True, a ``RequestException`` during an
        attempt is swallowed and the next attempt is made
    @return: a one-element list holding ``raw_out[0][1]`` from the JSON
        reply (presumably the recognised text - confirm against the API),
        or ``['']`` when ``raw_out`` is empty; None when every attempt
        ended with a non-200 status or a suppressed error
    @raise ValueError: ``data`` is neither ``str`` nor ``bytes``
    @raise requests.exceptions.RequestException: a request failed and
        ``missing_ok`` is False
    """
    if isinstance(data, str):
        with open(data, 'rb') as f:
            payload = f.read()
    elif isinstance(data, bytes):
        payload = data
    else:
        raise ValueError('未知数据类型')

    url = f'http://{setting.JY_API_OCR_ADDRESS}/api/tr-run/'
    if timeout is None:
        timeout = setting.JY_API_OCR_TIMEOUT

    # Upload the raw bytes rather than a BytesIO wrapper: a stream is read to
    # EOF by the first POST, so every retry would upload an empty file.
    files = {'file': payload}
    for _ in range(retry):
        try:
            response = requests.post(
                url, timeout=timeout, files=files, data={'compress': 0}
            )
            if response.status_code != 200:
                continue
            raw_out = response.json()['data']['raw_out']
            text = raw_out[0][1] if raw_out else ''
            return [text]
        except requests.exceptions.RequestException:
            if not missing_ok:
                raise
    return None
def dg_ocr(image, default=''):
    """
    Return the first entry of the dg_ocr_client result for ``image``.

    :param image: image data, forwarded unchanged to dg_ocr_client
    :param default: value returned when there is no entry to take
    :return: the first item of the client's result, or ``default`` when the
        result is empty or None
    """
    outputs = dg_ocr_client(image=image)
    if outputs is None:
        # The client returns None on failure; iter(None) would raise
        # TypeError instead of yielding the caller's default.
        return default
    return next(iter(outputs), default)
def jy_ocr(image, default=''):
    """
    Return the first element of the jy_ocr_client result for ``image``.

    jy_ocr_client returns a string, so the first element is its first
    character.

    :param image: image data, forwarded to jy_ocr_client as ``data``
    :param default: value returned when there is no element to take
    :return: the first element of the client's result, or ``default`` when
        the result is empty or None
    """
    result = jy_ocr_client(data=image)
    if result is None:
        # The client returns None when no attempt succeeded; iter(None) would
        # raise TypeError instead of yielding the caller's default.
        return default
    return next(iter(result), default)
def jy_api_ocr(image, default=''):
    """
    Return the first entry of the jy_api_ocr_client result for ``image``.

    :param image: image bytes or file path, forwarded to jy_api_ocr_client
        as ``data``
    :param default: value returned when there is no entry to take
    :return: the first item of the client's result, or ``default`` when the
        result is empty or None
    """
    result = jy_api_ocr_client(data=image)
    if result is None:
        # The client returns None when every attempt failed; iter(None) would
        # raise TypeError instead of yielding the caller's default.
        return default
    return next(iter(result), default)
# Shared ddddocr recognizer, created once at import time.
ocr = DdddOcr(show_ad=False)


def mix_ocr_image_extract(files):
    """
    Recognise an image with ddddocr first, falling back to dg-ocr and jy-ocr.

    :param files: a ``pathlib.Path`` whose bytes are read, or an indexable
        whose element 1 is used as the image (presumably a (name, bytes)
        pair - confirm with callers)
    :return: the text from the last recognizer that was consulted
    """
    if isinstance(files, pathlib.Path):
        image = files.read_bytes()
    else:
        image = files[1]

    text = ocr.classification(image)
    # Keep the ddddocr answer only when it is a single character that is
    # neither a "specific number/Chinese character" nor English.
    if (
        text
        and len(text) <= 1
        and not is_specific_number_chinese_character(text)
        and not is_en(text)
    ):
        return text

    text = dg_ocr(image)
    # NOTE(review): the source's indentation was lost; this check is assumed
    # to apply only to the dg-ocr result (nested under the fallback above) -
    # confirm against the original file.
    if is_zero_or_o(text):
        text = jy_ocr(image)
    return text
def dg_ocr_image_extract(files):
    """
    Recognise an image with dg-ocr, falling back to jy-ocr for doubtful results.

    :param files: a ``pathlib.Path`` whose bytes are read, or an indexable
        whose element 1 is used as the image (presumably a (name, bytes)
        pair - confirm with callers)
    :return: the dg-ocr result when it is a single character that is not
        zero/"o", English or a digit; otherwise the jy-ocr result
    """
    if isinstance(files, pathlib.Path):
        image = files.read_bytes()
    else:
        image = files[1]

    text = dg_ocr(image)
    if (
        text
        and len(text) <= 1
        and not is_zero_or_o(text)
        and not is_en(text)
        and not is_digit(text)
    ):
        return text
    # Empty, multi-character, or a class of character dg-ocr is not trusted on.
    return jy_ocr(image)
if __name__ == '__main__':
    # Manual smoke run: send one cached image through the DG client and
    # print whatever it returns.
    image_path = "./cache/image/0x100c3.jpg"
    texts = dg_ocr_client(pathlib.Path(image_path).read_bytes())
    print(texts)
|