# -*- coding: utf-8 -*- """ Created on 2022-06-15 --------- @summary: 限制器 --------- @author: Dzr """ import inspect from fastapi import FastAPI from slowapi import Limiter, _rate_limit_exceeded_handler from slowapi.errors import RateLimitExceeded import db.redisdb as redis import setting from services.robot import send_msg from services.utils import current_date LIMITER_DATE = "lm_date" # 当前日期 LIMITER_FORECAST_WARNING = "lm_forecast_warning" # 上限预告 LIMITER_EXCEEDED_WARNING = "lm_exceeded_warning" # 超限告警 PENV = setting.PLATFORM_ENVIRONMENT # 当前运行环境 redis_db = redis.RedisDB() # redis实例 def register_limiter(app: FastAPI): if not redis_db.exists_key(setting.LIMITER_REDIS_KEY): # 服务首次启动时初始化参数 init_datas = [ (LIMITER_DATE, current_date()), (LIMITER_FORECAST_WARNING, 1), (LIMITER_EXCEEDED_WARNING, 1) ] redis_db.hset_batch(setting.LIMITER_REDIS_KEY, init_datas) app.state.limiter = limiter app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler) class PayCaptchaLimiter(Limiter): def reset_warn(self): self.logger.debug("告警重置") # 开启上限预告 redis_db.hset(setting.LIMITER_REDIS_KEY, LIMITER_FORECAST_WARNING, 1) # 开启超限告警 redis_db.hset(setting.LIMITER_REDIS_KEY, LIMITER_EXCEEDED_WARNING, 1) def send_pre_warning(self, item, *identifiers): table = setting.LIMITER_REDIS_KEY limit_date = redis_db.hget(table, LIMITER_DATE) curr_date = current_date() if curr_date != limit_date: redis_db.hset(table, LIMITER_DATE, curr_date) self.reset_warn() self.reset() self.limiter.hit(item, *identifiers) usages_count = self._storage.get(item.key_for(*identifiers)) val = '{:.2f}'.format(usages_count / item.amount * 100) if float(val) > 80 and int(redis_db.hget(table, LIMITER_FORECAST_WARNING)): tips = "".join([f"[{PENV}]", f"使用次数已超过{val}%"]) send_msg("超级鹰", item.amount, usages_count, tips) redis_db.hset(table, LIMITER_FORECAST_WARNING, 0) def send_finished(self, limit): max_limit = setting.LIMITER_MAX_LIMIT # 最大访问次数 amount = limit.limit.amount self.logger.debug(f"今日接口调用次数 {amount} 已达上限!") if int(redis_db.hget(setting.LIMITER_REDIS_KEY, LIMITER_EXCEEDED_WARNING)): tips = "".join([f"[{PENV}]", "今日接口调用次数已达上限!\n 继续使用请点击"]) send_msg("超级鹰", max_limit, amount, tips, allow_reset=True) # 关闭超限告警 redis_db.hset(setting.LIMITER_REDIS_KEY, LIMITER_EXCEEDED_WARNING, 0) def __evaluate_limits(self, request, endpoint: str, limits) -> None: failed_limit = None limit_for_header = None for lim in limits: limit_scope = lim.scope or endpoint if lim.is_exempt: continue if lim.methods is not None and request.method.lower() not in lim.methods: continue if lim.per_method: limit_scope += ":%s" % request.method if "request" in inspect.signature(lim.key_func).parameters.keys(): limit_key = lim.key_func(request) else: limit_key = lim.key_func() args = [limit_key, limit_scope] if all(args): if self._key_prefix: args = [self._key_prefix] + args if not limit_for_header or lim.limit < limit_for_header[0]: limit_for_header = (lim.limit, args) self.send_pre_warning(lim.limit, *args) # 发送预警&自动重置 if not self.limiter.hit(lim.limit, *args): self.logger.warning( "ratelimit %s (%s) exceeded at endpoint: %s", lim.limit, limit_key, limit_scope, ) failed_limit = lim limit_for_header = (lim.limit, args) break else: self.logger.error( "Skipping limit: %s. Empty value found in parameters.", lim.limit ) continue request.state.view_rate_limit = limit_for_header if failed_limit: self.send_finished(failed_limit) # 发送重置消息 raise RateLimitExceeded(failed_limit) # 覆盖父类私有方法(不建议尝试做法,此处覆盖仅为了插入告警) _Limiter__evaluate_limits = __evaluate_limits def get_usage_user_key(request): if "X_USER_TOKEN" in request.headers: key = request.headers["X_USER_TOKEN"] else: key = "PyCaptcha" return key limiter = PayCaptchaLimiter( key_func=get_usage_user_key, storage_uri=setting.LIMITER_STORAGE_URI, ) # 付费验证码限制器实例对象