Metadata-Version: 2.4
Name: tempest-fastapi-sdk
Version: 0.8.0
Summary: Shared FastAPI building blocks: base schemas, ORM model, async repository, exceptions, pagination and settings — the conventions used across Tempest projects.
Project-URL: Homepage, https://github.com/mauriciobenjamin700/tempest-fastapi-sdk
Project-URL: Repository, https://github.com/mauriciobenjamin700/tempest-fastapi-sdk
Project-URL: Issues, https://github.com/mauriciobenjamin700/tempest-fastapi-sdk/issues
Author-email: Mauricio Benjamin <mauricio.benjamin@reloverelations.com>
License: MIT
Keywords: async,fastapi,pagination,pydantic,repository,sdk,sqlalchemy
Classifier: Development Status :: 3 - Alpha
Classifier: Framework :: FastAPI
Classifier: Framework :: Pydantic :: 2
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3 :: Only
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Typing :: Typed
Requires-Python: >=3.11
Requires-Dist: alembic>=1.18.4
Requires-Dist: fastapi>=0.118.0
Requires-Dist: pydantic-settings>=2.6.0
Requires-Dist: pydantic>=2.13.4
Requires-Dist: sqlalchemy[asyncio]>=2.0.49
Provides-Extra: all
Requires-Dist: aiofiles>=24.1.0; extra == 'all'
Requires-Dist: aiosmtplib>=4.0.0; extra == 'all'
Requires-Dist: bcrypt>=5.0.0; extra == 'all'
Requires-Dist: cryptography>=42.0.0; extra == 'all'
Requires-Dist: faststream[rabbit]>=0.5.30; extra == 'all'
Requires-Dist: nvidia-ml-py>=12.560.30; extra == 'all'
Requires-Dist: psutil>=6.0.0; extra == 'all'
Requires-Dist: pyjwt>=2.12.1; extra == 'all'
Requires-Dist: python-multipart>=0.0.12; extra == 'all'
Requires-Dist: pywebpush>=2.0.0; extra == 'all'
Requires-Dist: redis>=5.0.0; extra == 'all'
Requires-Dist: taskiq-aio-pika>=0.4.0; extra == 'all'
Requires-Dist: taskiq>=0.11.0; extra == 'all'
Provides-Extra: auth
Requires-Dist: bcrypt>=5.0.0; extra == 'auth'
Requires-Dist: pyjwt>=2.12.1; extra == 'auth'
Provides-Extra: cache
Requires-Dist: redis>=5.0.0; extra == 'cache'
Provides-Extra: email
Requires-Dist: aiosmtplib>=4.0.0; extra == 'email'
Provides-Extra: metrics
Requires-Dist: nvidia-ml-py>=12.560.30; extra == 'metrics'
Requires-Dist: psutil>=6.0.0; extra == 'metrics'
Provides-Extra: queue
Requires-Dist: faststream[rabbit]>=0.5.30; extra == 'queue'
Provides-Extra: tasks
Requires-Dist: taskiq-aio-pika>=0.4.0; extra == 'tasks'
Requires-Dist: taskiq>=0.11.0; extra == 'tasks'
Provides-Extra: upload
Requires-Dist: aiofiles>=24.1.0; extra == 'upload'
Requires-Dist: python-multipart>=0.0.12; extra == 'upload'
Provides-Extra: webpush
Requires-Dist: cryptography>=42.0.0; extra == 'webpush'
Requires-Dist: pywebpush>=2.0.0; extra == 'webpush'
Description-Content-Type: text/markdown

# tempest-fastapi-sdk

Shared FastAPI/SQLAlchemy/Pydantic building blocks used across Tempest projects: base schemas, ORM model, async repository, pagination, settings, exceptions, Alembic helper, FastStream/TaskIQ broker managers, Redis cache, Server-Sent Events, Web Push and the utility classes (`PasswordUtils`, `JWTUtils`, `EmailUtils`, `UploadUtils`, `MetricsUtils`, `LogUtils`).

The goal is to start every new backend with the same opinionated foundation already in place — no copy-pasting `BaseModel`, no rewriting the same CRUD repository, no re-inventing the exception envelope.

---

## Table of contents

- [Install](#install)
  - [Optional extras](#optional-extras)
- [What's inside](#whats-inside)
- [Architecture overview](#architecture-overview)
- [Tutorial — building the *Users* feature](#tutorial--building-the-users-feature)
  - [1. Project layout](#1-project-layout)
  - [2. Settings, server, app factory & entry point](#2-settings-server-app-factory--entry-point)
  - [3. ORM model](#3-orm-model)
  - [4. Schemas](#4-schemas)
  - [5. Domain exceptions](#5-domain-exceptions)
  - [6. Repository](#6-repository)
  - [7. Service](#7-service)
  - [8. Controller](#8-controller)
  - [9. Dependency providers](#9-dependency-providers)
  - [10. Router](#10-router)
  - [11. Pagination](#11-pagination)
- [Recipes](#recipes)
  - [Authentication](#authentication-recipe)
  - [File uploads](#file-uploads-recipe)
  - [Transactional email](#transactional-email-recipe)
  - [Alembic migrations](#alembic-migrations-recipe)
  - [Utility helpers (`utcnow`, `to_utc`, `modify_dict`)](#utility-helpers-recipe)
  - [BR document & phone validation](#br-document--phone-validation-recipe)
  - [Testing](#testing-recipe)
  - [Application bootstrap (`create_app`)](#application-bootstrap-recipe)
  - [Structured logging & request IDs](#structured-logging--request-ids-recipe)
  - [Settings mixins composition](#settings-mixins-composition-recipe)
  - [Controllers & services layering](#controllers--services-layering-recipe)
  - [Audit & soft-delete mixins](#audit--soft-delete-mixins-recipe)
  - [Cursor pagination](#cursor-pagination-recipe)
  - [Redis cache (`AsyncRedisManager`)](#redis-cache-recipe)
  - [Server-Sent Events (SSE)](#server-sent-events-recipe)
  - [Web Push notifications](#web-push-notifications-recipe)
  - [Message queues — FastStream (`AsyncBrokerManager`)](#message-queues--faststream-recipe)
  - [Background tasks — TaskIQ (`AsyncTaskBrokerManager`)](#background-tasks--taskiq-recipe)
  - [Periodic tasks scheduler (`AsyncTaskScheduler`)](#periodic-tasks-scheduler-recipe)
  - [System metrics (`MetricsUtils`)](#system-metrics-recipe)
  - [Programmatic server entry point (`run_server`)](#programmatic-server-entry-point-recipe)
  - [JWT bearer / current-user / role dependencies](#jwt-bearer--current-user--role-dependencies-recipe)
  - [CEP (Brazilian zipcode)](#cep-brazilian-zipcode-recipe)
  - [Cache decorator (`@cached`)](#cache-decorator-recipe)
  - [Tool-spec router (`make_tool_spec_router`)](#tool-spec-router-recipe)
  - [Webhook signature verification (`WebhookSignatureVerifier`)](#webhook-signature-verification-recipe)
  - [Pagination Link headers (`build_pagination_link_header`)](#pagination-link-headers-recipe)
  - [Rate limit middleware (`RateLimitMiddleware`)](#rate-limit-middleware-recipe)
  - [Outbox dispatcher pattern](#outbox-dispatcher-pattern-recipe)
  - [Migration guide 0.7 → 0.8](#migration-guide-07--08)
- [Reference](#reference)
- [Conventions](#conventions)
- [Development](#development)
- [Release](#release)
- [License](#license)

---

## Install

```bash
pip install tempest-fastapi-sdk
```

Via `pyproject.toml`:

```toml
dependencies = [
    "tempest-fastapi-sdk>=0.7.1",
]
```

Requires Python `>=3.11`.

### Optional extras

Feature-rich helpers pull in third-party dependencies that you only need when you actually use the helper. Pick the extras the service consumes:

| Extra | Pulls in | Unlocks |
| --- | --- | --- |
| `[auth]` | `bcrypt`, `PyJWT` | `PasswordUtils`, `JWTUtils` |
| `[email]` | `aiosmtplib` | `EmailUtils` |
| `[upload]` | `aiofiles`, `python-multipart` | `UploadUtils` |
| `[cache]` | `redis` | `AsyncRedisManager` |
| `[webpush]` | `pywebpush`, `cryptography` | `WebPushDispatcher` |
| `[metrics]` | `psutil`, `nvidia-ml-py` | `MetricsUtils` |
| `[queue]` | `faststream[rabbit]` | `AsyncBrokerManager` (FastStream) |
| `[tasks]` | `taskiq`, `taskiq-aio-pika` | `AsyncTaskBrokerManager` (TaskIQ) |
| `[all]` | everything above | every helper |

```bash
pip install "tempest-fastapi-sdk[auth,upload]"   # only what the service uses
pip install "tempest-fastapi-sdk[all]"           # or pull everything
```

Since `0.7.1` every optional dependency is imported lazily at first instantiation, so `import tempest_fastapi_sdk` works with any subset of extras — instantiating a helper whose extra is missing raises `ImportError` with a clear hint pointing at the right one.

---

## What's inside

| Module | Exports |
| --- | --- |
| `tempest_fastapi_sdk.schemas` | `BaseSchema`, `BaseResponseSchema`, `BasePaginationFilterSchema`, `BasePaginationSchema[T]`, `CursorPaginationFilterSchema`, `CursorPaginationSchema`, `encode_cursor`, `decode_cursor`, `build_pagination_link_header` |
| `tempest_fastapi_sdk.db` | `BaseModel`, `BaseRepository[ModelType]`, `AsyncDatabaseManager`, `AlembicHelper`, `NAMING_CONVENTION`, `AuditMixin`, `SoftDeleteMixin` |
| `tempest_fastapi_sdk.exceptions` | `AppException`, `NotFoundException`, `ConflictException`, `ValidationException`, `UnauthorizedException`, `ForbiddenException`, `InvalidTokenException`, `ExpiredTokenException`, `FileTooLargeException`, `InvalidFileTypeException` |
| `tempest_fastapi_sdk.settings` | `BaseAppSettings`, `ServerSettings`, `LogSettings`, `DatabaseSettings`, `RedisSettings`, `RabbitMQSettings`, `JWTSettings`, `CORSSettings`, `EmailSettings`, `UploadSettings`, `TokenSettings`, `WebPushSettings`, `TaskIQSettings` |
| `tempest_fastapi_sdk.api` | `register_exception_handlers`, `app_exception_handler`, `apply_cors`, `make_health_router`, `make_tool_spec_router`, `make_token_dependency`, `make_bearer_token_dependency`, `make_jwt_user_dependency`, `make_role_dependency`, `make_permission_dependency`, `require_x_token`, `run_server`, `RequestIDMiddleware`, `RateLimitMiddleware`, `WebhookSignatureVerifier`, `HealthCheck` |
| `tempest_fastapi_sdk.controllers` | `BaseController` |
| `tempest_fastapi_sdk.services` | `BaseService` |
| `tempest_fastapi_sdk.core` | `configure_logging`, `JSONFormatter`, `get_request_id`/`set_request_id`/`clear_request_id`, `request_id_ctx` |
| `tempest_fastapi_sdk.sse` | `EventStream`, `ServerSentEvent`, `sse_response` |
| `tempest_fastapi_sdk.cache` *(extra: `[cache]`)* | `AsyncRedisManager`, `cached` |
| `tempest_fastapi_sdk.webpush` *(extra: `[webpush]`)* | `WebPushDispatcher`, `WebPushError`, `WebPushGoneError`, `WebPushSubscriptionSchema`, `WebPushKeysSchema`, `WebPushPayloadSchema` |
| `tempest_fastapi_sdk.queue` *(extra: `[queue]`)* | `AsyncBrokerManager` (FastStream lifecycle wrapper) |
| `tempest_fastapi_sdk.tasks` *(extra: `[tasks]`)* | `AsyncTaskBrokerManager` (TaskIQ lifecycle wrapper), `AsyncTaskScheduler` (periodic / cron tasks) |
| `tempest_fastapi_sdk.utils` | `to_utc`, `utcnow`, `modify_dict`, `LogUtils`, `PasswordUtils` *(extra: `[auth]`)*, `JWTUtils` *(extra: `[auth]`)*, `EmailUtils` *(extra: `[email]`)*, `UploadUtils` *(extra: `[upload]`)*, `MetricsUtils`/`CPUMetrics`/`MemoryMetrics`/`DiskMetrics`/`GPUMetrics`/`SystemMetrics` *(extra: `[metrics]`)*, BR regex helpers (`CPF`, `CNPJ`, `CPFOrCNPJ`, `PhoneBR`, `CEP`, `is_valid_*`, `normalize_*`, `only_digits`, `*_PATTERN`) |

Core primitives are re-exported from `tempest_fastapi_sdk` at the top level — `from tempest_fastapi_sdk import BaseModel, BaseRepository, AppException` always works. The extras-gated managers in `tempest_fastapi_sdk.cache`, `tempest_fastapi_sdk.queue` and `tempest_fastapi_sdk.tasks` must be imported from their own submodule (`from tempest_fastapi_sdk.queue import AsyncBrokerManager`).

---

## Architecture overview

The SDK assumes a layered architecture where each layer has a single, narrow responsibility:

```text
HTTP request
    │
    ▼
┌─────────────┐    receive HTTP, validate input via schemas,
│   Router    │    call service, return response schema.
└──────┬──────┘    No business logic, no DB access.
       │
       ▼
┌─────────────┐    orchestrate use case across one or more services;
│ Controller  │    handle cross-service coordination only.
└──────┬──────┘    Optional — skip for simple CRUD.
       │
       ▼
┌─────────────┐    business rules, validation beyond Pydantic,
│   Service   │    domain decisions. Calls one or more repositories.
└──────┬──────┘    No HTTP types, no SQLAlchemy types.
       │
       ▼
┌─────────────┐    raw data access via SQLAlchemy. CRUD, filters,
│ Repository  │    pagination. Translates between ORM and schemas
└──────┬──────┘    via map_to_* methods. No business decisions.
       │
       ▼
┌─────────────┐    SQLAlchemy AsyncSession on top of asyncpg/aiosqlite.
│  Database   │
└─────────────┘
```

The SDK ships **`BaseModel`**, **`BaseRepository`**, **`BaseSchema`** and the exception/settings primitives. Routers, services and controllers are your code — the SDK gives you the conventions so they all look the same across projects.

---

## Tutorial — building the *Users* feature

We'll build a complete `Users` feature from scratch, end to end. Every file below is something you write in your project; SDK primitives are imported.

### 1. Project layout

The canonical layout every Python service shipped against this SDK should adopt — `main.py` is a one-liner, `src/server.py` exposes both `run()` and the importable `app` (or re-exports it from `src/api/app.py`), `api/dependencies/` is **always a package** (auth + factory providers), `controllers/` is mandatory even when it's only a thin pass-through, and `repositories/` lives **under** `db/`.

```text
my-service/
├── main.py                       # one-liner: from src.server import run; run()
└── src/
    ├── __init__.py               # re-exports `run` from src.server
    ├── server.py                 # programmatic uvicorn.run(...) + module-level `app`
    ├── core/
    │   ├── __init__.py
    │   ├── settings.py           # Settings(BaseAppSettings, mixins...)
    │   └── exceptions.py         # domain exceptions (UserNotFoundError, ...)
    ├── db/
    │   ├── __init__.py           # re-exports BaseModel + every model
    │   ├── models/
    │   │   ├── __init__.py
    │   │   └── user.py           # UserModel(BaseModel)
    │   └── repositories/
    │       ├── __init__.py
    │       └── user.py           # UserRepository(BaseRepository[UserModel])
    ├── schemas/
    │   ├── __init__.py
    │   └── user.py               # UserCreate/Update/Response/Filter
    ├── services/
    │   ├── __init__.py
    │   └── user.py               # UserService — business logic
    ├── controllers/
    │   ├── __init__.py
    │   └── user.py               # UserController — orchestration (thin pass-through OK)
    └── api/
        ├── __init__.py
        ├── app.py                # create_app() — middleware, CORS, exception handlers, routers
        ├── routers/
        │   ├── __init__.py
        │   └── users.py
        └── dependencies/         # ALWAYS a package, never a flat module
            ├── __init__.py
            ├── auth.py           # X-Token / current_user / require_role dependencies
            └── controllers.py    # get_<X>_controller / get_<X>_service factories
```

Each `__init__.py` re-exports every public symbol from its directory so consumers always do `from src.schemas import UserCreateSchema` (not `from src.schemas.user import UserCreateSchema`). This keeps refactors painless — move files around without breaking imports.

If your service has no controllers/services/repositories yet, **still ship empty packages with the right names** — uniformity matters more than skipping a directory. Drop `db/`, `utils/`, `queue/` or `tasks/` only when the service genuinely doesn't need persistence/utilities/messaging.

### 2. Settings, server, app factory & entry point

Four files map onto four responsibilities:

| File | Responsibility |
| --- | --- |
| `src/core/settings.py` | `Settings(BaseAppSettings, ...mixins)` — one source of truth for env vars. |
| `src/api/app.py` | `create_app()` factory + middleware + CORS + exception handlers + router includes + module-level `app` instance. |
| `src/server.py` | `run()` invoking `uvicorn.run("src.api.app:app", ...)` programmatically, plus re-exports `app` so external runners (gunicorn, uvicorn CLI) can import it. |
| `main.py` | Process entry point — a single line under `if __name__ == "__main__":` calling `run()`. |

```python
# src/core/settings.py
from tempest_fastapi_sdk import BaseAppSettings, DatabaseSettings, ServerSettings


class Settings(ServerSettings, DatabaseSettings, BaseAppSettings):
    """All environment-driven configuration lives here.

    BaseAppSettings ships `env_file=.env`, `extra=ignore`,
    `case_sensitive=True`, `frozen=True` and `str_strip_whitespace=True`.
    ServerSettings adds SERVER_HOST/PORT/RELOAD, DatabaseSettings adds
    DATABASE_URL/ECHO/POOL_*.
    """

    JWT_SECRET: str
    JWT_ALGORITHM: str = "HS256"
    JWT_TTL_HOURS: int = 1

    SMTP_HOST: str = "localhost"
    SMTP_PORT: int = 587
    SMTP_USERNAME: str | None = None
    SMTP_PASSWORD: str | None = None
    SMTP_FROM_ADDR: str = "noreply@example.com"

    UPLOAD_DIR: str = "./var/uploads"


settings = Settings()
```

```python
# src/api/app.py
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

from fastapi import FastAPI

from tempest_fastapi_sdk import (
    AsyncDatabaseManager,
    RequestIDMiddleware,
    make_health_router,
    register_exception_handlers,
)

from src.api.routers import users
from src.core.settings import settings


db = AsyncDatabaseManager(settings.DATABASE_URL)


@asynccontextmanager
async def lifespan(_: FastAPI) -> AsyncIterator[None]:
    """Connect on startup, dispose on shutdown."""
    await db.connect()
    try:
        yield
    finally:
        await db.disconnect()


def create_app() -> FastAPI:
    """Build and configure the FastAPI app."""
    app = FastAPI(title="my-service", version="0.1.0", lifespan=lifespan)

    app.add_middleware(RequestIDMiddleware)
    register_exception_handlers(app)

    # Meta endpoints sit at the root prefix.
    app.include_router(make_health_router(db=db, version="0.1.0"))

    # Business endpoints sit under /api/<domain>.
    app.include_router(users.router, prefix="/api")
    return app


app = create_app()
```

```python
# src/server.py
from tempest_fastapi_sdk import run_server

from src.api.app import app  # noqa: F401 — re-exported for external runners
from src.core.settings import settings


def run() -> None:
    """Start the API server programmatically."""
    run_server("src.api.app:app", settings=settings)


__all__: list[str] = ["app", "run"]
```

`run_server` reads `SERVER_HOST` / `SERVER_PORT` / `SERVER_RELOAD` from `settings` (falling back to `127.0.0.1` / `8000` / `False`) and forwards any extra kwargs (`workers=`, `log_config=`, `ssl_*=`) verbatim to `uvicorn.run`. See the [Programmatic server entry point recipe](#programmatic-server-entry-point-recipe).

```python
# src/__init__.py
from src.server import run

__all__: list[str] = ["run"]
```

```python
# main.py
from src.server import run

if __name__ == "__main__":
    run()
```

Bind defaults: `127.0.0.1` for internal services (the SDK's `ServerSettings.SERVER_HOST` default), `0.0.0.0` only when the service is consumed by a separate origin (e.g. a frontend dev server). Never start uvicorn via `subprocess.run(["uvicorn", ...])` — always go through `run_server` (or `uvicorn.run("src.api.app:app", ...)` directly) so reload, signal handling and graceful shutdown behave correctly.

### 3. ORM model

```python
# src/db/models/user.py
from sqlalchemy import String
from sqlalchemy.orm import Mapped, mapped_column

from tempest_fastapi_sdk import BaseModel


class UserModel(BaseModel):
    """One row per registered user.

    Inherits from BaseModel, so it automatically gets:
    - id (UUID v4, cross-DB portable via sqlalchemy.Uuid)
    - is_active (bool, soft-delete flag)
    - created_at, updated_at (timezone-aware TIMESTAMP, set by Python AND
      the DB so the instance attribute is populated right after flush)
    - __tablename__ = "user" (auto: class name without "Model" suffix,
      snake-cased; override by assigning __tablename__ explicitly)
    - __eq__/__hash__ by (type, id) so the same row across sessions
      compares equal
    - to_dict(exclude, include, remove_none) and
      update_from_dict(data, allowed_fields) helpers
    """

    name: Mapped[str] = mapped_column(String(64), nullable=False)
    email: Mapped[str] = mapped_column(String(128), unique=True, nullable=False)
    password_hash: Mapped[str] = mapped_column(String(128), nullable=False)
```

Re-export it:

```python
# src/db/models/__init__.py
from src.db.models.user import UserModel

__all__: list[str] = ["UserModel"]
```

```python
# src/db/__init__.py
from src.db.models import UserModel
from tempest_fastapi_sdk import BaseModel

__all__: list[str] = ["BaseModel", "UserModel"]
```

> **Tip:** Always import models in `src/db/__init__.py`. SQLAlchemy needs to "see" every model before `BaseModel.metadata` is complete, so Alembic autogenerate and `create_tables()` work correctly.

### 4. Schemas

The recommended naming pattern: one `*Create`, `*Update`, `*Response` and `*Filter` schema per resource.

```python
# src/schemas/user.py
from pydantic import EmailStr, Field

from tempest_fastapi_sdk import (
    BasePaginationFilterSchema,
    BaseResponseSchema,
    BaseSchema,
)


class UserCreateSchema(BaseSchema):
    """Payload for POST /users."""

    name: str = Field(min_length=1, max_length=64)
    email: EmailStr
    password: str = Field(min_length=8, max_length=128)


class UserUpdateSchema(BaseSchema):
    """Partial payload for PATCH /users/{id}. Every field optional."""

    name: str | None = Field(default=None, min_length=1, max_length=64)
    email: EmailStr | None = None


class UserResponseSchema(BaseResponseSchema):
    """Outbound representation.

    Inherits id/is_active/created_at/updated_at from BaseResponseSchema
    (timestamps already normalized to UTC by the field validator).
    """

    name: str
    email: EmailStr


class UserFilterSchema(BasePaginationFilterSchema):
    """Query-string filters for GET /users.

    Inherits page/size/order_by/ascending/is_active from
    BasePaginationFilterSchema. Add domain-level filters below.
    """

    name: str | None = None              # ILIKE %name% search
    email: EmailStr | None = None        # exact-match filter
```

```python
# src/schemas/__init__.py
from src.schemas.user import (
    UserCreateSchema,
    UserFilterSchema,
    UserResponseSchema,
    UserUpdateSchema,
)

__all__: list[str] = [
    "UserCreateSchema",
    "UserFilterSchema",
    "UserResponseSchema",
    "UserUpdateSchema",
]
```

### 5. Domain exceptions

The SDK ships generic `NotFoundException`, `ConflictException`, etc. Subclass them per domain so error responses have a useful `code` field:

```python
# src/core/exceptions.py
from typing import ClassVar

from tempest_fastapi_sdk import ConflictException, NotFoundException


class UserNotFoundError(NotFoundException):
    message: str = "Usuário não encontrado"
    code: ClassVar[str] = "USER_NOT_FOUND"


class UserEmailAlreadyTakenError(ConflictException):
    message: str = "Já existe um usuário com esse e-mail"
    code: ClassVar[str] = "USER_EMAIL_TAKEN"
```

The SDK's exception handler ([`register_exception_handlers`](#2-settings-server-app-factory--entry-point)) serializes them to:

```json
{
    "detail": "Usuário não encontrado",
    "code": "USER_NOT_FOUND",
    "details": {}
}
```

The frontend branches on `code`, not on the (potentially translated) message.

### 6. Repository

```python
# src/db/repositories/user.py
from typing import ClassVar

from tempest_fastapi_sdk import AppException, BaseRepository

from src.core.exceptions import UserNotFoundError
from src.db.models import UserModel
from src.schemas import UserResponseSchema


class UserRepository(BaseRepository[UserModel]):
    """Data-access layer for users."""

    model: type[UserModel] = UserModel
    not_found_exception: ClassVar[type[AppException]] = UserNotFoundError

    def map_to_schema(self, instance: UserModel) -> UserResponseSchema:
        return UserResponseSchema.model_validate(instance)

    def map_to_response(self, instance: UserModel) -> UserResponseSchema:
        return self.map_to_schema(instance)
```

Per-domain error messages (optional but recommended in real apps):

```python
class UserRepository(BaseRepository[UserModel]):
    model: type[UserModel] = UserModel
    not_found_exception: ClassVar[type[AppException]] = UserNotFoundError

    def __init__(self, session: AsyncSession) -> None:
        super().__init__(
            session,
            not_found_message="Usuário não encontrado",
            create_conflict_message="Já existe um usuário com esse e-mail",
            update_conflict_message="Conflito ao atualizar usuário",
        )
```

The base repo gives you 17 methods for free — see the [reference table](#baserepository-methods) below. Add custom methods on top:

```python
async def get_by_email(self, email: str) -> UserModel:
    """Look up a user by email. Raises UserNotFoundError on miss."""
    return await self.get({"email": email})
```

### 7. Service

The service is where business rules live. It calls one or more repositories and never touches HTTP or SQLAlchemy types directly.

```python
# src/services/user.py
from uuid import UUID

from tempest_fastapi_sdk import (
    BasePaginationSchema,
    PasswordUtils,
)

from src.core.exceptions import UserEmailAlreadyTakenError
from src.db.repositories import UserRepository
from src.schemas import (
    UserCreateSchema,
    UserFilterSchema,
    UserResponseSchema,
    UserUpdateSchema,
)


class UserService:
    """Business logic for the user domain."""

    def __init__(
        self,
        repository: UserRepository,
        passwords: PasswordUtils,
    ) -> None:
        """Initialize the service.

        Args:
            repository (UserRepository): User-domain repository.
            passwords (PasswordUtils): Shared bcrypt helper.
        """
        self.repo: UserRepository = repository
        self.passwords: PasswordUtils = passwords

    async def create(self, data: UserCreateSchema) -> UserResponseSchema:
        if await self.repo.exists({"email": data.email}):
            raise UserEmailAlreadyTakenError()
        user = self.repo.map_to_model(
            {
                **data.to_dict(exclude=["password"]),
                "password_hash": self.passwords.hash(data.password),
            }
        )
        user = await self.repo.add(user)
        return self.repo.map_to_response(user)

    async def get(self, user_id: UUID) -> UserResponseSchema:
        user = await self.repo.get_by_id(user_id)
        return self.repo.map_to_response(user)

    async def update(
        self,
        user_id: UUID,
        data: UserUpdateSchema,
    ) -> UserResponseSchema:
        user = await self.repo.get_by_id(user_id)
        user.update_from_dict(
            data.to_dict(),
            allowed_fields={"name", "email"},   # prevents mass-assignment
        )
        user = await self.repo.update(user)
        return self.repo.map_to_response(user)

    async def soft_delete(self, user_id: UUID) -> None:
        await self.repo.soft_delete(user_id)

    async def list_paginated(
        self,
        filters: UserFilterSchema,
    ) -> BasePaginationSchema[UserResponseSchema]:
        page = await self.repo.paginate(
            filters=filters.get_conditions(),
            page=filters.page,
            page_size=filters.size,
            order_by=filters.order_by,
            ascending=filters.ascending,
        )
        return BasePaginationSchema[UserResponseSchema](
            items=[self.repo.map_to_response(u) for u in page["items"]],
            total=page["total"],
            page=page["page"],
            size=page["size"],
            pages=page["pages"],
        )
```

### 8. Controller

Even when there's no orchestration to do, `controllers/` exists as a **thin pass-through** so the import graph stays uniform across services. The day a use case needs to coordinate two services (or fan out to a queue), the controller is already there.

```python
# src/controllers/user.py
from uuid import UUID

from tempest_fastapi_sdk import BasePaginationSchema

from src.schemas import (
    UserCreateSchema,
    UserFilterSchema,
    UserResponseSchema,
    UserUpdateSchema,
)
from src.services.user import UserService


class UserController:
    """Orchestrate user use cases.

    Today every method is a thin pass-through to ``UserService``. As
    soon as a use case needs to coordinate more than one service —
    e.g. signup also sends a welcome email and enqueues a CRM sync —
    the orchestration lives here, not in the router and not in the
    service.
    """

    def __init__(self, service: UserService) -> None:
        self.service: UserService = service

    async def create(self, data: UserCreateSchema) -> UserResponseSchema:
        return await self.service.create(data)

    async def get(self, user_id: UUID) -> UserResponseSchema:
        return await self.service.get(user_id)

    async def update(
        self,
        user_id: UUID,
        data: UserUpdateSchema,
    ) -> UserResponseSchema:
        return await self.service.update(user_id, data)

    async def soft_delete(self, user_id: UUID) -> None:
        await self.service.soft_delete(user_id)

    async def list_paginated(
        self,
        filters: UserFilterSchema,
    ) -> BasePaginationSchema[UserResponseSchema]:
        return await self.service.list_paginated(filters)
```

### 9. Dependency providers

`api/dependencies/` is **always a package**. `auth.py` hosts the shared-secret / current-user dependencies; `controllers.py` (or `services.py` when there is no controller layer yet) hosts the factory providers the routers depend on. Never construct controllers or services inline inside the router file.

```python
# src/api/dependencies/controllers.py
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession

from tempest_fastapi_sdk import PasswordUtils

from src.api.app import db
from src.controllers.user import UserController
from src.db.repositories import UserRepository
from src.services.user import UserService


# Stateless utilities — instantiate once per process.
_passwords: PasswordUtils = PasswordUtils()


def get_user_controller(
    session: AsyncSession = Depends(db.session_dependency),
) -> UserController:
    """Wire repository → service → controller for a single request."""
    repository = UserRepository(session)
    service = UserService(repository=repository, passwords=_passwords)
    return UserController(service=service)
```

```python
# src/api/dependencies/__init__.py
from src.api.dependencies.controllers import get_user_controller

__all__: list[str] = ["get_user_controller"]
```

### 10. Router

Routers receive controllers via FastAPI `Depends` — no inline construction, no business logic, no DB calls. Business endpoints sit under `/api/<domain>` (the prefix is added at the include site in `src/api/app.py`); meta endpoints (`/health`, `/tool-spec`) stay at the root prefix.

```python
# src/api/routers/users.py
from uuid import UUID

from fastapi import APIRouter, Depends, status

from tempest_fastapi_sdk import BasePaginationSchema

from src.api.dependencies import get_user_controller
from src.controllers.user import UserController
from src.schemas import (
    UserCreateSchema,
    UserFilterSchema,
    UserResponseSchema,
    UserUpdateSchema,
)


router = APIRouter(prefix="/users", tags=["users"])


@router.post(
    "",
    response_model=UserResponseSchema,
    status_code=status.HTTP_201_CREATED,
)
async def create_user(
    data: UserCreateSchema,
    controller: UserController = Depends(get_user_controller),
) -> UserResponseSchema:
    return await controller.create(data)


@router.get("/{user_id}", response_model=UserResponseSchema)
async def get_user(
    user_id: UUID,
    controller: UserController = Depends(get_user_controller),
) -> UserResponseSchema:
    return await controller.get(user_id)


@router.patch("/{user_id}", response_model=UserResponseSchema)
async def update_user(
    user_id: UUID,
    data: UserUpdateSchema,
    controller: UserController = Depends(get_user_controller),
) -> UserResponseSchema:
    return await controller.update(user_id, data)


@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(
    user_id: UUID,
    controller: UserController = Depends(get_user_controller),
) -> None:
    await controller.soft_delete(user_id)


@router.get("", response_model=BasePaginationSchema[UserResponseSchema])
async def list_users(
    filters: UserFilterSchema = Depends(),
    controller: UserController = Depends(get_user_controller),
) -> BasePaginationSchema[UserResponseSchema]:
    return await controller.list_paginated(filters)
```

### 11. Pagination

The pagination contract is enforced end-to-end by SDK primitives:

- `UserFilterSchema(BasePaginationFilterSchema)` parses `?page=&size=&order_by=&ascending=&is_active=&name=` from the query string and exposes `.get_conditions()` returning only the domain-level filters (without pagination keys).
- `UserRepository.paginate(...)` runs the query with the filter dict + ordering + offset/limit + count, returning `{items, total, page, size, pages}`.
- `BasePaginationSchema[UserResponseSchema]` wraps the result so OpenAPI documents the response shape correctly.

```http
GET /api/users?page=2&size=20&order_by=name&ascending=true&is_active=true&name=ana
```

Returns:

```json
{
    "items": [
        {"id": "...", "name": "Ana ...", "email": "...", ...},
        ...
    ],
    "total": 142,
    "page": 2,
    "size": 20,
    "pages": 8
}
```

---

## Recipes

### Authentication recipe

End-to-end signup + login + protected route using `PasswordUtils` and `JWTUtils`. Requires the `[auth]` extra.

#### Wire the utility singletons

```python
# src/core/security.py
from datetime import timedelta

from tempest_fastapi_sdk import JWTUtils, PasswordUtils

from src.core.settings import settings


passwords = PasswordUtils(rounds=12)

tokens = JWTUtils(
    secret=settings.JWT_SECRET,
    algorithm=settings.JWT_ALGORITHM,
    default_ttl=timedelta(hours=settings.JWT_TTL_HOURS),
    issuer="my-app",
)
```

#### Signup

Reuse the `UserService.create` defined in the tutorial — it already hashes the password.

#### Login

```python
# src/schemas/auth.py
from pydantic import EmailStr

from tempest_fastapi_sdk import BaseSchema


class LoginSchema(BaseSchema):
    email: EmailStr
    password: str


class TokenResponseSchema(BaseSchema):
    access_token: str
    token_type: str = "bearer"
```

```python
# src/services/auth.py
from sqlalchemy.ext.asyncio import AsyncSession

from tempest_fastapi_sdk import JWTUtils, PasswordUtils, UnauthorizedException

from src.db.repositories import UserRepository
from src.schemas.auth import LoginSchema, TokenResponseSchema


class AuthService:
    def __init__(
        self,
        session: AsyncSession,
        passwords: PasswordUtils,
        tokens: JWTUtils,
    ) -> None:
        self.repo = UserRepository(session)
        self.passwords = passwords
        self.tokens = tokens

    async def login(self, data: LoginSchema) -> TokenResponseSchema:
        user = await self.repo.get_or_none({"email": data.email})
        if user is None or not self.passwords.verify(
            data.password, user.password_hash
        ):
            # Same error for both cases — don't leak which one failed.
            raise UnauthorizedException(message="E-mail ou senha inválidos")
        token = self.tokens.encode({"sub": str(user.id)})
        return TokenResponseSchema(access_token=token)
```

```python
# src/api/routers/auth.py
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession

from src.api.app import db
from src.core.security import passwords, tokens
from src.schemas.auth import LoginSchema, TokenResponseSchema
from src.services.auth import AuthService


router = APIRouter(prefix="/auth", tags=["auth"])


def get_auth_service(
    session: AsyncSession = Depends(db.session_dependency),
) -> AuthService:
    return AuthService(session, passwords, tokens)


@router.post("/login", response_model=TokenResponseSchema)
async def login(
    data: LoginSchema,
    service: AuthService = Depends(get_auth_service),
) -> TokenResponseSchema:
    return await service.login(data)
```

#### Protect a route — JWT dependency

Use `make_jwt_user_dependency` to wire the bearer scheme + JWT decode + user load in one call. The single seam is `user_loader(subject)`, an async callable that maps the JWT subject claim to your domain `UserModel`.

```python
# src/api/dependencies/auth.py
from uuid import UUID

from tempest_fastapi_sdk import make_jwt_user_dependency

from src.api.app import db
from src.core.security import tokens
from src.db.models import UserModel
from src.db.repositories import UserRepository


async def load_user(subject: str) -> UserModel:
    """Resolve the JWT subject (a UUID string) to a persisted user.

    Opens its own session so the dependency stays request-scope-agnostic
    (the loader is called once per request, and SDK exceptions raised
    inside translate to the canonical 401/404 envelope).
    """
    async with db.get_session_context() as session:
        repo = UserRepository(session)
        return await repo.get_by_id(UUID(subject))


get_current_user = make_jwt_user_dependency(tokens, load_user)
get_current_user_or_none = make_jwt_user_dependency(tokens, load_user, soft=True)
```

```python
# Use in any route
@router.get("/me", response_model=UserResponseSchema)
async def me(current: UserModel = Depends(get_current_user)) -> UserResponseSchema:
    return UserResponseSchema.model_validate(current)
```

#### Soft auth (optional user)

`get_current_user_or_none` above already uses `soft=True` — it returns `None` instead of raising on a missing or invalid token, so endpoints can work both authenticated and anonymous:

```python
@router.get("/feed")
async def feed(
    current: UserModel | None = Depends(get_current_user_or_none),
) -> FeedResponseSchema:
    return await feed_service.list(viewer=current)
```

Under the hood `soft=True` calls `tokens.decode_or_none` (no exception on expired/invalid tokens) and skips the loader when the subject is missing.

---

### File uploads recipe

Avatar endpoint with validation + cleanup. Requires the `[upload]` extra.

```python
# src/core/storage.py
from tempest_fastapi_sdk import UploadUtils

from src.core.settings import settings


avatar_storage = UploadUtils(
    upload_dir=f"{settings.UPLOAD_DIR}/avatars",
    max_size_bytes=5 * 1024 * 1024,            # 5 MiB
    allowed_extensions={"png", "jpg", "jpeg", "webp"},
    allowed_mimetypes={"image/png", "image/jpeg", "image/webp"},
)
```

```python
# src/api/routers/users.py (extension)
from fastapi import UploadFile

from src.api.dependencies import get_user_controller
from src.controllers.user import UserController
from src.core.storage import avatar_storage


@router.post("/{user_id}/avatar", response_model=UserResponseSchema)
async def upload_avatar(
    user_id: UUID,
    file: UploadFile,
    current: UserModel = Depends(get_current_user),
    controller: UserController = Depends(get_user_controller),
) -> UserResponseSchema:
    if current.id != user_id:
        raise ForbiddenException(message="Só pode editar o próprio avatar")
    path = await avatar_storage.save(file, subdir=str(user_id))
    return await controller.set_avatar(user_id, str(path))
```

Add `set_avatar` to both the service and the controller (the controller stays a thin pass-through unless orchestration is needed — e.g. firing an "avatar updated" event):

```python
# src/services/user.py
class UserService:
    async def set_avatar(self, user_id: UUID, path: str) -> UserResponseSchema:
        user = await self.repo.get_by_id(user_id)
        # Delete previous file when replacing.
        if user.avatar_path and user.avatar_path != path:
            avatar_storage.delete(user.avatar_path)
        user.avatar_path = path
        user = await self.repo.update(user)
        return self.repo.map_to_response(user)


# src/controllers/user.py
class UserController:
    async def set_avatar(self, user_id: UUID, path: str) -> UserResponseSchema:
        return await self.service.set_avatar(user_id, path)
```

`UploadUtils.save()` raises `FileTooLargeException` (413) or `InvalidFileTypeException` (415) on rejection — the SDK's exception handler already returns the right status code with a `code` field on the response.

#### Serving the file back

Local-disk uploads are best served by an upstream (nginx / Caddy) so FastAPI doesn't stream bytes. For dev:

```python
from fastapi.staticfiles import StaticFiles

app.mount(
    "/static/uploads",
    StaticFiles(directory=settings.UPLOAD_DIR),
    name="uploads",
)
```

Construct the public URL in the response schema:

```python
class UserResponseSchema(BaseResponseSchema):
    name: str
    email: EmailStr
    avatar_url: str | None = None

    @field_validator("avatar_url", mode="before")
    @classmethod
    def _absolute_url(cls, value: str | None) -> str | None:
        if value is None:
            return None
        # avatar_path stored as relative path → public URL
        return f"/static/uploads/{value}"
```

---

### Transactional email recipe

Password reset flow using `EmailUtils` + a short-lived JWT. Requires the `[email]` extra.

```python
# src/core/mailer.py
from tempest_fastapi_sdk import EmailUtils

from src.core.settings import settings


mailer = EmailUtils(
    host=settings.SMTP_HOST,
    port=settings.SMTP_PORT,
    from_addr=settings.SMTP_FROM_ADDR,
    username=settings.SMTP_USERNAME,
    password=settings.SMTP_PASSWORD,
    use_starttls=True,
)
```

```python
# src/services/password_reset.py
from datetime import timedelta

from tempest_fastapi_sdk import EmailUtils, JWTUtils, NotFoundException

from src.db.repositories import UserRepository


class PasswordResetService:
    def __init__(
        self,
        repo: UserRepository,
        tokens: JWTUtils,
        mailer: EmailUtils,
    ) -> None:
        self.repo = repo
        self.tokens = tokens
        self.mailer = mailer

    async def request_reset(self, email: str) -> None:
        """Send a password-reset link to `email`.

        Always returns silently — don't reveal whether the email
        is registered or not (avoids account enumeration).
        """
        user = await self.repo.get_or_none({"email": email})
        if user is None:
            return
        token = self.tokens.encode(
            {"sub": str(user.id), "purpose": "password_reset"},
            ttl=timedelta(minutes=15),
        )
        reset_url = f"https://my-app.com/reset-password?token={token}"
        await self.mailer.send(
            to=user.email,
            subject="Reset your password",
            body=f"Click here to reset your password: {reset_url}",
            html=f'<p>Click <a href="{reset_url}">here</a> to reset.</p>',
        )

    async def consume_reset(
        self,
        token: str,
        new_password: str,
        passwords: PasswordUtils,
    ) -> None:
        # `decode` raises InvalidTokenException / ExpiredTokenException
        # (both 401). Caught by the SDK handler.
        payload = self.tokens.decode(token)
        if payload.get("purpose") != "password_reset":
            raise InvalidTokenException()
        user = await self.repo.get_by_id(UUID(payload["sub"]))
        user.password_hash = passwords.hash(new_password)
        await self.repo.update(user)
```

---

### Alembic migrations recipe

Full workflow: bootstrap → first migration → apply → CI gate.

#### Bootstrap once per project

```python
# scripts/alembic_init.py
from tempest_fastapi_sdk import AlembicHelper

from src.core.settings import settings

helper = AlembicHelper(config_path="alembic.ini", db_url=settings.DB_URL)
helper.init(
    directory="alembic",
    metadata_module="app.db",        # exposes BaseModel
    metadata_attr="BaseModel",
    db_url=settings.DB_URL,
)
```

Run once: `uv run python scripts/alembic_init.py`.

This creates:

```text
alembic.ini                 # SDK-curated config (UTC timezone, date-prefixed file template)
alembic/
├── env.py                  # SDK template (already wires target_metadata, compare_type, batch mode)
├── script.py.mako
└── versions/
```

#### Author migrations

```python
# scripts/make_migration.py
import sys

from tempest_fastapi_sdk import AlembicHelper

from src.core.settings import settings

helper = AlembicHelper("alembic.ini", db_url=settings.DB_URL)
helper.revision(
    message=sys.argv[1],
    autogenerate=True,
)
```

```bash
uv run python scripts/make_migration.py "add users table"
```

Generated file lands at `alembic/versions/2026_05_16_1432-ae12cd34_add_users_table.py` — the date prefix means files sort chronologically and merge conflicts are obvious.

#### Apply on startup

```python
# src/api/app.py — extend lifespan
import asyncio

from tempest_fastapi_sdk import AlembicHelper


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    # Run pending migrations before serving traffic.
    helper = AlembicHelper("alembic.ini", db_url=settings.DB_URL)
    await asyncio.to_thread(helper.upgrade)

    await db.connect()
    yield
    await db.disconnect()
```

#### CI gate — schema must match models

```python
# scripts/check_migrations.py
import sys

from tempest_fastapi_sdk import AlembicHelper

from src.core.settings import settings

helper = AlembicHelper("alembic.ini", db_url=settings.DB_URL)
if not helper.check():
    print("Schema drift detected — run make_migration.py and commit.")
    sys.exit(1)
print("Schema is in sync.")
```

```yaml
# .github/workflows/ci.yml
- name: Check migrations are in sync
  run: uv run python scripts/check_migrations.py
```

---

### Utility helpers recipe

Small stateless helpers from `tempest_fastapi_sdk.utils` that the SDK itself relies on and that show up across every service. Available without any extra.

| Helper | Signature | Purpose |
| --- | --- | --- |
| `utcnow()` | `() -> datetime` | Current time as a timezone-aware UTC datetime — the SDK uses this for `created_at` / `updated_at` defaults. |
| `to_utc(value)` | `(datetime) -> datetime` | Coerce naive datetimes to UTC (assumed UTC) and aware datetimes to UTC via `astimezone`. Used by `BaseResponseSchema` field validators. |
| `modify_dict(data, exclude=None, include=None)` | `(dict, list[str] \| None, dict \| None) -> dict` | Single-pass filter + merge. Drop sensitive keys before logging or merge computed fields when mapping payloads to ORM models. |

#### Timestamps the same way everywhere

`utcnow` is the canonical "now" for the SDK. Use it for soft-delete timestamps, JWT `iat` / `exp`, audit trails — anything where mixing naive and aware datetimes would burn you later.

```python
from datetime import timedelta

from tempest_fastapi_sdk import to_utc, utcnow


now = utcnow()                      # timezone-aware UTC
expires_at = now + timedelta(hours=1)

# Normalize whatever the caller gave you
incoming = request.json()["scheduled_for"]              # naive or aware
scheduled_for = to_utc(datetime.fromisoformat(incoming))
```

A naive datetime is tagged with UTC (not converted from local time) so it's predictable in headless workers and Docker containers where `time.timezone` is anyone's guess.

#### Drop sensitive keys before logging / mapping

`modify_dict` is the tiny utility that powers `BaseSchema.to_dict(exclude=..., include=...)` and `BaseModel.update_from_dict(...)`. Use it directly when you don't want to call into Pydantic round-trips:

```python
from tempest_fastapi_sdk import LogUtils, modify_dict

log = LogUtils("app.users")

payload = {"email": "ana@example.com", "password": "s3cr3t", "name": "Ana"}

# Strip password before logging
log.info("user_signup", **modify_dict(payload, exclude=["password"]))

# Merge a computed hash before persisting
user_row = modify_dict(
    payload,
    exclude=["password"],
    include={"password_hash": passwords.hash(payload["password"])},
)
```

`include` wins over `data`, so it doubles as a "set or override" helper without mutating the source dict.

#### Where every other helper is documented

Every helper has its own recipe — this section is the quick map:

| Helper | Recipe |
| --- | --- |
| `PasswordUtils`, `JWTUtils` | [Authentication recipe](#authentication-recipe) |
| `EmailUtils` | [Transactional email recipe](#transactional-email-recipe) |
| `UploadUtils` | [File uploads recipe](#file-uploads-recipe) |
| `LogUtils` + `configure_logging` | [Structured logging & request IDs recipe](#structured-logging--request-ids-recipe) |
| `MetricsUtils` (CPU/memory/disk/GPU) | [System metrics recipe](#system-metrics-recipe) |
| `CPF`, `CNPJ`, `CPFOrCNPJ`, `PhoneBR`, `is_valid_*`, `normalize_*`, `only_digits` | [BR document & phone validation recipe](#br-document--phone-validation-recipe) |

### BR document & phone validation recipe

`tempest_fastapi_sdk.utils.regex` ships ready-to-use regex patterns, validators, normalizers and Pydantic types for the identity/contact fields that show up in almost every Brazilian API. No extra required — pure stdlib + Pydantic (already a core dependency).

| Symbol | Kind | Purpose |
| --- | --- | --- |
| `CPF_PATTERN`, `CNPJ_PATTERN`, `CPF_CNPJ_PATTERN`, `PHONE_BR_PATTERN` | `re.Pattern[str]` | Compiled regex (masked or raw input). |
| `is_valid_cpf`, `is_valid_cnpj`, `is_valid_cpf_cnpj` | `(str) -> bool` | Format match **+** check-digit math. All-same-digit sequences rejected. |
| `is_valid_phone_br` | `(str) -> bool` | BR phone shape: optional `+55`, optional DDD, optional 9th digit. |
| `normalize_cpf`, `normalize_cnpj`, `normalize_cpf_cnpj`, `normalize_phone_br` | `(str) -> str` | Strip mask to digits-only; raise `ValueError` if invalid. |
| `only_digits` | `(str) -> str` | Strip every non-digit character. |
| `CPF`, `CNPJ`, `CPFOrCNPJ`, `PhoneBR` | `Annotated[str, AfterValidator(...)]` | Drop-in Pydantic field types — validate + normalize automatically. |

#### Schema usage

```python
from pydantic import EmailStr, Field

from tempest_fastapi_sdk import BaseSchema
from tempest_fastapi_sdk.utils import CPF, CPFOrCNPJ, PhoneBR


class CustomerCreateSchema(BaseSchema):
    """Payload for POST /customers.

    `document` accepts CPF or CNPJ in masked or raw form and is
    stored digits-only after validation. `phone` is normalized the
    same way. Invalid values surface as a Pydantic `ValidationError`
    (HTTP 422 via the SDK exception handler).
    """

    name: str = Field(min_length=1, max_length=128)
    email: EmailStr
    document: CPFOrCNPJ
    phone: PhoneBR
```

Valid input:

```json
{
    "name": "Ana",
    "email": "ana@example.com",
    "document": "529.982.247-25",
    "phone": "+55 (11) 98888-7777"
}
```

After validation:

```python
CustomerCreateSchema(...).document  # "52998224725"
CustomerCreateSchema(...).phone     # "5511988887777"
```

#### Manual validation (services, controllers, queue handlers)

```python
from tempest_fastapi_sdk.utils import (
    is_valid_cpf_cnpj,
    normalize_cpf_cnpj,
    only_digits,
)

if not is_valid_cpf_cnpj(raw_document):
    raise ValidationException(message="Documento inválido")

document_digits = normalize_cpf_cnpj(raw_document)
```

#### Filtering by stored digits

The normalizers strip masks before saving, so repository filters and unique constraints all work on the canonical digits-only form:

```python
await repo.get({"document": normalize_cpf_cnpj(query)})
```

---

### Testing recipe

pytest + pytest-asyncio + in-memory SQLite + FastAPI TestClient.

#### Shared fixtures

```python
# tests/conftest.py
from collections.abc import AsyncGenerator

import pytest_asyncio
from fastapi.testclient import TestClient
from sqlalchemy.ext.asyncio import AsyncSession

from tempest_fastapi_sdk import AsyncDatabaseManager

from src.api.app import create_app
from src.db import BaseModel       # importing BaseModel ensures models are registered


@pytest_asyncio.fixture
async def db() -> AsyncGenerator[AsyncDatabaseManager, None]:
    """Fresh in-memory DB per test."""
    manager = AsyncDatabaseManager("sqlite+aiosqlite:///:memory:")
    await manager.connect()
    await manager.create_tables()
    try:
        yield manager
    finally:
        await manager.drop_tables()
        await manager.disconnect()


@pytest_asyncio.fixture
async def session(db: AsyncDatabaseManager) -> AsyncGenerator[AsyncSession, None]:
    """Managed session bound to the in-memory DB."""
    async with db.get_session_context() as session:
        yield session


@pytest_asyncio.fixture
async def client(db: AsyncDatabaseManager) -> AsyncGenerator[TestClient, None]:
    """FastAPI TestClient with the SDK manager overridden to use SQLite."""
    app = create_app()
    # Override the session dependency to use the test DB.
    from src.api.app import db as production_db

    app.dependency_overrides[production_db.session_dependency] = db.session_dependency

    async with TestClient(app) as client:
        yield client
```

#### Repository test

```python
# tests/repositories/test_user.py
import pytest
from sqlalchemy.ext.asyncio import AsyncSession

from src.core.exceptions import UserNotFoundError
from src.db.models import UserModel
from src.db.repositories import UserRepository


class TestUserRepository:
    async def test_get_by_email_raises_when_missing(
        self, session: AsyncSession
    ) -> None:
        repo = UserRepository(session)
        with pytest.raises(UserNotFoundError):
            await repo.get({"email": "ghost@example.com"})

    async def test_add_and_get(self, session: AsyncSession) -> None:
        repo = UserRepository(session)
        user = await repo.add(
            UserModel(
                name="Ana", email="ana@example.com", password_hash="x"
            )
        )
        loaded = await repo.get_by_id(user.id)
        assert loaded.name == "Ana"
```

#### Endpoint test

```python
# tests/api/test_users.py
from fastapi.testclient import TestClient


class TestUsersAPI:
    def test_create_user(self, client: TestClient) -> None:
        response = client.post(
            "/api/users",
            json={
                "name": "Ana",
                "email": "ana@example.com",
                "password": "hunter22",
            },
        )
        assert response.status_code == 201
        body = response.json()
        assert body["email"] == "ana@example.com"
        assert "password" not in body
        assert "password_hash" not in body

    def test_get_user_not_found(self, client: TestClient) -> None:
        response = client.get("/api/users/00000000-0000-0000-0000-000000000000")
        assert response.status_code == 404
        body = response.json()
        # SDK envelope is always {detail, code, details}
        assert body["code"] == "USER_NOT_FOUND"
```

#### `tempest_fastapi_sdk.testing` helpers

`tempest_fastapi_sdk.testing` ships framework-agnostic helpers that don't require `pytest` to be importable — wrap them in `@pytest.fixture` (or any other harness) inside the consuming project's `conftest.py`. Useful when a test doesn't need a full `AsyncDatabaseManager` (no `lifespan`, no health-check probes).

| Helper | Purpose |
| --- | --- |
| `create_test_engine(url="sqlite+aiosqlite:///:memory:", **engine_kwargs)` | Build a throw-away `AsyncEngine`. |
| `create_test_session_factory(engine)` | Build a `sessionmaker` bound to the engine. |
| `init_test_metadata(engine, metadata=None)` | Create every SQLAlchemy table on the engine (defaults to `BaseModel.metadata`). |
| `drop_test_metadata(engine, metadata=None)` | Drop every table. |
| `test_database(url="sqlite+aiosqlite:///:memory:", metadata=None)` | Async context manager — yields an engine with metadata pre-created, drops everything and disposes on exit. |
| `test_session(url="sqlite+aiosqlite:///:memory:", metadata=None)` | Async context manager — yields an `AsyncSession` on top of a fresh `test_database`. |

```python
# tests/conftest.py
from collections.abc import AsyncGenerator

import pytest_asyncio
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession

from tempest_fastapi_sdk.testing import test_database, test_session


@pytest_asyncio.fixture
async def engine() -> AsyncGenerator[AsyncEngine, None]:
    """Yield a fresh in-memory SQLite engine for each test."""
    async with test_database() as e:
        yield e


@pytest_asyncio.fixture
async def session() -> AsyncGenerator[AsyncSession, None]:
    """Yield a managed AsyncSession bound to the in-memory engine."""
    async with test_session() as s:
        yield s
```

Use the one-shot `test_session()` context manager for ad-hoc tests that don't need cross-fixture sharing:

```python
from tempest_fastapi_sdk.testing import test_session

from src.db.models import UserModel
from src.db.repositories import UserRepository


async def test_repo_directly() -> None:
    async with test_session() as session:
        repo = UserRepository(session)
        await repo.add(UserModel(name="Ana", email="ana@example.com", password_hash="x"))
        assert await repo.count() == 1
```

Pass `metadata=` when the project mixes the SDK `BaseModel.metadata` with a second, isolated metadata (rare — keep one `BaseModel` per service whenever possible).

### Application bootstrap recipe

[Section 2 of the tutorial](#2-settings-server-app-factory--entry-point) shows the minimal `create_app()`. This recipe is the **extended** version, wiring everything `tempest_fastapi_sdk.api` ships — exception handlers, CORS, request-ID middleware, the health router with extra checks, a shared-secret token dependency and an extra Redis manager — all from the same canonical `src/api/app.py` location. The bootstrapping pattern stays identical; only the contents of `create_app()` grow.

```python
# src/api/app.py
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

from fastapi import Depends, FastAPI

from tempest_fastapi_sdk import (
    AsyncDatabaseManager,
    RequestIDMiddleware,
    apply_cors,
    configure_logging,
    make_health_router,
    make_token_dependency,
    register_exception_handlers,
)
from tempest_fastapi_sdk.cache import AsyncRedisManager

from src.core.settings import settings


configure_logging(level=settings.LOG_LEVEL, json_output=settings.LOG_JSON)

db = AsyncDatabaseManager(
    settings.DATABASE_URL,
    echo=settings.DATABASE_ECHO,
    pool_size=settings.DATABASE_POOL_SIZE,
    max_overflow=settings.DATABASE_MAX_OVERFLOW,
    pool_recycle=settings.DATABASE_POOL_RECYCLE,
)
redis = AsyncRedisManager(settings.REDIS_URL)
require_token = make_token_dependency(settings.TOKEN_SECRET)


@asynccontextmanager
async def lifespan(_: FastAPI) -> AsyncIterator[None]:
    await db.connect()
    await redis.connect()
    try:
        yield
    finally:
        await redis.disconnect()
        await db.disconnect()


def create_app() -> FastAPI:
    """Build and configure the FastAPI app."""
    app = FastAPI(
        title="my-service",
        version=settings.VERSION,
        lifespan=lifespan,
    )

    app.add_middleware(RequestIDMiddleware)
    apply_cors(app, settings)
    register_exception_handlers(app)

    # Meta endpoints at the root prefix.
    app.include_router(
        make_health_router(
            db=db,
            checks={"redis": redis.health_check},
            version=settings.VERSION,
        ),
    )

    # Business endpoints under /api/<domain>, guarded by the shared secret.
    from src.api.routers import users

    app.include_router(
        users.router,
        prefix="/api",
        dependencies=[Depends(require_token)],
    )
    return app


app = create_app()
```

Key points:

- `src/server.py` and `main.py` (one-liner) stay exactly as in [Section 2 of the tutorial](#2-settings-server-app-factory--entry-point) — only `create_app()` changes when you add primitives. Never start uvicorn via `subprocess.run(["uvicorn", ...])`; always import `app` from `src.api.app` or call `uvicorn.run("src.api.app:app", ...)` programmatically from `src/server.py`.
- `RequestIDMiddleware` reads/writes `X-Request-ID` and seeds `request_id_ctx` so every log line emitted during the request carries the correlation ID.
- `apply_cors(app, settings)` reads `CORSSettings` defaults; pass keyword overrides for one-off changes.
- `register_exception_handlers(app)` wires every `AppException` subclass to the canonical `{detail, code, details}` envelope.
- `make_health_router(db=db, checks={"redis": redis.health_check}, version=...)` mounts `GET /health/liveness` and `GET /health/readiness` (returns `503` when any check fails) at the root prefix.
- `make_token_dependency(secret)` returns an async dependency that validates `X-Token` via `hmac.compare_digest`; pass an empty string to disable in dev. The dependency lives next to the rest of the auth glue in `src/api/dependencies/auth.py` once it grows beyond the one-liner above.

### Structured logging & request IDs recipe

`configure_logging` installs a JSON handler on the root logger that emits one-line JSON records carrying the active request ID. `LogUtils` is a thin facade that adds level methods accepting structured `**fields`.

```python
from tempest_fastapi_sdk import LogUtils, configure_logging
from tempest_fastapi_sdk.core import get_request_id

# Imperative — call once during bootstrap.
configure_logging(level="INFO", json_output=True)

# Facade — handy for service-wide singletons.
log = LogUtils("app.users", level="INFO")
log.info("user_created", user_id=str(user.id), email=user.email)
log.warning("login_throttled", ip="1.2.3.4", attempts=5)

try:
    risky()
except RuntimeError:
    log.exception("risky_failed", op="reconcile")  # appends traceback

# Surface the correlation ID outside the log line if needed.
request_id = get_request_id()
```

JSON output (single line — formatted here for readability):

```json
{
  "timestamp": "2026-05-16T20:14:33.412+00:00Z",
  "level": "INFO",
  "logger": "app.users",
  "message": "user_created",
  "request_id": "d83e4b0c-7c2f-4bd6-aaa1-7d4f6cf5e5e9",
  "user_id": "9c1a5b2d-...",
  "email": "ana@example.com"
}
```

The middleware accepts a custom header name (`RequestIDMiddleware(app, header_name="X-Correlation-ID")`); the same header is echoed back on every response.

### Settings mixins composition recipe

`BaseAppSettings` is the configured `pydantic-settings` base. The SDK also exposes composable mixins for the most common dependencies; pick the ones the service needs and put `BaseAppSettings` at the **end** of the MRO so its `model_config` wins.

```python
# src/core/settings.py
from pydantic import Field

from tempest_fastapi_sdk import (
    BaseAppSettings,
    CORSSettings,
    DatabaseSettings,
    EmailSettings,
    JWTSettings,
    LogSettings,
    RabbitMQSettings,
    RedisSettings,
    ServerSettings,
    TaskIQSettings,
    TokenSettings,
    UploadSettings,
    WebPushSettings,
)


class Settings(
    ServerSettings,
    LogSettings,
    DatabaseSettings,
    RedisSettings,
    RabbitMQSettings,
    TaskIQSettings,
    JWTSettings,
    CORSSettings,
    EmailSettings,
    UploadSettings,
    TokenSettings,
    WebPushSettings,
    BaseAppSettings,
):
    """Service-wide settings."""

    VERSION: str = Field(default="0.0.0")


settings = Settings()
```

Each mixin owns its own env-var prefix — pick only the ones the service needs:

| Mixin | Env vars |
| --- | --- |
| `ServerSettings` | `SERVER_HOST`, `SERVER_PORT`, `SERVER_RELOAD`, `SERVER_DEBUG` |
| `LogSettings` | `LOG_LEVEL`, `LOG_JSON` |
| `DatabaseSettings` | `DATABASE_URL`, `DATABASE_ECHO`, `DATABASE_POOL_SIZE`, `DATABASE_MAX_OVERFLOW`, `DATABASE_POOL_RECYCLE` |
| `RedisSettings` | `REDIS_URL`, `REDIS_DECODE_RESPONSES` |
| `RabbitMQSettings` | `RABBITMQ_URL`, `RABBITMQ_PREFETCH_COUNT` |
| `TaskIQSettings` | `TASKIQ_BROKER_URL`, `TASKIQ_RESULT_BACKEND_URL` |
| `JWTSettings` | `JWT_SECRET`, `JWT_ALGORITHM`, `JWT_ACCESS_TTL_SECONDS`, `JWT_REFRESH_TTL_SECONDS`, `JWT_ISSUER` |
| `CORSSettings` | `CORS_ORIGINS`, `CORS_ALLOW_CREDENTIALS`, `CORS_ALLOW_METHODS`, `CORS_ALLOW_HEADERS`, `CORS_EXPOSE_HEADERS`, `CORS_MAX_AGE` |
| `EmailSettings` | `SMTP_HOST`, `SMTP_PORT`, `SMTP_USERNAME`, `SMTP_PASSWORD`, `SMTP_FROM_ADDR`, `SMTP_USE_TLS`, `SMTP_USE_SSL`, `SMTP_TIMEOUT_SECONDS` |
| `UploadSettings` | `UPLOAD_DIR`, `UPLOAD_MAX_SIZE_BYTES`, `UPLOAD_ALLOWED_EXTENSIONS`, `UPLOAD_ALLOWED_MIMETYPES` |
| `TokenSettings` | `TOKEN_SECRET` |
| `WebPushSettings` | `VAPID_PUBLIC_KEY`, `VAPID_PRIVATE_KEY`, `VAPID_SUBJECT`, `WEBPUSH_DEFAULT_TTL_SECONDS` |

> **Breaking change in 0.8.0:** `ServerSettings` previously exposed bare `HOST` / `PORT` / `DEBUG` / `LOG_LEVEL` / `LOG_JSON` fields. They were renamed to `SERVER_HOST` / `SERVER_PORT` / `SERVER_RELOAD` / `SERVER_DEBUG`, and `LOG_LEVEL` / `LOG_JSON` moved to the new `LogSettings` mixin. Update both your `.env` file (env var names) and any code reading `settings.HOST` etc.

### Controllers & services layering recipe

`BaseService[RepositoryT, ResponseT]` and `BaseController[ServiceT, ResponseT]` are generic skeletons matching the SDK layering (router → controller → service → repository). They expose pass-through CRUD methods so simple endpoints can subclass them without overriding anything; you override only methods that need orchestration.

```python
# src/services/user_service.py
from uuid import UUID

from tempest_fastapi_sdk import BaseService

from src.db.repositories import UserRepository
from src.schemas.user import UserCreate, UserResponse, UserUpdate
from src.utils.security import password_utils


class UserService(BaseService[UserRepository, UserResponse]):
    """Business logic for the user feature."""

    async def signup(self, data: UserCreate) -> UserResponse:
        # Business logic — hash the password, then delegate to the repo.
        instance = self.repository.map_to_model(
            {
                "name": data.name,
                "email": data.email,
                "password_hash": password_utils.hash(data.password),
            },
        )
        created = await self.repository.add(instance)
        return self.repository.map_to_response(created)


# src/controllers/user_controller.py
from tempest_fastapi_sdk import BaseController

from src.schemas.user import UserCreate, UserResponse
from src.services.user_service import UserService


class UserController(BaseController[UserService, UserResponse]):
    """Thin orchestration over UserService."""

    async def signup(self, data: UserCreate) -> UserResponse:
        # Pass-through today; the controller is the seam to add
        # cross-service coordination later (audit log, outbox event,
        # downstream notification, etc.) without touching the router.
        return await self.service.signup(data)


# src/api/dependencies/controllers.py
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession

from src.api.app import db
from src.controllers.user_controller import UserController
from src.db.repositories import UserRepository
from src.services.user_service import UserService


def get_user_controller(
    session: AsyncSession = Depends(db.session_dependency),
) -> UserController:
    return UserController(UserService(UserRepository(session)))


# src/api/routers/users.py
from fastapi import APIRouter, Depends, status

from src.api.dependencies.controllers import get_user_controller
from src.controllers.user_controller import UserController
from src.schemas.user import UserCreate, UserResponse

router = APIRouter(prefix="/users", tags=["users"])


@router.post(
    "/",
    response_model=UserResponse,
    status_code=status.HTTP_201_CREATED,
)
async def create_user(
    data: UserCreate,
    controller: UserController = Depends(get_user_controller),
) -> UserResponse:
    return await controller.signup(data)
```

Keep controllers present even when they only pass through — the import graph stays uniform across services, so adding cross-cutting policy later doesn't change the router signature.

### Audit & soft-delete mixins recipe

`SoftDeleteMixin` adds a `deleted_at` timestamp column with `mark_deleted()` / `mark_restored()` / `is_deleted` helpers. `AuditMixin` adds `created_by` / `updated_by` UUID columns with `stamp_created_by(user_id)` / `stamp_updated_by(user_id)` helpers. Mix them in alongside `BaseModel`:

```python
# src/db/models/user.py
from sqlalchemy.orm import Mapped, mapped_column

from tempest_fastapi_sdk import AuditMixin, BaseModel, SoftDeleteMixin


class UserModel(BaseModel, SoftDeleteMixin, AuditMixin):
    """Users — soft-deletable and audited."""

    name: Mapped[str] = mapped_column()
    email: Mapped[str] = mapped_column(unique=True)
    password_hash: Mapped[str] = mapped_column()
```

Filtering is the caller's responsibility — the mixin doesn't install a global filter. Hide soft-deleted rows from list endpoints by passing `deleted_at=None` (or filtering in your repository subclass):

```python
async def list_alive(self) -> list[UserResponse]:
    instances = await self.repository.list(filters={"deleted_at": None})
    return [self.repository.map_to_response(i) for i in instances]
```

Stamping audit columns belongs to the service layer where the current user is in scope:

```python
async def update(self, user_id: UUID, data: UserUpdate, *, actor_id: UUID) -> UserResponse:
    instance = await self.repository.get_by_id(user_id)
    instance.update_from_dict(data.model_dump(exclude_unset=True))
    instance.stamp_updated_by(actor_id)
    updated = await self.repository.update(instance)
    return self.repository.map_to_response(updated)
```

Use the mixin's helpers (`mark_deleted` / `mark_restored`) when you want the `deleted_at` semantics; use `BaseRepository.soft_delete(id)` when the existing `is_active` flag is enough.

### Cursor pagination recipe

Cursor pagination scales better than offset pagination on big tables (no `COUNT(*)`, stable under concurrent inserts) at the cost of losing random-access. The SDK provides `CursorPaginationFilterSchema`, `CursorPaginationSchema[T]` and the opaque `encode_cursor` / `decode_cursor` helpers.

```python
# src/schemas/user.py
from tempest_fastapi_sdk import CursorPaginationFilterSchema, CursorPaginationSchema

from src.schemas.user import UserResponse


class UserCursorFilter(CursorPaginationFilterSchema):
    name: str | None = None  # ILIKE %value% via repository convention


UserCursorPage = CursorPaginationSchema[UserResponse]
```

Repository helper (cursor over `created_at` + `id` tie-break):

```python
# src/db/repositories/user.py
from sqlalchemy import asc, desc

from tempest_fastapi_sdk import BaseRepository, decode_cursor, encode_cursor

from src.db.models.user import UserModel
from src.schemas.user import UserResponse


class UserRepository(BaseRepository[UserModel]):
    model = UserModel

    async def cursor_page(
        self,
        *,
        cursor: str | None,
        limit: int,
        ascending: bool,
        filters: dict[str, Any] | None = None,
    ) -> UserCursorPage:
        query = select(UserModel)
        if filters:
            query = self._apply_filters(query, filters)

        order = asc if ascending else desc
        query = query.order_by(order(UserModel.created_at), order(UserModel.id))

        if cursor is not None:
            state = decode_cursor(cursor)
            cmp = (UserModel.created_at, UserModel.id) > (state["value"], state["id"])
            query = query.where(cmp if ascending else ~cmp)

        query = query.limit(limit + 1)  # peek one ahead to set has_more
        result = await self.session.execute(query)
        rows = list(result.unique().scalars().all())
        has_more = len(rows) > limit
        rows = rows[:limit]
        next_cursor = (
            encode_cursor(
                {"id": str(rows[-1].id), "value": rows[-1].created_at.isoformat()},
            )
            if has_more and rows
            else None
        )
        return UserCursorPage(
            items=[self.map_to_response(r) for r in rows],
            next_cursor=next_cursor,
            has_more=has_more,
            limit=limit,
        )
```

Router:

```python
@router.get("/", response_model=UserCursorPage)
async def list_users(
    f: UserCursorFilter = Depends(),
    controller: UserController = Depends(get_user_controller),
) -> UserCursorPage:
    return await controller.service.repository.cursor_page(
        cursor=f.cursor,
        limit=f.limit,
        ascending=f.ascending,
        filters=f.get_conditions(),
    )
```

The cursor is opaque base64-url-safe JSON — clients never inspect it; they pass back the value verbatim until `next_cursor` becomes `null`.

### Redis cache recipe

`AsyncRedisManager` wraps `redis.asyncio` with the same connect/disconnect/health-check surface as `AsyncDatabaseManager`. Install with `[cache]`.

```python
from tempest_fastapi_sdk.cache import AsyncRedisManager

cache = AsyncRedisManager(settings.REDIS_URL, decode_responses=True)

# Lifespan
await cache.connect()
...
await cache.disconnect()

# Direct use
await cache.client.set("user:123:name", "Ana", ex=300)
name = await cache.client.get("user:123:name")

# FastAPI dependency — yields the live client.
from fastapi import Depends
from redis.asyncio import Redis


@router.get("/cached")
async def cached_endpoint(
    redis: Redis = Depends(cache.client_dependency),
) -> dict[str, str]:
    value = await redis.get("greeting") or "hello"
    return {"value": value}
```

Wire the health check on the canonical router with `make_health_router(checks={"redis": cache.health_check})` so readiness probes fail when Redis is down.

### Server-Sent Events recipe

`EventStream` is an in-memory async queue feeding one SSE HTTP connection. `ServerSentEvent` encodes one frame; `sse_response` wraps the byte stream in a Starlette `StreamingResponse` with SSE-friendly headers.

```python
# src/api/routers/events.py
import asyncio

from fastapi import APIRouter

from tempest_fastapi_sdk import EventStream, sse_response

router = APIRouter()


@router.get("/events")
async def events() -> "StreamingResponse":  # forward-declared by Starlette
    stream = EventStream(heartbeat_seconds=15.0)

    async def producer() -> None:
        for n in range(1, 4):
            await stream.publish({"n": n}, event="counter", id=str(n))
            await asyncio.sleep(1)
        await stream.close()

    asyncio.create_task(producer())
    return sse_response(stream.stream())
```

Browser side:

```javascript
const es = new EventSource("/events");
es.addEventListener("counter", (e) => console.log("got", JSON.parse(e.data)));
```

`heartbeat_seconds` emits a `: keepalive` SSE comment when idle so load-balancers don't close long-lived connections. `ServerSentEvent.data` accepts strings, bytes or any JSON-serializable Python object — non-strings are JSON-encoded automatically. Pass `retry=` to hint the browser at the reconnect delay (milliseconds).

### Web Push notifications recipe

`WebPushDispatcher` wraps the synchronous `pywebpush` library in `asyncio.to_thread` and surfaces the two errors the application cares about: `WebPushGoneError` (HTTP 404/410 — delete the subscription) and `WebPushError` (everything else). Install with `[webpush]`.

```python
# src/services/notifications.py
from tempest_fastapi_sdk import (
    WebPushDispatcher,
    WebPushGoneError,
    WebPushPayloadSchema,
    WebPushSubscriptionSchema,
)


dispatcher = WebPushDispatcher(
    settings.VAPID_PRIVATE_KEY,
    vapid_subject="mailto:ops@example.com",
    ttl_seconds=60,
)


async def notify_order_paid(
    subscription: WebPushSubscriptionSchema,
    order_id: str,
) -> None:
    payload = WebPushPayloadSchema(
        title="Pagamento confirmado",
        body=f"Pedido {order_id} aprovado.",
        icon="/static/icons/order.png",
        data={"orderId": order_id, "url": f"/orders/{order_id}"},
    )
    try:
        await dispatcher.send(subscription, payload)
    except WebPushGoneError:
        # Prune the subscription from your store.
        await subscriptions_repo.delete_by_endpoint(subscription.endpoint)


async def broadcast(subs: list[WebPushSubscriptionSchema], payload: WebPushPayloadSchema) -> None:
    gone = await dispatcher.send_many(subs, payload)
    if gone:
        await subscriptions_repo.delete_by_endpoints(gone)
```

`WebPushSubscriptionSchema` round-trips the exact JSON `PushSubscription.toJSON()` emits in the browser (it aliases `expiration_time` ↔ `expirationTime`), so you can store inbound subscriptions verbatim and replay them on dispatch.

### Message queues — FastStream recipe

`AsyncBrokerManager` wraps any FastStream broker (RabbitMQ, Kafka, NATS, Redis Streams) with a uniform connect/disconnect/health-check surface. The broker instance is injected so the SDK doesn't pin a single transport.

Install with `[queue]` (pulls `faststream[rabbit]`). Pick the matching FastStream extra for other transports.

```python
# src/queue/__init__.py
from faststream.rabbit import RabbitBroker
from pydantic import BaseModel

from tempest_fastapi_sdk.queue import AsyncBrokerManager

from src.core.settings import settings


broker = RabbitBroker(settings.RABBITMQ_URL)
queue = AsyncBrokerManager(broker)


class OrderMessage(BaseModel):
    order_id: str
    user_id: str


@broker.subscriber("orders.paid")
async def handle_order_paid(msg: OrderMessage) -> None:
    await mark_order_paid(msg.order_id, msg.user_id)


# src/api/app.py lifespan
await queue.connect()
...
await queue.disconnect()


# Publish from anywhere in the application
await queue.publish(OrderMessage(order_id="abc", user_id="x"), queue="orders.paid")
```

The manager exposes:

- `connect()` / `disconnect()` — idempotent; safe to call from FastAPI lifespan.
- `publish(message, *args, **kwargs)` — passthrough to `broker.publish` with a `RuntimeError` guard when the broker isn't started.
- `lifespan()` — async context manager handling start/stop, handy for short scripts.
- `broker_dependency` — FastAPI `Depends` that yields the live broker.
- `health_check()` / `is_connected` — true while the broker is started.

Wire it on the health router with `make_health_router(checks={"queue": queue.health_check})`.

### Background tasks — TaskIQ recipe

`AsyncTaskBrokerManager` wraps any TaskIQ broker (AioPika for RabbitMQ, Redis, in-memory for tests). Install with `[tasks]` (pulls `taskiq` + `taskiq-aio-pika`).

```python
# src/tasks/__init__.py
from taskiq_aio_pika import AioPikaBroker

from tempest_fastapi_sdk.tasks import AsyncTaskBrokerManager

from src.core.settings import settings


tasks = AsyncTaskBrokerManager(AioPikaBroker(settings.TASKIQ_BROKER_URL))


@tasks.task
async def send_welcome_email(to: str, name: str) -> None:
    await email_utils.send(
        to=to,
        subject="Bem-vindo!",
        body=f"Olá, {name} — sua conta foi criada.",
    )


# src/api/app.py lifespan
await tasks.connect()
...
await tasks.disconnect()


# Enqueue from a request handler
await send_welcome_email.kiq(to=user.email, name=user.name)
```

`register_task(callable, task_name=..., **kwargs)` registers a function without decorator syntax — useful when wiring third-party callables that you can't decorate at definition time. For tests, swap the broker for `taskiq.InMemoryBroker()` so kicked tasks execute synchronously.

The same lifespan guard rails as the queue manager apply: `connect()`/`disconnect()`/`lifespan()`/`broker_dependency`/`health_check()`/`is_connected`.

### Periodic tasks scheduler recipe

`AsyncTaskScheduler` wraps `taskiq.TaskiqScheduler` + `LabelScheduleSource` so periodic tasks are declared with decorators alongside regular tasks and the scheduler is driven from the FastAPI lifespan. It **does not execute task bodies** — it kicks them into the same broker `AsyncTaskBrokerManager` wraps, so a worker process must be running to consume them. Requires the `[tasks]` extra.

```python
# src/tasks/__init__.py
from datetime import timedelta

from taskiq_aio_pika import AioPikaBroker

from tempest_fastapi_sdk.tasks import AsyncTaskBrokerManager, AsyncTaskScheduler

from src.core.settings import settings


# Use TASKIQ_BROKER_URL (from TaskIQSettings) when the scheduler /
# task broker is a different broker than the FastStream queue
# (RABBITMQ_URL). Reuse the same RabbitMQ URL when they share the
# broker — both env vars can point to the same value.
broker = AioPikaBroker(settings.TASKIQ_BROKER_URL)
tasks = AsyncTaskBrokerManager(broker)
scheduler = AsyncTaskScheduler(broker)


@tasks.task
async def reconcile_invoices(batch_size: int = 100) -> None:
    """Background task — kicked by handlers or the scheduler."""
    ...


@scheduler.cron("*/5 * * * *")          # every five minutes
async def heartbeat() -> None:
    """Liveness ping written to the audit log."""
    ...


@scheduler.cron("0 9 * * MON-FRI", cron_offset="-03:00")  # 09:00 BRT, weekdays
async def daily_digest() -> None:
    ...


@scheduler.interval(seconds=30)         # every 30s
async def poll_remote_queue() -> None:
    ...


@scheduler.interval(timedelta(minutes=15))
async def warm_cache() -> None:
    ...
```

Wire it into the app lifespan next to the broker manager:

```python
# src/api/app.py
@asynccontextmanager
async def lifespan(_: FastAPI) -> AsyncIterator[None]:
    await tasks.connect()
    await scheduler.connect()
    await scheduler.run_in_background()   # dev / single-process services
    try:
        yield
    finally:
        await scheduler.disconnect()
        await tasks.disconnect()
```

Decorator surface:

| Method | When to use |
| --- | --- |
| `@scheduler.cron("*/5 * * * *", cron_offset=None)` | Cron expression; pass `cron_offset` (string like `"-03:00"` or `timedelta`) to anchor to a timezone other than UTC. |
| `@scheduler.interval(seconds=30)` / `@scheduler.interval(timedelta(...))` | Fixed-interval recurrence. |
| `@scheduler.schedule([{...}, {...}])` | Raw TaskIQ schedule list — combine triggers, use one-shot `time`, etc. |
| `scheduler.register(func, schedule=[...], task_name=...)` | Register without decorator syntax (third-party callables). |

Production deployments with multiple workers should run the standalone scheduler CLI instead of `run_in_background()`, so only one scheduler is active across the cluster:

```bash
taskiq scheduler src.tasks:scheduler.scheduler
```

(`scheduler.scheduler` is the inner `TaskiqScheduler` instance exposed on `AsyncTaskScheduler`.) The worker process stays the same:

```bash
taskiq worker src.tasks:tasks.broker
```

Lifecycle controls mirror the broker manager: `connect()` / `disconnect()` / `lifespan()` / `run_in_background()` / `health_check()` / `is_connected`.

### System metrics recipe

`MetricsUtils` collects CPU, memory, disk and NVIDIA GPU usage via `psutil` + `pynvml`. Every method has a sync and an async variant (the async wrapper runs the same code via `asyncio.to_thread`). GPU sampling gracefully degrades to `[]` when `pynvml` or NVIDIA drivers are missing.

Install with `[metrics]`.

```python
from tempest_fastapi_sdk import MetricsUtils

# Synchronous, blocking call
snapshot = MetricsUtils.snapshot(disk_paths=["/", "/data"], cpu_interval=0.1)
print(snapshot.cpu.percent, snapshot.memory.percent)
for disk in snapshot.disks:
    print(disk.path, disk.percent)
for gpu in snapshot.gpus:
    print(gpu.name, gpu.utilization_percent, gpu.memory_used_bytes)

# Async — runs every collector concurrently via asyncio.gather
snapshot = await MetricsUtils.snapshot_async(disk_paths=["/"])


@router.get("/metrics")
async def metrics() -> dict[str, Any]:
    snap = await MetricsUtils.snapshot_async()
    return snap.to_dict()
```

Individual collectors are also available: `MetricsUtils.cpu(interval=...)`, `MetricsUtils.memory()`, `MetricsUtils.disk(path)`, `MetricsUtils.disks(paths)`, `MetricsUtils.gpus()` — and their `*_async` variants. Each returns a typed dataclass (`CPUMetrics`, `MemoryMetrics`, `DiskMetrics`, `GPUMetrics`, `SystemMetrics`) with a `to_dict()` helper for JSON serialization.

### Programmatic server entry point recipe

`run_server` is the canonical helper imported from `src/server.py`. It centralizes the `host` / `port` / `reload` defaults — pulling values from a `ServerSettings`-flavoured `settings` object when present — and keeps the entry point a single line.

```python
# src/server.py
from tempest_fastapi_sdk import run_server

from src.api.app import app  # noqa: F401 — re-exported for external runners
from src.core.settings import settings


def run() -> None:
    """Start the API server programmatically."""
    run_server("src.api.app:app", settings=settings)


__all__: list[str] = ["app", "run"]
```

```python
# main.py
from src.server import run

if __name__ == "__main__":
    run()
```

Resolution order for each kwarg is `explicit argument → settings.SERVER_* → SDK default` (`"127.0.0.1"` / `8000` / `False`). Extra uvicorn kwargs (`workers=`, `log_config=`, `ssl_*=`) are forwarded verbatim.

### JWT bearer / current-user / role dependencies recipe

Four dependency factories live in `tempest_fastapi_sdk.api.dependencies.auth` — pick the level of abstraction you need.

| Factory | What you get |
| --- | --- |
| `make_token_dependency(secret)` | Validate the `X-Token` shared-secret header (constant time). |
| `make_bearer_token_dependency(tokens, soft=False)` | Decode `Authorization: Bearer <jwt>` and return the claims dict. |
| `make_jwt_user_dependency(tokens, user_loader, soft=False, subject_claim="sub")` | Decode the bearer JWT, await `user_loader(subject)`, return the loaded user. |
| `make_role_dependency(tokens, ["admin"], require_all=False, roles_claim="roles")` / `make_permission_dependency(tokens, ["users:write"], require_all=True, permissions_claim="permissions")` | Decode the bearer JWT and gate the route on roles / permissions. |

```python
# src/api/dependencies/auth.py
from uuid import UUID

from tempest_fastapi_sdk import (
    JWTUtils,
    make_bearer_token_dependency,
    make_jwt_user_dependency,
    make_permission_dependency,
    make_role_dependency,
)

from src.api.app import db
from src.core.settings import settings
from src.db.models import UserModel
from src.db.repositories import UserRepository


tokens = JWTUtils(
    secret=settings.JWT_SECRET,
    algorithm=settings.JWT_ALGORITHM,
)


async def load_user(subject: str) -> UserModel:
    """Resolve the JWT subject (a UUID string) to a persisted user."""
    async with db.get_session_context() as session:
        repo = UserRepository(session)
        return await repo.get_by_id(UUID(subject))


require_bearer = make_bearer_token_dependency(tokens)
get_current_user = make_jwt_user_dependency(tokens, load_user)
get_current_user_or_none = make_jwt_user_dependency(tokens, load_user, soft=True)

require_admin = make_role_dependency(tokens, ["admin"])
require_users_write = make_permission_dependency(tokens, ["users:write"])
```

```python
# src/api/routers/users.py
from fastapi import APIRouter, Depends

from src.api.dependencies.auth import (
    get_current_user,
    require_admin,
    require_users_write,
)

router = APIRouter(prefix="/users", tags=["users"])


@router.get("/me")
async def me(current: UserModel = Depends(get_current_user)) -> UserResponseSchema:
    return UserResponseSchema.model_validate(current)


@router.delete("/{user_id}", dependencies=[Depends(require_admin)])
async def delete_user(user_id: UUID) -> None:
    ...


@router.patch(
    "/{user_id}/permissions",
    dependencies=[Depends(require_users_write)],
)
async def update_perms(user_id: UUID) -> None:
    ...
```

`soft=True` returns `None` instead of raising on missing/invalid tokens — useful for endpoints that work both authenticated and anonymous. `subject_claim` defaults to `"sub"` but can be any custom claim (`"user_id"`, `"uid"`, ...). Role dependencies accept either a string or a list of strings on the JWT claim; `require_all=True` requires every listed role/permission, `False` (default for roles, override for permissions) requires any.

### CEP (Brazilian zipcode) recipe

`CEP` is an `Annotated[str, AfterValidator(normalize_cep)]` type — drop it into a Pydantic schema and inbound values are accepted as `"01310-100"` or `"01310100"`, normalized to 8 digits, and rejected (`ValidationError` → HTTP 422 envelope) when they don't match the shape. CEPs have no check digits, so validation is format-only.

```python
from tempest_fastapi_sdk import BaseSchema
from tempest_fastapi_sdk.utils import CEP


class AddressCreateSchema(BaseSchema):
    cep: CEP
    street: str
    number: str
```

Imperative variants: `is_valid_cep(value)`, `normalize_cep(value)`, plus `CEP_PATTERN` for raw regex use. Use them inside services / queue handlers where you don't want a Pydantic round-trip.

### Cache decorator recipe

`@cached(redis, ttl=..., key_prefix=...)` memoizes the result of an async function in Redis. Cache keys are derived from the function's `__qualname__` plus a SHA-256 of args/kwargs; pass `key_prefix=` to namespace entries so invalidation works by prefix scan.

```python
from tempest_fastapi_sdk.cache import AsyncRedisManager, cached

from src.core.settings import settings


redis = AsyncRedisManager(settings.REDIS_URL)


@cached(redis, ttl=300, key_prefix="users:")
async def get_user_profile(user_id: str) -> dict[str, str]:
    """Hits Redis on warm cache; runs the body once every 5 minutes."""
    return await load_from_db(user_id)


# Selectively bypass the cache (read AND write) for some calls
@cached(
    redis,
    ttl=60,
    skip_cache=lambda args, kwargs: kwargs.get("fresh") is True,
)
async def list_orders(user_id: str, *, fresh: bool = False) -> list[dict]:
    ...
```

Defaults: `ttl=300` seconds (`0` disables expiry), `serializer=json.dumps` / `deserializer=json.loads`. Override `serializer` / `deserializer` for non-JSON payloads (Pydantic models — pass `model_dump_json` / `MyModel.model_validate_json`, or use `pickle.dumps` / `pickle.loads` for arbitrary objects). Corrupt cached values fall back to running the wrapped function and warn on the SDK logger.

### Tool-spec router recipe

`make_tool_spec_router(spec)` mounts a `GET /tool-spec` endpoint exposing a machine-readable manifest at the root prefix — meant to sit alongside `/health/liveness` so external callers can discover capabilities without parsing the full OpenAPI document.

```python
# src/api/app.py
from tempest_fastapi_sdk import (
    make_health_router,
    make_tool_spec_router,
)


def _tool_spec() -> dict[str, object]:
    """Computed per request — keeps version + counts in sync with state."""
    return {
        "service": "my-service",
        "version": settings.VERSION,
        "tools": [
            {"path": "/api/users", "method": "GET", "summary": "List users"},
            {"path": "/api/orders", "method": "POST", "summary": "Place order"},
        ],
    }


def create_app() -> FastAPI:
    app = FastAPI(...)
    app.include_router(make_health_router(db=db))
    app.include_router(make_tool_spec_router(_tool_spec))
    ...
    return app
```

Pass a dict (served verbatim), a sync callable (called every request) or an async callable (awaited). Override `path=` to expose the manifest at a different URL or `tag=` to group it under a different OpenAPI tag.

### Webhook signature verification recipe

`WebhookSignatureVerifier` validates HMAC-signed inbound webhooks (Stripe / GitHub style) and exposes a FastAPI dependency that reads the raw body, checks the signature with `hmac.compare_digest`, and yields the body bytes so the route handler can re-parse it without re-reading the stream.

```python
# src/api/dependencies/webhooks.py
from tempest_fastapi_sdk import WebhookSignatureVerifier

from src.core.settings import settings


github = WebhookSignatureVerifier(
    secret=settings.GITHUB_WEBHOOK_SECRET,
    algorithm="sha256",
    header_name="X-Hub-Signature-256",
    prefix="sha256=",
)
stripe = WebhookSignatureVerifier(
    secret=settings.STRIPE_WEBHOOK_SECRET,
    algorithm="sha256",
    header_name="Stripe-Signature",
    encoding="hex",
)
```

```python
# src/api/routers/webhooks.py
from fastapi import APIRouter, Depends

from src.api.dependencies.webhooks import github

router = APIRouter(prefix="/webhooks", tags=["webhooks"])


@router.post("/github")
async def github_event(body: bytes = Depends(github.dependency())) -> None:
    payload = json.loads(body)
    ...
```

Supports `hex` (default) and `base64` encodings, any hashlib algorithm guaranteed across platforms, and an optional `prefix` (e.g. `"sha256="`) stripped before comparison. Use the imperative `verifier.verify(body, signature)` from queue handlers when validation happens outside the FastAPI pipeline.

### Pagination Link headers recipe

`build_pagination_link_header` emits an RFC 8288 `Link` header with `first` / `prev` / `next` / `last` rels — pair it with (or use instead of) the `BasePaginationSchema` body wrapper for REST clients that expect GitHub-style headers. Existing query parameters on the base URL are preserved.

```python
from fastapi import Request, Response

from tempest_fastapi_sdk import (
    BasePaginationSchema,
    build_pagination_link_header,
)


@router.get("", response_model=list[UserResponseSchema])
async def list_users(
    request: Request,
    response: Response,
    filters: UserFilterSchema = Depends(),
    controller: UserController = Depends(get_user_controller),
) -> list[UserResponseSchema]:
    page: BasePaginationSchema[UserResponseSchema] = await controller.list_paginated(filters)
    response.headers["Link"] = build_pagination_link_header(
        str(request.url),
        page=page.page,
        size=page.size,
        pages=page.pages,
    )
    response.headers["X-Total-Count"] = str(page.total)
    return page.items
```

Tweak `page_param=` / `size_param=` when your service uses non-standard query parameter names (e.g. `offset` / `limit`). Pass `extra_params={"sort": "name"}` to bake the current sort/filter state into every link.

### Rate limit middleware recipe

`RateLimitMiddleware` is a lightweight in-process sliding-window limiter — each unique key (client IP by default) is allowed at most `max_requests` requests inside every `window_seconds` window. Exceeded requests get a `429 Too Many Requests` with a `Retry-After` header.

```python
# src/api/app.py
from tempest_fastapi_sdk import RateLimitMiddleware


def create_app() -> FastAPI:
    app = FastAPI(...)
    app.add_middleware(
        RateLimitMiddleware,
        max_requests=120,
        window_seconds=60.0,
        exempt_paths=("/health/liveness", "/health/readiness"),
    )
    ...
```

Pass `key_func=` to partition state by tenant header, authenticated user, or any request attribute:

```python
def by_tenant(request: Request) -> str:
    return request.headers.get("X-Tenant", request.client.host or "anon")


app.add_middleware(
    RateLimitMiddleware,
    max_requests=600,
    window_seconds=60.0,
    key_func=by_tenant,
)
```

The state is held **in-process** — for multi-worker deployments either run a single uvicorn worker behind a single reverse-proxy node, or push rate limiting to the edge (nginx / Cloudflare / AWS WAF). The middleware is intentionally simple; a Redis-backed sliding-window limiter is one issue away if it shows up as a real need.

### Outbox dispatcher pattern recipe

The transactional outbox pattern keeps a "to publish" table in the same database as the domain rows, so writing the row and recording the side-effect happen in a single transaction. A worker reads the outbox in order and publishes to RabbitMQ (FastStream) / TaskIQ, marking each row as dispatched only after the broker ACKs. Crashes between commit and publish replay safely on the next poll.

The SDK does **not** ship a dedicated `OutboxDispatcher` primitive — the implementation is short, opinionated, and benefits from staying in the service's `db/models/` + `tasks/` boundary. Use the recipe below.

```python
# src/db/models/outbox.py
from sqlalchemy import JSON, String
from sqlalchemy.orm import Mapped, mapped_column

from tempest_fastapi_sdk import BaseModel


class OutboxEventModel(BaseModel):
    """One row per domain event waiting to be published."""

    topic: Mapped[str] = mapped_column(String(128), nullable=False, index=True)
    payload: Mapped[dict] = mapped_column(JSON, nullable=False)
    status: Mapped[str] = mapped_column(
        String(16),
        nullable=False,
        default="pending",
        index=True,
    )
    # is_active / created_at / updated_at come from BaseModel.
```

```python
# src/db/repositories/outbox.py
from sqlalchemy import select, update

from tempest_fastapi_sdk import BaseRepository

from src.db.models import OutboxEventModel


class OutboxRepository(BaseRepository[OutboxEventModel]):
    model: type[OutboxEventModel] = OutboxEventModel

    async def claim_pending(self, *, limit: int = 100) -> list[OutboxEventModel]:
        """Lock-free claim — fine for single-worker dispatcher."""
        stmt = (
            select(OutboxEventModel)
            .where(OutboxEventModel.status == "pending")
            .order_by(OutboxEventModel.created_at)
            .limit(limit)
        )
        result = await self.session.execute(stmt)
        return list(result.scalars().all())

    async def mark_dispatched(self, ids: list[str]) -> None:
        await self.session.execute(
            update(OutboxEventModel)
            .where(OutboxEventModel.id.in_(ids))
            .values(status="dispatched"),
        )
        await self.session.commit()
```

```python
# src/services/orders.py — produce side
from src.db.models import OrderModel, OutboxEventModel


class OrderService:
    async def place_order(self, data: OrderCreateSchema) -> OrderResponseSchema:
        order = OrderModel(**data.to_dict())
        self.repo.session.add(order)
        # Same transaction as the order row.
        self.repo.session.add(
            OutboxEventModel(
                topic="orders.placed",
                payload={"order_id": str(order.id), "amount": order.amount},
            ),
        )
        await self.repo.session.flush()
        await self.repo.session.commit()
        return self.repo.map_to_response(order)
```

```python
# src/tasks/__init__.py — dispatcher side
from tempest_fastapi_sdk.tasks import AsyncTaskScheduler

from src.api.app import broker as queue_broker  # FastStream AsyncBrokerManager
from src.api.app import db

scheduler = AsyncTaskScheduler(broker)


@scheduler.interval(seconds=5)
async def dispatch_outbox() -> None:
    """Poll the outbox and publish each pending event."""
    async with db.get_session_context() as session:
        repo = OutboxRepository(session)
        events = await repo.claim_pending(limit=100)
        if not events:
            return
        dispatched: list[str] = []
        for event in events:
            try:
                await queue_broker.publish(event.payload, event.topic)
                dispatched.append(str(event.id))
            except Exception:  # noqa: BLE001 — retry on next tick
                continue
        if dispatched:
            await repo.mark_dispatched(dispatched)
```

Trade-offs to keep in mind:

- **Order is best-effort.** When a batch contains one failing publish, every later event in the same batch still runs — but they're still published in `created_at` order. If strict ordering matters, break on the first failure.
- **Single dispatcher.** The naive `claim_pending` does not lock rows; running multiple dispatcher workers will double-publish. Use `SELECT ... FOR UPDATE SKIP LOCKED` on PostgreSQL when you need to scale out.
- **Retention.** Add a periodic `TRUNCATE`-style job to delete `dispatched` rows older than N days, otherwise the outbox table grows unbounded.
- **At-least-once.** Consumers must be idempotent — the dispatcher can crash after publishing but before `mark_dispatched`.

### Migration guide 0.7 → 0.8

0.8.0 renames every field on `ServerSettings`, extracts log fields to a new `LogSettings` mixin, and adds eleven other primitives. The renames are the only **breaking** changes — every new primitive is opt-in.

#### 1. Rename env vars

| Old | New | Mixin |
| --- | --- | --- |
| `HOST` | `SERVER_HOST` | `ServerSettings` |
| `PORT` | `SERVER_PORT` | `ServerSettings` |
| `DEBUG` | `SERVER_DEBUG` | `ServerSettings` |
| *(new)* | `SERVER_RELOAD` | `ServerSettings` |
| `LOG_LEVEL` | `LOG_LEVEL` | **moved to** `LogSettings` |
| `LOG_JSON` | `LOG_JSON` | **moved to** `LogSettings` |

Mechanical `sed` on every `.env` / `docker-compose.yml` / deployment manifest:

```bash
sed -i \
  -e 's/^HOST=/SERVER_HOST=/' \
  -e 's/^PORT=/SERVER_PORT=/' \
  -e 's/^DEBUG=/SERVER_DEBUG=/' \
  .env .env.example .env.test
```

`LOG_LEVEL` and `LOG_JSON` keep their names — only the mixin moves.

#### 2. Rename code references

```bash
# `settings.HOST` → `settings.SERVER_HOST`, same for PORT/DEBUG
grep -rn "settings\.\(HOST\|PORT\|DEBUG\)\b" src/ tests/
```

Replace each match with the `SERVER_*` form. If a service was using the
old `settings.DEBUG` flag for application-level debug behavior, switch
to `settings.SERVER_DEBUG`; if it was only being read for uvicorn
auto-reload, switch to `settings.SERVER_RELOAD`.

#### 3. Mix `LogSettings` into the project `Settings`

```diff
 from tempest_fastapi_sdk import (
     BaseAppSettings,
     CORSSettings,
     DatabaseSettings,
     JWTSettings,
+    LogSettings,
     RabbitMQSettings,
     RedisSettings,
     ServerSettings,
 )


 class Settings(
     ServerSettings,
+    LogSettings,
     DatabaseSettings,
     RedisSettings,
     RabbitMQSettings,
     JWTSettings,
     CORSSettings,
     BaseAppSettings,
 ):
     ...
```

Skip this step if the service never read `settings.LOG_LEVEL` /
`settings.LOG_JSON` — `configure_logging` accepts the values as
keyword arguments directly.

#### 4. (Optional) Adopt the new primitives

Pick what fits. None of these are required.

- Replace the hand-written `src/server.py` `uvicorn.run(...)` with
  [`run_server(...)`](#programmatic-server-entry-point-recipe).
- Replace the hand-written `get_current_user` with
  [`make_jwt_user_dependency(tokens, load_user)`](#jwt-bearer--current-user--role-dependencies-recipe).
- Move `SMTP_*` / `UPLOAD_*` / `TOKEN_SECRET` / `VAPID_*` /
  `TASKIQ_*` fields out of the project's `Settings` and onto the
  matching SDK mixin ([Settings mixins composition](#settings-mixins-composition-recipe)).
- Adopt the
  [`Outbox dispatcher pattern`](#outbox-dispatcher-pattern-recipe) if
  you already write side-effects from the same transaction as your
  domain rows.

#### 5. Verify

```bash
uv sync                      # picks up new pyproject deps
uv run pytest -q             # full suite
uv run ruff check src tests  # confirm no `HOST`/`PORT`/`DEBUG` references slipped
```

If `pytest` fails with a Pydantic `ValidationError` referencing
`HOST` / `PORT` / `DEBUG`, an env var was not renamed (look at the
process environment or `.env`).

---

## Reference

### BaseRepository methods

| Method | Purpose | Raises on miss |
| --- | --- | --- |
| `get(filters, for_update=False)` | Single record matching filters | ✅ `not_found_exception` |
| `get_or_none(filters, for_update=False)` | Single record or `None` | — |
| `get_by_id(id, for_update=False)` | Shortcut for `get({"id": id})` | ✅ `not_found_exception` |
| `exists(filters)` | `bool` without loading the row | — |
| `first(filters, order_by, ascending)` | First match ordered | — |
| `list(filters, order_by, ascending)` | All rows; `[]` when empty | — |
| `paginate(filters, order_by, page, page_size, ascending, query=)` | `dict` with `items, total, page, size, pages` | — |
| `count(filters)` | Row count | — |
| `add(model)` | Insert single | `ConflictException` on integrity |
| `add_all(models)` | Insert batch | `ConflictException` on integrity |
| `update(model)` | Commit mutated instance | `ConflictException` on integrity |
| `update_many(models)` | Commit batch | `ConflictException` on integrity |
| `bulk_update(filters, values)` | `UPDATE ... WHERE` mass mutation; rejects empty filters | `ConflictException` on integrity |
| `delete(id)` | Hard delete by PK | ✅ `not_found_exception` |
| `delete_many(filters)` | Hard delete by filter | — |
| `delete_batch(ids)` | Hard delete several PKs | — |
| `soft_delete(id)` | Flip `is_active=False`; returns row | ✅ `not_found_exception` |
| `restore(id)` | Flip `is_active=True`; returns row | ✅ `not_found_exception` |
| `map_to_schema(instance)` | Override in subclass | `NotImplementedError` |
| `map_to_response(instance)` | Override in subclass | `NotImplementedError` |
| `map_to_model(data)` | Default: `self.model(**data)` | — |

### Filter dict conventions

The dict passed to `get` / `list` / `paginate` / `count` / `exists` / `delete_many` / `bulk_update` understands these patterns out of the box:

| Filter shape | Generated SQL |
| --- | --- |
| `{"col": value}` | `col = value` |
| `{"col": [a, b]}` | `col IN (a, b)` |
| `{"col": True}` / `{"col": False}` | `col IS TRUE` / `col IS FALSE` |
| `{"name": "ana"}` (string field literally named `name`) | `name ILIKE '%ana%'` |
| `{"col": date(2024, 1, 1)}` (date value) | `date(col) = '2024-01-01'` |
| `{"start_in": date(...)}` | `date(date_col_or_created_at) >= ...` |
| `{"end_in": date(...)}` | `date(date_col_or_created_at) <= ...` |
| `{"col": None}` | filter is skipped (omit-when-None semantics) |

Pass the dict from `BasePaginationFilterSchema.get_conditions()` for query-string-driven filters.

### Lifecycle managers

Every SDK-shipped manager follows the same core shape: `connect()` to start,
`disconnect()` to stop, and `health_check()` to plug into
`make_health_router(checks=...)`. Brokers and the scheduler additionally
expose `lifespan()` (async context manager) and `is_connected`
(read-only state). The tables below list the manager-specific surface.

#### `AsyncDatabaseManager`

| Method / Property | Purpose |
| --- | --- |
| `__init__(url, *, echo=False, pool_size=10, max_overflow=20, pool_recycle=3600, **engine_kwargs)` | Build the manager (engine is lazy). |
| `await connect()` | Create the engine + sessionmaker. |
| `await disconnect()` | Dispose the engine. |
| `is_connected` (property) | `True` while the engine is alive. |
| `db_url_safe` (property) | DSN with password redacted (safe to log). |
| `await get_session()` | Return a single `AsyncSession`. |
| `async with get_session_context()` | Yield an `AsyncSession`; commits on success, rolls back on raise. |
| `async session_dependency()` | FastAPI `Depends`-compatible generator. |
| `await create_tables()` | Issue `CREATE TABLE` for every model on `BaseModel.metadata` (tests / local dev — production schemas go through Alembic). |
| `await drop_tables()` | Issue `DROP TABLE` for every model on `BaseModel.metadata`. |
| `await health_check()` | Ping with `SELECT 1`; returns `bool`. |

#### `AsyncRedisManager` *(extra: `[cache]`)*

| Method / Property | Purpose |
| --- | --- |
| `__init__(url, *, decode_responses=True, **client_kwargs)` | Wrap a `redis.asyncio.Redis` client. |
| `await connect()` | Build the client. |
| `await disconnect()` | Close the client + release the pool. |
| `client` (property) | Live `Redis` instance; raises `RuntimeError` before `connect`. |
| `async with get_client_context()` | Yield the same client inside an `async with`. |
| `async client_dependency()` | FastAPI `Depends`-compatible generator. |
| `await health_check()` | `PING` returns `bool`. |

Pair with `@cached(redis, ttl=..., key_prefix=...)` for function-level memoization — see the [Cache decorator recipe](#cache-decorator-recipe).

#### `AsyncBrokerManager` *(extra: `[queue]`)*

| Method / Property | Purpose |
| --- | --- |
| `__init__(broker)` | Wrap any FastStream broker (`RabbitBroker`, `KafkaBroker`, ...). |
| `await connect()` | Start the broker; idempotent. |
| `await disconnect()` | Stop the broker. |
| `broker` (attribute) | The wrapped FastStream broker — use `broker.publisher(...)` / `broker.subscriber(...)` directly. |
| `await publish(message, *args, **kwargs)` | Forward to `broker.publish`; raises before `connect`. |
| `async with lifespan()` | Connect on enter, disconnect on exit. |
| `async broker_dependency()` | FastAPI `Depends`-compatible generator yielding the broker. |
| `await health_check()` | `True` while the broker is started. |
| `is_connected` (property) | Read-only state. |

#### `AsyncTaskBrokerManager` *(extra: `[tasks]`)*

| Method / Property | Purpose |
| --- | --- |
| `__init__(broker)` | Wrap any TaskIQ broker (`AioPikaBroker`, `RedisBroker`, `InMemoryBroker`, ...). |
| `await connect()` | `broker.startup()`; idempotent. |
| `await disconnect()` | `broker.shutdown()`. |
| `broker` (attribute) | The wrapped TaskIQ broker. |
| `task(*args, **kwargs)` | Decorator forwarding to `broker.task`. |
| `register_task(func, *, task_name=None, **kwargs)` | Register without decorator syntax. |
| `async with lifespan()` | Connect on enter, disconnect on exit. |
| `async broker_dependency()` | FastAPI `Depends`-compatible generator. |
| `await health_check()` | `True` while the broker is started. |
| `is_connected` (property) | Read-only state. |

#### `AsyncTaskScheduler` *(extra: `[tasks]`)*

| Method / Property | Purpose |
| --- | --- |
| `__init__(broker, sources=None)` | Wrap `TaskiqScheduler` + `LabelScheduleSource` (default). |
| `broker` (attribute) | Same broker tasks are kicked into. |
| `sources` (attribute) | List of `ScheduleSource` instances. |
| `scheduler` (attribute) | Underlying `taskiq.TaskiqScheduler`. |
| `@cron(expr, *, cron_offset=None, task_name=None, **labels)` | Register a cron-scheduled task. |
| `@interval(seconds_or_timedelta, *, task_name=None, **labels)` | Register a fixed-interval task. |
| `@schedule(spec, *, task_name=None, **labels)` | Register with raw TaskIQ schedule list. |
| `register(func, *, schedule, task_name=None, **labels)` | Decorator-free registration. |
| `await connect()` | `scheduler.startup()` + every `source.startup()`. |
| `await disconnect()` | Cancel the background loop (if any) and shut down. |
| `await run_in_background()` | Spawn an in-process `SchedulerLoop` task (dev / single-process). |
| `async with lifespan()` | Connect on enter, disconnect on exit. |
| `await health_check()` | `True` while started and (when applicable) the loop is alive. |
| `is_connected` (property) | Read-only state. |

Production deployments should run the standalone CLI instead of `run_in_background()`:

```bash
taskiq worker src.tasks:tasks.broker
taskiq scheduler src.tasks:scheduler.scheduler
```

### Error envelope

Every `AppException` (and any subclass) is serialized by `register_exception_handlers` into:

```json
{
    "detail": "Human-readable message",
    "code": "MACHINE_READABLE_CODE",
    "details": {"any": "structured context"}
}
```

| Exception | Default `status_code` | Default `code` |
| --- | --- | --- |
| `AppException` | 500 | `INTERNAL_SERVER_ERROR` |
| `NotFoundException` | 404 | `NOT_FOUND` |
| `ConflictException` | 409 | `CONFLICT` |
| `ValidationException` | 422 | `VALIDATION_ERROR` |
| `UnauthorizedException` | 401 | `UNAUTHORIZED` |
| `InvalidTokenException` | 401 | `INVALID_TOKEN` |
| `ExpiredTokenException` | 401 | `TOKEN_EXPIRED` |
| `ForbiddenException` | 403 | `FORBIDDEN` |
| `FileTooLargeException` | 413 | `FILE_TOO_LARGE` |
| `InvalidFileTypeException` | 415 | `INVALID_FILE_TYPE` |

Subclasses set `message`/`code`/`status_code` as class attributes; instances can override `message` and attach a `details` dict at construction time.

---

## Conventions

- **Layered architecture**: routers → controllers → services → repositories. Never skip layers.
- **Async-first**: every I/O method is `async`. Use SQLAlchemy 2.0 patterns (`select`, not `session.query()`).
- **Collections return `[]`**: never raise on empty results. Only single-resource lookups raise `NotFoundException`.
- **Soft delete by default**: `is_active=False` instead of physical delete when applicable.
- **UTC everywhere**: timestamps normalized via `to_utc`; `BaseResponseSchema` enforces this on field validators.
- **Double quotes**: enforced by `ruff format`.
- **Full typing**: every parameter, return value and class attribute is typed. `Any` is explicit, never implicit.
- **Docstrings in English**: Google-style covering description / args / returns / raises.
- **Frontend branches on `code`**, not on the (translatable) message.

---

## Development

```bash
make install     # uv sync --all-extras
make test        # pytest with coverage
make lint        # ruff check .
make fmt         # ruff format .
make type        # mypy --strict
make check       # lint + fmt-check + type + test (every gate)
make ci          # check + build + smoke install in a clean venv (mirrors GitHub Actions)
make build       # uv build → dist/
make clean       # remove caches and build artifacts
```

Run `make` (or `make help`) to list every target. The Makefile is just thin wrappers around `uv` — direct invocations still work too:

```bash
uv sync --all-extras
uv run pytest
uv run ruff check .
uv run mypy tempest_fastapi_sdk
uv build
```

The CI gate (`.github/workflows/ci.yml`) runs the equivalent of `make ci` on every push and pull request against `main`. The wheel-build smoke step installs the freshly built artifact into a clean Python 3.13 virtualenv and imports the top-level surface to guard against the empty-wheel / missing-package-data class of bugs.

---

## Release

Releases are published to [PyPI](https://pypi.org/project/tempest-fastapi-sdk/) automatically when a `v*.*.*` tag is pushed.

The pipeline (`.github/workflows/release-pypi.yml`) does three things:

1. **Version sanity check.** The tag (`v0.1.0`), `pyproject.toml`'s `version` field and `tempest_fastapi_sdk.__version__` must all match. Mismatched releases abort before any artifact is built.
2. **Build + verify.** Run the full validation suite (tests + lint + mypy), build sdist/wheel with `uv build`, check metadata with `twine check`, and verify the wheel actually contains the package files + the bundled `env.py.template` Alembic template.
3. **Publish via Trusted Publishing.** Uses OIDC against PyPI — no long-lived API tokens stored in repo secrets.

### Cutting a release

```bash
make release VERSION=0.2.0
```

This single target:

1. Refuses to run if the working tree is dirty.
2. Bumps `pyproject.toml` and `tempest_fastapi_sdk/__init__.py` to the requested version.
3. Runs `make check` (lint + format + mypy + tests) so a broken commit never gets tagged.
4. Commits the bump as `chore: release v0.2.0` and creates the `v0.2.0` tag locally.
5. Prints the two `git push` commands you still need to run — pushing is left manual on purpose so you can review the commit one last time.

```bash
# Review then push
git show v0.2.0
git push origin main
git push origin v0.2.0
```

GitHub Actions picks up the tag, runs the release pipeline and publishes the artifacts to PyPI.

Manual flow (no Makefile) — same result:

```bash
$EDITOR pyproject.toml                              # version = "0.2.0"
$EDITOR tempest_fastapi_sdk/__init__.py             # __version__ = "0.2.0"
git commit -am "chore: release v0.2.0"
git tag v0.2.0
git push origin main v0.2.0
```

### One-time PyPI Trusted Publishing setup

PyPI needs to know which workflow is allowed to publish on the project's behalf. Done once per project:

1. Create the project on [PyPI](https://pypi.org) (name: `tempest-fastapi-sdk`). For brand-new projects, register a placeholder release manually first or use [`pending` publishers](https://docs.pypi.org/trusted-publishers/creating-a-project-through-oidc/) so the first OIDC-driven upload can create it.
2. On the project's PyPI settings page, add a **Trusted Publisher** pointing to:
   - **Owner**: `mauriciobenjamin700`
   - **Repository**: `tempest-fastapi-sdk`
   - **Workflow filename**: `release-pypi.yml`
   - **Environment**: `pypi` (must match `release-pypi.yml`'s `environment.name`)
3. In the GitHub repository, create an environment named `pypi` (Settings → Environments → New environment). Optionally restrict deployments to tags matching `v*.*.*` for an extra safety net.

After this, every `v*.*.*` tag triggers a publish. No PYPI tokens are stored anywhere.

---

## License

MIT
