|
@@ -2,30 +2,32 @@ import base64
|
|
|
import io
|
|
|
import re
|
|
|
import time
|
|
|
+from typing import Union
|
|
|
|
|
|
import cv2
|
|
|
import ddddocr
|
|
|
import numpy as np
|
|
|
-from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Query
|
|
|
+from fastapi import (
|
|
|
+ APIRouter,
|
|
|
+ Depends,
|
|
|
+ HTTPException,
|
|
|
+ UploadFile,
|
|
|
+ File,
|
|
|
+ Query
|
|
|
+)
|
|
|
from fastapi.requests import Request
|
|
|
from fastapi.security import OAuth2PasswordRequestForm
|
|
|
|
|
|
-import glovar
|
|
|
+import services.limiter as limiter
|
|
|
from chaojiying import CJ
|
|
|
from services.defults import FAKE_USERS_DB
|
|
|
-from services.limiter import LIMITER, limiter_warring, flush_limiter
|
|
|
+from services.limiter import LIMITER
|
|
|
from services.utils import calculate
|
|
|
|
|
|
images_router = APIRouter(prefix="/images")
|
|
|
|
|
|
|
|
|
-def verify_identity(form_data: OAuth2PasswordRequestForm):
|
|
|
- user_dict = FAKE_USERS_DB.get(form_data.username)
|
|
|
- if not user_dict:
|
|
|
- raise HTTPException(status_code=400, detail="用户名或密码错误")
|
|
|
-
|
|
|
-
|
|
|
-@images_router.post("/verify", summary="ocr")
|
|
|
+@images_router.post("/verify", summary="OCR")
|
|
|
async def simple_captcha(file: UploadFile = File(...)):
|
|
|
start = time.time()
|
|
|
img_bytes = await file.read()
|
|
@@ -41,13 +43,13 @@ async def simple_captcha(file: UploadFile = File(...)):
|
|
|
verify_code = ocr.classification(img_base64)
|
|
|
else:
|
|
|
verify_code = ocr.classification(img_bytes)
|
|
|
- spend = "{:.2f}".format(time.time() - start)
|
|
|
+
|
|
|
return {
|
|
|
"msg": "success",
|
|
|
"code": 0,
|
|
|
"r": {
|
|
|
- 'time': float(spend),
|
|
|
- 'code': verify_code
|
|
|
+ "time": float("{:.2f}".format(time.time() - start)),
|
|
|
+ "code": verify_code
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -56,16 +58,17 @@ async def simple_captcha(file: UploadFile = File(...)):
|
|
|
async def arithmetic_captcha(file: UploadFile = File(...)):
|
|
|
start = time.time()
|
|
|
img_bytes = await file.read()
|
|
|
- onnx_path = 'libs/tj_arithmetic/tj_project_1.0_23_15000_2023-01-14-10-58-23.onnx'
|
|
|
- charsets_path = 'libs/tj_arithmetic/charsets.json'
|
|
|
- ocr = ddddocr.DdddOcr(det=False, ocr=False,show_ad=False,
|
|
|
+ onnx_path = "libs/tj_arithmetic/tj_project_1.0_23_15000_2023-01-14-10-58-23.onnx"
|
|
|
+ charsets_path = "libs/tj_arithmetic/charsets.json"
|
|
|
+ ocr = ddddocr.DdddOcr(det=False,
|
|
|
+ ocr=False,
|
|
|
+ show_ad=False,
|
|
|
import_onnx_path=onnx_path,
|
|
|
charsets_path=charsets_path)
|
|
|
if img_bytes.startswith(b'data:image'):
|
|
|
src = img_bytes.decode()
|
|
|
result = re.search("data:image/(?P<ext>.*?);base64,(?P<data>.*)", src, re.DOTALL)
|
|
|
if result:
|
|
|
- # ext = result.groupdict().get("ext")
|
|
|
img_base64 = result.groupdict().get("data")
|
|
|
else:
|
|
|
raise Exception("Do not parse!")
|
|
@@ -73,26 +76,22 @@ async def arithmetic_captcha(file: UploadFile = File(...)):
|
|
|
else:
|
|
|
verify_code = ocr.classification(img_bytes)
|
|
|
|
|
|
- verify_code = verify_code.replace('x', '*')
|
|
|
-
|
|
|
- spend = "{:.2f}".format(time.time() - start)
|
|
|
+ verify_code = verify_code.replace("x", "*")
|
|
|
return {
|
|
|
"msg": "success",
|
|
|
"code": 0,
|
|
|
"r": {
|
|
|
- 'time': float(spend),
|
|
|
- 'code': calculate(verify_code)
|
|
|
+ "time": float("{:.2f}".format(time.time() - start)),
|
|
|
+ "code": calculate(verify_code)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
|
|
|
-@images_router.post("/verify_det", summary="点验")
|
|
|
-async def det_captcha(
|
|
|
- image_content: UploadFile = File(..., description='验证码图片')
|
|
|
-):
|
|
|
+@images_router.post("/verify_det", summary="点验图片")
|
|
|
+async def det_captcha(file: UploadFile = File(..., description="验证码图片")):
|
|
|
det = ddddocr.DdddOcr(det=True, show_ad=False)
|
|
|
ocr = ddddocr.DdddOcr(ocr=True, show_ad=False)
|
|
|
- img_bytes = await image_content.read()
|
|
|
+ img_bytes = await file.read()
|
|
|
if img_bytes.startswith(b'data:image'):
|
|
|
src = img_bytes.decode()
|
|
|
result = re.search("data:image/(?P<ext>.*?);base64,(?P<data>.*)", src, re.DOTALL)
|
|
@@ -104,6 +103,7 @@ async def det_captcha(
|
|
|
img_bytes = base64.b64decode(img_base64)
|
|
|
else:
|
|
|
poses = det.detection(img_bytes=img_bytes)
|
|
|
+
|
|
|
img_byte = io.BytesIO(img_bytes)
|
|
|
file_array = np.frombuffer(img_byte.getbuffer(), np.uint8)
|
|
|
image = cv2.imdecode(file_array, cv2.IMREAD_COLOR)
|
|
@@ -112,86 +112,88 @@ async def det_captcha(
|
|
|
# 对框内文字进行识别
|
|
|
x1, y1, x2, y2 = box
|
|
|
part = image[y1:y2, x1:x2]
|
|
|
- img = cv2.imencode('.jpg', part)[1]
|
|
|
+ img = cv2.imencode(".jpg", part)[1]
|
|
|
result = ocr.classification(img.tobytes())
|
|
|
result = re.sub("[a-zA-Z0-9]+", "", result)
|
|
|
if len(result) > 1:
|
|
|
result = result[0]
|
|
|
strxys[result] = [x1, y1, x2, y2]
|
|
|
+
|
|
|
result = {
|
|
|
"msg": "success",
|
|
|
"code": 0,
|
|
|
"r": {
|
|
|
"code": strxys,
|
|
|
- "code_list": poses}
|
|
|
+ "code_list": poses
|
|
|
+ }
|
|
|
}
|
|
|
return result
|
|
|
|
|
|
|
|
|
-@images_router.get("/reset", summary="解除限制")
|
|
|
-async def reset_limiter():
|
|
|
- LIMITER.reset()
|
|
|
- flush_limiter()
|
|
|
- return {
|
|
|
- "msg": "success",
|
|
|
- "code": 0,
|
|
|
- "r": {}
|
|
|
- }
|
|
|
+def user_authentication(form_data: OAuth2PasswordRequestForm):
|
|
|
+ user_dict = FAKE_USERS_DB.get(form_data.username)
|
|
|
+ if not user_dict:
|
|
|
+ raise HTTPException(status_code=400, detail="用户名或密码错误")
|
|
|
|
|
|
|
|
|
async def cjy_postpic_base64(
|
|
|
- pic_type: str = Query(..., min_length=4, max_length=4, description='验证码图片类型'),
|
|
|
- file: UploadFile = File(..., description='验证码图片'),
|
|
|
+ pic_type: str = Query(..., min_length=4, max_length=4, description="验证码图片类型"),
|
|
|
+ jy_code: Union[str, None] = Query(None, description="爬虫代码"),
|
|
|
+ file: UploadFile = File(..., description="验证码图片"),
|
|
|
):
|
|
|
start = time.time()
|
|
|
img_bytes = await file.read()
|
|
|
base64_str = base64.b64encode(img_bytes)
|
|
|
discern_result = CJ.postpic_base64(base64_str, int(pic_type))
|
|
|
- spend = "{:.2f}".format(time.time() - start)
|
|
|
- err_no = discern_result['err_no']
|
|
|
- pic_id = discern_result['pic_id']
|
|
|
+ err_no = discern_result["err_no"]
|
|
|
+ pic_id = discern_result["pic_id"]
|
|
|
result = {
|
|
|
- "msg": discern_result['err_str'],
|
|
|
+ "msg": discern_result["err_str"],
|
|
|
"code": err_no,
|
|
|
"r": {
|
|
|
- 'time': float(spend),
|
|
|
- 'pic_str': discern_result['pic_str'],
|
|
|
- 'pic_id': pic_id,
|
|
|
- 'md5': discern_result['md5'],
|
|
|
+ "time": float("{:.2f}".format(time.time() - start)),
|
|
|
+ "pic_str": discern_result["pic_str"],
|
|
|
+ "pic_id": pic_id,
|
|
|
+ "md5": discern_result["md5"],
|
|
|
+ "jy_code": jy_code or ""
|
|
|
}
|
|
|
}
|
|
|
return result
|
|
|
|
|
|
|
|
|
-@images_router.post("/discern", summary="超级鹰")
|
|
|
-@LIMITER.limit(glovar.lm_value)
|
|
|
+@images_router.post("/discern", summary="超级鹰识别")
|
|
|
+@LIMITER.limit(limiter.LM_VALUE)
|
|
|
async def discern_complex_image(
|
|
|
request: Request,
|
|
|
- pic_type: str,
|
|
|
- file: UploadFile = File(..., description='验证码图片'),
|
|
|
- form_data: OAuth2PasswordRequestForm = Depends()
|
|
|
+ form_data: OAuth2PasswordRequestForm = Depends(),
|
|
|
+ cjy_captcha: dict = Depends(cjy_postpic_base64),
|
|
|
):
|
|
|
- verify_identity(form_data)
|
|
|
- result = await cjy_postpic_base64(pic_type=pic_type, file=file)
|
|
|
- glovar.lm_counter += 1
|
|
|
- limiter_warring()
|
|
|
- return result
|
|
|
+ user_authentication(form_data)
|
|
|
+ limiter.LM_COUNTER += 1
|
|
|
+ limiter.limiter_warring()
|
|
|
+ return cjy_captcha
|
|
|
|
|
|
|
|
|
-@images_router.post("/report_err", summary="上传超级鹰识别错误")
|
|
|
+@images_router.post("/report_err", summary="回传超级鹰识别错误图片退积分")
|
|
|
async def cjy_report_error(
|
|
|
- pic_id: str = Query(
|
|
|
- ...,
|
|
|
- description='验证码图片id',
|
|
|
- ),
|
|
|
- form_data: OAuth2PasswordRequestForm = Depends()
|
|
|
+ form_data: OAuth2PasswordRequestForm = Depends(),
|
|
|
+ pic_id: str = Query(..., description="验证码图片id"),
|
|
|
):
|
|
|
- user_dict = FAKE_USERS_DB.get(form_data.username)
|
|
|
- if not user_dict:
|
|
|
- raise HTTPException(status_code=400, detail="用户名或密码错误")
|
|
|
+ user_authentication(form_data)
|
|
|
report_error = CJ.report_error(pic_id)
|
|
|
return {
|
|
|
- "msg": report_error['err_str'],
|
|
|
- "code": report_error['err_no'],
|
|
|
+ "msg": report_error["err_str"],
|
|
|
+ "code": report_error["err_no"],
|
|
|
+ "r": {}
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+@images_router.get("/reset", summary="重置超级鹰日访问次数")
|
|
|
+async def reset_limiter():
|
|
|
+ LIMITER.reset()
|
|
|
+ limiter.flush_limiter()
|
|
|
+ return {
|
|
|
+ "msg": "success",
|
|
|
+ "code": 0,
|
|
|
"r": {}
|
|
|
}
|