# RowQuery Documentation

> SQL-first query execution and projection engine for Python. Supports SQLite, PostgreSQL, MySQL, and Oracle via adapter pattern. Requires Python >=3.10.

## What Is RowQuery

RowQuery is a Python library for executing raw SQL queries against multiple database backends with structured result mapping. It is currently in alpha (v0.1.2) and requires Python >=3.10.

Key features:

- **SQL-first**: You write real SQL. RowQuery executes it and maps results to Python objects.
- **Multi-backend**: SQLite, PostgreSQL, MySQL, and Oracle are supported through an adapter pattern.
- **Sync and async**: Every operation has both synchronous and asynchronous variants.
- **Adapter pattern**: Each database backend is implemented as a separate adapter conforming to a runtime-checkable protocol.
- **Result mapping**: Map flat query rows to dataclasses, Pydantic models, or complex aggregate object graphs.
- **SQL file registry**: Organize SQL queries in `.sql` files and reference them by dot-separated namespace keys.

RowQuery's core dependency is Pydantic >=2.0.0. Database-specific drivers are optional extras installed per backend.

## What RowQuery Is Not

- **Not an ORM.** RowQuery does not generate SQL from Python model definitions. You write all SQL yourself.
- **Not a schema generator.** RowQuery does not create database tables from class definitions. Use migrations or external tools for DDL.
- **Not primarily a migration tool.** RowQuery includes a `MigrationManager`, but it is a convenience feature, not the library's primary purpose.
- **Not a connection pooling library.** RowQuery delegates pooling to the underlying database drivers (psycopg, aiomysql, oracledb). It manages connection lifecycle but does not implement its own pool.
- **Not a query builder with DSL syntax.** There is no fluent API for constructing SQL. You pass SQL strings directly or load them from `.sql` files via `SQLRegistry`.
- **Not a data validation framework.** RowQuery maps query results to objects but does not validate business rules or enforce constraints beyond what the database itself provides.

## Installation

### Core Installation

Install RowQuery with no database-specific drivers:

```shell
# Using uv (recommended)
uv add row-query

# Using pip
pip install row-query
```

The core installation includes only the `pydantic` dependency and the SQLite sync adapter (which uses Python's built-in `sqlite3` module).

### Optional Database Backends

Install extras for specific database backends:

```shell
# PostgreSQL (psycopg v3 — handles both sync and async)
uv sync --extra postgres

# MySQL (mysql-connector-python for sync, aiomysql for async)
uv sync --extra mysql

# Oracle (oracledb — handles both sync and async)
uv sync --extra oracle

# SQLite async support (aiosqlite)
uv sync --extra sqlite

# All backends at once
uv sync --extra all
```

## Quick Start

### Sync Example

A complete example using `Engine` with an in-memory SQLite database:

```python
from row_query import ConnectionConfig, Engine, SQLRegistry

# 1. Configure connection
config = ConnectionConfig(driver="sqlite", database=":memory:")

# 2. Create a SQL registry (optional — you can also use inline SQL)
# Here we write one SQL file into a temporary directory for the registry to load.
import tempfile, os
sql_dir = tempfile.mkdtemp()
# Write a SQL file: sql_dir/users/list.sql
os.makedirs(os.path.join(sql_dir, "users"))
with open(os.path.join(sql_dir, "users", "list.sql"), "w") as f:
    f.write("SELECT id, name FROM users")

registry = SQLRegistry(sql_dir)

# 3. Create engine
engine = Engine.from_config(config, registry)

# 4. Set up a table and insert data using inline SQL
engine.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
engine.execute("INSERT INTO users (id, name) VALUES (:id, :name)", {"id": 1, "name": "Alice"})
engine.execute("INSERT INTO users (id, name) VALUES (:id, :name)", {"id": 2, "name": "Bob"})

# 5. Query using a registry key
all_users = engine.fetch_all("users.list")
print(all_users)
# [{'id': 1, 'name': 'Alice'}, {'id': 2, 'name': 'Bob'}]

# 6. Query using inline SQL
one_user = engine.fetch_one("SELECT id, name FROM users WHERE id = :id", {"id": 1})
print(one_user)
# {'id': 1, 'name': 'Alice'}

# 7. Scalar query
count = engine.fetch_scalar("SELECT COUNT(*) FROM users")
print(count)
# 2
```

### Async Example

A complete example using `AsyncEngine` with SQLite and `aiosqlite`:

```python
import asyncio
from row_query import ConnectionConfig, AsyncEngine, SQLRegistry

async def main():
    config = ConnectionConfig(driver="sqlite", database=":memory:")

    import tempfile
    sql_dir = tempfile.mkdtemp()
    registry = SQLRegistry(sql_dir)

    engine = AsyncEngine.from_config(config, registry)

    await engine.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
    await engine.execute(
        "INSERT INTO users (id, name) VALUES (:id, :name)",
        {"id": 1, "name": "Alice"},
    )

    users = await engine.fetch_all("SELECT id, name FROM users")
    print(users)
    # [{'id': 1, 'name': 'Alice'}]

    user = await engine.fetch_one("SELECT id, name FROM users WHERE id = :id", {"id": 1})
    print(user)
    # {'id': 1, 'name': 'Alice'}

    count = await engine.fetch_scalar("SELECT COUNT(*) FROM users")
    print(count)
    # 1

asyncio.run(main())
```

Async SQLite requires the `aiosqlite` extra: `uv sync --extra sqlite`.

## Core Concepts

### Engine and AsyncEngine

Engine is the primary interface for executing SQL queries. AsyncEngine is its async counterpart.

**Creating an Engine:**

```python
from row_query import ConnectionConfig, ConnectionManager, Engine, SQLRegistry

config = ConnectionConfig(driver="sqlite", database=":memory:")
registry = SQLRegistry("/path/to/sql/files")

# Option 1: Factory method (recommended)
engine = Engine.from_config(config, registry)

# Option 2: Manual construction
connection_manager = ConnectionManager(config)
engine = Engine(connection_manager, registry)
```

**Creating an AsyncEngine:**

```python
from row_query import ConnectionConfig, AsyncEngine, SQLRegistry

config = ConnectionConfig(driver="sqlite", database=":memory:")
registry = SQLRegistry("/path/to/sql/files")
engine = AsyncEngine.from_config(config, registry)
```

**Methods:**

| Method | Signature | Returns | Description |
|--------|-----------|---------|-------------|
| `fetch_one` | `(query, params=None, *, mapper=None)` | `dict`, mapped object, or `None` | Fetch a single row. Raises `MultipleRowsError` if >1 row returned. |
| `fetch_all` | `(query, params=None, *, mapper=None)` | `list[dict]` or `list[T]` | Fetch all rows. |
| `fetch_scalar` | `(query, params=None)` | `Any` or `None` | Fetch first column of first row. |
| `execute` | `(query, params=None)` | `int` | Execute a write query. Returns affected row count. |
| `transaction` | `()` | `TransactionManager` | Returns a context manager for atomic operations. |

AsyncEngine has identical methods, all `async`.

**Query resolution:** The `query` parameter can be either:
- A **registry key** (no whitespace): `"users.get_by_id"` — resolved via SQLRegistry.
- **Inline SQL** (contains whitespace): `"SELECT * FROM users WHERE id = :id"` — used directly.
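
The resolution rule above can be sketched as a plain function (illustrative only; `resolve` is a hypothetical name, not the library's internal API):

```python
def resolve(query: str, registry: dict[str, str]) -> str:
    # A query containing no whitespace is treated as a registry key;
    # anything with whitespace is executed as inline SQL.
    if not any(ch.isspace() for ch in query):
        return registry[query]
    return query

sql = {"users.get_by_id": "SELECT id, name FROM users WHERE id = :id"}
print(resolve("users.get_by_id", sql))  # the registered SQL text
print(resolve("SELECT 1", sql))         # SELECT 1 (returned unchanged)
```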

**Parameter passing:** The `params` parameter accepts:
- `dict` — Named parameters: `{"id": 1, "name": "Alice"}`
- `tuple` or `list` — Positional parameters: `(1, "Alice")`
- Scalar value — Wrapped as `(value,)` automatically
- `None` — No parameters (default)

RowQuery automatically converts `:name` parameter syntax to the format required by each backend (`%(name)s` for PostgreSQL and MySQL, `:name` for SQLite and Oracle).
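
The `:name` → `%(name)s` rewrite can be approximated with a regular expression (a sketch, not RowQuery's internal code; a production version must also skip string literals and PostgreSQL `::type` casts):

```python
import re

_NAMED_PARAM = re.compile(r":([A-Za-z_][A-Za-z0-9_]*)")

def to_pyformat(sql: str) -> str:
    # Rewrite :name placeholders into %(name)s for pyformat backends.
    return _NAMED_PARAM.sub(r"%(\1)s", sql)

print(to_pyformat("SELECT * FROM users WHERE id = :id AND name = :name"))
# SELECT * FROM users WHERE id = %(id)s AND name = %(name)s
```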

**Optional SQLSanitizer:**

```python
from row_query import SQLSanitizer, Engine

sanitizer = SQLSanitizer(
    strip_comments=True,
    block_multiple_statements=True,
    allowed_verbs=frozenset({"SELECT"}),
)
engine = Engine.from_config(config, registry, sanitizer=sanitizer)
```

When a sanitizer is provided, it is applied only to inline SQL. Queries loaded from SQLRegistry are always trusted and bypass sanitization.

### ConnectionConfig

ConnectionConfig is a Pydantic BaseModel that holds all database connection settings.

```python
from row_query import ConnectionConfig

# SQLite (minimal)
sqlite_config = ConnectionConfig(driver="sqlite", database=":memory:")

# SQLite with file path
sqlite_file_config = ConnectionConfig(driver="sqlite", database="/path/to/mydb.sqlite")
```

**Fields:**

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `driver` | `str` | (required) | Backend identifier: `"sqlite"`, `"postgresql"`, `"mysql"`, `"oracle"` |
| `host` | `str \| None` | `None` | Database server hostname |
| `port` | `int \| None` | `None` | Database server port |
| `user` | `str \| None` | `None` | Authentication username |
| `password` | `str \| None` | `None` | Authentication password |
| `database` | `str` | (required) | Database name or path (`:memory:` for SQLite in-memory) |
| `pool_size` | `int` | `5` | Maximum connections in pool |
| `pool_timeout` | `int` | `30` | Seconds to wait for a connection from pool |
| `pool_recycle` | `int` | `1800` | Seconds before recycling idle connections |
| `extra` | `dict[str, Any]` | `{}` | Additional driver-specific parameters |

See the Database Backends section for backend-specific ConnectionConfig examples.

### SQLRegistry

SQLRegistry loads `.sql` files from a directory tree and makes them available by dot-separated namespace keys.

```python
from row_query import SQLRegistry

# Given this file structure:
#   sql/
#     users/
#       get_by_id.sql     -> "users.get_by_id"
#       list_active.sql   -> "users.list_active"
#     orders/
#       create.sql        -> "orders.create"

registry = SQLRegistry("sql")

# Retrieve SQL text by key
sql = registry.get("users.get_by_id")

# Check if a key exists
exists = registry.has("users.get_by_id")  # True

# List all registered keys (sorted)
keys = registry.query_names  # ["orders.create", "users.get_by_id", "users.list_active"]

# Count registered queries
count = len(registry)  # 3
```

**Namespace mapping:** The file path relative to the root directory is converted to a dot-separated key by replacing path separators with dots and stripping the `.sql` extension.
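
The path-to-key conversion amounts to this (an illustrative sketch using `pathlib`; `registry_key` is a hypothetical name):

```python
from pathlib import Path

def registry_key(root: Path, sql_file: Path) -> str:
    # sql/users/get_by_id.sql (relative to root "sql") -> "users.get_by_id"
    relative = sql_file.relative_to(root).with_suffix("")
    return ".".join(relative.parts)

print(registry_key(Path("sql"), Path("sql/users/get_by_id.sql")))
# users.get_by_id
```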

**Errors:**
- `QueryNotFoundError` — raised by `get()` when the key does not exist.
- `DuplicateQueryError` — raised during construction if two files produce the same key.

### SQLSanitizer

SQLSanitizer provides defense-in-depth input validation for inline SQL queries.

```python
from row_query import SQLSanitizer

# Default: strip comments and block multiple statements
sanitizer = SQLSanitizer()

# Read-only enforcement: only allow SELECT queries
sanitizer = SQLSanitizer(allowed_verbs=frozenset({"SELECT"}))

# Custom configuration
sanitizer = SQLSanitizer(
    strip_comments=True,
    block_multiple_statements=True,
    allowed_verbs=frozenset({"SELECT", "INSERT", "UPDATE"}),
)

# Use with Engine
clean_sql = sanitizer.sanitize("SELECT * FROM users -- get all")
# Returns: "SELECT * FROM users"
```

**Fields:**

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `strip_comments` | `bool` | `True` | Remove `--` line comments and `/* */` block comments |
| `block_multiple_statements` | `bool` | `True` | Reject SQL containing `;` followed by non-whitespace |
| `allowed_verbs` | `frozenset[str] \| None` | `None` | If set, only allow SQL starting with these keywords |

**Important:** SQLSanitizer is defense-in-depth only. It does not protect against SQL injection from string concatenation. Always use parameterized queries (`:name` parameters) for user-provided values.

SQLSanitizer is only applied to inline SQL. Queries loaded from SQLRegistry are trusted and never sanitized.

Raises `SQLSanitizationError` if any enabled check fails.

## Database Backends

### SQLite

SQLite uses Python's built-in `sqlite3` module for sync operations and `aiosqlite` for async.

**Dependencies:**
- Sync: No extra dependencies (stdlib `sqlite3`)
- Async: `uv sync --extra sqlite` (installs `aiosqlite`)

**ConnectionConfig:**

```python
from row_query import ConnectionConfig

# In-memory database
config = ConnectionConfig(driver="sqlite", database=":memory:")

# File-based database
config = ConnectionConfig(driver="sqlite", database="/path/to/mydb.sqlite")
```

**Parameter style:** `named`. `:name` placeholders are passed through unchanged.

**Adapter details:** Connections use WAL journal mode and `sqlite3.Row` row factory for dict-like access.

### PostgreSQL

PostgreSQL uses psycopg v3 for both sync and async operations.

**Dependencies:** `uv sync --extra postgres` (installs `psycopg[binary] >=3.1.0`)

**ConnectionConfig:**

```python
from row_query import ConnectionConfig

config = ConnectionConfig(
    driver="postgresql",
    host="localhost",
    port=5432,
    user="myuser",
    password="mypassword",
    database="mydb",
)
```

**Parameter style:** `pyformat`. RowQuery automatically converts `:name` in your SQL to `%(name)s` before execution.

**Adapter details:** Uses `psycopg.rows.dict_row` row factory. Connection string is built from ConnectionConfig fields.

### MySQL

MySQL uses `mysql-connector-python` for sync operations and `aiomysql` for async.

**Dependencies:** `uv sync --extra mysql` (installs `mysql-connector-python >=8.0.0` and `aiomysql >=0.2.0`)

**ConnectionConfig:**

```python
from row_query import ConnectionConfig

config = ConnectionConfig(
    driver="mysql",
    host="localhost",
    port=3306,
    user="myuser",
    password="mypassword",
    database="mydb",
)
```

**Parameter style:** `pyformat`. RowQuery automatically converts `:name` in your SQL to `%(name)s` before execution.

**Adapter details:** Sync adapter uses `cursor(dictionary=True)`. Async adapter uses `aiomysql.DictCursor`.

### Oracle

Oracle uses `oracledb` for both sync and async operations.

**Dependencies:** `uv sync --extra oracle` (installs `oracledb >=2.0.0`)

**ConnectionConfig:**

```python
from row_query import ConnectionConfig

config = ConnectionConfig(
    driver="oracle",
    host="localhost",
    port=1521,
    user="myuser",
    password="mypassword",
    database="mydb",
)
```

**Parameter style:** `named`. SQL uses `:name` syntax directly (same as SQLite).

**Adapter details:** DSN format is `host:port/database`. Uses a custom row factory to convert tuples to dicts with lowercased column names.
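
The tuple-to-dict conversion described above might look roughly like this (demonstrated with a stand-in cursor rather than a live `oracledb` connection):

```python
def lowercase_dict_factory(cursor):
    # Given a cursor with a populated .description, build a row factory
    # that turns positional tuples into dicts with lowercased column names.
    columns = [desc[0].lower() for desc in cursor.description]

    def factory(*row):
        return dict(zip(columns, row))

    return factory

class FakeCursor:
    # Stand-in for a driver cursor; only .description matters here.
    description = [("ID", None), ("NAME", None)]

row = lowercase_dict_factory(FakeCursor())(1, "Alice")
print(row)  # {'id': 1, 'name': 'Alice'}
```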

## Result Mapping

### ModelMapper

ModelMapper maps a single query row (dict) to a Python object. It supports Pydantic models, dataclasses, and plain classes.

```python
from dataclasses import dataclass
from row_query import ConnectionConfig, Engine, SQLRegistry, ModelMapper

@dataclass
class User:
    id: int
    name: str

config = ConnectionConfig(driver="sqlite", database=":memory:")

import tempfile
registry = SQLRegistry(tempfile.mkdtemp())
engine = Engine.from_config(config, registry)

engine.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
engine.execute("INSERT INTO users (id, name) VALUES (:id, :name)", {"id": 1, "name": "Alice"})

# Create a mapper
mapper = ModelMapper(User)

# Use with fetch_one
user = engine.fetch_one("SELECT id, name FROM users WHERE id = :id", {"id": 1}, mapper=mapper)
print(user)       # User(id=1, name='Alice')
print(user.name)  # 'Alice'

# Use with fetch_all
users = engine.fetch_all("SELECT id, name FROM users", mapper=mapper)
print(users)  # [User(id=1, name='Alice')]
```

**Constructor:**

```python
ModelMapper(target_class: type[T], aliases: dict[str, str] | None = None)
```

- `target_class` — The class to instantiate. Detection order: Pydantic model → dataclass → plain class.
- `aliases` — Optional column-to-field name mapping: `{"column_name": "field_name"}`.

**Aliases example:**

```python
@dataclass
class UserProfile:
    user_id: int
    full_name: str

mapper = ModelMapper(UserProfile, aliases={"id": "user_id", "name": "full_name"})
user = engine.fetch_one("SELECT id, name FROM users WHERE id = :id", {"id": 1}, mapper=mapper)
print(user)  # UserProfile(user_id=1, full_name='Alice')
```

Raises `ColumnMismatchError` if the row columns do not match the target class fields.

### Aggregate Mapping

Aggregate mapping reconstructs complex object graphs (parent with collections, references, and value objects) from flat SQL JOIN result sets using a single-pass O(n) algorithm.

#### Building an AggregatePlan

Use the `aggregate()` function and fluent builder to define the mapping plan:

```python
from dataclasses import dataclass, field
from row_query.mapping.builder import aggregate

@dataclass
class OrderItem:
    id: int
    product: str
    quantity: int

@dataclass
class Customer:
    id: int
    name: str

@dataclass
class Address:
    street: str
    city: str

@dataclass
class Order:
    id: int
    total: float
    items: list[OrderItem] = field(default_factory=list)
    customer: Customer | None = None
    shipping: Address | None = None

plan = (
    aggregate(Order, prefix="order__")
    .key("id")
    .auto_fields()
    .collection("items", OrderItem, prefix="item__", key="id")
    .reference("customer", Customer, prefix="customer__", key="id")
    .value_object("shipping", Address, prefix="addr__")
    .build()
)
```

**Builder methods:**

| Method | Description |
|--------|-------------|
| `aggregate(root_class, prefix=None)` | Entry point. `prefix` defaults to `lowercase(class_name) + "__"`. |
| `.key(field_name)` | **Required.** Sets the identity key field for the root entity. |
| `.auto_fields()` | Auto-maps all fields of the root class (excluding collections, references, value objects). |
| `.field(attr_name, column_name=None)` | Explicitly map a single field. `column_name` defaults to `attr_name`. |
| `.collection(name, entity_class, prefix, key)` | Define a child collection (one-to-many). |
| `.reference(name, entity_class, prefix, key=None)` | Define a reference (many-to-one). `key` defaults to `"id"`. |
| `.value_object(name, vo_class, prefix)` | Define an embedded value object. |
| `.strict(enabled=True)` | Enable strict mode: validates all mapped columns exist in result set. |
| `.build()` | Compile and return an `AggregatePlan`. Raises `PlanCompilationError` if `.key()` was not called or prefixes collide. |

#### AggregateMapper

AggregateMapper consumes the plan and reconstructs objects from flat rows:

```python
from row_query.mapping.aggregate import AggregateMapper

mapper = AggregateMapper(plan)

# Use with engine.fetch_all — AggregateMapper requires multiple rows
orders = engine.fetch_all(
    """
    SELECT
        o.id AS order__id,
        o.total AS order__total,
        i.id AS item__id,
        i.product AS item__product,
        i.quantity AS item__quantity,
        c.id AS customer__id,
        c.name AS customer__name,
        a.street AS addr__street,
        a.city AS addr__city
    FROM orders o
    LEFT JOIN order_items i ON o.id = i.order_id
    LEFT JOIN customers c ON o.customer_id = c.id
    LEFT JOIN addresses a ON o.shipping_address_id = a.id
    """,
    mapper=mapper,
)
# Returns: list[Order] with nested items, customer, and shipping populated
```

**Behavior:**
- Deduplicates root entities by key field (e.g., one Order per unique `order__id`).
- Deduplicates collection children by their key field.
- References and value objects are set once per root (first non-NULL occurrence).
- Rows with NULL root key are silently skipped.
- `map_one()` raises `NotImplementedError` — use `map_many()` only.
- In strict mode, raises `StrictModeViolation` if columns are missing or unknown prefix groups appear.
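
The deduplication rules can be illustrated with plain dicts (a simplified sketch of the single-pass algorithm, not the mapper itself):

```python
def group_orders(rows: list[dict]) -> list[dict]:
    # One pass over the rows: dedupe roots by order__id, children by
    # item__id, and skip rows whose root key is NULL.
    roots: dict[int, dict] = {}
    seen_items: set[tuple[int, int]] = set()
    for row in rows:
        root_id = row["order__id"]
        if root_id is None:
            continue  # NULL root key: row is silently skipped
        order = roots.setdefault(
            root_id, {"id": root_id, "total": row["order__total"], "items": []}
        )
        item_id = row["item__id"]
        if item_id is not None and (root_id, item_id) not in seen_items:
            seen_items.add((root_id, item_id))
            order["items"].append({"id": item_id, "product": row["item__product"]})
    return list(roots.values())

rows = [
    {"order__id": 1, "order__total": 9.5, "item__id": 10, "item__product": "pen"},
    {"order__id": 1, "order__total": 9.5, "item__id": 11, "item__product": "ink"},
    {"order__id": None, "order__total": None, "item__id": None, "item__product": None},
]
orders = group_orders(rows)
print(len(orders), len(orders[0]["items"]))  # 1 2
```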

## Transactions

### Sync Transactions

Use `engine.transaction()` to execute multiple statements atomically:

```python
from row_query import ConnectionConfig, Engine, SQLRegistry

config = ConnectionConfig(driver="sqlite", database=":memory:")

import tempfile
registry = SQLRegistry(tempfile.mkdtemp())
engine = Engine.from_config(config, registry)

engine.execute("CREATE TABLE accounts (id INTEGER PRIMARY KEY, balance REAL)")
engine.execute("INSERT INTO accounts (id, balance) VALUES (:id, :balance)", {"id": 1, "balance": 100.0})
engine.execute("INSERT INTO accounts (id, balance) VALUES (:id, :balance)", {"id": 2, "balance": 50.0})

# Transfer funds atomically
with engine.transaction() as tx:
    tx.execute(
        "UPDATE accounts SET balance = balance - :amount WHERE id = :id",
        {"amount": 25.0, "id": 1},
    )
    tx.execute(
        "UPDATE accounts SET balance = balance + :amount WHERE id = :id",
        {"amount": 25.0, "id": 2},
    )
    # Auto-commits on successful exit from the `with` block

# If an exception occurs inside the `with` block, the transaction auto-rolls back.
```

**Available methods within a transaction:**

| Method | Signature | Returns |
|--------|-----------|---------|
| `execute` | `(query, params=None)` | `int` (affected row count) |
| `fetch_one` | `(query, params=None)` | `dict[str, Any] \| None` |
| `fetch_all` | `(query, params=None)` | `list[dict[str, Any]]` |
| `commit` | `()` | `None` |
| `rollback` | `()` | `None` |

**Auto-commit/rollback behavior:**
- On normal exit from the `with` block: transaction commits automatically.
- On exception: transaction rolls back automatically.
- You can call `commit()` or `rollback()` explicitly within the block.

Raises `TransactionStateError` if you call `commit()` on an already-committed or rolled-back transaction, or `rollback()` on an already-committed transaction.
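
The state checks behave roughly like this (a hypothetical sketch, not RowQuery's `TransactionManager`):

```python
class TxStateGuard:
    # Tracks "active" / "committed" / "rolled_back" and rejects invalid moves.
    def __init__(self) -> None:
        self.state = "active"

    def commit(self) -> None:
        if self.state != "active":
            raise RuntimeError(f"cannot commit: transaction already {self.state}")
        self.state = "committed"

    def rollback(self) -> None:
        if self.state == "committed":
            raise RuntimeError("cannot rollback: transaction already committed")
        self.state = "rolled_back"

tx = TxStateGuard()
tx.commit()
try:
    tx.commit()  # second commit is rejected
except RuntimeError as exc:
    print(exc)  # cannot commit: transaction already committed
```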

### Async Transactions

Use `async with engine.transaction()` for async atomic operations:

```python
import asyncio
from row_query import ConnectionConfig, AsyncEngine, SQLRegistry

async def main():
    config = ConnectionConfig(driver="sqlite", database=":memory:")

    import tempfile
    registry = SQLRegistry(tempfile.mkdtemp())
    engine = AsyncEngine.from_config(config, registry)

    await engine.execute("CREATE TABLE accounts (id INTEGER PRIMARY KEY, balance REAL)")
    await engine.execute("INSERT INTO accounts (id, balance) VALUES (:id, :balance)", {"id": 1, "balance": 100.0})
    await engine.execute("INSERT INTO accounts (id, balance) VALUES (:id, :balance)", {"id": 2, "balance": 50.0})

    async with engine.transaction() as tx:
        await tx.execute(
            "UPDATE accounts SET balance = balance - :amount WHERE id = :id",
            {"amount": 25.0, "id": 1},
        )
        await tx.execute(
            "UPDATE accounts SET balance = balance + :amount WHERE id = :id",
            {"amount": 25.0, "id": 2},
        )
        # Auto-commits on successful exit

asyncio.run(main())
```

Async transactions have the same methods as sync transactions, all `async`. Auto-commit and auto-rollback behavior is identical.

## Migrations

MigrationManager provides SQL-based schema migrations with version tracking.

```python
from pathlib import Path
from row_query import ConnectionConfig, ConnectionManager, MigrationManager

config = ConnectionConfig(driver="sqlite", database=":memory:")
connection_manager = ConnectionManager(config)

# Migration files must follow the naming convention: NNN_description.sql
# Example directory:
#   migrations/
#     001_create_users.sql
#     002_add_email_column.sql
#     003_create_orders.sql

manager = MigrationManager(
    migration_dir=Path("migrations"),
    connection_manager=connection_manager,
)

# Discover all migrations and their applied status
all_migrations = manager.discover()
# Returns: [MigrationInfo(version='001', description='create_users', file_path=..., applied=False), ...]

# Check pending (unapplied) migrations
pending = manager.pending()

# Apply all pending migrations in order
applied = manager.apply()
# Returns list of successfully applied MigrationInfo objects
# Stops at first failure and raises MigrationExecutionError

# Check which migrations have been applied
already_applied = manager.applied()

# Get the current migration version
version = manager.current_version()  # "003" or None if no migrations applied
```

**MigrationInfo fields:**

| Field | Type | Description |
|-------|------|-------------|
| `version` | `str` | Version identifier from filename (e.g., `"001"`) |
| `description` | `str` | Description from filename (e.g., `"create_users"`) |
| `file_path` | `Path` | Absolute path to the `.sql` file |
| `applied` | `bool` | Whether this migration has been applied (default: `False`) |

**File naming convention:** `NNN_description.sql` where `NNN` is a zero-padded numeric version and `description` uses underscores. Files not matching this pattern raise `MigrationFileError`.
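
Parsing the filename convention might look like this (an illustrative sketch; the exact pattern, e.g. three digits for `NNN`, is an assumption):

```python
import re

_MIGRATION_NAME = re.compile(r"^(\d{3})_([A-Za-z0-9_]+)\.sql$")

def parse_migration_name(name: str) -> tuple[str, str]:
    # "001_create_users.sql" -> ("001", "create_users"); anything else fails.
    match = _MIGRATION_NAME.fullmatch(name)
    if match is None:
        raise ValueError(f"invalid migration file name: {name!r}")
    return match.group(1), match.group(2)

print(parse_migration_name("001_create_users.sql"))
# ('001', 'create_users')
```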

**Tracking:** MigrationManager creates a `schema_migrations` table to track applied versions.

**Note:** The current implementation uses SQLite-specific SQL syntax for the tracking table. Multi-backend migration support may require adaptation.

## Repository Pattern

Repository and AsyncRepository provide DDD-style base classes for organizing database access around domain entities.

```python
from dataclasses import dataclass, field
from row_query import ConnectionConfig, Engine, SQLRegistry, ModelMapper
from row_query.repository.base import Repository
from row_query.mapping.builder import aggregate
from row_query.mapping.aggregate import AggregateMapper

@dataclass
class User:
    id: int
    name: str
    email: str

class UserRepository(Repository[User]):
    def find_by_id(self, user_id: int) -> User | None:
        return self.engine.fetch_one(
            "SELECT id, name, email FROM users WHERE id = :id",
            {"id": user_id},
            mapper=self.mapper,
        )

    def find_all(self) -> list[User]:
        return self.engine.fetch_all(
            "SELECT id, name, email FROM users",
            mapper=self.mapper,
        )

    def save(self, user: User) -> int:
        return self.engine.execute(
            "INSERT INTO users (id, name, email) VALUES (:id, :name, :email)",
            {"id": user.id, "name": user.name, "email": user.email},
        )

# Usage
config = ConnectionConfig(driver="sqlite", database=":memory:")

import tempfile
registry = SQLRegistry(tempfile.mkdtemp())
engine = Engine.from_config(config, registry)

mapper = ModelMapper(User)
repo = UserRepository(engine=engine, mapper=mapper)
```

**Constructor parameters:**

| Parameter | Type | Description |
|-----------|------|-------------|
| `engine` | `Engine` or `AsyncEngine` | The engine to use for queries |
| `mapping` | `AggregatePlan \| None` | If provided, wraps it in an `AggregateMapper` automatically |
| `mapper` | `Mapper[T] \| None` | Provide a mapper directly (takes priority over `mapping`) |

**Public attributes:**
- `self.engine` — The engine instance.
- `self.mapper` — The mapper instance (or `None`).

**With aggregate mapping:**

```python
plan = (
    aggregate(Order, prefix="order__")
    .key("id")
    .auto_fields()
    .collection("items", OrderItem, prefix="item__", key="id")
    .build()
)

# Pass the plan directly — Repository wraps it in AggregateMapper
repo = OrderRepository(engine=engine, mapping=plan)
```

AsyncRepository has the same constructor and attributes. Subclass methods should use `await self.engine.fetch_one(...)`, etc.

## Exception Reference

All exceptions inherit from `RowQueryError`. The full hierarchy:

```text
RowQueryError
├── RegistryError
│   ├── QueryNotFoundError
│   └── DuplicateQueryError
├── ExecutionError
│   ├── MultipleRowsError
│   ├── ParameterBindingError
│   └── SQLSanitizationError
├── MappingError
│   ├── ColumnMismatchError
│   ├── StrictModeViolation
│   └── PlanCompilationError
├── TransactionError
│   └── TransactionStateError
├── MigrationError
│   ├── MigrationFileError
│   └── MigrationExecutionError
└── AdapterError
    ├── ConnectionError
    └── PoolError
```

**Detailed reference:**

| Exception | Parent | When Raised | Notable Attributes |
|-----------|--------|-------------|-------------------|
| `RowQueryError` | `Exception` | Base class for all RowQuery exceptions | — |
| `RegistryError` | `RowQueryError` | Base for SQLRegistry errors | — |
| `QueryNotFoundError` | `RegistryError` | `SQLRegistry.get()` called with nonexistent key | `query_name: str` |
| `DuplicateQueryError` | `RegistryError` | Two SQL files produce the same registry key | `query_name: str` |
| `ExecutionError` | `RowQueryError` | Base for query execution errors | — |
| `MultipleRowsError` | `ExecutionError` | `fetch_one()` returns more than one row | `query_name: str`, `row_count: int` |
| `ParameterBindingError` | `ExecutionError` | Parameter binding fails during query execution | `query_name: str` |
| `SQLSanitizationError` | `ExecutionError` | SQLSanitizer rejects inline SQL | — |
| `MappingError` | `RowQueryError` | Base for result mapping errors | — |
| `ColumnMismatchError` | `MappingError` | Row columns don't match target class fields | `target_class: str`, `missing_fields: list[str]` |
| `StrictModeViolation` | `MappingError` | Strict mode: columns missing or unknown prefix groups | — |
| `PlanCompilationError` | `MappingError` | `AggregateMappingBuilder.build()` fails (no key set, prefix collision) | — |
| `TransactionError` | `RowQueryError` | Base for transaction errors | — |
| `TransactionStateError` | `TransactionError` | Operation on committed/rolled-back transaction | `current_state: str`, `attempted_action: str` |
| `MigrationError` | `RowQueryError` | Base for migration errors | — |
| `MigrationFileError` | `MigrationError` | Migration file has invalid name format | `file_name: str` |
| `MigrationExecutionError` | `MigrationError` | Migration SQL execution fails | `version: str` |
| `AdapterError` | `RowQueryError` | Base for adapter/connection errors | — |
| `ConnectionError` | `AdapterError` | Database connection cannot be established | — |
| `PoolError` | `AdapterError` | Connection pool error (exhausted, timeout) | — |

## Terminology Glossary

| Term | Definition |
|------|-----------|
| **Engine** | The primary interface for executing SQL queries. Created via `Engine.from_config()` or `AsyncEngine.from_config()`. |
| **adapter** | A database-specific implementation conforming to `SyncAdapter` or `AsyncAdapter` protocol. Each backend (SQLite, PostgreSQL, MySQL, Oracle) has its own adapter. |
| **ConnectionConfig** | A Pydantic model holding all database connection parameters: `driver`, `host`, `port`, credentials, `database`, and pool configuration. |
| **ConnectionManager** | Manages the lifecycle of database connections — pool creation, connection acquisition and release, and pool shutdown. `AsyncConnectionManager` is its async counterpart. |
| **SQLRegistry** | Loads `.sql` files from a directory tree and makes them available by dot-separated namespace keys (e.g., `"users.get_by_id"`). |
| **SQLSanitizer** | An optional defense-in-depth guard applied to inline SQL queries. Can strip comments, block multiple statements, and restrict SQL verbs. |
| **mapper** | An object implementing the `Mapper[T]` protocol (`map_one`, `map_many`) that transforms raw dict rows into typed Python objects. |
| **ModelMapper** | A mapper that converts a single row dict into a Pydantic model, dataclass, or plain class instance. |
| **AggregateMapper** | A mapper that reconstructs complex object graphs (with collections, references, and value objects) from flat JOIN result sets using single-pass O(n) processing. |
| **AggregatePlan** | A frozen dataclass describing how to map flat rows to an aggregate root with its nested entities. Built via the `aggregate()` DSL. |
| **aggregate()** | The DSL entry point function that returns an `AggregateMappingBuilder` for fluently defining an `AggregatePlan`. |
| **TransactionManager** | A context manager for executing multiple SQL statements atomically. Auto-commits on success, auto-rolls-back on exception. `AsyncTransactionManager` is its async counterpart. |
| **MigrationManager** | Discovers, tracks, and applies SQL migration files in version order using a `schema_migrations` tracking table. |
| **Repository** | A DDD-style base class for organizing database access around domain entities. Wraps an Engine and optional mapper. `AsyncRepository` is its async counterpart. |
| **:name parameter** | The parameter syntax used in SQL queries (e.g., `:id`, `:name`). RowQuery automatically converts this to the format required by each backend. |
