Metadata-Version: 2.4
Name: redflow
Version: 0.0.3
Summary: Durable workflow engine backed by Redis — Python client
Author: getrelocapp
License-Expression: Apache-2.0
Keywords: background-jobs,durable,queue,redis,workflow
Classifier: Development Status :: 4 - Beta
Classifier: Framework :: AsyncIO
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: Apache Software License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Software Development :: Libraries
Classifier: Typing :: Typed
Requires-Python: >=3.11
Requires-Dist: croniter>=2.0.0
Requires-Dist: redis>=5.0.0
Provides-Extra: dev
Requires-Dist: mypy>=1.10; extra == 'dev'
Requires-Dist: pytest-asyncio>=0.24; extra == 'dev'
Requires-Dist: pytest>=8.0; extra == 'dev'
Requires-Dist: ruff>=0.4; extra == 'dev'
Provides-Extra: fast
Requires-Dist: redis[hiredis]>=5.0.0; extra == 'fast'
Provides-Extra: pydantic
Requires-Dist: pydantic>=2.0.0; extra == 'pydantic'
Description-Content-Type: text/markdown

# redflow

Durable workflow engine backed by Redis.

## Install

```bash
pip install redflow
# or with hiredis for better performance:
pip install "redflow[fast]"  # quote the extra so shells like zsh don't glob the brackets
```

## Define a workflow

```python
from redflow import define_workflow, WorkflowHandlerContext

# fetch_user and send_welcome are placeholders for your own functions
async def handler(ctx: WorkflowHandlerContext) -> dict:
    user = await ctx.step.run("fetch-user", fetch_user, ctx.input["user_id"])
    await ctx.step.run("send-email", send_welcome, user["email"])
    return {"sent": True}

send_welcome_email = define_workflow("send-welcome-email", handler=handler)
```

Or with the decorator:

```python
from redflow import workflow, WorkflowHandlerContext

@workflow("send-welcome-email")
async def send_welcome_email(ctx: WorkflowHandlerContext) -> dict:
    user = await ctx.step.run("fetch-user", fetch_user, ctx.input["user_id"])
    await ctx.step.run("send-email", send_welcome, user["email"])
    return {"sent": True}
```

Handler context gives you:

- `input` — workflow input data
- `run` — run metadata (`id`, `workflow`, `queue`, `attempt`, `max_attempts`)
- `signal` — `asyncio.Event`, set when cancellation is requested
- `step` — durable step API
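
Putting these together, a handler might log run metadata and check `signal` for cooperative cancellation. A minimal sketch — `load_data` and the `"key"` input field are hypothetical stand-ins for your own function and data:

```python
# Hypothetical handler using the context attributes listed above.
async def handler(ctx) -> dict:
    # run metadata is handy for logging and attempt-aware logic
    print(f"run {ctx.run.id}, attempt {ctx.run.attempt}/{ctx.run.max_attempts}")

    # cooperative cancellation: bail out early if a cancel was requested
    if ctx.signal.is_set():
        return {"canceled": True}

    # durable step: cached on replay after a crash
    data = await ctx.step.run("load-data", load_data, ctx.input["key"])
    return {"loaded": True, "size": len(data)}
```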

## Step API (inside workflow handlers)

### `step.run`

Durable, cached units of work. On crash recovery, completed steps return
their cached result instead of re-executing.

```python
payment = await ctx.step.run("capture-payment", capture_payment)
```

With timeout:

```python
payment = await ctx.step.run("capture-payment", capture_payment, timeout_ms=4000)
```

### `step.run_workflow`

Trigger a child workflow and wait for its result.

```python
receipt = await ctx.step.run_workflow(
    "send-receipt",
    "receipt-workflow",
    {"order_id": order_id, "email": email},
    timeout_ms=20_000,
    idempotency_key=f"receipt:{order_id}",
)
```

### `step.emit_workflow`

Trigger a child workflow without waiting — returns the child run ID.

```python
child_run_id = await ctx.step.emit_workflow(
    "emit-analytics",
    "analytics-workflow",
    {"order_id": order_id},
    idempotency_key=f"analytics:{order_id}",
)
```

## Run workflows

```python
from redflow import create_client

client = create_client(url="redis://localhost:6379")

handle = await client.emit_workflow(
    "send-welcome-email",
    {"user_id": "user_123"},
    idempotency_key="welcome:user_123",
)

output = await handle.result(timeout_ms=15_000)
```

Delayed run:

```python
from datetime import datetime, timedelta

handle = await client.emit_workflow(
    "send-welcome-email",
    {"user_id": "user_789"},
    run_at=datetime.now() + timedelta(minutes=1),
    idempotency_key="welcome:user_789:delayed",
)
```

## Start a worker

```python
import asyncio
from redflow import start_worker, StartWorkerOptions

# import your workflow modules so they register
import workflows

async def main():
    worker = await start_worker(StartWorkerOptions(
        app="billing-worker",
        url="redis://localhost:6379",
        concurrency=4,
    ))

    # run until interrupted (e.g. Ctrl-C), then stop the worker cleanly
    try:
        await asyncio.Event().wait()
    finally:
        await worker.stop()

asyncio.run(main())
```

Explicit queues and runtime tuning:

```python
worker = await start_worker(StartWorkerOptions(
    app="billing-worker",
    url="redis://localhost:6379",
    queues=["critical", "io", "analytics"],
    concurrency=8,
    lease_ms=5000,
    blmove_timeout_sec=1,
    reaper_interval_ms=500,
))
```

## Workflow options

### max_concurrency

Caps how many runs of a workflow may execute concurrently. Default is `1`.

```python
define_workflow(
    "heavy-sync",
    handler=handler,
    queue="ops",
    max_concurrency=1,
)
```

### Cron

```python
from redflow import CronTrigger

define_workflow(
    "digest-cron",
    handler=handler,
    queue="ops",
    cron=[
        CronTrigger(id="digest-hourly", expression="0 * * * *"),
        CronTrigger(expression="*/5 * * * *", timezone="UTC", input={"source": "cron"}),
    ],
)
```

Cron respects `max_concurrency`: if the limit is reached, that tick is skipped.

### on_failure

```python
from redflow import OnFailureContext

async def on_fail(ctx: OnFailureContext) -> None:
    print(f"workflow failed: {ctx.run.id} {ctx.run.workflow} {ctx.error}")

define_workflow(
    "invoice-sync",
    handler=handler,
    queue="billing",
    max_attempts=4,
    on_failure=on_fail,
)
```

## Client API

### Inspect and control runs

```python
from redflow import ListRunsParams

run = await client.get_run("run_123")
steps = await client.get_run_steps("run_123")

recent = await client.list_runs(ListRunsParams(limit=50))
failed = await client.list_runs(ListRunsParams(
    workflow="checkout",
    status="failed",
    limit=20,
))

workflows = await client.list_workflows()
meta = await client.get_workflow_meta("checkout")
stats = await client.get_stats()

canceled = await client.cancel_run("run_123", reason="requested by user")
```

### RunHandle

```python
handle = await client.emit_workflow("checkout", {"order_id": "ord_3"})

state = await handle.get_state()
print(state["status"])

output = await handle.result(timeout_ms=30_000)
```

## Testing

`run_inline` executes a workflow handler in-process without Redis — useful
for unit tests.

```python
from redflow import run_inline

result = await run_inline(my_workflow_def, input={"user_id": "test"})
assert result.succeeded
assert result.output == {"sent": True}
```

Override external steps:

```python
result = await run_inline(
    my_workflow_def,
    input={"user_id": "test"},
    step_overrides={"fetch-user": {"email": "mock@test.com"}},
)
```

## Errors

```python
from redflow import (
    RedflowError,         # base class
    CanceledError,        # run was canceled
    TimeoutError,         # operation timed out
    NonRetriableError,    # permanent failure, no retries
    InputValidationError, # input schema mismatch
    UnknownWorkflowError, # workflow not registered
)
```

Raise `NonRetriableError` from a handler to fail the run immediately
without exhausting retry attempts.
