Metadata-Version: 2.4
Name: pystator
Version: 0.0.1
Summary: A configuration-driven, stateless finite state machine library for Python
Author-email: StatFYI <contact@statfyi.com>
License: MIT
Project-URL: Homepage, https://github.com/statfyi/pystator
Project-URL: Documentation, https://github.com/statfyi/pystator#readme
Project-URL: Repository, https://github.com/statfyi/pystator
Project-URL: Issues, https://github.com/statfyi/pystator/issues
Project-URL: Changelog, https://github.com/statfyi/pystator/blob/main/CHANGELOG.md
Keywords: state-machine,fsm,finite-state-machine,workflow,configuration-driven,declarative,guards,transitions,order-management,event-driven
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Typing :: Typed
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: Software Development :: Libraries :: Application Frameworks
Requires-Python: >=3.10
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: pyyaml>=6.0
Requires-Dist: pydantic>=2.0.0
Provides-Extra: dev
Requires-Dist: pytest>=7.4.0; extra == "dev"
Requires-Dist: pytest-cov>=4.1.0; extra == "dev"
Requires-Dist: ruff>=0.4.0; extra == "dev"
Requires-Dist: mypy>=1.8.0; extra == "dev"
Requires-Dist: build>=1.0.0; extra == "dev"
Requires-Dist: twine>=5.0.0; extra == "dev"
Requires-Dist: types-PyYAML>=6.0.0; extra == "dev"
Provides-Extra: api
Requires-Dist: fastapi>=0.104.0; extra == "api"
Requires-Dist: uvicorn[standard]>=0.24.0; extra == "api"
Requires-Dist: pydantic>=2.0.0; extra == "api"
Requires-Dist: pydantic-settings>=2.0.0; extra == "api"
Requires-Dist: python-multipart>=0.0.6; extra == "api"
Requires-Dist: sqlalchemy>=2.0.0; extra == "api"
Provides-Extra: ui
Requires-Dist: fastapi>=0.104.0; extra == "ui"
Requires-Dist: uvicorn[standard]>=0.24.0; extra == "ui"
Requires-Dist: httpx>=0.24.0; extra == "ui"
Requires-Dist: aiofiles>=23.0.0; extra == "ui"
Provides-Extra: recipes
Requires-Dist: simpleeval>=0.9.0; extra == "recipes"
Requires-Dist: httpx>=0.24.0; extra == "recipes"
Dynamic: license-file

# PyStator

A configuration-driven, stateless finite state machine library for Python with support for hierarchical states, parallel states (orthogonal regions), and async execution.

PyStator defines behavioral contracts through YAML/JSON specifications, computing state transitions without holding internal state. Designed for high-integrity systems like order management, trading workflows, and distributed applications.

## Features

- **Configuration-Driven**: Define state machines in YAML/JSON with schema validation
- **Stateless Design**: Pure computation—takes state in, returns state/actions out
- **Hierarchical States**: Compound states with parent/child; transitions from parent match any sub-state; exit/enter chains follow LCA (see [Hierarchical States](#hierarchical-states-statecharts))
- **Parallel States**: Orthogonal regions—each region is an independent sub-machine; events can trigger region-scoped transitions (see [Parallel States](#parallel-states-orthogonal-regions))
- **Delayed Transitions**: Schedule transitions to fire after a delay with pluggable schedulers (see [Delayed Transitions](#delayed-transitions))
- **Inline Guard Expressions**: Define guards directly in YAML without Python code (see [Inline Guard Expressions](#inline-guard-expressions))
- **Action Parameters**: Pass configuration to actions from YAML (see [Action Parameters](#action-parameters))
- **Async Actions**: Register async callables and run them with sequential, parallel, or phased execution (see [Async Support](#async-support) and [Execution Modes (Actions)](#execution-modes-actions))
- **Guards**: Conditional transitions based on runtime context (sync and async)
- **Actions/Hooks**: Entry/exit hooks and transition actions
- **Timeouts/TTL**: Automatic transitions after configurable durations
- **Type-Safe**: Full type hints and PEP 561 compliance
- **Retry Mechanism**: Configurable retry with exponential backoff
- **Idempotency**: Pluggable backends for duplicate detection
- **Visualization**: Generate diagrams from FSM definitions
- **Scheduler Adapters**: Pluggable backends for delayed transitions (asyncio, Redis, Celery)
- **Optional API**: REST API via `pip install pystator[api]`

## Installation

```bash
# Core library
pip install pystator

# With API server
pip install "pystator[api]"

# Development
pip install -e ".[dev]"
```

## Quick Start

### 1. Define Your State Machine (YAML)

```yaml
# order_fsm.yaml
meta:
  version: "1.0.0"
  machine_name: "order_management"
  strict_mode: true

states:
  - name: PENDING
    type: initial
    timeout:
      seconds: 5.0
      destination: TIMED_OUT

  - name: OPEN
    type: stable
    on_enter:
      - notify_ui
      - log_audit

  - name: FILLED
    type: terminal

transitions:
  - trigger: exchange_ack
    source: PENDING
    dest: OPEN
    actions:
      - update_order_id

  - trigger: execution_report
    source: OPEN
    dest: FILLED
    guards:
      - is_full_fill
    actions:
      - update_positions
```

### 2. Use the State Machine

```python
from pystator import StateMachine, GuardRegistry, ActionRegistry, Event
from pystator.actions import ActionExecutor

# Load FSM
machine = StateMachine.from_yaml("order_fsm.yaml")

# Register guards and actions
guards = GuardRegistry()
guards.register("is_full_fill", lambda ctx: ctx["fill_qty"] >= ctx["order_qty"])
machine.bind_guards(guards)

actions = ActionRegistry()
actions.register("update_order_id", lambda ctx: print(f"Order ID: {ctx['order_id']}"))
actions.register("update_positions", lambda ctx: print("Positions updated"))

executor = ActionExecutor(actions)

# Process event (pure computation)
context = {"order_id": "order-123", "fill_qty": 100, "order_qty": 100}
result = machine.process(
    current_state="OPEN",
    event="execution_report",
    context=context,
)

# Persist first, then execute actions (`db` stands in for your persistence layer)
if result.success:
    db.update_state(context["order_id"], result.target_state)
    executor.execute(result, context)
```

## Core Concepts

### States

States represent nodes in the state machine graph:

```python
from pystator import State, StateType, Timeout

state = State(
    name="PENDING",
    type=StateType.INITIAL,  # initial, stable, terminal, error, parallel
    on_enter=("log_entry",),
    on_exit=("log_exit",),
    timeout=Timeout(seconds=5.0, destination="TIMED_OUT"),
)
```

### Transitions

```python
from pystator import Transition

transition = Transition(
    trigger="execution_report",
    source=frozenset({"OPEN", "PARTIALLY_FILLED"}),
    dest="FILLED",
    guards=("is_full_fill",),
    actions=("update_positions",),
)
```

### Guards

Guards are pure functions that control whether transitions are allowed:

```python
from pystator import GuardRegistry

guards = GuardRegistry()

@guards.decorator("has_buying_power")
def has_buying_power(ctx: dict) -> bool:
    return ctx["buying_power"] >= ctx["order_value"]

# Async guards
@guards.decorator("check_external")
async def check_external(ctx: dict) -> bool:
    account = await broker.get_account()
    return account.buying_power >= ctx["order_value"]
```

### Actions

Actions handle side effects and are executed **after** state persistence. You can register **sync or async** callables; both take a single `context` dict (event payload, entity state, etc.).

**Async actions**: Register async callables with `ActionRegistry` and run them with `executor.async_execute_*`. Use [Execution Modes](#execution-modes-actions) (sequential, parallel, phased) to control how actions are run.

```python
from pystator import ActionRegistry
from pystator.actions import ActionExecutor, ExecutionMode

actions = ActionRegistry()

@actions.decorator()
def send_notification(ctx: dict) -> None:
    email_service.send(ctx["user_email"], "Order updated")

@actions.decorator()
async def update_position(ctx: dict) -> None:
    await db.update_position(ctx["symbol"], ctx["quantity"])

executor = ActionExecutor(actions, default_mode=ExecutionMode.PHASED)

# Sync execution (sequential by default)
result = executor.execute(transition_result, context)

# Async execution: parallel (all at once) or phased (exit → transition → enter)
result = await executor.async_execute_parallel(transition_result, context)
result = await executor.async_execute_phased(transition_result, context)
```

## Hierarchical States (Statecharts)

States can be nested with **parent/child** relationships. The machine resolves to a single **active leaf**: compound states specify an `initial_child`, and the engine follows that chain until it reaches a leaf. Transitions are matched by **ancestry**—a transition whose `source` is a parent applies when the current state is that parent or any descendant.

**Exit/enter order**: When transitioning between two leaves, the engine computes the lowest common ancestor (LCA). Exit actions run from the current leaf up to (but not including) the LCA; enter actions run from just below the LCA down to the target leaf, so the LCA itself is neither exited nor re-entered. This preserves statechart semantics.

```yaml
states:
  - name: active
    type: stable
    initial_child: active.scanning
    on_enter: [start_feed]
    on_exit: [stop_feed]
    
  - name: active.scanning
    parent: active
    type: stable
    
  - name: active.analyzing
    parent: active
    type: stable
    
  - name: halted
    type: terminal

transitions:
  # Transition between siblings
  - trigger: signal_detected
    source: active.scanning
    dest: active.analyzing
    
  # Transition from parent applies to any child
  - trigger: emergency_stop
    source: active
    dest: halted
```

When processing `emergency_stop` from `active.scanning`, the transition succeeds because `active` is an ancestor of the current leaf. `get_initial_state()` returns the resolved leaf (e.g. `active.scanning`), not the compound root.
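
The exit/enter ordering can be sketched in plain Python. This is a minimal illustration that assumes a simple parent map mirroring the YAML above; `PARENTS`, `ancestors`, and `exit_enter_chains` are hypothetical names and not part of PyStator's API.

```python
from typing import Optional

# Parent map mirroring the YAML above; root states have parent None (assumed layout).
PARENTS = {
    "active": None,
    "active.scanning": "active",
    "active.analyzing": "active",
    "halted": None,
}


def ancestors(state: str) -> list:
    """Return [state, parent, grandparent, ...] up to the root."""
    chain = []
    current: Optional[str] = state
    while current is not None:
        chain.append(current)
        current = PARENTS[current]
    return chain


def exit_enter_chains(source: str, target: str) -> tuple:
    """Return (states to exit, leaf first; states to enter, outermost first)."""
    source_chain = ancestors(source)
    target_chain = ancestors(target)
    shared = set(target_chain)
    # The LCA is the first ancestor of the source that is also an ancestor of the target.
    lca = next((s for s in source_chain if s in shared), None)
    if lca is None:
        exits, enters = source_chain, target_chain
    else:
        exits = source_chain[: source_chain.index(lca)]
        enters = target_chain[: target_chain.index(lca)]
    return exits, list(reversed(enters))
```

Under these assumptions, moving from `active.scanning` to `active.analyzing` exits and enters only the two leaves, while moving to `halted` also exits `active` because the two states share no ancestor.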

## Parallel States (Orthogonal Regions)

**Parallel states** contain **orthogonal regions**—each region is an independent sub-machine with its own current state. All regions are active at once; an event can trigger a transition in one or more regions via **region-scoped** transitions (`region: region_name`). Use parallel states when you need concurrent behaviors (e.g. trading workflow + risk monitor + data feed).

Each region has an `initial` state and a `states` list (the valid states in that region). You can define region states as separate state entries (with `parent: parallel_state_name`) to attach `on_enter`/`on_exit` hooks.

```yaml
states:
  - name: active
    type: parallel
    regions:
      - name: trading
        initial: scanning
        states: [scanning, analyzing, executing]
        
      - name: risk_monitor
        initial: normal
        states: [normal, elevated, critical]
        
      - name: data_feed
        initial: connecting
        states: [connecting, connected, failed]

  # Optional: define region states for on_enter/on_exit
  - name: scanning
    type: stable
    parent: active
    on_enter: [start_scanners]
    
  - name: normal
    type: stable
    parent: active
    on_enter: [reset_risk_alerts]

transitions:
  # Region-scoped: only this region is considered
  - trigger: signal_detected
    source: scanning
    dest: analyzing
    region: trading
    
  - trigger: risk_warning
    source: normal
    dest: elevated
    region: risk_monitor
```

### Using Parallel States

```python
machine = StateMachine.from_yaml("trading_fsm.yaml")

# Enter parallel state - initializes all regions
config = machine.enter_parallel_state("active")
# config.region_states = {"trading": "scanning", "risk_monitor": "normal", "data_feed": "connecting"}

# Process events that affect specific regions
config, results = machine.process_parallel(config, "signal_detected", context)
# Trading region: scanning -> analyzing

config, results = machine.process_parallel(config, "risk_warning", context)
# Risk region: normal -> elevated (if guards pass)

# Each result contains actions for that region transition
for result in results:
    await executor.async_execute_phased(result, context)

# Exit parallel state
exit_actions = machine.get_parallel_exit_actions(config)
```
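
To make the region-scoped idea concrete, here is a small standalone sketch that does not use PyStator. It assumes transitions can be keyed by `(region, trigger, source)`; the names `TRANSITIONS` and `step_regions` are hypothetical, and guards and actions are left out.

```python
# Region-scoped transitions from the YAML above: (region, trigger, source) -> dest.
TRANSITIONS = {
    ("trading", "signal_detected", "scanning"): "analyzing",
    ("risk_monitor", "risk_warning", "normal"): "elevated",
}


def step_regions(region_states: dict, trigger: str) -> dict:
    """Return a new region-state mapping after offering `trigger` to every region."""
    updated = dict(region_states)  # never mutate the caller's configuration
    for region, current in region_states.items():
        dest = TRANSITIONS.get((region, trigger, current))
        if dest is not None:
            updated[region] = dest  # only regions with a matching transition move
    return updated


initial = {"trading": "scanning", "risk_monitor": "normal", "data_feed": "connecting"}
after_signal = step_regions(initial, "signal_detected")
after_warning = step_regions(after_signal, "risk_warning")
```

Each event moves only the regions that define a matching transition; the other regions keep their current state, and the input mapping is left untouched.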

## Delayed Transitions

Schedule transitions to fire automatically after a specified delay using the `after` field. This is useful for timeouts, retries, and scheduled state changes.

### YAML Syntax

```yaml
transitions:
  - trigger: timeout
    source: waiting
    dest: retry
    after: 5000        # 5 seconds (milliseconds)
  
  - trigger: expire
    source: pending
    dest: cancelled
    after: "30m"       # 30 minutes (string with unit)
  
  - trigger: daily_close
    source: active
    dest: closed
    after: "1h"        # 1 hour
```

**Delay formats:**
- Integer: milliseconds (e.g., `5000` = 5 seconds)
- String with unit: `"5s"` (seconds), `"30m"` (minutes), `"1h"` (hours)
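
As a rough illustration of the two formats, the sketch below converts a delay value to milliseconds. The helper name `delay_to_ms` is hypothetical and this is not the loader's actual parser; it handles only whole-number amounts with the `s`, `m`, and `h` units listed above.

```python
import re
from typing import Union

_UNIT_TO_MS = {"s": 1_000, "m": 60_000, "h": 3_600_000}
_DELAY_PATTERN = re.compile(r"(\d+)([smh])")


def delay_to_ms(value: Union[int, str]) -> int:
    """Convert an `after` value (int milliseconds or "<n><unit>") to milliseconds."""
    if isinstance(value, bool):
        # bool is a subclass of int, so reject it explicitly.
        raise TypeError("delay must be an int or a string, not bool")
    if isinstance(value, int):
        if value < 0:
            raise ValueError("delay must not be negative")
        return value
    match = _DELAY_PATTERN.fullmatch(value.strip())
    if match is None:
        raise ValueError(f"unrecognized delay format: {value!r}")
    amount, unit = match.groups()
    return int(amount) * _UNIT_TO_MS[unit]
```

For instance, `delay_to_ms("30m")` and `delay_to_ms(1_800_000)` describe the same delay.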

**Implicit trigger:** When a transition sets only `after` and `dest` (plus optional guards), you can omit `trigger`; the loader assigns a synthetic trigger so the orchestrator can schedule the transition. A delayed transition with an implicit trigger must have exactly one source state.

### Using Delayed Transitions

Delayed transitions require a scheduler adapter. Use `AsyncioScheduler` for zero-infrastructure scheduling:

```python
from pystator import StateMachine, Orchestrator, AsyncioScheduler
from pystator.core.state_store import InMemoryStateStore

machine = StateMachine.from_yaml("order_fsm.yaml")
store = InMemoryStateStore()
scheduler = AsyncioScheduler()

orchestrator = Orchestrator(
    machine=machine,
    state_store=store,
    guards=guards,
    actions=actions,
    scheduler=scheduler,  # Enable delayed transitions
)

# Process events - delayed transitions are scheduled automatically
result = await orchestrator.async_process_event("order-123", "submit", context)
# If target state has delayed transitions, they'll be scheduled
# When the delay expires, the event fires automatically
```

### Scheduler Adapters

| Adapter | Use Case | Infrastructure |
|---------|----------|----------------|
| `AsyncioScheduler` | Development, testing, single-process apps | None (in-memory) |
| `RedisScheduler` | Production, distributed, persistence needed | Redis server |
| `CeleryScheduler` | Production, task queue integration | Celery + broker |

```python
# Zero infrastructure (asyncio)
from pystator.scheduler import AsyncioScheduler
scheduler = AsyncioScheduler()

# With Redis (production)
from pystator.scheduler import RedisScheduler
from redis.asyncio import Redis
redis = Redis.from_url("redis://localhost:6379")
scheduler = RedisScheduler(redis)
await scheduler.start()  # Start polling loop

# With Celery
from pystator.scheduler import CeleryScheduler
from celery import Celery
celery_app = Celery('tasks', broker='redis://localhost:6379')
scheduler = CeleryScheduler(celery_app)
scheduler.register_task()  # Register with Celery
```

## Inline Guard Expressions

Define guards directly in YAML using the `expr` syntax. No Python code needed for simple conditions.

### YAML Syntax

```yaml
transitions:
  - trigger: fill
    source: open
    dest: filled
    guards:
      - expr: "fill_qty >= order_qty"           # Inline expression
      - expr: "price > 0 and symbol != ''"      # Multiple conditions
      - is_valid_order                           # Named guard function
  
  - trigger: partial_fill
    source: open
    dest: partial
    guards:
      - expr: "fill_qty > 0 and fill_qty < order_qty"
```

### Expression Features

Context variables are directly available in expressions:

```yaml
# If context = {"qty": 100, "filled": 50, "price": 10.5}
guards:
  - expr: "qty > 0"              # True
  - expr: "filled < qty"         # True
  - expr: "price * qty < 2000"   # True (1050 < 2000)
  - expr: "status == 'active'"   # String comparison
  - expr: "item in items"        # List membership
  - expr: "not is_cancelled"     # Boolean negation
```

**Supported operations:**
- Comparisons: `>`, `<`, `>=`, `<=`, `==`, `!=`
- Boolean: `and`, `or`, `not`
- Arithmetic: `+`, `-`, `*`, `/`
- Membership: `in`, `not in`

**Safe builtins:** Inline expressions can use a small allowlist of builtins: `len`, `min`, `max`, `abs`, `sum`. For example: `len(positions) < max_positions` works without precomputing `position_count` in context.

**Note:** Inline expressions require `simpleeval`. Install with: `pip install pystator[recipes]`
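
To show what "allowlisted" evaluation means, here is a standalone sketch built on the standard-library `ast` module. It is not how PyStator evaluates expressions (the library relies on `simpleeval`, per the note above); `evaluate_guard` is a hypothetical helper that supports only the operations and builtins listed in this section.

```python
import ast
import operator

_BINARY = {ast.Add: operator.add, ast.Sub: operator.sub,
           ast.Mult: operator.mul, ast.Div: operator.truediv}
_COMPARE = {
    ast.Gt: operator.gt, ast.Lt: operator.lt, ast.GtE: operator.ge,
    ast.LtE: operator.le, ast.Eq: operator.eq, ast.NotEq: operator.ne,
    ast.In: lambda a, b: a in b, ast.NotIn: lambda a, b: a not in b,
}
_BUILTINS = {"len": len, "min": min, "max": max, "abs": abs, "sum": sum}


def evaluate_guard(expr: str, context: dict) -> bool:
    """Evaluate `expr` against a flat context, rejecting syntax off the allowlist."""

    def visit(node):
        if isinstance(node, ast.Constant):
            return node.value
        if isinstance(node, ast.Name):
            return context[node.id]  # KeyError if the variable is missing
        if isinstance(node, ast.BoolOp):
            values = (visit(v) for v in node.values)  # lazy, so it short-circuits
            return all(values) if isinstance(node.op, ast.And) else any(values)
        if isinstance(node, ast.UnaryOp) and isinstance(node.op, ast.Not):
            return not visit(node.operand)
        if isinstance(node, ast.UnaryOp) and isinstance(node.op, ast.USub):
            return -visit(node.operand)
        if isinstance(node, ast.BinOp) and type(node.op) in _BINARY:
            return _BINARY[type(node.op)](visit(node.left), visit(node.right))
        if isinstance(node, ast.Compare):
            left = visit(node.left)
            for op, comparator in zip(node.ops, node.comparators):
                compare = _COMPARE.get(type(op))
                if compare is None:
                    raise ValueError(f"unsupported comparison: {type(op).__name__}")
                right = visit(comparator)
                if not compare(left, right):
                    return False
                left = right  # supports chained comparisons such as 0 < x < 10
            return True
        if (isinstance(node, ast.Call) and isinstance(node.func, ast.Name)
                and node.func.id in _BUILTINS and not node.keywords):
            return _BUILTINS[node.func.id](*(visit(arg) for arg in node.args))
        raise ValueError(f"unsupported syntax: {type(node).__name__}")

    return bool(visit(ast.parse(expr, mode="eval").body))
```

Anything outside the allowlist, such as attribute access or a call to a function not in `_BUILTINS`, raises `ValueError` instead of being executed.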

## Event Normalization

The optional `meta.event_normalizer` setting (e.g. `"lower"` or `"upper"`) normalizes both configured triggers and incoming event names **at process time**, before matching. Configuration and application code can therefore use different casing, such as UPPERCASE triggers in YAML, and still match.

```yaml
meta:
  event_normalizer: lower

transitions:
  - trigger: FILL
    source: open
    dest: filled
```

Calling `machine.process(state, "fill", context)` or `machine.process(state, "FILL", context)` both match the transition.
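
The matching rule can be pictured with a few lines of plain Python. This is only a sketch of the idea; `NORMALIZERS` and `triggers_match` are hypothetical names, not PyStator internals.

```python
from typing import Optional

# Normalizer names from the text above mapped to plain string functions.
NORMALIZERS = {"lower": str.lower, "upper": str.upper}


def triggers_match(configured: str, incoming: str, normalizer: Optional[str] = None) -> bool:
    """Compare a configured trigger with an incoming event name after normalization."""
    # With no (or an unknown) normalizer, names are compared exactly as given.
    normalize = NORMALIZERS.get(normalizer, lambda name: name)
    return normalize(configured) == normalize(incoming)
```

Here `triggers_match("FILL", "fill", "lower")` holds, while the same comparison without a normalizer does not.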

## Context for Guards

Guard expressions see a **flat** namespace. Build it in your application before calling `process`, or use the helper from the recipes package:

```python
from pystator.recipes import flatten_context_for_guards

# Nested context: add derived values for guards
ctx = flatten_context_for_guards(
    {"trading_context": {"positions": [...], "buying_power": 10000}},
    overrides={"position_count": 3},  # or len(trading_context["positions"])
)
result = machine.process(current_state, "signal", ctx)
```

We recommend building the flat dict in the application, or in the state store adapter that supplies the context.
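
If you build the flat namespace yourself, one possible approach is sketched below. `flatten_context` is a hypothetical helper written for this illustration; it is not the recipes function, and its collision rule (later keys win, overrides win last) is an assumption.

```python
from typing import Any, Mapping, Optional


def flatten_context(
    nested: Mapping[str, Any],
    overrides: Optional[Mapping[str, Any]] = None,
) -> dict:
    """Lift keys of nested mappings to the top level, then apply overrides."""
    flat: dict = {}
    for key, value in nested.items():
        if isinstance(value, Mapping):
            flat.update(flatten_context(value))  # nested keys become top-level names
        else:
            flat[key] = value
    flat.update(overrides or {})  # derived values win over anything lifted
    return flat


trading_context = {"positions": ["A", "B", "C"], "buying_power": 10000}
ctx = flatten_context(
    {"trading_context": trading_context},
    overrides={"position_count": len(trading_context["positions"])},
)
```

With this shape, a guard such as `position_count < 5` or `buying_power > 0` can refer to the names directly.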

## Invoke (Long-Lived Services)

A state can declare an optional `invoke` list of service references (`id`, `src`, and an optional `on_done`). The **orchestrator** starts these services when the state is entered and stops them when it is exited, delegating both to an **invoke adapter**. When a service completes, the adapter emits the `on_done` event (e.g. the app calls `process_event(entity_id, on_done)`). Without an adapter, invoke is a no-op.

```yaml
states:
  - name: monitoring
    on_enter: [start_metrics]
    invoke:
      - id: price_feed
        src: websocket
        on_done: feed_ended
```

Provide an `InvokeAdapter` when creating the orchestrator; implement `start_services(entity_id, state_name, invoke_specs, context)` and `stop_services(entity_id, state_name)`. See `orchestration.invoke.InvokeAdapter` and `NoOpInvokeAdapter`.
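
The shape of such an adapter might look like the sketch below. It is a standalone class that only records which services would be running. It does not subclass PyStator's `InvokeAdapter`, and it assumes synchronous methods and that each invoke spec is a dict with `id`, `src`, and `on_done` keys; check the library source for the real base class and types.

```python
class RecordingInvokeAdapter:
    """Hypothetical adapter that tracks service ids instead of starting anything."""

    def __init__(self) -> None:
        # (entity_id, state_name) -> ids of the services considered running
        self.running: dict = {}

    def start_services(self, entity_id, state_name, invoke_specs, context) -> None:
        # A real adapter would open connections or spawn tasks here.
        self.running[(entity_id, state_name)] = [spec["id"] for spec in invoke_specs]

    def stop_services(self, entity_id, state_name) -> None:
        # Stopping a state that has no services is treated as a no-op.
        self.running.pop((entity_id, state_name), None)


adapter = RecordingInvokeAdapter()
specs = [{"id": "price_feed", "src": "websocket", "on_done": "feed_ended"}]
adapter.start_services("order-123", "monitoring", specs, context={})
started = dict(adapter.running)
adapter.stop_services("order-123", "monitoring")
```

A recording adapter like this can also be handy in tests, where you want to assert which services a state would start without touching real infrastructure.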

## Action Parameters

Pass configuration to actions directly from YAML using the `params` syntax. Actions receive parameters via the context. **Parameterized entry/exit:** `on_enter` and `on_exit` on states support the same shape as transition actions: a string (action name) or `{ name, params }`.

### YAML Syntax

```yaml
transitions:
  - trigger: order_filled
    source: open
    dest: filled
    actions:
      - log_fill                    # Simple action (no params)
      - name: send_notification     # Parameterized action
        params:
          channel: slack
          priority: high
      - name: update_metrics
        params:
          metric_name: fills
          increment: 1
```

### Accessing Parameters

Parameters are injected into the context under `_action_params`:

```python
from pystator.actions import ACTION_PARAMS_KEY

@actions.decorator()
def send_notification(context: dict) -> None:
    params = context.get(ACTION_PARAMS_KEY, {})
    channel = params.get("channel", "email")  # "slack"
    priority = params.get("priority", "normal")  # "high"
    
    notify(channel=channel, priority=priority, message=context["message"])
```
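
How parameters might reach an action can be sketched without the library. The dispatcher below is an assumption about the mechanism, not PyStator's executor: `run_actions` is a hypothetical helper, and only the `_action_params` key name comes from the text above.

```python
ACTION_PARAMS_KEY = "_action_params"  # key name taken from the text above


def run_actions(specs: list, registry: dict, context: dict) -> None:
    """Call each action with a context copy that carries its own params."""
    for spec in specs:
        if isinstance(spec, str):
            name, params = spec, {}  # simple action, no params
        else:
            name, params = spec["name"], spec.get("params", {})
        # A fresh dict per call keeps one action's params from leaking into the next.
        registry[name]({**context, ACTION_PARAMS_KEY: params})


calls = []
registry = {
    "log_fill": lambda ctx: calls.append(("log_fill", ctx[ACTION_PARAMS_KEY])),
    "send_notification": lambda ctx: calls.append(
        ("send_notification", ctx[ACTION_PARAMS_KEY]["channel"])
    ),
}
specs = [
    "log_fill",
    {"name": "send_notification", "params": {"channel": "slack", "priority": "high"}},
]
base_context = {"message": "Order filled"}
run_actions(specs, registry, base_context)
```

Copying the context per action is a design choice in this sketch: the caller's dict is never modified, so the params key does not persist after the actions have run.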

## Async Support

PyStator supports **async** throughout: guards, actions, and processing.

- **Async guards**: Register async callables; they are awaited when you use `machine.async_process()` or `machine.async_process_parallel()`. Use for checks that call external services (e.g. buying power, permissions).
- **Async actions**: Register async callables; run them with `executor.async_execute_*` after persisting the transition. Use for side effects that call APIs, DBs, or message queues.
- **Async processing**: Use `async_process()` / `async_process_parallel()` when guards are async or you want non-blocking transition computation.

```python
# Async guards (evaluated during async_process / async_process_parallel)
@guards.decorator()
async def check_buying_power(ctx: dict) -> bool:
    account = await broker.get_account()
    return account.buying_power >= ctx["order_value"]

# Async actions (run with executor.async_execute_* after persistence)
@actions.decorator()
async def update_position(ctx: dict) -> None:
    async with db.transaction():
        await db.update_position(ctx["symbol"], ctx["qty"])

# Async transition computation
result = await machine.async_process("OPEN", "execution_report", context)

# Async parallel state processing
config, results = await machine.async_process_parallel(config, event, context)

# Async action execution (parallel or phased)
execution = await executor.async_execute_parallel(result, context)
```

### Execution Modes (Actions)

Execution modes control how **actions** run after a transition (synchronously with `execute()`, asynchronously with `async_execute_*`):

| Mode | Description | Use Case |
|------|-------------|----------|
| `SEQUENTIAL` | Actions run one after another | Default, safest |
| `PARALLEL` | All actions run concurrently | Independent actions, latency critical |
| `PHASED` | Exit → transition → enter in parallel phases | Respects state machine semantics |

```python
from pystator.actions import ActionExecutor, ExecutionMode

executor = ActionExecutor(actions, default_mode=ExecutionMode.PHASED)

# Or specify per-call
result = await executor.async_execute_with_mode(
    transition_result, context, mode=ExecutionMode.PARALLEL
)
```
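
The difference between the three modes can be illustrated with plain `asyncio`. This is a conceptual sketch, not the executor's implementation; `run_sequential`, `run_parallel`, `run_phased`, and `recorder` are hypothetical names.

```python
import asyncio


async def run_sequential(actions: list, context: dict) -> None:
    """Await each action before starting the next."""
    for action in actions:
        await action(context)


async def run_parallel(actions: list, context: dict) -> None:
    """Start all actions together and wait for all of them."""
    await asyncio.gather(*(action(context) for action in actions))


async def run_phased(exit_actions, transition_actions, enter_actions, context) -> None:
    """Run phases in order; actions inside one phase run concurrently."""
    for phase in (exit_actions, transition_actions, enter_actions):
        await run_parallel(phase, context)


log = []


def recorder(name: str):
    """Build an async action that yields once, then records its name."""
    async def action(context: dict) -> None:
        await asyncio.sleep(0)
        log.append(name)
    return action


asyncio.run(run_phased(
    [recorder("stop_feed")],
    [recorder("update_positions")],
    [recorder("notify_ui"), recorder("log_audit")],
    {},
))
```

In the phased run, the exit action always finishes before the transition action starts, and both finish before either enter action runs; only the actions within the final phase overlap.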

## The Sandwich Pattern

PyStator is designed around "Load → Decide → Commit → Act":

```
┌─────────────────────────────────────────────────────────────┐
│ 1. INGRESS: Receive event, normalize to trigger + payload   │
├─────────────────────────────────────────────────────────────┤
│ 2. HYDRATION: Load current state from database              │
├─────────────────────────────────────────────────────────────┤
│ 3. COMPUTE: machine.process() - pure, no side effects       │
├─────────────────────────────────────────────────────────────┤
│ 4. PERSIST: Atomic DB transaction (state + audit trail)     │
├─────────────────────────────────────────────────────────────┤
│ 5. EXECUTE: Run actions AFTER successful commit             │
└─────────────────────────────────────────────────────────────┘
```
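
The five steps can be traced end to end with a toy example. Everything here is a stand-in chosen for illustration: the in-memory `store` replaces a database, `decide` replaces `machine.process()`, and the `Decision` class and `handle_event` function are hypothetical.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Decision:
    success: bool
    target_state: Optional[str] = None
    actions: tuple = ()


# Stand-in transition table: (state, trigger) -> (target, actions).
TABLE = {("PENDING", "exchange_ack"): ("OPEN", ("update_order_id",))}


def decide(state: str, trigger: str) -> Decision:
    """Pure step: a lookup with no I/O and no side effects."""
    hit = TABLE.get((state, trigger))
    return Decision(True, *hit) if hit else Decision(False)


def handle_event(store: dict, audit: list, entity_id: str, trigger: str,
                 run_action: Callable[[str], None]) -> Decision:
    state = store[entity_id]                  # 2. hydrate
    decision = decide(state, trigger)         # 3. compute
    if not decision.success:
        return decision                       # nothing persisted, nothing executed
    # 4. persist: a real system would write state and audit in one DB transaction
    store[entity_id] = decision.target_state
    audit.append((entity_id, state, trigger, decision.target_state))
    for name in decision.actions:             # 5. execute only after the commit
        run_action(name)
    return decision


store = {"order-123": "PENDING"}
audit = []
executed = []
accepted = handle_event(store, audit, "order-123", "exchange_ack", executed.append)
rejected = handle_event(store, audit, "order-123", "exchange_ack", executed.append)
```

The second call is rejected because no transition leaves `OPEN` on `exchange_ack` in this toy table, so the stored state and the audit trail stay as the first call left them.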

## Configuration Schema

```yaml
meta:
  version: "1.0.0"
  machine_name: "my_fsm"
  strict_mode: true

states:
  - name: STATE_NAME
    type: initial|stable|terminal|error|parallel
    parent: PARENT_STATE        # For hierarchical states
    initial_child: CHILD_STATE  # For compound states
    regions:                    # For parallel states
      - name: region_name
        initial: initial_state
        states: [state1, state2]
    on_enter: [action1]
    on_exit: [action2]
    timeout:
      seconds: 5.0
      destination: TIMEOUT_STATE

transitions:
  - trigger: event_name
    source: STATE_A
    dest: STATE_B
    region: region_name         # For parallel state transitions
    after: 5000                 # Delayed transition (milliseconds or "5s"/"5m"/"1h")
    guards:
      - named_guard             # Named guard function
      - expr: "qty > 0"         # Inline expression
    actions:
      - simple_action           # Simple action
      - name: parameterized     # Action with parameters
        params:
          key: value

error_policy:
  default_fallback: ERROR_STATE
  retry_attempts: 3
```

## Examples

See the `examples/` directory for complete examples:

- **`order_fsm.yaml`** / **`basic_usage.py`**: Simple order lifecycle
- **`day_trading_fsm.yaml`** / **`day_trading_example.py`**: Parallel states for trading
- **`portfolio_optimization_fsm.yaml`** / **`portfolio_optimization_example.py`**: Hierarchical states for workflows

## REST API

After installing with `pip install "pystator[api]"`, start the server:

```bash
uvicorn pystator.api.main:app --port 8000
```

| Endpoint | Method | Description |
|----------|--------|-------------|
| `/health` | GET | Health check |
| `/api/v1/validate` | POST | Validate FSM config |
| `/api/v1/process` | POST | Compute transition |
| `/api/v1/machines` | GET/POST | List/create machines |
| `/api/v1/machines/{id}` | GET/PUT/DELETE | CRUD operations |

## API Reference

### StateMachine

```python
# Creation
machine = StateMachine.from_yaml("config.yaml")
machine = StateMachine.from_dict(config_dict)

# Processing
result = machine.process(current_state, event, context)
result = await machine.async_process(current_state, event, context)

# Parallel states
config = machine.enter_parallel_state("parallel_state")
config, results = machine.process_parallel(config, event, context)
config, results = await machine.async_process_parallel(config, event, context)

# Queries
machine.get_state("STATE")
machine.get_initial_state()
machine.get_available_transitions("STATE")
machine.is_parallel_state("STATE")
machine.validate_parallel_config(config)
```

### TransitionResult

```python
result.success           # bool
result.source_state      # str
result.target_state      # str | None
result.trigger           # str
result.all_actions       # tuple[str, ...] (exit + transition + enter)
result.error             # FSMError | None
```

### ParallelStateConfig

```python
config.parallel_state       # str
config.region_states        # dict[str, str]
config.get_region_state("trading")  # str | None
config.is_in_state("scanning")      # bool
config.get_all_active()             # list[str]
config.to_string()                  # "active:trading=scanning,risk=normal"
```

## Development

```bash
# Install dev dependencies
pip install -e ".[dev]"

# Run tests
pytest

# Type checking
mypy src/

# Linting
ruff check .
ruff format .
```

## License

MIT
