Metadata-Version: 2.4
Name: redflow
Version: 0.1.1
Summary: Durable workflow engine backed by Redis — Python client
Author: getrelocapp
License-Expression: Apache-2.0
Keywords: background-jobs,durable,queue,redis,workflow
Classifier: Development Status :: 4 - Beta
Classifier: Framework :: AsyncIO
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: Apache Software License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Software Development :: Libraries
Classifier: Typing :: Typed
Requires-Python: >=3.11
Requires-Dist: cronsim>=2.6
Requires-Dist: jsonschema[format-nongpl]>=4.25
Requires-Dist: redis>=5.0.0
Provides-Extra: dev
Requires-Dist: mypy>=1.13; extra == 'dev'
Requires-Dist: pydantic>=2.0.0; extra == 'dev'
Requires-Dist: pytest-asyncio>=0.25; extra == 'dev'
Requires-Dist: pytest>=8.3; extra == 'dev'
Requires-Dist: ruff>=0.8; extra == 'dev'
Provides-Extra: fast
Requires-Dist: redis[hiredis]>=5.0.0; extra == 'fast'
Provides-Extra: pydantic
Requires-Dist: pydantic>=2.0.0; extra == 'pydantic'
Description-Content-Type: text/markdown

# redflow (Python)

Durable Redis-backed workflows and event orchestration for Python.

`redflow` lets you run async workflows with durable steps, retries, delayed runs, cron triggers, event fan-out, child workflows, and durable event history. The Python runtime is intentionally aligned with the TypeScript `@redflow/client` model while keeping a Pythonic API.

## Status

Beta. The package is usable today, but the API is still evolving.

## Highlights

- Durable `step.run(...)` with cached step output across retries and restarts
- Delayed runs and cron-triggered workflows
- Event fan-out via `EventTrigger(name=...)` and `emit_event(...)`
- Durable event history via `list_events(...)` and `get_event(...)`
- Child workflows via `run_workflow(...)` and `emit_workflow(...)`
- Cancellation propagation across parent and child runs
- Inline testing via `run_inline(...)`
- Python 3.11+ with optional Pydantic validation

## Documentation shape

This README follows a Diataxis-style structure:

- Tutorial: first working workflow
- Cookbook: practical production recipes
- Explanation: execution model and guarantees
- Reference: key APIs and environment variables

## Requirements

- Python `3.11+`
- Redis `6.2+`

## Install

Using `uv`:

```bash
uv add redflow
```

Optional extras:

```bash
# Faster Redis parser / transport path
uv add "redflow[fast]"

# Pydantic input validation support
uv add "redflow[pydantic]"
```

Using `pip`:

```bash
pip install redflow
pip install "redflow[fast]"
pip install "redflow[pydantic]"
```

## Tutorial

### 1. Define a workflow

```python
from redflow import WorkflowHandlerContext, workflow


async def fetch_user(user_id: str, *, signal=None) -> dict:
    return {"id": user_id, "email": "user@example.com"}


async def send_email(email: str, *, signal=None) -> dict:
    return {"ok": True, "email": email}


@workflow("send-welcome-email", queue="default", max_attempts=3)
async def send_welcome_email(ctx: WorkflowHandlerContext) -> dict:
    user = await ctx.step.run("fetch-user", fetch_user, ctx.input["user_id"])
    await ctx.step.run("send-email", send_email, user["email"])
    return {"sent": True}
```

### 2. Start a worker

Import workflow modules before starting the worker so definitions are registered.

```python
import asyncio

from redflow import StartWorkerOptions, start_worker

import myapp.workflows  # noqa: F401


async def main() -> None:
    worker = await start_worker(
        StartWorkerOptions(
            app="billing-worker",
            url="redis://127.0.0.1:6379",
            concurrency=4,
        )
    )
    try:
        await asyncio.Event().wait()
    finally:
        await worker.stop()


asyncio.run(main())
```

### 3. Trigger a run and wait for the result

```python
from redflow import create_client, set_default_client

client = create_client(url="redis://127.0.0.1:6379")
set_default_client(client)

handle = await send_welcome_email.run(
    {"user_id": "user_123"},
    idempotency_key="welcome:user_123",
)

result = await handle.result(timeout_ms=15_000)
print(result)  # {"sent": True}
```

You can also emit by workflow name:

```python
handle = await client.emit_workflow(
    "send-welcome-email",
    {"user_id": "user_123"},
    idempotency_key="welcome:user_123",
)
```

## Cookbook

### Pick the right primitive

Use this rule of thumb:

| Need | Primitive |
| --- | --- |
| Durable unit of work | `ctx.step.run(...)` |
| Sleep until a duration or datetime | `ctx.step.wait_for(...)` |
| Wait for a specific external event | `ctx.step.wait_for_event(...)` |
| Start a child workflow and wait for output | `ctx.step.run_workflow(...)` |
| Start a child workflow fire-and-forget | `ctx.step.emit_workflow(...)` |
| Fan out to workflows subscribed to an event | `ctx.step.emit_event(...)` or `client.emit_event(...)` |

Python step APIs are intentionally positional and Pythonic: the step name is the first argument, not an options object.

### Use durable steps

```python
user = await ctx.step.run("fetch-user", fetch_user, ctx.input["user_id"], timeout_ms=4_000)
```

Notes:

- Step names must be unique within a run
- Successful step output is cached for that run and reused on retry / recovery
- If the function accepts `signal` or `**kwargs`, redflow injects `signal=<asyncio.Event>` (see the sketch below)
- Step output must be JSON-serializable
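
For a long-running step function, accepting `signal` lets it exit promptly when the run is canceled. A minimal sketch, where `check_status` is a hypothetical stand-in for real work:

```python
import asyncio


async def check_status(export_id: str) -> dict:
    # Hypothetical stand-in for polling a real external system.
    return {"done": True, "export_id": export_id}


async def poll_export(export_id: str, *, signal: asyncio.Event) -> dict:
    # Because the function accepts `signal`, redflow injects signal=<asyncio.Event>.
    while not signal.is_set():
        status = await check_status(export_id)
        if status["done"]:
            return status  # JSON-serializable, so it can be cached as step output
        await asyncio.sleep(1.0)
    return {"done": False, "canceled": True}
```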

### Sleep durably

```python
from datetime import datetime, timedelta

await ctx.step.wait_for("pause-before-retry", "5m")
await ctx.step.wait_for("pause-until-cutoff", datetime.now() + timedelta(minutes=15))
```

`wait_for(...)` accepts three target forms (each shown below):

- milliseconds as `int` / `float`
- duration strings like `"30s"`, `"5m"`, `"2h"`
- `datetime`
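
All three forms side by side (the durations are illustrative):

```python
from datetime import datetime, timedelta

await ctx.step.wait_for("pause-ms", 30_000)    # milliseconds as int
await ctx.step.wait_for("pause-str", "30s")    # duration string
await ctx.step.wait_for("pause-dt", datetime.now() + timedelta(hours=2))  # datetime
```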

### Emit recurring events with cron workflows

If the schedule is known up front, model it as a cron workflow that emits a domain event. This keeps scheduling in workflows and delivery in event subscribers.

```python
from redflow import CronTrigger, EventTrigger, workflow


async def send_email(email: str, *, signal=None) -> dict:
    return {"ok": True, "email": email}


@workflow(
    "emit-daily-digest-events",
    queue="system",
    cron=[
        CronTrigger(
            id="daily-digest",
            expression="0 9 * * *",
            timezone="UTC",
            input={"source": "cron"},
        )
    ],
)
async def emit_daily_digest(ctx):
    subscriber_run_ids = await ctx.step.emit_event(
        "emit-digest-due",
        "digest.due",
        {"source": ctx.input["source"]},
        event_id=f"digest.due:{ctx.run.id}",
    )
    return {"subscriber_run_ids": subscriber_run_ids}


@workflow(
    "send-digest-email",
    queue="notifications",
    event=[EventTrigger(name="digest.due")],
)
async def send_digest_email(ctx):
    await ctx.step.run("deliver-email", send_email, "team@example.com")
    return {"delivered": True}
```

When you already know when the event should happen, this pattern is usually a better fit than keeping a workflow open and waiting.

### Schedule a one-off event for later

For a single future dispatch, schedule a workflow run with `run_at=...` and emit the event from inside that workflow.

```python
from datetime import datetime, timedelta
from redflow import workflow


@workflow("dispatch-invoice-due", queue="system")
async def dispatch_invoice_due(ctx):
    subscriber_run_ids = await ctx.step.emit_event(
        "emit-invoice-due",
        "invoice.due",
        {"invoice_id": ctx.input["invoice_id"]},
        event_id=f"invoice.due:{ctx.input['invoice_id']}",
    )
    return {"subscriber_run_ids": subscriber_run_ids}


due_at = datetime.now() + timedelta(minutes=30)

await dispatch_invoice_due.run(
    {"invoice_id": "inv_123"},
    run_at=due_at,
    idempotency_key="invoice-due:inv_123",
)
```

Use this when the event should happen at a known future time but you still want event fan-out at dispatch time.

### Subscribe workflows to events

```python
from redflow import EventTrigger, workflow


async def record_metric(order_id: str, *, signal=None) -> dict:
    return {"tracked": order_id}


@workflow(
    "analytics-order-created",
    queue="analytics",
    event=[EventTrigger(name="order.created")],
)
async def analytics_order_created(ctx):
    order_id = ctx.input["order_id"]
    await ctx.step.run("track", record_metric, order_id)
    return {"tracked": order_id}
```

Emitting the event fans out to every subscribed workflow:

```python
import asyncio


handles = await client.emit_event(
    "order.created",
    {"order_id": "ord_123"},
    event_id="evt_order_created_ord_123",
)

outputs = await asyncio.gather(*(handle.result(timeout_ms=10_000) for handle in handles))
```

Need the event later? Schedule a workflow run with `run_at` and emit the event
from that workflow instead.

This is the right pattern for domain events such as:

- `order.created`
- `invoice.due`
- `notification.due`
- `document.parsed`

### Inspect durable event history

If you provide an explicit `event_id`, you can fetch the event record later:

```python
result = await client.emit_event_detailed(
    "order.created",
    {"order_id": "ord_123"},
    event_id="evt_order_created_ord_123",
)

print(result["handles"])
print(result["resumed_run_ids"])

event = await client.get_event("evt_order_created_ord_123")
print(event["fanout_status"])
print(event["deliveries"])
```

You can also list recent events:

```python
from redflow import ListEventsParams

events = await client.list_events(
    ListEventsParams(
        name="order.created",
        limit=20,
        sort="desc",
    )
)
```

Durable event records include:

- event envelope (`id`, `name`, `ts`, `data`)
- fan-out summary (`subscriber_count`, `delivery_count`, `failed_delivery_count`, `fanout_status`)
- resumed waiter summary (`resumed_run_count`, `resumed_run_ids`)
- per-subscriber delivery records

### Run child workflows

Wait for the child output:

```python
child_output = await ctx.step.run_workflow(
    "generate-receipt",
    "receipt-workflow",
    {"order_id": "ord_1"},
    timeout_ms=20_000,
)
```

Fire-and-forget child run:

```python
child_run_id = await ctx.step.emit_workflow(
    "emit-analytics",
    "analytics-workflow",
    {"order_id": "ord_1"},
)
```

These APIs also support the following options (combined in the sketch after this list):

- `timeout_ms`
- `run_at` for scheduling the child run in the future
- `queue_override`
- `idempotency_key`
- `idempotency_ttl`
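
A sketch combining a few of these options on a fire-and-forget child run; the queue name and key values are illustrative:

```python
from datetime import datetime, timedelta

# Delayed, deduplicated fire-and-forget child run on another queue.
child_run_id = await ctx.step.emit_workflow(
    "emit-analytics",
    "analytics-workflow",
    {"order_id": "ord_1"},
    run_at=datetime.now() + timedelta(minutes=5),
    queue_override="analytics",          # illustrative queue name
    idempotency_key="analytics:ord_1",   # illustrative key
)
```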

For delayed event dispatch, schedule a workflow run with `run_at=...` and emit
the event from that workflow.

### Trigger a delayed run

```python
from datetime import datetime, timedelta

handle = await send_welcome_email.run(
    {"user_id": "user_789"},
    run_at=datetime.now() + timedelta(minutes=1),
    idempotency_key="welcome:user_789:delayed",
)
```

### Wait for an external callback when you really need a live pause

Use `step.wait_for_event(...)` when a running workflow must pause until a later callback arrives for that specific run.

```python
from redflow import workflow


@workflow("await-approval", queue="default")
async def await_approval(ctx):
    received = await ctx.step.wait_for_event(
        "wait-approval",
        "approval.received",
        ctx.input["order_id"],
        "30m",
    )

    if received is None:
        return {"status": "timed_out"}

    return {
        "status": "approved",
        "event_id": received["id"],
        "approved_at": received["ts"],
    }
```

Then emit the matching event:

```python
await client.emit_event(
    "approval.received",
    {"approved": True, "note": "ok"},
    correlation_key="ord_123",
    event_id="evt_approval_ord_123",
)
```

Semantics:

- matching is exact on `event_name + correlation_key`
- the step returns `ReceivedEvent` when resumed by an event
- it returns `None` when the timeout wins
- the wait is durable across worker restarts

Use this for:

- human approval flows
- webhook callbacks
- payment provider confirmations
- chat / bot interactions where a later action resumes a waiting run

If the time is known up front, prefer `run_at` or cron-triggered workflows that emit an event instead of keeping a workflow open waiting.

### Add input validation with Pydantic

Install the extra first:

```bash
uv add "redflow[pydantic]"
```

Then:

```python
from pydantic import BaseModel
from redflow import WorkflowHandlerContext, define_workflow


class SendWelcomeInput(BaseModel):
    user_id: str


async def handler(ctx: WorkflowHandlerContext[SendWelcomeInput]) -> dict:
    return {"user_id": ctx.input.user_id}


send_welcome = define_workflow(
    "send-welcome",
    handler=handler,
    input_schema=SendWelcomeInput,
)
```

### Configure retries and failure handling

```python
from redflow import NonRetriableError, OnFailureContext, define_workflow
from redflow.types import WorkflowRetryPolicy


async def on_fail(ctx: OnFailureContext) -> None:
    print("final failure", ctx.run.id, ctx.error)


define_workflow(
    "invoice-sync",
    handler=handler,
    max_attempts=4,
    retries=WorkflowRetryPolicy(
        max_attempts=4,
        delay=lambda ctx: "5s" if ctx.run.attempt < 3 else "30s",
        should_retry=lambda ctx: "validation" not in str(ctx.error).lower(),
    ),
    on_failure=on_fail,
)
```

Notes:

- `max_attempts` includes the initial attempt
- `retries.max_attempts` takes precedence over top-level `max_attempts`
- raise `NonRetriableError` to fail immediately (short example below)
- `on_failure` runs only after terminal failure, not on cancellation
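
A minimal sketch of failing fast, assuming `NonRetriableError` takes a message like a normal exception:

```python
from redflow import NonRetriableError, workflow


@workflow("invoice-validate", queue="default", max_attempts=4)
async def invoice_validate(ctx):
    if "invoice_id" not in ctx.input:
        # A permanent input problem: skip the remaining attempts entirely.
        raise NonRetriableError("missing invoice_id")
    return {"ok": True}
```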

### Control admission with debounce, mutex, throttle, and rate limits

```python
from redflow import define_workflow
from redflow.types import WorkflowDebounceOptions, WorkflowRateLimitOptions


define_workflow(
    "invoice-sync",
    handler=handler,
    debounce=WorkflowDebounceOptions(
        key=lambda input: f"invoice:{input['invoice_id']}",
        period="30s",
        timeout="5m",
    ),
    mutex=lambda input: f"customer:{input['customer_id']}",
    throttle=WorkflowRateLimitOptions(
        key=lambda input: f"customer:{input['customer_id']}",
        limit=1,
        period="2s",
    ),
    rate_limit=WorkflowRateLimitOptions(
        key=lambda input: f"tenant:{input['tenant_id']}",
        limit=100,
        period="1m",
    ),
)
```

Semantics:

- `debounce` coalesces duplicate enqueue attempts by key
- `mutex` allows only one running run per key
- `throttle` delays execution when the budget is exceeded
- `rate_limit` fails fast with retry metadata

### Schedule workflows with cron

```python
from redflow import CronTrigger, define_workflow


define_workflow(
    "digest-cron",
    handler=handler,
    queue="ops",
    cron=[
        CronTrigger(id="digest-hourly", expression="0 * * * *"),
        CronTrigger(
            expression="*/5 * * * *",
            timezone="UTC",
            input={"source": "cron"},
        ),
    ],
)
```

Cron ticks respect `max_concurrency`: if a workflow is already at its concurrency limit, the tick is skipped.

### Test handlers with `run_inline(...)`

`run_inline(...)` executes workflow logic in-process without Redis:

```python
from redflow import run_inline


result = await run_inline(send_welcome_email, input={"user_id": "test"})
assert result.succeeded is True
assert result.output == {"sent": True}
```

Step overrides are keyed by step name:

```python
result = await run_inline(
    send_welcome_email,
    input={"user_id": "test"},
    step_overrides={
        "fetch-user": {"id": "test", "email": "mock@example.com"},
        "send-email": {"ok": True},
    },
)
```

You can also override `wait_for_event(...)` steps with either a `ReceivedEvent`-like dict or `None`.
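
For example, resolving the `await-approval` workflow from the cookbook above; the envelope fields mirror the durable event record (`id`, `name`, `ts`, `data`), and the epoch-millisecond `ts` value is an assumption:

```python
from redflow import run_inline

result = await run_inline(
    await_approval,
    input={"order_id": "ord_123"},
    step_overrides={
        # Resolve the wait with a ReceivedEvent-like dict...
        "wait-approval": {
            "id": "evt_approval_ord_123",
            "name": "approval.received",
            "ts": 1_700_000_000_000,  # assumption: epoch milliseconds
            "data": {"approved": True},
        },
        # ...or use None instead to exercise the timeout path.
    },
)
assert result.output["status"] == "approved"
```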

For full lifecycle behavior such as queues, retries, cron, cancellation, and event fan-out, use a real Redis instance and `start_worker(...)`.

## Explanation

### Handler context

Inside a workflow handler (a short example follows this list):

- `ctx.input`: validated workflow input, or raw input if no schema is configured
- `ctx.run`: run metadata (`id`, `workflow`, `queue`, `attempt`, `max_attempts`)
- `ctx.signal`: cancellation event
- `ctx.step`: durable step API
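
A handler that only echoes context metadata back as output; attribute access on `ctx.run` matches its use in the retry examples above:

```python
from redflow import workflow


@workflow("inspect-context", queue="default")
async def inspect_context(ctx):
    # Echo run metadata back as the workflow output.
    return {
        "run_id": ctx.run.id,
        "workflow": ctx.run.workflow,
        "attempt": ctx.run.attempt,
        "max_attempts": ctx.run.max_attempts,
    }
```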

### Run lifecycle

Each run moves through:

`scheduled -> queued -> running -> succeeded | failed | canceled`

Step state is stored separately from top-level run state. If a step with the same name already succeeded for the current run, its cached output is reused.

### Durability and idempotency

- `step.run(...)` persists successful step output and reuses it after retry / recovery
- duplicate step names in the same run are rejected
- `idempotency_key` deduplicates run creation (demonstrated below)
- explicit `event_id` deduplicates external event emits
- event records and run history are retained for the configured retention window
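
A sketch of the dedup behavior; that both handles resolve to the same run id is an assumption about how deduplicated creation surfaces:

```python
first = await send_welcome_email.run(
    {"user_id": "user_123"}, idempotency_key="welcome:user_123"
)
second = await send_welcome_email.run(
    {"user_id": "user_123"}, idempotency_key="welcome:user_123"
)
# Assumption: deduplicated creation resolves to the same underlying run.
assert first.id == second.id
```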

### Events model

`redflow` has two event-oriented patterns, but the primary one is fan-out:

1. Event fan-out
   - workflows subscribe with `event=[EventTrigger(name=...)]`
   - `emit_event(...)` creates runs for matching subscribers
   - cron or delayed workflows can emit these events on a schedule

2. Event resume
   - a running workflow pauses with `step.wait_for_event(...)`
   - a later `emit_event(...)` with the same `event_name + correlation_key` resumes the waiting run
   - this is a narrower pattern for callbacks, approvals, and external confirmations

Durable event records let you inspect what happened after the fact:

- which runs were created
- which waiters were resumed
- whether fan-out completed, partially failed, or had no subscribers

Rule of thumb:

- If you want to broadcast work to subscribers, emit an event
- If the time is known ahead of time, schedule a workflow and emit the event from there
- If you need to resume one waiting run from a later callback, use `wait_for_event(...)`

### Cancellation model

- `ctx.signal` is set when workflow cancellation is requested (checked in the sketch below)
- `step.run(...)` injects `signal=` into the step function when supported
- parent cancellation propagates to descendant runs spawned via step APIs
- recovery after a lost worker lease does not by itself cancel child runs
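
A handler can also check the signal between steps. This sketch assumes `ctx.signal` is an `asyncio.Event` like the one injected into step functions:

```python
from redflow import workflow


async def process_item(item: str, *, signal=None) -> dict:
    return {"processed": item}


@workflow("process-batch", queue="default")
async def process_batch(ctx):
    # Items are assumed unique so the derived step names stay unique.
    for item in ctx.input["items"]:
        if ctx.signal.is_set():
            # Stop scheduling new steps once cancellation is requested.
            return {"status": "canceled_early"}
        await ctx.step.run(f"process-{item}", process_item, item)
    return {"status": "done"}
```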

### Python / TypeScript parity

Behavior is intentionally aligned with `@redflow/client`. Naming follows language conventions:

- Python: `emit_workflow`, `emit_event`, `run_at`, `max_attempts`
- TypeScript: `emitWorkflow`, `emitEvent`, `runAt`, `maxAttempts`

## Reference

### Public workflow APIs

- `define_workflow(...)`
- `@workflow(...)`
- `WorkflowDefinition.run(...)`
- `start_worker(StartWorkerOptions(...))`
- `run_inline(...)`

### Step APIs

- `ctx.step.run(name, fn, *args, timeout_ms=None, **kwargs)`
- `ctx.step.wait_for(name, target, timeout_ms=None)`
- `ctx.step.wait_for_event(name, event_name, correlation_key, timeout)`
- `ctx.step.run_workflow(name, workflow, input, ...)`
- `ctx.step.emit_workflow(name, workflow, input, ...)`
- `ctx.step.emit_event(name, event_name, input, ...)`

### Client APIs

Create a client:

```python
from redflow import create_client

client = create_client(
    app="my-service",                 # optional, falls back to REDFLOW_APP
    url="redis://127.0.0.1:6379",     # optional, falls back to REDIS_URL
    prefix="redflow:v1",              # optional, falls back to REDFLOW_PREFIX
)
```

Common methods:

- `emit_workflow(...)`
- `emit_event(...)`
- `emit_event_detailed(...)`
- `run_by_name(...)`
- `get_run(run_id)`
- `get_run_steps(run_id)`
- `get_run_attempts(run_id)`
- `list_runs(ListRunsParams(...))`
- `search_runs(SearchRunsParams(...))`
- `list_workflows()`
- `get_workflow_meta(name)`
- `list_workflows_meta()`
- `list_event_definitions()`
- `list_events(ListEventsParams(...))`
- `get_event(event_id)`
- `get_stats()`
- `cancel_run(run_id, reason=...)`
- `cleanup_expired_runs_now(...)`
- `cleanup_expired_events_now(...)`
- `reset_runs()` for dev/testing
- `reset_state()` for dev/testing

### Run handle

Run creation APIs return a `RunHandle` with the following (usage sketch after the list):

- `.id`
- `await .get_state()`
- `await .result(timeout_ms=...)`
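
A small usage sketch; what `get_state()` returns beyond the lifecycle states listed above is not specified here:

```python
handle = await send_welcome_email.run({"user_id": "user_123"})

print(handle.id)
print(await handle.get_state())

output = await handle.result(timeout_ms=15_000)
```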

### Default client helpers

`WorkflowDefinition.run(...)` uses the process-global default client:

- `get_default_client()`
- `set_default_client(client)`

If you never call `set_default_client(...)`, redflow lazily creates one from environment defaults.

### Environment variables

- `REDIS_URL`
  Redis connection string used by `create_client()` and `start_worker(...)` when no explicit URL or Redis instance is provided
- `REDFLOW_PREFIX`
  Redis key prefix, default: `redflow:v1`
- `REDFLOW_APP`
  Default app name used by `create_client(...)`
- `REDFLOW_RUN_RETENTION_DAYS`
  Retention window for terminal runs and durable event records, default: `30`

### Worker options

Important `StartWorkerOptions` fields (see the tuning sketch below):

- `app`
- `redis` or `url`
- `prefix`
- `registry`
- `queues`
- `concurrency`
- `lease_ms`
- `blmove_timeout_sec`
- `reaper_interval_ms`
- `reaper_batch_size`
- `watchdog_enabled`
- `watchdog_interval_ms`
- `watchdog_stalled_for_ms`
- `run_history_cleanup_interval_ms`
- `registry_recovery_interval_ms`

Python keeps runtime tuning flat on `StartWorkerOptions`; there is no nested `runtime` object.
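
A tuning sketch using a few of these fields; the values are illustrative only, and the list-of-names format for `queues` is an assumption:

```python
from redflow import StartWorkerOptions, start_worker

worker = await start_worker(
    StartWorkerOptions(
        app="billing-worker",
        url="redis://127.0.0.1:6379",
        queues=["default", "notifications"],  # assumption: list of queue names
        concurrency=8,
        lease_ms=30_000,
        watchdog_enabled=True,
    )
)
```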

### Source layout

- `src/redflow/` - public modules and stable entry points
- `src/redflow/_client/` - command, query, retention, and registry internals
- `src/redflow/_worker/` - worker loops, executor, and step runtime internals
- `src/redflow/_core/` - shared private primitives such as payload storage, retry logic, and Redis helpers
