123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197 |
- import base64
- import io
- import re
- import time
- from typing import Union
- import cv2
- import ddddocr
- import numpy as np
- from fastapi import (
- APIRouter,
- Depends,
- HTTPException,
- UploadFile,
- File,
- Query
- )
- from fastapi.requests import Request
- from fastapi.security import OAuth2PasswordRequestForm
- import setting
- from services.chaojiying import CJ
- from services.defults import FAKE_USERS_DB
- from services.limiter import limiter
- from services.utils import calculate
# Router collecting every captcha-recognition endpoint under the "/images" prefix.
images_router = APIRouter(prefix="/images")
@images_router.post("/verify", summary="Ocr")
async def simple_captcha(file: UploadFile = File(...)):
    """Run OCR on an uploaded captcha image.

    The upload may hold raw image bytes or a ``data:image/<ext>;base64,<data>``
    data URI. For a data URI only the base64 payload is handed to the OCR
    engine; raw bytes are passed through unchanged.

    Args:
        file: The uploaded captcha image.

    Returns:
        A dict with ``msg``, ``code`` and ``r``; ``r`` carries the elapsed
        time in seconds (two decimals) and the recognised text.

    Raises:
        HTTPException: 400 when the upload starts like a data URI but cannot
            be decoded or parsed.
    """
    start = time.time()
    img_bytes = await file.read()
    ocr = ddddocr.DdddOcr(det=False, ocr=True, show_ad=False)

    if img_bytes.startswith(b"data:image"):
        # A broken data URI is a client error, so answer 400 rather than
        # letting a bare exception surface as a 500.
        try:
            src = img_bytes.decode()
        except UnicodeDecodeError as exc:
            raise HTTPException(status_code=400, detail="Do not parse!") from exc
        match = re.search(
            r"data:image/(?P<ext>.*?);base64,(?P<data>.*)", src, re.DOTALL
        )
        if match is None:
            raise HTTPException(status_code=400, detail="Do not parse!")
        verify_code = ocr.classification(match.group("data"))
    else:
        verify_code = ocr.classification(img_bytes)

    return {
        "msg": "success",
        "code": 0,
        "r": {
            "time": float("{:.2f}".format(time.time() - start)),
            "code": verify_code,
        },
    }
@images_router.post("/arithmetic", summary="100以内算术")
async def arithmetic_captcha(file: UploadFile = File(...)):
    """Recognise an arithmetic captcha and return the computed answer.

    Uses a custom ONNX model and charset loaded from ``libs/tj_arithmetic``.
    The upload may hold raw image bytes or a ``data:image/<ext>;base64,<data>``
    data URI. The recognised text has every ``x`` replaced by ``*`` and is
    then passed to ``calculate``.

    Args:
        file: The uploaded captcha image.

    Returns:
        A dict with ``msg``, ``code`` and ``r``; ``r`` carries the elapsed
        time in seconds (two decimals) and the value returned by
        ``calculate``.

    Raises:
        HTTPException: 400 when the upload starts like a data URI but cannot
            be decoded or parsed.
    """
    start = time.time()
    img_bytes = await file.read()
    onnx_path = "libs/tj_arithmetic/tj_project_1.0_23_15000_2023-01-14-10-58-23.onnx"
    charsets_path = "libs/tj_arithmetic/charsets.json"
    ocr = ddddocr.DdddOcr(
        det=False,
        ocr=False,
        show_ad=False,
        import_onnx_path=onnx_path,
        charsets_path=charsets_path,
    )

    if img_bytes.startswith(b"data:image"):
        # A broken data URI is a client error, so answer 400 rather than
        # letting a bare exception surface as a 500.
        try:
            src = img_bytes.decode()
        except UnicodeDecodeError as exc:
            raise HTTPException(status_code=400, detail="Do not parse!") from exc
        match = re.search(
            r"data:image/(?P<ext>.*?);base64,(?P<data>.*)", src, re.DOTALL
        )
        if match is None:
            raise HTTPException(status_code=400, detail="Do not parse!")
        verify_code = ocr.classification(match.group("data"))
    else:
        verify_code = ocr.classification(img_bytes)

    # Presumably the model emits "x" for multiplication; swap it for "*"
    # before evaluation -- TODO confirm against the charset file.
    verify_code = verify_code.replace("x", "*")

    return {
        "msg": "success",
        "code": 0,
        "r": {
            "time": float("{:.2f}".format(time.time() - start)),
            "code": calculate(verify_code),
        },
    }
@images_router.post("/verify_det", summary="点验图片")
async def det_captcha(file: UploadFile = File(..., description="验证码图片")):
    """Detect the target boxes in a click captcha and OCR each one.

    The upload may hold raw image bytes or a ``data:image/<ext>;base64,<data>``
    data URI. Every detected box is cropped, re-encoded as JPEG and run
    through OCR; ASCII letters and digits are stripped from the OCR text and
    only the first remaining character is kept as the box label.

    Args:
        file: The uploaded captcha image.

    Returns:
        A dict with ``msg``, ``code`` and ``r``; ``r["code"]`` maps each
        label to its ``[x1, y1, x2, y2]`` box and ``r["code_list"]`` is the
        raw list of detected boxes. Boxes that yield the same label
        overwrite one another in ``r["code"]``.

    Raises:
        HTTPException: 400 when a data URI cannot be decoded or parsed, or
            when the image bytes cannot be decoded into an image.
    """
    det = ddddocr.DdddOcr(det=True, show_ad=False)
    ocr = ddddocr.DdddOcr(ocr=True, show_ad=False)
    img_bytes = await file.read()

    if img_bytes.startswith(b"data:image"):
        # A broken data URI is a client error, so answer 400 rather than
        # letting a bare exception surface as a 500.
        try:
            src = img_bytes.decode()
        except UnicodeDecodeError as exc:
            raise HTTPException(status_code=400, detail="Do not parse!") from exc
        match = re.search(
            r"data:image/(?P<ext>.*?);base64,(?P<data>.*)", src, re.DOTALL
        )
        if match is None:
            raise HTTPException(status_code=400, detail="Do not parse!")
        img_base64 = match.group("data")
        poses = det.detection(img_base64=img_base64)
        # The crops below need the raw bytes, not the base64 text.
        img_bytes = base64.b64decode(img_base64)
    else:
        poses = det.detection(img_bytes=img_bytes)

    file_array = np.frombuffer(img_bytes, np.uint8)
    image = cv2.imdecode(file_array, cv2.IMREAD_COLOR)
    if image is None:
        # imdecode signals an undecodable image by returning None; slicing
        # it below would otherwise fail with an opaque TypeError.
        raise HTTPException(status_code=400, detail="Do not parse!")

    strxys = {}
    for box in poses:
        # Recognise the text inside the box.
        x1, y1, x2, y2 = box
        part = image[y1:y2, x1:x2]
        encoded = cv2.imencode(".jpg", part)[1]
        text = ocr.classification(encoded.tobytes())
        text = re.sub("[a-zA-Z0-9]+", "", text)
        # Keep at most one character per box.
        strxys[text[:1]] = [x1, y1, x2, y2]

    return {
        "msg": "success",
        "code": 0,
        "r": {
            "code": strxys,
            "code_list": poses,
        },
    }
def user_authentication(form_data: OAuth2PasswordRequestForm):
    """Reject the request unless the submitted username is a known user.

    Args:
        form_data: The OAuth2 password form sent with the request.

    Raises:
        HTTPException: 400 when ``FAKE_USERS_DB`` has no (truthy) entry for
            ``form_data.username``.
    """
    # NOTE(review): only the username is looked up; form_data.password is
    # never compared even though the error text mentions the password --
    # confirm whether that is intended.
    if not FAKE_USERS_DB.get(form_data.username):
        raise HTTPException(status_code=400, detail="用户名或密码错误")
async def cjy_postpic_base64(
    pic_type: str = Query(..., min_length=4, max_length=4, description="验证码图片类型"),
    jy_code: Union[str, None] = Query(None, description="爬虫代码"),
    file: UploadFile = File(..., description="验证码图片"),
):
    """Send an uploaded captcha to the Chaojiying client and wrap its reply.

    The image is base64-encoded and passed to ``CJ.postpic_base64`` together
    with ``pic_type`` converted to an integer.

    Args:
        pic_type: Four-character captcha type code; must parse as an integer.
        jy_code: Optional caller-supplied code, echoed back in the response
            (empty string when omitted).
        file: The uploaded captcha image.

    Returns:
        A dict with ``msg`` (the service's ``err_str``), ``code`` (its
        ``err_no``) and ``r`` holding the elapsed time in seconds (two
        decimals), ``pic_str``, ``pic_id``, ``md5`` and ``jy_code``.

    Raises:
        HTTPException: 400 when ``pic_type`` is not a valid integer.
    """
    start = time.time()

    # Validate before doing any work: a non-numeric type code is a client
    # error, not a server failure.
    try:
        code_type = int(pic_type)
    except ValueError as exc:
        raise HTTPException(
            status_code=400, detail="pic_type must be a numeric code"
        ) from exc

    img_bytes = await file.read()
    base64_str = base64.b64encode(img_bytes)
    discern_result = CJ.postpic_base64(base64_str, code_type)

    return {
        "msg": discern_result["err_str"],
        "code": discern_result["err_no"],
        "r": {
            "time": float("{:.2f}".format(time.time() - start)),
            "pic_str": discern_result["pic_str"],
            "pic_id": discern_result["pic_id"],
            "md5": discern_result["md5"],
            "jy_code": jy_code or "",
        },
    }
@images_router.post("/discern", summary="超级鹰识别")
@limiter.limit(setting.LIMITER_VALUE)
async def discern_complex_image(
    request: Request,  # must be declared, otherwise the limiter cannot enforce its limit
    form_data: OAuth2PasswordRequestForm = Depends(),
    cjy_captcha: dict = Depends(cjy_postpic_base64),
):
    """Authenticate the caller and return the Chaojiying recognition result.

    Args:
        request: The incoming request; required by the rate limiter.
        form_data: OAuth2 password form used for authentication.
        cjy_captcha: Result produced by the ``cjy_postpic_base64`` dependency.

    Returns:
        The dict produced by ``cjy_postpic_base64``.

    Raises:
        HTTPException: Propagated from ``user_authentication`` when the user
            is unknown.
    """
    # NOTE(review): FastAPI resolves dependencies before this body runs, so
    # cjy_postpic_base64 has already executed by the time the user is
    # authenticated here -- confirm whether the recognition call should be
    # gated behind authentication instead.
    user_authentication(form_data)
    return cjy_captcha
@images_router.post("/report_err", summary="回传超级鹰识别错误图片退积分")
async def cjy_report_error(
    form_data: OAuth2PasswordRequestForm = Depends(),
    pic_id: str = Query(..., description="验证码图片id"),
):
    """Report a wrongly recognised captcha image to the Chaojiying client.

    Args:
        form_data: OAuth2 password form used for authentication.
        pic_id: Identifier of the image to report.

    Returns:
        A dict with ``msg`` (the service's ``err_str``), ``code`` (its
        ``err_no``) and an empty ``r``.

    Raises:
        HTTPException: Propagated from ``user_authentication`` when the user
            is unknown.
    """
    user_authentication(form_data)
    outcome = CJ.report_error(pic_id)
    message, status = outcome["err_str"], outcome["err_no"]
    return {"msg": message, "code": status, "r": {}}
@images_router.get("/reset", summary="重置超级鹰日访问次数")
async def reset_limiter():
    """Reset the rate limiter and its warning state.

    Calls ``limiter.reset()`` followed by ``limiter.reset_warn()``.

    Returns:
        A fixed success payload with an empty ``r``.
    """
    # NOTE(review): this state-changing endpoint is a GET and performs no
    # user_authentication call -- confirm that is intended.
    for reset_step in (limiter.reset, limiter.reset_warn):
        reset_step()
    response = {"msg": "success", "code": 0, "r": {}}
    return response
|