Metadata-Version: 2.4
Name: carbonation
Version: 0.0.6
Summary: File delivery monitoring and archival service
Author-email: Jonathan Olsten <jonathan.olsten@gmail.com>
License-File: LICENSE
Requires-Python: >=3.11
Requires-Dist: alembic<2,>=1.13
Requires-Dist: click>=8.1
Requires-Dist: loguru>=0.7.3
Requires-Dist: platformdirs<5,>=4
Requires-Dist: pydantic<3,>=2.10
Requires-Dist: rich>=14.3.3
Requires-Dist: sdnotify<1,>=0.3
Requires-Dist: sqlalchemy<3,>=2.0.48
Requires-Dist: watchdog>=6.0.0
Provides-Extra: mariadb
Requires-Dist: mysqlclient<3,>=2.2; extra == 'mariadb'
Provides-Extra: postgresql
Requires-Dist: psycopg2<3,>=2.9; extra == 'postgresql'
Provides-Extra: pymysql
Requires-Dist: pymysql<2,>=1.1; extra == 'pymysql'
Description-Content-Type: text/markdown

# Carbonation

File delivery monitoring and archival service. Watches directories for incoming files, groups them by stem, validates integrity, extracts metadata via plugins, applies selection and routing rules, and archives to configurable destinations.

## Installation

Requires Python 3.11+.

```bash
pip install carbonation
```

For MariaDB or PostgreSQL support:

```bash
pip install "carbonation[mariadb]"     # mysqlclient
pip install "carbonation[postgresql]"  # psycopg2
```

## Quick Start

```bash
# 1. Scaffold a new project (config + rules + plugin package skeleton)
mkdir /opt/carbonation && cd /opt/carbonation
carbonation init

# 2. Edit the generated files
$EDITOR config.toml                  # watch paths, archive destination, database
$EDITOR rules.toml                   # integrity, completeness, selection, routing
$EDITOR my_plugin/my_plugin/model.py        # File columns
$EDITOR my_plugin/my_plugin/extractors.py   # extract() implementations

# 3. Install the plugin, validate, initialize, and run
pip install -e ./my_plugin           # or: uv pip install -e ./my_plugin
carbonation check config             # validate configuration
carbonation db init                  # run core + plugin migrations
carbonation run                      # start the service
```

`carbonation init` generates a complete starter project:

```
config.toml                       # service, logging, database, watch, archive
rules.toml                        # integrity, completeness, metadata, selection, routing
my_plugin/
├── pyproject.toml                # declares the carbonation.plugin entry point
└── my_plugin/
    ├── __init__.py               # builds `plugin = Plugin(...)`
    ├── model.py                  # File model (FileBase subclass)
    ├── extractors.py             # extract_sample() etc.
    ├── rules.py                  # optional grouping/selection/routing fns
    └── migrations/versions/      # alembic revisions on the plugin's own branch
```

See [Plugin System](#plugin-system) for details on the `Plugin` contract.

## CLI Reference

```
carbonation check config                        Validate config.toml and rules.toml
carbonation check rules FILE                    Dry-run rules changes against DB
carbonation db init                             Run core + plugin migrations
carbonation db upgrade                          Apply pending schema migrations
carbonation db revision -m "..."                Generate a new plugin alembic revision
                           [--autogenerate]
carbonation db status                           Component counts and metadata check
carbonation run [--dry-run] [--once]            Start the service
carbonation status                              Heartbeat, status table, watch health
carbonation retry [--watch-name X]              Re-enqueue failed components
carbonation reconcile delivery                  Sync delivery directories with DB
carbonation reconcile archive [--watch NAME]    Sync archive directories with DB
carbonation query files [filters]               Query with dynamic plugin filters
```

### Query Examples

```bash
# Filter by plugin columns (dynamically generated from your model)
carbonation query files --category alpha --limit 50

# Date range overlap
carbonation query files --daterange 2026-01-01/2026-02-01

# Recent files by age
carbonation query files --age 7d

# Output formats
carbonation query files --category alpha --format json
carbonation query files --format csv > export.csv

# Count only
carbonation query files --category alpha --count

# Specific columns, sorted
carbonation query files --columns group_key,category,start --order-by -start
```

## Programmatic API

Query the database from Python scripts without going through the CLI:

```python
from carbonation import connect

with connect("config.toml") as db:
    # All files from a watch
    rows = db.query_files({"watch_name": ["incoming"]}, limit=50)
    for row in rows:
        print(row["group_key"], row["created_at"])

    # Count
    n = db.query_files({"complete": True}, count_only=True)

    # Date filtering + plugin columns
    from datetime import datetime
    rows = db.query_files({
        "category": ["alpha"],
        "created_at_after": datetime(2026, 1, 1),
    }, order_by="-created_at", columns=["group_key", "category", "start"])
```

For one-off scripts, use the convenience function that handles setup and teardown in one call:

```python
from carbonation import query_files

rows = query_files("config.toml", {"category": ["alpha"]}, limit=50)
```

### Watermark-based polling

For external consumers (e.g. an orchestrator) that need to poll for newly completed files without gaps:

```python
from carbonation import connect

cursor = None  # persist this between invocations
with connect("config.toml") as db:
    while True:
        rows, cursor = db.get_new_files(cursor=cursor)
        if not rows:
            break
        for row in rows:
            print(row["group_key"], row["completed_at"])

    # Check if the service is alive
    status = db.get_service_status()
    if status:
        print(f"Last heartbeat: {status['last_heartbeat']}")
```
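The cursor is meant to be persisted between invocations. A minimal sketch of doing so, assuming the cursor value is JSON-serializable (an assumption — check what `get_new_files` actually returns in your version) and using a hypothetical `cursor.json` file:

```python
# Sketch: persisting the watermark cursor between polling runs.
# Assumes the cursor is JSON-serializable (e.g. an integer id or an ISO
# timestamp) — this is an assumption, not a documented guarantee.
import json
from pathlib import Path

CURSOR_FILE = Path("cursor.json")  # hypothetical location

def load_cursor():
    """Return the last saved cursor, or None on first run."""
    if CURSOR_FILE.exists():
        return json.loads(CURSOR_FILE.read_text())
    return None

def save_cursor(cursor) -> None:
    """Persist the cursor so the next invocation resumes without gaps."""
    CURSOR_FILE.write_text(json.dumps(cursor))
```

Call `load_cursor()` before the polling loop and `save_cursor(cursor)` after each `get_new_files` call.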

### ORM access

For direct ORM access to query, modify, and commit changes:

```python
from carbonation import session
from carbonation.db.models import FileComponent, FileStatus

with session("config.toml") as s:
    comps = s.query(FileComponent).filter_by(status=FileStatus.ARCHIVED).all()
    for c in comps:
        c.status = FileStatus.CLEARED
    s.commit()
```

### Filter operators

| Suffix | Operator | Example |
|---|---|---|
| `_after` / `_before` | `>=` / `<=` (datetime) | `{"created_at_after": datetime(2026, 1, 1)}` |
| `_min` / `_max` | `>=` / `<=` (numeric) | `{"id_min": 100}` |
| `_daterange` | start/stop overlap | `{"_daterange": (start, end)}` |
| `_age` | recency | `{"_age": datetime(2026, 3, 20)}` |
| (none) | exact / IN / LIKE | `{"category": ["alpha", "beta"]}` |
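To illustrate how a suffixed filter key decomposes, here is a standalone sketch — not carbonation's actual parser — that splits a key like `created_at_after` into a column name and a comparison:

```python
# Illustrative sketch of suffix-based filter parsing; the decomposition
# mirrors the table above, not carbonation's internal implementation.
import operator

SUFFIX_OPS = {
    "_after": operator.ge,   # datetime >=
    "_before": operator.le,  # datetime <=
    "_min": operator.ge,     # numeric >=
    "_max": operator.le,     # numeric <=
}

def parse_filter_key(key: str):
    """Split a filter key into (column, comparison).

    A key with no recognized suffix falls through to equality
    (exact / IN / LIKE in the table above).
    """
    for suffix, op in SUFFIX_OPS.items():
        if key.endswith(suffix):
            return key[: -len(suffix)], op
    return key, operator.eq
```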

## Architecture

### Processing Pipeline

```mermaid
flowchart TD
    A["File arrives in<br>watch directory"] --> B["Watchdog detects<br>creation / modification"]
    B --> C["Settle timer<br>debounces"]
    C --> D["Event enqueued"]
    D --> E["Worker thread<br>picks up event"]

    E --> S1["1 — Record component"]
    S1 --> IC{{"2 — Integrity checks<br>(min_size, readability)"}}

    IC -- fail --> IF["INTEGRITY_FAILED"]
    IC -- pass --> S3{{"3 — Completeness<br>grouping"}}

    S3 -- "incomplete<br>(waiting for extensions)" --> W["WAITING"]
    S3 -- "timeout<br>(on_timeout=skip)" --> TO["TIMED_OUT"]
    S3 -- "complete / standalone" --> S4["4 — Metadata extraction<br>(plugin extract)"]

    S4 --> S5{{"5 — Selection rules"}}
    S5 -- rejected --> NS["NOT_SELECTED"]
    S5 -- accepted --> S6["6 — Routing<br>(choose archive)"]

    S6 --> S7{{"7 — Archive<br>(copy / move / hardlink / symlink)"}}
    S7 -- success --> AR["ARCHIVED"]
    S7 -- "failure<br>(retries left)" --> RP["RETRY_PENDING"]
    S7 -- "failure<br>(retries exhausted)" --> FA["FAILED"]

    RP -. "retry_delay<br>elapsed" .-> E
    IF -. "reconciliation<br>(re-assessed)" .-> E
    NS -. "rules hot-reload<br>(re-evaluated)" .-> E

    style IF fill:#d32f2f,color:#fff
    style FA fill:#d32f2f,color:#fff
    style TO fill:#757575,color:#fff
    style NS fill:#757575,color:#fff
    style W fill:#f9a825
    style RP fill:#f57c00,color:#fff
    style AR fill:#388e3c,color:#fff
```
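The component states in the diagram can be summarized as a plain enum. This is an illustrative sketch mirroring the diagram's labels — the real `FileStatus` enum in `carbonation.db.models` may differ in names or values:

```python
# Sketch of the component states shown in the pipeline diagram above.
# The actual FileStatus enum in carbonation.db.models is authoritative.
from enum import Enum

class State(Enum):
    WAITING = "waiting"                    # incomplete group, awaiting extensions
    TIMED_OUT = "timed_out"                # completeness timeout with on_timeout=skip
    INTEGRITY_FAILED = "integrity_failed"  # failed min_size / readability checks
    NOT_SELECTED = "not_selected"          # rejected by selection rules
    RETRY_PENDING = "retry_pending"        # archive failed, retries remain
    FAILED = "failed"                      # archive failed, retries exhausted
    ARCHIVED = "archived"                  # terminal success

# States the diagram shows re-entering the pipeline (dotted edges)
RE_ENQUEUEABLE = {State.RETRY_PENDING, State.INTEGRITY_FAILED, State.NOT_SELECTED}
```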

### Reliability

- **Retry with backoff**: Archive failures are retried up to `max_retries` times with `retry_delay` between attempts; a component is marked permanently failed only after all retries are exhausted.
- **Post-archive verification**: File size verified after copy/move. Mismatched files are deleted and retried.
- **Reconciliation**: Periodic sync between filesystem and database catches missed files, clears removed files, re-enqueues pending work.
- **Rules hot-reload**: The service watches `rules.toml` for changes and validates before applying. Invalid changes are rejected without disruption.
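The retry behavior above can be sketched as a simple loop — an illustration of the policy (attempt, wait `retry_delay`, give up after `max_retries`), not carbonation's actual scheduler, which runs retries asynchronously:

```python
# Illustrative sketch of the retry policy described above — not the
# actual carbonation scheduler.
import time

def archive_with_retries(archive_fn, max_retries: int = 3,
                         retry_delay: float = 1.0, sleep=time.sleep) -> str:
    """Return 'ARCHIVED' on success, 'FAILED' once retries are exhausted."""
    for attempt in range(max_retries + 1):  # initial try + max_retries retries
        try:
            archive_fn()
            return "ARCHIVED"
        except OSError:
            if attempt == max_retries:
                return "FAILED"
            sleep(retry_delay)  # component sits in RETRY_PENDING during this window
    return "FAILED"
```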

### Plugin System

A plugin is an installable Python package that declares a `carbonation.plugin` entry point. The package's `__init__.py` builds a `Plugin` spec object that carbonation imports at startup:

```python
from pathlib import Path
from carbonation import Plugin
from .model import File
from .extractors import extract_sample
from .rules import should_archive

plugin = Plugin(
    file_model=File,                                          # FileBase subclass
    extractors={"sample": extract_sample},                    # named extractors
    selection_functions={"should_archive": should_archive},   # optional rule fns
    # group_functions={...}, routing_functions={...} also supported
    migrations_path=Path(__file__).parent / "migrations" / "versions",
    query_defaults={"order_by": "-start"},
)
```

**Discovery:** set `[plugins] package = "my_plugin"` in `config.toml` to pin a package, or omit the field and carbonation will use the single registered entry point. `rules.toml` references plugin callables by short key — e.g. `metadata.extract = "sample"`, `selection.function = "should_archive"` — and missing keys fail at config-load time with a `ConfigError` listing the available options.

**Schema:** the plugin owns its own alembic migrations. Carbonation's core migrations live on a `"core"` branch; the plugin ships revisions on its own branch (e.g. `"my_plugin"`) with `depends_on = ("core",)`. `carbonation db init` / `db upgrade` compose both version locations into a single `alembic upgrade heads` so the two branches advance together. Use `carbonation db revision -m "..." [--autogenerate]` to generate a new plugin revision — the wrapper handles first-revision bootstrapping automatically.

**File model contract:** the `FileBase` subclass must set `__tablename__ = "files"` and its domain columns must be nullable (they're populated after the group row is created).

For a full walkthrough of the `Plugin` contract, callable signatures, alembic branch mechanics, and testing patterns, see [docs/plugin-development.md](docs/plugin-development.md).

### Configuration

- **config.toml**: Service settings, logging, database, plugin package, watch directories, archive destinations
- **rules.toml**: Integrity checks, completeness grouping (stem or plugin), metadata extractor, selection rules, routing rules

Paths are resolved relative to the config file's directory. `archive.destination` is resolved relative to `archive.base_path`.

Rules can be hot-reloaded by editing `rules.toml` while the service is running.
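As an illustration only, a minimal `config.toml` might look like the following. The exact table and key names here are assumptions based on the sections named above — the file generated by `carbonation init` is authoritative:

```toml
# Hypothetical sketch — table and key names are assumptions; compare
# against the config.toml generated by `carbonation init`.
[service]
heartbeat_interval = "60s"

[database]
url = "sqlite:///carbonation.db"

[plugins]
package = "my_plugin"

[[watch]]
name = "incoming"
path = "delivery/incoming"   # resolved relative to this file's directory

[archive]
base_path = "/srv/archive"
destination = "incoming"     # resolved relative to base_path
```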

## Deployment

### Setting up a production install

Create a dedicated virtualenv so the service has an isolated, reproducible Python environment:

```bash
# Create the project directory and venv
sudo mkdir -p /opt/carbonation
cd /opt/carbonation
python3 -m venv .venv

# Install carbonation into the venv
.venv/bin/pip install carbonation
# Add database drivers if needed:
# .venv/bin/pip install "carbonation[mariadb]"
# .venv/bin/pip install "carbonation[postgresql]"

# Scaffold config, rules, and plugin package skeleton
.venv/bin/carbonation init

# Edit to match your environment
$EDITOR config.toml rules.toml my_plugin/my_plugin/model.py my_plugin/my_plugin/extractors.py

# Install the plugin package, then validate and initialize
.venv/bin/pip install -e ./my_plugin
.venv/bin/carbonation check config
.venv/bin/carbonation db init
```

### systemd

Generate a systemd unit file during scaffolding:

```bash
.venv/bin/carbonation init --systemd --user carbonation
```

Then install and enable it:

```bash
sudo cp carbonation.service /etc/systemd/system/
sudo systemctl daemon-reload
sudo systemctl enable --now carbonation
sudo systemctl status carbonation
```

The generated unit file uses `Type=notify` with `WatchdogSec=300` — systemd waits for carbonation to signal readiness before marking it as started, and automatically restarts it if the watchdog heartbeat stops (e.g. hung process). Edit `ReadWritePaths=` in the unit file to include your delivery and archive directories.

`WatchdogSec` should be at least 2x your `heartbeat_interval` in `config.toml` (default: `60s`).

### Cron

For environments where a persistent service is not needed, run carbonation as a cron job:

```bash
*/5 * * * * /opt/carbonation/.venv/bin/carbonation -c /opt/carbonation/config.toml run --once
```

`--once` reconciles delivery directories, processes all pending events, and exits. A periodic `--once` run can also supplement a persistent service as defense in depth.

### Upgrading

```bash
cd /opt/carbonation
.venv/bin/pip install --upgrade carbonation            # core
.venv/bin/pip install --upgrade -e ./my_plugin         # or --upgrade <published-plugin>
.venv/bin/carbonation -c config.toml db upgrade        # run pending migrations
sudo systemctl restart carbonation
```

`db upgrade` applies pending Alembic migrations on both the core and plugin branches in a single `upgrade heads`. It is idempotent — running it twice is a no-op.

### Monitoring

```bash
.venv/bin/carbonation -c /opt/carbonation/config.toml status
```

The `status` command shows:
- Service heartbeat age (warns if stale)
- Component status breakdown by time window (1h/4h/1d/7d/30d)
- Delivery and archive totals
- Retry queue depth
- Oldest incomplete group
- Watch directory health

For log aggregation, set `format = "json"` in the logging config to produce structured JSON lines.

## Development

```bash
uv sync                                    # install dependencies
uv run pytest tests/                       # all tests (~50s)
uv run pytest tests/ -m "not integration"  # unit tests only (~2s)
uv run ruff check src/ tests/              # lint
```

## License

MIT
