# Source code for duck.http.middlewares.security.requestslimit

"""
High-performance rate-limiting middleware using InMemoryCache expiry-based
counters.

This implementation uses a *fixed window* algorithm:

- Each client/IP has a single counter stored in the cache.
- The key expires automatically after `requests_delay` seconds.
- On each request, the counter increments.
- If it exceeds `max_requests`, the request is rejected.

This design provides:
- O(1) operations
- Zero list allocations
- Zero timestamp storage
- Minimal memory footprint
- Very high request throughput
"""
from duck.http.middlewares import BaseMiddleware
from duck.settings import SETTINGS
from duck.shortcuts import simple_response, template_response
from duck.http.response import HttpTooManyRequestsResponse
from duck.utils.caching import InMemoryCache


class RequestsLimitMiddleware(BaseMiddleware):
    """
    Expiry-counter based request throttle.

    One integer counter per client IP lives in an in-memory LRU cache
    whose entries expire on their own, so no timestamp lists are kept
    and every per-request operation is O(1).

    Attributes:
        _clients (InMemoryCache): Per-IP counter store; entries expire
            after the configured window length.
        requests_delay (float): Length, in seconds, of the rate-limit window.
        max_requests (int): Requests permitted inside one window.
    """
    # Expiry-aware LRU cache holding one counter per client IP.
    _clients = InMemoryCache(maxkeys=2000)

    requests_delay: float = 60
    """
    Duration in seconds defining the time window for request counting.
    """

    max_requests: int = 500
    """
    Maximum number of allowed requests within the `requests_delay` window.
    """

    debug_message: str = "RequestsLimitMiddleware: Too many requests"
[docs] @classmethod def _process_request(cls, request): """ Core request-processing logic. Flow: 1. Extract client IP. 2. Fetch current request count from cache. 3. If count is missing -> this is first request in the window. Create count=1 with expiry. 4. If count >= max_requests -> reject. 5. Otherwise increment counter and update expiry. This implementation does not store timestamps and does not scan arrays. It relies fully on cache expiry to define the time window. """ # Extract client IP; if missing, fail open addr = request.client_address if not addr: return cls.request_ok ip = addr[0] # Localize variables for micro-optimizations window = cls.requests_delay limit = cls.max_requests # Retrieve current request count (or None if expired/new) count = cls._clients.get(ip) if count is None: # First request in this window → create counter with expiry cls._clients.set(ip, 1, expiry=window) return cls.request_ok # If limit reached → reject immediately if count >= limit: return cls.request_bad # Increment count and refresh expiry # We set the expiry again so each request resets the 60-second window. # (If fixed windows are desired instead, do NOT refresh expiry here.) cls._clients.set(ip, count + 1, expiry=window) return cls.request_ok
[docs] @classmethod def get_readable_limit(cls) -> str: """ Returns a user-friendly description of the rate limit. Example: "200 requests per 60 seconds" """ if cls.requests_delay == 1: return f"{cls.max_requests} requests per second" return f"{cls.max_requests} requests per {cls.requests_delay} seconds"
[docs] @classmethod def get_error_response(cls, request): """ Creates a 429 Too Many Requests HTTP response. Includes additional debugging information when DEBUG is enabled. """ body = ( "<h4>Too Many Requests!</h4>" f"<p>Rate limit: {cls.get_readable_limit()}.</p>" f"<p>You sent more than {cls.max_requests} requests within " f"{cls.requests_delay} seconds.</p>" ) if SETTINGS["DEBUG"]: return template_response(HttpTooManyRequestsResponse, body=body) return simple_response(HttpTooManyRequestsResponse, body=body)
[docs] @classmethod def process_request(cls, request): """ Framework entry point. Wraps the internal handler and ensures the server always fails open instead of blocking requests due to middleware errors. """ try: return cls._process_request(request) except Exception: raise # Never rate-limit due to internal middleware errors return cls.request_ok