Feature Tour¶
This section walks through Responder’s features in depth. Each section explains the concept, shows working code, and explains the design choices behind it. If you’re new to web development, this is a good place to learn how modern web frameworks work under the hood.
Method Filtering¶
HTTP defines several methods (also called verbs) that describe what a client wants to do with a resource. The most common are:
GET— retrieve dataPOST— create something newPUT— replace something entirelyPATCH— update part of somethingDELETE— remove something
By default, a Responder route matches all methods. This is fine for simple
endpoints, but REST APIs typically map different methods to different
operations. Use the methods parameter to restrict a route:
@api.route("/items", methods=["GET"])
def list_items(req, resp):
resp.media = {"items": []}
@api.route("/items", methods=["POST"], check_existing=False)
async def create_item(req, resp):
data = await req.media()
resp.media = {"created": data}
Note the check_existing=False — Responder normally prevents you from
registering two routes with the same path (to catch typos). When you
intentionally want multiple handlers for the same path with different
methods, you need to opt in.
Method-restricted routes get correct HTTP semantics for free:
Requests with an unsupported method receive
405 Method Not Allowed(not 404) with anAllowheader listing what the path supports.OPTIONSrequests are answered automatically with theAllowheader.HEADis accepted whereverGETis.
Returning Values¶
Handlers normally communicate by mutating resp, but they can also just
return a value:
@api.route("/users")
def list_users(req, resp):
return [{"name": "alice"}] # same as resp.media = [...]
@api.route("/hello")
def hello(req, resp):
return "hello, world!" # same as resp.text = "..."
A dict or list becomes resp.media, a str becomes
resp.text, and bytes become resp.content. A Pydantic model or a
dataclass instance also becomes resp.media — serialized natively across
JSON, YAML, and MessagePack, so a trailing .model_dump() is optional.
Returning None (the implicit default) leaves the response exactly as you
set it, so existing handlers are unaffected. Use whichever style reads
better — for quick JSON endpoints, returning the data directly is hard to
beat.
To set a status code, or status and headers, return a Flask-style tuple —
body, status or body, status, headers:
@api.route("/items/{id:int}")
def get_item(req, resp, *, id):
item = lookup(id)
if item is None:
return {"error": "not found"}, 404
return item, 200, {"X-Source": "cache"}
Routes are also forgiving about trailing slashes: a request to /users/
when only /users is registered (or vice versa) receives a 307
redirect to the canonical path, preserving the method and query string.
Pass redirect_slashes=False to API() for strict matching.
Class-Based Views¶
Function-based views are great for simple endpoints, but sometimes you want to group related HTTP methods together into a single resource. This is where class-based views come in — a pattern popularized by Falcon.
Responder dispatches to the appropriate method handler based on the HTTP method:
@api.route("/{greeting}")
class GreetingResource:
def on_get(self, req, resp, *, greeting):
resp.text = f"{greeting}, world!"
def on_post(self, req, resp, *, greeting):
resp.media = {"received": greeting}
def on_request(self, req, resp, *, greeting):
"""Called on EVERY request, before the method-specific handler."""
resp.headers["X-Greeting"] = greeting
The on_request method is called for all HTTP methods, much like
middleware scoped to a single route. Method-specific handlers (on_get,
on_post, on_put, on_delete, etc.) are called after.
No inheritance required — just define a class with the right method names.
This is simpler than Django’s View classes and more Pythonic than
framework-specific base classes.
Everything the type-driven request pipeline offers function views works on
class-based view methods too: a required Pydantic-typed body parameter on a
write-method handler receives the parsed, validated body (an invalid body
returns 422 before the handler runs), and Query()/Header()/
Cookie()/Path()/Form() markers resolve exactly as they do on
functions:
from pydantic import BaseModel
from responder import Query
class ItemIn(BaseModel):
name: str
price: float
@api.route("/items")
class ItemResource:
def on_get(self, req, resp, *, limit: int = Query(10)):
resp.media = fetch_items(limit)
def on_post(self, req, resp, *, item: ItemIn):
resp.media = create_item(item)
When on_request and a method handler both declare the same body model,
the body is parsed and validated once and the same instance is injected into
both.
Class-based views also answer OPTIONS automatically with 200 and an
Allow header listing the implemented methods — define on_options
(or a catch-all on_request) to take over. A request for an unimplemented
method still gets a 405 whose Allow header advertises exactly the
methods that are actually served.
Lifespan Events¶
Real applications need to set up resources when they start (database connection pools, ML models, caches) and tear them down when they stop. This is called the application lifespan.
The modern approach is the context manager pattern, where startup and shutdown are two halves of the same block:
from contextlib import asynccontextmanager
@asynccontextmanager
async def lifespan(app):
# Startup — runs before the first request
print("connecting to database...")
yield
# Shutdown — runs after the server stops
print("closing connections...")
api = responder.API(lifespan=lifespan)
Everything before yield runs at startup. Everything after runs at
shutdown. If startup fails, the server won’t start. If shutdown raises,
it’s logged but the server still exits.
The traditional event decorator style also works:
@api.on_event("startup")
async def startup():
print("starting up")
@api.on_event("shutdown")
async def shutdown():
print("shutting down")
The context manager is preferred for new code — it keeps related startup and shutdown logic together and makes resource cleanup more explicit.
The two styles compose: with lifespan= set, on_event("startup")
handlers run after the context manager is entered, and
on_event("shutdown") handlers run before it exits.
For values that belong to the application as a whole, use api.state —
a free-form namespace reachable from any handler via req.api.state:
@api.on_event("startup")
async def connect():
api.state.db = await create_pool()
@api.route("/users")
async def users(req, resp):
resp.media = await req.api.state.db.fetch_users()
(For resources with teardown, app-scoped dependencies below are usually the better fit.)
Dependency Injection¶
Views often need shared resources — a database session, a config object, the current user. Rather than reaching for globals or recomputing them in every handler, register them as dependencies and declare them as view parameters:
@api.dependency()
async def db():
session = await create_session()
yield session
await session.close()
@api.route("/users/{id:int}")
async def get_user(req, resp, *, id, db):
resp.media = await db.fetch_user(id)
Any view parameter (beyond req and resp) whose name matches a
registered dependency is injected automatically. Path parameters take
precedence over dependencies of the same name.
For one-off dependencies that belong to a single route, use Depends
instead of registering a global name:
from responder import Depends
def current_user(req):
return decode_user(req.headers.get("Authorization"))
@api.route("/me")
def me(req, resp, *, user=Depends(current_user)):
resp.media = {"user": user}
Depends providers follow the same lifecycle rules as registered
dependencies, including generator teardown.
Depends also nests: a provider’s own parameters can use Depends, so
dependencies compose into chains without registering each one by name:
def get_db():
...
def current_user(db=Depends(get_db)): # a provider depending on a provider
return load_user(db)
@api.route("/me")
def me(req, resp, *, user=Depends(current_user)):
resp.media = {"user": user}
Each provider resolves once per request and shares the same cache, and cycles are detected and reported.
When a dependency is only a guard or setup step, attach it to the route instead of adding an unused handler parameter:
@api.route("/private", dependencies=[Depends(current_user)])
def private(req, resp):
resp.media = {"ok": True}
Providers can be:
Plain functions (sync or async) — the return value is injected.
Generators (sync or async) — the yielded value is injected, and the code after
yieldruns as teardown once the response is sent (for WebSocket routes, when the connection closes). This is perfect for database sessions and other resources that need cleanup. Teardown runs even if the handler raised.
To make a dependency request-aware, give the provider a parameter named
req (or request, or one annotated responder.Request) — it
receives the current request:
@api.dependency()
def current_user(req):
return decode_token(req.headers.get("Authorization", ""))
Dependencies compose. A provider can depend on other providers simply by naming them as parameters; Responder resolves the whole graph for you, memoizes each provider so it runs at most once per request, and tears everything down in reverse order:
@api.dependency()
def config():
return load_config()
@api.dependency()
async def db(config): # depends on `config`
session = await create_session(config.db_url)
yield session
await session.close()
@api.route("/users/{id:int}")
async def get_user(req, resp, *, id, db):
resp.media = await db.fetch_user(id)
Because resolution is memoized across the whole request, a shared provider
(or one asked for by both on_request and on_get in a class-based
view) runs a single time. Cyclic dependencies are detected and raise
DependencyCycleError.
The names req, request, resp, response, ws, and
websocket are reserved and can’t be used as dependency names. When
resolution goes wrong, Responder raises a catchable DependencyError —
specifically DependencyCycleError, DependencyScopeError, or
DependencyResolutionError. To register under a different name, pass it
explicitly with @api.dependency(name="db") or call
api.add_dependency("db", provider).
For resources that should live as long as the application — connection
pools, ML models, expensive clients — use the "app" scope. The provider
runs once, on first use, and generator teardown is deferred until the
application shuts down:
@api.dependency(scope="app")
async def pool():
pool = await create_pool()
yield pool
await pool.close() # runs at shutdown
App-scoped providers may compose with other app-scoped providers, but they can’t depend on the request or on request-scoped providers — they outlive any single request. A per-request database session layered on an app-scoped connection pool is the canonical pattern; see Using SQLAlchemy for a complete example.
WebSocket handlers participate too: declare path parameters and
dependencies by name after the ws argument, and they’re injected the
same way (a provider receives the socket via a ws/websocket (or
req/request) parameter or a WebSocket annotation):
@api.route("/ws/{room}", websocket=True)
async def chat(ws, *, room, hub):
await ws.accept()
await hub.join(room, ws)
Handlers that only take ws keep working unchanged — names are injected
only when declared.
Serving Files¶
Web applications often need to serve files — downloads, reports, images.
Responder makes this simple with resp.file(), which reads a file from
disk and sets the Content-Type header automatically using Python’s
mimetypes module:
@api.route("/download")
def download(req, resp):
resp.file("reports/annual.pdf")
You can override the content type if the automatic detection isn’t right:
@api.route("/image")
def image(req, resp):
resp.file("photos/cat.jpg", content_type="image/jpeg")
For large files, use resp.stream_file() to avoid loading the entire
file into memory. This streams the file in chunks:
@api.route("/export")
def export(req, resp):
resp.stream_file("data/export.csv")
Both resp.file() and resp.stream_file() understand HTTP range
requests: clients sending Range: bytes=... receive 206 Partial
Content with the requested slice. Multiple ranges are answered as
multipart/byteranges. This is what makes video seeking and resumable
downloads work — no extra code needed.
To prompt the browser to download rather than display, use
resp.download(), which streams the file (resumably) and sets
Content-Disposition:
@api.route("/export")
def export(req, resp):
resp.download("reports/annual.pdf", filename="Annual Report.pdf")
Warning
When the path comes from user input — a URL segment, a query parameter —
pass root= to jail file access to a directory. resp.file,
resp.stream_file, and resp.download resolve the path under
root and return 404 on any ../ or symlink that escapes it:
@api.route("/files/{name}")
def serve(req, resp, *, name):
resp.file(name, root="/srv/public") # cannot escape /srv/public
Large uploads work the same way in reverse — iterate over the request
body in chunks instead of buffering it with await req.content:
@api.route("/upload", methods=["POST"])
async def upload(req, resp):
async with await anyio.open_file("incoming.bin", "wb") as f:
async for chunk in req.stream():
await f.write(chunk)
For ordinary multipart uploads, a typed File marker gives you Starlette’s
UploadFile plus a Responder convenience method for saving it:
from responder import File, UploadFile
@api.route("/avatar", methods=["POST"])
async def avatar(req, resp, *, image: UploadFile = File(...)):
path = await image.save("/srv/uploads/avatar.bin", create_parents=True)
resp.media = {"saved": str(path)}
Conditional Requests¶
HTTP caching saves bandwidth and server time: clients remember a validator
for the responses they’ve seen, and the server answers 304 Not Modified
— no body — when nothing changed. Set resp.etag or
resp.last_modified and Responder handles the comparison automatically:
@api.route("/report")
def report(req, resp):
resp.etag = compute_version_hash()
resp.text = expensive_render() # skipped clients get a 304
When the request’s If-None-Match header matches the ETag (or
If-Modified-Since is at or after last_modified), the client
receives 304 Not Modified with empty body. resp.last_modified
accepts a datetime or a preformatted HTTP-date string:
from datetime import datetime, timezone
@api.route("/feed")
def feed(req, resp):
resp.last_modified = datetime(2026, 1, 15, tzinfo=timezone.utc)
resp.media = load_feed()
Per RFC 7232, If-None-Match takes precedence when both validators are
present, weak ETags (W/"...") compare by their core value, and 304
handling applies only to GET and HEAD.
The same validators also guard writes against lost updates. On
state-changing methods (anything but GET/HEAD), when resp.etag
or resp.last_modified is set, Responder evaluates the request’s
If-Match, If-Unmodified-Since, and If-None-Match preconditions
(RFC 9110 §13) and answers 412 Precondition Failed — carrying the
current validator in the response headers — when they don’t hold:
@api.route("/items/{id}", methods=["PUT"])
async def update_item(req, resp, *, id):
item = load(id)
resp.etag = item.version_hash # the resource's current validator
apply_changes(item, await req.media())
resp.media = {"updated": True}
A client that sends If-Match: "<stale-tag>" receives a 412 instead
of the success response. Per RFC 9110, If-Match uses strong
comparison (a weak W/"..." ETag never matches it), If-Match: *
succeeds whenever a validator is set, and If-Unmodified-Since is
consulted only when If-Match is absent. As with 304 handling, each
precondition is evaluated only when its validator is set (resp.etag
for the ETag preconditions, resp.last_modified for the date one), and
only for responses that would otherwise succeed (2xx) — a handler that
already set a 404 keeps its 404.
Note
Preconditions are checked when the response is sent, after the
handler has run. To skip the state change itself (not just the success
response), compare the precondition in the handler before mutating —
e.g. check req.headers.get("If-Match") against the current tag and
return early with resp.status_code = 412.
Don’t want to manage validators yourself? Turn on automatic ETags and
every GET response gets a content-hash tag, with 304 handling for
free (note: the body is still rendered server-side; you save bandwidth,
not compute):
api = responder.API(auto_etag=True)
An explicitly set resp.etag always wins over the automatic one.
To control how long clients cache, set Cache-Control with the helper —
underscores become hyphens, True renders a bare directive:
resp.cache_control(public=True, max_age=3600)
# Cache-Control: public, max-age=3600
For common REST responses, the response object has small helpers that set the status, headers, body, and media type together:
@api.post("/items")
def create_item(req, resp):
item = save_item()
resp.created({"id": item.id}, location=f"/items/{item.id}")
@api.delete("/items/{id:int}")
def delete_item(req, resp, *, id):
delete_item_by_id(id)
resp.no_content()
@api.post("/items")
def reject_duplicate(req, resp):
resp.problem(409, "Item already exists", type="https://example.com/conflict")
After-Response Tasks¶
Work that shouldn’t delay the response — sending emails, recording analytics, cache warming — can be deferred until after the client has the bytes:
@api.route("/signup", methods=["POST"])
async def signup(req, resp):
resp.media = {"ok": True}
resp.background(send_welcome_email, "user@example.com")
Sync functions run in a thread pool; async functions run on the event
loop. Multiple tasks run in the order scheduled. (For fire-and-forget work
from anywhere — not tied to a response — use api.background instead.)
Custom Error Handling¶
In production, you don’t want your users to see raw Python tracebacks. Responder lets you register custom handlers for specific exception types, so you can return clean, structured error responses:
@api.exception_handler(ValueError)
async def handle_value_error(req, resp, exc):
resp.status_code = 400
resp.media = {"error": str(exc)}
Now, any route that raises a ValueError will return a clean JSON
response with a 400 status code instead of a generic 500 error page.
This is a common pattern in API development — you define your own exception classes for different error conditions, register handlers for each, and your API always returns consistent, machine-readable error responses.
To raise an HTTP error from anywhere — a handler, a hook, a dependency —
call abort() instead of importing Starlette’s exceptions:
from responder import abort
@api.route("/admin")
def admin(req, resp):
if not req.session.get("is_admin"):
abort(403, detail="Forbidden")
abort() halts the handler immediately and renders a content-negotiated
error — unlike setting resp.status_code = 403, which doesn’t stop
execution.
For richer errors, raise responder.Problem — a raisable RFC 9457
problem-details exception carrying type, title, instance, and
arbitrary extension members into the rendered application/problem+json
payload. Because it works from anywhere an exception propagates (views,
hooks, dependencies), it makes typed problem catalogs possible without
hand-building responses in every handler:
from responder import Problem
@api.route("/quota")
def quota(req, resp):
raise Problem(
409,
"You have used all 100 requests for today.",
title="Quota Exceeded",
type="https://api.example.com/errors/quota-exceeded",
balance=0,
)
The payload flows through the same machinery as framework errors —
problem_handler enrichment, request IDs, and the app’s JSON encoder —
and instance defaults to the request path per the RFC recommendation.
Passing any of these members to abort() raises a Problem for you:
abort(409, type="...", balance=0).
Handlers can also be registered programmatically with
api.add_exception_handler(exc_or_status, handler) (the
@api.exception_handler decorator delegates to it). It accepts an
exception class or an integer status code, so you can catch a 404
directly. Registering against Exception (or 500) installs a
catch-all for unhandled server errors — though under debug=True the
traceback page is shown instead:
async def not_found(req, resp, exc):
resp.status_code = 404
resp.media = {"error": "nothing here"}
api.add_exception_handler(404, not_found)
Framework-generated errors use an RFC 9457-style envelope by default. Errors
such as 404, 405, validation failures, response-model validation failures, and
request timeouts use application/problem+json with type, title,
status, and detail fields. Validation errors also include errors.
Pass problem_details=False when creating the app to keep the legacy
content-negotiated error format, where JSON clients receive bodies like
{"error": "Not Found"} and browsers receive plain text.
Before-Request Hooks¶
Sometimes you need to run the same code before every request — authentication checks, request logging, adding common headers, or setting up per-request state. Before-request hooks let you do this without duplicating code in every route:
@api.route(before_request=True)
def add_headers(req, resp):
resp.headers["X-API-Version"] = "3.2"
Short-circuiting is the really powerful part. If your hook sets
resp.status_code, the route handler is skipped entirely and the
response is sent immediately. This is the pattern for authentication:
@api.route(before_request=True)
def auth_check(req, resp):
if "Authorization" not in req.headers:
resp.status_code = 401
resp.media = {"error": "unauthorized"}
If the Authorization header is missing, the client gets a 401 response
and the actual route handler never runs. This is cleaner than adding
auth checks to every individual route.
When the hook belongs to just one endpoint, attach it directly to that route:
def require_admin(req, resp):
if not req.session.get("is_admin"):
resp.status_code = 403
resp.media = {"error": "forbidden"}
@api.route("/admin", before=require_admin)
def admin(req, resp):
resp.media = {"ok": True}
After-Request Hooks¶
The complement to before-request hooks. After-request hooks run after the route handler completes but before the response is sent. They’re useful for logging, adding response headers, or any post-processing:
@api.after_request()
def log_response(req, resp):
print(f"{req.method} {req.full_url} -> {resp.status_code}")
@api.after_request()
async def add_timing(req, resp):
resp.headers["X-Served-By"] = "responder"
The parentheses are optional — the bare @api.after_request works too,
as does the bare @api.before_request.
Route-local after hooks use after= and run before global after hooks:
def audit(req, resp):
resp.headers["X-Audited"] = "1"
@api.route("/reports", after=audit)
def reports(req, resp):
resp.media = []
WebSocket Support¶
HTTP is a request-response protocol — the client asks, the server answers. But some applications need real-time, bidirectional communication: chat apps, live dashboards, multiplayer games, collaborative editors.
WebSockets solve this by upgrading an HTTP connection into a persistent, full-duplex channel where both sides can send messages at any time:
@api.route("/ws", websocket=True)
async def websocket(ws):
await ws.accept()
while True:
name = await ws.receive_text()
await ws.send_text(f"Hello {name}!")
await ws.close()
You can send and receive in multiple formats:
send_text/receive_text— plain text stringssend_json/receive_json— JSON objects (auto-serialized)send_bytes/receive_bytes— raw binary data
WebSocket routes are marked with websocket=True in the route decorator.
They receive a ws object instead of req and resp.
Server-Sent Events (SSE)¶
SSE is a simpler alternative to WebSockets for one-way real-time communication — the server pushes events to the client, but the client can’t send messages back. This is perfect for live feeds, progress bars, notification streams, and AI response streaming.
Unlike WebSockets, SSE works over plain HTTP, is automatically reconnected by the browser, and doesn’t require any special client-side libraries:
@api.route("/events")
async def events(req, resp):
@resp.sse
async def stream():
for i in range(10):
yield {"data": f"message {i}"}
On the client side, you consume SSE events with JavaScript’s built-in
EventSource API:
const source = new EventSource("/events");
source.onmessage = (event) => {
console.log(event.data);
};
Each yielded value can be a string (treated as data) or a dict with the
standard SSE fields. A data value that is a dict or list is
JSON-encoded automatically — handy for structured events:
yield {"event": "update", "data": {"progress": 42}, "id": "1"}
yield "simple string message"
yield {"comment": "keepalive"} # an SSE comment line
For long-lived streams behind proxies, pass heartbeat= (seconds) to emit a
keepalive comment during idle periods so the connection isn’t dropped. The
response also sets X-Accel-Buffering: no so events flush immediately:
@resp.sse(heartbeat=15)
async def stream():
...
When the browser reconnects it sends the id of the last event it saw; read it
with req.last_event_id to resume:
@api.route("/events")
async def events(req, resp):
resume_from = req.last_event_id
@resp.sse(heartbeat=15)
async def stream():
async for item in feed(after=resume_from):
yield {"data": item.payload, "id": item.id}
GraphQL¶
GraphQL is a query language for APIs that lets clients request exactly the data they need — no more, no less. Instead of multiple REST endpoints, you define a schema and let clients query it.
Responder includes built-in GraphQL support via
Graphene. Install it with the
graphql extra:
$ uv pip install 'responder[graphql]'
Then set up a full GraphQL endpoint with a single method call:
import graphene
class Query(graphene.ObjectType):
hello = graphene.String(name=graphene.String(default_value="stranger"))
def resolve_hello(self, info, name):
return f"Hello {name}"
api.graphql("/graphql", schema=graphene.Schema(query=Query))
Visiting /graphql in a browser renders the
GraphiQL interactive IDE, where
you can explore your schema, write queries, and see results in real-time.
Programmatic clients can POST JSON queries to the same endpoint.
You can access the Responder request and response objects in your resolvers
through info.context["request"] and info.context["response"].
For production, lock the endpoint down: turn off the in-browser IDE, reject introspection queries, and cap query nesting depth to blunt denial-of-service queries:
api.graphql(
"/graphql", schema=schema,
graphiql=api.debug, introspection=api.debug, max_depth=10,
)
OpenAPI Documentation¶
OpenAPI (formerly Swagger) is the industry standard for describing REST APIs. An OpenAPI specification lets you auto-generate interactive documentation, client libraries, and validation logic.
Responder generates OpenAPI specs from your code:
api = responder.API(
title="Pet Store",
version="1.0",
openapi="3.0.2",
docs_route="/docs",
)
This gives you:
An OpenAPI schema at
/schema.ymlInteractive Swagger UI documentation at
/docs
OpenAPI 3.1 is supported — pass openapi="3.1.0". The schema is served
as YAML by default; clients sending Accept: application/json get JSON,
and an openapi_route ending in .json (e.g. "/schema.json")
serves JSON always.
Path parameters are documented automatically from your route patterns:
/pets/{id:int} produces a required integer path parameter in the spec,
with the OpenAPI-style template path (/pets/{id}).
Every route appears automatically. Responder builds the spec from each
route’s methods, path parameters, body and response models, and any
Query/Header/Cookie markers (see Pydantic Validation) — so a
route shows up with its parameters and schemas even without a line of
annotation:
@api.route("/health")
def health(req, resp):
resp.media = {"status": "ok"} # documented as GET /health -> 200
Validating routes (anything with a request body or typed parameters) also
get an automatic 422 in the spec. To hide a route, pass
include_in_schema=False; the internal schema, docs, static, and metrics
endpoints are excluded for you:
@api.route("/internal", include_in_schema=False)
def internal(req, resp):
resp.text = "private"
Beyond that baseline, three tools let you enrich and override the generated operations.
Pydantic models — the recommended approach. Add a required Pydantic-typed
handler parameter for the request body and either a Pydantic return annotation
or response_model= for the response. Responder generates the schema and
validates at runtime: invalid bodies get a 422 with detailed errors, and
responses are serialized through the model (extra fields stripped, types
enforced):
from pydantic import BaseModel
class PetIn(BaseModel):
name: str
age: int = 0
class PetOut(BaseModel):
id: int
name: str
age: int
@api.route("/pets", methods=["POST"])
async def create_pet(req, resp, *, pet: PetIn) -> PetOut:
return PetOut(id=1, name=pet.name, age=pet.age)
See Pydantic Validation for how these typed signatures behave at runtime.
Decorator metadata — for operation-level OpenAPI details that should stay
near the route, pass metadata directly to the route decorator. responses and
examples are deep-merged with generated response schemas and framework error
responses:
@api.get(
"/pets/{id:int}",
response_model=PetOut,
tags=["pets"],
summary="Fetch a pet",
responses={404: "Pet not found"},
examples={
"found": {
"summary": "Existing pet",
"value": {"id": 1, "name": "Fido", "age": 4},
}
},
openapi_extra={"x-codeSamples": []},
)
def get_pet(req, resp, *, id):
resp.media = {"id": id, "name": "Fido", "age": 4}
Routes whose success status isn’t 200 can declare it with
status_code=. It does double duty: resp.status_code is pre-seeded
with it before the handler runs (so the handler only sets the body), and the
generated OpenAPI operation documents the success response under that status
instead of 200. Declaring status_code=204 documents no response
body. An explicit resp.status_code = ... in the handler still wins:
@api.route("/pets", methods=["POST"], status_code=201)
async def create_pet(req, resp, *, pet: PetIn):
resp.media = {"id": 1, "name": pet.name, "age": pet.age} # -> 201
YAML docstrings — for fine-grained control, embed OpenAPI YAML in the docstring; it is deep-merged on top of the auto-generated operation, so you override only what you mention:
@api.route("/pets")
def list_pets(req, resp):
"""A list of pets.
---
get:
description: Get all pets
responses:
200:
description: A list of pets
"""
resp.media = [{"name": "Fido"}]
Marshmallow schemas — if you’re already using marshmallow:
from marshmallow import Schema, fields
@api.schema("Pet")
class PetSchema(Schema):
name = fields.Str()
All three approaches can be mixed in the same API. You can choose from
multiple documentation themes: swagger_ui (default), redoc,
rapidoc, or elements.
Route Groups¶
As your application grows, you’ll want to organize routes logically. Route groups let you share a URL prefix across related endpoints — a common pattern for API versioning:
v1 = api.group("/v1")
@v1.route("/users")
def list_users(req, resp):
resp.media = []
@v1.route("/users/{user_id:int}")
def get_user(req, resp, *, user_id):
resp.media = {"id": user_id}
v2 = api.group("/v2")
@v2.route("/users")
def list_users_v2(req, resp):
resp.media = {"users": [], "total": 0}
This keeps your code organized without affecting the routing logic.
Before-request hooks registered on a group only run for paths under the group’s prefix — handy for guarding a whole API version with one check:
@v1.before_request()
def require_key(req, resp):
if "X-Api-Key" not in req.headers:
resp.status_code = 401
resp.media = {"error": "missing API key"}
Groups register on a live api instance. To declare routes in separate
modules — without an API at all — use the standalone
responder.Router and api.include_router(); see Composing Apps with Routers.
Mounting Other Apps¶
Responder can mount any WSGI or ASGI application at a subroute. This is incredibly useful for gradual migrations — you can run Flask and Responder side by side, moving routes over one at a time:
from flask import Flask
flask_app = Flask(__name__)
@flask_app.route("/")
def hello():
return "Hello from Flask!"
api.mount("/flask", flask_app)
Requests to /flask/ will be handled by Flask. Everything else goes
through Responder. Both WSGI and ASGI apps are supported — Responder
wraps WSGI apps in an ASGI adapter automatically.
You can also mount marimo notebooks as interactive dashboards within your API:
import marimo
server = (
marimo.create_asgi_app()
.with_app(path="/notebooks", root="./notebooks/dashboard.py")
.with_app(path="/notebooks/analysis", root="./notebooks/analysis.py")
)
api.mount("", server.build())
Notebooks are served at /notebooks/ and /notebooks/analysis,
with full interactivity — reactive cells, widgets, plots, and all.
Static Files¶
Most web applications serve static assets — CSS stylesheets, JavaScript
files, images, fonts. When a static/ directory exists next to your app,
Responder serves it automatically — no configuration needed:
api = responder.API()
Place your assets in the static/ directory and they’ll be served at
/static/style.css, /static/app.js, etc. Use static_dir and
static_route to serve a different directory or URL prefix.
The directory is never created for you: the implicit default static/
is simply skipped if it doesn’t exist, while a static_dir you pass
explicitly must exist — a missing one raises FileNotFoundError at
construction, so create the directory before pointing Responder at it.
Pass static_dir=None to disable static file serving entirely.
For single-page applications (React, Vue, Angular), you can serve
index.html as the default response for all unmatched routes:
api.add_route("/", static=True)
CORS¶
CORS (Cross-
Origin Resource Sharing) is a security mechanism that controls which
websites can make requests to your API. Browsers enforce this — if your
API is at api.example.com and your frontend is at app.example.com,
the browser will block requests unless your API explicitly allows it.
Enable CORS and configure which origins are allowed:
api = responder.API(cors=True, cors_params={
"allow_origins": ["https://app.example.com"],
"allow_methods": ["GET", "POST"],
"allow_headers": ["*"],
"allow_credentials": True,
"max_age": 600,
})
The default policy is restrictive — you must explicitly allow each origin.
Using ["*"] for allow_origins permits any website to call your API,
which is fine for public APIs but not for private ones.
HSTS¶
HSTS
(HTTP Strict Transport Security) tells browsers to always use HTTPS when
communicating with your server. Once a browser sees the HSTS header, it
will refuse to connect over plain HTTP, even if the user types http://
in the address bar:
api = responder.API(enable_hsts=True)
This redirects HTTP requests to HTTPS and sends a
Strict-Transport-Security header on responses. For a custom max-age
or to enable preloading, install the middleware directly:
from responder.middleware import HSTSMiddleware
api.add_middleware(HSTSMiddleware, max_age=63072000, preload=True)
Security Headers¶
Opt in to common security headers on every response —
X-Content-Type-Options: nosniff, X-Frame-Options: DENY, and
Referrer-Policy: strict-origin-when-cross-origin:
api = responder.API(security_headers=True)
Add a Content-Security-Policy or Permissions-Policy, or override any header, by passing options through:
api = responder.API(security_headers={
"content_security_policy": "default-src 'self'",
"headers": {"X-Frame-Options": "SAMEORIGIN"},
})
A header a handler sets itself is always left untouched.
Trusted Hosts¶
The Host header in an HTTP request tells the server which domain name
the client used. Attackers can forge this header to trick your application
into generating URLs to malicious domains (a class of attack called Host
header injection).
Restrict which hostnames your application accepts:
api = responder.API(allowed_hosts=["example.com", "*.example.com"])
Requests with unrecognized hosts get a 400 Bad Request. Wildcard
patterns are supported. By default, all hostnames are allowed.
Authentication¶
responder.ext.auth provides Bearer, Basic, and API-key schemes. Each one
is a callable that pulls the credential from the request, verifies it, and
returns the principal your verify callback produced — or raises 401
with the right WWW-Authenticate challenge. The most direct form is
auth= on a route:
from responder.ext.auth import BearerAuth
auth = BearerAuth(verify=lambda token: users.get(token))
@api.get("/me", auth=auth)
async def me(req, resp, *, user):
resp.media = {"user": user}
Responder enforces the scheme, registers the OpenAPI security scheme when
OpenAPI is enabled, stores the principal on req.state.user /
req.state.auth, and injects it into user, principal, or auth
parameters. The older explicit form still works when you want to separate
runtime dependency wiring from documentation:
auth.register(api)
api.add_dependency("user", auth)
@api.get("/me", security=["bearerAuth"])
async def me(req, resp, *, user):
resp.media = {"user": user}
verify may be sync or async; return a truthy principal to accept or a falsy
value to reject. For static secrets, pass them directly and the scheme compares
them in constant time:
BearerAuth(tokens=["s3cret"])
APIKeyAuth(keys=["abc123"], name="X-API-Key") # or location="query"
BasicAuth(credentials={"alice": "password"})
Calling register() (or api.add_security_scheme()) makes the scheme
appear in the OpenAPI document, so the interactive docs grow an Authorize
button. security= on a route marks which operations require it; pass
default=True to add_security_scheme to require a scheme everywhere.
Request ID¶
In distributed systems, tracing a single request across multiple services is essential for debugging. Request IDs are unique identifiers attached to each request — if something goes wrong, you can search your logs for that ID and find every related event.
Responder can auto-generate request IDs. If the client sends an
X-Request-ID header (common in microservice architectures), it’s
forwarded. Otherwise, a new UUID is generated:
api = responder.API(request_id=True)
The ID appears in the X-Request-ID response header.
An inbound ID is honored only when it looks like a request ID — at most 128
characters of letters, digits, ., _, and -. Anything else (an
oversized header, control characters) is discarded and a fresh UUID minted
instead, so a client can’t bloat your responses or log lines.
enable_logging=True applies exactly the same policy and mints the same
UUID format, so switching access logging on or off never changes the IDs
your log pipeline parses.
Request Size Limits¶
Unbounded request bodies are an easy denial-of-service vector. Cap them
application-wide and oversized uploads get 413 automatically —
whether the body is read with await req.content, req.media(), or
streamed:
api = responder.API(max_request_size=10 * 1024 * 1024) # 10 MB
The check fails fast on the Content-Length header when present, and
enforces the limit cumulatively for chunked uploads.
Request Timeouts¶
A handler stuck on a slow database or unresponsive upstream shouldn’t hold
the client forever. Set an application-wide budget and overruns are
answered with 504 Gateway Timeout:
api = responder.API(request_timeout=30) # seconds
Dependency teardowns still run when a request times out. One caveat: a synchronous handler running in the thread pool can’t be interrupted — the client gets the 504 on time, but the thread runs to completion in the background.
WebSocket handlers have a matching guard. Pass ws_idle_timeout to close a
connection that waits too long for the next inbound message (close code
1001):
api = responder.API(ws_idle_timeout=30) # seconds
The deadline resets on every message received, so it bounds idle time between
messages rather than the total connection lifetime — a continuously active
client is never closed. It defaults to None (no timeout).
Rate Limiting¶
Rate limiting prevents individual clients from overwhelming your API with too many requests. It’s essential for public APIs, and good practice even for internal ones.
Responder includes a built-in rate limiter (a sliding window in memory; a fixed window on the Redis backends):
from responder.ext.ratelimit import RateLimiter
limiter = RateLimiter(requests=100, period=60) # 100 req/min
limiter.install(api)
When the limit is exceeded, clients receive a 429 Too Many Requests
response with a Retry-After header. Every response includes
X-RateLimit-Limit, X-RateLimit-Remaining, and X-RateLimit-Reset
(seconds until the window resets) headers so clients can pace themselves.
The rate limiter is per-client, keyed by IP address by default. To key by
something else — an API key, an authenticated user id — pass key=, a
callable receiving the request:
limiter = RateLimiter(
requests=100, period=60,
key=lambda req: req.headers.get("x-api-key", "anonymous"),
)
If the key function returns None (or an empty string), the request
falls back to the IP-based key.
By default, counts live in process memory. The in-memory store tracks at most
max_keys distinct clients (100k by default), evicting the least-recently
seen beyond that, so a client rotating source IPs can’t grow memory without
bound. For multi-process or multi-host deployments, plug in the Redis backend so
all workers share one budget:
from responder.ext.ratelimit import RateLimiter, RedisBackend
limiter = RateLimiter(
requests=100, period=60,
backend=RedisBackend(url="redis://localhost:6379/0"),
)
In async apps, reach for AsyncRedisBackend instead — it talks to Redis
without a thread-pool hop. It’s async-only, so drive it with
limiter.install(api) (or await limiter.acheck(req, resp)) rather than
a sync route:
from responder.ext.ratelimit import RateLimiter, AsyncRedisBackend
limiter = RateLimiter(
requests=100, period=60,
backend=AsyncRedisBackend(url="redis://localhost:6379/0"),
)
limiter.install(api)
A word on outages: when the backend errors out (say, Redis is unreachable),
the limiter answers 503 Service Unavailable by default — nothing slips
past the limit, but rate-limited routes go down with the backend. Pass
fail_open=True to make the opposite trade-off: requests are let through
unmetered, with a warning logged, until the backend recovers. Choose
fail_open=True when availability matters more than strict enforcement
(most public APIs); keep the default when the limiter is a security control
(login attempts, expensive mutations).
Any object with a hit(key, max_requests, period) method returning
(allowed, remaining) — or (allowed, remaining, reset_after) to
populate X-RateLimit-Reset — works as a backend (ahit for the async
variant), so custom stores are easy to write.
To rate-limit a single route instead of the whole API, apply
limit() beneath @api.route.
Give each route its own RateLimiter so budgets stay independent:
expensive_limiter = RateLimiter(requests=5, period=60)
@api.route("/reports")
@expensive_limiter.limit
async def generate_report(req, resp):
...
Metrics¶
Production services need visibility into traffic and latency. Responder ships a zero-dependency metrics endpoint in Prometheus text format:
api = responder.API(metrics_route="/metrics")
Every request is recorded as a counter
(responder_requests_total{method,path,status}), a latency histogram
(responder_request_duration_seconds), and an in-flight gauge
(responder_requests_in_flight — requests currently being handled, the
series capacity dashboards want). Labels use the route pattern
(/users/{id}), not the raw path, so cardinality stays bounded; requests
matching no route are labelled unmatched. Point Prometheus, Grafana
Alloy, or any compatible scraper at the endpoint and you have dashboards.
The histogram’s bucket bounds default to 5ms–10s
(responder.ext.metrics.BUCKETS). If your latency profile is
different — 30-second report endpoints, sub-millisecond cache hits — pass
your own ascending bounds via metrics_buckets= (or construct a
MetricsCollector with buckets=
directly):
api = responder.API(
metrics_route="/metrics",
metrics_buckets=(0.1, 0.5, 1.0, 5.0, 30.0),
)
Health Checks¶
Orchestrators (Kubernetes, load balancers) want a readiness endpoint that reflects whether the app’s dependencies are actually reachable. Register checks and Responder aggregates them:
api = responder.API(health_route="/health")
api.add_health_check("db", lambda: database.ping())
api.add_health_check("cache", check_redis)
A check passes unless it returns False or raises. The endpoint returns
200 with {"status": "ok", "checks": {...}} when all pass, and 503
otherwise — with each check’s status (and the error detail for any that
raised). Checks may be sync or async, and the route is excluded from the
OpenAPI schema. (add_health_check adds the route at /health on first
use if you didn’t set health_route.)
Structured Logging¶
Production applications need structured, searchable logs. Responder includes built-in logging that automatically attaches request context — request ID, HTTP method, path, and client IP — to every log message emitted during request handling:
api = responder.API(enable_logging=True)
This gives you:
Access logging with timing for every request:
2026-03-24 12:00:00 [INFO] responder.access — GET /users → 200 (1.2ms)
A logger on the API instance — use
api.loganywhere in your routes. Request context (ID, method, path, client IP) is attached automatically:@api.route("/users/{user_id:int}") def get_user(req, resp, *, user_id): api.log.info("fetching user %d", user_id) # => [INFO] responder.app -- fetching user 42 [GET /users/42] [req:a1b2c3] [client:10.0.0.1] resp.media = {"id": user_id}
Request IDs generated automatically (or forwarded from the
X-Request-IDheader) and included in responses.
The logging uses Python’s standard logging module, so it works with
any handler — files, syslog, JSON formatters, Datadog, Sentry, whatever
you already use.
For additional loggers (e.g. in helper modules), use get_logger:
from responder.ext.logging import get_logger
logger = get_logger("myapp.db")
You can also access the current request context directly:
from responder.ext.logging import RequestContext
@api.route("/debug")
def debug(req, resp):
resp.media = {
"request_id": RequestContext.get_request_id(),
"client_ip": RequestContext.get_client_ip(),
}
When enable_logging=True is set, it supersedes request_id=True
— the logging middleware handles request IDs itself, so you don’t get
duplicate headers.
For debugging route control flow, pass trace_dispatch=True. Responder emits
debug logs for the documented dispatch stages: before hooks, auth,
dependencies, handler execution, and after hooks:
api = responder.API(enable_logging=True, trace_dispatch=True)
Pydantic Validation¶
Pydantic models integrate directly with
Responder’s routing. Add a required Pydantic-typed body parameter to validate
incoming data and set response_model to control the shape of outgoing data:
from pydantic import BaseModel
class ItemIn(BaseModel):
name: str
price: float
class ItemOut(BaseModel):
id: int
name: str
price: float
@api.route("/items", methods=["POST"], response_model=ItemOut)
async def create_item(req, resp, *, item: ItemIn):
resp.media = {"id": 1, **item.model_dump()}
When a write-method handler has a required Pydantic-typed body parameter:
Valid requests are parsed, validated, and injected as a model instance.
Invalid requests get an automatic
422 Unprocessable Entityresponse with detailed error messages — you don’t write any validation code.
When response_model is set:
The response is serialized through the model before being sent
Extra fields are stripped automatically
Type coercion happens at the boundary
For individual query parameters, headers, and cookies, declare them as
keyword-only arguments with a marker default. Responder reads the value,
coerces it to the annotated type, and injects it — returning 422 if a
required value is missing or won’t coerce:
from responder import Query, Header
@api.route("/search")
def search(req, resp, *,
q: str = Query(...),
limit: int = Query(10),
tags: list[str] = Query([]),
user_agent: str = Header("unknown")):
resp.media = {"q": q, "limit": limit, "tags": tags}
Query(...)(an Ellipsis) marks the value required;Query(10)makes it optional with that default.The annotation drives coercion:
"5"becomes5, and a sequence type likelist[str]collects repeated keys (?tags=a&tags=b).Headerlooks up the header named after the parameter, converting underscores to dashes (user_agent→user-agent); passalias=for an explicit name.Cookiereads cookies, andPathre-validates or renames a path segment:from responder import Path @api.route("/users/{uid:int}") def get_user(req, resp, *, user_id: int = Path(..., alias="uid")): resp.media = {"id": user_id}
All four markers are exported from the top level
(from responder import Query, Header, Cookie, Path) and feed the
generated OpenAPI parameters.
To validate the whole query string as a single model instead, use
params_model — values are coerced, defaults apply, repeated keys map to
list fields, invalid queries get a 422, and the parameters appear in
your OpenAPI spec:
class SearchParams(BaseModel):
q: str
limit: int = 10
@api.route("/search", params_model=SearchParams)
async def search(req, resp):
params = req.state.validated_params
resp.media = await find(params.q, limit=params.limit)
The body and the response validate from type hints too. On
POST/PUT/PATCH/DELETE, a keyword-only parameter annotated
with a Pydantic model (and no default) receives the parsed, validated body —
and a Pydantic return annotation becomes the response model:
@api.route("/items", methods=["POST"])
async def create_item(req, resp, *, item: ItemIn) -> ItemOut:
return ItemOut(id=1, name=item.name, price=item.price)
An invalid or non-object body returns 422 before your handler runs. When
the handler sets resp.media to a dict or model, the -> ItemOut return
annotation validates and coerces it and strips undeclared fields; if the
payload violates the contract it fails closed (a 500 in production, or
re-raises under debug=True) rather than leaking a malformed response.
Opt out with @api.route(..., response_model=False).
response_model= also accepts generic types — response_model=list[ItemOut]
validates and serializes a list response (and emits an array schema), and a
union like ItemOut | ErrorOut emits a oneOf. A bare -> list[ItemOut]
return annotation still appears in the schema but, unlike an explicit
response_model=, is not validated at runtime (so loose data keeps working).
Note
Response-model validation runs only when resp.media is a dict or a
Pydantic model (for a single model) — a raw ORM object isn’t auto-validated,
so wrap it with ItemOut.model_validate(obj).
This is the recommended way to build validated REST APIs with Responder. See the Building a REST API for a complete walkthrough.
Pagination¶
responder.ext.pagination provides a generic Page envelope and a
paginate helper for list endpoints. Pair them with the typed Query
markers and a Page[Model] response model:
from responder import Query
from responder.ext.pagination import Page, paginate
@api.get("/items", response_model=Page[Item])
def list_items(req, resp, *,
page: int = Query(1, ge=1),
size: int = Query(20, ge=1, le=100)):
resp.media = paginate(db.all(), page=page, size=size)
The response is an envelope with items, total, page, size, and
pages, and OpenAPI documents it as an inline object referencing your element
model. paginate slices an in-memory collection by default; when you page in
the database yourself, pass the already-sliced rows plus the overall
total=:
rows = db.query(limit=size, offset=(page - 1) * size)
resp.media = paginate(rows, page=page, size=size, total=db.count())
Many REST consumers navigate by response headers instead of the body
envelope — the GitHub-style RFC 8288 Link header plus
X-Total-Count. Call set_pagination_headers to emit both:
from responder.ext.pagination import paginate, set_pagination_headers
@api.get("/items", response_model=Page[Item])
def list_items(req, resp, *,
page: int = Query(1, ge=1),
size: int = Query(20, ge=1, le=100)):
result = paginate(db.all(), page=page, size=size)
set_pagination_headers(req, resp, result)
resp.media = result
The Link header carries rel="first"/"prev"/"next"/"last"
URLs built from the request’s own URL — every other query parameter
(filters, sort specs) is preserved; only page and size are
rewritten. prev is omitted on the first page and next on the last.
Sorting and Filtering¶
responder.ext.query rounds out list endpoints with sort_items and
filter_items — in-memory helpers (dicts or objects, no ORM coupling) that
pair with the typed markers and paginate:
from responder.ext.query import filter_items, sort_items
@api.get("/items", response_model=Page[Item])
def list_items(req, resp, *,
status: str = Query(None),
sort: str = Query("name"),
page: int = Query(1, ge=1),
size: int = Query(20, ge=1, le=100)):
rows = filter_items(db.all(), {"status": status})
rows = sort_items(rows, sort, allowed={"name", "created_at"})
resp.media = paginate(rows, page=page, size=size)
filter_items applies field == value equality and skips entries whose
value is None (so optional markers pass straight through). sort_items
reads a name,-created spec (- = descending, multiple keys allowed);
always pass allowed= for a client-supplied sort so users can’t order by
arbitrary attributes — an out-of-list field returns 400.
Content Negotiation¶
Responder automatically negotiates the response format based on the
client’s Accept header. Set resp.media to a Python object and
the right thing happens:
Accept: application/json(default) → JSONAccept: application/yaml→ YAML (the legacyapplication/x-yamlis accepted too)Accept: application/x-msgpack→ MessagePack
This means a single endpoint serves multiple formats without any conditional logic in your code:
@api.route("/data")
def data(req, resp):
resp.media = {"key": "value"}
Clients get the format they ask for:
$ curl http://localhost:5042/data
{"key": "value"}
$ curl -H "Accept: application/yaml" http://localhost:5042/data
key: value
When the Accept header ranks several supported formats, Responder
serves the one the client prefers — the highest q-value wins, per
RFC 9110 §12.5.1 — rather than the first format registered:
$ curl -H "Accept: application/yaml;q=0.9, application/json;q=0.1" \
http://localhost:5042/data
key: value
Without an Accept header, or when everything ties (e.g. */*),
JSON remains the default. To apply the same ranking yourself — say, to
choose between representations you render by hand — use
req.preferred_media_type(), which returns the candidate the client
prefers (ties keep your order) or None when it accepts none of them:
@api.route("/export")
def export(req, resp):
preferred = req.preferred_media_type(["application/json", "text/csv"])
if preferred == "text/csv":
resp.text = render_csv()
resp.mimetype = "text/csv"
else:
resp.media = render_dict()
Faster JSON with orjson¶
When orjson is installed, Responder transparently uses it to encode and decode JSON bodies — typically 3-10x faster than the standard library, which matters for large payloads that would otherwise serialize on the event loop. Install it via the extra:
$ pip install "responder[orjson]"
No code changes are needed: resp.media, await req.media(), and a
custom API(encoder=...) hook all keep working exactly as before. The
legacy API(json_ensure_ascii=True) mode always uses the standard
library, since orjson emits UTF-8 only.
Output is byte-identical to the stdlib encoder except for whitespace
(orjson emits compact ,/: separators) and one corner: float
nan/inf values serialize as null instead of the non-standard
NaN/Infinity literals. Payloads orjson cannot handle (such as
integers beyond 64 bits) automatically fall back to the standard library,
so nothing that serialized before stops working.
MessagePack¶
MessagePack is a binary serialization format that’s more compact and faster to parse than JSON. It’s useful for high-throughput APIs, IoT devices, and anywhere bandwidth matters.
Responder supports MessagePack alongside JSON and YAML:
# Decode a MessagePack request body
data = await req.media("msgpack")
# Respond with MessagePack
resp.media = {"result": [1, 2, 3]}
Content negotiation works automatically — clients can send
Accept: application/x-msgpack to receive MessagePack responses
instead of JSON. You can also explicitly decode MessagePack request
bodies by passing "msgpack" to req.media().