Metadata-Version: 2.4
Name: restate-saga
Version: 0.1.0
Summary: Saga pattern implementation for Restate durable workflows with automatic compensation
Author-email: Douglas Amoo-Sargon <douglasbiomed3@gmail.com>
License-Expression: MIT
Project-URL: Repository, https://github.com/kowalski21/restate-saga-python
Keywords: restate,saga,workflow,durable,compensation,distributed
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Framework :: AsyncIO
Requires-Python: >=3.11
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: restate_sdk[serde]<2.0,>=1.0
Provides-Extra: pydantic
Requires-Dist: pydantic>=2.0; extra == "pydantic"
Provides-Extra: dev
Requires-Dist: pytest; extra == "dev"
Requires-Dist: pytest-asyncio; extra == "dev"
Requires-Dist: ruff; extra == "dev"
Dynamic: license-file

# restate-saga

Saga pattern implementation for [Restate](https://restate.dev/) durable workflows in Python with automatic compensation.

## Features

- **Automatic compensation** - When a step fails, all previous steps are rolled back in reverse order
- **Flexible step types** - Hybrid (`create_saga_step`) or strict (`create_saga_step_strict`) compensation modes
- **Decorator API** - `@saga_step`, `@saga_step_strict`, `@saga_workflow` decorators for concise definitions
- **Global error registry** - Register error classes that should always trigger compensation
- **Composable workflows** - Embed workflows within workflows using `run_as_step`
- **Virtual Object support** - Saga pattern for stateful keyed entities
- **Restate Workflows** - Long-running workflows with signals, queries, and saga support
- **Pydantic validation** - Automatic input validation when type hints use Pydantic models

## Installation

```bash
pip install restate-saga
```

**Requires:** Python 3.11+, `restate_sdk[serde]`

## Quick Start

```python
import restate
from restate_saga import (
    SagaContext,
    StepResponse,
    saga_step_strict,
    saga_workflow,
)

# Define steps with compensation

async def cancel_order(data):
    await order_service.cancel(data["order_id"])

@saga_step_strict("CreateOrder", compensate=cancel_order)
async def create_order(ctx: restate.Context, input_val):
    order = await order_service.create(input_val["customer_id"])
    return StepResponse(output={"order_id": order.id})

async def refund_payment(data):
    await payment_service.refund(data["payment_id"])

@saga_step_strict("ProcessPayment", compensate=refund_payment)
async def process_payment(ctx: restate.Context, input_val):
    payment = await payment_service.charge(input_val["order_id"])
    return StepResponse(output={"payment_id": payment.id})

# Define the workflow

@saga_workflow("CheckoutWorkflow")
async def checkout_workflow(saga: SagaContext, input_val):
    # If process_payment fails, create_order.compensate() runs automatically
    order = await create_order(saga, input_val)
    payment = await process_payment(saga, {"order_id": order["order_id"]})
    return {"order_id": order["order_id"], "payment_id": payment["payment_id"]}
```

## Core Concepts

### Saga Pattern

Each step has a corresponding compensation (undo) action. If a later step fails, all earlier compensations run in reverse order.

```
Step 1 → Step 2 → Step 3 (fails!)
                    ↓
         Compensate 2 ← Compensate 1
```

### Step Types

#### Hybrid (`create_saga_step` / `@saga_step`)

Registers compensation **before** execution. Compensation runs even if the step fails partway through. The `failed` keyword argument indicates whether the forward action completed.

```python
async def cancel_order(data, failed=False):
    # `failed` tells you if the step threw an error
    await order_service.cancel(data.get("order_id"))

@saga_step("CreateOrder", compensate=cancel_order)
async def create_order(ctx: restate.Context, input_val):
    order_id = await order_service.create(input_val)
    return StepResponse(
        output={"order_id": order_id},
        compensation_data={"order_id": order_id},
    )
```

#### Strict (`create_saga_step_strict` / `@saga_step_strict`)

Registers compensation **after** success. Use when compensation requires data that only exists after completion.

```python
async def cancel_order(data):
    await order_service.cancel(data["order_id"])

@saga_step_strict("CreateOrder", compensate=cancel_order)
async def create_order(ctx: restate.Context, input_val):
    order = await order_service.create(input_val)
    return StepResponse(
        output={"order_id": order.id},
        compensation_data={"order_id": order.id},
    )
```

### StepResponse

```python
# compensation_data defaults to output when omitted
StepResponse(output={"order_id": "123"})

# explicit compensation_data
StepResponse(output={"order_id": "123"}, compensation_data={"order_id": "123", "extra": True})

# no compensation data (e.g. validation step)
StepResponse(output={"valid": True}, compensation_data=None)

# permanent failure — triggers compensation with provided data
StepResponse.permanent_failure("Payment declined", {"auth_id": "abc"})
```

### Steps Without Compensation

For validation, read-only operations, or idempotent actions, omit the `compensate` function:

```python
@saga_step("ValidateInput")
async def validate_input(ctx: restate.Context, input_val):
    if not input_val.get("email"):
        return StepResponse.permanent_failure("Email required", None)
    return StepResponse(output={"valid": True}, compensation_data=None)
```

### Function API

Steps and workflows can also be created without decorators:

```python
from restate_saga import create_saga_step, create_saga_step_strict, create_saga_workflow

create_order = create_saga_step(
    name="CreateOrder",
    run=create_order_handler,
    compensate=cancel_order,
)

checkout = create_saga_workflow(
    name="CheckoutWorkflow",
    handler=checkout_handler,
)
```

### Global Error Registry

Register error classes that should always trigger compensation without retrying:

```python
from restate_saga import register_terminal_errors

class ValidationError(Exception): ...
class NotFoundError(Exception): ...

register_terminal_errors([ValidationError, NotFoundError])

# Now any step that raises these will trigger compensation
@saga_step("MyStep")
async def my_step(ctx, input_val):
    raise ValidationError("Invalid input")  # → triggers compensation
```

Custom error mapping:

```python
from restate_saga import set_global_error_mapper
import restate

def my_mapper(err: Exception) -> restate.TerminalError | None:
    if isinstance(err, BusinessError):
        return restate.TerminalError(str(err))
    return None

set_global_error_mapper(my_mapper)
```

### Composing Workflows

Use `run_as_step` to embed a workflow within another, sharing the compensation context:

```python
@saga_workflow("PaymentWorkflow")
async def payment_workflow(saga, input_val):
    auth = await authorize_payment(saga, input_val)
    capture = await capture_payment(saga, {"auth_id": auth["id"]})
    return {"payment_id": capture["id"]}

@saga_workflow("OrderWorkflow")
async def order_workflow(saga, input_val):
    order = await create_order(saga, input_val)

    # Payment workflow's compensations join this saga
    payment = await payment_workflow.run_as_step(saga, {"amount": order["total"]})

    # If shipping fails, both order AND payment are compensated
    shipment = await create_shipment(saga, {"order_id": order["id"]})

    return {"order_id": order["id"], "payment_id": payment["payment_id"]}
```

### Nested Sagas

For inline nested logic without a full workflow:

```python
from restate_saga import run_nested_saga, create_saga_module

# Inline nested saga
async def handle_payment(saga):
    auth = await authorize(saga, {"amount": 100})
    capture = await capture(saga, {"auth_id": auth["id"]})
    return capture

result = await run_nested_saga(saga, handle_payment)

# Reusable saga module (not a Restate service)
payment_module = create_saga_module(payment_handler)
result = await payment_module(saga, input_val)
```

### Virtual Objects

Stateful entities with saga support:

```python
from restate_saga import create_saga_virtual_object, SagaContext

cart = create_saga_virtual_object("ShoppingCart")

@cart.handler()
async def checkout(saga: SagaContext, ctx: restate.ObjectContext, input_val):
    payment = await charge_payment(saga, {"amount": input_val["total"]})
    ctx.clear("items")
    return {"order_id": payment["order_id"]}

@cart.handler(kind="shared")
async def get_items(ctx: restate.ObjectSharedContext):
    return await ctx.get("items") or []
```

### Restate Workflows (Long-Running)

For workflows with signals and queries:

```python
from restate_saga import create_saga_restate_workflow, SagaContext

wf = create_saga_restate_workflow("ApprovalWorkflow")

@wf.main()
async def run(saga: SagaContext, ctx: restate.WorkflowContext, input_val):
    order = await create_order(saga, input_val)

    # Wait for approval signal (durable promise)
    approved = await ctx.promise("approval").value()

    if not approved:
        raise restate.TerminalError("Order rejected")

    shipment = await create_shipment(saga, {"order_id": order["order_id"]})
    return {"order_id": order["order_id"]}

@wf.handler()
async def approve(ctx: restate.WorkflowSharedContext, input_val):
    await ctx.promise("approval").resolve(input_val["approved"])
```

### Pydantic Validation

When step or workflow handlers use Pydantic model type hints, inputs are automatically validated:

```python
from pydantic import BaseModel

class CheckoutInput(BaseModel):
    customer_id: str

@saga_workflow("CheckoutWorkflow")
async def checkout(saga: SagaContext, input_val: CheckoutInput):
    # input_val is validated and converted to CheckoutInput
    order = await create_order(saga, input_val)
    return {"order_id": order["order_id"]}
```

### Step & Workflow Options

Configure retry policies and service-level options:

```python
from datetime import timedelta
from restate_saga import SagaStepOptions, StepRetryPolicy, SagaWorkflowOptions, WorkflowRetryPolicy

# Step-level retry
step = create_saga_step(
    name="ChargePayment",
    run=charge_handler,
    compensate=refund_handler,
    options=SagaStepOptions(
        retry=StepRetryPolicy(
            max_retry_attempts=3,
            initial_retry_interval=timedelta(seconds=1),
            retry_interval_factor=2.0,
            max_retry_interval=timedelta(seconds=30),
        ),
        compensation_retry=StepRetryPolicy(max_retry_attempts=5),
    ),
)

# Workflow-level options
workflow = create_saga_workflow(
    name="OrderWorkflow",
    handler=order_handler,
    options=SagaWorkflowOptions(
        retry_policy=WorkflowRetryPolicy(max_attempts=3),
        idempotency_retention=timedelta(days=1),
        ingress_private=True,
    ),
)
```

## Project Structure

```
restate_saga/
├── __init__.py            # Public API exports
├── steps.py               # create_saga_step, create_saga_step_strict, decorators
├── workflows.py           # create_saga_workflow, SagaWorkflowService
├── restate_workflows.py   # create_saga_restate_workflow (long-running)
├── virtual_objects.py     # create_saga_virtual_object
├── step_response.py       # StepResponse class
├── error_registry.py      # Terminal error registration
├── nested.py              # run_nested_saga, create_saga_module
├── types.py               # SagaContext, options, policies
└── _validation.py         # Pydantic input validation
app/
├── main.py                # FastAPI app with Restate mounted
└── services/              # Example service handlers
tests/                     # Test suite
```

## Running the Example App

### Prerequisites

1. Install [Restate Server](https://docs.restate.dev/develop/local_dev):
   ```bash
   # macOS
   brew install restatedev/tap/restate-server

   # Or Docker
   docker run -d --name restate -p 8080:8080 -p 9070:9070 docker.io/restatedev/restate:latest
   ```

2. Start Restate Server:
   ```bash
   restate-server
   ```

### Start the Service

```bash
# Install dependencies
pip install -e ".[dev]"

# Run the server
hypercorn app.main:api --bind 0.0.0.0:9080
```

### Register with Restate

```bash
restate deployments register http://localhost:9080/restate/v1
```

### Invoke a Workflow

```bash
curl -X POST http://localhost:8080/CheckoutWorkflow/run \
  -H "Content-Type: application/json" \
  -d '{"customer_id": "cust_123"}'
```

## API Reference

### Steps

- `create_saga_step(name, run, compensate?, options?)` - Hybrid compensation (registered before execution)
- `create_saga_step_strict(name, run, compensate?, options?)` - Strict compensation (registered after success)
- `@saga_step(name, compensate?, options?)` - Decorator for hybrid steps
- `@saga_step_strict(name, compensate?, options?)` - Decorator for strict steps
- `StepResponse(output, compensation_data?)` - Step result with optional compensation data
- `StepResponse.permanent_failure(message, compensation_data)` - Signal permanent failure

### Workflows

- `create_saga_workflow(name, handler, options?)` - Create a saga workflow service
- `@saga_workflow(name, options?)` - Decorator for workflows
- `SagaWorkflowService.run_as_step(parent_saga, input)` - Embed in parent workflow

### Restate Workflows

- `create_saga_restate_workflow(name, options?)` - Create a long-running workflow
- `SagaRestateWorkflow.main()` - Register main handler with saga support
- `SagaRestateWorkflow.handler()` - Register shared signal/query handlers
- `SagaRestateWorkflow.run_as_step(parent_saga, input)` - Embed in parent workflow

### Virtual Objects

- `create_saga_virtual_object(name, options?)` - Create a Virtual Object
- `SagaVirtualObject.handler(kind="exclusive")` - Exclusive handler with saga
- `SagaVirtualObject.handler(kind="shared")` - Shared handler without saga

### Error Registry

- `register_terminal_errors(error_classes)` - Register error classes as terminal
- `unregister_terminal_errors(error_classes)` - Unregister error classes
- `clear_terminal_errors()` - Clear all registered errors
- `set_global_error_mapper(mapper)` - Set a custom error mapper
- `resolve_terminal_error(err, step_mapper?)` - Resolve error to terminal

### Nested Sagas

- `run_nested_saga(saga, handler)` - Run inline saga with shared compensation
- `create_saga_module(handler)` - Create a reusable saga module

### Types

- `SagaContext` - Context with Restate `ctx` and `compensations` stack
- `StepRetryPolicy` - Retry config for step-level operations
- `SagaStepOptions` - Step options (retry, compensation_retry, as_terminal_error)
- `WorkflowRetryPolicy` - Retry config for service/workflow level
- `SagaWorkflowOptions` - Workflow options (retry, retention, timeouts)

## License

MIT
