# lauren — Full Reference for LLMs

A production-grade, metadata-first Python web framework for ASGI services.
This document is a single self-contained reference intended for ingestion
by AI coding assistants. It covers every public API, idiomatic usage, and
common mistakes.

---

## Table of Contents

1. [Installation](#installation)
2. [Minimal Example](#minimal-example)
3. [Application Factory & Lifecycle](#application-factory--lifecycle)
4. [Decorators](#decorators)
5. [Types (Request / Response / State / Headers)](#types-request--response--state--headers)
6. [Extractors](#extractors)
7. [Dependency Injection](#dependency-injection)
8. [Custom Providers](#custom-providers)
9. [Modules](#modules)
10. [Guards & Middleware](#guards--middleware)
11. [Exception Handlers](#exception-handlers)
12. [Auto-Serialization](#auto-serialization)
13. [Error Catalog](#error-catalog)
14. [Testing](#testing)
15. [Strict Decorator Inheritance](#strict-decorator-inheritance)
16. [Custom Extractors (plugin API)](#custom-extractors-plugin-api)
17. [OpenAPI 3.1 Generation](#openapi-31-generation)
18. [Logging](#logging)
19. [Graceful Shutdown & Signals](#graceful-shutdown--signals)
20. [Common Patterns & Anti-Patterns](#common-patterns--anti-patterns)
21. [WebSockets](#websockets)
22. [Server-Sent Events](#server-sent-events-eventstream)
23. [Typed Streaming](#typed-streaming)
24. [Socket.IO Compatibility](#socketio-compatibility)
25. [Companion Packages](#companion-packages)

---

## Installation

```bash
pip install lauren
# For Pydantic-based extractors:
pip install pydantic
```

Python 3.11+ required. `anyio>=4.0` is a required dependency (used to
offload sync handlers to a thread pool). `py.typed` marker ships for
full static-analysis support.

---

## Minimal Example

```python
import asyncio
from pydantic import BaseModel
from lauren import (
    LaurenFactory, controller, get, post, module,
    Path, Json, Response,
)

class CreateUser(BaseModel):
    name: str
    age: int

@controller("/users")
class UserController:
    @get("/{id}")
    async def get_user(self, id: Path[int]) -> dict:
        return {"id": id, "found": True}

    @post("/")
    async def create(self, body: Json[CreateUser]):
        # tuple form: (body, status); lauren builds the 201 Response.
        return body.model_dump(), 201

@module(controllers=[UserController])
class AppModule:
    pass

app = LaurenFactory.create(AppModule)
# `app` is an ASGI callable — use uvicorn/hypercorn to serve.
# startup() is called automatically via the ASGI lifespan protocol.
```

---

## Application Factory & Lifecycle

### `LaurenFactory.create(root_module, *, strict_lifecycle=True, global_middleware=None, max_body_size=1048576, app_state=None)`

Synchronous factory. Returns a `LaurenApp` ready to be served. Executes seven phases:

1. **Module graph construction** — walk `@module(imports=[...])` edges, detect cycles.
2. **Provider collection** — gather every `@injectable` from the graph.
3. **Protocol binding** — resolve `provides=[Protocol]` declarations.
4. **DI graph compilation** — detect circular deps, scope violations, protocol ambiguities.
5. **Router compilation** — walk controllers, compile handler signatures into extraction plans, insert into the radix tree.
6. **Lifecycle order computation** — build the topological `@post_construct` / `@pre_destruct` schedule.
7. **Readiness** — build the `LaurenApp` object; `startup()` is deferred to the ASGI lifespan event (or the first HTTP request for servers without lifespan support).

Failures in phases 1–6 raise a `StartupError` subclass immediately — the app never begins serving traffic with an invalid graph.

### `LaurenApp`

Attributes & methods:

- `app.router: Router` — the compiled radix tree.
- `app.container: DIContainer` — compiled DI container.
- `app.app_state: AppState` — sealed, read-only app-level state.
- `app.routes()` — list of `RouteEntry` objects.
- `app.openapi()` — OpenAPI 3.1 schema dict.
- `await app.startup()` — idempotent; invoked automatically by the ASGI lifespan protocol (uvicorn) or lazily on the first HTTP request. Call explicitly in tests that check `@post_construct` state.
- `await app.shutdown(drain_timeout=10.0)` — runs `@pre_destruct` hooks, awaits in-flight requests.
- `await app(scope, receive, send)` — standard ASGI callable (HTTP + lifespan).

---

## Decorators

### `@injectable(*, scope=Scope.SINGLETON, provides=None, multi=False)`

Marks a class as a DI provider.

```python
from lauren import Scope, injectable

@injectable(scope=Scope.SINGLETON)
class Clock:
    def now(self) -> float: ...

@injectable(scope=Scope.REQUEST)
class DbSession:
    def __init__(self, clock: Clock) -> None: ...
```

- `scope`:
  - `Scope.SINGLETON` — one instance per app (default)
  - `Scope.REQUEST` — one instance per request, shared within that request's handler tree
  - `Scope.TRANSIENT` — new instance every resolution
- `provides=[Protocol, ...]` — register the class as an implementation of one or more Protocols.
- `multi=True` — permit multiple providers for the same token; resolution returns a list.

**Rule:** subclasses of `@injectable` classes are NOT automatically injectable. You must re-decorate explicitly:

```python
@injectable()
class Base: ...

class NotAnInjectable(Base):
    pass  # startup raises MetadataInheritanceError if you try to register this

@injectable()
class Child(Base):  # explicit opt-in
    pass
```

### `@module(*, controllers=None, providers=None, imports=None, exports=None)`

Declares a module boundary (NestJS-style).

```python
@module(
    controllers=[UserController, ProjectController],
    providers=[Clock, DbSession],
    imports=[SharedModule],
    exports=[DbSession],  # must be in providers or imported
)
class AppModule:
    pass
```

- `controllers` — classes to expose via the HTTP router.
- `providers` — classes to register with the DI container.
- `imports` — other `@module` classes; their `exports` become visible here.
- `exports` — subset of providers/imports re-exported to modules that import this one.

Violations raised at startup:
- `CircularModuleError` — an import cycle exists.
- `ModuleExportViolation` — an export is neither declared nor imported.

### `@controller(prefix="", *, tags=None, summary=None, description=None, deprecated=False, security=None)`

Declares a controller class. Automatically makes the class `@injectable(scope=Scope.REQUEST)` so handlers can receive request-scoped deps (sessions, current user, etc.).

```python
@controller("/api/v1/users", tags=["users"])
class UserController:
    def __init__(self, session: DbSession) -> None: ...
```

### Route decorators

- `@get(path="", *, summary=None, description=None, response_model=None, responses=None, deprecated=False, operation_id=None, include_in_schema=True, tags=None)`
- `@post(...)`, `@put(...)`, `@patch(...)`, `@delete(...)`, `@head(...)`, `@options(...)`

Each decorator attaches a `RouteMeta` to the method. Multiple route decorators on the same method register multiple routes.

```python
@controller("/p")
class PingController:
    @get("/ping")
    @get("/health")  # same handler, two routes
    async def ping(self) -> dict:
        return {"ok": True}
```

### Lifecycle hook decorators

```python
@injectable()
class Db:
    @post_construct
    async def connect(self) -> None: ...

    @pre_destruct
    async def disconnect(self) -> None: ...
```

- `@post_construct` — runs after DI construction, in topological order (deps first).
- `@pre_destruct` — runs at shutdown in reverse-topological order, with a per-hook timeout. Failures are collected (shutdown is always best-effort) as `DestructError` / `DestructTimeoutError`.

### Middleware & guard decorators

- `@middleware` — marks a class with a `dispatch(request, call_next)` method as middleware.
- `@use_middleware(*classes)` — attach middleware to a **controller class** OR a **handler method**.
- `@use_guards(*classes)` — attach guards to a **controller class** OR a **handler method**.
- `@set_metadata(key, value)` — attach free-form metadata readable by guards via `ExecutionContext.get_metadata(key, default)`.

Both decoration orders work for class-level `@use_guards` and `@use_middleware`:

```python
# Order A
@use_guards(AuthenticatedGuard)
@controller("/x")
class A: ...

# Order B
@controller("/x")
@use_guards(AuthenticatedGuard)
class B: ...
```

---

## Types (Request / Response / State / Headers)

### `Request`

Properties: `method`, `path`, `url`, `path_params`, `query_params`, `headers`, `cookies`, `client`, `server`, `state`, `app_state`.

Body methods (async): `await request.body() -> bytes`, `await request.text()`, `await request.json()`, `await request.form()`, `request.stream()`.

Introspection: `get_handler_class()`, `get_route_handler_func()`, `get_route_template()`, `get_matched_route()`.

### `Response`

Immutable value object. All `with_*` methods return new instances.

**Factories:**
- `Response.json(data, *, status=200, headers=None)`
- `Response.text(data, *, status=200, headers=None)`
- `Response.html(data, *, status=200, headers=None)`
- `Response.bytes(data, *, status=200, media_type="application/octet-stream", headers=None)`
- `Response.empty(status=204)`, `Response.no_content()`, `Response.created(data=None, *, location=None)`, `Response.accepted(data=None)`
- `Response.redirect(location, *, status=307)`
- `Response.stream(async_iterable, *, status=200, media_type=..., headers=None)`
- `Response.sse(async_iterable_of_events)`

**Builders (return new instance):**
- `with_status(status)`, `with_header(k, v)`, `with_headers({k: v, ...})`, `without_header(k)`, `with_media_type(mt)`, `with_body(bytes|str)`
- `with_cookie(key, value, *, max_age=None, path="/", domain=None, secure=False, http_only=False, same_site=None)`
- `delete_cookie(key, *, path="/")`

### `State` (request-scoped) and `AppState` (sealed after startup)

```python
# Writes
request.state.user_id = 42
request.state.set("scope", "admin")

# Reads (safe)
val = request.state.get_typed("user_id", int)   # None if missing, raises StateTypeError if wrong type
val = request.state.require("user_id", int)     # raises MissingStateError if absent
request.state.has("user_id")                    # bool
```

`AppState` raises `RuntimeError` on writes after `app.startup()` has sealed it.

### `Headers`

Case-insensitive, ordered, multi-value mapping. Use `.getall(name)` to get a list of values (e.g. multiple `Set-Cookie` lines).

---

## Extractors

Extractors convert request data into typed Python values. Built-in:

| Extractor | Source | Reads body |
|---|---|---|
| `Path[T]` | URL path param | no |
| `Query[T]` | query string | no |
| `Header[T]` | request header | no |
| `Cookie[T]` | cookie | no |
| `Json[T]` | JSON body, Pydantic-validated | yes |
| `Form[T]` | form-urlencoded body | yes |
| `Bytes` | raw body bytes | yes |
| `State` | `request.state` | no |
| `Depends[T]` | DI container | no |

Handler parameters whose name matches a path variable are auto-promoted to `Path[...]`:

```python
@get("/users/{id}")
async def show(self, id: int) -> dict:  # `id` is treated as Path[int]
    return {"id": id}
```

### Field descriptors

For validation metadata and OpenAPI hints, use `QueryField` / `HeaderField` / `CookieField`:

```python
from lauren import Query, QueryField

@get("/search")
async def search(
    self,
    q: Query[str],
    page: Query[int] = QueryField(default=1, ge=1, le=100),
    tags: Query[list[str]] = QueryField(default=[]),
    per_page: Query[int] = QueryField(default=25, ge=1, le=200, alias="pp"),
): ...
```

Supported constraints: `ge`, `le`, `gt`, `lt`, `min_length`, `max_length`, `pattern`, `alias`, `default`.

### JSON body with Pydantic

```python
class CreatePost(BaseModel):
    title: str
    body: str

@post("/posts")
async def create(self, body: Json[CreatePost]) -> dict:
    return body.model_dump(), 201
```

Validation errors are raised as `ExtractorError` and mapped to HTTP 422.

---

## Dependency Injection

### `DIContainer`

- `container.register(cls)` — add a provider.
- `container.compile()` — validate the graph (cycles, missing deps, scope violations).
- `await container.resolve(token, *, request_cache=None, framework_values=None)` — resolve an instance.
- `container.get_provider(token)` — get the `Provider` metadata.
- `container.has_provider(token)` — membership test.
- `container.set_singleton(cls, instance)` — manually install a singleton (useful for tests).

### Scopes and their constraints

- `SINGLETON` can depend on `SINGLETON` only (depending on request-scoped raises `DIScopeViolationError` at compile).
- `REQUEST` can depend on `SINGLETON` or `REQUEST`.
- `TRANSIENT` produces a new instance each resolution; can depend on anything.

### Protocols and multi-bindings

```python
from typing import Protocol, runtime_checkable

@runtime_checkable
class EmailSender(Protocol):
    def send(self, to: str, msg: str) -> None: ...

@injectable(provides=[EmailSender])
class SmtpSender:
    def send(self, to, msg): ...

# Consumers request the Protocol:
@injectable()
class Notifier:
    def __init__(self, sender: EmailSender) -> None: ...
```

Two providers for the same token without `multi=True` raise `ProtocolAmbiguityError` at compile. Mark both `multi=True` to receive a `list[EmailSender]` at resolution.

#### Type-correct `list[T]` injection

Multi-bindings can be requested with their natural type at *every* injection site, not just `container.resolve(T)`:

```python
@injectable(provides=[EmailSender], multi=True)
class SmtpSender: ...

@injectable(provides=[EmailSender], multi=True)
class SmsSender: ...

# 1. Constructor injection
@injectable()
class Dispatcher:
    def __init__(self, senders: list[EmailSender]) -> None:
        self._senders = senders

# 2. Class-body field injection
@injectable()
class Aggregator:
    senders: list[EmailSender]

# 3. Handler parameters
@controller("/notify")
class NotifyController:
    @get("/channels")
    async def channels(self, senders: list[EmailSender]) -> dict:
        return {"count": len(senders)}
```

The container recognises `list[T]` (and `typing.List[T]`) as a request for every visible provider of `T` registered with `multi=True`, in registration order. Visibility, request-scope caching, and scope narrowing all apply per element. Asking for `list[T]` when `T` is registered without `multi=True` raises `ProtocolAmbiguityError` at compile time — it forces the user to decide between scalar (`T`) and collection (`list[T]`) intent.

### Optional parameters with defaults

If an `__init__` parameter has a default value, the DI container treats it as optional — the default is used when no provider is registered. This is what lets dataclass-backed configs work naturally:

```python
@injectable(scope=Scope.SINGLETON)
@dataclass
class Settings:
    database_url: str = "sqlite:///:memory:"
    jwt_secret: str = "dev"
```

---

## Modules

```python
@module(providers=[Clock], exports=[Clock])
class SharedModule: ...

@module(
    controllers=[HealthController],
    providers=[Db, UserRepo],
    imports=[SharedModule],
)
class AppModule: ...
```

Resolution rules:
- A provider is visible inside a module if it's declared there, or imported from an `exports`-listed module.
- Imports propagate: `A imports B` + `B imports C` makes C's exports visible to A only if B re-exports them.

---

## Guards & Middleware

### Guards (`GuardProtocol`)

Guards authorize a request by returning `bool` (or raising an `HTTPError`).

```python
from lauren import ExecutionContext

class AdminGuard:
    async def can_activate(self, ctx: ExecutionContext) -> bool:
        if ctx.request.state.get("user", {}).get("role") != "admin":
            return False
        return True
```

Attach with `@use_guards(...)`. Guards declared on a controller class run before guards declared on the individual method.

`@set_metadata("key", value)` lets guards read per-route policy:

```python
@get("/purge")
@use_guards(RoleGuard)
@set_metadata("required_role", "admin")
async def purge(self) -> dict: ...

class RoleGuard:
    async def can_activate(self, ctx):
        required = ctx.get_metadata("required_role", "user")
        return ctx.request.state.get("user", {}).get("role") == required
```

### Middleware (`MiddlewareProtocol`)

```python
@middleware
class RequestId:
    async def dispatch(self, request, call_next):
        import uuid
        request.state.rid = uuid.uuid4().hex
        resp = await call_next(request)
        return resp.with_header("x-request-id", request.state.rid)
```

Registration paths (onion order, outermost first):

1. Global — `LaurenFactory.create(AppModule, global_middleware=[RequestId, AuthMiddleware])`
2. Controller — `@use_middleware(...)` on the controller class
3. Route — `@use_middleware(...)` on a handler method

---

## Auto-Serialization

Handlers can return any of the following — lauren builds the `Response`.
Both `async def` and plain `def` handlers coerce their return values
identically (see *Sync vs Async Handlers* below).

```python
@get("/r1")
async def r1(self) -> dict:
    return {"ok": True}                     # -> JSON, 200

@get("/r2")
async def r2(self) -> str:
    return "hello"                          # -> text/plain, 200

@get("/r3")
async def r3(self) -> None:
    return None                             # -> 204 No Content

@post("/r4")
async def r4(self):
    return {"id": 42}, 201                  # -> JSON, 201 (tuple form)

@post("/r5")
async def r5(self):
    return {"queued": True}, 202, {"x-queue": "default"}  # body, status, headers

@get("/r6")
async def r6(self) -> BaseModel:
    return UserOut(id=1, name="x")          # -> JSON via model_dump(mode="json")

@get("/r7")
async def r7(self) -> list[BaseModel]:
    return [UserOut(...), UserOut(...)]     # -> JSON array of dumped models

@get("/r8")
async def r8(self):
    return Response.html("<h1>hi</h1>")     # raw Response is honored
```

## Sync vs Async Handlers

Handler methods may be `async def` **or** plain `def`:

```python
@controller("/items")
class ItemController:
    @get("/async/{id}")
    async def get_async(self, id: Path[int]) -> dict:
        return await self._repo.find(id)    # awaits async I/O

    @get("/sync/{id}")
    def get_sync(self, id: Path[int]) -> dict:
        return self._repo.find_sync(id)     # blocking call — offloaded to thread pool
```

Sync handlers are **automatically offloaded** to
`anyio.to_thread.run_sync` at dispatch time (the `is_coroutine` flag is
set once at startup via `inspect.iscoroutinefunction`).  The event loop
stays free to serve other requests while the sync function executes.

**Thread-safety rules for sync handlers:**

- `Scope.SINGLETON` services are shared across threads — protect mutable
  state with `threading.Lock`.
- `Scope.REQUEST` and `Scope.TRANSIENT` instances are per-request — safe
  to mutate freely inside one handler.
- Do **not** call asyncio primitives (e.g. `asyncio.Queue.put_nowait`)
  directly from a sync handler.  Use
  `asyncio.get_running_loop().call_soon_threadsafe(fn)` instead.

All binding styles work with sync handlers:

```python
@controller("/demo")
class DemoController:
    @get("/instance")
    def instance_handler(self) -> dict: ...     # sync instance method

    @staticmethod
    @get("/static")
    def static_handler() -> dict: ...           # sync static method

    @classmethod
    @get("/cls")
    def class_handler(cls) -> dict: ...         # sync classmethod
```

The default JSON encoder handles: Pydantic v2 models, enums (`Enum.value`), `datetime` / `date` / `time` (`isoformat()`), `timedelta` (total seconds), `UUID`, `pathlib.PurePath`, `Decimal`, `set` / `frozenset` (as list), `bytes` (UTF-8 decoded), and dataclass instances.

---

## Error Catalog

All exceptions live in `lauren.exceptions` and inherit from `LaurenError`.

**Startup errors (not HTTP-mapped):**

| Class | Meaning |
|---|---|
| `RouterConflictError` | Two routes share the same (method, path) |
| `CircularDependencyError` | DI graph has a cycle |
| `CircularModuleError` | Module imports form a cycle |
| `MissingProviderError` | No provider registered for a token |
| `ProtocolAmbiguityError` | Multiple providers for a non-`multi` token |
| `ModuleExportViolation` | Module exports something it doesn't own or import |
| `LifecycleConfigError` | Misuse of `@post_construct` / `@pre_destruct` |
| `MetadataInheritanceError` | Subclass used a parent's decoration without re-declaring |
| `DuplicateBindingError` | Same class registered twice |
| `UnresolvableParameterError` | Handler/provider param cannot be mapped |
| `DIScopeViolationError` | Singleton depends on request-scoped |
| `MiddlewareConfigError` | Middleware class missing `dispatch` |
| `GuardConfigError` | Guard class missing `can_activate` |
| `OpenAPISchemaError` | OpenAPI generation failed |

**HTTP-mapped errors:**

| Class | Status | `code` |
|---|---|---|
| `ExtractorError` / `ExtractorFieldError` | 422 | `extractor_error` |
| `RouteNotFoundError` | 404 | `route_not_found` |
| `MethodNotAllowedError` | 405 | `method_not_allowed` (includes `allow` list) |
| `RequestBodyTooLarge` | 413 | `request_body_too_large` |
| `UnauthorizedError` | 401 | `unauthorized` |
| `ForbiddenError` | 403 | `forbidden` |
| `MissingStateError` / `StateTypeError` | 500 | `missing_state` / `state_type_error` |

**Lifecycle errors:** `LifecycleError`, `LifecycleViolationError`, `DestructError`, `DestructTimeoutError`, `DrainTimeoutError`.

All HTTP errors serialize as:

```json
{"error": {"code": "...", "message": "...", "detail": { ... }}}
```

To surface a custom domain error with a specific status, subclass `HTTPError`:

```python
from lauren.exceptions import HTTPError

class NotFoundError(HTTPError):
    status_code = 404
    code = "not_found"

# In a handler:
raise NotFoundError("user not found", detail={"id": user_id})
```

---

## Testing

```python
from lauren.testing import TestClient

app = LaurenFactory.create(AppModule)
c = TestClient(app)

r = c.get("/users/42", headers={"Authorization": "Bearer ..."})
assert r.status_code == 200
assert r.json()["id"] == 42
```

`TestClient`:
- `get / post / put / patch / delete / options / head`
- `request(method, url, *, headers=None, json=None, content=None, params=None, cookies=None)`
- All methods return a `TestResponse` with `status_code`, `headers` (list of tuples), `body` (bytes), `text` (str), `json()`, `header(name)`, `headers_all(name)`.
- Works inside `asyncio.run(...)` tests — it auto-switches to a background thread when called from within a running event loop.

---

## Strict Decorator Inheritance

**Rule:** subclassing does not inherit `@controller`, `@module`, `@injectable`, or `@middleware` status. Every class must be decorated explicitly.

```python
@injectable()
class Base: ...

# Starting the app with this class registered raises MetadataInheritanceError:
class Child(Base):
    pass

# Correct:
@injectable()
class Child(Base):
    pass
```

Same rule for `@controller` and `@module`. This avoids surprising behavior when the user subclasses for purely implementation reasons.

Method-level decorators (`@get`, `@post`, etc.) DO propagate through inheritance — that's just Python method resolution, nothing lauren-specific.

`@use_guards` and `@use_middleware` attach to the exact target (class or method) only. A subclass that wants parent guards must re-declare them.

---

## Custom Extractors (plugin API)

Any subclass of `ExtractionMarker` with an async `extract` classmethod can participate in handler signatures.

```python
from lauren import DIContainer
from lauren.extractors import Extraction, ExtractionMarker
from lauren.exceptions import UnauthorizedError
from lauren.types import Request

class CurrentUser(ExtractionMarker):
    source = "app.current_user"  # any unique string

    @classmethod
    async def extract(
        cls,
        request: Request,
        extraction: Extraction,
        *,
        container: DIContainer,
        request_cache: dict[type, object] | None,
    ) -> object:
        uid = request.state.get("user_id")
        if uid is None:
            raise UnauthorizedError("missing auth")
        session = await container.resolve(
            DbSession,
            request_cache=request_cache,
            framework_values={type(request): request},
        )
        user = await session.get(User, uid)
        if user is None:
            raise UnauthorizedError("user vanished")
        return user

# Usage \u2014 declare as a handler parameter annotation:
@get("/me")
async def me(self, user: CurrentUser) -> dict:
    return {"id": user.id, "name": user.name}
```

The extractor receives:
- `request` — the current `Request`
- `extraction` — an `Extraction` with `name`, `default`, `has_default`, etc.
- `container` — the DI container (use to resolve sessions, services)
- `request_cache` — the per-request cache; pass it when resolving request-scoped deps.

Raise any `HTTPError` subclass (`UnauthorizedError`, `ForbiddenError`, ...) to short-circuit with the matching status.

---

## OpenAPI 3.1 Generation

`app.openapi()` returns a schema dict. Route metadata is derived from:

- Controller `@controller(prefix=..., tags=..., summary=..., description=..., deprecated=..., security=...)`
- Handler `@get(..., summary=..., description=..., response_model=..., responses=..., deprecated=..., operation_id=..., include_in_schema=..., tags=...)`

`response_model` must be a Pydantic v2 model (exposes `model_json_schema()`).

```python
class UserOut(BaseModel):
    id: int
    name: str

@get("/users/{id}", response_model=UserOut, operation_id="getUser")
async def show(self, id: Path[int]) -> UserOut: ...
```

Schemas are emitted under `components.schemas` and referenced from the path operation's `responses.200.content["application/json"].schema`.

---

## Logging

lauren ships with a NestJS-style logger. Pass any object implementing the
``lauren.logging.Logger`` protocol to ``LaurenFactory.create(..., logger=...)``.
Four implementations are built in:

* ``ConsoleLogger`` — coloured, human-readable output (default for TTYs).
* ``JsonLogger`` — one JSON object per line for production aggregators.
* ``NullLogger`` — silent; also the factory default if no logger is provided.
* ``InMemoryLogger`` — test helper that captures records in a list.

```python
from lauren import LaurenFactory
from lauren.logging import default_logger, ConsoleLogger, JsonLogger, LogLevel

# Auto-choose based on TTY detection + LAUREN_LOG_FORMAT / LAUREN_LOG_LEVEL
app = LaurenFactory.create(AppModule, logger=default_logger())

# Or pick explicitly:
app = LaurenFactory.create(
    AppModule,
    logger=ConsoleLogger(level="DEBUG", name="MyApp"),
)
app = LaurenFactory.create(
    AppModule,
    logger=JsonLogger(level=LogLevel.INFO),
)
```

Events emitted by the framework:

| Phase | Context | Level |
|---|---|---|
| Factory entry | ``LaurenFactory`` | INFO |
| Phase 1 module graph | ``ModuleGraph`` | VERBOSE |
| Phase 2-4 DI | ``DIContainer`` | VERBOSE |
| Each route registration | ``RouterExplorer`` | INFO |
| Phase 6 lifecycle | ``Lifecycle`` | VERBOSE / INFO |
| App ready | ``LaurenApp`` | INFO |
| Each request (DEBUG level) | ``Request`` | DEBUG / WARN / ERROR |
| Shutdown steps | ``Shutdown`` | INFO / WARN |

Log levels (ascending): ``DEBUG``, ``VERBOSE``, ``INFO``, ``WARN``, ``ERROR``.
Per-request traces fire at ``DEBUG`` for 2xx/3xx, ``WARN`` for 4xx, ``ERROR``
for 5xx, so production runs at ``INFO`` stay quiet unless something is wrong.

Custom loggers: implement the minimal protocol::

    class MyLogger:
        level: LogLevel = LogLevel.INFO
        def log_record(self, record: LogRecord) -> None: ...

or subclass ``lauren.logging._BaseLogger`` to inherit the ``log/debug/warn/
error/verbose`` sugar methods.

---

## Graceful Shutdown & Signals

``LaurenApp.shutdown()`` is the canonical teardown path. It:

1. Flips the ``_running`` flag so no new work is scheduled.
2. Drains in-flight requests (up to ``drain_timeout`` seconds).
3. Runs any ``on_shutdown`` callbacks the user registered (LIFO).
4. Runs ``@pre_destruct`` hooks in reverse topological order.

All four steps are logged through the installed logger. Calls are
idempotent: concurrent or repeated ``shutdown()`` invocations return once
the first shutdown has finished.

```python
app = LaurenFactory.create(AppModule, logger=default_logger())

# Register arbitrary cleanup (decorator form supported):
@app.on_shutdown
async def flush_metrics() -> None:
    await metrics_client.flush()

# Signal integration (POSIX):
from lauren.signals import install_signal_handlers, wait_for_shutdown

async def main():
    event = install_signal_handlers(app, drain_timeout=30)
    # ... run the app (e.g. via uvicorn.run) ...
    await wait_for_shutdown(event)
```

When running under a well-behaved ASGI server (uvicorn, hypercorn), the
server already delivers SIGTERM to the lifespan protocol, so you typically
won't need ``install_signal_handlers`` — it's for standalone scripts and
customised runners.

---

## Common Patterns & Anti-Patterns

### Pattern: per-request DB session

```python
@injectable(scope=Scope.REQUEST)
class DbSession:
    def __init__(self, engine: Engine) -> None:
        self._engine = engine
        self._s = None

    @property
    def s(self):
        if self._s is None:
            self._s = self._engine.session()
        return self._s

    async def aclose(self):                # called automatically by lauren
        if self._s is not None:
            await self._s.close()

@controller("/x")
class C:
    def __init__(self, db: DbSession): ...
```

Lauren's runtime awaits `aclose()` on every request-scoped instance after each request, so DB sessions close deterministically.

### Pattern: extract the user at the handler level

```python
class CurrentUser(ExtractionMarker):
    source = "app.current_user"
    @classmethod
    async def extract(cls, request, extraction, *, container, request_cache):
        ...

@controller("/me")
class MeController:
    @get("/")
    async def me(self, user: CurrentUser) -> dict:
        return {"id": user.id}
```

Better than threading an AuthContext through the controller constructor — the dependency is visible per-route.

### Pattern: return tuple for non-default status

```python
@post("/")
async def create(self, body: Json[CreateUser]):
    user = await self._repo.create(body)
    return user, 201, {"location": f"/users/{user.id}"}
```

### Anti-pattern: subclassing to "inherit" routes

```python
@controller("/a")
class A:
    @get("/")
    async def idx(self): ...

class B(A):  # NO - not a controller, startup raises MetadataInheritanceError
    pass
```

Decorate B explicitly with `@controller(...)` to opt in.

### Anti-pattern: singleton depending on request-scoped

```python
@injectable()  # SINGLETON
class Bad:
    def __init__(self, session: DbSession): ...  # DbSession is REQUEST \u2014 raises DIScopeViolationError
```

Either make `Bad` request-scoped too, or refactor so it receives the session per-method.

### Pattern: sync handler wrapping a blocking library

```python
import httpx as _httpx  # sync client

@controller("/proxy")
class ProxyController:
    @get("/data")
    def fetch(self) -> dict:
        # time.sleep / requests.get / blocking DB calls are all safe here —
        # the sync handler runs in a thread pool, not on the event loop.
        resp = _httpx.get("https://api.example.com/data")
        return resp.json()
```

### Anti-pattern: calling asyncio primitives from a sync handler

```python
import asyncio

_queue: asyncio.Queue = asyncio.Queue()

@controller("/events")
class EventController:
    @post("/")
    def push(self, body: Json[Event]) -> None:
        # ❌ NOT safe — asyncio.Queue is not thread-safe
        _queue.put_nowait(body)

        # ✅ Safe — schedules work back on the event loop
        asyncio.get_running_loop().call_soon_threadsafe(_queue.put_nowait, body)
```

### Anti-pattern: building `Response.json(...)` manually for every handler

```python
# Don't:
@get("/")
async def h(self) -> Response:
    return Response.json({"ok": True})

# Do:
@get("/")
async def h(self) -> dict:
    return {"ok": True}
```

Lauren serializes the return value automatically.

---

## Appendix: Cheat Sheet

| Task | Pattern |
|---|---|
| Register a dependency | `@injectable()` on the class; add to `@module(providers=[...])` |
| Declare a route | `@controller("/p")` on class, `@get("/x")` on method |
| Extract URL param | `id: Path[int]` |
| Extract query param | `q: Query[str] = QueryField(default="", ge=..., le=...)` |
| Extract JSON body | `body: Json[MyPydanticModel]` |
| Raise 404 in handler | `raise NotFoundError("...")` (subclass `HTTPError`) |
| Add a guard to one route | `@use_guards(MyGuard)` on the method |
| Add a guard to all routes on a controller | `@use_guards(MyGuard)` on the class |
| Read per-route metadata in a guard | `ctx.get_metadata("key", default)` |
| Attach a singleton explicitly | `app.container.set_singleton(MyCls, instance)` |
| Run post-construct hook | `@post_construct` on an `async def` method of an `@injectable` |
| Serve a large file | `Response.stream(async_iterable, media_type="application/pdf")` |
| Serve Server-Sent Events | `Response.sse(async_iterable_of_dicts)` |

---

_End of reference._

---

## Server-Sent Events (`EventStream`)

`EventStream` is a one-way streaming response primitive that frames
events according to the Server-Sent Events spec
(`text/event-stream`). The browser's `EventSource` reads from it
natively — no JS-side parser code required.

```python
from lauren import EventStream, ServerSentEvent

@get("/feed")
async def feed(self, q: Depends[Queue]) -> EventStream:
    async def producer():
        async for ev in q.subscribe():
            yield ServerSentEvent(event=ev.kind, data=ev.payload, id=ev.id)
    return EventStream(producer(), keep_alive=15.0)
```

Producers can yield strings (treated as `data`), dicts (auto-promoted
via `ServerSentEvent.from_dict`), or `ServerSentEvent` instances.

`keep_alive=N` emits a comment frame (`: keep-alive\n\n`) every `N`
seconds when the producer is idle, preventing intermediaries from
killing long-lived connections.

`last_event_id(request.headers)` reads the browser's `Last-Event-ID`
header for resumable streams.

## Socket.IO Compatibility

Lauren ships an Engine.IO v4 / Socket.IO v5 adapter so the official
`socket.io-client` can connect to a lauren backend without any
wire-level glue:

```python
from lauren.socketio import (
    SocketIOConnection, on_socketio_event, socketio_controller,
)

@socketio_controller("/socket.io/")
class ChatGateway:
    @on_socketio_event("connect")
    async def hello(self, conn: SocketIOConnection) -> None:
        await conn.emit("welcome", {"sid": conn.sid})

    @on_socketio_event("chat:message")
    async def on_message(
        self, conn: SocketIOConnection, payload: dict
    ) -> dict:
        return {"echo": payload}  # auto-acks the client
```

Two reserved event names map to lifecycle hooks: `"connect"` and
`"disconnect"`. Every other name routes inbound `EVENT` packets to
the matching method; the return value, if non-`None`, becomes an `ACK`
payload when the client supplied a callback.

The adapter implements the **WebSocket transport** subset (no
long-polling, no binary attachments, root namespace only). It layers
on `@ws_controller` internally so DI, lifecycle, and middleware all
work identically to a hand-written WS gateway.

JS client::

    import { io } from "socket.io-client";
    const sock = io("ws://...", { transports: ["websocket"] });
    sock.emit("chat:message", {text: "hi"}, (ack) => console.log(ack));


---

## Custom Providers

Four NestJS-style recipes for cases where ``@injectable`` is not enough.
All return a ``CustomProvider`` record listed in a module's
``providers=[...]``.

### `use_value` — bind to a literal

```python
from lauren import use_value, Token

DB_URL = Token("DB_URL")

@module(providers=[
    use_value(provide=DB_URL, value="postgres://localhost/app"),
    use_value(provide="REDIS", value=redis.from_url(...)),
])
class AppModule: ...
```

### `use_class` — bind a token to a different class

```python
from lauren import use_class, Scope

@module(providers=[
    use_class(
        provide=ConfigService,
        use=ProductionConfig if PROD else DevelopmentConfig,
        scope=Scope.SINGLETON,
    ),
])
class AppModule: ...
```

### `use_factory` — compute the value through a DI-resolved function

```python
from lauren import use_factory, OptionalDep

def make_engine(url: str, log: Logger | None) -> Engine:
    return Engine(url, logger=log)

@module(providers=[
    use_factory(
        provide=Engine,
        factory=make_engine,
        injects=[DB_URL, OptionalDep("LOGGER")],
        scope=Scope.SINGLETON,
    ),
])
class AppModule: ...
```

`async def` factories are awaited transparently.

### `use_existing` — alias one token to another

```python
from lauren import use_existing

@module(providers=[
    LoggerService,
    use_existing(provide="AliasedLoggerService", existing=LoggerService),
])
class AppModule: ...
```

### `Inject` for non-class tokens

```python
from typing import Annotated
from lauren import Inject, Token

DB_URL = Token("DB_URL")

@injectable()
class Repo:
    def __init__(
        self,
        url: Annotated[str, Inject(DB_URL)],
    ) -> None:
        self.url = url
```

`Token("NAME")` is hashable, identity-distinct by default
(``unique=True``), and renders nicely in errors. Bare strings work but
sacrifice the diagnostic and IDE benefits.

---

## Exception Handlers

Catch domain or framework exceptions and turn them into structured
HTTP responses. Two forms — class (DI-injected) and function (no DI).
Three attachment scopes — global, per-controller, per-route.

### Class form

```python
from lauren import exception_handler, use_exception_handlers
from lauren.types import Request, Response

class NotFoundError(HTTPError):
    status_code = 404
    code = "not_found"

@exception_handler(NotFoundError, ConflictError)
class DomainErrors:
    def __init__(self, log: Logger) -> None:
        self.log = log

    async def catch(self, exc: Exception, request: Request) -> Response:
        self.log.warn(f"domain error: {type(exc).__name__}: {exc}")
        return Response.json({"error": str(exc)}, status=400)
```

The class is auto-marked as `@injectable(scope=Scope.SINGLETON)` so it
participates in DI like guards and middleware. It MUST define
`catch(self, exc, request)` returning a ``Response``.

### Function form

```python
@exception_handler(ValueError)
async def handle_value_error(exc: ValueError, request: Request) -> Response:
    return Response.json({"detail": str(exc)}, status=422)
```

Function-form handlers are invoked directly with ``(exc, request)`` and
do **not** participate in DI. For dependencies, use the class form.

### Attachment

```python
# Per-controller — handles every method on the class:
@use_exception_handlers(DomainErrors)
@controller("/users")
class UserController: ...

# Per-route — handles only this handler:
@get("/x")
@use_exception_handlers(handle_value_error)
async def x(self): ...

# Global — handles every request:
app = LaurenFactory.create(
    AppModule,
    global_exception_filters=[DomainErrors, AuditFailures],
)
```

Resolution order, most specific wins: route → controller → global.
Within each tier handlers are tried in registration order; the first
whose declared types match (`isinstance(exc, declared)`) wins.

### Rules

* `@exception_handler` MUST be invoked with at least one exception
  type. Bare `@exception_handler` and empty `@exception_handler()` are
  both rejected with `ExceptionHandlerConfigError`.
* Subclasses of `@exception_handler` classes are NOT auto-handlers —
  re-decorate to opt in (`MetadataInheritanceError`).
* Handlers should *return* responses, never raise. A raise inside
  `catch` becomes a hard 500.

---

## WebSockets

First-class WebSocket controllers with typed message dispatch.

### Gateway anatomy

```python
from pydantic import BaseModel
from lauren import (
    BroadcastGroup, Json, WebSocket, ws_controller,
    on_connect, on_disconnect, on_error, on_message,
)

class ChatMessage(BaseModel):
    text: str

@ws_controller("/chat/{room_id}")
class ChatGateway:
    def __init__(self, rooms: BroadcastGroup) -> None:
        self._rooms = rooms

    @on_connect
    async def joined(self, ws: WebSocket) -> None:
        await ws.accept()                              # accept handshake
        room_id = ws.path_params["room_id"]
        await self._rooms.subscribe(room_id, ws)

    @on_message("chat.send")
    async def send(self, ws: WebSocket, body: Json[ChatMessage]) -> None:
        room_id = ws.path_params["room_id"]
        await self._rooms.broadcast(
            room_id,
            {"event": "chat.recv", "text": body.text},
            exclude=ws,
        )

    @on_disconnect
    async def left(self, ws: WebSocket) -> None:
        # ``BroadcastGroup`` automatically unsubscribes ``ws`` from every
        # group at disconnect, but explicit cleanup is fine too.
        pass
```

Register the gateway in a module's ``controllers=[…]`` list — same
contract as HTTP controllers. ``@ws_controller`` auto-marks the class
as ``@injectable(scope=Scope.REQUEST)`` so each connection gets its own
instance.

### `WebSocket` API

| Method                       | Purpose                                  |
|------------------------------|------------------------------------------|
| `await ws.accept(*, subprotocol=None, headers=None)` | Complete the handshake. Auto-called when `@on_connect` returns normally. |
| `await ws.receive_text()`    | Pull next text frame.                    |
| `await ws.receive_bytes()`   | Pull next binary frame.                  |
| `await ws.receive_json()`    | Pull next text frame and decode JSON.    |
| `await ws.send_text(s)`      | Send a text frame.                       |
| `await ws.send_bytes(b)`     | Send a binary frame.                     |
| `await ws.send_json(obj)`    | JSON-encode and send. Handles Pydantic models, dataclasses, and standard JSON types. |
| `await ws.close(code=1000, reason="")` | Initiate server-side close. Idempotent. |

Properties: `path`, `path_template`, `path_params`, `headers`,
`query_string`, `state`, `app_state`, `client_subprotocols`,
`subprotocol`, `connected`, `connection_state`, `close_code`,
`close_reason`.

### Typed dispatch and discriminated unions

```python
from typing import Annotated, Literal, Union
from pydantic import BaseModel, Field

class ImageEvent(BaseModel):
    kind: Literal["image"]
    url: str

class TextEvent(BaseModel):
    kind: Literal["text"]
    content: str

Event = Annotated[Union[ImageEvent, TextEvent], Field(discriminator="kind")]

@ws_controller("/feed")
class FeedGateway:
    @on_message("post")
    async def post(self, ws: WebSocket, body: Json[Event]) -> None:
        if isinstance(body, ImageEvent):
            ...
        else:
            ...
```

The wildcard event ``"*"`` matches any event without a specific
handler; ``"__binary__"`` captures binary frames as ``bytes`` rather
than JSON-decoded text.

### `BroadcastGroup`

Room-scoped fan-out provider, resolvable via DI like any other
injectable.

| Method                                     | Purpose                                         |
|--------------------------------------------|-------------------------------------------------|
| `await group.subscribe(name, ws)`          | Add ``ws`` to ``name``. Idempotent.             |
| `await group.unsubscribe(name, ws)`        | Remove. Safe to call if not a member.           |
| `await group.unsubscribe_all(ws)`          | Remove from every group. Auto-called on disconnect. |
| `await group.broadcast(name, msg, *, as_bytes=False, exclude=None)` | Send to every subscriber; returns number sent. Detects and prunes dead sockets. |
| `group.groups()` / `group.members(name)` / `group.member_count(name)` | Introspection. |

The default implementation is single-process. Multi-worker production
should subclass and back the membership store with Redis Pub/Sub —
the same controller code works unchanged.

### Authorisation

```python
@ws_controller("/private")
class PrivateGateway:
    @on_connect
    async def auth(self, ws: WebSocket) -> None:
        if ws.headers.get("authorization") != "Bearer xxx":
            await ws.close(code=4401, reason="unauthorised")
            return                                 # short-circuit
        await ws.accept()
```

Returning from `@on_connect` without calling `accept()` rejects the
handshake. Three safe rejection patterns exist (all produce identical
client-visible close codes):

```python
# 1. close() then return (preferred)
await ws.close(code=4401, reason="unauthorised"); return

# 2. raise WebSocketDisconnect (runtime closes)
raise WebSocketDisconnect("unauthorised", close_code=4401)

# 3. close() then raise (also safe — no duplicate close frame)
await ws.close(code=4401); raise WebSocketDisconnect("unauthorised", close_code=4401)
```

### Errors and `@on_error`

```python
@on_error
async def caught(self, ws: WebSocket, exc: Exception) -> None:
    # Run for any exception inside @on_message handlers other than
    # WebSocketDisconnect. Returning normally resumes the loop.
    await ws.send_json({"error": str(exc)})
```

Errors related to WebSockets:

* `WebSocketError` — base class.
* `WebSocketDisconnect(close_code=...)` — the peer closed.
* `WebSocketValidationError` — frame failed Pydantic validation.
* `WebSocketRouteNotFoundError` — no gateway matched the path.

### Testing

```python
from lauren.testing import WsTestClient

client = WsTestClient(app)
async with client.connect("/chat/42") as ws:
    await ws.send_json({"event": "chat.send", "data": {"text": "hi"}})
    reply = await ws.receive_json()
    assert reply["text"] == "hi"
```

The session context-manager guarantees the server task is awaited so
any unhandled server-side exception propagates into the test harness.

### Inheritance

Subclasses of `@ws_controller` classes are NOT auto-mounted. The
runtime raises `MetadataInheritanceError` at startup for any subclass
in a module's `controllers=[...]` list without its own
`@ws_controller` decoration. Method-level markers (`@on_connect` /
`@on_message`) attach to the function itself; an override that
doesn't re-decorate loses the marker — symmetric with HTTP route
decorators.

---

## Typed Streaming

For homogeneous streams (single Pydantic schema), prefer
`StreamingResponse[T]` over raw `EventStream`. The framework
content-negotiates between SSE, NDJSON, and JSON Lines based on the
client's `Accept` header.

```python
from typing import AsyncIterator
from pydantic import BaseModel
from lauren import StreamingResponse

class TickEvent(BaseModel):
    seq: int
    value: float

@get("/ticks")
async def ticks(self) -> StreamingResponse[TickEvent]:
    async def gen() -> AsyncIterator[TickEvent]:
        for i in range(100):
            yield TickEvent(seq=i, value=i * 0.5)
            await asyncio.sleep(0.05)
    return StreamingResponse(gen())
```

Negotiation rules:

| `Accept` (or default)                | Format chosen           |
|--------------------------------------|-------------------------|
| `text/event-stream`                  | SSE                     |
| `application/x-ndjson`               | NDJSON                  |
| `application/json-seq`               | JSON Lines              |
| `*/*` or anything else               | NDJSON (default)        |

The `Stream` extractor and `StreamReader[T]` consumer give the same
typed-iteration story for *receiving* a stream from the client.

---

## Companion Packages

The framework stays small on purpose; cross-cutting concerns live in
companion packages that depend on stock `lauren>=1.0`.

### `lauren-middlewares`

Production-grade middlewares, each as a factory function returning a
`@middleware`-decorated class:

* `cors(...)` — W3C CORS with origin allow-lists, regex,
  credentials, expose-headers, preflight short-circuit.
* `rate_limit(...)` — token-bucket per IP/key, pluggable store,
  429 + `Retry-After`.
* `gzip(...)` — compress responses ≥ threshold for compressible types.
* `security_headers(...)` — OWASP defaults (HSTS, XCTO, XFO,
  Referrer-Policy, Permissions-Policy, COOP/CORP, optional CSP).
* `request_id(...)` — generate / propagate `X-Request-ID`.
* `trusted_hosts(...)` — `Host` header allow-list with subdomain
  wildcards.
* `request_log(...)` — structured access log.
* `https_redirect(...)` — HTTP → HTTPS with `X-Forwarded-Proto`.
* `body_size_limit(...)` — early 413 on oversize POSTs.
* `timeout(...)` — `asyncio.wait_for` wrapper, 504 on expiry.

```bash
pip install lauren-middlewares
```

```python
from lauren import LaurenFactory
from lauren_middlewares import (
    cors, rate_limit, gzip, security_headers, request_id,
)

app = LaurenFactory.create(
    AppModule,
    global_middleware=[
        request_id(),
        security_headers(),
        cors(allow_origins=["https://app.example.com"]),
        rate_limit(per_minute=60),
        gzip(),
    ],
)
```

### `lauren-logging`

Configurable logging module modelled after NestJS's
`LoggerModule.forRoot`. Processor pipeline, contextvars binding,
request-logging middleware, pluggable backends.

```bash
pip install lauren-logging                  # core
pip install lauren-logging[structlog]       # structlog backend
pip install lauren-logging[otel]            # OpenTelemetry processor
```

```python
from lauren_logging import (
    LoggingModule, LoggingConfig, LoggingService, ConsoleBackend,
    ContextvarsProcessor, RedactSecrets,
)

LogModule = LoggingModule.configure(
    LoggingConfig(
        backend=ConsoleBackend(),
        processors=[ContextvarsProcessor(), RedactSecrets()],
    ),
)

@module(imports=[LogModule], controllers=[BillingController])
class AppModule: ...

@injectable()
class BillingService:
    def __init__(self, log: LoggingService) -> None:
        self.log = log
    def charge(self, user_id: int) -> None:
        self.log.info("charged", user_id=user_id)
```

Backends: `ConsoleBackend`, `StdlibBackend`, `StructlogBackend`
(extra), `FileBackend`, `RotatingFileBackend`, `FanOutBackend`,
`QueueBackend`, `NullBackend`, `InMemoryBackend`.

Processors: `ContextvarsProcessor`, `RedactSecrets`, `Sampler`,
`RateLimit`, `TimestampProcessor`, `OpenTelemetryProcessor` (extra).

Drop-in `request_logging()` middleware propagates `X-Request-ID`,
binds contextvars, and emits a structured request-summary line per
request.

---

## Background Tasks

`BackgroundTasks` lets you fire work **after** the response has been
sent. Declare it as a handler parameter and the framework injects it:

```python
from lauren import BackgroundTasks, injectable, Scope

@injectable(scope=Scope.SINGLETON)
class EmailService:
    async def send(self, addr: str) -> None: ...

@post("/sign-up")
async def sign_up(self, body: Json[SignUpBody], tasks: BackgroundTasks) -> dict:
    handle: TaskHandle = tasks.add_task(self._email.send, body.email)
    return {"status": "queued"}
```

`TaskHandle` has `.cancel()`, `.result()` (awaitable), and `.done`
properties.  Both `async def` and regular `def` callables are accepted;
sync callables are offloaded via `anyio.to_thread.run_sync`.

### Background-task signals

Three `LifecycleEvent` subclasses are emitted on the shared `SignalBus`:

| Event | When |
|---|---|
| `BackgroundTaskStarted` | Just before the callable is invoked |
| `BackgroundTaskComplete` | After it returns successfully |
| `BackgroundTaskFailed` | After it raises an exception |

---

## Signals & Lifecycle Events

`SignalBus` is an in-process pub/sub bus for `LifecycleEvent` subclasses.
Subscribe with the `@bus.on(EventType)` decorator or `bus.subscribe()`.
The default app bus is accessible via `lauren.signals.bus`.

```python
from lauren import SignalBus
from lauren.signals import (
    StartupBegin, StartupComplete,
    ShutdownBegin,
    RequestReceived, RequestComplete,
    LifecycleEvent,
)

bus = SignalBus()

@bus.on(StartupComplete)
async def on_ready(event: StartupComplete) -> None:
    print("App ready")
```

Full event hierarchy:

| Event | When |
|---|---|
| `StartupBegin` | `lifespan.startup` arrives; `@post_construct` not yet run |
| `StartupComplete` | All `@post_construct` hooks finished |
| `ShutdownBegin` | `lifespan.shutdown` or OS signal received |
| `RequestReceived` | ASGI scope parsed into a `Request` |
| `RequestComplete` | Response fully sent to the client |
| `BackgroundTaskStarted` | Background task begins |
| `BackgroundTaskComplete` | Background task finishes successfully |
| `BackgroundTaskFailed` | Background task raises |

`LifecycleEvent` is the common base class for all events above.

---

## Interceptors

Interceptors run **after guards** and **before** the handler. They wrap
handler execution: ideal for timing, caching, or response transforms.

```python
from lauren import interceptor, use_interceptors, CallHandler
from lauren.types import ExecutionContext, Response

@interceptor()
class TimingInterceptor:
    async def intercept(self, ctx: ExecutionContext, next: CallHandler) -> Response:
        import time
        t0 = time.monotonic()
        response = await next.handle()
        ms = (time.monotonic() - t0) * 1000
        return response.with_header("x-duration-ms", f"{ms:.1f}")

# Apply to a controller or individual route:
@use_interceptors(TimingInterceptor)
@controller("/api")
class MyController: ...
```

`InterceptorProtocol` is the structural protocol every interceptor satisfies.
`InterceptorConfigError` is raised when an interceptor is misconfigured at
startup (e.g. applied without parentheses).

`use_middlewares` is the per-controller / per-route equivalent of
`global_middlewares=` on the app factory.  Both `use_interceptors` and
`use_middlewares` accept a class or an instance and can be stacked.

---

## Pipes

Pipes transform or validate an extracted parameter value **before** it
reaches the handler.

```python
from lauren import Pipe, PipeContext, is_pipe
from lauren import Path, injectable, Scope

class PositiveIntPipe(Pipe):
    async def transform(self, value: int, ctx: PipeContext) -> int:
        if value <= 0:
            raise ValueError("Must be positive")
        return value

@get("/{id}")
async def get_item(self, id: Path[int, PositiveIntPipe]) -> dict: ...
```

`PipeMeta` is the metadata marker stored on pipe callables; `PIPE_META`
is the sentinel attribute name; `is_pipe(obj)` returns `True` if *obj*
carries the pipe marker.

`PathField` provides alias / validation overrides for path parameters,
analogous to Pydantic's `Field`.  `FieldDescriptor` is the richer
variant that supports `ge`, `le`, `min_length`, etc.

`PipeContext` carries the parameter name, annotation, and owning handler
metadata so a pipe can make context-aware decisions.

---

## Per-Route Middleware (`use_middlewares`)

`use_middlewares` attaches middleware(s) to a specific controller class
or route handler without registering them globally:

```python
from lauren import use_middlewares, middleware, injectable, Scope

@middleware()
@injectable(scope=Scope.SINGLETON)
class AuditLog:
    async def dispatch(self, req, call_next):
        response = await call_next(req)
        print(f"AUDIT {req.method} {req.url.path} → {response.status_code}")
        return response

@use_middlewares(AuditLog)
@controller("/admin")
class AdminController: ...
```

`CallNext` is the callable type for `call_next` inside a middleware
`dispatch` method.

---

## JSON Serialisation

Lauren auto-selects the fastest available encoder at startup via
`auto_encoder()`.  The selection order is orjson > msgspec > stdlib.

| Class | Dependency |
|---|---|
| `StdlibJSONEncoder` | Built-in `json` (always available) |
| `OrjsonEncoder` | `orjson` C extension |
| `MsgspecEncoder` | `msgspec` |
| `JSONEncoder` | Abstract base — implement `encode(obj) -> bytes` |

```python
from lauren import auto_encoder, OrjsonEncoder, StdlibJSONEncoder
enc = auto_encoder()          # picks best available
enc.encode({"hello": "world"})
```

---

## Types & Connection Info

`ClientInfo` and `ServerInfo` are `(host, port)` named-tuples
available on `Request.client` and `Request.server` respectively.

`MutableHeaders` is the writable headers mapping used when constructing
or mutating a `Response`.

`ByteStream` is the zero-copy streaming body extractor:
`body: ByteStream` receives an async iterable of raw `bytes` chunks so
the handler can process the body without buffering it into memory.

`UploadFile` is the multipart file-upload extractor — FastAPI-compatible:

```python
from lauren import UploadFile

@post("/upload")
async def upload(self, file: UploadFile) -> dict:
    content = await file.read()
    return {"filename": file.filename, "size": len(content)}
```

`StateExtractor` is the typed extractor for `request.state` fields
declared by guards or middleware.

---

## Static Files

`StaticFilesModule` serves a directory of static assets as a Lauren
feature module:

```python
from lauren.static_files import StaticFilesModule

@module(imports=[StaticFilesModule.for_root("/assets", directory="./public")])
class AppModule: ...
```

---

## OpenAPI Security

`openapi_security` attaches OpenAPI 3.1 security requirement objects to
a guard class so the generated schema documents authentication schemes:

```python
from lauren import openapi_security

@openapi_security([{"BearerAuth": []}])
class JwtGuard:
    async def can_activate(self, ctx) -> bool: ...
```

`OpenAPISecurityMeta` is the metadata dataclass stored on the guard by
this decorator.

---

## Request Arena

`RequestArena` is the pool of reusable per-request containers.
`RequestAllocation` is the handle to a leased set of containers for a
single request.  Both are internal infrastructure exposed for advanced
ASGI adapter customisation; typical application code never touches them.

---

## Errors: Decorator & Interceptor

`DecoratorUsageError` is raised when a configurable decorator such as
`@controller`, `@injectable`, or `@middleware` is used **bare** (without
parentheses):

```python
# Wrong — raises DecoratorUsageError at startup:
@controller          # missing ()
class Broken: ...

# Correct:
@controller("/api")
class Fine: ...
```

`InterceptorConfigError` is raised when an interceptor class is
misconfigured at startup (e.g. no `intercept` method, wrong signature,
or applied without parentheses).

---

## Server-Sent Events Helpers

`format_sse_event` formats a single SSE frame as its wire string without
creating a full `ServerSentEvent` object — useful for low-level adapters:

```python
from lauren import format_sse_event
line = format_sse_event(data="hello", event="ping", id="1")
# → "id: 1\nevent: ping\ndata: hello\n\n"
```

---

## AI Documentation Access

`docs` is a module that exposes the bundled `llms.txt` and
`llms-full.txt` files programmatically:

```python
import lauren.docs as ld
overview = ld.llms_txt()        # short overview (~2 KB)
reference = ld.llms_full_txt()  # full reference (~25 KB)
```
