Source code for responder.ext.ratelimit

"""Simple in-memory rate limiter for Responder."""

import threading
import time
from collections import defaultdict


[docs] class RateLimiter: """Token bucket rate limiter. Usage:: from responder.ext.ratelimit import RateLimiter limiter = RateLimiter(requests=100, period=60) # 100 req/min @api.route(before_request=True) def rate_limit(req, resp): limiter.check(req, resp) Or use the shorthand:: limiter = RateLimiter(requests=100, period=60) limiter.install(api) """ def __init__(self, requests=100, period=60): self.max_requests = requests self.period = period self._buckets: dict[str, list[float]] = defaultdict(list) self._lock = threading.Lock() def _client_key(self, req): client = req.client if client: return client[0] return req.headers.get("X-Forwarded-For", "unknown") def _cleanup(self, key): now = time.time() cutoff = now - self.period self._buckets[key] = [t for t in self._buckets[key] if t > cutoff] if not self._buckets[key]: del self._buckets[key]
[docs] def check(self, req, resp): """Check rate limit. Sets 429 status if exceeded.""" key = self._client_key(req) with self._lock: self._cleanup(key) if len(self._buckets[key]) >= self.max_requests: resp.status_code = 429 resp.media = {"error": "rate limit exceeded"} resp.headers["Retry-After"] = str(self.period) return False self._buckets[key].append(time.time()) remaining = self.max_requests - len(self._buckets[key]) resp.headers["X-RateLimit-Limit"] = str(self.max_requests) resp.headers["X-RateLimit-Remaining"] = str(remaining) return True
[docs] def install(self, api): """Install as a before_request hook on the API.""" @api.route(before_request=True) def _rate_limit(req, resp): self.check(req, resp)