Metadata-Version: 2.4
Name: wallow
Version: 0.1.0
Summary: A deduplicating run registry for ML research, with TOML schemas, an expression DSL, and Alembic migrations.
Author-email: Yiding Song <dev@yiding.rocks>
License: Copyright 2026 Yiding Song
        
        Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
        
        The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
        
        THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
        
Project-URL: Homepage, https://github.com/PerceptronV/wallow
Project-URL: Repository, https://github.com/PerceptronV/wallow
Project-URL: Issues, https://github.com/PerceptronV/wallow/issues
Keywords: ml,experiments,registry,deduplication,sqlalchemy,alembic,research
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Science/Research
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Database
Classifier: Topic :: Scientific/Engineering
Classifier: Topic :: Scientific/Engineering :: Artificial Intelligence
Requires-Python: >=3.10
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: sqlalchemy>=2.0
Requires-Dist: alembic>=1.13
Requires-Dist: tomli>=2.0; python_version < "3.11"
Provides-Extra: test
Requires-Dist: pytest>=7; extra == "test"
Requires-Dist: pytest-cov>=4; extra == "test"
Dynamic: license-file

# wallow

A deduplicating run registry for ML research. TOML schemas, an expression DSL, Alembic migrations, SQLite by default.

`wallow` solves one problem: **deduplicate ML experiments by their identifying hyperparameters** so that a sweep dispatcher can be rerun any number of times without redoing work, and so that a notebook three months later can ask "what did we run?" and get an authoritative answer.

The full specification lives in [`specs/wallow_spec.md`](specs/wallow_spec.md). This README is the practical guide; an agent should be able to build a wallow-powered system from scratch with only what's below.

## Mental model

A `wallow` schema declares two kinds of fields:

- **Identifying fields** — together they form a composite UNIQUE key. Two runs with identical values across all identifying fields are *the same run*. This is what defines an "experiment" for your project. Use these for hyperparameters, dataset version, code revision — anything that, if changed, makes the run a different experiment.
- **Annotating fields** — everything recorded *about* a run rather than *defining* it. Status, metrics, artefact paths, host name, timestamps, training curves. Annotating fields can be edited freely; they don't affect dedup.

The `register()` call writes a run keyed on the identifying tuple and returns a `RegisterResult` that tells the caller exactly what happened (`was_inserted`, `was_updated`, `was_skipped`). If a run with that tuple already exists, behaviour is controlled by an explicit `on_duplicate` policy — there is no default, every caller must decide.

This split is what enables the resume-safe sweep pattern: register before training (with `return_existing` to claim or read back the existing row), inspect `status`, skip already-completed work, then `register` again with `overwrite` to record final metrics. See the [ML sweep recipe](#recipe-an-ml-sweep-with-artefact-paths) below. For *live* multi-worker dispatch where two workers may both encounter the same combo at once, use `on_duplicate="claim_if_stale"` plus `wallow.heartbeat()` (see [Concurrency](#concurrency)).

## Install

```bash
pip install -e .            # editable install from a clone
pip install -e .[test]      # adds pytest + pytest-cov
```

Requires Python 3.10+. SQLAlchemy 2.x and Alembic are installed transitively.

## Quick start (no Alembic)

For prototyping or contained scripts, skip Alembic entirely. `Store` calls `Base.metadata.create_all` on first connection when no `alembic_version` table exists, so the database materialises itself.

`wallow.toml`:

```toml
[project]
name = "demo"

[identifying.lr]
type = "float"

[identifying.seed]
type = "int"
default = 0

[annotating.status]
type = "string"
indexed = true

[annotating.val_loss]
type = "float"
indexed = true
```

`run.py`:

```python
from wallow import Store, load_schema, register

schema = load_schema("wallow.toml")
store  = Store("runs.db", schema=schema)

result = register(
    store,
    identifying={"lr": 1e-3},          # `seed` may be omitted: it has default = 0
    annotating={"status": "running"},
    on_duplicate="return_existing",
)
run = result.run                         # the Run; result.was_inserted etc carry context
```

Switch to Alembic when you need [schema evolution](#schema-evolution).

## Project setup with Alembic

```bash
mkdir my_project && cd my_project
wallow init --db runs.db
```

Materialises:

```
my_project/
├── wallow.toml          schema
├── alembic.ini          migration config (sqlalchemy.url is resolved relative to this file)
├── alembic/
│   ├── env.py           wired to wallow.toml
│   ├── script.py.mako
│   ├── versions/        autogenerated migration scripts go here
│   └── snapshots/       wallow.toml history (one .toml per revision)
└── runs.db              created by `migrate apply`
```

Then:

```bash
# edit wallow.toml to declare your fields
wallow migrate generate "initial schema"   # writes alembic/versions/<rev>_initial_schema.py + snapshot
wallow migrate apply                       # creates the runs table + alembic_version
wallow status                              # exits 0 when DB is at head, 1 otherwise
```

`wallow init`, `wallow migrate`, `wallow status`, `wallow inspect` all walk up from cwd looking for `alembic.ini`, or accept `--alembic-ini PATH`.

## Schema authoring (`wallow.toml`)

```toml
[project]
name = "my_experiment"
description = "What this dataset is for."   # optional
float_precision = 12                          # optional; sig figs for identifying-float normalisation (default 12)

# identifying = composite UNIQUE; together these define one experiment.
# Restricted to int / float / string / bool.
[identifying.<name>]
type     = "int" | "float" | "string" | "bool"
default  = <literal of the right type>      # optional; required if added in a later migration
doc      = "human-readable description"     # optional; doc-only changes don't generate a migration
indexed  = true                              # default true for identifying

# annotating = everything else. All seven types allowed.
[annotating.<name>]
type     = "int" | "float" | "string" | "bool" | "json" | "datetime" | "path"
indexed  = false   # default false for annotating; set true on fields you query a lot
nullable = true    # default true for annotating; set false to require a value
default  = <literal>
doc      = "..."
```

### Type catalogue

| TOML `type` | Python value at register-time | SQLAlchemy column | Notes |
|---|---|---|---|
| `int`      | `int` (NOT `bool`)            | `Integer`         | Bool is a `int` subclass in Python but rejected here |
| `float`    | `int` or `float` (NOT `bool`) | `Float`           | NaN rejected on identifying floats. Identifying floats are normalised to 12 sig figs by default — see [Identifying floats and normalisation](#identifying-floats-and-normalisation) |
| `string`   | `str`                         | `String`          | |
| `bool`     | `bool`                        | `Boolean`         | |
| `json`     | any JSON-serialisable         | `JSON`            | Annotating only. Query with `F("...").json_path("a.b")` |
| `path`     | `str`                         | `String`          | Annotating only. Semantic marker for filesystem paths |
| `datetime` | tz-aware `datetime.datetime`  | `DateTime`        | Annotating only. Naive datetimes rejected |

**Identifying fields** are restricted to the four primitive types (`int`, `float`, `string`, `bool`). `json`, `datetime`, and `path` are annotating-only. Identifying fields are also forced to `nullable = false` (NULL in a UNIQUE constraint silently breaks dedup on most backends).

### Reserved field names

`id`, `created_at`, `updated_at`, and any name matching `^_wallow_` (case-insensitive) are reserved. Auto-populated by wallow on every row.

### Defaults and NULLs

A `default` on an identifying field does three things:

1. **Register-time fill.** `register()` and `find()` may omit any identifying field with a declared default; the default is filled in before validation and dedup. So `register(..., identifying={"lr": 1e-3})` is fine when `seed` declares `default = 0`.
2. **Migration backfill.** When you add an identifying field in a later migration, the default becomes a DDL `server_default` so existing rows get backfilled cleanly — adding the new NOT NULL column to a non-empty table just works.
3. **Python ORM default.** The default is also handed to SQLAlchemy as the `Column(default=...)` for callers who construct `Run(...)` directly (rare).

Identifying fields without a `default` must be passed explicitly on every call.

### Identifying floats and normalisation

Identifying float values are rounded to **12 significant figures** by default before insertion, lookup, and DSL comparison. This means `lr = 0.1 + 0.2` and `lr = 0.3` dedupe as the same run — IEEE-754 mantissa noise from arithmetic, JSON/YAML round-trips, or numpy ops collapses to the same canonical float, which is almost always what you want for a sweep dispatcher.

Set the precision via `[project] float_precision = N` in `wallow.toml` (any positive int; 12 is conservative, 6–8 is fine for most ML sweeps). Annotating floats are *not* normalised — they preserve full precision so range queries and metrics analysis behave intuitively.

If you really want bit-exact identifying floats, set `float_precision` to a large number (≥17 covers full double precision) — but consider whether the resulting double-counting is the dedup behaviour you want. For totally-deterministic sweep keys, prefer `string` identifying fields with a fixed format like `"1e-3"`.

NaN values in identifying floats remain rejected (`SchemaValidationError`); infinities pass through normalisation unchanged.

## Python API

```python
from wallow import (
    Store, load_schema, register, find, heartbeat,  # store / mutation / lookup / liveness
    RegisterResult,                                  # return type of register()
    F,                                               # DSL field reference
    DuplicateRunError,                               # raised when on_duplicate="raise"
    PendingMigrationError,                           # DB schema is behind wallow.toml head
    SchemaValidationError,                           # bad value or unknown field
)
```

### `Store`

```python
store = Store(db_path, *, schema, check_schema=True)
```

- `db_path` — `"runs.db"`, `Path("runs.db")`, or `":memory:"`. SQLite URL is built automatically.
- `schema` — a `Schema` from `load_schema("wallow.toml")`.
- `check_schema=True` — when Alembic is in use, raise `PendingMigrationError` if the DB is behind the schema head. No-op otherwise.

Properties:

- `store.engine` — the SQLAlchemy `Engine` (escape hatch).
- `store.schema` — the parsed `Schema`.
- `store.session()` — context manager yielding a session (commits on success, rolls back on exception).
- `store.execute(stmt)` — run a raw SQLAlchemy statement.

The query DSL methods on `Store` are aliases for `Query(store).<method>`:

- `store.where(*exprs)` → `Query`
- `store.count()` → `int`
- `store.all()` → `list[Run]`

### `register`

```python
result = register(
    store,
    identifying={...},                      # identifying tuple; fields with TOML default may be omitted
    annotating={...},                       # subset of annotating fields — optional
    on_duplicate="raise" | "return_existing" | "overwrite" | "skip" | "claim_if_stale",
    stale_after=timedelta(...),             # required only when on_duplicate="claim_if_stale"
)
run = result.run                             # SQLAlchemy ORM object (or None on skip-duplicate)
```

`on_duplicate` has **no default** — every caller picks the dedup policy explicitly. This is intentional: the right policy depends on whether you're claiming a slot, finalising a run, or upserting metadata, and the wrong choice silently corrupts data.

| `on_duplicate`       | Existing row found → returns | Existing row found → side effect | Use when |
|----------------------|------------------------------|----------------------------------|----------|
| `"raise"`            | raises `DuplicateRunError`   | none                             | You believe this combo is fresh; surface bugs loudly. |
| `"return_existing"`  | the existing run             | none                             | Dedup gate. Read it back, inspect `status`, decide whether to do work. |
| `"overwrite"`        | the existing run             | each provided annotating field is overwritten | Recording final metrics; upserting metadata. |
| `"skip"`             | `None`                       | none                             | Bulk seeding when you don't care about the existing row. |
| `"claim_if_stale"`   | existing if recently heartbeat'd; otherwise overwritten | bumps `updated_at` and writes annotating fields when stale | Live multi-worker dispatch — see [Concurrency](#concurrency). |

`register()` returns a `RegisterResult`:

```python
@dataclass(frozen=True)
class RegisterResult:
    run: Run | None        # the row (None only for "skip" on duplicate)
    was_inserted: bool     # True iff this call inserted a new row
    was_updated: bool      # True iff this call wrote annotating fields to an existing row
    was_skipped: bool      # True iff an existing row was returned without modification
```

Exactly one flag is True for every outcome except `return_existing` on a duplicate, where all three are False (the row was neither inserted, written to, nor functionally skipped — the caller asked to read it back). Use the flags to log "claimed" vs "rejoined", to count fresh inserts in a sweep loop, or to branch on whether `claim_if_stale` actually claimed.

Validation runs before the DB hit:

- Identifying fields with a declared TOML `default` may be omitted; missing fields without a default still raise `SchemaValidationError`.
- Unknown identifying or annotating fields → `SchemaValidationError`.
- Type mismatch (e.g. passing `1` for a `bool` field, or a naive `datetime`) → `SchemaValidationError`.
- Identifying float values are normalised to `schema.float_precision` significant figures (default 12) — see [Identifying floats and normalisation](#identifying-floats-and-normalisation).

The returned `Run` is a SQLAlchemy ORM object; access fields as attributes (`run.val_loss`, `run.artefacts_dir`). It's detached from the session, so attribute access after `register()` returns is safe.

### `find`

```python
run = find(store, lr=1e-3)                   # `seed` may be omitted: it has default = 0
```

Direct identifying-tuple lookup. Like `register` with `on_duplicate="skip"` minus the insert. Identifying fields with a declared `default` may be omitted; floats are normalised the same way as `register` so `find(store, lr=0.1+0.2)` matches a row registered at `lr=0.3`.

### `heartbeat`

```python
heartbeat(store, identifying={...})          # bumps updated_at; raises if no match
```

Updates the row's `updated_at` to "now" without touching any other field. Pairs with `on_duplicate="claim_if_stale"` for live multi-worker dispatch — see [Concurrency](#concurrency). Identifying defaults are filled and floats normalised the same way as `register`/`find`.

### The resume-safe pattern

This is the canonical wallow idiom for **sequential** redispatch (a single dispatcher rerun after a crash). For *concurrent* dispatch from multiple live workers see [Concurrency](#concurrency); the pattern below would let two live workers double-train the same combo.

```python
# 1. Claim the slot or read back the existing row.
result = register(
    store,
    identifying=combo,
    annotating={"status": "running", "started_at": now()},
    on_duplicate="return_existing",
)
run = result.run
# result.was_inserted distinguishes "I just claimed this combo" from
# "I rejoined someone else's row" if you want to log it.

# 2. If it's already done, skip.
if run.status == "completed":
    continue

# 3. Otherwise, do the expensive work.
artefacts = train(combo)

# 4. Record final state. `overwrite` so the row lands in a known state regardless
#    of whether we're finishing the first attempt or replacing a stale "running"
#    from a previous crashed attempt.
register(
    store,
    identifying=combo,
    annotating={
        "status": "completed",
        "artefacts_dir": str(artefacts.dir),
        "val_loss": artefacts.val_loss,
        # ...
    },
    on_duplicate="overwrite",
)
```

Crash anywhere between steps 1 and 4 and the next dispatch picks up the combo as `status="running"`, retrains it, and overwrites. Combos that completed get skipped at step 2.

## DSL: `F`, `Query`

```python
from wallow import F

q = (
    store.where((F("optimiser") == "adamw") & (F("status") == "completed"))
         .order_by(F("val_accuracy").desc(), F("val_loss").asc())
         .limit(10)
)
top = q.all()
```

Field names resolve at compile time against the schema. Unknown names raise `SchemaValidationError` with a list of valid names.

### Eager validation: `F(name, schema=...)` and `schema.f.<name>`

By default, `F("typo_name")` doesn't raise until the query is materialised — useful for cross-schema reuse, but means typos surface late. Two ways to validate eagerly:

```python
schema = load_schema("wallow.toml")

# 1. Bind a schema explicitly:
F("val_loss", schema=schema)         # raises SchemaValidationError on typo
F("typo_name", schema=schema)        # → SchemaValidationError immediately

# 2. Attribute access on schema.f (autocompletes in IDEs):
schema.f.val_loss                    # → Field
schema.f.typo_name                   # → AttributeError immediately
dir(schema.f)                        # lists every declared field name

# Use either form anywhere F() works:
top = store.where(schema.f.status == "completed").all()
```

`F(name)` (no schema arg) keeps the deferred-resolution semantics for code that constructs expressions before knowing which schema they'll run against.

### Operators

| Operator                               | Meaning                                            |
|----------------------------------------|----------------------------------------------------|
| `F("x") == v` / `!= v`                 | equality (`v=None` becomes IS NULL / IS NOT NULL)  |
| `F("x") < v` / `<= v` / `> v` / `>= v` | comparison                                          |
| `F("x").in_([...])` / `.not_in([...])` | set membership                                      |
| `F("x").contains("...")`                | SQL LIKE-style substring (string/path fields only) |
| `F("x").startswith("...")` / `.endswith(...)`  | string/path fields                          |
| `F("x").is_null()` / `.is_not_null()`  | NULL check                                         |
| `F("x").json_path("a.b").is_not_null()` | json_extract for json fields                      |
| `expr1 & expr2` / `expr1 \| expr2` / `~expr` | boolean composition                          |

**Parenthesise comparisons before composing**: `(F("k") == 4) & (F("v") > 0.85)`. Python's `&` binds tighter than `==`, so `F("k") == 4 & F("v") > 0.85` parses wrong.

### Ordering, paging, materialising

```python
q = store.where(...)
q.order_by(F("val_loss").asc(), F("seed"))   # bare Field implies asc()
q.limit(10).offset(20)
q.all()      # list[Run]
q.first()    # Run | None  (auto-LIMIT 1 if not set)
q.one()      # Run, raises if 0 or >1 matched
q.count()    # int (ignores limit/offset; counts the WHERE match)
q.exists()   # bool
for run in q: ...    # streaming with yield_per(100)
```

### Escape hatch

When the DSL doesn't cover a query, use raw SQLAlchemy via `store.engine` or `store.execute(stmt)`. The dynamically generated ORM class is `store.schema.Run`.

## Recipe: an ML sweep with artefact paths

This is the pattern the user's question is about: dispatch a hyperparameter sweep where each unique combo gets a deterministic artefacts directory, recorded as an annotating `path` field, with the dispatcher fully resume-safe.

`wallow.toml`:

```toml
[project]
name = "ml_sweep"

# Identifying = the experiment definition.
[identifying.architecture]
type = "string"

[identifying.optimiser]
type = "string"

[identifying.learning_rate]
type = "float"

[identifying.batch_size]
type = "int"

[identifying.weight_decay]
type = "float"
default = 0.0

[identifying.num_epochs]
type = "int"
default = 10

[identifying.seed]
type = "int"
default = 0

# Annotating = recorded about the run.
[annotating.status]
type = "string"
indexed = true

[annotating.artefacts_dir]
type = "path"               # filesystem location for this run's outputs

[annotating.best_checkpoint]
type = "path"

[annotating.val_loss]
type = "float"
indexed = true

[annotating.val_accuracy]
type = "float"
indexed = true

[annotating.training_curve]
type = "json"

[annotating.host]
type = "string"

[annotating.git_commit]
type = "string"

[annotating.started_at]
type = "datetime"

[annotating.completed_at]
type = "datetime"

[annotating.error_message]
type = "string"
```

Dispatcher:

```python
import datetime as dt, hashlib, json
from pathlib import Path
from wallow import Store, load_schema, register

ARTEFACTS_ROOT = Path("artefacts")

def artefacts_dir_for(combo: dict) -> Path:
    digest = hashlib.sha1(json.dumps(combo, sort_keys=True).encode()).hexdigest()[:10]
    return ARTEFACTS_ROOT / combo["architecture"] / digest

def now(): return dt.datetime.now(dt.timezone.utc)

schema = load_schema("wallow.toml")
store  = Store("runs.db", schema=schema)

for combo in build_grid():        # list of dicts, full identifying tuple each
    run = register(
        store, identifying=combo,
        annotating={"status": "running", "started_at": now()},
        on_duplicate="return_existing",
    ).run
    if run.status == "completed":
        continue

    artefacts_dir = artefacts_dir_for(combo)
    artefacts_dir.mkdir(parents=True, exist_ok=True)

    try:
        result = train(combo, artefacts_dir)         # writes ckpts, logs, metrics.json
    except Exception as e:
        register(store, identifying=combo,
                 annotating={"status": "failed", "error_message": f"{type(e).__name__}: {e}",
                             "completed_at": now()},
                 on_duplicate="overwrite")
        continue

    register(
        store, identifying=combo,
        annotating={
            "status": "completed",
            "artefacts_dir": str(artefacts_dir),
            "best_checkpoint": result["best_ckpt"],
            "val_loss": result["val_loss"],
            "val_accuracy": result["val_acc"],
            "training_curve": result["curve"],
            "completed_at": now(),
        },
        on_duplicate="overwrite",
    )
```

Analyse:

```python
from wallow import F, Store, load_schema, find

store = Store("runs.db", schema=load_schema("wallow.toml"))

# Best run overall.
best = (
    store.where(F("status") == "completed")
         .order_by(F("val_accuracy").desc(), F("val_loss").asc())
         .first()
)
print(best.artefacts_dir, best.best_checkpoint)

# Direct lookup by identifying tuple.
specific = find(store, architecture="resnet18", optimiser="adamw",
                learning_rate=1e-3, batch_size=128, weight_decay=0.0,
                num_epochs=10, seed=0)
```

A complete, runnable, Alembic-managed version is in [`examples/ml_sweep/`](examples/ml_sweep/) (with the initial migration checked in). For a longer walkthrough of the multi-migration evolution flow — adding an identifying field to a populated DB — see [`examples/matching_feedback/`](examples/matching_feedback/).

### What goes where (rule of thumb)

- **Identifying** anything that, if changed, makes this a new experiment: hyperparameters, dataset version, model code revision (if you're sweeping it), random seed, dataset split index.
- **Annotating** anything *measured* (metrics, training curves), *contextual* (host, git_commit, timestamps), or *referential* (filesystem paths to artefacts, URLs to dashboards). Use `path` for filesystem locations — it's a typed string today, but tooling can use the type tag for things like rsync helpers later.
- **Don't** put `learning_rate` in annotating — you'll lose dedup. **Don't** put `host` in identifying — every restart on a new node will look like a new experiment.

## Schema evolution

Edit `wallow.toml`, then:

```bash
wallow migrate generate "add warmup_steps"   # autogenerate + snapshot of the new toml
# review alembic/versions/<rev>_add_warmup_steps.py
wallow migrate apply
```

`wallow migrate generate` aborts before invoking Alembic if it detects:

- An identifying field being **dropped** — would cause silent dedup collisions. Use `wallow.find_collisions_after_drop(store, "<field>")` to inspect; it returns a list of `CollisionGroup(field_values=..., row_ids=...)` for groups that would collapse if the field were removed. Resolve the collisions manually (delete duplicates, demote to annotating, or keep the field) and rerun.
- A new identifying field added without a `default` — NOT NULL columns can't be added to a non-empty table. Add a `default` to `wallow.toml`, regenerate.

`doc`-only changes don't generate a migration (Alembic doesn't see `doc`; it's not a column attribute).

### Adopting wallow on an existing DB

If your project pre-dates the migration setup, your `runs` table was likely created via SQLAlchemy's `create_all` with no `alembic_version`:

```bash
wallow init                          # writes alembic.ini + templates
wallow migrate generate "baseline"   # autogen against the existing DB → empty migration
wallow migrate stamp head            # records the revision without DDL
```

After this, edits to `wallow.toml` flow through the normal `generate` + `apply` cycle.

## Concurrency

SQLite + WAL handles a few concurrent writer processes fine. Wallow installs the right pragmas on every connection:

- `PRAGMA journal_mode=WAL` (skipped on `:memory:`)
- `PRAGMA synchronous=NORMAL`
- `PRAGMA foreign_keys=ON`

The INSERT-race case (two workers race to register the same combo) is handled at the DB layer: the loser catches `IntegrityError` internally, retries the read, and returns the existing row according to its `on_duplicate` policy.

**Bootstrap note.** WAL is set on the *first* connection a Store opens. If you fork N workers against a fresh DB before any Store has opened it, the workers race to upgrade the journal and may deadlock. Open one Store in the parent before forking.

### Live multi-worker dispatch

The [resume-safe pattern](#the-resume-safe-pattern) handles crash-then-restart but does **not** prevent two *live* workers from double-training the same combo: both call `register(..., return_existing)`, both see `status="running"`, both proceed to train. The second `overwrite` clobbers the first.

For live multi-worker dispatch use `on_duplicate="claim_if_stale"` plus `wallow.heartbeat()`:

```python
import datetime as dt
from wallow import register, heartbeat

STALE_AFTER = dt.timedelta(minutes=10)   # 2-3× your worst-case silent interval

result = register(
    store,
    identifying=combo,
    annotating={"status": "running", "started_at": now()},
    on_duplicate="claim_if_stale",
    stale_after=STALE_AFTER,
)
if result.was_skipped:
    continue                              # another worker is alive on this combo
if result.run.status == "completed":
    continue                              # already done

# We hold the slot. Heartbeat periodically while training so other workers
# see the row as fresh (otherwise our long silence looks stale to them).
def train_with_heartbeat(combo):
    last_beat = time.monotonic()
    for step in train_steps(combo):
        if time.monotonic() - last_beat > 60:
            heartbeat(store, identifying=combo)
            last_beat = time.monotonic()
    ...

train_with_heartbeat(combo)
register(store, identifying=combo,
         annotating={"status": "completed", ...},
         on_duplicate="overwrite")
```

`claim_if_stale` reads the row's `updated_at` (which `register` and `heartbeat` both bump) and decides:

- **No row exists.** Insert it; `result.was_inserted=True`.
- **Row exists, `now - updated_at > stale_after`.** The previous worker has gone silent; overwrite the annotating fields, bump `updated_at`, return with `was_updated=True` (you have claimed it).
- **Row exists, `updated_at` is recent.** Someone else is alive on it; return the existing row unchanged with `was_skipped=True`.

Pick `stale_after` to be 2–3× your worst-case silent interval (longest gap between heartbeats / writes a healthy worker will produce). Too short → live workers get stolen from; too long → crashed work blocks recovery.

For >10 writers or a shared filesystem with patchy locking, switch the `sqlalchemy.url` in `alembic.ini` to a Postgres URL. The schema/DSL/migration layers are backend-agnostic; only the SQLite-specific pragmas are gated.

## Errors

| Class                        | Raised when |
|------------------------------|-------------|
| `WallowError`                | base class  |
| `SchemaParseError`           | `wallow.toml` is invalid (unknown type, reserved name, identifying with non-primitive type, etc.) |
| `SchemaValidationError`      | unknown field or wrong-typed value passed to `register`/`find`/DSL |
| `DuplicateRunError`          | `on_duplicate="raise"` and a row with the identifying tuple already exists. Carries the existing run on `.run` |
| `PendingMigrationError`      | `Store(..., check_schema=True)` and DB revision is behind the schema head. Carries `.current_rev` and `.head_rev` |

## CLI reference

| Command | Description |
|---|---|
| `wallow init [--force] [--dir DIR] [--db DB] [--schema PATH]` | Scaffold a new project. |
| `wallow migrate generate <message>`                            | Autogenerate a revision + snapshot. |
| `wallow migrate apply [--target REV]`                          | Apply pending migrations. |
| `wallow migrate downgrade <target> [--yes]`                    | Downgrade. `--yes` required for `base`. |
| `wallow migrate history`                                       | List revisions; the applied one is marked `*`. |
| `wallow migrate stamp <revision>`                              | Record a revision in `alembic_version` without running DDL. |
| `wallow status`                                                | Print sync state. Exit 0 in sync, 1 pending or no `alembic.ini` found. |
| `wallow inspect <id>`                                          | Pretty-print one run's fields. |

Every `migrate`/`status`/`inspect` command accepts `--alembic-ini PATH`; otherwise the CLI walks up from cwd.

## Tests

```bash
pytest -q
```

## Layout

```
src/wallow/
  schema.py        # TOML parser + dynamic SQLAlchemy model generation
  store.py         # Store, register, find, session management
  dsl.py           # F, Field, Expr, Query — operator-overloaded query builder
  migrations.py    # Alembic wrappers + snapshot mechanism + collision detection
  cli.py           # `wallow` command (argparse)
  errors.py        # WallowError hierarchy
  templates/       # files copied by `wallow init`
examples/
  ml_sweep/             # alembic-managed sweep with artefact paths (this README's recipe)
  matching_feedback/    # alembic-managed with a two-migration history (adds a field to a populated DB)
specs/wallow_spec.md    # authoritative specification
tests/                  # ~129 tests covering all phases
```
