|
@@ -0,0 +1,197 @@
|
|
|
+import base64
|
|
|
+import io
|
|
|
+import re
|
|
|
+import time
|
|
|
+
|
|
|
+import cv2
|
|
|
+import ddddocr
|
|
|
+import numpy as np
|
|
|
+from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Query
|
|
|
+from fastapi.requests import Request
|
|
|
+from fastapi.security import OAuth2PasswordRequestForm
|
|
|
+
|
|
|
+import glovar
|
|
|
+from chaojiying import CJ
|
|
|
+from services.defults import FAKE_USERS_DB
|
|
|
+from services.limiter import LIMITER, limiter_warring, flush_limiter
|
|
|
+from services.utils import calculate
|
|
|
+
|
|
|
+images_router = APIRouter(prefix="/images")
|
|
|
+
|
|
|
+
|
|
|
+def verify_identity(form_data: OAuth2PasswordRequestForm):
|
|
|
+ user_dict = FAKE_USERS_DB.get(form_data.username)
|
|
|
+ if not user_dict:
|
|
|
+ raise HTTPException(status_code=400, detail="用户名或密码错误")
|
|
|
+
|
|
|
+
|
|
|
+@images_router.post("/verify", summary="ocr")
|
|
|
+async def simple_captcha(file: UploadFile = File(...)):
|
|
|
+ start = time.time()
|
|
|
+ img_bytes = await file.read()
|
|
|
+ ocr = ddddocr.DdddOcr(det=False, ocr=True, show_ad=False)
|
|
|
+ if img_bytes.startswith(b'data:image'):
|
|
|
+ src = img_bytes.decode()
|
|
|
+ result = re.search("data:image/(?P<ext>.*?);base64,(?P<data>.*)", src, re.DOTALL)
|
|
|
+ if result:
|
|
|
+ # ext = result.groupdict().get("ext")
|
|
|
+ img_base64 = result.groupdict().get("data")
|
|
|
+ else:
|
|
|
+ raise Exception("Do not parse!")
|
|
|
+ verify_code = ocr.classification(img_base64)
|
|
|
+ else:
|
|
|
+ verify_code = ocr.classification(img_bytes)
|
|
|
+ spend = "{:.2f}".format(time.time() - start)
|
|
|
+ return {
|
|
|
+ "msg": "success",
|
|
|
+ "code": 0,
|
|
|
+ "r": {
|
|
|
+ 'time': float(spend),
|
|
|
+ 'code': verify_code
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+@images_router.post("/arithmetic", summary="100以内算术")
|
|
|
+async def arithmetic_captcha(file: UploadFile = File(...)):
|
|
|
+ start = time.time()
|
|
|
+ img_bytes = await file.read()
|
|
|
+ onnx_path = 'libs/tj_arithmetic/tj_project_1.0_23_15000_2023-01-14-10-58-23.onnx'
|
|
|
+ charsets_path = 'libs/tj_arithmetic/charsets.json'
|
|
|
+ ocr = ddddocr.DdddOcr(det=False, ocr=False,show_ad=False,
|
|
|
+ import_onnx_path=onnx_path,
|
|
|
+ charsets_path=charsets_path)
|
|
|
+ if img_bytes.startswith(b'data:image'):
|
|
|
+ src = img_bytes.decode()
|
|
|
+ result = re.search("data:image/(?P<ext>.*?);base64,(?P<data>.*)", src, re.DOTALL)
|
|
|
+ if result:
|
|
|
+ # ext = result.groupdict().get("ext")
|
|
|
+ img_base64 = result.groupdict().get("data")
|
|
|
+ else:
|
|
|
+ raise Exception("Do not parse!")
|
|
|
+ verify_code = ocr.classification(img_base64)
|
|
|
+ else:
|
|
|
+ verify_code = ocr.classification(img_bytes)
|
|
|
+
|
|
|
+ verify_code = verify_code.replace('x', '*')
|
|
|
+
|
|
|
+ spend = "{:.2f}".format(time.time() - start)
|
|
|
+ return {
|
|
|
+ "msg": "success",
|
|
|
+ "code": 0,
|
|
|
+ "r": {
|
|
|
+ 'time': float(spend),
|
|
|
+ 'code': calculate(verify_code)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+@images_router.post("/verify_det", summary="点验")
|
|
|
+async def det_captcha(
|
|
|
+ image_content: UploadFile = File(..., description='验证码图片')
|
|
|
+):
|
|
|
+ det = ddddocr.DdddOcr(det=True, show_ad=False)
|
|
|
+ ocr = ddddocr.DdddOcr(ocr=True, show_ad=False)
|
|
|
+ img_bytes = await image_content.read()
|
|
|
+ if img_bytes.startswith(b'data:image'):
|
|
|
+ src = img_bytes.decode()
|
|
|
+ result = re.search("data:image/(?P<ext>.*?);base64,(?P<data>.*)", src, re.DOTALL)
|
|
|
+ if result:
|
|
|
+ img_base64 = result.groupdict().get("data")
|
|
|
+ else:
|
|
|
+ raise Exception("Do not parse!")
|
|
|
+ poses = det.detection(img_base64=img_base64)
|
|
|
+ img_bytes = base64.b64decode(img_base64)
|
|
|
+ else:
|
|
|
+ poses = det.detection(img_bytes=img_bytes)
|
|
|
+ img_byte = io.BytesIO(img_bytes)
|
|
|
+ file_array = np.frombuffer(img_byte.getbuffer(), np.uint8)
|
|
|
+ image = cv2.imdecode(file_array, cv2.IMREAD_COLOR)
|
|
|
+ strxys = {}
|
|
|
+ for box in poses:
|
|
|
+ # 对框内文字进行识别
|
|
|
+ x1, y1, x2, y2 = box
|
|
|
+ part = image[y1:y2, x1:x2]
|
|
|
+ img = cv2.imencode('.jpg', part)[1]
|
|
|
+ result = ocr.classification(img.tobytes())
|
|
|
+ result = re.sub("[a-zA-Z0-9]+", "", result)
|
|
|
+ if len(result) > 1:
|
|
|
+ result = result[0]
|
|
|
+ strxys[result] = [x1, y1, x2, y2]
|
|
|
+ result = {
|
|
|
+ "msg": "success",
|
|
|
+ "code": 0,
|
|
|
+ "r": {
|
|
|
+ "code": strxys,
|
|
|
+ "code_list": poses}
|
|
|
+ }
|
|
|
+ return result
|
|
|
+
|
|
|
+
|
|
|
+@images_router.get("/reset", summary="解除限制")
|
|
|
+async def reset_limiter():
|
|
|
+ LIMITER.reset()
|
|
|
+ flush_limiter()
|
|
|
+ return {
|
|
|
+ "msg": "success",
|
|
|
+ "code": 0,
|
|
|
+ "r": {}
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+async def cjy_postpic_base64(
|
|
|
+ pic_type: str = Query(..., min_length=4, max_length=4, description='验证码图片类型'),
|
|
|
+ file: UploadFile = File(..., description='验证码图片'),
|
|
|
+):
|
|
|
+ start = time.time()
|
|
|
+ img_bytes = await file.read()
|
|
|
+ base64_str = base64.b64encode(img_bytes)
|
|
|
+ discern_result = CJ.postpic_base64(base64_str, int(pic_type))
|
|
|
+ spend = "{:.2f}".format(time.time() - start)
|
|
|
+ err_no = discern_result['err_no']
|
|
|
+ pic_id = discern_result['pic_id']
|
|
|
+ result = {
|
|
|
+ "msg": discern_result['err_str'],
|
|
|
+ "code": err_no,
|
|
|
+ "r": {
|
|
|
+ 'time': float(spend),
|
|
|
+ 'pic_str': discern_result['pic_str'],
|
|
|
+ 'pic_id': pic_id,
|
|
|
+ 'md5': discern_result['md5'],
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return result
|
|
|
+
|
|
|
+
|
|
|
+@images_router.post("/discern", summary="超级鹰")
|
|
|
+@LIMITER.limit(glovar.lm_value)
|
|
|
+async def discern_complex_image(
|
|
|
+ request: Request,
|
|
|
+ pic_type: str,
|
|
|
+ file: UploadFile = File(..., description='验证码图片'),
|
|
|
+ form_data: OAuth2PasswordRequestForm = Depends()
|
|
|
+):
|
|
|
+ verify_identity(form_data)
|
|
|
+ result = await cjy_postpic_base64(pic_type=pic_type, file=file)
|
|
|
+ glovar.lm_counter += 1
|
|
|
+ limiter_warring()
|
|
|
+ return result
|
|
|
+
|
|
|
+
|
|
|
+@images_router.post("/report_err", summary="上传超级鹰识别错误")
|
|
|
+async def cjy_report_error(
|
|
|
+ pic_id: str = Query(
|
|
|
+ ...,
|
|
|
+ description='验证码图片id',
|
|
|
+ ),
|
|
|
+ form_data: OAuth2PasswordRequestForm = Depends()
|
|
|
+):
|
|
|
+ user_dict = FAKE_USERS_DB.get(form_data.username)
|
|
|
+ if not user_dict:
|
|
|
+ raise HTTPException(status_code=400, detail="用户名或密码错误")
|
|
|
+ report_error = CJ.report_error(pic_id)
|
|
|
+ return {
|
|
|
+ "msg": report_error['err_str'],
|
|
|
+ "code": report_error['err_no'],
|
|
|
+ "r": {}
|
|
|
+ }
|