# runsheet — Python API Reference

Type-safe, composable business logic pipelines for Python.
In-memory, single-call pipeline orchestration with Pydantic validation,
rollback, retry, timeout, and middleware.

## Installation

```shell
pip install runsheet
```

## Core API

### @step decorator

Creates a Step from a function. Both sync and async functions are supported.

```python
from runsheet import step, RetryPolicy
from pydantic import BaseModel

class Input(BaseModel):
    order_id: str

class Output(BaseModel):
    charge_id: str

@step(requires=Input, provides=Output)
async def charge(ctx: Input) -> Output:
    return Output(charge_id="ch_123")

# With rollback
@charge.rollback
async def undo_charge(ctx: Input, output: Output) -> None:
    pass  # refund logic

# With retry and timeout
@step(
    requires=Input,
    provides=Output,
    retry=RetryPolicy(count=3, delay=0.2, backoff="exponential"),
    timeout=5.0,
)
async def charge_with_retry(ctx: Input) -> Output:
    return Output(charge_id="ch_123")
```

Parameters:
- requires: type[BaseModel] | None — Pydantic model class for input validation
- provides: type[BaseModel] | None — Pydantic model class for output validation
- name: str | None — override step name (default: function name)
- retry: RetryPolicy | None — retry configuration
- timeout: float | None — timeout in seconds

### RetryPolicy

```python
RetryPolicy(
    count=3,                    # max retries (not counting initial attempt)
    delay=0.2,                  # base delay in seconds
    backoff="exponential",      # "linear" or "exponential"
    retry_if=lambda errors: True,  # predicate to control retry
)
```
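The exact backoff formulas are not spelled out above; assuming the conventional doubling schedule for "exponential" and a multiple-of-base schedule for "linear", the delays implied by these parameters can be sketched in plain Python (`nth_delay` is a hypothetical helper, not part of runsheet):

```python
def nth_delay(attempt: int, delay: float, backoff: str) -> float:
    """Delay before retry number `attempt` (1-based), under the assumed formulas."""
    if backoff == "exponential":
        return delay * (2 ** (attempt - 1))  # 0.2, 0.4, 0.8, ...
    return delay * attempt                   # linear: 0.2, 0.4, 0.6, ...
```

With `count=3`, the initial attempt plus three retries gives at most four executions.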

### Pipeline

```python
from runsheet import Pipeline

pipeline = Pipeline(
    name="checkout",
    steps=[step_a, step_b, step_c],
    output=CheckoutOutput,      # optional, validates & types result.data
    args_schema=OrderInput,     # optional, validates pipeline args
    middleware=[timing_mw],     # optional, wraps step lifecycle
    strict=True,                # optional, detect provides key collisions
)

result = await pipeline.run(OrderInput(order_id="123"))
```

pipeline.run() never raises. It always returns a result: AggregateSuccess or AggregateFailure.

Pipelines are steps — use one pipeline as a step in another:

```python
checkout = Pipeline(name="checkout", steps=[validate, charge, confirm])
full_flow = Pipeline(name="full_flow", steps=[checkout, ship, notify])
```

Parameters:
- name: str — pipeline name
- steps: Sequence[Runnable] — steps to execute in order
- output: type[BaseModel] | None — optional, validates accumulated context and types result.data
- args_schema: type[BaseModel] | None — optional, validates pipeline args
- middleware: list[StepMiddleware] | None — optional, wraps step lifecycle
- strict: bool — optional, detect provides key collisions at build time

When output= is provided, result.data is a typed Pydantic model instance with attribute access.
Without output=, result.data is dict[str, Any]. If output validation fails, the pipeline returns
AggregateFailure with ProvidesValidationError (failed_step="<output>").

### Pipeline result

```python
result = await pipeline.run(args)

if result.success:
    # AggregateSuccess
    result.data          # dict[str, Any], or typed model if output= was set
    result.meta.name     # str — pipeline name
    result.meta.args     # Mapping[str, Any] — original args
    result.meta.steps_executed  # tuple[str, ...]
else:
    # AggregateFailure
    result.error         # Exception
    result.failed_step   # str
    result.rollback      # RollbackReport
    result.rollback.completed  # tuple[str, ...]
    result.rollback.failed     # tuple[RollbackFailure, ...]
```

## Combinators

### when — conditional step

```python
from runsheet import when

when(lambda ctx: ctx.get("amount", 0) > 10000, notify_manager)
```

Skipped steps produce no snapshot, no rollback entry, and do not appear in
meta.steps_executed.

### parallel — concurrent execution

```python
from runsheet import parallel

parallel(reserve_inventory, charge_payment)
```

All steps receive the same pre-parallel context. Outputs merge in list order.
On partial failure, the steps that succeeded are rolled back in reverse order.
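These merge semantics can be emulated with plain asyncio (a sketch of the described behavior, not runsheet's implementation; `run_parallel` and the step names are hypothetical):

```python
import asyncio

async def reserve(ctx: dict) -> dict:
    # each parallel step returns its own output mapping
    return {"reservation_id": "res-1"}

async def charge(ctx: dict) -> dict:
    return {"charge_id": "ch-1"}

async def run_parallel(ctx: dict, steps) -> dict:
    # every step sees the same pre-parallel context snapshot
    outputs = await asyncio.gather(*(s(dict(ctx)) for s in steps))
    merged = dict(ctx)
    for out in outputs:  # merge in list order; later outputs win on collisions
        merged.update(out)
    return merged

result = asyncio.run(run_parallel({"order_id": "o-1"}, [reserve, charge]))
# result: {"order_id": "o-1", "reservation_id": "res-1", "charge_id": "ch-1"}
```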

### choice — branching

```python
from runsheet import choice

choice(
    (lambda ctx: ctx.get("method") == "card", charge_card),
    (lambda ctx: ctx.get("method") == "bank", charge_bank),
    charge_default,  # bare step = default branch
)
```

First match wins. If no branch matches (and no default is given), ChoiceNoMatchError is raised.
Only the matched branch participates in rollback.
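The selection rule can be sketched in plain Python (`pick_branch` is a hypothetical stand-in, not runsheet's code):

```python
def pick_branch(branches, default, ctx):
    # first predicate returning True wins
    for predicate, step in branches:
        if predicate(ctx):
            return step
    if default is not None:
        return default
    raise LookupError("no branch matched")  # runsheet raises ChoiceNoMatchError here

branches = [
    (lambda ctx: ctx.get("method") == "card", "charge_card"),
    (lambda ctx: ctx.get("method") == "bank", "charge_bank"),
]
branch = pick_branch(branches, "charge_default", {"method": "bank"})
# branch == "charge_bank"
```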

### distribute — collection distribution

```python
from runsheet import distribute

# Single collection — run step once per account_id
distribute("emails", {"account_ids": "account_id"}, send_email)
# Context: {"org_id": "org-1", "account_ids": ["a1", "a2"]}
# Output:  {"emails": [{"email_id": "email-a1"}, {"email_id": "email-a2"}]}

# Cross product — run once per (account_id, region_id) pair
distribute(
    "reports",
    {"account_ids": "account_id", "region_ids": "region_id"},
    generate_report,
)
# 2 accounts x 3 regions = 6 concurrent executions
```

The mapping dict connects context array keys to the step's scalar input keys.
Non-mapped context keys pass through unchanged. Items run concurrently.
Supports partial-failure rollback and external-failure rollback.
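The per-item scalar contexts can be modeled with itertools.product (a plain-Python sketch of the described mapping, not runsheet's implementation):

```python
from itertools import product

ctx = {"org_id": "org-1", "account_ids": ["a1", "a2"], "region_ids": ["r1", "r2", "r3"]}
mapping = {"account_ids": "account_id", "region_ids": "region_id"}

keys = list(mapping)  # context array keys, in mapping order
items = [
    {
        **{k: v for k, v in ctx.items() if k not in mapping},  # pass-through keys
        **{mapping[k]: val for k, val in zip(keys, combo)},    # scalar inputs
    }
    for combo in product(*(ctx[k] for k in keys))
]
# 2 accounts x 3 regions = 6 scalar contexts, each carrying org_id unchanged
```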

### map_step — collection iteration

```python
from runsheet import map_step

# Function form: mapper(item, ctx) -> result
map_step("results", lambda ctx: ctx.get("items", []), lambda item, ctx: item * 2)

# Step form: run existing step per item (item merged into context)
map_step("results", lambda ctx: ctx.get("items", []), process_item_step)
```

Items run concurrently. Step form supports rollback on partial failure.

### filter_step — collection filtering

```python
from runsheet import filter_step

filter_step("eligible", lambda ctx: ctx.get("users", []), lambda user, ctx: user.opted_in)
```

Predicate receives (item, ctx). Predicates run concurrently. Order preserved. No rollback (pure).

### flat_map — collection expansion

```python
from runsheet import flat_map

flat_map("line_items", lambda ctx: ctx.get("orders", []), lambda order, ctx: order.items)
```

The mapper receives (item, ctx) and maps each item to a list; the lists are flattened one level. No rollback (pure).
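Assuming the usual flatten-one-level semantics, the result is equivalent to a nested comprehension:

```python
ctx = {"orders": [{"items": ["a", "b"]}, {"items": ["c"]}]}
mapper = lambda order, ctx: order["items"]  # each item maps to a list

# flatten exactly one level: lists of lists become one list
line_items = [x for order in ctx["orders"] for x in mapper(order, ctx)]
# line_items: ["a", "b", "c"]
```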

## Middleware

```python
import time
from typing import Any

from runsheet import Pipeline, StepInfo

async def timing(step_info: StepInfo, next_fn, ctx: dict) -> Any:
    start = time.perf_counter()
    result = await next_fn(ctx)
    elapsed = time.perf_counter() - start
    print(f"{step_info.name}: {elapsed:.3f}s")
    return result

Pipeline(name="p", steps=[...], middleware=[timing])
```

First in list = outermost wrapper. Middleware wraps full step lifecycle
including validation. Combinator inner steps bypass middleware.
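The "first in list = outermost" rule can be modeled by folding the middleware list right-to-left so that `middleware[0]` wraps everything else (a sketch with hypothetical names, not runsheet internals):

```python
import asyncio
from functools import reduce

events = []

def make_mw(label):
    async def mw(step_info, next_fn, ctx):
        events.append(f"{label}:before")
        result = await next_fn(ctx)
        events.append(f"{label}:after")
        return result
    return mw

async def run_step(ctx):
    events.append("step")
    return ctx

# fold right-to-left: middleware[0] ends up outermost
middleware = [make_mw("outer"), make_mw("inner")]
handler = reduce(
    lambda nxt, mw: lambda ctx, mw=mw, nxt=nxt: mw(None, nxt, ctx),
    reversed(middleware),
    run_step,
)
asyncio.run(handler({}))
# events: ["outer:before", "inner:before", "step", "inner:after", "outer:after"]
```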

## Error Hierarchy

All library errors satisfy isinstance(e, RunsheetError).
Application exceptions pass through as-is.

```
RunsheetError (base)
  RequiresValidationError   — code: REQUIRES_VALIDATION
  ProvidesValidationError   — code: PROVIDES_VALIDATION
  ArgsValidationError       — code: ARGS_VALIDATION
  PredicateError            — code: PREDICATE
  TimeoutError              — code: TIMEOUT, .timeout_seconds: float
  RetryExhaustedError       — code: RETRY_EXHAUSTED, .attempts: int, .last_error: Exception
  StrictOverlapError        — code: STRICT_OVERLAP, .key: str, .steps: tuple[str, str]
  ChoiceNoMatchError        — code: CHOICE_NO_MATCH
  RollbackError             — code: ROLLBACK
  UnknownError              — code: UNKNOWN, .original_value: object
```

RunsheetErrorCode is a StrEnum with all code values.

## Key Invariants

1. Args persist — initial args flow through entire pipeline
2. Steps signal failure by raising — pipeline catches and returns Result
3. Context is immutable at every step boundary (MappingProxyType)
4. Rollback is best-effort — failures collected, never abort remaining
5. Last writer wins for context keys (strict=True detects at build time)
6. pipeline.run() never raises
7. Middleware wraps full lifecycle; combinator inner steps bypass middleware
8. Pipelines are steps — composable as nested steps in outer pipelines
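Invariant 3 can be seen directly with the stdlib type it names: MappingProxyType is a read-only view, so step code can read the context but any write raises TypeError.

```python
from types import MappingProxyType

snapshot = MappingProxyType({"order_id": "123"})

blocked = False
try:
    snapshot["order_id"] = "tampered"  # writes raise TypeError
except TypeError:
    blocked = True

order_id = snapshot["order_id"]        # reads work normally
```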
