apis.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. import base64
  2. import io
  3. import re
  4. import time
  5. from typing import Union
  6. import cv2
  7. import ddddocr
  8. import numpy as np
  9. from fastapi import (
  10. APIRouter,
  11. Depends,
  12. HTTPException,
  13. UploadFile,
  14. File,
  15. Query
  16. )
  17. from fastapi.requests import Request
  18. from fastapi.security import OAuth2PasswordRequestForm
  19. import setting
  20. from services.chaojiying import CJ
  21. from services.defults import FAKE_USERS_DB
  22. from services.limiter import limiter
  23. from services.utils import calculate
  24. images_router = APIRouter(prefix="/images")
  25. @images_router.post("/verify", summary="Ocr")
  26. async def simple_captcha(file: UploadFile = File(...)):
  27. start = time.time()
  28. img_bytes = await file.read()
  29. ocr = ddddocr.DdddOcr(det=False, ocr=True, show_ad=False)
  30. if img_bytes.startswith(b'data:image'):
  31. src = img_bytes.decode()
  32. result = re.search("data:image/(?P<ext>.*?);base64,(?P<data>.*)", src, re.DOTALL)
  33. if result:
  34. # ext = result.groupdict().get("ext")
  35. img_base64 = result.groupdict().get("data")
  36. else:
  37. raise Exception("Do not parse!")
  38. verify_code = ocr.classification(img_base64)
  39. else:
  40. verify_code = ocr.classification(img_bytes)
  41. return {
  42. "msg": "success",
  43. "code": 0,
  44. "r": {
  45. "time": float("{:.2f}".format(time.time() - start)),
  46. "code": verify_code
  47. }
  48. }
  49. @images_router.post("/arithmetic", summary="100以内算术")
  50. async def arithmetic_captcha(file: UploadFile = File(...)):
  51. start = time.time()
  52. img_bytes = await file.read()
  53. onnx_path = "libs/tj_arithmetic/tj_project_1.0_23_15000_2023-01-14-10-58-23.onnx"
  54. charsets_path = "libs/tj_arithmetic/charsets.json"
  55. ocr = ddddocr.DdddOcr(det=False,
  56. ocr=False,
  57. show_ad=False,
  58. import_onnx_path=onnx_path,
  59. charsets_path=charsets_path)
  60. if img_bytes.startswith(b'data:image'):
  61. src = img_bytes.decode()
  62. result = re.search("data:image/(?P<ext>.*?);base64,(?P<data>.*)", src, re.DOTALL)
  63. if result:
  64. img_base64 = result.groupdict().get("data")
  65. else:
  66. raise Exception("Do not parse!")
  67. verify_code = ocr.classification(img_base64)
  68. else:
  69. verify_code = ocr.classification(img_bytes)
  70. verify_code = verify_code.replace("x", "*")
  71. return {
  72. "msg": "success",
  73. "code": 0,
  74. "r": {
  75. "time": float("{:.2f}".format(time.time() - start)),
  76. "code": calculate(verify_code)
  77. }
  78. }
  79. @images_router.post("/verify_det", summary="点验图片")
  80. async def det_captcha(file: UploadFile = File(..., description="验证码图片")):
  81. det = ddddocr.DdddOcr(det=True, show_ad=False)
  82. ocr = ddddocr.DdddOcr(ocr=True, show_ad=False)
  83. img_bytes = await file.read()
  84. if img_bytes.startswith(b'data:image'):
  85. src = img_bytes.decode()
  86. result = re.search("data:image/(?P<ext>.*?);base64,(?P<data>.*)", src, re.DOTALL)
  87. if result:
  88. img_base64 = result.groupdict().get("data")
  89. else:
  90. raise Exception("Do not parse!")
  91. poses = det.detection(img_base64=img_base64)
  92. img_bytes = base64.b64decode(img_base64)
  93. else:
  94. poses = det.detection(img_bytes=img_bytes)
  95. img_byte = io.BytesIO(img_bytes)
  96. file_array = np.frombuffer(img_byte.getbuffer(), np.uint8)
  97. image = cv2.imdecode(file_array, cv2.IMREAD_COLOR)
  98. strxys = {}
  99. for box in poses:
  100. # 对框内文字进行识别
  101. x1, y1, x2, y2 = box
  102. part = image[y1:y2, x1:x2]
  103. img = cv2.imencode(".jpg", part)[1]
  104. result = ocr.classification(img.tobytes())
  105. result = re.sub("[a-zA-Z0-9]+", "", result)
  106. if len(result) > 1:
  107. result = result[0]
  108. strxys[result] = [x1, y1, x2, y2]
  109. result = {
  110. "msg": "success",
  111. "code": 0,
  112. "r": {
  113. "code": strxys,
  114. "code_list": poses
  115. }
  116. }
  117. return result
  118. def user_authentication(form_data: OAuth2PasswordRequestForm):
  119. user_dict = FAKE_USERS_DB.get(form_data.username)
  120. if not user_dict:
  121. raise HTTPException(status_code=400, detail="用户名或密码错误")
  122. async def cjy_postpic_base64(
  123. pic_type: str = Query(..., min_length=4, max_length=4, description="验证码图片类型"),
  124. jy_code: Union[str, None] = Query(None, description="爬虫代码"),
  125. file: UploadFile = File(..., description="验证码图片"),
  126. ):
  127. start = time.time()
  128. img_bytes = await file.read()
  129. base64_str = base64.b64encode(img_bytes)
  130. discern_result = CJ.postpic_base64(base64_str, int(pic_type))
  131. err_no = discern_result["err_no"]
  132. pic_id = discern_result["pic_id"]
  133. result = {
  134. "msg": discern_result["err_str"],
  135. "code": err_no,
  136. "r": {
  137. "time": float("{:.2f}".format(time.time() - start)),
  138. "pic_str": discern_result["pic_str"],
  139. "pic_id": pic_id,
  140. "md5": discern_result["md5"],
  141. "jy_code": jy_code or ""
  142. }
  143. }
  144. return result
  145. @images_router.post("/discern", summary="超级鹰识别")
  146. @limiter.limit(setting.LIMITER_VALUE)
  147. async def discern_complex_image(
  148. request: Request, # 必须设置,否则 limiter 无法起限制作用
  149. form_data: OAuth2PasswordRequestForm = Depends(),
  150. cjy_captcha: dict = Depends(cjy_postpic_base64),
  151. ):
  152. user_authentication(form_data)
  153. return cjy_captcha
  154. @images_router.post("/report_err", summary="回传超级鹰识别错误图片退积分")
  155. async def cjy_report_error(
  156. form_data: OAuth2PasswordRequestForm = Depends(),
  157. pic_id: str = Query(..., description="验证码图片id"),
  158. ):
  159. user_authentication(form_data)
  160. report_error = CJ.report_error(pic_id)
  161. return {
  162. "msg": report_error["err_str"],
  163. "code": report_error["err_no"],
  164. "r": {}
  165. }
  166. @images_router.get("/reset", summary="重置超级鹰日访问次数")
  167. async def reset_limiter():
  168. limiter.reset()
  169. limiter.reset_warn()
  170. return {
  171. "msg": "success",
  172. "code": 0,
  173. "r": {}
  174. }