dzr 8 月之前
父节点
当前提交
afcc774e52
共有 6 个文件被更改,包括 0 次插入和 341 次删除
  1. 二进制
      .DS_Store
  2. 二进制
      g'y's/a2s-0.0.2-py3-none-any.whl
  3. 0 174
      g'y's/jy_ocr.py
  4. 0 33
      g'y's/log.py
  5. 0 130
      g'y's/net.py
  6. 0 4
      g'y's/requirements.txt

二进制
.DS_Store


二进制
g'y's/a2s-0.0.2-py3-none-any.whl


+ 0 - 174
g'y's/jy_ocr.py

@@ -1,174 +0,0 @@
-# -*- coding: utf-8 -*-
-import base64
-import io
-import pathlib
-import time
-
-import requests
-from a2s.a2s_client import a2s_execute
-from a2s.tools import grpc_deserialize
-from a2s.tools import json_serialize, json_deserialize
-from ddddocr import DdddOcr
-
-import setting
-from proto import ocr_pb2
-from utils import (
-    is_specific_number_chinese_character,
-    is_en,
-    is_zero_or_o,
-    is_digit
-)
-
-
-def dg_ocr_client(image, retry=5, a2s_ip=None, topic=None, timeout=None):
-    """
-      读光ocr
-    :param bytes image: 图片二进制
-    :param int retry: 重试次数
-    :param str a2s_ip: 服务host
-    :param str topic: 调用功能的主题名称
-    :param int timeout: 超时时间
-    :return:
-    """
-
-    if a2s_ip is None:
-        a2s_ip = setting.DG_OCR_A2S_ADDRESS
-    if topic is None:
-        topic = setting.DG_OCR_TOPIC
-    if timeout is None:
-        timeout = setting.DG_OCR_TIMEOUT
-
-    image_str = base64.b64encode(image).decode('utf-8')  # 将bytes转换为base64字符串
-    req_date = {"image": image_str}
-    req_date = json_serialize(req_date)
-    for num in range(retry):
-        ocr_response = a2s_execute(a2s_ip, topic, bytes_data=req_date, timeout=timeout)
-        if ocr_response is None:
-            # 超时,链接异常断开重试
-            time.sleep(5)
-            continue
-        # 将Base64编码的字符串解码为二进制数据
-        resp_json = json_deserialize(ocr_response)
-        state = resp_json.get("state", 0)
-        if state != 200:
-            # 解析异常失败,重试or结束
-            return None
-        return resp_json.get("output", [])
-
-
-def jy_ocr_client(data, retry=5, a2s_ip=None, topic=None, timeout=None, missing_ok=False):
-    """
-        jy-ocr
-    @param bytes data:
-    @param int retry:
-    @param str a2s_ip:
-    @param str topic:
-    @param int timeout:
-    @param bool missing_ok:
-    @return:
-    """
-
-    if a2s_ip is None:
-        a2s_ip = setting.JY_OCR_A2S_ADDRESS
-    if topic is None:
-        topic = setting.JY_OCR_TOPIC
-    if timeout is None:
-        timeout = setting.JY_OCR_A2S_TIMEOUT
-
-    for r in range(retry):
-        try:
-            rsp = ocr_pb2.OcrRequest(image=data)
-            rsp = rsp.SerializeToString()
-            resp_data = a2s_execute(a2s_ip, topic, timeout, rsp)
-            if resp_data is None:
-                continue
-            resp = ocr_pb2.OcrResponse()
-            resp = grpc_deserialize(resp, resp_data)
-            result = str(resp.message).strip()
-            return result
-        except Exception as e:
-            if not missing_ok:
-                raise e
-
-
-def jy_api_ocr_client(data, retry=5, timeout=None, missing_ok=False):
-    if not isinstance(data, (str, bytes)):
-        raise ValueError('未知数据类型')
-
-    if isinstance(data, bytes):
-        data = io.BytesIO(data)
-    else:
-        assert isinstance(data, str)
-        with open(data, 'rb') as f:
-            data = f.read()
-
-    url = f'http://{setting.JY_API_OCR_ADDRESS}/api/tr-run/'
-    if timeout is None:
-        timeout = setting.JY_API_OCR_TIMEOUT
-
-    files = {'file': data}
-    for _ in range(retry):
-        request_param = dict(files=files, data={'compress': 0})
-        try:
-            response = requests.post(url, timeout=timeout, **request_param)
-            if response.status_code != 200:
-                continue
-            raw_out = response.json()['data']['raw_out']
-            data = '' if not raw_out else raw_out[0][1]
-            return [data]
-        except requests.exceptions.RequestException as e:
-            if not missing_ok:
-                raise e
-
-
-def dg_ocr(image, default=''):
-    return next(iter(dg_ocr_client(image=image)),  default)
-
-
-def jy_ocr(image, default=''):
-    return next(iter(jy_ocr_client(data=image)),  default)
-
-
-def jy_api_ocr(image, default=''):
-    return next(iter(jy_api_ocr_client(data=image, missing_ok=True)),  default)
-
-
-ocr = DdddOcr(show_ad=False)
-
-
-def mix_ocr_image_extract(files):
-    img = files.read_bytes() if isinstance(files, pathlib.Path) else files[1]
-    file_content = ocr.classification(img)
-    # print(f'{files.name} ', 'ddd-ocr => ', file_content)
-    # print(f'{files.name} ', 'ddd-ocr => ', file_content, 'dg-ocr => ', dg_ocr_file_content)
-
-    if not file_content or len(file_content) > 1 or is_specific_number_chinese_character(file_content) or is_en(file_content):
-        file_content = dg_ocr(img)
-        # _ = ' ' * (len(files.name))
-        # print(f'{_} ', 'dg-ocr => ', file_content)
-
-    if is_zero_or_o(file_content):
-        file_content = jy_ocr(img)
-        # _ = ' ' * (len(files.name))
-        # print(f'{_} ', 'jy-ocr => ', file_content)
-
-    return file_content
-
-
-def dg_ocr_image_extract(files):
-    img = files.read_bytes() if isinstance(files, pathlib.Path) else files[1]
-    file_content = dg_ocr(img)
-    if not file_content or len(file_content) > 1 or is_zero_or_o(file_content) or is_en(file_content) or is_digit(file_content):
-        file_content = jy_ocr(img)
-        # file_content = jy_api_ocr(img)
-        # _ = ' ' * (len(filename))
-        # print(f'{_} ', 'jy-ocr => ', file_content)
-
-    return file_content
-
-
-if __name__ == '__main__':
-    image_path = "./cache/image/0x100c3.jpg"
-    with open(image_path, "rb") as rp:
-        texts = dg_ocr_client(rp.read())
-        print(texts)

+ 0 - 33
g'y's/log.py

@@ -1,33 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
-Created on 2024-11-12 
----------
-@summary:  
----------
-@author: Dzr
-"""
-import sys
-from pathlib import Path
-
-from loguru import logger
-
-logger.remove()  # 删除默认logru配置
-
-_absolute = Path(__file__).absolute().parent
-_log_path = (_absolute / 'logs/log_{time:YYYYMMDD}.log').resolve()
-loguru_format = (
-    "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | "
-    "<level>{level: <4}</level> | "
-    "<cyan>{thread.name}</cyan> | "
-    "<cyan>{file.name}</cyan>:<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>"
-)
-logru_level = 'INFO'
-logger.add(
-    sink=_log_path,
-    format=loguru_format,
-    level=logru_level,
-    rotation='00:00',
-    retention='1 week',
-    encoding='utf-8',
-)
-logger.add(sys.stdout, format=loguru_format, colorize=True, level=logru_level)

+ 0 - 130
g'y's/net.py

@@ -1,130 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
-Created on 2024-11-06 
----------
-@summary:  
----------
-@author: Dzr
-"""
-
-import requests
-
-from clean_html import cleaner
-from font_tool import parse_font, download_font
-from jy_ocr import dg_ocr_image_extract
-from log import logger
-from utils import extract_list, extract_detail_html
-
-
-def get_proxy(scheme=None, socks5h=False, retry=5, default=None, missing_ok=True):
-    url = 'http://cc.spdata.jianyu360.com/crawl/proxy/socks5/fetch'
-    headers = {'Authorization': 'Basic amlhbnl1MDAxOjEyM3F3ZSFB'}
-
-    def _fetch_proxy():
-        for _ in range(retry):
-            try:
-                return requests.get(url, headers=headers, timeout=15).json()
-            except requests.RequestException as e:
-                if not missing_ok:
-                    raise e
-        return {}
-
-    proxies = _fetch_proxy().get('data')
-    if not proxies:
-        return default
-
-    if socks5h:
-        proxy_item = proxies.get('http')
-        proxies = {
-            'http': proxy_item.replace('socks5', 'socks5h'),
-            'https': proxy_item.replace('socks5', 'socks5h')
-        }
-
-    return proxies if not scheme else proxies.get(scheme, default)
-
-
-def fetch(url, headers, **kwargs):
-    response = requests.get(url, headers=headers, **kwargs)
-    # print(response)
-    response.encoding = 'gb2312'
-    return response
-
-
-def download(url, headers, proxies=None, **kwargs):
-    try:
-        response = fetch(url, headers, timeout=60, proxies=proxies, **kwargs)
-    except requests.RequestException as e:
-        # logger.error(f'网络访问|请求失败|{url}')
-        # logger.exception(e)
-        logger.error(e)
-        return False
-
-    html = response.content.decode('gbk')
-    try:
-        font_file = download_font(html=html)  # 下载动态字体
-    except ValueError:
-        # logger.error(f'网络访问|请求失败|字体文件|{url}')
-        return False
-
-    ft = parse_font(font_file, ocr_extract=dg_ocr_image_extract)
-    return html, ft
-
-
-def download_list(url, proxies=None, **kwargs):
-    headers = {
-        'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
-        'accept-language': 'zh-CN,zh;q=0.9',
-        'cache-control': 'no-cache',
-        'pragma': 'no-cache',
-        'priority': 'u=0, i',
-        'upgrade-insecure-requests': '1',
-        'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36',
-    }
-    rets = download(url, headers, proxies=proxies, **kwargs)
-    if rets is False:
-        return
-
-    html, ft = rets
-    return extract_list(html, ft.font_maps)
-
-
-def download_detail(url, proxies=None, missing_ok=True, **kwargs):
-    headers = {
-        'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
-        'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8',
-        'cache-control': 'no-cache',
-        'pragma': 'no-cache',
-        'priority': 'u=0, i',
-        'upgrade-insecure-requests': '1',
-        'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36',
-    }
-    rets = download(url, headers, proxies=proxies, **kwargs)
-    if not rets:
-        return
-
-    html, ft = rets
-    try:
-        contenthtml = extract_detail_html(html, ft.font_maps)
-        return {
-            'contenthtml': contenthtml,
-            'detail': cleaner(contenthtml),
-        }
-    except ValueError as e:
-        if not missing_ok:
-            raise e
-
-
-def send_wechat_warning(msg, send=True):
-    markdown = f'采集异常,请相关同事注意。'
-    markdown += f'\n>异常详情:<font color=\"warning\">**{msg}**</font>'
-
-    if not send:
-        logger.info(markdown)
-        return
-
-    url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=860e3acc-4e5b-4b52-ac19-a49d6a1b5a69'
-    headers = {'Content-Type': 'application/json'}
-    json_data = {'msgtype': 'markdown', 'markdown': {'content': markdown}}
-    request_params = dict(headers=headers, json=json_data, timeout=10)
-    response = requests.post(url, **request_params)
-    logger.info(response.json())

+ 0 - 4
g'y's/requirements.txt

@@ -1,4 +0,0 @@
-grpcio
-protobuf==3.19.0
-fonttools==4.54.1
-freetype-py==2.5.1