Metadata-Version: 2.4
Name: carbonation
Version: 0.0.4
Summary: File delivery monitoring and archival service
Author-email: Jonathan Olsten <jonathan.olsten@gmail.com>
License-File: LICENSE
Requires-Python: >=3.11
Requires-Dist: alembic<2,>=1.13
Requires-Dist: click>=8.1
Requires-Dist: loguru>=0.7.3
Requires-Dist: pydantic<3,>=2.10
Requires-Dist: rich>=14.3.3
Requires-Dist: sdnotify<1,>=0.3
Requires-Dist: sqlalchemy<3,>=2.0.48
Requires-Dist: watchdog>=6.0.0
Provides-Extra: mariadb
Requires-Dist: mysqlclient<3,>=2.2; extra == 'mariadb'
Provides-Extra: postgresql
Requires-Dist: psycopg2<3,>=2.9; extra == 'postgresql'
Provides-Extra: pymysql
Requires-Dist: pymysql<2,>=1.1; extra == 'pymysql'
Description-Content-Type: text/markdown

# Carbonation

File delivery monitoring and archival service. Watches directories for incoming files, groups them by stem, validates integrity, extracts metadata via plugins, applies selection and routing rules, and archives to configurable destinations.

## Installation

Requires Python 3.11+.

```bash
pip install carbonation
```

For MariaDB or PostgreSQL support:

```bash
pip install "carbonation[mariadb]"     # mysqlclient
pip install "carbonation[postgresql]"  # psycopg2
```

## Quick Start

```bash
# 1. Scaffold a new project
mkdir /opt/carbonation && cd /opt/carbonation
carbonation init

# 2. Edit the generated files
$EDITOR config.toml       # set watch paths, archive destination, database
$EDITOR rules.toml        # configure integrity, completeness, selection rules
$EDITOR plugins/file_model.py  # define your File model columns and extract()

# 3. Validate, initialize, and run
carbonation check-config   # validate configuration
carbonation db init        # create database tables
carbonation run            # start the service
```

`carbonation init` generates a complete starter project:

```
config.toml             # service, logging, database, watch, and archive config
rules.toml              # integrity, completeness, metadata, selection, routing
plugins/file_model.py   # File model + extract() function template
```

Edit `plugins/file_model.py` to define the domain-specific columns for your use case and the `extract()` function that reads metadata from your files. See [Plugin System](#plugin-system) for details.

## CLI Reference

```
carbonation check-config               Validate config.toml and rules.toml
carbonation db init                    Create schema (migrations + plugin columns)
carbonation db upgrade                 Apply pending schema migrations only
carbonation db status                  Component counts and metadata check
carbonation run [--dry-run] [--once]   Start the service
carbonation status                     Heartbeat, status table, watch health
carbonation retry [--watch-name X]     Re-enqueue failed components
carbonation check-rules FILE           Dry-run rules changes against DB
carbonation reconcile delivery         Sync delivery directories with DB
carbonation reconcile archive          Sync archive directories with DB
carbonation query files [filters]      Query with dynamic plugin filters

### Query Examples

```bash
# Filter by plugin columns (dynamically generated from your model)
carbonation query files --category alpha --limit 50

# Date range overlap
carbonation query files --daterange 2026-01-01/2026-02-01

# Recent files by age
carbonation query files --age 7d

# Output formats
carbonation query files --category alpha --format json
carbonation query files --format csv > export.csv

# Count only
carbonation query files --category alpha --count

# Specific columns, sorted
carbonation query files --columns group_key,category,start --order-by -start
```

## Programmatic API

Query the database from Python scripts without going through the CLI:

```python
from carbonation import connect

with connect("config.toml") as db:
    # All files from a watch
    rows = db.query_files({"watch_name": ["incoming"]}, limit=50)
    for row in rows:
        print(row["group_key"], row["created_at"])

    # Count
    n = db.query_files({"complete": True}, count_only=True)

    # Date filtering + plugin columns
    from datetime import datetime
    rows = db.query_files({
        "category": ["alpha"],
        "created_at_after": datetime(2026, 1, 1),
    }, order_by="-created_at", columns=["group_key", "category", "start"])
```

For one-off scripts, use the convenience function that handles setup and teardown in one call:

```python
from carbonation import query_files

rows = query_files("config.toml", {"category": ["alpha"]}, limit=50)
```

### Watermark-based polling

For external consumers (e.g. an orchestrator) that need to poll for newly completed files without gaps:

```python
from carbonation import connect

cursor = None  # persist this between invocations
with connect("config.toml") as db:
    while True:
        rows, cursor = db.get_new_files(cursor=cursor)
        if not rows:
            break
        for row in rows:
            print(row["group_key"], row["completed_at"])

    # Check if the service is alive
    status = db.get_service_status()
    if status:
        print(f"Last heartbeat: {status['last_heartbeat']}")
```
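The cursor must survive between polling runs. A minimal sketch of cursor persistence using a JSON state file (the state-file location and JSON serialization are assumptions, not part of carbonation; the cursor is treated as an opaque JSON-serializable value):

```python
import json
from pathlib import Path

def load_cursor(state_file: Path):
    """Return the last saved cursor, or None on the first run."""
    try:
        return json.loads(state_file.read_text())["cursor"]
    except (FileNotFoundError, KeyError, json.JSONDecodeError):
        return None

def save_cursor(state_file: Path, cursor) -> None:
    """Write the cursor via a temp file + rename so a crash mid-write
    cannot leave a truncated state file behind."""
    tmp = state_file.with_suffix(".tmp")
    tmp.write_text(json.dumps({"cursor": cursor}))
    tmp.replace(state_file)
```

With helpers like these, each polling run loads the cursor before the loop and saves it after every successful batch, so a crash never skips files.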

### ORM access

For direct ORM access to query, modify, and commit changes:

```python
from carbonation import session
from carbonation.db.models import FileComponent, FileStatus

with session("config.toml") as s:
    comps = s.query(FileComponent).filter_by(status=FileStatus.ARCHIVED).all()
    for c in comps:
        c.status = FileStatus.CLEARED
    s.commit()
```

### Filter operators

| Suffix | Operator | Example |
|---|---|---|
| `_after` / `_before` | `>=` / `<=` (datetime) | `{"created_at_after": datetime(2026, 1, 1)}` |
| `_min` / `_max` | `>=` / `<=` (numeric) | `{"id_min": 100}` |
| `_daterange` | start/stop overlap | `{"_daterange": (start, end)}` |
| `_age` | recency | `{"_age": datetime(2026, 3, 20)}` |
| (none) | exact / IN / LIKE | `{"category": ["alpha", "beta"]}` |
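To illustrate how the suffixes in the table map onto comparisons, here is a standalone sketch (not carbonation's implementation) that translates a filter dict into SQL-style condition strings:

```python
def filter_to_conditions(filters: dict) -> list[str]:
    """Translate suffixed filter keys into SQL-style condition strings.

    Illustrative only: a real query builder should emit bound
    parameters, not interpolated strings.
    """
    conditions = []
    for key, value in filters.items():
        if key.endswith("_after"):
            conditions.append(f"{key[:-6]} >= {value!r}")
        elif key.endswith("_before"):
            conditions.append(f"{key[:-7]} <= {value!r}")
        elif key.endswith("_min"):
            conditions.append(f"{key[:-4]} >= {value!r}")
        elif key.endswith("_max"):
            conditions.append(f"{key[:-4]} <= {value!r}")
        elif isinstance(value, list):
            # A list means IN; a scalar means equality
            placeholders = ", ".join(repr(v) for v in value)
            conditions.append(f"{key} IN ({placeholders})")
        else:
            conditions.append(f"{key} = {value!r}")
    return conditions
```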

## Architecture

### Processing Pipeline

```mermaid
flowchart TD
    A["File arrives in<br>watch directory"] --> B["Watchdog detects<br>creation / modification"]
    B --> C["Settle timer<br>debounces"]
    C --> D["Event enqueued"]
    D --> E["Worker thread<br>picks up event"]

    E --> S1["1 — Record component"]
    S1 --> IC{{"2 — Integrity checks<br>(min_size, readability)"}}

    IC -- fail --> IF["INTEGRITY_FAILED"]
    IC -- pass --> S3{{"3 — Completeness<br>grouping"}}

    S3 -- "incomplete<br>(waiting for extensions)" --> W["WAITING"]
    S3 -- "timeout<br>(on_timeout=skip)" --> TO["TIMED_OUT"]
    S3 -- "complete / standalone" --> S4["4 — Metadata extraction<br>(plugin extract)"]

    S4 --> S5{{"5 — Selection rules"}}
    S5 -- rejected --> NS["NOT_SELECTED"]
    S5 -- accepted --> S6["6 — Routing<br>(choose archive)"]

    S6 --> S7{{"7 — Archive<br>(copy / move / hardlink / symlink)"}}
    S7 -- success --> AR["ARCHIVED"]
    S7 -- "failure<br>(retries left)" --> RP["RETRY_PENDING"]
    S7 -- "failure<br>(retries exhausted)" --> FA["FAILED"]

    RP -. "retry_delay<br>elapsed" .-> E
    IF -. "reconciliation<br>(re-assessed)" .-> E
    NS -. "rules hot-reload<br>(re-evaluated)" .-> E

    style IF fill:#d32f2f,color:#fff
    style FA fill:#d32f2f,color:#fff
    style TO fill:#757575,color:#fff
    style NS fill:#757575,color:#fff
    style W fill:#f9a825
    style RP fill:#f57c00,color:#fff
    style AR fill:#388e3c,color:#fff
```
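The settle step at the top of the pipeline waits until a file has stopped changing before enqueueing it. A minimal polling sketch of that idea (not carbonation's code, which is event-driven via watchdog):

```python
import time
from pathlib import Path

def wait_until_settled(path: Path, settle_time: float, poll: float = 0.05) -> int:
    """Block until the file's size has been stable for settle_time
    seconds, then return the final size."""
    last_size = -1
    stable_since = time.monotonic()
    while True:
        size = path.stat().st_size
        if size != last_size:
            # File grew or shrank: restart the settle clock
            last_size = size
            stable_since = time.monotonic()
        elif time.monotonic() - stable_since >= settle_time:
            return size
        time.sleep(poll)
```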

### Reliability

- **Retry with backoff**: Archive failures are retried up to `max_retries` times with `retry_delay` between attempts; a component is marked permanently failed only after all retries are exhausted.
- **Post-archive verification**: File size verified after copy/move. Mismatched files are deleted and retried.
- **Reconciliation**: Periodic sync between filesystem and database catches missed files, clears removed files, re-enqueues pending work.
- **Rules hot-reload**: The service watches `rules.toml` for changes and validates before applying. Invalid changes are rejected without disruption.
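The retry behaviour can be sketched generically (a simplified model of the pipeline states above, not carbonation's internals, using a fixed delay as the config describes):

```python
import time

def archive_with_retry(archive_fn, max_retries: int, retry_delay: float) -> str:
    """Run archive_fn, retrying up to max_retries times with a fixed
    delay between attempts. Returns a final status string mirroring
    the pipeline states."""
    for attempt in range(max_retries + 1):
        try:
            archive_fn()
            return "ARCHIVED"
        except OSError:
            if attempt < max_retries:
                time.sleep(retry_delay)
    return "FAILED"
```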

### Plugin System

Plugins are `.py` files in the configured plugins directory. A plugin must provide:

- A `FileBase` subclass with `__tablename__ = "files"` defining domain-specific columns
- An `extract(file_paths: list[Path]) -> dict` function that returns metadata for the domain columns

Domain columns must be nullable since they are populated after the group row is created.
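As a sketch of the shape `extract()` might take, here is a hypothetical version that parses metadata out of filenames like `alpha_20260101.dat` (the naming convention is invented for illustration; the column names `category` and `start` follow the query examples above, and the `FileBase` subclass is omitted since its definition lives in the generated template):

```python
import re
from datetime import datetime
from pathlib import Path

# Hypothetical naming convention: <category>_<YYYYMMDD>.<ext>
NAME_RE = re.compile(r"^(?P<category>[a-z]+)_(?P<date>\d{8})$")

def extract(file_paths: list[Path]) -> dict:
    """Return metadata values for the group's domain columns.

    Parses the first component's stem; returns no values if the name
    does not match, leaving the (nullable) columns unset.
    """
    m = NAME_RE.match(file_paths[0].stem)
    if not m:
        return {}
    return {
        "category": m.group("category"),
        "start": datetime.strptime(m.group("date"), "%Y%m%d"),
    }
```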

### Configuration

- **config.toml**: Service settings, logging, database, plugins, watch directories, archive destinations
- **rules.toml**: Integrity checks, completeness grouping, metadata module, selection rules, routing rules

Paths are resolved relative to the config file's directory. `archive.destination` is resolved relative to `archive.base_path`.
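The two resolution rules can be sketched with stdlib `pathlib` (illustrative only; function and argument names are invented here):

```python
from pathlib import Path

def resolve_paths(config_file: str, archive_base: str, archive_dest: str):
    """Resolve archive.base_path relative to the config file's
    directory, then archive.destination relative to that base.
    Absolute values pass through unchanged, since joining a Path
    with an absolute path discards the left-hand side."""
    config_dir = Path(config_file).resolve().parent
    base_path = (config_dir / archive_base).resolve()
    destination = (base_path / archive_dest).resolve()
    return base_path, destination
```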

Rules can be hot-reloaded by editing `rules.toml` while the service is running.

## Deployment

### Setting up a production install

Create a dedicated virtualenv so the service has an isolated, reproducible Python environment:

```bash
# Create the project directory and venv
sudo mkdir -p /opt/carbonation
cd /opt/carbonation
python3 -m venv .venv

# Install carbonation into the venv
.venv/bin/pip install carbonation
# Add database drivers if needed:
# .venv/bin/pip install "carbonation[mariadb]"
# .venv/bin/pip install "carbonation[postgresql]"

# Scaffold config, rules, and plugin template
.venv/bin/carbonation init

# Edit to match your environment
$EDITOR config.toml rules.toml plugins/file_model.py

# Validate and initialize
.venv/bin/carbonation check-config
.venv/bin/carbonation db init
```

### systemd

Generate a systemd unit file during scaffolding:

```bash
.venv/bin/carbonation init --systemd --user carbonation
```

Then install and enable it:

```bash
sudo cp carbonation.service /etc/systemd/system/
sudo systemctl daemon-reload
sudo systemctl enable --now carbonation
sudo systemctl status carbonation
```

The generated unit file uses `Type=notify` with `WatchdogSec=300` — systemd waits for carbonation to signal readiness before marking it as started, and automatically restarts it if the watchdog heartbeat stops (e.g. hung process). Edit `ReadWritePaths=` in the unit file to include your delivery and archive directories.

`WatchdogSec` should be at least 2x your `heartbeat_interval` in `config.toml` (default: `60s`).

### Cron

For environments where a persistent service is not needed, run carbonation as a cron job:

```bash
*/5 * * * * /opt/carbonation/.venv/bin/carbonation -c /opt/carbonation/config.toml run --once
```

`--once` reconciles delivery directories, processes all pending events, and exits. A scheduled `--once` run can also supplement a persistent service as defense in depth.

### Upgrading

```bash
cd /opt/carbonation
.venv/bin/pip install --upgrade carbonation
.venv/bin/carbonation -c config.toml db upgrade   # apply schema migrations
.venv/bin/carbonation -c config.toml db init      # sync plugin columns
sudo systemctl restart carbonation
```

`db upgrade` applies pending Alembic migrations (carbonation's base tables). `db init` then adds any new plugin-defined columns. Both are idempotent.

### Monitoring

```bash
.venv/bin/carbonation -c /opt/carbonation/config.toml status
```

The `status` command shows:
- Service heartbeat age (warns if stale)
- Component status breakdown by time window (1h/4h/1d/7d/30d)
- Delivery and archive totals
- Retry queue depth
- Oldest incomplete group
- Watch directory health

For log aggregation, set `format = "json"` in the logging config to produce structured JSON lines.

## Development

```bash
uv sync                                    # install dependencies
uv run pytest tests/                       # all tests (~50s)
uv run pytest tests/ -m "not integration"  # unit tests only (~2s)
uv run ruff check src/ tests/              # lint
```

## License

MIT
