apis.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. import base64
  2. import io
  3. import re
  4. import time
  5. from typing import Union
  6. import cv2
  7. import ddddocr
  8. import numpy as np
  9. from fastapi import (
  10. APIRouter,
  11. Depends,
  12. HTTPException,
  13. UploadFile,
  14. File,
  15. Query
  16. )
  17. from fastapi.requests import Request
  18. from fastapi.security import OAuth2PasswordRequestForm
  19. import setting
  20. from services.chaojiying import CJ
  21. from services.defults import FAKE_USERS_DB
  22. from services.limiter import limiter
  23. from services.utils import calculate
  24. from services.zhipu import ZhipuFileExtract
  25. images_router = APIRouter(prefix="/images")
  26. def prepare_image(file):
  27. if file.startswith(b'data:image'):
  28. src = file.decode()
  29. result = re.search("data:image/(?P<ext>.*?);base64,(?P<data>.*)", src, re.DOTALL)
  30. if result:
  31. # ext = result.groupdict().get("ext")
  32. return result.groupdict().get("data") # img_base64
  33. else:
  34. raise TypeError("transform image type failed!")
  35. else:
  36. assert isinstance(file, bytes)
  37. return file # img_bytes
  38. @images_router.post("/verify", summary="Ocr")
  39. async def simple_captcha(file: UploadFile = File(...)):
  40. start = time.time()
  41. img_bytes = await file.read()
  42. ocr = ddddocr.DdddOcr(det=False, ocr=True, show_ad=False)
  43. image = prepare_image(img_bytes)
  44. ret = ocr.classification(image)
  45. return {
  46. "msg": "success",
  47. "code": 0,
  48. "r": {
  49. "time": float("{:.2f}".format(time.time() - start)),
  50. "code": ret
  51. }
  52. }
  53. @images_router.post("/arithmetic", summary="100以内算术")
  54. async def arithmetic_captcha(file: UploadFile = File(...)):
  55. start = time.time()
  56. img_bytes = await file.read()
  57. onnx_path = "libs/tj_arithmetic/tj_project_1.0_23_15000_2023-01-14-10-58-23.onnx"
  58. charsets_path = "libs/tj_arithmetic/charsets.json"
  59. ocr = ddddocr.DdddOcr(det=False,
  60. ocr=False,
  61. show_ad=False,
  62. import_onnx_path=onnx_path,
  63. charsets_path=charsets_path)
  64. image = prepare_image(img_bytes)
  65. ret = image.replace("x", "*")
  66. return {
  67. "msg": "success",
  68. "code": 0,
  69. "r": {
  70. "time": float("{:.2f}".format(time.time() - start)),
  71. "code": calculate(ret)
  72. }
  73. }
  74. @images_router.post("/verify_det", summary="点验图片")
  75. async def det_captcha(file: UploadFile = File(..., description="验证码图片")):
  76. det = ddddocr.DdddOcr(det=True, show_ad=False)
  77. ocr = ddddocr.DdddOcr(ocr=True, show_ad=False)
  78. img_bytes = await file.read()
  79. image = prepare_image(img_bytes)
  80. if isinstance(image, bytes):
  81. poses = det.detection(img_bytes=image)
  82. else:
  83. assert isinstance(image, str)
  84. poses = det.detection(img_base64=image)
  85. img_bytes = base64.b64decode(image)
  86. img_byte = io.BytesIO(img_bytes)
  87. file_array = np.frombuffer(img_byte.getbuffer(), np.uint8)
  88. image = cv2.imdecode(file_array, cv2.IMREAD_COLOR)
  89. strxys = {}
  90. for box in poses:
  91. # 对框内文字进行识别
  92. x1, y1, x2, y2 = box
  93. part = image[y1:y2, x1:x2]
  94. img = cv2.imencode(".jpg", part)[1]
  95. result = ocr.classification(img.tobytes())
  96. result = re.sub("[a-zA-Z0-9]+", "", result)
  97. if len(result) > 1:
  98. result = result[0]
  99. strxys[result] = [x1, y1, x2, y2]
  100. result = {
  101. "msg": "success",
  102. "code": 0,
  103. "r": {
  104. "code": strxys,
  105. "code_list": poses
  106. }
  107. }
  108. return result
  109. def user_authentication(form_data: OAuth2PasswordRequestForm):
  110. user_dict = FAKE_USERS_DB.get(form_data.username)
  111. if not user_dict:
  112. raise HTTPException(status_code=400, detail="用户名或密码错误")
  113. async def cjy_postpic_base64(
  114. pic_type: str = Query(..., min_length=4, max_length=4, description="验证码图片类型"),
  115. jy_code: Union[str, None] = Query(None, description="爬虫代码"),
  116. file: UploadFile = File(..., description="验证码图片"),
  117. ):
  118. start = time.time()
  119. img_bytes = await file.read()
  120. base64_str = base64.b64encode(img_bytes)
  121. discern_result = CJ.postpic_base64(base64_str, int(pic_type))
  122. err_no = discern_result["err_no"]
  123. pic_id = discern_result["pic_id"]
  124. result = {
  125. "msg": discern_result["err_str"],
  126. "code": err_no,
  127. "r": {
  128. "time": float("{:.2f}".format(time.time() - start)),
  129. "pic_str": discern_result["pic_str"],
  130. "pic_id": pic_id,
  131. "md5": discern_result["md5"],
  132. "jy_code": jy_code or ""
  133. }
  134. }
  135. return result
  136. @images_router.post("/discern", summary="超级鹰识别")
  137. @limiter.limit(setting.LIMITER_VALUE)
  138. async def discern_complex_image(
  139. request: Request, # 必须设置,否则 limiter 无法起限制作用
  140. form_data: OAuth2PasswordRequestForm = Depends(),
  141. cjy_captcha: dict = Depends(cjy_postpic_base64),
  142. ):
  143. user_authentication(form_data)
  144. return cjy_captcha
  145. @images_router.post("/report_err", summary="回传超级鹰识别错误图片退积分")
  146. async def cjy_report_error(
  147. form_data: OAuth2PasswordRequestForm = Depends(),
  148. pic_id: str = Query(..., description="验证码图片id"),
  149. ):
  150. user_authentication(form_data)
  151. report_error = CJ.report_error(pic_id)
  152. return {
  153. "msg": report_error["err_str"],
  154. "code": report_error["err_no"],
  155. "r": {}
  156. }
  157. @images_router.get("/reset", summary="重置超级鹰日访问次数")
  158. async def reset_limiter():
  159. limiter.reset()
  160. limiter.reset_warn()
  161. return {
  162. "msg": "success",
  163. "code": 0,
  164. "r": {}
  165. }
  166. @images_router.post("/verify_z", summary="Ocr")
  167. async def zhipu_ocr(file: UploadFile = File(...)):
  168. start = time.time()
  169. img_bytes = await file.read()
  170. image = prepare_image(img_bytes)
  171. client = ZhipuFileExtract(
  172. api_key="4abf8f53d1daed90f8a03fc982899418.dyK8ivPgzm3M1Nx5"
  173. )
  174. ret = client.glm_4v_flash(
  175. image,
  176. text='这是一张100以内数学计算图片验证码,图片内容计算结果是什么?你只需要返回图中内容计算结果即可,不要解释,不要返回任何其它内容'
  177. )
  178. return {
  179. "msg": "success",
  180. "code": 0,
  181. "r": {
  182. "time": float("{:.2f}".format(time.time() - start)),
  183. "code": ret
  184. }
  185. }