Metadata-Version: 2.4
Name: redflow
Version: 0.0.12
Summary: Durable workflow engine backed by Redis — Python client
Author: getrelocapp
License-Expression: Apache-2.0
Keywords: background-jobs,durable,queue,redis,workflow
Classifier: Development Status :: 4 - Beta
Classifier: Framework :: AsyncIO
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: Apache Software License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Software Development :: Libraries
Classifier: Typing :: Typed
Requires-Python: >=3.11
Requires-Dist: cronsim>=2.6
Requires-Dist: redis>=5.0.0
Provides-Extra: dev
Requires-Dist: mypy>=1.13; extra == 'dev'
Requires-Dist: pydantic>=2.0.0; extra == 'dev'
Requires-Dist: pytest-asyncio>=0.25; extra == 'dev'
Requires-Dist: pytest>=8.3; extra == 'dev'
Requires-Dist: ruff>=0.8; extra == 'dev'
Provides-Extra: fast
Requires-Dist: redis[hiredis]>=5.0.0; extra == 'fast'
Provides-Extra: pydantic
Requires-Dist: pydantic>=2.0.0; extra == 'pydantic'
Description-Content-Type: text/markdown

# redflow (Python)

Durable workflow engine backed by Redis.

`redflow` helps you run async workflows with durable steps, retries, cancellation, and scheduling. The Python runtime is intentionally aligned with the TypeScript `@redflow/client` runtime model.

## Status

Early stage. APIs are usable, but expect iteration.

## Documentation Map

This README is grouped by Diátaxis-style intent:

- **Tutorial**: first working workflow in ~5 minutes
- **How-to**: focused recipes for common production tasks
- **Explanation**: execution model and guarantees
- **Reference**: options, environment variables, and API surface

## Requirements

- Python `3.11+`
- Redis `6+` recommended

## Install

Using `uv` (recommended):

```bash
uv add redflow
```

Optional extras:

```bash
# Faster Redis parser/transport path
uv add "redflow[fast]"

# Pydantic input validation support
uv add "redflow[pydantic]"
```

Using `pip`:

```bash
pip install redflow
pip install "redflow[fast]"
pip install "redflow[pydantic]"
```

## Tutorial

### 1) Define a workflow

```python
from redflow import WorkflowHandlerContext, workflow


async def fetch_user(user_id: str) -> dict:
    return {"id": user_id, "email": "user@example.com"}


async def send_email(email: str) -> dict:
    return {"ok": True, "email": email}


@workflow("send-welcome-email", queue="default", max_attempts=3)
async def send_welcome_email(ctx: WorkflowHandlerContext) -> dict:
    user = await ctx.step.run("fetch-user", fetch_user, ctx.input["user_id"])
    await ctx.step.run("send-email", send_email, user["email"])
    return {"sent": True}
```

### 2) Start a worker

Import workflow modules before starting the worker so definitions are registered.

```python
import asyncio
from redflow import StartWorkerOptions, start_worker

import myapp.workflows  # noqa: F401


async def main() -> None:
    worker = await start_worker(
        StartWorkerOptions(
            app="billing-worker",
            url="redis://127.0.0.1:6379",
            concurrency=4,
        )
    )
    try:
        await asyncio.Event().wait()
    finally:
        await worker.stop()


asyncio.run(main())
```

### 3) Trigger a run and wait for output

```python
handle = await send_welcome_email.run(
    {"user_id": "user_123"},
    idempotency_key="welcome:user_123",
)

result = await handle.result(timeout_ms=15_000)
print(result)  # {"sent": True}
```

You can also trigger by workflow name:

```python
from redflow import create_client

client = create_client(url="redis://127.0.0.1:6379")

handle = await client.emit_workflow(
    "send-welcome-email",
    {"user_id": "user_123"},
    idempotency_key="welcome:user_123",
)
```

## How-to

### Choose the right step primitive

The Python step API is intentionally Pythonic: the step name is passed positionally (`"step-name"`) rather than in an options object as in TypeScript.

Use `ctx.step.run(...)` for durable units of work:

```python
user = await ctx.step.run("fetch-user", fetch_user, ctx.input["user_id"], timeout_ms=4_000)
```

Notes:

- Step names are unique within a run; duplicate names fail the run.
- If the function accepts `signal` (or `**kwargs`), redflow injects `signal=<asyncio.Event>`.
- `timeout_ms` raises `TimeoutError` and records failed step state.
- Step output must be JSON-serializable.

Use `ctx.step.run_workflow(...)` when the parent must wait for the child's output:

```python
child_output = await ctx.step.run_workflow(
    "generate-receipt",
    "receipt-workflow",  # or WorkflowDefinition
    {"order_id": "ord_1"},
    timeout_ms=20_000,
)
```

Use `ctx.step.emit_workflow(...)` for fire-and-forget child runs:

```python
child_run_id = await ctx.step.emit_workflow(
    "emit-analytics",
    "analytics-workflow",  # or WorkflowDefinition
    {"order_id": "ord_1"},
)
```

`run_workflow` and `emit_workflow` support:

- `timeout_ms`
- `run_at`
- `queue_override`
- `idempotency_key`
- `idempotency_ttl`

### Trigger a one-off delayed run

```python
from datetime import datetime, timedelta

handle = await send_welcome_email.run(
    {"user_id": "user_789"},
    run_at=datetime.now() + timedelta(minutes=1),
    idempotency_key="welcome:user_789:delayed",
)
```

### Add input validation with Pydantic

Install the optional extra first:

```bash
uv add "redflow[pydantic]"
```

Then:

```python
from pydantic import BaseModel
from redflow import WorkflowHandlerContext, define_workflow


class SendWelcomeInput(BaseModel):
    user_id: str


async def handler(ctx: WorkflowHandlerContext[SendWelcomeInput]) -> dict:
    return {"user_id": ctx.input.user_id}


send_welcome = define_workflow(
    "send-welcome",
    handler=handler,
    input_schema=SendWelcomeInput,
)
```

### Configure retries and failure handling

- Retries use exponential backoff with jitter.
- `max_attempts` includes the initial attempt.
- Raise `NonRetriableError` to fail immediately without retry.
- Use `on_failure` for final failure hooks (not called on cancel).

```python
from redflow import NonRetriableError, OnFailureContext, define_workflow


async def on_fail(ctx: OnFailureContext) -> None:
    print("final failure", ctx.run.id, ctx.error)


define_workflow(
    "invoice-sync",
    handler=handler,
    max_attempts=4,
    on_failure=on_fail,
)
```

### Schedule workflows with cron

```python
from redflow import CronTrigger, define_workflow

define_workflow(
    "digest-cron",
    handler=handler,
    queue="ops",
    cron=[
        CronTrigger(id="digest-hourly", expression="0 * * * *"),
        CronTrigger(expression="*/5 * * * *", timezone="UTC", input={"source": "cron"}),
    ],
)
```

Cron scheduling respects `max_concurrency`: ticks are skipped while the workflow is at its concurrency limit.

### Test handlers quickly with `run_inline(...)`

`run_inline` executes workflow logic in-process without Redis:

```python
from redflow import run_inline

result = await run_inline(send_welcome_email, input={"user_id": "test"})
assert result.succeeded is True
assert result.output == {"sent": True}
```

Step overrides by step name:

```python
result = await run_inline(
    send_welcome_email,
    input={"user_id": "test"},
    step_overrides={
        "fetch-user": {"id": "test", "email": "mock@example.com"},
        "send-email": {"ok": True},
    },
)
```

For full lifecycle behavior (queues, retries, cron, cancel), run e2e tests against real Redis and `start_worker(...)`.

Important `run_inline` limitation:

- `step.run_workflow(...)` in `run_inline` requires matching `step_overrides` because no Redis worker executes child runs.

## Explanation

### Handler context

Inside a workflow handler:

- `ctx.input`: validated workflow input (or raw input if no schema)
- `ctx.run`: run metadata (`id`, `workflow`, `queue`, `attempt`, `max_attempts`)
- `ctx.signal`: cancellation event
- `ctx.step`: durable step API

### Execution model

Each workflow run lives in Redis and moves through:

`scheduled -> queued -> running -> succeeded | failed | canceled`

Step state is stored per run. If a step with the same name already succeeded in that run, its cached output is reused.

### Durability and idempotency guarantees

- **Step durability**: successful step output is persisted and reused across retries/restarts for the same run + step name.
- **Step uniqueness**: duplicate step names in one run are rejected.
- **Run idempotency**: `idempotency_key` deduplicates run creation.

### Cancellation model

- `ctx.signal` is set when workflow cancellation is requested.
- `ctx.step.run(...)` injects `signal=` into the function when supported.
- Parent cancellation propagates to `run_workflow(...)` child runs.

### Runtime parity with TypeScript

Behavior is intentionally aligned with `@redflow/client`. API naming differs by language convention:

- Python: `snake_case` (`emit_workflow`, `max_attempts`, `run_at`)
- TypeScript: `camelCase` (`emitWorkflow`, `maxAttempts`, `runAt`)

## Reference

### Workflow definition options

`define_workflow(...)` / `@workflow(...)`:

- `queue: str = "default"`
- `max_concurrency: int = 1`
- `max_attempts: int | None = None` (falls back to engine default)
- `cron: list[CronTrigger] | None = None`
- `on_failure: Callable[[OnFailureContext], Awaitable[None] | None] | None`
- `input_schema: Any | None = None`
- `registry: WorkflowRegistry | None = None`

### Client API

Create client:

```python
from redflow import create_client

client = create_client(
    app="my-service",                 # optional
    url="redis://127.0.0.1:6379",     # optional if REDIS_URL is set
    prefix="redflow:v1",              # optional
)
```

Useful methods:

- `emit_workflow(...)`
- `run_by_name(...)` (advanced)
- `get_run(run_id)`
- `get_run_steps(run_id)`
- `list_runs(ListRunsParams(...))`
- `list_workflows()`
- `get_workflow_meta(name)`
- `list_workflows_meta()`
- `get_stats()`
- `cancel_run(run_id, reason=...)`
- `cleanup_expired_runs_now()`

`WorkflowDefinition.run(...)` is the common path for application code.

Run calls return a `RunHandle` with:

- `.id`
- `await .get_state()`
- `await .result(timeout_ms=...)`

Default client helpers used by `WorkflowDefinition.run(...)`:

- `get_default_client()`
- `set_default_client(client)`

### Run history retention

Terminal runs are retained for a configurable window, then purged.

- Default: `30` days
- Override: `REDFLOW_RUN_RETENTION_DAYS` (fractional values supported)

Manual cleanup hook:

```python
removed = await client.cleanup_expired_runs_now()
```

### Worker API

`start_worker(StartWorkerOptions(...))` starts:

- pollers (claim + execute runs)
- scheduled promoter
- reaper
- cron scheduler
- run history cleanup loop
- registry recovery loop
- watchdog

`StartWorkerOptions` main fields:

- `app` (required)
- `redis` or `url`
- `prefix`
- `registry`
- `queues`
- `concurrency`
- `lease_ms`
- `blmove_timeout_sec`
- `reaper_interval_ms`
- `reaper_batch_size`
- `watchdog_enabled`
- `watchdog_interval_ms`
- `watchdog_stalled_for_ms`
- `run_history_cleanup_interval_ms`
- `registry_recovery_interval_ms`

Runtime tuning fields are flat in Python (not nested under `runtime` as in TS).

`registry_recovery_interval_ms` lets a running worker recover workflow metadata after Redis state resets.

Example with explicit queues + runtime tuning:

```python
worker = await start_worker(
    StartWorkerOptions(
        app="billing-worker",
        url="redis://127.0.0.1:6379",
        prefix="redflow:prod",
        queues=["critical", "io", "analytics"],
        concurrency=8,
        lease_ms=5000,
        blmove_timeout_sec=1,
        reaper_interval_ms=500,
        reaper_batch_size=100,
        watchdog_enabled=True,
        run_history_cleanup_interval_ms=60_000,
        registry_recovery_interval_ms=5_000,
    )
)
```

Important startup note:

`start_worker(...)` syncs the in-process workflow registry to Redis metadata on startup. Import workflow modules before worker start.

### Environment variables

- `REDIS_URL`
- `REDFLOW_PREFIX`
- `REDFLOW_APP`
- `REDFLOW_RUN_RETENTION_DAYS`

Resolution rules:

- `create_client(url=...)` uses `url`, otherwise `REDIS_URL`, otherwise `redis://localhost:6379`
- prefix defaults to `REDFLOW_PREFIX` (or built-in default)
- `create_client(app=...)` uses explicit `app`, otherwise `REDFLOW_APP`

### Common errors

```python
from redflow import (
    CanceledError,
    InputValidationError,
    NonRetriableError,
    OutputSerializationError,
    RedflowError,
    TimeoutError,
    UnknownWorkflowError,
)
```

Semantics:

- `CanceledError`: run/step canceled
- `TimeoutError`: wait or step timeout
- `InputValidationError`: workflow input failed schema validation
- `UnknownWorkflowError`: workflow definition/metadata not found
- `OutputSerializationError`: output is not JSON-serializable
- `NonRetriableError`: fail without retry

## Support

- Open issues in the main repository: <https://github.com/getrelocapp/redflow/issues>
- Include:
  - Python package version
  - Redis version
  - minimal repro (workflow + input)

## Development

From `redflow/packages/python`:

```bash
uv sync --extra dev
uv run ruff check .
uv run mypy src
uv run pytest
uv run pytest -m e2e  # requires Redis
```

If e2e tests are skipped, set `REDIS_URL` or run a local `redis-server`.
