Metadata-Version: 2.4
Name: flowrhythm
Version: 0.1.0
Summary: Asynchronous job processing framework with dynamic worker scaling
Project-URL: Homepage, https://github.com/maxsimov/flowrhythm
Project-URL: Repository, https://github.com/maxsimov/flowrhythm
Author: Andrey Maximov
License-Expression: MIT
License-File: LICENSE
Keywords: asyncio,job processing,pipeline,scaling,worker pool
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Topic :: Software Development :: Libraries
Classifier: Topic :: System :: Distributed Computing
Requires-Python: >=3.13
Description-Content-Type: text/markdown

# flowrhythm

[![CI](https://img.shields.io/github/actions/workflow/status/maxsimov/flowrhythm/ci.yml?branch=main&label=CI)](https://github.com/maxsimov/flowrhythm/actions)
[![codecov](https://codecov.io/github/maxsimov/flowrhythm/graph/badge.svg?token=KRRENIJ5UF)](https://codecov.io/github/maxsimov/flowrhythm)
[![License: MIT](https://img.shields.io/github/license/maxsimov/flowrhythm)](LICENSE)
[![Python 3.13+](https://img.shields.io/badge/python-3.13%2B-blue)](https://www.python.org/downloads/)
[![Ruff](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/astral-sh/ruff/main/assets/badge/v2.json)](https://github.com/astral-sh/ruff)

**Asynchronous, auto-scaling job pipeline for Python**

`flowrhythm` is an asyncio-native framework for stream processing pipelines. Define a pipeline as a sequence of plain async functions, then tune scaling and queues per stage at runtime.

## Contents

- [When to use flowrhythm](#when-to-use-flowrhythm) — fit signals, and "why not Celery / Faust / asyncio.Queue"
- [Installation](#installation)
- [Quick Start](#quick-start)
- [Stages](#stages) — what's inside a stage, the four transformer shapes, how each is invoked
- [Designing a flow](#designing-a-flow) — linear, reusable chains, routing, sub-flow composition, naming
- [Configuring a flow](#configuring-a-flow) — per-stage scaling, queues, error handler
- [Scaling Strategies](#scaling-strategies) — `FixedScaling`, `UtilizationScaling`, custom
- [Error Handling](#error-handling) — typed events, handler-decides-policy
- [Driving a flow](#driving-a-flow) — `run`, `push`, `drain`, `stop`; bounded vs unbounded vs push
- [Inspecting a flow](#inspecting-a-flow) — `dump(mode="structure")` and `dump(mode="stats")`
- [Troubleshooting](#troubleshooting) — common failure modes; where to look first
- [Public API](#public-api) — full method and type reference
- [Guarantees](#guarantees) — what the framework promises, and what it doesn't
- [Architecture](#architecture) — design principles, component overview, pipeline flow
- [Project status](#project-status) — roadmap and active plans

---

## When to use flowrhythm

flowrhythm is the right fit when:
- You need to **process a stream of items** through several async stages
- The work is **I/O- or external-process-bound** (HTTP calls, subprocesses, DB writes)
- You want **per-stage auto-scaling** so slow stages don't stall the pipeline
- You need to **stay in-process** — no broker, no external queue infrastructure

It's the wrong fit when:
- Items are independent **fire-and-forget tasks** with no chain — use Celery or arq
- The work is **CPU-bound** in Python — use multiprocessing or hand off to subprocesses
- You need **distributed processing across machines** — use Dask, Ray, or Beam
- You need **persistent / replayable** streams — use Kafka + Faust or a real stream processor

### Why not just `asyncio.Queue` + tasks?

You can build a pipeline with `asyncio.Queue` and a few `create_task` calls, and for a one-stage pipeline that's the right call. flowrhythm starts to pay off once you have multiple stages with different throughput and want per-stage auto-scaling, branching, per-worker resources (DB connections, subprocesses), or graceful drain. Wiring those by hand is tedious and easy to get wrong.
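For a sense of what that wiring involves, here is a minimal two-stage pipeline built from `asyncio.Queue` alone. It's an illustrative sketch, not flowrhythm code: `run_by_hand`, the `_DONE` sentinel, and the fixed two workers per stage are choices made for this example.

```python
import asyncio

_DONE = object()  # sentinel marking end-of-stream

async def run_by_hand(items, workers_per_stage=2):
    q1: asyncio.Queue = asyncio.Queue(maxsize=10)  # source -> double
    q2: asyncio.Queue = asyncio.Queue(maxsize=10)  # double -> write
    stored = []

    async def source():
        for x in items:
            await q1.put(x)
        # one sentinel per worker so every stage-1 worker exits
        for _ in range(workers_per_stage):
            await q1.put(_DONE)

    async def double_worker():
        while (x := await q1.get()) is not _DONE:
            await q2.put(x * 2)

    async def write_worker():
        while (x := await q2.get()) is not _DONE:
            stored.append(x)

    doublers = [asyncio.create_task(double_worker()) for _ in range(workers_per_stage)]
    writers = [asyncio.create_task(write_worker()) for _ in range(workers_per_stage)]
    await source()
    await asyncio.gather(*doublers)        # stage 1 fully drained
    for _ in range(workers_per_stage):     # only now is it safe to stop stage 2
        await q2.put(_DONE)
    await asyncio.gather(*writers)
    return stored

stored = asyncio.run(run_by_hand(range(10)))
print(sorted(stored))  # order across workers is not guaranteed, so sort
```

Even this small version needs one sentinel per worker and a carefully ordered shutdown, and it still has no error routing, no scaling, and no per-worker resources.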

### Why not Celery or arq?

Those are **task queues** — items don't flow through stages, they're independent jobs. flowrhythm is **pipeline-shaped**: each item goes through the same chain of transformations. If your work is "process this one job, return a result," use a task queue. If it's "stream items through filter → transform → enrich → store," use flowrhythm.

### Why not Faust or Apache Beam?

Those are stream-processing frameworks for large-scale workloads (Faust on Kafka, Beam on distributed runners), with persistent state, exactly-once semantics, and distributed execution. flowrhythm is in-process, asyncio-native, and explicitly does not provide persistence or exactly-once delivery (see [Guarantees](#guarantees)). It's the right choice when your stream lives entirely in one process and you want a lightweight async-native API.

---

## Installation

```bash
pip install flowrhythm
```
_Not yet published. Use `pip install .` locally from source._

---

## Quick Start

A `flow` is a chain of async stages. The last stage consumes the items (the "sink"). Activate the chain by passing a source generator to `run()`.

```python
import asyncio
from flowrhythm import flow

async def double(x):
    return x * 2

async def write(x):
    print("stored:", x)

async def items():
    for i in range(10):
        yield i

async def main():
    await flow(double, write).run(items)

asyncio.run(main())
```

That's it — three async functions, one chain, one call to drive it. Real-world pipelines add branching, scaling, error handling, and more — see [Designing a flow](#designing-a-flow) and [Driving a flow](#driving-a-flow).

---

## Stages

A flow is a sequence of **stages**. Items enter at the first stage's input queue, flow through each stage, and the last stage consumes them (the **sink**). Where items come from is decided when you activate the flow — see [Driving a flow](#driving-a-flow).

### What's inside a stage

Each stage owns:
- **Input queue** — items waiting to be processed (configurable: FIFO, LIFO, priority)
- **Worker pool** — N async tasks pulling from the input queue, processing items, pushing results into the **next stage's input queue**
- **Scaling strategy** — decides N based on live stats (queue length, worker utilization)

A queue lives **between** two stages. The downstream stage owns it. Configuring a queue (`flow.configure("normalize", queue=priority_queue, queue_size=10)`) configures that stage's *input* queue — which is the upstream stage's destination.

```
                                                    ┌─ one stage ─┐
                                                    │             │
                                                    ▼             ▼
   ┌──────────────┐   queue1   ┌──────────────┐   queue2   ┌──────────────┐   queue3   ┌────────┐
   │ item source  │ ─────────▶ │ transformer1 │ ─────────▶ │ transformer2 │ ─────────▶ │  sink  │
   │ (external)   │            │   (N wkrs)   │            │   (M wkrs)   │            │(K wkrs)│
   └──────────────┘            └──────────────┘            └──────────────┘            └────────┘
                                  │   │   │
                                  ▼   ▼   ▼
                              scaling strategy
```

- 3 stages → 3 queues (one fronts each stage; the source feeds queue1)
- `queue1` is transformer1's input (and the item source's destination)
- `queue3` is sink's input (and transformer2's destination)
- Sink has no output queue — items terminate there

The **item source** is external to the flow. It's whatever drives items into queue1 — see [Driving a flow](#driving-a-flow).

> **Async-only.** All stages must be async. Sync functions and sync context managers are rejected at construction. For sync code, wrap with `asyncio.to_thread` or use the `sync_stage()` helper (see [Public API](#public-api)).

### Transformer

A middle stage. Multiple workers, auto-scaled (including scale-to-zero).

There are two **per-item transformers** (the framework invokes them once per item):

| Shape you write | When to use |
|---|---|
| `async def f(x) -> y` | Simple stateless processing — parse, transform, enrich |
| Factory `() -> AsyncContextManager` whose `__aenter__` returns an `async def fn(x) -> y` | Per-worker resource lifecycle — subprocess, DB connection, HTTP session |

And two **sub-pipelines** (multi-stage units that the framework expands into the parent pipeline at construction):

| Shape you write | What happens |
|---|---|
| A `Flow` (from `flow(...)`) | Sub-flow's stages are stitched into the parent pipeline; each keeps its own queue + workers + scaling. See [Composing flows](#composing-flows). |
| A `Router` (from `router(...)`) | Each arm becomes a sub-pipeline; classifier dispatches items to the chosen arm. See [Routing](#routing). |

#### Plain async function

```python
async def normalize(x):
    return x.strip().lower()
```

#### CM factory — function form

Use when the stage needs setup/teardown per worker (a connection, a subprocess):

```python
from contextlib import asynccontextmanager

@asynccontextmanager
async def with_db():
    db = await connect_db()           # your own async connection factory
    try:
        async def fn(x):
            return await db.process(x)
        yield fn
    finally:
        await db.close()              # runs even if the worker exits on an exception

#### CM factory — class form

A class with no-arg constructor and `__aenter__` / `__aexit__` is also a factory:

```python
class WithDB:
    async def __aenter__(self):
        self.db = await connect_db()
        async def fn(x):
            return await self.db.process(x)
        return fn
    async def __aexit__(self, *exc):
        await self.db.close()
```

#### Sub-flow

A `Flow` used as a stage. Inlined into the parent at construction:

```python
preprocess = flow(decode, validate)        # see Composing flows
```

#### Router

A `Router` used as a stage. Dispatches items to one of several arms:

```python
split = router(classify, fast=quick, slow=heavy)   # see Routing
```

### Sink (implicit — last stage)

There is no separate "sink" type. The **last stage in `flow()` plays the sink role when run autonomously**: its return value is dropped, and it is what the items "land on."

```python
async def db_write(x):
    await db.insert(x)

chain = flow(normalize, db_write)    # db_write is the last stage → sink under run()
await chain.run(reader)              # reader's items → normalize → db_write (consumes)
```

The same chain composed inside another flow has no sink — its last stage's output flows into the parent's downstream queue:

```python
inner = flow(normalize, enrich)      # last stage = enrich
outer = flow(inner, db_write)        # inner is a transformer; enrich's output → db_write
await outer.run(reader)
```

Same `flow(normalize, enrich)` definition; sink behavior is determined by context, not declaration.

### How each shape behaves at runtime

Knowing how each shape executes helps you reason about resources, lifecycle, and side effects. This section describes what you should expect — not the framework's internals.

#### Plain async function — `async def fn(x) -> y`

Called **once per item**. No setup, no teardown.

The framework guarantees:
- Your function receives one item, returns one result.
- Multiple workers call your function concurrently — design it to be safe under concurrency, or use the CM factory shape for per-worker state.
- Function-local state (counters, buffers) does NOT persist across calls. If you need per-worker state, use the CM factory shape.

#### CM factory — `() -> AsyncContextManager[Callable]`

Called **once per worker** to set up resources, then the yielded callable is invoked once per item until the worker stops.

The framework guarantees:
- Your factory is called as `factory()` once at worker startup; `__aenter__` runs; the yielded callable is reused for every item that worker processes.
- `__aexit__` runs once when the worker stops, whether on normal shutdown, scale-down, `stop()`, or an unhandled exception.
- Anything you set up in the factory body lives for the worker's lifetime — that's where you bind per-worker state (a connection, a subprocess, a model).
- Each worker has its own context — N workers means N independent setup/teardown cycles. No sharing.
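The guarantees above can be seen with nothing but `contextlib`. In this sketch (not flowrhythm's worker loop; `with_resource`, `worker`, and the `events` log are illustrative names), each worker calls the factory once, reuses the yielded callable for every item, and runs its teardown even when an item raises.

```python
import asyncio
from contextlib import asynccontextmanager

events: list[str] = []

@asynccontextmanager
async def with_resource():
    events.append("setup")               # once per worker, at startup
    try:
        async def fn(x):
            if x < 0:
                raise ValueError(f"bad item {x}")
            return x + 1
        yield fn                         # the per-item callable
    finally:
        events.append("teardown")        # once per worker, even on error

async def worker(factory, items, out):
    async with factory() as fn:          # factory() is called exactly once here
        for item in items:
            out.append(await fn(item))   # the same fn handles every item

async def main():
    out: list[int] = []
    # two concurrent workers: two independent setup/teardown cycles
    await asyncio.gather(
        worker(with_resource, [1, 2], out),
        worker(with_resource, [3], out),
    )
    # a third worker whose second item raises
    try:
        await worker(with_resource, [5, -1], out)
    except ValueError:
        pass
    return out

out = asyncio.run(main())
print(events)  # three "setup"/"teardown" pairs
```

Three workers produce three setup/teardown pairs, including the one that failed on `-1`. The `try`/`finally` around `yield` is what makes teardown run on the error path; without it, an exception thrown in at the `yield` would skip the cleanup.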

**You pass the factory itself, not a built CM:**

```python
flow(with_db, db_write)              # ✓ pass the factory
flow(with_db(), db_write)            # ✗ pass a built CM (only enterable once)
```

Both `@asynccontextmanager`-decorated functions and classes implementing `__aenter__`/`__aexit__` (with a no-arg constructor) satisfy the factory shape.
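The difference is visible with `contextlib` alone. A generator-based context manager wraps a single generator, so once one worker has entered and left it, a second entry fails. The `with_db` below is a placeholder with no real database behind it.

```python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def with_db():
    # placeholder factory: no real database, just a pass-through callable
    async def fn(x):
        return x
    yield fn

async def reuse_built_cm():
    cm = with_db()                 # built once, like flow(with_db(), ...)
    async with cm:                 # first worker: fine
        pass
    try:
        async with cm:             # second worker reuses the same object
            pass
    except Exception as exc:       # exact exception type varies by Python version
        return type(exc).__name__
    return None

async def call_factory_per_worker() -> int:
    entered = 0
    for _ in range(2):             # two workers, each calls the factory itself
        async with with_db() as fn:
            entered += await fn(1)
    return entered

second_entry_error = asyncio.run(reuse_built_cm())
workers_entered = asyncio.run(call_factory_per_worker())
print(second_entry_error, workers_entered)
```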

#### Sub-flow — `Flow`

When a `Flow` appears inside another `flow(...)`, its stages are folded into the parent pipeline at construction. See [Composing flows](#composing-flows) under "Designing a flow" for the full treatment, including how sub-flow stage names work and how to override sub-flow config from the parent.

#### Router — `Router`

When a `Router` appears inside `flow(...)`, each arm becomes its own sub-pipeline. See [Routing](#routing) under "Designing a flow" for the full treatment, including the effective pipeline shape, arm dispatch, and behavior on classifier miss.

### Worker lifecycle and scaling

Workers come and go based on the scaling strategy:

| Event | Plain async fn | CM factory |
|---|---|---|
| Worker spawned (scale up) | Reference captured | `factory()` called → `__aenter__` runs → resource acquired |
| Worker processes item | `await fn(item)` | `await fn(item)` (using resource from `__aenter__`) |
| Worker stopped (scale down) | Coroutine cancelled | `__aexit__` runs → resource released |
| Stage scales 0 → 1 | First worker spawned, processes pending items | First worker spawned, factory called, resource acquired (latency on first item) |
| Stage scales N → 0 | All workers cancelled | All `__aexit__` run; all resources released |

### Error Handler

Pipeline-level. Receives **typed events** describing failures and drops. One per flow. See [Error Handling](#error-handling) for the full set of event types and behavior.

```python
import logging
from flowrhythm import TransformerError, SourceError, Dropped

log = logging.getLogger(__name__)

async def on_error(event):
    match event:
        case TransformerError(item, exc, stage):
            log.error("stage %s failed on %r: %s", stage, item, exc)
        case SourceError(exc):
            log.critical("source failed: %s", exc)
        case Dropped(item, stage, reason):
            log.warning("dropped %r at %s: %s", item, stage, reason.name)
```

### Scaling Strategy

Decides how many workers a stage runs based on a live `StageSnapshot` of that stage. Built-in: `FixedScaling`, `UtilizationScaling`. Or implement the protocol yourself (see [Scaling Strategies](#scaling-strategies)).

---

## Designing a flow

A flow is **pure structure** — the chain of stages. How items get fed in is decided at activation time (`run()` or `push()`). Configuration (scaling, queues) is separate from both structure and activation.

### Linear

```python
chain = flow(normalize, db_write)
await chain.run(reader)
```

### Routing

`router()` dispatches each item to one of several arms based on a classifier function. It's a sub-pipeline, not a function call — each arm becomes its own mini-pipeline inside the parent, with its own input queue and worker pool.

#### Signature

```python
router(classifier, *, default=None, **arms)
```

- `classifier` — `async def (item) -> label` — returns the keyword name of the arm to dispatch to
- `**arms` — keyword args mapping label → Transformer (function, CM factory, chain, or full flow)
- `default` — optional fallback Transformer for unmatched labels (if omitted, unmatched items become `Dropped` events)

#### Example

```python
heavy_path = flow(decode, heavy)

main = flow(
    normalize,
    router(classify,
        fast=quick,             # plain async function
        slow=heavy_path,        # transform chain (a Flow)
        default=passthrough,    # optional fallback
    ),
    db_write,
)
await main.run(items)
```

#### What runs at runtime

The router's classifier runs as a single stage. Each arm becomes a sub-graph that flows back into the same downstream queue:

```
                             ┌─ [q] → quick ──────────────────────────────────────┐
normalize → [q] → classify ──┤                                                    ├──► [q] → db_write
                             └─ [q] → heavy_path.decode → [q] → heavy_path.heavy ─┘
```

Each arm has its own queue and worker pool. Outputs of all arms feed the same downstream queue (the stage after the router). Sub-flow arms are inlined the same way as sub-flow stages elsewhere — see [Composing flows](#composing-flows).

#### Behavior on classifier miss

If the classifier returns a label that has no matching arm and no `default` is set, the item is **dropped** and the error handler receives a `Dropped(item, stage, reason=DropReason.ROUTER_MISS)` event. The pipeline continues. Raising from the handler will not abort it (handler exceptions are only logged); to abort on misses, count them in the handler and call `chain.stop()`, as described in [Aborting from inside the handler](#aborting-from-inside-the-handler).
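The per-item dispatch rule, including the miss case, fits in a few lines. This is a sequential sketch of the semantics only: in flowrhythm each arm runs in its own worker pool behind its own queue, whereas here `dispatch`, `quick`, `heavy`, and the `"router"` stage name are hypothetical. `Dropped` and `DropReason` are local stand-ins mirroring the event types in [Error Handling](#error-handling).

```python
import asyncio
from dataclasses import dataclass
from enum import Enum, auto
from typing import Any

class DropReason(Enum):              # local stand-in for flowrhythm's DropReason
    UPSTREAM_TERMINATED = auto()
    ROUTER_MISS = auto()

@dataclass
class Dropped:                       # local stand-in for flowrhythm's Dropped event
    item: Any
    stage: str
    reason: DropReason

async def dispatch(item, classifier, arms, dropped, default=None, stage="router"):
    """Route one item: classify, pick the arm (or default), else record a drop."""
    label = await classifier(item)
    target = arms.get(label, default)
    if target is None:
        dropped.append(Dropped(item, stage, DropReason.ROUTER_MISS))
        return None                  # the item goes no further
    return await target(item)

async def classify(x):
    return "fast" if x < 10 else "slow" if x < 100 else "huge"

async def quick(x):
    return ("quick", x)

async def heavy(x):
    return ("heavy", x)

async def main():
    dropped: list[Dropped] = []
    arms = {"fast": quick, "slow": heavy}          # no "huge" arm and no default
    results = [await dispatch(x, classify, arms, dropped) for x in (1, 50, 500)]
    return results, dropped

results, dropped = asyncio.run(main())
print(results)   # [('quick', 1), ('heavy', 50), None]
```

Passing `default=quick` to `dispatch` would route `500` to `quick` instead of producing a `Dropped` event.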

### Composing flows

A flow can be embedded as a stage in another flow. The framework **folds the sub-flow's stages into the parent pipeline** at construction — sub-flows are not function calls; their stages become real stages of the parent. Each sub-stage retains its own queue, worker pool, scaling, and config. Activation (`run`, `push`) only happens on the outermost flow.

```python
ingest = flow(parse, validate)
ingest.configure("validate", scaling=UtilizationScaling(min_workers=1, max_workers=8))

main = flow(ingest, db_write)
await main.run(items)
```

#### What runs at runtime

The effective graph that runs is:

```
items → [queue] → ingest.parse → [queue] → ingest.validate → [queue] → db_write
```

Three stages, three queues. `ingest.validate` keeps its 1–8 worker pool exactly as `ingest.configure` specified; composition does not change scaling. Each `ingest.parse` worker pushes its output into `ingest.validate`'s input queue and moves on, so items flow through autonomously.

#### Stage names in the parent

Sub-flow stages get the sub-flow's variable name as a prefix in the parent: `ingest.parse`, `ingest.validate`. You can override their config from the parent using the dotted name:

```python
main.configure("ingest.parse", scaling=FixedScaling(workers=2))
```

Sub-flows can contain sub-flows; names compose dotted (`outer.middle.inner.stage`).
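The naming rule is a plain recursive walk over the nesting. The sketch below models an embedded flow as a hypothetical `Sub(name, stages)` record (how flowrhythm obtains the prefix is not modelled) and yields the dotted names in pipeline order, which is also the order in which the folded stages run.

```python
from dataclasses import dataclass

@dataclass
class Sub:
    """Hypothetical stand-in for a Flow embedded in another flow."""
    name: str        # prefix the parent uses for this sub-flow's stages
    stages: list     # stage names (str) or nested Sub records, in order

def flatten(stages, prefix=""):
    """Yield fully qualified stage names in pipeline order."""
    for s in stages:
        if isinstance(s, Sub):
            yield from flatten(s.stages, f"{prefix}{s.name}.")
        else:
            yield f"{prefix}{s}"

ingest = Sub("ingest", ["parse", "validate"])
flat_names = list(flatten([ingest, "db_write"]))
nested = list(flatten([Sub("outer", [Sub("middle", [Sub("inner", ["stage"])])])]))
print(flat_names)  # ['ingest.parse', 'ingest.validate', 'db_write']
print(nested)      # ['outer.middle.inner.stage']
```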

#### The same flow runs standalone or composed

```python
await ingest.run(items)   # standalone — last stage (validate) is the sink
```

The same `ingest` definition behaves identically whether you `run()` it directly or embed it in another flow. Composition is purely structural; nothing about the sub-flow's behavior changes.

### Naming

Stage names are auto-derived from function names. Collisions get a numeric suffix:

```python
chain = flow(normalize, normalize, db_write)
# stage names: "normalize", "normalize_2", "db_write"
```
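The suffix rule can be stated as a short function. This is a sketch of the described behavior, not flowrhythm's code: `derive_names` is a hypothetical helper, and it ignores the corner case where a function is itself named like a suffixed duplicate (e.g. `normalize_2`).

```python
def derive_names(funcs) -> list[str]:
    """First occurrence keeps its name; later ones get _2, _3, ..."""
    seen: dict[str, int] = {}
    names = []
    for fn in funcs:
        base = fn.__name__
        seen[base] = seen.get(base, 0) + 1
        count = seen[base]
        names.append(base if count == 1 else f"{base}_{count}")
    return names

async def normalize(x):
    return x

async def db_write(x):
    return None

stage_names = derive_names([normalize, normalize, db_write])
print(stage_names)  # ['normalize', 'normalize_2', 'db_write']
```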

Override when needed:

```python
from flowrhythm import stage

chain = flow(
    stage(normalize, name="parse"),
    db_write,
)
```

---

## Configuring a flow

Configuration is operational — it tunes how the flow runs without changing what it does. Two equivalent styles are available: constructor keywords (one-shot setup) and methods (incremental, e.g. reading from a config file).

### Constructor keywords (shorthand)

```python
from flowrhythm import flow, FixedScaling, priority_queue

chain = flow(
    normalize, db_write,
    on_error=on_error,
    default_scaling=FixedScaling(workers=2),
    default_queue=priority_queue,
    default_queue_size=10,
)
await chain.run(items)
```

`on_error`, `default_scaling`, `default_queue`, and `default_queue_size` cover **flow-level** configuration. Per-stage configuration (different scaling for one specific stage) needs the method form.

### Methods (incremental)

```python
from flowrhythm import flow, FixedScaling, UtilizationScaling, priority_queue

chain = flow(normalize, db_write)

# Pipeline-wide defaults
chain.configure_default(scaling=FixedScaling(workers=2))

# Per-stage tuning (only via method — no constructor shorthand)
chain.configure("normalize", scaling=UtilizationScaling(max_workers=8))
chain.configure("normalize", queue=priority_queue, queue_size=10)
chain.configure("db_write", queue_size=20)   # bump just the buffer; default queue type

# Error handler
chain.set_error_handler(on_error)

await chain.run(items)
```

The two forms are fully equivalent — pick whichever fits your code shape.

### Same chain, different environments

```python
def build():
    return flow(normalize, db_write)

dev = build()
dev.configure_default(scaling=FixedScaling(workers=1))
await dev.run(test_source)              # batch from a small fixture

prod = build()
prod.configure("normalize", scaling=UtilizationScaling(max_workers=32))
await prod.run(kafka_source)            # stream from production
```

---

## Scaling Strategies

### FixedScaling
Constant worker count. Requires `workers >= 1` (use `UtilizationScaling` for scale-to-zero).
```python
FixedScaling(workers=4)
```

### UtilizationScaling

Adjusts worker count based on the busy/idle ratio. Supports scale-to-zero via `min_workers=0`.

```python
UtilizationScaling(
    min_workers=0,           # never scale below this
    max_workers=8,           # never scale above this
    lower_utilization=0.2,   # if busy/total < 0.2, consider scaling down
    upper_utilization=0.8,   # if busy/total > 0.8, consider scaling up
    upscaling_rate=2,        # max workers added per scale-up decision
    downscaling_rate=1,      # max workers removed per scale-down decision
    cooldown_seconds=5.0,    # minimum seconds between scale events (anti-flap)
    dampening=0.5,           # multiplier on rate — 0.5 means add/remove half
    sampling_period=2.0,     # only consider scaling every N seconds
    sampling_events=50,      # only consider scaling every N items
)
```

#### How it works

On every item enqueue/dequeue, the strategy checks whether to scale. Two gates run first:

1. **Sampling** — if `sampling_period` or `sampling_events` is set, scaling decisions are throttled (only consider every N seconds or every N events). This avoids checking on every single item in high-throughput stages. If both are set, both must pass.
2. **Cooldown** — if a scale event happened within the last `cooldown_seconds`, do nothing. Prevents flapping between scale-up and scale-down.

If both gates pass, the strategy reads `utilization = busy_workers / total_workers` and decides:
- `utilization > upper_utilization` and `workers < max_workers` → scale up by `upscaling_rate × dampening` (rounded down, minimum 1)
- `utilization < lower_utilization` and `workers > min_workers` → scale down by `downscaling_rate × dampening` (rounded down, minimum 1)
- Otherwise → no change

#### Tuning guide

- **Bursty workload** — set higher `upscaling_rate` (e.g. 4-8) and lower `cooldown_seconds` (e.g. 1.0) to react quickly to spikes
- **Steady workload** — keep defaults; `cooldown_seconds=5.0` and rate of 1-2 is calm and predictable
- **Expensive workers** (subprocesses, GPU models) — keep `dampening` low (0.3-0.5) to scale conservatively
- **High-throughput stage** — set `sampling_events=100` or `sampling_period=1.0` to avoid overhead from per-item scaling decisions

### Custom strategy
Implement the protocol — note the methods are **synchronous**:
```python
class MyStrategy:
    def initial_workers(self) -> int: ...
    def on_enqueue(self, stats: StageSnapshot) -> int: ...
    def on_dequeue(self, stats: StageSnapshot) -> int: ...
```
Return positive to add workers, negative to remove, `0` for no change.

> **Strategies must be sync.** Scaling decisions are called on every item event (potentially millions/sec); awaiting external services here would block the hot path. If you need external state for a decision, refresh it in a background task and read it synchronously here.
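As an example, here is a strategy that scales on queue depth rather than utilization. The fields of `StageSnapshot` aren't listed in this section, so the sketch defines its own stand-in dataclass with assumed fields `queue_length`, `workers`, and `busy_workers`; adapt the attribute names to the real snapshot type. `QueueDepthScaling` and its thresholds are hypothetical.

```python
from dataclasses import dataclass

@dataclass
class StageSnapshot:              # hypothetical stand-in; real field names may differ
    queue_length: int
    workers: int
    busy_workers: int

class QueueDepthScaling:
    """Add a worker while the backlog per worker is high; drop one when idle."""

    def __init__(self, per_worker: int = 10, min_workers: int = 1, max_workers: int = 16):
        self.per_worker = per_worker
        self.min_workers = min_workers
        self.max_workers = max_workers

    def initial_workers(self) -> int:
        return self.min_workers

    def _decide(self, s: StageSnapshot) -> int:
        backlog_limit = self.per_worker * max(s.workers, 1)
        if s.workers < self.max_workers and s.queue_length > backlog_limit:
            return 1                  # backlog is growing: add one worker
        if s.workers > self.min_workers and s.queue_length == 0 and s.busy_workers < s.workers:
            return -1                 # nothing queued and someone is idle: remove one
        return 0

    # Both hooks are plain (sync) methods, as the protocol requires.
    def on_enqueue(self, stats: StageSnapshot) -> int:
        return self._decide(stats)

    def on_dequeue(self, stats: StageSnapshot) -> int:
        return self._decide(stats)
```

The decision only reads the snapshot and a few integers, so it stays cheap enough for the per-item hot path.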

### Worker rules
- **The source** (when passed to `run()`) is consumed by exactly one task — async generators cannot be safely consumed concurrently
- **All stages in the chain** can scale, including down to zero
- A worker holds its async context manager for its whole lifetime; the resource is released when the worker stops (scale-down or shutdown)
- First item after `0→1` transition pays the resource-acquire cost

---

## Error Handling

Errors and drops are reported to a single pipeline-level **error handler** as **typed events**. The handler is **observer-only** — it logs, accounts, or forwards events, but it does not control pipeline flow:

- **Returns normally** → pipeline continues; the failed item is dropped
- **Raises** → the exception is logged to stderr; pipeline still continues; the failed item stays dropped

This means a buggy handler (a logger that throws, a metric backend that's down) cannot abort your pipeline. To stop a run based on what the handler observes, call `chain.stop()` from outside — see [Aborting from inside the handler](#aborting-from-inside-the-handler) below.

Two layers of error handling, in order:

1. **Inside the transformer** (preferred). Catch and handle there: retry, return a sentinel value, drop silently. Most error logic should live here because the transformer has full context.
2. **Pipeline error handler** (last resort). Whatever escapes the transformer (or comes from the source / framework decisions like `Dropped`) is routed here.

### Event types

```python
from dataclasses import dataclass
from enum import Enum, auto
from typing import Any

@dataclass
class TransformerError:
    item: Any
    exception: Exception
    stage: str            # name of the stage that failed

@dataclass
class SourceError:
    exception: Exception  # raised inside the source generator

class DropReason(Enum):
    UPSTREAM_TERMINATED = auto()  # Last(value) upstream caused this item to be discarded
    ROUTER_MISS = auto()          # router classifier returned an unknown arm and no default

@dataclass
class Dropped:
    item: Any
    stage: str
    reason: DropReason
```

### Writing a handler

```python
import logging
from flowrhythm import flow, TransformerError, SourceError, Dropped

log = logging.getLogger(__name__)

async def on_error(event):
    match event:
        case TransformerError(item, exc, stage):
            log.error("stage %s failed on %r: %s", stage, item, exc)
        case SourceError(exc):
            log.critical("source failed: %s", exc)
            # source is exhausted; pipeline drains naturally
        case Dropped(item, stage, reason):
            log.warning("dropped %r at %s (%s)", item, stage, reason.name)

chain = flow(normalize, db_write)
chain.set_error_handler(on_error)
await chain.run(items)
```

### Default behavior (no handler set)

| Event | Default |
|---|---|
| `TransformerError` | Logged to stderr, pipeline continues |
| `SourceError` | Logged to stderr, pipeline drains (source is treated as exhausted) |
| `Dropped` | Silent continue |

Set a handler whenever you need different behavior for any of these.

### Aborting from inside the handler

The handler cannot abort the pipeline directly — raising just gets logged. To stop a run based on what the handler observes (e.g. "too many errors"), track state and call `chain.stop()` from the task driving the run:

```python
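import asyncio
from flowrhythm import TransformerError

errors = 0

async def on_error(event):
    global errors                     # module-level counter
    if isinstance(event, TransformerError):
        errors += 1

chain.set_error_handler(on_error)     # the handler only counts; it never aborts
run_task = asyncio.create_task(chain.run(items))
while not run_task.done():
    if errors > 100:
        await chain.stop()            # the caller decides to stop
        break
    await asyncio.sleep(0.5)
await run_task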
```

This separates "observe errors" (handler) from "decide to stop" (caller).

---

## Driving a flow

The flow definition (`flow(...)`) describes the **chain of stages**. To make items actually flow through it, you have to **activate** the flow. There are three ways:

| Mode | What feeds items into the chain | Termination |
|---|---|---|
| **Bounded** — `await chain.run(source)` | An async generator you supply | Generator exhausts → drain → exit |
| **Unbounded** — `await chain.run()` | The framework emits `None` signals indefinitely | External `chain.stop()` or first stage raises |
| **Push** — `async with chain.push() as handle: await handle.send(item)` | You push items via a `PushHandle` | `handle.complete()` (explicit or via `async with` exit) |

Push mode returns a `PushHandle` from `chain.push()`; `send()` and `complete()` live on the handle, not on `Flow`. `stop()` (abort) and `drain()` (graceful) are always available on `Flow` for shutdown.

### Bounded — `run(source)`

You pass an async generator — the **source**. The framework iterates it, pushing each yielded item into the chain's first queue. When the generator is exhausted, the pipeline drains and exits.

```python
async def items():
    for i in range(100):
        yield i

chain = flow(normalize, db_write)
await chain.run(items)        # ← pass the function, not items()
```

> **Pass the generator function itself (`items`), not a call to it (`items()`).** The framework owns iteration — this lets it manage the source lifecycle (close cleanly on `drain()`, re-iterate on retry in future versions). Passing `items()` raises with a clear error pointing this out.

#### Why is the source consumed by exactly one task?

The framework consumes the source generator with **exactly one task** — never more. Async generators hold internal iteration state (where in the loop, last value yielded), and they cannot be safely consumed by multiple tasks:

- **Two tasks calling the generator function** → each gets its own independent generator → both yield the *same* sequence → duplicates downstream.
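- **Two tasks sharing one generator instance** → contention on one shared iteration state. CPython refuses overlapping steps (a second `anext()` while the generator is still suspended inside an `await` raises `RuntimeError`), and even without overlap, which task gets which item is unpredictable.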

So if you want to ingest from multiple sources in parallel, see [parallel ingestion](#parallel-ingestion) below — don't try to scale the source itself.

#### Source can be a CM factory

If your source needs setup/teardown (open a Kafka connection, then stream from it), pass a no-arg factory returning an `AsyncContextManager` whose `__aenter__` returns an async generator (with `@asynccontextmanager`, that is the value you `yield`):

```python
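from contextlib import asynccontextmanager

@asynccontextmanager
async def kafka_source():
    consumer = await connect_kafka()   # your own client setup
    try:
        async def gen():
            async for msg in consumer:
                yield msg
        yield gen()
    finally:
        await consumer.close()         # runs even if the run is stopped or fails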

await chain.run(kafka_source)
```

### Unbounded — `run()` (no source)

When you don't pass a source, the framework auto-emits `None` signals into the first stage's queue indefinitely. Useful for "just keep working" pipelines where the first stage knows how to fetch its own data.

```python
async def fetch(_):
    return await kafka.next()

chain = flow(fetch, normalize, db_write)
run_task = asyncio.create_task(chain.run())   # keep a reference to the task
# ... later, when you want to stop
await chain.stop()
await run_task
```

### Push — `chain.push()` + `send()`

You drive the flow yourself by pushing items via a `PushHandle`. Useful for embedding flowrhythm into a web server, WebSocket handler, or any event-driven context where items arrive from outside the flow's control.

```python
chain = flow(normalize, db_write)

async with chain.push() as handle:
    await handle.send(item1)
    await handle.send(item2)
# on exit: handle.complete() is called automatically; chain drains; workers shut down
```

`handle.send()` waits when the chain's first input queue is full and resumes once a worker frees a slot: natural backpressure.

If you need to signal end-of-stream before the `async with` exits (e.g., the producing loop ended but you still want to do work after), call `await handle.complete()` explicitly. Subsequent `send()` calls will raise.
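The backpressure behavior is the same one `asyncio.Queue(maxsize=...)` provides: `put()` suspends the producer until a consumer frees a slot. The sketch below uses a plain queue as a stand-in for the first stage's input queue (its `put` plays the role of `send`; it is not `PushHandle`) and records the queue depth after every put.

```python
import asyncio

async def demo(maxsize: int = 2, n: int = 6):
    queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)  # stand-in for stage input
    depths: list[int] = []
    received: list[int] = []

    async def producer():
        for i in range(n):
            await queue.put(i)              # waits here whenever the queue is full
            depths.append(queue.qsize())
        await queue.put(None)               # end-of-stream marker for this sketch

    async def consumer():
        while (x := await queue.get()) is not None:
            await asyncio.sleep(0.01)       # a slow stage
            received.append(x)

    await asyncio.gather(producer(), consumer())
    return depths, received

depths, received = asyncio.run(demo())
print(max(depths), received)  # depth never exceeds maxsize; all items arrive in order
```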

### Parallel ingestion

If your bottleneck is fetching from an upstream system (Kafka with high throughput, paginated API with many pages, SQS poller), don't try to parallelize the source. Instead, put the parallelism in a **fetcher transformer** — the first stage of the chain — which can be scaled freely.

**Option 1: tiny trigger source + fetcher transformer**
```python
async def trigger():
    while True:
        yield None              # one signal per fetch attempt

async def fetch_message(_):
    return await kafka.poll()   # external state; safe to call from N workers

chain = flow(fetch_message, normalize, db_write)
chain.configure("fetch_message", scaling=UtilizationScaling(min_workers=4, max_workers=32))
await chain.run(trigger)
```

**Option 2: omit the source; framework auto-emits `None`**
```python
chain = flow(fetch_message, normalize, db_write)
chain.configure("fetch_message", scaling=UtilizationScaling(min_workers=4, max_workers=32))
await chain.run()              # no source arg → framework emits None forever
```

Same outcome, less boilerplate. The fetcher stage scales to as many workers as you need; each calls the upstream system independently.

### Stopping from inside a transformer — `Last(value)`

Sometimes the chain itself knows when to stop — a transformer that processes items until it sees a terminator marker, or a "process until X" pattern. Wrap the return value in `Last(value)` to signal "this is the final item":

```python
from flowrhythm import Last

async def process_until_done(item):
    if item.is_terminator:
        return Last(process(item))      # process and signal end
    return process(item)
```

What the user observes:
1. The wrapped value (`process(item)`) flows downstream as the **final item** the sink will see.
2. The chain immediately stops accepting new items. Anything still upstream of this transformer is dropped — those items will never reach the sink.
3. Items already past this transformer continue to the sink in order.
4. `chain.run(...)` returns once the sink has received the final item.

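The framework guarantees that **nothing is enqueued after the value wrapped in `Last(value)`**. When the stages that receive it run a single worker, that value is also the last item the sink processes. With multi-worker stages downstream, completion order isn't preserved; see the Troubleshooting entry on `Last(value)`.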

Dropped upstream items are reported to the error handler as `Dropped(item, stage, reason=DropReason.UPSTREAM_TERMINATED)` events. By default they are discarded silently; set an error handler if you need to log or capture them.

### Stopping a running flow

The three activation modes above are the only public ways to drive a flow. To end a running flow, call `drain()` (graceful) or `stop()` (abort).

```python
# Graceful shutdown
task = asyncio.create_task(chain.run(source))
# ...
await chain.drain()       # waits for items in flight to finish
await task                # task returns normally

# Abort
task = asyncio.create_task(chain.run(source))
# ...
await chain.stop()        # cancels workers, drops in-flight items
await task
```

#### What `drain()` does to your source

| Mode | What the user observes |
|---|---|
| `run(source)` | Framework calls `source.aclose()` on your generator; if you have `try/finally` cleanup in the generator, it runs. Then in-flight items finish. |
| `run()` (no source) | Framework stops emitting `None` signals. In-flight items finish. |
| `chain.push()` | The next `send()` raises. In-flight items finish. (Usually you don't need to call `drain()` explicitly here — exit the `async with` instead.) |

In all cases, `drain()` returns once every item that entered the chain has reached the sink or the error handler.

#### Push-mode shortcuts

Exiting the `async with chain.push() as h:` block automatically calls `h.complete()` and waits for the chain to drain — no explicit `drain()` needed. To abort instead of draining, call `chain.stop()` from another task.

---

## Inspecting a flow

`chain.dump()` renders the expanded pipeline graph for debugging. Three formats:

```python
import json

print(chain.dump())                       # text (default)
print(chain.dump(format="mermaid"))       # paste into a markdown file
data = json.loads(chain.dump(format="json"))
```

For a flow like:

```python
chain = flow(
    parse,
    validate,
    stage(router(classify, fast=fast, slow=heavy_path), name="dispatch"),
    db_write,
)
```

`chain.dump()` shows:

```
flow (5 stages):
  [0] parse                              → [1]                          FixedScaling(workers=1) fifo_queue(maxsize=1)
  [1] validate                           → [2]                          FixedScaling(workers=1) fifo_queue(maxsize=1)
  [2] dispatch [classifier]              arms: {fast→[3], slow→[4]}     FixedScaling(workers=1) fifo_queue(maxsize=1)
  [3] dispatch.fast (arm-end)            → [5] (merge)                  FixedScaling(workers=1) fifo_queue(maxsize=1)
  [4] dispatch.slow (arm-end)            → [5] (merge)                  FixedScaling(workers=1) fifo_queue(maxsize=1)
  [5] db_write                           (sink)                         FixedScaling(workers=1) fifo_queue(maxsize=1)
```

Sub-flow stages and router arms appear with dotted names (`dispatch.fast`, `ingest.parse`) — composition is visible at a glance.

For live runtime stats, use `chain.dump(mode="stats")`. It can be read both during a run and after the run finishes:

```python
print(chain.dump(mode="stats"))
# flow stats (completed):
#   [0] parse              0 alive (0 busy, 0 idle)  queue=0 [drained]  processed=20  errors=0
#   [1] boom_sometimes     0 alive (0 busy, 0 idle)  queue=0 [drained]  processed=16  errors=4
#   [2] dispatch           0 alive (0 busy, 0 idle)  queue=0 [drained]  processed=16  errors=0
#   ...
#
# events:
#   transformer errors: 4
#   source errors:      0
#   drops: 0
```

The output shows per-stage `processed` and `errors` counters plus aggregate event totals. Use it to spot bottlenecks (look at queue lengths and the `busy`/`idle` ratio) and to check health (`errors` and `drops`). The Mermaid format isn't supported for stats, because stats aren't graph-shaped.

---

## Troubleshooting

Common failure modes and what to do about them. Each entry is **symptom → why → fix**.

### `TypeError: pass the generator function, not the called generator`

**Symptom:**
```python
await chain.run(items())   # raises TypeError
```

**Why:** The framework owns iteration over your source. That ownership is what lets it close the generator on `drain()` and otherwise control the source lifecycle. A pre-instantiated generator object can be iterated only once, so the framework needs the function itself in order to create and manage the generator.

**Fix:** Pass the function itself (no parentheses):
```python
await chain.run(items)
```
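The difference is easy to see in plain Python: a generator object is single-use, while the function produces a fresh generator on every call. In this sketch, `check_source` is a hypothetical validator written to mirror the error message above; it is not flowrhythm's actual code.

```python
import asyncio
import inspect


async def items():
    for i in range(3):
        yield i


async def collect(agen) -> list[int]:
    return [x async for x in agen]


def check_source(source) -> None:
    """Hypothetical check in the spirit of flowrhythm's TypeError."""
    if inspect.isasyncgen(source):
        raise TypeError("pass the generator function, not the called generator")
    if not inspect.isasyncgenfunction(source):
        raise TypeError("source must be an async generator function")


async def main() -> None:
    gen = items()
    print(await collect(gen))      # [0, 1, 2]
    print(await collect(gen))      # []  (already exhausted)
    print(await collect(items()))  # [0, 1, 2]  (the function makes a fresh one)


if __name__ == "__main__":
    asyncio.run(main())
```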

### `TypeError: transformer 'foo' is sync`

**Symptom:**
```python
def foo(x):           # def, not async def
    return x.upper()

flow(foo)             # raises at construction
```

**Why:** Sync functions block the event loop, freezing every other stage. The framework rejects them at construction so the failure mode is loud, not silent.

**Fix:** Either rewrite as `async def`, or wrap with `sync_stage()` for code that has to stay sync (CPU-bound work, third-party libs):
```python
from flowrhythm import sync_stage

flow(sync_stage(foo))   # runs each call in a thread via asyncio.to_thread
```
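Conceptually, wrapping a sync function for `asyncio.to_thread` looks like the sketch below. `to_async` is a hypothetical helper for illustration, not flowrhythm's implementation of `sync_stage()`:

```python
import asyncio
import functools
import inspect
import time
from collections.abc import Awaitable, Callable
from typing import Any


def to_async(fn: Callable[[Any], Any]) -> Callable[[Any], Awaitable[Any]]:
    """Hypothetical stand-in for sync_stage(): run fn in a worker thread per call."""

    @functools.wraps(fn)
    async def wrapper(item: Any) -> Any:
        # The event loop stays free while fn blocks in the default thread pool.
        return await asyncio.to_thread(fn, item)

    return wrapper


def foo(x: str) -> str:
    time.sleep(0.05)  # blocking call: would freeze the loop if run directly in a coroutine
    return x.upper()


async_foo = to_async(foo)

if __name__ == "__main__":
    print(inspect.iscoroutinefunction(async_foo))  # True
    print(asyncio.run(async_foo("abc")))           # ABC
```

Threads keep the event loop responsive, but in the default CPython build (with the GIL) they don't make CPU-bound Python code run in parallel, which fits the "orchestrator, not a worker" principle under Architecture.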

### Items disappear without reaching the sink

**Symptom:** Some inputs never arrive at the last stage. No exception, no obvious error.

**Why:** Items can vanish for several reasons. Each one is reported to the error handler as an event, but the default handler only logs transformer and source errors to stderr; drops stay silent unless you set your own handler:

| What happened | How to spot it |
|---|---|
| Transformer raised; default handler logged to stderr but pipeline continued | Check stderr; or `chain.dump(mode="stats")` shows `errors > 0` and `events.transformer_errors > 0` |
| Router classifier returned an unknown label and there was no `default` arm | Stats shows `events.drops.ROUTER_MISS > 0` |
| `Last(value)` fired upstream — items already in flight got cut off | Stats shows `events.drops.UPSTREAM_TERMINATED > 0` |
| Source generator raised; default handler logged + drained | Stats shows `events.source_errors > 0` |

**Fix:** Set an `on_error` handler so you see what's being dropped:

```python
import logging

from flowrhythm import Dropped, SourceError, TransformerError

log = logging.getLogger(__name__)

async def on_error(event):
    match event:
        case TransformerError(item, exc, stage):
            log.error("stage %s failed on %r: %s", stage, item, exc)
        case Dropped(item, stage, reason):
            log.warning("dropped %r at %s: %s", item, stage, reason.name)
        case SourceError(exc):
            log.critical("source died: %s", exc)

chain = flow(..., on_error=on_error)
```

### Pipeline hangs and never returns

**Symptom:** `await chain.run(...)` doesn't return. Source long since exhausted.

**Why:** Almost always a transformer that never finishes — infinite loop, blocking I/O, deadlock on an external lock.

**Fix:** Run a watchdog that dumps stats while the run is in progress. Look for stages whose `busy` count stays high while `processed` stops increasing between dumps; those are stuck:

```python
async def watchdog():
    while True:
        await asyncio.sleep(5)
        print(chain.dump(mode="stats"))

dog = asyncio.create_task(watchdog())
try:
    async with asyncio.timeout(60):    # bound the run while debugging
        await chain.run(items)
finally:
    dog.cancel()                       # otherwise the watchdog keeps printing forever
```

If you can't reproduce it locally, wrap the run in `asyncio.timeout(...)` in production and call `chain.stop()` on timeout — that cancels stuck workers and cleanly runs their resource cleanup (`__aexit__`).
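One way to apply the timeout-then-stop advice is to run the flow as a task and wait on it with a deadline. A minimal sketch that relies only on `run()` and `stop()` as described in this README; `run_with_deadline` and the `Stoppable` protocol are hypothetical names introduced for illustration:

```python
import asyncio
from typing import Any, Protocol


class Stoppable(Protocol):
    """The two Flow methods this sketch relies on, as documented in this README."""

    async def run(self, source: Any = None) -> None: ...
    async def stop(self) -> None: ...


async def run_with_deadline(chain: Stoppable, source: Any, timeout: float) -> bool:
    """Run chain.run(source); if it exceeds `timeout` seconds, abort with chain.stop().

    Returns True if the run finished on its own, False if it had to be stopped.
    """
    task = asyncio.create_task(chain.run(source))
    # Unlike asyncio.timeout(), asyncio.wait() does not cancel the task on timeout.
    done, _ = await asyncio.wait({task}, timeout=timeout)
    if done:
        task.result()  # re-raise anything unexpected from run()
        return True
    await chain.stop()  # cancels stuck workers so their __aexit__ cleanup runs
    await task          # run() returns once stop() has torn everything down
    return False
```

Waiting with `asyncio.wait()` leaves the shutdown decision to `stop()`, so the flow, rather than a cancellation arriving from outside, is what tears down the workers.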

### `Last(value)` isn't actually the last item the sink sees

**Symptom:** You return `Last(value)` from a transformer, but the sink processes items *after* `value`.

**Why:** With multiple workers in a downstream stage, item completion order isn't preserved across workers. The framework guarantees `value` enters the destination *queue* last; with a single-worker downstream that's also the last item *processed*, but with N>1 workers two items can be in flight simultaneously and finish in either order.

**Fix:** Configure the receiving stage with one worker:
```python
chain.configure("sink", scaling=FixedScaling(workers=1))
```

This applies to the stage that consumes `value` — usually the sink, sometimes a single-worker formatting stage right before it.
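The reordering is a property of any worker pool, not something specific to flowrhythm. A plain-asyncio model of one stage (not flowrhythm internals; `run_stage` is a hypothetical name) shows an item enqueued last finishing first when two workers share the queue:

```python
import asyncio


async def run_stage(items: list[str], delays: dict[str, float], workers: int) -> list[str]:
    """Model of one stage: `workers` tasks pull from a shared queue and
    record items in the order they *finish* processing."""
    queue: asyncio.Queue[str] = asyncio.Queue()
    for item in items:
        queue.put_nowait(item)
    finished: list[str] = []

    async def worker() -> None:
        while True:
            try:
                item = queue.get_nowait()
            except asyncio.QueueEmpty:
                return
            await asyncio.sleep(delays[item])  # simulated per-item work
            finished.append(item)

    await asyncio.gather(*(worker() for _ in range(workers)))
    return finished


delays = {"a": 0.05, "last": 0.0}  # "last" is enqueued last but is quick to process

if __name__ == "__main__":
    print(asyncio.run(run_stage(["a", "last"], delays, workers=2)))  # ['last', 'a']
    print(asyncio.run(run_stage(["a", "last"], delays, workers=1)))  # ['a', 'last']
```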

### `RuntimeError: cannot send() after complete()` in push mode

**Symptom:**
```python
async with chain.push() as h:
    await h.send(1)
await h.send(2)               # raises RuntimeError
```

Or:
```python
async with chain.push() as h:
    await h.send(1)
    await h.complete()
    await h.send(2)            # raises RuntimeError
```

**Why:** `complete()` is irreversible — it signals end-of-stream and starts the drain cascade. Exiting the `async with` block calls it automatically.

**Fix:** Keep all `send()` calls inside the `async with` block (or before any explicit `complete()`). The `async with` exit handles `complete()` for you:
```python
async with chain.push() as h:
    for item in source:
        await h.send(item)
    # complete() called automatically here
```

### Worker count stays high under `UtilizationScaling` after load drops

**Symptom:** After a burst of items, `dump(mode="stats")` shows the worker count hasn't shrunk back.

**Why (most common):** Scaling decisions fire on item enqueue/dequeue events. If items stop flowing entirely, no decision happens — the worker count stays at whatever it was. Workers are idle (`waiting on get`) but alive.

**Less common reasons:**

- **Cooldown period.** `UtilizationScaling(cooldown_seconds=5.0)` prevents another scale event within 5 seconds of the last one.
- **Sampling gates.** If you set `sampling_period` or `sampling_events`, decisions are throttled to those rates.
- **`min_workers > 0`.** Scaling never goes below `min_workers`. For full scale-to-zero, set `min_workers=0` (and you must use `UtilizationScaling` — `FixedScaling` requires `workers >= 1`).

**Fix:** Check your strategy parameters. To force a scale-down decision, send another item (or use a periodic "tick" trigger). To allow scale-to-zero, use `UtilizationScaling(min_workers=0, ...)`.

### Pipeline doesn't backpressure — memory grows unbounded

**Symptom:** Memory usage grows during long runs even though the sink can keep up.

**Why:** A stage with `queue_size=0` (unbounded) doesn't apply backpressure: its upstream can push items faster than the stage processes them, and the backlog piles up in memory. The default queue size is `1` precisely to prevent this. If you've explicitly opted into a larger or unbounded queue, you're trading backpressure for buffering.

**Fix:** Default to small queues and only increase per stage where bursty buffering helps:
```python
chain.configure("bursty_stage", queue_size=100)   # explicit, narrow scope
```

Avoid `queue_size=0` (unbounded) unless you have an external bound on input rate.
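The effect of a bounded versus unbounded queue can be seen with a plain `asyncio.Queue` standing in for a stage's input queue. This is an assumption for illustration (flowrhythm's `fifo_queue` internals may differ), but `asyncio.Queue(maxsize=0)` is likewise unbounded. `puts_without_consumer` is a hypothetical name:

```python
import asyncio
import contextlib


async def puts_without_consumer(maxsize: int, n: int) -> int:
    """Count how many of n puts complete when nothing consumes the queue."""
    queue: asyncio.Queue[int] = asyncio.Queue(maxsize=maxsize)
    sent = 0

    async def producer() -> None:
        nonlocal sent
        for i in range(n):
            await queue.put(i)  # suspends while the queue is full
            sent += 1

    task = asyncio.create_task(producer())
    await asyncio.sleep(0.01)  # give the producer time to run; there is no consumer
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task
    return sent


if __name__ == "__main__":
    print(asyncio.run(puts_without_consumer(maxsize=1, n=1000)))  # 1: producer is blocked
    print(asyncio.run(puts_without_consumer(maxsize=0, n=1000)))  # 1000: all buffered in memory
```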

### Where to look first

If something's wrong:

1. `chain.dump(mode="structure")` — does the topology match what you wrote? Are stage names what you expected?
2. `chain.dump(mode="stats")` — are items flowing? Where are they piling up? How many errors / drops?
3. Set an `on_error` handler if you haven't — silent drops become loud.
4. Wrap the run in `asyncio.timeout(...)` when debugging hangs, and call `chain.stop()` on timeout so stuck workers are cancelled and their resource cleanup runs.

---

## Public API

### Construction

| Symbol | Kind | Purpose |
|---|---|---|
| `flow(*stages, on_error=None, default_scaling=None, default_queue=None, default_queue_size=None)` | function | Construct a flow from a sequence of stages. Optional kwargs are shorthand for `set_error_handler` / `configure_default` |
| `router(classifier, **arms, default=...)` | function | Construct a router for branching |
| `stage(fn, name=...)` | function | Override the auto-derived name of a stage |
| `sync_stage(fn)` | function | Wrap a sync function with `asyncio.to_thread` so it can be used as an async stage |

### Activating a flow

| Method on `Flow` | Purpose |
|---|---|
| `flow.run(source=None)` | Run autonomously. With an async-generator source: bounded; without a source: unbounded (framework emits `None` signals) |
| `flow.push()` | Enter push mode — returns an `AsyncContextManager[PushHandle]` |
| `flow.drain()` | Graceful shutdown: stop the source (or `aclose()` your generator), wait for in-flight items to finish, then return |
| `flow.stop()` | Abort: cancel workers, drop in-flight items, release resources |

| Method on `PushHandle` (returned by `flow.push()`) | Purpose |
|---|---|
| `handle.send(item)` | Push an item into the flow's first queue (blocks if queue is full) |
| `handle.complete()` | Signal end-of-stream (called automatically on `async with` exit) |

### Configuration and inspection (Flow methods)

| Method | Purpose |
|---|---|
| `flow.configure(name, scaling=..., queue=..., queue_size=...)` | Per-stage tuning. `queue=` is a queue factory (`fifo_queue`, `priority_queue`, …); `queue_size=` is the queue's `maxsize`. Either or both may be set independently. |
| `flow.configure_default(scaling=..., queue=..., queue_size=...)` | Pipeline-wide defaults; same kwargs as `configure()` |
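| `flow.set_error_handler(handler)` | Set the handler that receives error events: uncaught transformer exceptions (`TransformerError`), source failures (`SourceError`), and dropped items (`Dropped`) |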
| `flow.dump(mode="structure"\|"stats", format="text"\|"mermaid"\|"json")` | Inspect the flow. `mode="structure"` renders the pipeline graph (text/mermaid/JSON). `mode="stats"` renders live runtime stats: per-stage worker counts, queue lengths, processed/error counters, drops by reason. Stats supports text + JSON only. |

### Types and helpers

| Symbol | Kind | Purpose |
|---|---|---|
| `Flow` | class | Type hint only — `def helper(f: Flow) -> Flow`. Construct via `flow()`. |
| `Router` | class | Type hint only — produced by `router()` |
| `PushHandle` | class | Type hint only — returned by `flow.push()`; provides `send()` and `complete()` |
| `Last` | class | Wrap a transformer's return value: `return Last(result)` to mark this as the final item |
| `TransformerError`, `SourceError`, `Dropped` | dataclasses | Event types passed to the error handler |
| `DropReason` | enum | Reasons items get dropped (`UPSTREAM_TERMINATED`, `ROUTER_MISS`) |
| `FixedScaling`, `UtilizationScaling` | classes | Built-in scaling strategies |
| `ScalingStrategy`, `StageSnapshot` | protocol / dataclass | For implementing custom strategies |
| `fifo_queue`, `lifo_queue`, `priority_queue` | functions | Queue factories |

---

## Guarantees

Promises the framework makes to the user. If any of these are violated, it's a bug.

### Item processing
- **Every item that enters the chain reaches a terminal state** before `run()` returns or `async with chain.push()` exits. A terminal state is: consumed by the sink, *or* routed to the error handler.
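- **`Last(value)` is final.** If a transformer returns `Last(value)`, nothing is enqueued after `value` in the stage that receives it. When the stages that consume it run a single worker, the sink sees `value` as its last item; with multi-worker stages, completion order can differ (see Troubleshooting).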

### Resources
- **Per-worker context managers always have `__aexit__` called** when the worker stops, even on `stop()` (immediate abort) or unhandled exceptions.
- **Resources are released before `run()` (or `stop()`) returns.** When the call returns, no workers are alive and no resources are held.

### Termination
- **`chain.run(source)` returns naturally** when the source generator completes and the pipeline finishes draining.
- **`chain.run(source)` does not re-raise source exceptions.** A source-generator failure is delivered to the error handler as a `SourceError` event; the pipeline drains and `run()` returns normally. To abort on source failure, call `chain.stop()` from outside (see [Error Handling](#error-handling)).
- **`chain.stop()` returns** only after every worker has exited and every per-worker resource has been released.
- **`chain.drain()` returns** only after the pipeline is fully drained — no items in flight, all workers idle/exited.

### Sources
- **Async generators are consumed by exactly one task.** The framework never forks a generator.
- **The same flow definition behaves identically standalone or composed.** Embedding a flow as a stage in another flow does not change its per-stage scaling, queues, or configuration unless the parent flow explicitly overrides them.

### Order
- **Within a single-worker stage, item order is preserved.** Items leave in the same order they arrived.
- **Across multi-worker stages, order is *not* preserved.** With N > 1 workers, items may complete in any order. If you need order, use `FixedScaling(workers=1)` for that stage.

### Backpressure
- **Slow downstream stages naturally throttle upstream.** A stage with a full input queue causes the upstream stage to block on `put()`, which propagates back to the source. There is no item buffering beyond configured queue sizes.

### What is *not* guaranteed
- **Exactly-once delivery.** If a transformer raises and the error handler routes it to a log, the item is gone — no retry, no checkpointing.
- **Persistence across process restart.** Items in flight when the process dies are lost.
- **Order across router arms.** If two arms have different latencies, items from the faster arm may interleave with items from the slower arm in the downstream stage.

---

## Architecture

### Design Principles

- **Orchestrator, not a worker.** flowrhythm coordinates external heavy work (subprocesses, services, I/O). It is not for running CPU-bound computation inside the Python process.
- **Stream processing pipeline, not a workflow engine.** The graph is a DAG — no cycles. Items flow forward and terminate at a sink.
- **Structure and configuration are separate.** `flow()` defines what flows where. `.configure()` tunes how it runs.
- **Composable.** A flow plugs into another flow as a transformer. Complex topologies are built by composition, not by complicating the DSL.
- **Routing via `router()`.** Branching is a regular transformer that dispatches to sub-pipelines by label. Arms converge after the router.
- **Retry/iteration belongs inside a stage**, not in the graph topology.

### Component overview

The full type/relationship class diagram lives in [`DESIGN.md`](DESIGN.md#component-class-diagram). For day-to-day use, the Pipeline Flow diagram below is enough — it shows how items move through the system.

### How `flow()` processes each kind at construction

`flow()` walks its arguments at construction. Some kinds are **sub-pipelines** (expanded into the parent pipeline); others are **per-item transformers** that occupy a single stage.

```python
import inspect

match stage:
    case Flow() as sub_flow:
        # SUB-PIPELINE: fold sub_flow's stages into the parent pipeline.
        # Stage names get the sub-flow's name as a prefix (e.g. "ingest.parse").
        # Sub-flow's per-stage configuration is preserved unless overridden by the parent.
        ...
    case Router() as r:
        # SUB-PIPELINE: each arm becomes its own sub-pipeline. The classifier
        # runs as a single stage that dispatches to arm pipelines.
        ...
    case ctx if callable(ctx) and len(inspect.signature(ctx).parameters) == 0:
        # PER-ITEM: CM factory; framework calls factory() per worker,
        # enters the context, uses yielded callable per item.
        ...
    case fn if callable(fn) and len(inspect.signature(fn).parameters) == 1:
        # PER-ITEM: plain async function; called per item.
        ...
```

After expansion, every stage in the runtime pipeline is a single per-item transformer (plain async fn, CM factory, or router classifier). Each stage owns one input queue, a worker pool, and a scaling strategy.
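The two per-item cases can be reproduced in ordinary Python. The sketch below is a hypothetical re-creation, not flowrhythm's source: `classify_stage`, `run_worker`, `shout` and `shouting_session` are illustrative names, the `Flow`/`Router` cases are omitted, and the sync check mirrors the `TypeError` described in Troubleshooting.

```python
import asyncio
import inspect
from contextlib import asynccontextmanager


def classify_stage(obj) -> str:
    """Hypothetical re-creation of the per-item dispatch above."""
    if not callable(obj):
        raise TypeError(f"{obj!r} is not a valid stage")
    n_params = len(inspect.signature(obj).parameters)
    if n_params == 0:
        return "cm_factory"      # called once per worker, then entered
    if n_params == 1:
        if not inspect.iscoroutinefunction(obj):
            raise TypeError(f"transformer {obj.__name__!r} is sync")
        return "plain_async_fn"  # called once per item
    raise TypeError(f"stage {obj!r} must take 0 or 1 parameters, got {n_params}")


async def run_worker(factory, items):
    """What one worker does with a CM factory: enter once, reuse the yielded callable."""
    results = []
    async with factory() as fn:
        for item in items:
            results.append(await fn(item))
    return results


async def shout(text: str) -> str:
    return text.upper()


@asynccontextmanager
async def shouting_session():
    # Per-worker setup would go before this yield, teardown after it.
    yield shout


if __name__ == "__main__":
    print(classify_stage(shout), classify_stage(shouting_session))  # plain_async_fn cm_factory
    print(asyncio.run(run_worker(shouting_session, ["a", "b"])))     # ['A', 'B']
```

`inspect.signature()` follows the `__wrapped__` attribute that `asynccontextmanager` sets, which is why the decorated factory still reports zero parameters.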

### Pipeline Flow

```mermaid
flowchart TD
    Src[/Item source/] --> B[Transformer]
    B --> C{Exception?}
    C -- No --> D[Next stage / last stage drops]
    C -- Yes --> E[Error Sink]
    B -. "router()" .-> F{Classifier}
    F -- arm 1 --> G[Transformer / Chain / Flow]
    F -- arm 2 --> H[Transformer / Chain / Flow]
    F -- unknown --> I[default arm, or Dropped → Error Handler]
    G --> D
    H --> D
    I --> D
    I -. drop .-> E
```

Item source is one of:
- A source generator passed to `run(source)` — bounded
- Auto-emitted `None` signals from `run()` — unbounded
- Items pushed via `handle.send()` after `async with chain.push() as handle` — push mode

---

## Project status

flowrhythm is in early development. The DSL and runtime are still settling — see [`DESIGN.md`](DESIGN.md) for design decisions and open questions, and [`todos/INDEX.md`](todos/INDEX.md) for active plans (in priority order).

---

## License

MIT License. See `LICENSE`.

---

## Author

**Andrey Maximov**
[GitHub](https://github.com/maxsimov)
