123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212 |
- import base64
- import io
- import re
- import time
- from typing import Union
- import cv2
- import ddddocr
- import numpy as np
- from fastapi import (
- APIRouter,
- Depends,
- HTTPException,
- UploadFile,
- File,
- Query
- )
- from fastapi.requests import Request
- from fastapi.security import OAuth2PasswordRequestForm
- import setting
- from services.chaojiying import CJ
- from services.defults import FAKE_USERS_DB
- from services.limiter import limiter
- from services.utils import calculate
- from services.zhipu import ZhipuFileExtract
# Router that groups the image-recognition endpoints below under the
# "/images" URL prefix.
images_router = APIRouter(prefix="/images")
def prepare_image(file):
    """Normalise an uploaded image payload before it is handed to OCR.

    Args:
        file: Upload content. Either the binary image itself (``bytes``) or
            an encoded ``data:image/<ext>;base64,<data>`` URI.

    Returns:
        The base64 payload (``str``) when ``file`` is a data URI, otherwise
        ``file`` itself, unchanged (``bytes``).

    Raises:
        TypeError: If ``file`` starts with ``data:image`` but carries no
            ``;base64,`` payload, or if a non-data-URI input is not ``bytes``.
        UnicodeDecodeError: If a data URI cannot be decoded with the default
            codec.
    """
    if file.startswith(b"data:image"):
        # DOTALL lets the payload span line breaks inside the base64 text.
        match = re.search(
            r"data:image/(?P<ext>.*?);base64,(?P<data>.*)",
            file.decode(),
            re.DOTALL,
        )
        if match is None:
            raise TypeError("transform image type failed!")
        return match.group("data")  # img_base64

    # Explicit check instead of ``assert``: assertions are stripped when
    # Python runs with -O, which would let non-bytes input slip through.
    if not isinstance(file, bytes):
        raise TypeError(
            f"image payload must be bytes, got {type(file).__name__}"
        )
    return file  # img_bytes
# POST /images/verify
# Reads the uploaded captcha, normalises it with prepare_image() and returns
# whatever ddddocr's classifier recognises, together with the elapsed time in
# seconds (two decimals).
# NOTE(review): a new DdddOcr instance is constructed on every request;
# confirm whether a shared instance would be preferable.
@images_router.post("/verify", summary="Ocr")
async def simple_captcha(file: UploadFile = File(...)):
    started_at = time.time()
    uploaded = await file.read()
    recognizer = ddddocr.DdddOcr(det=False, ocr=True, show_ad=False)
    recognized = recognizer.classification(prepare_image(uploaded))
    elapsed = float(f"{time.time() - started_at:.2f}")
    timing_and_text = {"time": elapsed, "code": recognized}
    return {"msg": "success", "code": 0, "r": timing_and_text}
# POST /images/arithmetic
# Recognises an arithmetic captcha with the custom ONNX model under
# libs/tj_arithmetic, rewrites "x" to "*" in the recognised text and returns
# calculate(<expression>) together with the elapsed time in seconds
# (two decimals).
@images_router.post("/arithmetic", summary="100以内算术")
async def arithmetic_captcha(file: UploadFile = File(...)):
    start = time.time()
    img_bytes = await file.read()
    onnx_path = "libs/tj_arithmetic/tj_project_1.0_23_15000_2023-01-14-10-58-23.onnx"
    charsets_path = "libs/tj_arithmetic/charsets.json"
    ocr = ddddocr.DdddOcr(
        det=False,
        ocr=False,
        show_ad=False,
        import_onnx_path=onnx_path,
        charsets_path=charsets_path,
    )
    image = prepare_image(img_bytes)
    # Bug fix: the model was built but never invoked. The old code called
    # ``.replace("x", "*")`` on the image payload itself, which raises
    # TypeError for bytes and passes base64 text to calculate() otherwise.
    recognized = ocr.classification(image)
    expression = recognized.replace("x", "*")
    return {
        "msg": "success",
        "code": 0,
        "r": {
            "time": float("{:.2f}".format(time.time() - start)),
            "code": calculate(expression),
        },
    }
# POST /images/verify_det
# Runs ddddocr detection on the uploaded click-captcha, crops every detected
# box out of the decoded image, recognises the crop and keeps the first
# non-alphanumeric character as the key for that box.
# The response carries the {character: [x1, y1, x2, y2]} mapping under
# "code" and the raw detection result under "code_list".
@images_router.post("/verify_det", summary="点验图片")
async def det_captcha(file: UploadFile = File(..., description="验证码图片")):
    detector = ddddocr.DdddOcr(det=True, show_ad=False)
    recognizer = ddddocr.DdddOcr(ocr=True, show_ad=False)

    raw_image = await file.read()
    payload = prepare_image(raw_image)
    if isinstance(payload, bytes):
        boxes = detector.detection(img_bytes=payload)
    else:
        assert isinstance(payload, str)
        boxes = detector.detection(img_base64=payload)
        # A data URI was uploaded: decode it so OpenCV sees the real image.
        raw_image = base64.b64decode(payload)

    pixel_buffer = np.frombuffer(io.BytesIO(raw_image).getbuffer(), np.uint8)
    picture = cv2.imdecode(pixel_buffer, cv2.IMREAD_COLOR)

    char_boxes = {}
    for left, top, right, bottom in boxes:
        # Recognise the text inside the box.
        crop = picture[top:bottom, left:right]
        encoded_crop = cv2.imencode(".jpg", crop)[1]
        text = recognizer.classification(encoded_crop.tobytes())
        text = re.sub("[a-zA-Z0-9]+", "", text)
        # Keep at most the first remaining character (may be empty).
        char_boxes[text[:1]] = [left, top, right, bottom]

    return {
        "msg": "success",
        "code": 0,
        "r": {"code": char_boxes, "code_list": boxes},
    }
def user_authentication(form_data: OAuth2PasswordRequestForm):
    """Raise HTTP 400 unless ``form_data.username`` is found in FAKE_USERS_DB.

    Returns ``None`` when the lookup yields a truthy entry.

    NOTE(review): only the username is looked up here; ``form_data.password``
    is never compared, although the error text mentions the password.
    Confirm whether that is intended.
    """
    known_user = FAKE_USERS_DB.get(form_data.username)
    if known_user:
        return
    raise HTTPException(status_code=400, detail="用户名或密码错误")
async def cjy_postpic_base64(
    pic_type: str = Query(..., min_length=4, max_length=4, description="验证码图片类型"),
    jy_code: Union[str, None] = Query(None, description="爬虫代码"),
    file: UploadFile = File(..., description="验证码图片"),
):
    """Send the uploaded captcha to ``CJ.postpic_base64`` and wrap the reply.

    Args:
        pic_type: Four-character captcha type; converted with ``int()``
            before it is passed on.
        jy_code: Optional caller-supplied code, echoed back in the response
            (empty string when omitted).
        file: The captcha image upload.

    Returns:
        A dict with ``msg`` (the reply's ``err_str``), ``code`` (its
        ``err_no``) and ``r`` holding the elapsed seconds, ``pic_str``,
        ``pic_id``, ``md5`` and ``jy_code``.
    """
    started_at = time.time()
    encoded_image = base64.b64encode(await file.read())
    reply = CJ.postpic_base64(encoded_image, int(pic_type))

    status = reply["err_no"]
    picture_id = reply["pic_id"]
    return {
        "msg": reply["err_str"],
        "code": status,
        "r": {
            "time": float(f"{time.time() - started_at:.2f}"),
            "pic_str": reply["pic_str"],
            "pic_id": picture_id,
            "md5": reply["md5"],
            "jy_code": jy_code if jy_code else "",
        },
    }
# POST /images/discern
# Rate-limited endpoint that checks the submitted username and then returns
# the response dict produced by the cjy_postpic_base64 dependency.
# NOTE(review): FastAPI resolves dependencies before the handler body runs,
# so the recognition call behind `cjy_captcha` appears to happen before
# user_authentication() is checked. Confirm whether that is intended.
@images_router.post("/discern", summary="超级鹰识别")
@limiter.limit(setting.LIMITER_VALUE)
async def discern_complex_image(
    request: Request,  # must be declared, otherwise the limiter has no effect
    form_data: OAuth2PasswordRequestForm = Depends(),
    cjy_captcha: dict = Depends(cjy_postpic_base64),
):
    user_authentication(form_data)
    recognition_response = cjy_captcha
    return recognition_response
# POST /images/report_err
# Checks the submitted username, forwards `pic_id` to CJ.report_error() and
# relays the reply's "err_str" / "err_no" as "msg" / "code".
@images_router.post("/report_err", summary="回传超级鹰识别错误图片退积分")
async def cjy_report_error(
    form_data: OAuth2PasswordRequestForm = Depends(),
    pic_id: str = Query(..., description="验证码图片id"),
):
    user_authentication(form_data)
    reply = CJ.report_error(pic_id)
    message = reply["err_str"]
    status = reply["err_no"]
    return {"msg": message, "code": status, "r": {}}
# GET /images/reset
# Calls limiter.reset() followed by limiter.reset_warn() and answers with a
# fixed success payload.
@images_router.get("/reset", summary="重置超级鹰日访问次数")
async def reset_limiter():
    limiter.reset()
    limiter.reset_warn()
    success_payload = {"msg": "success", "code": 0, "r": {}}
    return success_payload
# POST /images/verify_z
# Normalises the uploaded captcha with prepare_image(), sends it to
# ZhipuFileExtract.glm_4v_flash() with a fixed prompt and returns the reply
# together with the elapsed time in seconds (two decimals).
# The API key is read from the ZHIPU_API_KEY environment variable when set.
@images_router.post("/verify_z", summary="Ocr")
async def zhipu_ocr(file: UploadFile = File(...)):
    # Function-scope import: `os` is not among this module's visible imports.
    import os

    start = time.time()
    img_bytes = await file.read()
    image = prepare_image(img_bytes)
    # SECURITY: credentials do not belong in source control. Prefer the
    # environment variable; the literal below is kept only so existing
    # deployments keep working.
    # TODO: rotate this key and remove the fallback.
    api_key = (
        os.environ.get("ZHIPU_API_KEY")
        or "4abf8f53d1daed90f8a03fc982899418.dyK8ivPgzm3M1Nx5"
    )
    client = ZhipuFileExtract(api_key=api_key)
    ret = client.glm_4v_flash(
        image,
        text='这是一张100以内数学计算图片验证码,图片内容计算结果是什么?你只需要返回图中内容计算结果即可,不要解释,不要返回任何其它内容'
    )
    return {
        "msg": "success",
        "code": 0,
        "r": {
            "time": float("{:.2f}".format(time.time() - start)),
            "code": ret,
        },
    }
|