Metadata-Version: 2.4
Name: redflow
Version: 0.0.10
Summary: Durable workflow engine backed by Redis — Python client
Author: getrelocapp
License-Expression: Apache-2.0
Keywords: background-jobs,durable,queue,redis,workflow
Classifier: Development Status :: 4 - Beta
Classifier: Framework :: AsyncIO
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: Apache Software License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Software Development :: Libraries
Classifier: Typing :: Typed
Requires-Python: >=3.11
Requires-Dist: cronsim>=2.6
Requires-Dist: redis>=5.0.0
Provides-Extra: dev
Requires-Dist: mypy>=1.13; extra == 'dev'
Requires-Dist: pydantic>=2.0.0; extra == 'dev'
Requires-Dist: pytest-asyncio>=0.25; extra == 'dev'
Requires-Dist: pytest>=8.3; extra == 'dev'
Requires-Dist: ruff>=0.8; extra == 'dev'
Provides-Extra: fast
Requires-Dist: redis[hiredis]>=5.0.0; extra == 'fast'
Provides-Extra: pydantic
Requires-Dist: pydantic>=2.0.0; extra == 'pydantic'
Description-Content-Type: text/markdown

# redflow (Python)

Durable workflow engine backed by Redis.

`redflow` lets you define async workflows with durable, cached steps, run them from a client, and process them with a Redis-backed worker. It is the Python implementation of the same runtime model used by the TypeScript `@redflow/client`.

## Status

Early-stage project. APIs are usable, but expect iteration.

## Requirements

- Python `3.11+`
- Redis `6+` recommended (older versions work, but some operations use slower fallbacks)

## Install

Using `uv` (recommended):

```bash
uv add redflow
```

Optional extras:

```bash
# faster Redis parser/transport path (hiredis)
uv add "redflow[fast]"

# Pydantic input validation support for workflows
uv add "redflow[pydantic]"
```

Using `pip`:

```bash
pip install redflow
pip install "redflow[fast]"
pip install "redflow[pydantic]"
```

## Quick Start

### 1. Define a workflow

Functional API:

```python
from redflow import WorkflowHandlerContext, define_workflow


async def fetch_user(user_id: str) -> dict:
    return {"id": user_id, "email": "user@example.com"}


async def send_welcome(email: str) -> dict:
    return {"ok": True, "email": email}


async def handler(ctx: WorkflowHandlerContext) -> dict:
    user = await ctx.step.run("fetch-user", fetch_user, ctx.input["user_id"])
    await ctx.step.run("send-email", send_welcome, user["email"])
    return {"sent": True}


send_welcome_email = define_workflow(
    "send-welcome-email",
    handler=handler,
    queue="default",
    max_attempts=3,
)
```

Decorator API:

```python
from redflow import WorkflowHandlerContext, workflow


@workflow("send-welcome-email")
async def send_welcome_email(ctx: WorkflowHandlerContext) -> dict:
    user = await ctx.step.run("fetch-user", fetch_user, ctx.input["user_id"])
    await ctx.step.run("send-email", send_welcome, user["email"])
    return {"sent": True}
```

### 2. Start a worker

Import workflow modules before starting the worker so definitions are registered.

```python
import asyncio

from redflow import StartWorkerOptions, start_worker

import myapp.workflows  # noqa: F401


async def main() -> None:
    worker = await start_worker(
        StartWorkerOptions(
            app="billing-worker",
            url="redis://127.0.0.1:6379",
            concurrency=4,
        )
    )

    try:
        await asyncio.Event().wait()
    finally:
        await worker.stop()


asyncio.run(main())
```

### 3. Trigger a workflow

Most users will call `.run(...)` on the returned `WorkflowDefinition`:

```python
handle = await send_welcome_email.run(
    {"user_id": "user_123"},
    idempotency_key="welcome:user_123",
)

output = await handle.result(timeout_ms=15_000)
```

You can also trigger by name via `RedflowClient`:

```python
from redflow import create_client

client = create_client(url="redis://127.0.0.1:6379")

handle = await client.emit_workflow(
    "send-welcome-email",
    {"user_id": "user_123"},
    idempotency_key="welcome:user_123",
)

output = await handle.result(timeout_ms=15_000)
```

Delayed run:

```python
from datetime import datetime, timedelta

handle = await send_welcome_email.run(
    {"user_id": "user_789"},
    run_at=datetime.now() + timedelta(minutes=1),
    idempotency_key="welcome:user_789:delayed",
)
```

## Core Concepts

### Workflow handler context

Inside a workflow handler, `ctx` contains:

- `ctx.input`: workflow input (validated if `input_schema` is configured)
- `ctx.run`: run metadata (`id`, `workflow`, `queue`, `attempt`, `max_attempts`)
- `ctx.signal`: `asyncio.Event` set when cancellation is requested
- `ctx.step`: durable step API

### Step names must be unique per run

Within a single workflow run, each step name may be used only once. Reusing a step name in the same run raises an error.
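
The practical consequence is for loops: suffix the step name with the loop index so each call gets a distinct name. A sketch of the pattern (the `recipients` input field and `send_one` helper are illustrative):

```python
async def send_one(recipient: str) -> dict:
    return {"ok": True, "to": recipient}


async def handler(ctx) -> dict:
    for i, recipient in enumerate(ctx.input["recipients"]):
        # "send-email-0", "send-email-1", ... — each name is unique within the run
        await ctx.step.run(f"send-email-{i}", send_one, recipient)
    return {"count": len(ctx.input["recipients"])}
```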

### Step results are durable

If a step succeeds, its result is cached in Redis and reused after worker restarts or retry attempts (same run + same step name).

## Step API (inside workflow handlers)

The Python API is intentionally Pythonic and uses a positional `name` argument instead of the TS `{ name }` options object.

### `ctx.step.run(...)`

Signature (conceptually):

```python
async def run(
    step_name: str,
    fn: Callable[..., Awaitable[T]],
    *args,
    timeout_ms: int | None = None,
    **kwargs,
) -> T: ...
```

Example:

```python
import asyncio


async def fetch_remote_user(user_id: str, *, signal: asyncio.Event | None = None) -> dict:
    # redflow injects `signal=` automatically when the function accepts it
    return {"id": user_id, "email": "demo@example.com"}


user = await ctx.step.run(
    "fetch-user",
    fetch_remote_user,
    ctx.input["user_id"],
    timeout_ms=4_000,
)
```

Notes:

- If `fn` accepts a `signal` parameter (or `**kwargs`), redflow injects `signal=<asyncio.Event>`.
- `timeout_ms` fails the step with `TimeoutError` and records the step as failed.
- Step output must be JSON-serializable.

### `ctx.step.run_workflow(...)`

Trigger a child workflow and wait for the child result.

```python
receipt = await ctx.step.run_workflow(
    "send-receipt",
    "receipt-workflow",  # or a WorkflowDefinition object
    {"order_id": "ord_1", "email": "user@example.com"},
    timeout_ms=20_000,
    idempotency_key="receipt:ord_1",
)
```

You can pass a workflow object (returned by `define_workflow` / `@workflow`) instead of a string:

```python
receipt = await ctx.step.run_workflow(
    "send-receipt",
    send_receipt_workflow,
    {"order_id": "ord_1"},
)
```

### `ctx.step.emit_workflow(...)`

Trigger a child workflow without waiting. Returns the child run ID.

```python
child_run_id = await ctx.step.emit_workflow(
    "emit-analytics",
    "analytics-workflow",  # or a WorkflowDefinition object
    {"order_id": "ord_1"},
    timeout_ms=5_000,
    idempotency_key="analytics:ord_1",
)
```

### Child workflow step options

Both `run_workflow` and `emit_workflow` support:

- `timeout_ms`
- `run_at`
- `queue_override`
- `idempotency_key`
- `idempotency_ttl`

## Workflow Options

`define_workflow(...)` / `@workflow(...)` support these main options:

- `queue: str = "default"`
- `max_concurrency: int = 1`
- `max_attempts: int | None = None` (falls back to engine default)
- `cron: list[CronTrigger] | None = None`
- `on_failure: Callable[[OnFailureContext], Awaitable[None] | None] | None`
- `input_schema` (typically a Pydantic v2 model class)

### Input validation with Pydantic (optional)

Install the extra first:

```bash
uv add "redflow[pydantic]"
```

Then:

```python
from pydantic import BaseModel
from redflow import WorkflowHandlerContext, define_workflow


class SendWelcomeInput(BaseModel):
    user_id: str


async def handler(ctx: WorkflowHandlerContext[SendWelcomeInput]) -> dict:
    # ctx.input is the validated Pydantic model instance
    return {"user_id": ctx.input.user_id}


send_welcome = define_workflow(
    "send-welcome",
    handler=handler,
    input_schema=SendWelcomeInput,
)
```

### `max_concurrency`

Limits concurrent `running` runs for that workflow. Default is `1`.

```python
define_workflow(
    "heavy-sync",
    handler=handler,
    queue="ops",
    max_concurrency=1,
)
```

### `max_attempts`

The total number of attempts, including the first try.

Retries use exponential backoff with jitter. Raise `NonRetriableError` to fail immediately without retrying.
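
The attempt-counting and non-retriable semantics can be modeled in a few lines. This is a sketch of the contract only — it uses a local stand-in exception rather than redflow's `NonRetriableError` and omits the engine's actual backoff sleeps:

```python
class NonRetriable(Exception):
    """Local stand-in for redflow's NonRetriableError."""


async def execute_with_retries(fn, max_attempts: int):
    for attempt in range(1, max_attempts + 1):   # attempt 1 is the first try
        try:
            return await fn()
        except NonRetriable:
            raise                                # terminal: no further attempts
        except Exception:
            if attempt == max_attempts:
                raise                            # retries exhausted
            # the real engine sleeps with exponential backoff + jitter here
```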

### Cron

Use `CronTrigger` for scheduled runs (cron parsing is provided by `cronsim`).

```python
from redflow import CronTrigger, define_workflow

define_workflow(
    "digest-cron",
    handler=handler,
    queue="ops",
    cron=[
        CronTrigger(id="digest-hourly", expression="0 * * * *"),
        CronTrigger(expression="*/5 * * * *", timezone="UTC", input={"source": "cron"}),
    ],
)
```

Cron scheduling respects `max_concurrency`: if the workflow is already at the limit, that cron tick is skipped.

### `on_failure`

Called when a run reaches terminal failure (retries exhausted or non-retriable error). Not called on cancellation.

```python
from redflow import OnFailureContext, define_workflow


async def on_fail(ctx: OnFailureContext) -> None:
    print("workflow failed", ctx.run.id, ctx.run.workflow, ctx.error)


define_workflow(
    "invoice-sync",
    handler=handler,
    queue="billing",
    max_attempts=4,
    on_failure=on_fail,
)
```

## Client API

### Create a client

```python
from redflow import create_client

client = create_client(
    url="redis://127.0.0.1:6379",
    prefix="redflow:v1",
    app="my-service",  # optional, used for queue scoping/metadata
)
```

Environment defaults:

- `REDIS_URL` is used if `url` is omitted
- `REDFLOW_PREFIX` is used by `default_prefix()`
- `REDFLOW_APP` is used by `create_client()` when `app` is omitted
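
With these set (the values below are examples), `create_client()` can be called without arguments:

```bash
export REDIS_URL="redis://127.0.0.1:6379"
export REDFLOW_PREFIX="redflow:v1"
export REDFLOW_APP="my-service"
```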

### Triggering runs

- `await client.emit_workflow(name, input, ...)`
- `await client.run_by_name(name, input, ...)` (advanced; supports `queue_override`)

`WorkflowDefinition.run(...)` is usually preferred because it keeps workflow defaults (queue/max attempts) close to the definition.

### Inspect and control runs

```python
from redflow import ListRunsParams

run = await client.get_run("run_123")
steps = await client.get_run_steps("run_123")

recent = await client.list_runs(ListRunsParams(limit=50))
failed_checkout = await client.list_runs(
    ListRunsParams(workflow="checkout", status="failed", limit=20)
)

workflows = await client.list_workflows()
meta = await client.get_workflow_meta("checkout")
all_meta = await client.list_workflows_meta()
stats = await client.get_stats()

canceled = await client.cancel_run("run_123", reason="requested by user")
```

### `RunHandle`

Run-triggering methods return a `RunHandle`-like object with:

- `.id`
- `await .get_state()`
- `await .result(timeout_ms=...)`

```python
handle = await client.emit_workflow("checkout", {"order_id": "ord_3"})

state = await handle.get_state()
print(state["status"])

output = await handle.result(timeout_ms=30_000)
```

### Default client helpers

The package keeps a process-global default client (used by `WorkflowDefinition.run()`):

```python
from redflow import get_default_client, set_default_client

client = get_default_client()
set_default_client(client)
```

## Worker API

`start_worker(...)` launches:

- worker pollers (claim + execute runs)
- scheduled promoter
- reaper (lease recovery)
- cron scheduler
- poller watchdog

### `StartWorkerOptions`

Common fields:

- `app` (required): stable worker/service identifier
- `url` or `redis`
- `prefix`
- `registry`
- `queues`
- `concurrency`

Runtime tuning (flat fields in Python, not nested like TS):

- `lease_ms`
- `blmove_timeout_sec`
- `reaper_interval_ms`
- `reaper_batch_size`
- `watchdog_enabled`
- `watchdog_interval_ms`
- `watchdog_stalled_for_ms`

Example with explicit queues and tuning:

```python
worker = await start_worker(
    StartWorkerOptions(
        app="billing-worker",
        url="redis://127.0.0.1:6379",
        prefix="redflow:prod",
        queues=["critical", "io", "analytics"],
        concurrency=8,
        lease_ms=5000,
        blmove_timeout_sec=1,
        reaper_interval_ms=500,
        reaper_batch_size=100,
        watchdog_enabled=True,
    )
)
```

### Important startup note

`start_worker(...)` automatically syncs the workflow registry to Redis metadata. Make sure all workflow modules are imported first.

## Testing

### Unit tests with `run_inline(...)`

`run_inline` executes a workflow handler in-process without Redis.

```python
from redflow import run_inline

result = await run_inline(send_welcome_email, input={"user_id": "test"})

assert result.succeeded is True
assert result.output == {"sent": True}
assert [step.name for step in result.steps] == ["fetch-user", "send-email"]
```

Override step outputs by step name:

```python
result = await run_inline(
    send_welcome_email,
    input={"user_id": "test"},
    step_overrides={
        "fetch-user": {"id": "test", "email": "mock@example.com"},
        "send-email": {"ok": True},
    },
)
```

Notes:

- `run_inline` is best for handler logic and step sequencing.
- `step.run_workflow(...)` requires `step_overrides` in `run_inline` (no Redis worker exists to execute child runs).
- Use real Redis + `start_worker(...)` for end-to-end lifecycle tests.

## Errors

Common error classes:

```python
from redflow import (
    CanceledError,
    InputValidationError,
    NonRetriableError,
    OutputSerializationError,
    RedflowError,
    TimeoutError,
    UnknownWorkflowError,
)
```

Typical semantics:

- `CanceledError`: run/step canceled
- `TimeoutError`: wait or step timeout
- `InputValidationError`: workflow input failed schema validation
- `UnknownWorkflowError`: workflow metadata/definition not found
- `OutputSerializationError`: step/workflow output was not JSON-serializable
- `NonRetriableError`: fail immediately without retrying

## Python vs TypeScript Notes

- Python uses `snake_case` names (`emit_workflow`, `max_attempts`, `run_at`)
- TypeScript uses `camelCase` (`emitWorkflow`, `maxAttempts`, `runAt`)
- Runtime semantics are intentionally aligned across implementations

## Development (package)

From `redflow/packages/python`:

```bash
uv sync --extra dev
uv run ruff check .
uv run pytest
uv run pytest -m e2e  # requires Redis
```

