Metadata-Version: 2.4
Name: otter-stream
Version: 0.1.0
Summary: dbt-inspired streaming tool
Project-URL: Source, https://gitlab.com/get-otter/otter-sdk
Author: Oliver Guy
License-Expression: AGPL-3.0-only
License-File: LICENSE
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: GNU Affero General Public License v3 (AGPLv3)
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Software Development :: Libraries
Requires-Python: >=3.11
Requires-Dist: duckdb>=1.0
Requires-Dist: pika>=1.3
Requires-Dist: pyarrow>=15.0
Requires-Dist: pydantic-settings>=2.0
Requires-Dist: pydantic>=2.0
Requires-Dist: pyyaml>=6.0
Requires-Dist: structlog>=24.0
Requires-Dist: typer>=0.15.0
Description-Content-Type: text/markdown

# Otter

dbt-inspired streaming transforms. Consume events from a message queue, transform them with SQL, write structured Parquet output.

```
RabbitMQ  ──▶  DuckDB SQL models  ──▶  Parquet files
```

Otter connects to a message queue, pulls batches of JSON messages, loads each one into a DuckDB table called `incoming`, runs your SQL SELECT model against it, and writes the results as Parquet files. Failed messages go to a dead-letter queue. That's it.

## Install

```bash
uv add otter-stream
# or with pip
pip install otter-stream
```

## Quick start

```bash
# Scaffold a new project
otter init my-pipeline
cd my-pipeline

# Edit config and models
vim config/sources.yml
vim models/example.sql

# Validate everything parses
otter validate

# Test against a fixture
otter test --fixture tests/sample.json

# Run the pipeline
otter run
```

## How it works

### Project structure

```
my-pipeline/
  config/
    sources.yml       # Queue connection + batch settings
    sinks.yml         # Output path + format
  models/
    orders.sql        # SQL SELECT models — one per transformation
    purchases.sql
```

### The `incoming` table

Every message consumed from the queue is loaded into a single-row DuckDB table called `incoming`. Your SQL model SELECTs from it. DuckDB handles nested JSON natively — access nested fields with dot notation (`incoming.user.id`) and arrays with `unnest()`.

### SQL models

Models are plain SQL SELECT statements, executed directly by DuckDB. If it runs in DuckDB, it works in Otter.

```sql
-- models/orders.sql
SELECT
    incoming.order_id,
    incoming.user.id            AS user_id,
    incoming.user.name          AS user_name,
    len(incoming.items)         AS item_count,
    list_sum([
        item.price * item.quantity
        FOR item IN incoming.items
    ])                          AS order_total,
    incoming.currency,
    incoming.placed_at
FROM incoming
```

One message can produce zero, one, or many output rows. Use `unnest()` to explode arrays:

```sql
-- models/purchases.sql
SELECT
    incoming.order_id,
    unnest.sku,
    unnest.name                              AS product_name,
    unnest.quantity,
    unnest.price                             AS unit_price,
    round(unnest.price * unnest.quantity, 2)  AS line_total,
    incoming.user.id                         AS user_id,
    incoming.placed_at
FROM incoming, unnest(incoming.items)
```

### Processing contract

| Outcome | Behaviour |
|---------|-----------|
| Good message | Transformed row(s) appended to the WAL, fsynced, then acked. Later coalesced into Parquet. |
| Bad SQL transform | Original message + error sent to DLQ. Message acked. |
| Unparseable JSON | Raw bytes + parse error sent to DLQ. Message acked. |
| Worker crash (WAL intact) | On restart, WAL segments are replayed to Parquet before polling resumes. |
| Worker crash (WAL disk lost) | Un-flushed rows are lost — they were already acked to RabbitMQ, so the broker won't redeliver them. Use a durable volume in prod. |
| Parquet write failure | WAL segment stays on disk; next flush retries. `/readyz` goes red via `sink.is_healthy()`. |
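The routing in the table above can be sketched in plain Python. This is illustrative only — the queue, transform, WAL, and DLQ calls are stand-in callables, not Otter's real API:

```python
import json


def process(raw: bytes, transform, wal_append, dlq_publish, ack) -> None:
    """Route one raw message per the processing contract (stand-in callables)."""
    try:
        msg = json.loads(raw)
    except json.JSONDecodeError as exc:
        # Unparseable JSON: raw bytes + parse error to the DLQ, then ack.
        dlq_publish({"raw": raw.decode("utf-8", "replace"), "error": str(exc)})
        ack()
        return
    try:
        rows = transform(msg)  # run the SQL model against `incoming`
    except Exception as exc:
        # Bad transform: original message + error to the DLQ, then ack.
        dlq_publish({"message": msg, "error": str(exc)})
        ack()
        return
    # Good message: fsynced WAL append happens before the ack.
    wal_append(rows)
    ack()
```

In every branch the message is acked exactly once, and a good message's rows hit the WAL strictly before the ack — that ordering is what makes the guarantees in the next section work.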

## Message flow and guarantees

Otter coalesces many source batches into fewer, larger Parquet files using a per-worker write-ahead log. The main loop looks like this:

```
rabbit.poll ─▶ deserialize ─▶ transform (per model) ─▶ wal.append (per model)
                                                              │ fsync
                                                              ▼
                                                          rabbit.ack
                                                              │
                                              (size or age trigger per model)
                                                              ▼
                                      wal.read ─▶ sink.write (atomic) ─▶ wal.truncate
```

- **WAL format.** Each model gets a segment file at `<wal_path>/<model_name>/segment.arrow` in the Arrow IPC stream format. Every append is fsynced before the source ack, so at most the last in-flight batch can be lost on a mid-append crash.
- **Flush triggers.** Per model, you configure `max_file_bytes` and `max_age_seconds`. Either threshold triggers a drain. The loop checks after every poll, including `None` polls, so idle streams still flush by age. `max_file_bytes` is measured against in-memory Arrow `Table.nbytes` (column-buffer bytes), **not** bytes on disk — the WAL segment file itself can be noticeably larger once the Arrow IPC stream's per-record-batch flatbuffer headers and alignment padding are accounted for (easily 1.5–2× for small, frequent batches). The compressed Parquet output is then typically 2–10× smaller than the in-memory figure. Size the threshold against in-memory nbytes, not `ls -lh` on the WAL file.
- **Atomic Parquet writes.** Non-partitioned writes land in a `.<name>.tmp` file and are `os.replace`-d into place after `fsync`. Partitioned writes stage into a `.staging_<uuid>/` directory and are committed with a single rename to `commit_<ts>_<uuid>/`. Readers scan the output tree recursively, so the added directory layer is invisible to Trino / Athena / DuckDB / Spark.
- **Crash recovery.** On startup the worker walks its WAL root and replays any surviving segments through the sink before polling resumes. Corrupted segment tails (detectable as `pyarrow.ArrowInvalid`) are logged and deleted. A startup sweep also removes any `.staging_*` / `.*.tmp` leftovers from a prior crashed write.
- **Delivery semantics.** At-least-once. A crash after `sink.write` but before `wal.truncate` will re-emit the segment as a new Parquet file on recovery. Deduplicate downstream if you need exactly-once.
- **Sink failures.** If the sink rejects a flush, the WAL segment stays on disk, the loop retries on the next flush tick, and `/readyz` reports unhealthy via `sink.is_healthy()`.
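The tmp-file-then-rename pattern behind the non-partitioned write can be sketched as follows. This is a minimal illustration of the technique, not Otter's actual code; POSIX rename semantics are assumed:

```python
import os


def atomic_write(path: str, data: bytes) -> None:
    """Write data via a hidden tmp file, fsync, then atomically rename into place."""
    directory = os.path.dirname(os.path.abspath(path))
    tmp = os.path.join(directory, f".{os.path.basename(path)}.tmp")
    with open(tmp, "wb") as f:
        f.write(data)
        f.flush()
        os.fsync(f.fileno())  # file contents are on disk before the rename
    os.replace(tmp, path)     # atomic on POSIX: readers see old or new, never partial
    dir_fd = os.open(directory, os.O_RDONLY)
    try:
        os.fsync(dir_fd)      # persist the directory entry (the rename itself)
    finally:
        os.close(dir_fd)
```

A crash before the `os.replace` leaves only a `.*.tmp` file, which the startup sweep removes; a crash after it leaves a complete, valid output file.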

### Per-model flush thresholds (`config/models.yml`)

```yaml
max_file_bytes: 134217728    # 128 MB default
max_age_seconds: 300         # 5 min default

overrides:
  orders:
    max_file_bytes: 536870912   # 512 MB
    max_age_seconds: 60         # 1 min
  purchases:
    max_file_bytes: 10485760    # 10 MB
    max_age_seconds: 1500       # 25 min
```

### Deployment: durable WAL storage

Otter workers are **stateful** — the WAL lives on local disk. Production deployments should point `wal.path` at a cloud-managed durable volume via a Kubernetes `StatefulSet` + `PersistentVolumeClaim` (EBS gp3, GCP Persistent Disk, Azure Managed Disks). Semantics are identical to local disk (`fsync`, atomic rename), but the data survives host failure with whatever durability the managed volume provides.

Otter does **not** replicate the WAL to object storage. The right answer for cross-host durability is infrastructure (a durable PV), not application-layer S3 mirroring. Local ephemeral disk is fine for dev and for workloads that can tolerate losing the in-flight WAL on host failure.

## Configuration

### `config/sources.yml`

```yaml
source:
  type: rabbitmq
  host: ${RABBITMQ_HOST:-localhost}
  port: 5672
  vhost: /
  username: guest
  password: ${RABBITMQ_PASSWORD}
  queue: orders.raw
  dlq: orders.dlq
  batch_size: 100          # Messages per batch
  batch_timeout_ms: 5000   # Flush partial batch after this timeout
```

### `config/sinks.yml`

```yaml
sink:
  type: parquet
  path: ./output
  partition_by: [event_type]   # Optional: partition into subdirectories
  dedupe_key: order_id         # Optional: downstream deduplication key
```

### Environment variables

Use `${VAR}` or `${VAR:-default}` anywhere in YAML config. Values are interpolated before parsing.
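A minimal sketch of that interpolation step — the regex and the fallback behaviour for an unset `${VAR}` without a default (left verbatim here) are assumptions, not Otter's exact rules:

```python
import os
import re

# Matches ${VAR} and ${VAR:-default}; group 2 is the default, if any.
_PATTERN = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?::-([^}]*))?\}")


def interpolate(text: str, env=None) -> str:
    """Expand ${VAR} / ${VAR:-default} in raw YAML text before it is parsed."""
    env = os.environ if env is None else env

    def repl(m: re.Match) -> str:
        name, default = m.group(1), m.group(2)
        if name in env:
            return env[name]
        # Unset and no default: assumed left as-is in this sketch.
        return default if default is not None else m.group(0)

    return _PATTERN.sub(repl, text)
```

Because interpolation runs on the raw text, the substituted values are subject to normal YAML parsing afterwards (quote them if they could contain YAML syntax).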

### CLI overrides

CLI flags override config file values (e.g. `--log-level DEBUG`).

## CLI reference

### `otter run`

Start the pipeline. Discovers models and config in the current directory.

```bash
otter run                              # Run all models
otter run --model models/orders.sql    # Run a specific model
otter run --dry-run                    # Consume one batch, print output, no writes
otter run --log-json                   # JSON log output (for production)
otter run --log-level DEBUG            # Verbose logging
otter run --config path/to/config      # Custom config directory
```

### `otter validate`

Check config files and SQL models without connecting to anything.

```bash
otter validate
otter validate --config path/to/config --models path/to/models
```

### `otter test`

Run a model against a local JSON fixture file. No queue connection needed.

```bash
otter test --fixture fixtures/sample_order.json
otter test --fixture fixtures/sample_order.json --model models/orders.sql
```

### `otter init`

Scaffold a new project with example config and models.

```bash
otter init my-pipeline
```

### `otter models list`

Show all discovered SQL models.

```bash
otter models list
otter models list --models path/to/models
```

### `otter dlq inspect`

Peek at messages in the dead-letter queue (non-destructive).

```bash
otter dlq inspect
otter dlq inspect --limit 5
```

### `otter dlq replay`

Replay all DLQ messages back to the source queue for reprocessing.

```bash
otter dlq replay
otter dlq replay --target another.queue
```

## Development

```bash
make install-dev    # Install dev dependencies
make test           # Run all tests
make lint           # Run ruff check
make format         # Run ruff format
```
