limiter.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. # -*- coding: utf-8 -*-
  2. """
  3. Created on 2022-06-15
  4. ---------
  5. @summary: 限制器
  6. ---------
  7. @author: Dzr
  8. """
  9. import inspect
  10. from fastapi import FastAPI
  11. from slowapi import Limiter, _rate_limit_exceeded_handler
  12. from slowapi.errors import RateLimitExceeded
  13. import db.redisdb as redis
  14. import setting
  15. from services.robot import send_msg
  16. from services.utils import current_date
  # Field names used inside the Redis hash stored at setting.LIMITER_REDIS_KEY.
  # Date the limiter state currently belongs to.
  LIMITER_DATE = "lm_date"
  # Switch for the "approaching the limit" forecast warning.
  LIMITER_FORECAST_WARNING = "lm_forecast_warning"
  # Switch for the "limit exceeded" alert.
  LIMITER_EXCEEDED_WARNING = "lm_exceeded_warning"

  # Current runtime environment, read from the project settings.
  PENV = setting.PLATFORM_ENVIRONMENT

  # Redis client instance.
  redis_db = redis.RedisDB()
  def register_limiter(app: FastAPI):
      """Install the module-level ``limiter`` on ``app`` and seed its Redis state.

      When the hash at ``setting.LIMITER_REDIS_KEY`` does not exist yet, it is
      created with today's date and both warning switches set to 1.  The
      limiter is then stored on ``app.state`` and slowapi's handler is
      registered for ``RateLimitExceeded``.

      :param app: the FastAPI application to configure.
      """
      redis_key = setting.LIMITER_REDIS_KEY
      if not redis_db.exists_key(redis_key):
          # First start of the service: write the initial parameters.
          redis_db.hset_batch(
              redis_key,
              [
                  (LIMITER_DATE, current_date()),
                  (LIMITER_FORECAST_WARNING, 1),
                  (LIMITER_EXCEEDED_WARNING, 1),
              ],
          )

      app.state.limiter = limiter
      app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
  class PayCaptchaLimiter(Limiter):
      """slowapi ``Limiter`` that adds usage warnings and a date-based reset.

      The counting date and two warning switches live in the Redis hash at
      ``setting.LIMITER_REDIS_KEY``.  A switch is written as 1 when armed and
      as 0 once its message has been sent, so each warning goes out at most
      once until :meth:`reset_warn` re-arms it.
      """

      def _warning_enabled(self, field) -> bool:
          """Return whether the warning switch stored under ``field`` is on.

          A missing field is treated as "on" (the value written when the hash
          is first created) instead of failing the request with ``TypeError``
          from ``int(None)``.

          :param field: hash field name of the switch to read.
          """
          raw = redis_db.hget(setting.LIMITER_REDIS_KEY, field)
          # Assumes redis_db.hget yields None for an absent field (e.g. the
          # hash was deleted while the service kept running) -- TODO confirm
          # against db.redisdb.
          if raw is None:
              return True
          return bool(int(raw))

      def reset_warn(self):
          """Re-arm both the forecast warning and the exceeded alert."""
          self.logger.debug("告警重置")
          # Turn the forecast (pre-limit) warning back on.
          redis_db.hset(setting.LIMITER_REDIS_KEY, LIMITER_FORECAST_WARNING, 1)
          # Turn the limit-exceeded alert back on.
          redis_db.hset(setting.LIMITER_REDIS_KEY, LIMITER_EXCEEDED_WARNING, 1)

      def send_pre_warning(self, item, *identifiers):
          """Roll the state over on a date change and warn above 80% usage.

          :param item: the rate-limit item being evaluated.
          :param identifiers: key parts identifying the counter for ``item``.
          """
          table = setting.LIMITER_REDIS_KEY
          limit_date = redis_db.hget(table, LIMITER_DATE)
          curr_date = current_date()
          if curr_date != limit_date:
              # Stored date differs from today: record the new date, re-arm
              # the warnings and reset the limiter.
              redis_db.hset(table, LIMITER_DATE, curr_date)
              self.reset_warn()
              self.reset()

          # NOTE(review): this hit is in addition to the one performed in
          # __evaluate_limits right after this method returns, so each request
          # appears to be counted twice against the limit.  Left unchanged
          # because removing it would alter the effective quota -- confirm the
          # intended accounting before changing.
          self.limiter.hit(item, *identifiers)
          usages_count = self._storage.get(item.key_for(*identifiers))

          # The two-decimal string is what the message shows; the threshold is
          # checked against that same rounded value.
          val = "{:.2f}".format(usages_count / item.amount * 100)
          if float(val) > 80 and self._warning_enabled(LIMITER_FORECAST_WARNING):
              tips = f"[{PENV}]使用次数已超过{val}%"
              send_msg("超级鹰", item.amount, usages_count, tips)
              # Disarm the forecast warning so it is not sent again.
              redis_db.hset(table, LIMITER_FORECAST_WARNING, 0)

      def send_finished(self, limit):
          """Send the limit-exceeded alert once, then disarm it.

          :param limit: the limit object whose ``limit.amount`` was exceeded.
          """
          max_limit = setting.LIMITER_MAX_LIMIT  # maximum number of calls
          amount = limit.limit.amount
          self.logger.debug(f"今日接口调用次数 {amount} 已达上限!")
          if self._warning_enabled(LIMITER_EXCEEDED_WARNING):
              tips = f"[{PENV}]今日接口调用次数已达上限!\n 继续使用请点击"
              send_msg("超级鹰", max_limit, amount, tips, allow_reset=True)
              # Disarm the exceeded alert so it is not sent again.
              redis_db.hset(setting.LIMITER_REDIS_KEY, LIMITER_EXCEEDED_WARNING, 0)

      def __evaluate_limits(self, request, endpoint: str, limits) -> None:
          """Check ``request`` against ``limits``, sending warnings on the way.

          For every applicable limit the pre-warning hook runs first, then the
          limit is hit.  The limit used for response headers is stored on
          ``request.state.view_rate_limit``.

          :param request: the incoming request.
          :param endpoint: default scope when a limit defines none.
          :param limits: iterable of limit objects to evaluate.
          :raises RateLimitExceeded: when a limit's hit is rejected.
          """
          failed_limit = None
          limit_for_header = None
          for lim in limits:
              limit_scope = lim.scope or endpoint
              if lim.is_exempt:
                  continue
              if lim.methods is not None and request.method.lower() not in lim.methods:
                  continue
              if lim.per_method:
                  limit_scope += ":%s" % request.method

              # Key functions may or may not accept the request.
              if "request" in inspect.signature(lim.key_func).parameters:
                  limit_key = lim.key_func(request)
              else:
                  limit_key = lim.key_func()

              args = [limit_key, limit_scope]
              if not all(args):
                  # An empty key or scope means the limit cannot be applied.
                  self.logger.error(
                      "Skipping limit: %s. Empty value found in parameters.",
                      lim.limit
                  )
                  continue

              if self._key_prefix:
                  args = [self._key_prefix] + args
              if not limit_for_header or lim.limit < limit_for_header[0]:
                  limit_for_header = (lim.limit, args)

              # Send the forecast warning / perform the date-based reset.
              self.send_pre_warning(lim.limit, *args)
              if not self.limiter.hit(lim.limit, *args):
                  self.logger.warning(
                      "ratelimit %s (%s) exceeded at endpoint: %s",
                      lim.limit,
                      limit_key,
                      limit_scope,
                  )
                  failed_limit = lim
                  limit_for_header = (lim.limit, args)
                  break

          request.state.view_rate_limit = limit_for_header
          if failed_limit:
              # Send the "limit reached / reset" message before raising.
              self.send_finished(failed_limit)
              raise RateLimitExceeded(failed_limit)

      # Override the parent class's private (name-mangled) method.  Not a
      # recommended practice; done here only to insert the warnings.
      _Limiter__evaluate_limits = __evaluate_limits
  def get_usage_user_key(request):
      """Return the rate-limit key for ``request``.

      The key is the value of the ``X_USER_TOKEN`` header.  When the header is
      absent *or empty*, the shared key ``"PyCaptcha"`` is used: an empty key
      would fail the limiter's ``all(args)`` check, causing the limit to be
      skipped and the request to go uncounted.

      :param request: object exposing a mapping-like ``headers`` attribute.
      :return: a non-empty key string.
      """
      return request.headers.get("X_USER_TOKEN") or "PyCaptcha"
  # Limiter instance for the paid captcha service.
  limiter = PayCaptchaLimiter(
      storage_uri=setting.LIMITER_STORAGE_URI,
      key_func=get_usage_user_key,
  )