import base64
import functools
import io
import re
import time
from typing import Optional, Union

import cv2
import ddddocr
import numpy as np
from fastapi import (
    APIRouter,
    Depends,
    HTTPException,
    UploadFile,
    File,
    Query,
)
from fastapi.requests import Request
from fastapi.security import OAuth2PasswordRequestForm

import services.limiter as limiter
from services.chaojiying import CJ
from services.defults import FAKE_USERS_DB
from services.limiter import LIMITER
from services.utils import calculate

images_router = APIRouter(prefix="/images")

# Uploads may be either raw image bytes or a textual ``data:image/...;base64,``
# URL. The named groups are required: ``data`` is read below, and without the
# ``<name>`` part ``(?P...)`` is not a valid regular expression.
_DATA_URL_PREFIX = b"data:image"
_DATA_URL_RE = re.compile(
    r"data:image/(?P<ext>.*?);base64,(?P<data>.*)", re.DOTALL
)

# Custom model used by the arithmetic captcha endpoint.
_ARITHMETIC_ONNX_PATH = (
    "libs/tj_arithmetic/tj_project_1.0_23_15000_2023-01-14-10-58-23.onnx"
)
_ARITHMETIC_CHARSETS_PATH = "libs/tj_arithmetic/charsets.json"


@functools.lru_cache(maxsize=None)
def _text_ocr() -> "ddddocr.DdddOcr":
    """Return the shared text-recognition model, building it on first use."""
    return ddddocr.DdddOcr(det=False, ocr=True, show_ad=False)


@functools.lru_cache(maxsize=None)
def _arithmetic_ocr() -> "ddddocr.DdddOcr":
    """Return the shared arithmetic-captcha model, building it on first use."""
    return ddddocr.DdddOcr(
        det=False,
        ocr=False,
        show_ad=False,
        import_onnx_path=_ARITHMETIC_ONNX_PATH,
        charsets_path=_ARITHMETIC_CHARSETS_PATH,
    )


@functools.lru_cache(maxsize=None)
def _detector() -> "ddddocr.DdddOcr":
    """Return the shared target-detection model, building it on first use."""
    return ddddocr.DdddOcr(det=True, show_ad=False)


def _extract_base64_payload(img_bytes: bytes) -> Optional[str]:
    """Return the base64 payload of a ``data:image`` URL upload.

    Args:
        img_bytes: Raw content of the uploaded file.

    Returns:
        The base64 text following ``;base64,`` when the upload starts with
        ``data:image``; ``None`` when the upload is not a data URL and should
        be treated as raw image bytes.

    Raises:
        HTTPException: 400 when the upload starts with ``data:image`` but
            cannot be decoded as text or does not match the data-URL pattern.
    """
    if not img_bytes.startswith(_DATA_URL_PREFIX):
        return None
    try:
        src = img_bytes.decode()
    except UnicodeDecodeError as exc:
        raise HTTPException(status_code=400, detail="Do not parse!") from exc
    match = _DATA_URL_RE.search(src)
    if match is None:
        # Malformed input is a client error, not an internal server failure.
        raise HTTPException(status_code=400, detail="Do not parse!")
    return match.group("data")


def _classify(ocr: "ddddocr.DdddOcr", img_bytes: bytes) -> str:
    """Run ``ocr.classification`` on an upload in either accepted format."""
    payload = _extract_base64_payload(img_bytes)
    return ocr.classification(img_bytes if payload is None else payload)


def _elapsed(start: float) -> float:
    """Return the seconds elapsed since ``start``, rounded to 2 decimals."""
    return round(time.perf_counter() - start, 2)


@images_router.post("/verify", summary="Ocr")
async def simple_captcha(file: UploadFile = File(...)):
    """Recognise the text of an uploaded captcha image.

    The upload may be raw image bytes or a ``data:image/...;base64,`` URL.
    Returns the recognised text together with the processing time in seconds.
    """
    start = time.perf_counter()
    img_bytes = await file.read()
    verify_code = _classify(_text_ocr(), img_bytes)
    return {
        "msg": "success",
        "code": 0,
        "r": {
            "time": _elapsed(start),
            "code": verify_code,
        },
    }


@images_router.post("/arithmetic", summary="100以内算术")
async def arithmetic_captcha(file: UploadFile = File(...)):
    """Recognise an arithmetic captcha and return the evaluated result.

    The recognised expression has every ``x`` replaced by ``*`` before it is
    handed to ``calculate``.
    """
    start = time.perf_counter()
    img_bytes = await file.read()
    verify_code = _classify(_arithmetic_ocr(), img_bytes)
    verify_code = verify_code.replace("x", "*")
    return {
        "msg": "success",
        "code": 0,
        "r": {
            "time": _elapsed(start),
            "code": calculate(verify_code),
        },
    }


@images_router.post("/verify_det", summary="点验图片")
async def det_captcha(file: UploadFile = File(..., description="验证码图片")):
    """Detect the targets in a click captcha and recognise each of them.

    Returns a mapping from the recognised character to its bounding box
    (``code``) and the raw list of detected boxes (``code_list``).
    """
    det = _detector()
    # The original built this recogniser with ``ocr=True`` only; that is the
    # same configuration as the shared text model provided the library's
    # ``det`` default is False.
    ocr = _text_ocr()
    img_bytes = await file.read()

    payload = _extract_base64_payload(img_bytes)
    if payload is None:
        poses = det.detection(img_bytes=img_bytes)
    else:
        poses = det.detection(img_base64=payload)
        # OpenCV needs the decoded image bytes for cropping below.
        img_bytes = base64.b64decode(payload)

    img_byte = io.BytesIO(img_bytes)
    file_array = np.frombuffer(img_byte.getbuffer(), np.uint8)
    image = cv2.imdecode(file_array, cv2.IMREAD_COLOR)

    strxys = {}
    for box in poses:
        # Run OCR on the text inside each detected box.
        x1, y1, x2, y2 = box
        part = image[y1:y2, x1:x2]
        encoded = cv2.imencode(".jpg", part)[1]
        text = ocr.classification(encoded.tobytes())
        # Drop ASCII letters and digits, then keep only the first character.
        text = re.sub("[a-zA-Z0-9]+", "", text)
        if len(text) > 1:
            text = text[0]
        # Boxes that yield the same text overwrite each other here.
        strxys[text] = [x1, y1, x2, y2]

    return {
        "msg": "success",
        "code": 0,
        "r": {
            "code": strxys,
            "code_list": poses,
        },
    }


def user_authentication(form_data: OAuth2PasswordRequestForm):
    """Reject the request when the submitted username is unknown.

    Raises:
        HTTPException: 400 when ``form_data.username`` has no entry in
            ``FAKE_USERS_DB``.
    """
    # NOTE(review): only the username is looked up; the submitted password is
    # never compared, although the error message mentions it. The schema of
    # FAKE_USERS_DB is not visible here -- confirm and add the check.
    user_dict = FAKE_USERS_DB.get(form_data.username)
    if not user_dict:
        raise HTTPException(status_code=400, detail="用户名或密码错误")


async def cjy_postpic_base64(
        pic_type: str = Query(..., min_length=4, max_length=4, description="验证码图片类型"),
        jy_code: Union[str, None] = Query(None, description="爬虫代码"),
        file: UploadFile = File(..., description="验证码图片"),
):
    """Dependency that submits the uploaded image to the ``CJ`` client.

    Args:
        pic_type: Four-character captcha type, converted with ``int``.
        jy_code: Optional caller-supplied code echoed back in the response.
        file: The captcha image to recognise.

    Returns:
        A response dict built from the fields of the ``CJ.postpic_base64``
        result plus the elapsed time in seconds.
    """
    start = time.perf_counter()
    img_bytes = await file.read()
    # b64encode returns bytes, not str.
    # NOTE(review): confirm CJ.postpic_base64 accepts bytes.
    base64_str = base64.b64encode(img_bytes)
    # NOTE(review): a non-numeric pic_type raises ValueError here and surfaces
    # as a 500; consider validating it at the Query level.
    discern_result = CJ.postpic_base64(base64_str, int(pic_type))
    return {
        "msg": discern_result["err_str"],
        "code": discern_result["err_no"],
        "r": {
            "time": _elapsed(start),
            "pic_str": discern_result["pic_str"],
            "pic_id": discern_result["pic_id"],
            "md5": discern_result["md5"],
            "jy_code": jy_code or "",
        },
    }


@images_router.post("/discern", summary="超级鹰识别")
@LIMITER.limit(limiter.LM_VALUE)
async def discern_complex_image(
        request: Request,
        form_data: OAuth2PasswordRequestForm = Depends(),
        cjy_captcha: dict = Depends(cjy_postpic_base64),
):
    """Return the recognition result produced by ``cjy_postpic_base64``.

    The username is checked and the limiter usage counters are updated before
    the already-computed dependency result is returned.
    """
    # NOTE(review): dependencies are resolved before this body runs, so the
    # upstream recognition call has already happened by the time the username
    # is checked -- confirm this ordering is intended.
    user_authentication(form_data)
    limiter.increase_usages_count()
    limiter.limiter_warring()
    return cjy_captcha


@images_router.post("/report_err", summary="回传超级鹰识别错误图片退积分")
async def cjy_report_error(
        form_data: OAuth2PasswordRequestForm = Depends(),
        pic_id: str = Query(..., description="验证码图片id"),
):
    """Report a wrongly recognised image through ``CJ.report_error``."""
    user_authentication(form_data)
    report_error = CJ.report_error(pic_id)
    return {
        "msg": report_error["err_str"],
        "code": report_error["err_no"],
        "r": {},
    }


@images_router.get("/reset", summary="重置超级鹰日访问次数")
async def reset_limiter():
    """Reset ``LIMITER`` and flush the limiter module's state."""
    # NOTE(review): this endpoint performs no authentication -- confirm that
    # is intended.
    LIMITER.reset()
    limiter.flush_limiter()
    return {
        "msg": "success",
        "code": 0,
        "r": {},
    }