123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133 |
- # -*- coding: utf-8 -*-
- """
- Created on 2022-06-15
- ---------
- @summary: 限制器
- ---------
- @author: Dzr
- """
- import inspect
- from fastapi import FastAPI
- from slowapi import Limiter, _rate_limit_exceeded_handler
- from slowapi.errors import RateLimitExceeded
- from slowapi.util import get_remote_address
- import db.redisdb as redis
- import setting
- from services.robot import send_msg
- from services.utils import current_date
- LIMITER_DATE = "lm_date" # 当前日期
- LIMITER_FORECAST_WARNING = "lm_forecast_warning" # 上限预告
- LIMITER_EXCEEDED_WARNING = "lm_exceeded_warning" # 超限告警
- redis_db = redis.RedisDB() # redis实例
- def register_limiter(app: FastAPI):
- if not redis_db.exists_key(setting.LIMITER_REDIS_KEY):
- # 服务首次启动时初始化参数
- init_datas = [
- (LIMITER_DATE, current_date()),
- (LIMITER_FORECAST_WARNING, 1),
- (LIMITER_EXCEEDED_WARNING, 1)
- ]
- redis_db.hset_batch(setting.LIMITER_REDIS_KEY, init_datas)
- app.state.limiter = limiter
- app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
- class PayCaptchaLimiter(Limiter):
- def reset_warn(self):
- self.logger.debug("告警重置")
- # 开启上限预告
- redis_db.hset(setting.LIMITER_REDIS_KEY, LIMITER_FORECAST_WARNING, 1)
- # 开启超限告警
- redis_db.hset(setting.LIMITER_REDIS_KEY, LIMITER_EXCEEDED_WARNING, 1)
- def send_pre_warning(self, item, *identifiers):
- table = setting.LIMITER_REDIS_KEY
- limit_date = redis_db.hget(table, LIMITER_DATE)
- curr_date = current_date()
- if curr_date != limit_date:
- redis_db.hset(table, LIMITER_DATE, curr_date)
- self.reset_warn()
- self.reset()
- self.limiter.hit(item, *identifiers)
- usages_count = self._storage.get(item.key_for(*identifiers))
- val = '{:.2f}'.format(usages_count / item.amount * 100)
- if float(val) > 80 and int(redis_db.hget(table, LIMITER_FORECAST_WARNING)):
- send_msg("超级鹰", item.amount, usages_count, f"使用次数已超过{val}%")
- redis_db.hset(table, LIMITER_FORECAST_WARNING, 0)
- def send_finished(self, limit):
- max_limit = setting.LIMITER_MAX_LIMIT # 最大访问次数
- amount = limit.limit.amount
- self.logger.debug(f"今日接口调用次数 {amount} 已达上限!")
- if int(redis_db.hget(setting.LIMITER_REDIS_KEY, LIMITER_EXCEEDED_WARNING)):
- msg = "今日接口调用次数已达上限!\n 继续使用请点击"
- send_msg("超级鹰", max_limit, amount, msg, allow_reset=True)
- # 关闭超限告警
- redis_db.hset(setting.LIMITER_REDIS_KEY, LIMITER_EXCEEDED_WARNING, 0)
- def __evaluate_limits(self, request, endpoint: str, limits) -> None:
- failed_limit = None
- limit_for_header = None
- for lim in limits:
- limit_scope = lim.scope or endpoint
- if lim.is_exempt:
- continue
- if lim.methods is not None and request.method.lower() not in lim.methods:
- continue
- if lim.per_method:
- limit_scope += ":%s" % request.method
- if "request" in inspect.signature(lim.key_func).parameters.keys():
- limit_key = lim.key_func(request)
- else:
- limit_key = lim.key_func()
- args = [limit_key, limit_scope]
- if all(args):
- if self._key_prefix:
- args = [self._key_prefix] + args
- if not limit_for_header or lim.limit < limit_for_header[0]:
- limit_for_header = (lim.limit, args)
- self.send_pre_warning(lim.limit, *args) # 发送预警&自动重置
- if not self.limiter.hit(lim.limit, *args):
- self.logger.warning(
- "ratelimit %s (%s) exceeded at endpoint: %s",
- lim.limit,
- limit_key,
- limit_scope,
- )
- failed_limit = lim
- limit_for_header = (lim.limit, args)
- break
- else:
- self.logger.error(
- "Skipping limit: %s. Empty value found in parameters.",
- lim.limit
- )
- continue
- request.state.view_rate_limit = limit_for_header
- if failed_limit:
- self.send_finished(failed_limit) # 发送重置消息
- raise RateLimitExceeded(failed_limit)
- # 覆盖父类私有方法(不建议尝试做法,此处覆盖仅为了插入告警)
- _Limiter__evaluate_limits = __evaluate_limits
- limiter = PayCaptchaLimiter(
- key_func=get_remote_address,
- storage_uri=setting.LIMITER_STORAGE_URI,
- ) # 付费验证码限制器实例对象
|