Metadata-Version: 2.4
Name: operetta
Version: 0.0.18
Summary: A lightweight framework for building Python applications that is not tied to a specific transport protocol
License: Apache-2.0
License-File: LICENSE
License-File: NOTICE
Keywords: application,framework,ddd,domain-driven design,asyncio,microservices,microservice,dependency-injection,dependency injection,di,web,api,rest,aiohttp,http,openapi,openapi3,swagger,swagger-ui,redoc,postgres,postgresql,database,asyncpg,hasql,aiomisc
Author: Alexander Tikhonov
Author-email: random.gauss@gmail.com
Requires-Python: >=3.11,<4.0
Classifier: Development Status :: 5 - Production/Stable
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: Apache Software License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3 :: Only
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Programming Language :: Python :: 3.14
Classifier: Programming Language :: Python :: Implementation :: CPython
Classifier: Environment :: Console
Classifier: Environment :: Web Environment
Classifier: Framework :: AsyncIO
Classifier: Typing :: Typed
Classifier: Topic :: Internet :: WWW/HTTP
Classifier: Topic :: Internet :: WWW/HTTP :: HTTP Servers
Classifier: Topic :: Internet :: WWW/HTTP :: Dynamic Content
Classifier: Topic :: Database
Classifier: Topic :: Database :: Front-Ends
Classifier: Topic :: Software Development :: Libraries
Classifier: Topic :: Software Development :: Libraries :: Application Frameworks
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Provides-Extra: aiohttp
Provides-Extra: asyncpg
Provides-Extra: hasql
Provides-Extra: prometheus
Provides-Extra: sentry
Requires-Dist: aiohttp (>=3.12.15,<4.0.0) ; extra == "aiohttp"
Requires-Dist: aiomisc[uvloop] (>=17.9.4,<18.0.0)
Requires-Dist: asyncpg (>=0.30.0,<0.31.0) ; extra == "asyncpg"
Requires-Dist: dishka (>=1.7.1,<2.0.0)
Requires-Dist: hasql (>=0.8.1,<0.9.0) ; extra == "hasql"
Requires-Dist: openapify (>=0.7,<0.8) ; extra == "aiohttp"
Requires-Dist: orjson (>=3.11.3,<4.0.0) ; extra == "aiohttp"
Requires-Dist: prometheus-client (>=0.23.1,<0.24.0) ; extra == "prometheus"
Requires-Dist: pyyaml (>=6.0.2,<7.0.0)
Requires-Dist: sentry-sdk (>=2.41.0,<3.0.0) ; extra == "sentry"
Project-URL: Homepage, https://github.com/Fatal1ty/operetta
Description-Content-Type: text/markdown

<div align="center">

<img alt="logo" width="175" src="https://raw.githubusercontent.com/Fatal1ty/operetta/refs/heads/main/img/logo.svg">

###### Design Python services right

[![Build Status](https://github.com/Fatal1ty/operetta/workflows/tests/badge.svg)](https://github.com/Fatal1ty/operetta/actions)
[![Latest Version](https://img.shields.io/pypi/v/operetta.svg)](https://pypi.python.org/pypi/operetta)
[![Python Version](https://img.shields.io/pypi/pyversions/operetta.svg)](https://pypi.python.org/pypi/operetta)
[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)
</div>

# Operetta

A lightweight framework for building Python applications that is not tied to a specific transport protocol. It is built on top of [aiomisc](https://github.com/aiokitchen/aiomisc) (service lifecycle, entrypoint) and [dishka](https://github.com/reagento/dishka) (dependency injection). On top of that, the following integrations are available:

- [AIOHTTP](https://github.com/aio-libs/aiohttp): declarative handlers with DI for request body, query, and path params; automatic OpenAPI generation with [Swagger](https://github.com/swagger-api/swagger-ui) and [Redoc](https://github.com/Redocly/redoc).
- PostgreSQL via [asyncpg](https://github.com/MagicStack/asyncpg): a database adapter and DI provider for a connection pool.
- PostgreSQL with HA via [hasql](https://github.com/aiokitchen/hasql): a pool with balancing, failover and the same adapter layer.
- Error monitoring via [Sentry](https://sentry.io/) using [sentry-sdk](https://github.com/getsentry/sentry-python).
- Prometheus metrics via [prometheus-client](https://github.com/prometheus/client_python): expose metrics over HTTP for scraping.

## Table of contents

- [Highlights](#highlights)
- [Installation](#installation)
- [Quickstart (HTTP API)](#quickstart-http-api)
  - [How it works under the hood](#how-it-works-under-the-hood)
- [Quickstart (non-HTTP app)](#quickstart-non-http-app)
- [Services and DI](#services-and-di)
- [AIOHTTP](#aiohttp)
  - [Configuration](#configuration)
  - [Middleware ordering](#middleware-ordering)
  - [Error handling and response format](#error-handling-and-response-format)
- [PostgreSQL](#postgresql)
  - [Single-node PostgreSQL (asyncpg)](#single-node-postgresql-asyncpg)
  - [High-availability PostgreSQL cluster (hasql)](#high-availability-postgresql-cluster-hasql)
  - [Advanced setup](#advanced-setup)
- [Sentry](#sentry)
  - [Configuration](#configuration-1)
  - [Behavior and notes](#behavior-and-notes)
- [Prometheus](#prometheus)
  - [Configuration](#configuration-2)
  - [Usage](#usage)

## Highlights

- Services as units of functionality: each service starts/stops via [aiomisc](https://github.com/aiokitchen/aiomisc) and may provide DI providers.
- Single DI container ([dishka](https://github.com/reagento/dishka)) for the whole app; separate [scopes](https://dishka.readthedocs.io/en/stable/advanced/scopes.html) for `APP` and `REQUEST`.
- [AIOHTTP](https://github.com/aio-libs/aiohttp) integration:
  - Handler parameter annotations: `FromBody[T]`, `FromQuery[T]`, `FromPath[T]`.
  - Automatic parsing and validation via [mashumaro](https://github.com/Fatal1ty/mashumaro); friendly error details.
  - Unified JSON envelope for responses.
  - OpenAPI generation with static assets for Swagger/Redoc.
- PostgreSQL integrations ([asyncpg](https://github.com/MagicStack/asyncpg)/[hasql](https://github.com/aiokitchen/hasql)): interface adapter `PostgresDatabaseAdapter` + transactional `PostgresTransactionDatabaseAdapter` for repositories and units of work.
- Sentry integration: simple, configurable initialization of [sentry-sdk](https://github.com/getsentry/sentry-python).
- Prometheus integration: scrape metrics from a built-in HTTP endpoint; zero dependencies on third-party web frameworks.

## Installation

Requires Python 3.11+.

- Base:

```bash
pip install operetta
```

- With AIOHTTP and OpenAPI:

```bash
pip install 'operetta[aiohttp]'
```

- With PostgreSQL via asyncpg:

```bash
pip install 'operetta[asyncpg]'
```

- With PostgreSQL HA via hasql:

```bash
pip install 'operetta[hasql]'
```

- With Sentry:

```bash
pip install 'operetta[sentry]'
```

- With Prometheus:

```bash
pip install 'operetta[prometheus]'
```

## Quickstart (HTTP API)

A minimal AIOHTTP app with DI and autogenerated OpenAPI. You are free to organize your project structure and files as you prefer.

```python
from dataclasses import dataclass, asdict
from aiohttp import web
from operetta.app import Application
from operetta.integrations.aiohttp.annotations import (
    FromBody,
    FromPath,
    FromQuery,
)
from operetta.integrations.aiohttp.response import success_response
from operetta.integrations.aiohttp.service import AIOHTTPService


@dataclass
class CreateUserBody:
    name: str
    email: str


@dataclass
class UserDto:
    id: int
    name: str
    email: str


async def create_user(
    _: web.Request, body: FromBody[CreateUserBody]
) -> web.StreamResponse:
    # ... create a user ...
    user = UserDto(id=1, name=body.name, email=body.email)
    return success_response(asdict(user))


async def get_user(
    _: web.Request,
    user_id: FromPath[int],
    detailed: FromQuery[bool] = False,
) -> UserDto:
    # ... load a user ...
    user = UserDto(id=user_id, name="Alice", email="alice@example.com")
    return user


routes = [
    web.post("/users", create_user),
    web.get("/users/{user_id}", get_user),
]

app = Application(
    AIOHTTPService(
        address="127.0.0.1",
        port=8080,
        routes=routes,
        docs_title="Demo API",
        docs_servers=("http://127.0.0.1:8080",),
        docs_default_type="swagger",  # or "redoc"
    ),
    di_providers=[],  # your dishka providers if needed
    warmup_dependencies=True,
)

if __name__ == "__main__":
    app.run()
```

Short example: raising DDD errors in handlers

```python
from operetta.ddd import NotFoundError, AuthorizationError

async def get_user(_: web.Request, user_id: FromPath[int]) -> User:
    # Example auth check
    if not has_access_to_user(user_id):
        raise AuthorizationError(details=[{"permission": "users:read"}])

    user = await repo.get_user(user_id)
    if user is None:
        raise NotFoundError(details=[{"id": user_id}])

    return user
```

Open the docs at:

- OpenAPI spec: `/static/openapi/openapi.yaml` (static files path is configurable).
- Swagger UI: `/docs/swagger` (and redirect from `/docs`).
- Redoc: `/docs/redoc`.

### How it works under the hood

- [`AIOHTTPService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/service.py) at app creation time:
  - Wraps your routes by inspecting handler signatures and [`FromBody`/`FromQuery`/`FromPath`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/annotations.py) annotations.
  - Injects parsed values into the handler call.
  - If the return type is not a `StreamResponse`, serializes result into [`SuccessResponse[T]`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/response.py) and returns JSON ([format details](#error-handling-and-response-format)).
  - Builds the OpenAPI spec via [openapify](https://github.com/Fatal1ty/openapify) and serves it as static.
  - Attaches system middleware: DDD error mapping to HTTP and a global unhandled error catcher.
- DI is configured via [dishka integration with AIOHTTP](https://dishka.readthedocs.io/en/stable/integrations/aiohttp.html); the container is created by [`DIService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/service/di.py) and wired into the app.
  - Each request gets a new DI scope (`REQUEST`) for per-request dependencies.
  - Handler parameters may be any DI-resolvable type (e.g., services, database adapters) in addition to `FromBody/FromQuery/FromPath` via `FromDishka`.

## Quickstart (non-HTTP app)

Operetta is not tied to HTTP. You can write background services/workers on [aiomisc](https://github.com/aiokitchen/aiomisc) and use DI:

```python
import asyncio
import contextlib
from operetta.app import Application
from operetta.service.base import Service

class Worker(Service):
    async def start(self):
        # example: a periodic task
        self._task = asyncio.create_task(self._job())

    async def stop(self, exception: Exception | None = None):
        self._task.cancel()
        with contextlib.suppress(Exception):
            await self._task

    async def _job(self):
        while True:
            # get dependencies if needed:
            # db = await self.get_dependency(PostgresDatabaseAdapter)
            await asyncio.sleep(1)

app = Application(Worker(), warmup_dependencies=True)
app.run()
```

## Services and DI

- Base service class: [`operetta.service.base.Service`](https://github.com/Fatal1ty/operetta/blob/main/operetta/service/base.py) (inherits `aiomisc.Service`).
- DI container: created inside `DIService` (see [`operetta/service/di.py`](https://github.com/Fatal1ty/operetta/blob/main/operetta/service/di.py)).
  - Providers are collected from:
    - the `Application` itself (argument `di_providers`),
    - application services implementing `get_di_providers()`.
  - Supports dependency warmup (`warmup=True`) for APP/REQUEST factories.
- Retrieve a dependency from a service via `await service.get_dependency(Type)`.

To load config from YAML, use [`YAMLConfigurationService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/service/configuration.py):

```python
from operetta import Application
from operetta.service.configuration import YAMLConfigurationService

config_service = YAMLConfigurationService()  # reads --config path from CLI
app = Application(config_service)
```

Two values are provided to DI: [`ApplicationDictConfig`](https://github.com/Fatal1ty/operetta/blob/main/operetta/types.py) (raw dict) and a config object (if you provide `config_cls`/`config_factory`).

Custom config class (mashumaro DataClassDictMixin):

```python
from dataclasses import dataclass
from mashumaro import DataClassDictMixin
from operetta import Application
from operetta.service.configuration import YAMLConfigurationService

# Define your typed config mapped to YAML structure
@dataclass
class AppConfig(DataClassDictMixin):
    # You can use nested dataclasses as well; here kept minimal
    creds: dict[str, str] | None = None

# Build service that parses YAML into AppConfig using mashumaro
config_service = YAMLConfigurationService(
    config_cls=AppConfig,
    config_factory=AppConfig.from_dict,
)

# Both ApplicationDictConfig (raw dict) and AppConfig are available in DI
app = Application(config_service)
```

## AIOHTTP

A first-class integration for building HTTP APIs with declarative handler parameters, DI, and autogenerated OpenAPI/Swagger/Redoc.

Highlights:

- Handler parameter annotations: [`FromBody[T]`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/annotations.py), [`FromQuery[T]`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/annotations.py), [`FromPath[T]`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/annotations.py) (plus DI via `FromDishka`).
- Unified JSON responses out of the box.
- Automatic OpenAPI spec generation and static docs at `/docs` (Swagger or Redoc).

Provided components:
- [`AIOHTTPService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/service.py) — the main service that wraps routes, handles requests, and serves OpenAPI/docs.
- [`AIOHTTPConfigurationService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/service.py) — registers a config provider into DI.
- [`AIOHTTPServiceConfigProvider`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/providers.py) — reads `ApplicationDictConfig['api']` and decodes it into [`AIOHTTPServiceConfig`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/config.py).
- [`AIOHTTPServiceRequestProvider`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/providers.py) — exposes `aiohttp.web.Request` in DI (request scope). Useful when you want to access request data (headers, auth, remote, etc.) inside your own DI providers/factories (e.g., authorization).

Install extra:

```bash
pip install 'operetta[aiohttp]'
```

How to wire it up:

```python
from operetta.app import Application
from operetta.service.configuration import YAMLConfigurationService
from operetta.integrations.aiohttp import (
    AIOHTTPService,
    AIOHTTPServiceConfigProvider,
    AIOHTTPServiceRequestProvider,
)

app = Application(
    YAMLConfigurationService(),  # loads --config path and exposes dict to DI
    AIOHTTPService(
        routes=[],
        # You may still override settings here (constructor wins over YAML):
        # port=9090,
        # docs_default_type="redoc",
    ),
    di_providers=[
        AIOHTTPServiceConfigProvider(),
        # Optional: put aiohttp.web.Request into DI (Scope.REQUEST)
        AIOHTTPServiceRequestProvider(),
    ],
)
```

Using `Request` inside a DI provider (example: simple authorization context factory):

```python
from dataclasses import dataclass

from aiohttp import web
from dishka import Provider, Scope, provide


@dataclass(frozen=True)
class AuthContext:
    token: str | None


class AuthProvider(Provider):
    scope = Scope.REQUEST

    @provide
    def get_auth_context(self, request: web.Request) -> AuthContext:
        return AuthContext(token=request.headers.get("Authorization"))
```

### Configuration

You can configure [`AIOHTTPService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/service.py) in three complementary ways:

- Via constructor (`__init__`) arguments — explicit values have the highest priority.
- Via YAML file ([`YAMLConfigurationService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/service/configuration.py) + [`AIOHTTPConfigurationService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/service.py)/[`AIOHTTPServiceConfigProvider`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/providers.py)) — good for ops-driven setups; overrides defaults but not explicit `__init__` values.
- Via custom DI providers — e.g., environment variables or secrets managers.

Precedence rule:
- `__init__` → DI ([`AIOHTTPServiceConfigProvider`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/providers.py)) → internal defaults

> [!TIP]
> - [`AIOHTTPConfigurationService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/service.py) is a helper that installs [`AIOHTTPServiceConfigProvider`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/providers.py) into DI.
> - This provider reads `ApplicationDictConfig['api']` and decodes it into [`AIOHTTPServiceConfig`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/config.py).
> - YAML is not required. You can provide [`AIOHTTPServiceConfig`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/config.py) via any DI provider.

YAML keys (all optional) live under the `api:` section:

```yaml
api:
  address: 0.0.0.0         # bind address
  port: 8081               # listen port
  static_endpoint_prefix: /static/
  static_files_root: ./var/static  # where to serve static files and openapi spec
  docs_default_path: /docs
  docs_swagger_path: /docs/swagger
  docs_redoc_path: /docs/redoc
  docs_title: Demo API
  docs_servers:
    - http://127.0.0.1:8081
  docs_default_type: swagger  # swagger | redoc | null (no redirect from /docs)
  docs_remove_path_prefix: /v1/
  # Optional OpenAPI cosmetics
  docs_tag_descriptions:
    users: Operations with users
  docs_tag_groups:
    Management:
      - users
```

Custom config provider example (env-vars):

```python
import os
from dishka import Provider, Scope, provide
from operetta import Application
from operetta.integrations.aiohttp.config import AIOHTTPServiceConfig
from operetta.integrations.aiohttp import AIOHTTPService

class EnvAiohttpConfigProvider(Provider):
    scope = Scope.APP

    @provide
    def get_config(self) -> AIOHTTPServiceConfig:
        return AIOHTTPServiceConfig(
            address=os.getenv("HTTP_ADDRESS", "0.0.0.0"),
            port=int(os.getenv("HTTP_PORT", "8080")),
        )

app = Application(
    AIOHTTPService(routes=[]),
    di_providers=[EnvAiohttpConfigProvider()],
)
```

### Middleware ordering

AIOHTTP middlewares are applied as a stack: the first middleware in the list is the outermost.

Operetta splits middlewares into 3 layers:

- `outer_middlewares` — user middlewares that should be able to modify the response even when an error happens (CORS, security headers, request-id headers, tracing wrappers).
- `system_middlewares` — built-in operetta middlewares (DDD → API errors mapping, generic error handling).
- `inner_middlewares` — user middlewares that must be as close to the handler as possible (specialized request parsing/validation, raw exception mapping, etc.).

Example:

```python
from operetta.integrations.aiohttp import AIOHTTPService

service = AIOHTTPService(
    routes=[...],
    outer_middlewares=[cors_middleware],
    inner_middlewares=[custom_validation_middleware],
)
```


### Error handling and response format

- Successful responses are automatically wrapped into `{ "success": true, "data": ..., "error": null }`.
- Errors use `{ "success": false, "data": null, "error": { message, code, details } }`.
- Standard AIOHTTP errors and domain/application/infrastructure errors (see [`operetta.ddd.errors`](https://github.com/Fatal1ty/operetta/blob/main/operetta/ddd/errors.py)) are mapped by middleware from [`integrations/aiohttp/middlewares.py`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/middlewares.py).
- Parsing errors for body/params use types from [`integrations/aiohttp/errors.py`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/errors.py) (e.g., `InvalidJSONBodyError`, `InvalidQueryParamsError`, `InvalidPathParamsError`, ...).

Recommended way to raise errors in your app

- Import DDD exceptions from a single place:

  ```python
  from operetta.ddd import (
      NotFoundError,
      AlreadyExistsError,
      ConflictError,
      ValidationError,
      AuthenticationError,
      AuthorizationError,
      RelatedResourceNotFoundError,
      DependencyUnavailableError,
  )
  ```

- Raise with optional structured details (a sequence of JSON-serializable objects):

  ```python
  raise NotFoundError(
      details=[{"resource": "User", "id": user_id}]
  )
  ```

HTTP mapping of DDD exceptions (handled by middleware)

| DDD exception                                                                                                     | HTTP status | HTTP error               | code                  |
|-------------------------------------------------------------------------------------------------------------------|-------------|--------------------------|-----------------------|
| AuthenticationError                                                                                               | 401         | UnauthorizedError        | UNAUTHORIZED          |
| AuthorizationError, PermissionDeniedError                                                                         | 403         | ForbiddenError           | FORBIDDEN             |
| NotFoundError                                                                                                     | 404         | ResourceNotFoundError    | RESOURCE_NOT_FOUND    |
| AlreadyExistsError                                                                                                | 409         | DuplicateRequestError    | DUPLICATE_RESOURCE    |
| ConflictError, InvalidOperationError                                                                              | 409         | ConflictError            | CONFLICT              |
| ValidationError, RelatedResourceNotFoundError                                                                     | 422         | UnprocessableEntityError | UNPROCESSABLE_ENTITY  |
| DeadlineExceededError                                                                                             | 504         | GatewayTimeoutError      | GATEWAY_TIMEOUT       |
| DependencyThrottledError, DependencyUnavailableError, SubsystemUnavailableError, SystemResourceLimitExceededError | 503         | ServiceUnavailableError  | SERVICE_UNAVAILABLE   |
| DependencyFailureError                                                                                            | 502         | BadGatewayError          | BAD_GATEWAY           |
| StorageIntegrityError, TransportIntegrityError, InfrastructureError (fallback)                                    | 500         | ServerError              | INTERNAL_SERVER_ERROR |

Response envelope reference

- Success:

  ```json
  { "success": true, "data": { "id": 1, "name": "Alice" }, "error": null }
  ```

- Error:

  ```json
  {
    "success": false,
    "data": null,
    "error": {
      "message": "Resource not found",
      "code": "RESOURCE_NOT_FOUND",
      "details": [ { "resource": "User", "id": 123 } ]
    }
  }
  ```

Advanced

- You can throw HTTP-specific errors directly if you need full control over the client response: see [`operetta.integrations.aiohttp.errors`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/errors.py) (e.g., `ForbiddenError`, `UnauthorizedError`, `UnprocessableEntityError`).
- Two middlewares are installed by default:
  - [`ddd_errors_middleware`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/middlewares.py) maps DDD exceptions to HTTP errors above.
  - [`unhandled_error_middleware`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/middlewares.py) catches all other exceptions and returns a generic 500 with a safe message.

## PostgreSQL

Operetta provides a thin, uniform abstraction over PostgreSQL so your application code does not depend on a particular driver or pool manager. You write repositories and units of work against two interfaces:

- [`PostgresDatabaseAdapter`](https://github.com/Fatal1ty/operetta/blob/main/operetta/ddd/infrastructure/db/postgres/adapters/interface.py) — a general-purpose adapter for any operations (fetch, fetch_one, execute, ...) without explicit transaction control.
- [`PostgresTransactionDatabaseAdapter`](https://github.com/Fatal1ty/operetta/blob/main/operetta/ddd/infrastructure/db/postgres/adapters/interface.py) — the same API for all operations plus transaction control methods (start/commit/rollback) when you need to run multiple steps in a single transaction.

There are two interchangeable backends:
- [asyncpg](https://github.com/MagicStack/asyncpg) — a straightforward single-pool setup.
- [hasql](https://github.com/aiokitchen/hasql) (asyncpg HA) — a high-availability pool manager with balancing/failover.

Both backends expose the same interfaces via DI, so switching is configuration-only. DI scopes are chosen to match typical usage:
- [`PostgresDatabaseAdapter`](https://github.com/Fatal1ty/operetta/blob/main/operetta/ddd/infrastructure/db/postgres/adapters/interface.py) is provided with scope=APP (shared pool).
- [`PostgresTransactionDatabaseAdapter`](https://github.com/Fatal1ty/operetta/blob/main/operetta/ddd/infrastructure/db/postgres/adapters/interface.py) is provided with scope=REQUEST (per-request/operation handle for transactional work).

Configuration is provided via DI:
- Connection config types: [`AsyncpgPostgresDatabaseConfig`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg/config.py) (for asyncpg) and [`AsyncpgHAPostgresDatabaseConfig`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg_ha/config.py) (for asyncpg HA).
- Pool factory kwargs type: [`AsyncpgPoolFactoryKwargs`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg/config.py) (to pass `init` or other pool options to the driver/manager).
- Built-in config providers — [`AsyncpgPostgresDatabaseConfigProvider`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg/providers.py) and [`AsyncpgHAPostgresDatabaseConfigProvider`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg_ha/providers.py) — read settings from `ApplicationDictConfig['postgres']`, which is loaded by [`YAMLConfigurationService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/service/configuration.py) from your YAML file.
- A built-in pool kwargs provider returns an empty [`AsyncpgPoolFactoryKwargs`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg/config.py) by default; you can override it to customize connection initialization (see [Advanced setup](#advanced-setup)).

Typical pattern:
- Use [`PostgresDatabaseAdapter`](https://github.com/Fatal1ty/operetta/blob/main/operetta/ddd/infrastructure/db/postgres/adapters/interface.py) when you don't need explicit transaction management: it's suitable for any reads and writes.
- When you need transactional boundaries, get [`PostgresTransactionDatabaseAdapter`](https://github.com/Fatal1ty/operetta/blob/main/operetta/ddd/infrastructure/db/postgres/adapters/interface.py), call `start_transaction()`/`commit_transaction()` (or `rollback_transaction()` on error), and run your operations within that transaction.

Configuration can be loaded from YAML via [`YAMLConfigurationService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/service/configuration.py) under the `postgres:` key. Optional connection initialization (e.g., custom codecs or `search_path`) can be provided through [`AsyncpgPoolFactoryKwargs`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg/config.py) in DI; this works for both `asyncpg` and `hasql` variants.

### Single-node PostgreSQL (asyncpg)

Provides:
- Providers: [`AsyncpgPostgresDatabaseProvider`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg/providers.py), [`AsyncpgPostgresDatabaseConfigProvider`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg/providers.py).
- Convenience services to plug into `Application`:
  - [`AsyncpgPostgresDatabaseService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg/service.py) — pool and adapters,
  - [`AsyncpgPostgresDatabaseConfigurationService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg/service.py) — loads config from `ApplicationDictConfig`.
- Adapters:
  - [`PostgresDatabaseAdapter`](https://github.com/Fatal1ty/operetta/blob/main/operetta/ddd/infrastructure/db/postgres/adapters/interface.py) with scope=APP — general-purpose adapter for any operations (fetch/fetch_one/execute, ...).
  - [`PostgresTransactionDatabaseAdapter`](https://github.com/Fatal1ty/operetta/blob/main/operetta/ddd/infrastructure/db/postgres/adapters/interface.py) with scope=REQUEST (handy for HTTP requests) — same API plus transaction control methods (start/commit/rollback).

YAML config example:

```yaml
postgres:
  user: app
  password: secret
  database: appdb
  host: 127.0.0.1:5432
  # optional pool params:
  min_size: 5
  max_size: 20
  max_queries: 50000
  max_inactive_connection_lifetime: 300
```

Plug into the app:

```python
from operetta.app import Application
from operetta.service.configuration import YAMLConfigurationService
from operetta.integrations.asyncpg.service import (
    AsyncpgPostgresDatabaseConfigProvider,
    AsyncpgPostgresDatabaseService,
)

app = Application(
    YAMLConfigurationService(),
    AsyncpgPostgresDatabaseService(),
    di_providers=[AsyncpgPostgresDatabaseConfigProvider()],
)
```

Use in a repository:

```python
from dataclasses import dataclass
from operetta.ddd.infrastructure.db.postgres.adapters.interface import (
    PostgresDatabaseAdapter,
    PostgresTransactionDatabaseAdapter,
)

@dataclass
class User:
    id: int
    name: str

class UserRepository:
    def __init__(self, db: PostgresDatabaseAdapter):
        self._db = db

    async def get_by_id(self, user_id: int) -> User | None:
        row = await self._db.fetch_one(
            "SELECT id, name FROM users WHERE id=$1", user_id
        )
        return User(id=row["id"], name=row["name"]) if row else None

class UnitOfWork:
    def __init__(self, tx: PostgresTransactionDatabaseAdapter):
        self._tx = tx

    async def __aenter__(self):
        await self._tx.start_transaction()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        if exc:
            await self._tx.rollback_transaction()
        else:
            await self._tx.commit_transaction()
```

### High-availability PostgreSQL cluster (hasql)

If you run an HA cluster (multiple nodes), use the hasql integration.

Provides:
- Providers: [`AsyncpgHAPostgresDatabaseProvider`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg_ha/providers.py), [`AsyncpgHAPostgresDatabaseConfigProvider`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg_ha/providers.py).
- Convenience services to plug into `Application`:
  - [`AsyncpgHAPostgresDatabaseService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg_ha/service.py) — pool and adapters,
  - [`AsyncpgHAPostgresDatabaseConfigurationService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg_ha/service.py) — loads config from `ApplicationDictConfig`.
- Adapters:
  - [`PostgresDatabaseAdapter`](https://github.com/Fatal1ty/operetta/blob/main/operetta/ddd/infrastructure/db/postgres/adapters/interface.py) with scope=APP — general-purpose adapter for any operations (fetch/fetch_one/execute, ...).
  - [`PostgresTransactionDatabaseAdapter`](https://github.com/Fatal1ty/operetta/blob/main/operetta/ddd/infrastructure/db/postgres/adapters/interface.py) with scope=REQUEST (handy for HTTP requests) — same API plus transaction control methods (start/commit/rollback).

YAML config example:

```yaml
postgres:
  user: app
  password: secret
  database: appdb
  hosts:
    - 10.0.0.1:5432
    - 10.0.0.2:5432
  min_masters: 1
  min_replicas: 1
  # optional:
  acquire_timeout: 5
  refresh_delay: 5
  refresh_timeout: 5
  fallback_master: false
  master_as_replica_weight: 1.0
  balancer_policy: greedy  # or round_robin / random_weighted
  stopwatch_window_size: 100
```

Plug into the app:

```python
from operetta.app import Application
from operetta.service.configuration import YAMLConfigurationService
from operetta.integrations.asyncpg_ha.service import (
    AsyncpgHAPostgresDatabaseConfigProvider,
    AsyncpgHAPostgresDatabaseService,
)

app = Application(
    YAMLConfigurationService(),
    AsyncpgHAPostgresDatabaseService(),
    di_providers=[AsyncpgHAPostgresDatabaseConfigProvider()],
)
```

> [!TIP]
> DI exposes the same adapter interfaces, so repository and unit of work code stays unchanged.

### Advanced setup

You can pass an `init` callable for connections (e.g., register codecs, set search_path) via DI. Below is an example provider from a real project that registers a custom JSONB codec for asyncpg HA (hasql):

```python
import json
from dishka import Provider, Scope, provide
from operetta.app import Application
from operetta.service.configuration import YAMLConfigurationService
from operetta.integrations.asyncpg.config import AsyncpgPoolFactoryKwargs
from operetta.integrations.asyncpg_ha.service import (
    AsyncpgHAPostgresDatabaseConfigProvider,
    AsyncpgHAPostgresDatabaseService,
)

class AsyncpgJSONCodecProvider(Provider):
    scope = Scope.APP

    @provide(override=True)
    def get_pool_factory_kwargs(self) -> AsyncpgPoolFactoryKwargs:
        async def set_custom_codecs(conn):
            await conn.set_type_codec(
                "jsonb",
                encoder=json.dumps,
                decoder=json.loads,
                schema="pg_catalog",
            )
        return AsyncpgPoolFactoryKwargs(init=set_custom_codecs)

app = Application(
    YAMLConfigurationService(),
    AsyncpgHAPostgresDatabaseService(),
    di_providers=[
        AsyncpgHAPostgresDatabaseConfigProvider(),
        AsyncpgJSONCodecProvider(),
    ],
)
```

> [!IMPORTANT]\
> If you use the built-in [`AsyncpgPostgresDatabaseConfigProvider`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg/providers.py) or
> [`AsyncpgHAPostgresDatabaseConfigProvider`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg_ha/providers.py), they already register a
> default provider for [`AsyncpgPoolFactoryKwargs`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg/config.py). To customize pool options,
> declare your provider with `@provide(override=True)` so it overrides the
> built-in one; otherwise container validation will fail.\
> When you provide your own [`AsyncpgPoolFactoryKwargs`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg/config.py) and there is an existing
> default provider from those services, `override=True` is mandatory.

Define your own config providers (e.g., from environment variables) if you don't want to use YAML-based ones:

```python
import os
from dishka import Provider, Scope, provide
from operetta.app import Application
from operetta.integrations.asyncpg.config import (
    AsyncpgPostgresDatabaseConfig,
    AsyncpgPoolFactoryKwargs,
)
from operetta.integrations.asyncpg.service import AsyncpgPostgresDatabaseService

class EnvAsyncpgConfigProvider(Provider):
    scope = Scope.APP

    @provide
    def get_db_config(self) -> AsyncpgPostgresDatabaseConfig:
        return AsyncpgPostgresDatabaseConfig(
            user=os.getenv("PGUSER", "app"),
            database=os.getenv("PGDATABASE", "appdb"),
            host=os.getenv("PGHOST", "127.0.0.1:5432"),
            password=os.getenv("PGPASSWORD"),
        )

    @provide
    def get_pool_factory_kwargs(self) -> AsyncpgPoolFactoryKwargs:
        return {}

app = Application(
    AsyncpgPostgresDatabaseService(),
    di_providers=[EnvAsyncpgConfigProvider()],
)
```

Example of an environment-based HA config provider:

```python
import os
from dishka import Provider, Scope, provide
from operetta.app import Application
from operetta.integrations.asyncpg.config import AsyncpgPoolFactoryKwargs
from operetta.integrations.asyncpg_ha.config import AsyncpgHAPostgresDatabaseConfig
from operetta.integrations.asyncpg_ha.service import (
    AsyncpgHAPostgresDatabaseConfigurationService,
    AsyncpgHAPostgresDatabaseService,
)


class EnvHasqlConfigProvider(Provider):
    scope = Scope.APP

    @provide
    def get_db_config(self) -> AsyncpgHAPostgresDatabaseConfig:
        hosts = os.getenv("PGHOSTS", "10.0.0.1:5432,10.0.0.2:5432").split(",")
        return AsyncpgHAPostgresDatabaseConfig(
            user=os.getenv("PGUSER", "app"),
            database=os.getenv("PGDATABASE", "appdb"),
            hosts=[h.strip() for h in hosts if h.strip()],
            password=os.getenv("PGPASSWORD"),
            min_masters=int(os.getenv("PG_MIN_MASTERS", "1")),
            min_replicas=int(os.getenv("PG_MIN_REPLICAS", "1")),
        )

    @provide
    def get_pool_factory_kwargs(self) -> AsyncpgPoolFactoryKwargs:
        return {}


app = Application(
    AsyncpgHAPostgresDatabaseService(),
    di_providers=[EnvHasqlConfigProvider()],
)
```

## Sentry

A built-in integration that initializes [sentry-sdk](https://github.com/getsentry/sentry-python) with a logging integration for breadcrumbs and error events. It’s optional and configured via DI and/or constructor parameters.

Provided components:
- [`SentryService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/sentry/service.py) — initializes the SDK on start and closes the client on stop.
- [`SentryConfigurationService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/sentry/service.py) — registers a config provider into DI.
- [`SentryServiceConfigProvider`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/sentry/providers.py) — reads `ApplicationDictConfig['sentry']` and decodes it into [`SentryServiceConfig`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/sentry/config.py).

Install extra:

```bash
pip install 'operetta[sentry]'
```

How to wire it up:

```python
from operetta.app import Application
from operetta.service.configuration import YAMLConfigurationService
from operetta.integrations.sentry import (
    SentryService,
    SentryServiceConfigProvider,
)

app = Application(
    YAMLConfigurationService(),          # optional: load YAML into DI
    SentryService(
        # You can override any config here; constructor wins over DI
        # send_default_pii=False,
        # debug=False,
    ),
    di_providers=[SentryServiceConfigProvider()]
)
```

### Configuration

You can configure [`SentryService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/sentry/service.py) in three complementary ways:

- Constructor (`SentryService.__init__`) — highest priority.
- DI (`SentryServiceConfig` resolved from provider) — overrides defaults.
- Internal defaults — used if neither of the above specify a value.

YAML keys (all optional) live under the `sentry:` section:

```yaml
sentry:
  dsn: https://public@o0.ingest.sentry.io/0
  enabled: true

  # Logging integration
  capture_log_level: ERROR      # event level (string or int)
  context_log_level: INFO       # breadcrumbs level (string or int)
  ignore_loggers:               # loggers to exclude from breadcrumbs/events
    - aiohttp.access

  # Core SDK options
  environment: production
  release: 1.2.3
  server_name: api-01

  include_local_variables: true
  max_breadcrumbs: 100
  shutdown_timeout: 2.0

  # Sampling
  sample_rate: 1.0              # error event sampling
  traces_sample_rate: 0.2       # performance tracing sampling

  # Error filtering and in-app
  ignore_errors:
    - TimeoutError
  in_app_include:
    - myapp
  in_app_exclude:
    - aiohttp

  # Privacy / debug
  send_default_pii: false
  debug: false

  # HTTP body and propagation
  max_request_body_size: medium
  trace_propagation_targets:
    - .*

  # Transport tweaks
  keep_alive: false

  # Anything else passed to sentry_sdk.init (overrides same-named keys above)
  extra_options:
    profiles_sample_rate: 0.1
```

Custom config provider example (env-vars):

```python
import os
from dishka import Provider, Scope, provide
from operetta.app import Application
from operetta.integrations.sentry.config import SentryServiceConfig
from operetta.integrations.sentry import SentryService

class EnvSentryConfigProvider(Provider):
    scope = Scope.APP

    @provide
    def get_config(self) -> SentryServiceConfig:
        return SentryServiceConfig(
            dsn=os.getenv("SENTRY_DSN"),
            enabled=os.getenv("SENTRY_ENABLED", "true").lower() == "true",
        )

app = Application(
    SentryService(),
    di_providers=[EnvSentryConfigProvider()],
)
```

### Behavior and notes

If `enabled: false` or no `dsn` is provided, Sentry is skipped (a message is logged).

All other parameters rely on the defaults defined by `sentry-sdk` itself. Operetta does not override those internal defaults: if you do not set a field in `SentryServiceConfig` and do not provide it via `extra_options`, the behavior is identical to calling `sentry_sdk.init` without that argument. See the official documentation for the full list of options and their default values:
https://docs.sentry.io/platforms/python/configuration/options/

The `extra_options` parameter lets you supply any additional keys for `sentry_sdk.init` that do not have a dedicated field in `SentryServiceConfig`. These keys are merged last (overriding same-named ones) into the final options dict passed to the SDK.

## Prometheus

Expose Prometheus metrics over HTTP with zero dependencies on third-party web frameworks. The service uses Python's standard library (asyncio.start_server) to serve metrics efficiently in asyncio environments.

Provided components:
- [`PrometheusService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/prometheus/service.py) — a tiny asyncio HTTP server exposing metrics on a configured endpoint.
- [`PrometheusConfigurationService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/prometheus/service.py) — registers a config provider into DI.
- [`PrometheusServiceConfigProvider`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/prometheus/providers.py) — reads `ApplicationDictConfig['prometheus']` and decodes it into [`PrometheusServiceConfig`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/prometheus/config.py).

Install extra:

```bash
pip install 'operetta[prometheus]'
```

How to wire it up:

```python
from operetta.app import Application
from operetta.service.configuration import YAMLConfigurationService
from operetta.integrations.prometheus import (
    PrometheusService,
    PrometheusServiceConfigProvider,
)

app = Application(
    YAMLConfigurationService(),
    PrometheusService(),  # constructor args override YAML
    di_providers=[PrometheusServiceConfigProvider()],
)
```

### Configuration

You can configure [`PrometheusService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/prometheus/service.py) in three complementary ways:

- Constructor (`PrometheusService.__init__`) — highest priority.
- DI (`PrometheusServiceConfig` resolved from provider) — overrides defaults.
- Internal defaults — used if neither of the above specify a value.

YAML keys (all optional) live under the `prometheus:` section:

```yaml
prometheus:
  address: 0.0.0.0
  port: 9000
  endpoint: /metrics
  enabled: true
```

Custom config provider example (env-vars):

```python
import os
from dishka import Provider, Scope, provide
from operetta import Application
from operetta.integrations.prometheus.config import PrometheusServiceConfig
from operetta.integrations.prometheus import PrometheusService

class EnvPromConfigProvider(Provider):
    scope = Scope.APP

    @provide
    def get_config(self) -> PrometheusServiceConfig:
        return PrometheusServiceConfig(
            address=os.getenv("PROM_ADDRESS", "0.0.0.0"),
            port=int(os.getenv("PROM_PORT", "9000")),
            endpoint=os.getenv("PROM_ENDPOINT", "/metrics"),
            enabled=os.getenv("PROM_ENABLED", "true").lower() == "true",
        )

app = Application(
    PrometheusService(),
    di_providers=[EnvPromConfigProvider()],
)
```

### Usage

- By default, the global Prometheus registry is used. If you want a custom registry, provide one via DI:

```python
from operetta import Application
from operetta.integrations.prometheus import PrometheusService
from dishka import Provider, Scope, provide
from prometheus_client import CollectorRegistry

class PromRegistryProvider(Provider):
    scope = Scope.APP

    @provide
    def get_registry(self) -> CollectorRegistry:
        return CollectorRegistry()

app = Application(
    PrometheusService(),
    di_providers=[PromRegistryProvider()],
)
```

- Create and register metrics as usual using prometheus-client; they will be collected from the registry the service uses (global by default, or your DI-provided one):

```python
from prometheus_client import Counter, REGISTRY

# Using default (global) registry
REQUESTS = Counter('http_requests_total', 'Count of HTTP requests')
REQUESTS.inc()

# If you provided a custom registry, pass it explicitly when creating metrics:
from prometheus_client import Counter, CollectorRegistry
registry = CollectorRegistry()
CUSTOM_COUNTER = Counter('my_counter', 'Help', registry=registry)
CUSTOM_COUNTER.inc()
```

- HTTP details:
  - Methods: GET and HEAD.
  - Path match ignores query string (e.g., `/metrics?format=...`).
  - Content-Type is set to Prometheus TEXT format.
  - The server is a tiny asyncio server based on the standard library.

