Metadata-Version: 2.4
Name: vention-state-machine
Version: 0.4.46
Summary: Declarative state machine framework for machine apps
License: Proprietary
Author: VentionCo
Requires-Python: >=3.10,<3.11
Classifier: License :: Other/Proprietary License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Requires-Dist: asyncio (>=3.4.3,<4.0.0)
Requires-Dist: coverage (>=7.10.1,<8.0.0)
Requires-Dist: graphviz (>=0.21,<0.22)
Requires-Dist: transitions (>=0.9.3,<0.10.0)
Requires-Dist: uvicorn (>=0.35.0,<0.36.0)
Requires-Dist: vention-communication (>=0.4.0,<0.5.0)
Description-Content-Type: text/markdown

# vention-state-machine

A lightweight wrapper around `transitions` for building async-safe, recoverable hierarchical state machines with minimal boilerplate.

## Table of Contents

- [✨ Features](#-features)
- [🧠 Concepts & Overview](#-concepts--overview)
- [⚙️ Installation & Setup](#️-installation--setup)
- [🚀 Quickstart Tutorial](#-quickstart-tutorial)
- [🛠 How-to Guides](#-how-to-guides)
- [🧩 Extension Hooks](#-extension-hooks)
- [📖 API Reference](#-api-reference)
- [🔍 Troubleshooting & FAQ](#-troubleshooting--faq)

## ✨ Features

- Built-in `ready` / `fault` states
- Global transitions: `to_fault`, `reset`
- Optional state recovery (`recover__<last_state>`)
- Async task spawning and cancellation
- Timeouts and auto-fault handling
- Transition history recording with timestamps + durations
- Guard conditions for blocking transitions
- Global state change callbacks for logging/MQTT
- Optional RPC bundle for exposing state machine via Connect RPCs
- Extension hooks loaded from a folder for deployment-specific customization without forking

## 🧠 Concepts & Overview

This library uses a **declarative domain-specific language (DSL)** to define state machines in a readable, strongly typed way.

- **State** → A leaf node in the state machine
- **StateGroup** → Groups related states, creating hierarchical namespaces
- **Trigger** → Named events that initiate transitions

Example:

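```python
from state_machine.defs import State, StateGroup, Trigger

class MyStates(StateGroup):
    idle: State = State()
    working: State = State()

class Triggers:
    begin = Trigger("begin")
    finish = Trigger("finish")

TRANSITIONS = [
    Triggers.begin.transition(MyStates.idle, MyStates.working),
    Triggers.finish.transition(MyStates.working, MyStates.idle),
]
```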


### Base States and Triggers

All machines include:

**States:**
- `ready` (initial)
- `fault` (global error)

**Triggers:**
- `start`, `to_fault`, `reset`

```python
from state_machine.core import BaseStates, BaseTriggers

# `state_machine` is an already-constructed StateMachine instance
state_machine.trigger(BaseTriggers.RESET.value)
assert state_machine.state == BaseStates.READY.value
```

## ⚙️ Installation & Setup

```bash
pip install vention-state-machine
```

**Optional dependencies:**
- Graphviz system binaries (required for diagram generation; the `graphviz` Python package is already a declared dependency)
- vention-communication (for RPC bundle integration)

**Install optional tools:**

macOS:
```bash
brew install graphviz
pip install vention-communication
```

Linux (Debian/Ubuntu):
```bash
sudo apt-get install graphviz
pip install vention-communication
```


## 🚀 Quickstart Tutorial

### 1. Define States and Triggers

```python
from state_machine.defs import StateGroup, State, Trigger

class Running(StateGroup):
    picking: State = State()
    placing: State = State()
    homing: State = State()

class States:
    running = Running()

class Triggers:
    start = Trigger("start")
    finished_picking = Trigger("finished_picking")
    finished_placing = Trigger("finished_placing")
    finished_homing = Trigger("finished_homing")
    to_fault = Trigger("to_fault")
    reset = Trigger("reset")
```

### 2. Define Transitions

```python
TRANSITIONS = [
    Triggers.start.transition("ready", States.running.picking),
    Triggers.finished_picking.transition(States.running.picking, States.running.placing),
    Triggers.finished_placing.transition(States.running.placing, States.running.homing),
    Triggers.finished_homing.transition(States.running.homing, States.running.picking),
]
```

### 3. Implement Your State Machine

```python
from state_machine.core import StateMachine
from state_machine.decorators import on_enter_state, auto_timeout, guard, on_state_change

class CustomMachine(StateMachine):
    def __init__(self):
        super().__init__(states=States, transitions=TRANSITIONS)
        self.estop_pressed = False  # placeholder; update this from your safety I/O

    @on_enter_state(States.running.picking)
    @auto_timeout(5.0, Triggers.to_fault)
    def enter_picking(self, _):
        print("🔹 Entering picking")

    @on_enter_state(States.running.placing)
    def enter_placing(self, _):
        print("🔸 Entering placing")

    @on_enter_state(States.running.homing)
    def enter_homing(self, _):
        print("🔺 Entering homing")

    @guard(Triggers.reset)
    def check_safety_conditions(self) -> bool:
        return not self.estop_pressed

    @on_state_change
    def publish_state_to_mqtt(self, old_state: str, new_state: str, trigger: str):
        # `mqtt_client` is your application's own MQTT wrapper; it is not provided by this library
        mqtt_client.publish("machine/state", {
            "old_state": old_state,
            "new_state": new_state,
            "trigger": trigger
        })
```

### 4. Start It

`start()` is async — it awaits any `@on_runtime_start` hook handlers and then schedules `@register_background_task` coroutines. Even when you have no hooks, you still need to `await` it.

```python
import asyncio

async def main():
    state_machine = CustomMachine()  # the machine defined in step 3
    await state_machine.start()
    # ... run ...
    await state_machine.stop_background_tasks()  # cancels @register_background_task tasks only

asyncio.run(main())
```

If you don't use the hook system at all, `state_machine.trigger("start")` is a sync drop-in that fires the initial transition without the lifecycle dispatch.

## 🛠 How-to Guides

### Expose Over RPC with VentionApp

```python
import asyncio
from communication.app import VentionApp
from state_machine.vention_communication import build_state_machine_bundle
from state_machine.core import StateMachine

async def setup():
    state_machine = StateMachine(...)
    await state_machine.start()

    app = VentionApp(name="MyApp")
    bundle = build_state_machine_bundle(state_machine)
    app.register_rpc_plugin(bundle)
    app.finalize()
    return state_machine, app

state_machine, app = asyncio.run(setup())
```

**RPC Actions:**
- `GetState` → Returns current state and last known state
- `GetHistory` → Returns transition history with timestamps
- `Trigger_<TriggerName>` → Triggers a state transition (e.g., `Trigger_Start`, `Trigger_Activate`)

**Options:**
```python
# Customize which actions are included
bundle = build_state_machine_bundle(
    state_machine,
    include_state_actions=True,  # Include GetState
    include_history_action=True,  # Include GetHistory
    triggers=["start", "activate"],  # Only include specific triggers
)
```

### Timeout Example

```python
@auto_timeout(5.0, Triggers.to_fault)
def enter_state(self, _):
    ...
```
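The idea behind a state timeout can be pictured with plain `asyncio`. The sketch below is a stand-alone model, not the library's implementation: `TinyMachine` and `watch_timeout` are hypothetical names, and it assumes the timeout fires only if the machine is still in the watched state when the delay elapses.

```python
import asyncio
from typing import Callable


class TinyMachine:
    """Hypothetical stand-in: tracks a state name and the triggers fired on it."""

    def __init__(self, state: str) -> None:
        self.state = state
        self.fired: list[str] = []

    def trigger(self, name: str) -> None:
        self.fired.append(name)
        if name == "to_fault":
            self.state = "fault"


async def watch_timeout(
    machine: TinyMachine,
    state_name: str,
    seconds: float,
    trigger_fn: Callable[[], str],
) -> None:
    """Fire trigger_fn() only if the machine is still in state_name after the delay."""
    await asyncio.sleep(seconds)
    if machine.state == state_name:
        machine.trigger(trigger_fn())


async def demo() -> tuple[TinyMachine, TinyMachine]:
    stuck = TinyMachine("picking")
    moved = TinyMachine("picking")
    watchers = [
        asyncio.create_task(watch_timeout(stuck, "picking", 0.01, lambda: "to_fault")),
        asyncio.create_task(watch_timeout(moved, "picking", 0.01, lambda: "to_fault")),
    ]
    moved.state = "placing"  # this machine leaves the state before the deadline
    await asyncio.gather(*watchers)
    return stuck, moved


stuck, moved = asyncio.run(demo())
print(stuck.state, stuck.fired)
print(moved.state, moved.fired)
```

Only the machine that stayed in `picking` ends up in `fault`; the one that moved on is left alone.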

### Recovery Example

```python
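# Run inside an async function; States and TRANSITIONS as defined in the Quickstart
state_machine = StateMachine(States, transitions=TRANSITIONS, enable_last_state_recovery=True)
await state_machine.start()  # will attempt recover__{last_state}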
```
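The choice `start()` makes between recovery and a cold start can be pictured with a small helper. This is a sketch of the rule stated in the API reference (`recover__<last_state>` if recovery is enabled and a state was recorded, else `start`); `choose_initial_trigger` is a hypothetical name and not part of the library.

```python
from typing import Optional


def choose_initial_trigger(recovery_enabled: bool, last_state: Optional[str]) -> str:
    """Return the trigger name a recovery-aware startup would fire."""
    if recovery_enabled and last_state:
        return f"recover__{last_state}"
    return "start"


print(choose_initial_trigger(True, "Running_picking"))
print(choose_initial_trigger(True, None))
print(choose_initial_trigger(False, "Running_picking"))
```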

### Triggering state transitions via I/O

Here's an example of hooking up state transitions to I/O events via MQTT:

```python
import asyncio
import paho.mqtt.client as mqtt
from state_machine.core import StateMachine
from state_machine.defs import State, StateGroup, Trigger
from state_machine.decorators import on_enter_state

class MachineStates(StateGroup):
    idle: State = State()
    running: State = State()

class States:
    machine = MachineStates()

class Triggers:
    start_button = Trigger("start_button")
    box_missing = Trigger("box_missing")

TRANSITIONS = [
    Triggers.start_button.transition(States.machine.idle, States.machine.running),
    Triggers.box_missing.transition(States.machine.running, States.machine.idle),
]

class MachineController(StateMachine):
    def __init__(self):
        super().__init__(states=States, transitions=TRANSITIONS)
        self.mqtt_client = mqtt.Client()
        self.setup_mqtt()

    def setup_mqtt(self):
        """Configure MQTT client to listen for I/O signals."""
        self.mqtt_client.on_connect = self.on_mqtt_connect
        self.mqtt_client.on_message = self.on_mqtt_message
        self.mqtt_client.connect("localhost", 1883, 60)
        
        # Start MQTT loop in background
        self.spawn(self.mqtt_loop())

    async def mqtt_loop(self):
        """Background task to handle MQTT messages."""
        self.mqtt_client.loop_start()
        while True:
            await asyncio.sleep(0.1)

    def on_mqtt_connect(self, client, userdata, flags, rc):
        """Subscribe to I/O topics when connected."""
        client.subscribe("machine/io/start_button")
        client.subscribe("machine/sensors/box_sensor")

    def on_mqtt_message(self, client, userdata, msg):
        """Handle incoming MQTT messages and trigger state transitions."""
        topic = msg.topic
        payload = msg.payload.decode()
        
        # Map MQTT topics to state machine triggers
        if topic == "machine/io/start_button" and payload == "pressed":
            self.trigger(Triggers.start_button.value)
        elif topic == "machine/sensors/box_sensor" and payload == "0":
            self.trigger(Triggers.box_missing.value)

    @on_enter_state(States.machine.running)
    def enter_running(self, _):
        print("🔧 Machine started - processing parts")
        self.mqtt_client.publish("machine/status", "running")

    @on_enter_state(States.machine.idle)
    def enter_idle(self, _):
        print("⏸️ Machine idle - ready for start")
        self.mqtt_client.publish("machine/status", "idle")
```

## 🧩 Extension Hooks

Extension hooks let deployment-specific code attach to an existing state machine without forking the consumer's source. Handler files live in a folder on disk, self-register via decorators when imported, and are wired into the machine by a single `load_hooks(...)` call at boot. Once loaded they participate in the same guard / on-enter / on-exit / lifecycle machinery that class-level decorators use.

The library exposes the hook surface from `state_machine.hooks`. Consumers (palletizer, sanding, future controllers) own the integration root they hand to handlers via a context subclass.

### When to reach for hooks

Use **class-level decorators** (`@on_enter_state`, `@guard`, etc.) for behavior the consuming application owns: anything that ships with the state machine and is the same for every deployment.

Use **hooks** for behavior a deployment adds on top: a barcode scan during validation for one customer, a tower-light renderer for another, a CSV logger that follows the firmware across over-the-air updates. Hooks live outside the consumer's source tree, persist across firmware updates, and never require touching application code.

`add_*` and `replace_*` exist so a deployment can either compose with class-level behavior or take over from it. See **Composition rules** below.

### Decorators

All eight decorators are exported from `state_machine.hooks`. Each handler takes a single `HookContext` argument (or your subclass of it).

**State entry and exit** — sync or async. Async hooks are scheduled on the running event loop via `machine.spawn(...)` and do not block the transition. If no event loop is running, the coroutine is closed and a warning is logged.

```python
from state_machine.hooks import (
    add_on_enter_state, replace_on_enter_state,
    add_on_exit_state,  replace_on_exit_state,
)

@add_on_enter_state("Picking")          # runs alongside class @on_enter_state
def telemetry_on_pick(context): ...

@replace_on_enter_state("Picking")      # runs instead of class @on_enter_state
async def custom_pick(context): ...

@add_on_exit_state("Picking")
def telemetry_on_exit(context): ...

@replace_on_exit_state("Picking")
async def custom_exit(context): ...
```
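The sync/async split described above can be modelled with the standard library alone. The following is a minimal sketch, assuming a dispatcher that runs sync hooks inline and hands coroutines to the running loop; `dispatch_hook` is a hypothetical helper, and it uses `loop.create_task` where the library uses `machine.spawn(...)`.

```python
import asyncio
import inspect
import logging
from typing import Any, Callable, Optional

logger = logging.getLogger("hook_dispatch_sketch")


def dispatch_hook(handler: Callable[[Any], Any], context: Any) -> Optional[asyncio.Task]:
    """Run a sync hook inline; schedule an async hook without blocking the caller."""
    result = handler(context)
    if not inspect.iscoroutine(result):
        return None  # sync hook already ran to completion
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        result.close()  # avoids a "coroutine was never awaited" warning
        logger.warning("no running event loop; dropped async hook %s", handler.__name__)
        return None
    return loop.create_task(result)


calls: list[str] = []


def sync_hook(context: str) -> None:
    calls.append(f"sync:{context}")


async def async_hook(context: str) -> None:
    calls.append(f"async:{context}")


dispatch_hook(sync_hook, "ctx")              # runs immediately
dropped = dispatch_hook(async_hook, "ctx")   # no loop yet: closed and logged


async def main() -> None:
    task = dispatch_hook(async_hook, "ctx")  # loop is running: scheduled as a task
    await task


asyncio.run(main())
```

The practical consequence is that an async hook cannot delay or veto the transition that invoked it; anything that must gate a transition belongs in a guard.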

**Guards** — sync only; async guards are rejected at boot because pytransitions does not await condition callables.

```python
from state_machine.hooks import add_guard, replace_guard

@add_guard("start_pallet")              # composes with class-level @guard(s)
def check_capacity(context) -> bool: ...

@replace_guard("start_pallet")          # only gate for this trigger
def custom_safety_check(context) -> bool: ...
```

**Lifecycle** — run around `state_machine.start()`. `@on_runtime_start` handlers fire in registration order (sync or async, awaited if async; failure aborts startup). `@register_background_task` coroutines are scheduled after runtime-start completes, survive `to_fault`, and are only cancelled by `stop_background_tasks()`.

```python
import asyncio

from state_machine.hooks import on_runtime_start, register_background_task

@on_runtime_start
async def connect_mqtt(context):
    ...

@register_background_task
async def telemetry_loop(context):
    while True:
        await publish_status(context)  # publish_status is your own coroutine
        await asyncio.sleep(1)
```
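The ordering around `start()` can be illustrated with a stand-alone model. This sketch assumes the sequence given in the API reference (runtime-start handlers, then the initial transition, then background tasks); `LifecycleSketch` is a hypothetical class and the initial transition is stubbed as a log entry.

```python
import asyncio
import inspect
from typing import Any, Callable


class LifecycleSketch:
    """Hypothetical model of the start / stop_background_tasks ordering."""

    def __init__(self) -> None:
        self.start_handlers: list[Callable[[Any], Any]] = []
        self.background_factories: list[Callable[[Any], Any]] = []
        self.background_tasks: list[asyncio.Task] = []
        self.log: list[str] = []

    async def start(self, context: Any) -> None:
        # 1. Runtime-start handlers in registration order; an exception aborts startup.
        for handler in self.start_handlers:
            result = handler(context)
            if inspect.isawaitable(result):
                await result
        # 2. Initial transition (stubbed).
        self.log.append("initial transition")
        # 3. Background tasks are scheduled last.
        for factory in self.background_factories:
            self.background_tasks.append(asyncio.create_task(factory(context)))

    async def stop_background_tasks(self) -> None:
        for task in self.background_tasks:
            task.cancel()
        await asyncio.gather(*self.background_tasks, return_exceptions=True)
        self.background_tasks.clear()


async def demo() -> list[str]:
    sketch = LifecycleSketch()

    def sync_banner(context: Any) -> None:
        sketch.log.append("sync start handler")

    async def async_connect(context: Any) -> None:
        sketch.log.append("async start handler")

    async def ticker(context: Any) -> None:
        sketch.log.append("background task running")
        while True:
            await asyncio.sleep(3600)

    sketch.start_handlers += [sync_banner, async_connect]
    sketch.background_factories.append(ticker)
    await sketch.start(context=None)
    await asyncio.sleep(0)  # let the background task take its first step
    await sketch.stop_background_tasks()
    return sketch.log


lifecycle_log = asyncio.run(demo())
print(lifecycle_log)
```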

### Composition rules

| Decorator | What runs |
|---|---|
| `add_on_enter_state` | hook first, then class-level `@on_enter_state` |
| `replace_on_enter_state` | hook only; class-level binding skipped |
| `add_on_exit_state` | hook first, then class-level `@on_exit_state` |
| `replace_on_exit_state` | hook only; class-level binding skipped |
| `add_guard` | class-level `@guard`s first, then hook; all must return `True` |
| `replace_guard` | hook only; class-level guards skipped |
| `on_runtime_start` | every handler runs in registration order; failure aborts startup |
| `register_background_task` | scheduled after runtime-start completes; cancelled by `stop_background_tasks()` |

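One handler per `(decorator, target)` pair for the state and guard decorators; a second registration for the same slot is a boot error. `@on_runtime_start` and `@register_background_task` take no target and accept any number of handlers.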
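The guard rows of the table above boil down to a few lines of logic. The sketch below is a simplified model with zero-argument guards (real class-level guards receive `self` and hook guards receive a context); `trigger_allowed` is a hypothetical name, and giving `replace_hook` precedence when both hooks are supplied is an assumption.

```python
from typing import Callable, Optional, Sequence

Guard = Callable[[], bool]


def trigger_allowed(
    class_guards: Sequence[Guard],
    add_hook: Optional[Guard] = None,
    replace_hook: Optional[Guard] = None,
) -> bool:
    """Evaluate guards the way the composition table describes."""
    if replace_hook is not None:
        return replace_hook()      # class-level guards are skipped entirely
    guards = list(class_guards)
    if add_hook is not None:
        guards.append(add_hook)    # hook runs after the class-level guards
    return all(guard() for guard in guards)


print(trigger_allowed([lambda: True], add_hook=lambda: False))
print(trigger_allowed([lambda: False], replace_hook=lambda: True))
```

Note the asymmetry: an `add` guard can only make a trigger stricter, while a `replace` guard can also loosen it, so it deserves the same review as the class-level guard it displaces.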

### Extending `HookContext`

Each handler receives a `HookContext` carrying the live state machine. Consumers extend it with one integration root — services, config, model accessors — and pass a `context_factory` to `load_hooks(...)`.

```python
from dataclasses import dataclass
from state_machine.hooks import HookContext

@dataclass(frozen=True)
class PalletizerHookContext(HookContext):
    app: PalletizerApp        # services, config, model, runtime

def palletizer_context_factory(base: HookContext) -> PalletizerHookContext:
    return PalletizerHookContext(state_machine=base.state_machine, app=palletizer_app)
```

A fresh context is constructed for every hook invocation so handlers see consistent live state. The factory closes over whatever the consumer wants to expose.
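Per-invocation context construction can be pictured with stand-in dataclasses. This is a sketch only: `BaseContextSketch`, `AppContextSketch`, and `invoke_hook` are hypothetical names used instead of the library's `HookContext` so the snippet runs on its own.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass(frozen=True)
class BaseContextSketch:
    state_machine: Any


@dataclass(frozen=True)
class AppContextSketch(BaseContextSketch):
    app: Any


def invoke_hook(
    handler: Callable[[Any], Any],
    machine: Any,
    context_factory: Optional[Callable[[BaseContextSketch], Any]] = None,
) -> Any:
    """Build a new context for this call, then hand it to the handler."""
    base = BaseContextSketch(state_machine=machine)
    context = context_factory(base) if context_factory else base
    return handler(context)


seen: list[Any] = []
machine = object()


def factory(base: BaseContextSketch) -> AppContextSketch:
    return AppContextSketch(state_machine=base.state_machine, app="demo-app")


invoke_hook(seen.append, machine, factory)
invoke_hook(seen.append, machine, factory)
invoke_hook(seen.append, machine)  # no factory: handler gets the base context
```

The two factory-built contexts compare equal but are distinct objects, which is what "fresh per invocation" means in practice.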

### Loading hooks

```python
state_machine.load_hooks(hooks_directory, *, context_factory=None)
```

- `hooks_directory` — a `Path` whose `handlers/` package contains the handler modules. One handler per file is a useful convention but not a constraint; the loader imports every top-level `.py` module inside `handlers/` (no recursion into subpackages) and decorators self-register globally. Missing directory is a silent no-op (deployment installed no hooks). Missing `handlers/` package raises `ModuleNotFoundError`.
- `context_factory` — optional `Callable[[HookContext], YourContext]`. If omitted, handlers receive the base `HookContext`.
- One-shot per machine. A second call raises `RuntimeError` — create a fresh machine to swap directories.
- Boot-time validation catches unknown states / triggers, async guards, sync background tasks, and signatures that don't accept a context. All of these aggregate into a single `HookValidationError` so consumers fix every offending handler in one pass, not one reboot at a time. Duplicate `(decorator, target)` registrations are the exception: they raise `DuplicateHookError` at import time, before the aggregated validation runs.

Expected layout:

```
/data/vention/my-hooks/
└── handlers/
    ├── __init__.py
    ├── barcode_check.py
    ├── capacity_guard.py
    ├── startup_banner.py
    └── telemetry_loop.py
```

Adding a file activates it. Deleting it (or removing the decorator) deactivates it. No manifest required.

Files can hold any number of decorators, including a mix of types — `@add_guard`, `@add_on_enter_state`, and `@register_background_task` can live side by side. The only registration rule is that `(decorator, target)` pairs are globally unique across the whole `handlers/` package: two `@add_on_enter_state("Picking")` handlers in the same package (same file or different files) is a `DuplicateHookError` at boot. `@on_runtime_start` and `@register_background_task` can register any number and fire in import order.
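The discovery rules above (missing directory is a no-op, missing `handlers/` package is an error, no recursion into subpackages) can be sketched with `pathlib`. `discover_handler_modules` is a hypothetical helper that covers only the discovery step, not the import; the alphabetical ordering is an assumption.

```python
import tempfile
from pathlib import Path


def discover_handler_modules(hooks_directory: Path) -> list[str]:
    """List top-level module names under hooks_directory/handlers/."""
    if not hooks_directory.is_dir():
        return []  # no hooks installed for this deployment
    handlers = hooks_directory / "handlers"
    if not (handlers / "__init__.py").is_file():
        raise ModuleNotFoundError(f"no handlers package in {hooks_directory}")
    return sorted(
        path.stem
        for path in handlers.glob("*.py")  # non-recursive: subpackages are skipped
        if path.name != "__init__.py"
    )


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp) / "my-hooks"
    (root / "handlers" / "nested").mkdir(parents=True)
    for name in ("__init__.py", "capacity_guard.py", "barcode_check.py"):
        (root / "handlers" / name).write_text("")
    (root / "handlers" / "nested" / "ignored.py").write_text("")

    found = discover_handler_modules(root)
    missing = discover_handler_modules(Path(tmp) / "not-installed")

print(found)
print(missing)
```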

### End-to-end example

The consumer wires the state machine, declares its context type, and calls `load_hooks(...)`:

```python
import asyncio
from dataclasses import dataclass
from pathlib import Path

from state_machine.core import StateMachine
from state_machine.decorators import on_enter_state
from state_machine.defs import State, StateGroup, Trigger
from state_machine.hooks import HookContext


@dataclass(frozen=True)
class DemoApp:
    inventory_threshold: int


@dataclass(frozen=True)
class DemoHookContext(HookContext):
    app: DemoApp


class Running(StateGroup):
    picking: State = State()
    placing: State = State()


class States:
    running = Running()


class Triggers:
    start = Trigger("start")
    picked = Trigger("picked")


TRANSITIONS = [
    Triggers.start.transition("ready", States.running.picking),
    Triggers.picked.transition(States.running.picking, States.running.placing),
]


class DemoMachine(StateMachine):
    def __init__(self) -> None:
        super().__init__(states=States, transitions=TRANSITIONS)

    @on_enter_state(States.running.placing)
    def enter_placing(self, _):
        print("placing reached")


async def main() -> None:
    machine = DemoMachine()
    app = DemoApp(inventory_threshold=5)
    machine.load_hooks(
        Path("/data/vention/demo-hooks"),
        context_factory=lambda base: DemoHookContext(state_machine=base.state_machine, app=app),
    )
    await machine.start()
    # ... run ...
    await machine.stop_background_tasks()


asyncio.run(main())
```

The deployment ships a `handlers/` package, one decorator per file. Handlers pass state and trigger targets as strings (`"Running_picking"`, `"start"`) rather than typed descriptors (`States.running.picking`) because they don't import the consumer's `States` / `Triggers` containers — only the shared context type and the hook decorators cross the contract boundary:

```python
# /data/vention/demo-hooks/handlers/capacity_guard.py
from state_machine.hooks import add_guard
from my_contract import DemoHookContext

@add_guard("start")
def check_capacity(context: DemoHookContext) -> bool:
    return context.app.inventory_threshold > 0
```

```python
# /data/vention/demo-hooks/handlers/barcode_check.py
from state_machine.hooks import add_on_enter_state
from my_contract import DemoHookContext

@add_on_enter_state("Running_picking")
async def scan_barcode(context: DemoHookContext) -> None:
    # imagine an async barcode read here
    context.state_machine.trigger("picked")
```

```python
# /data/vention/demo-hooks/handlers/startup_banner.py
from state_machine.hooks import on_runtime_start
from my_contract import DemoHookContext

@on_runtime_start
async def announce_startup(context: DemoHookContext) -> None:
    print(f"hooks online, threshold={context.app.inventory_threshold}")
```

```python
# /data/vention/demo-hooks/handlers/telemetry_loop.py
import asyncio
from state_machine.hooks import register_background_task
from my_contract import DemoHookContext

@register_background_task
async def telemetry(context: DemoHookContext) -> None:
    while True:
        # publish whatever your app exposes
        await asyncio.sleep(1)
```

`DemoHookContext` lives in a shared contract module (`my_contract` above) so both the consumer and the handler package import the same type and CI catches breaking changes on either side. Outside a monorepo, publish that module as a small companion package.

### Boot-time validation

`load_hooks(...)` fails loudly if any handler is misconfigured. Two distinct failure paths:

**Decorator-time (fail-fast, raised during handler import):**

- Two handlers register for the same `(decorator, target)` slot → `DuplicateHookError`.

**Validate-time (aggregated into a single `HookValidationError` so consumers fix every offending handler in one pass):**

- Unknown state target → lists the registered handler plus every known state name (top-level and nested).
- Unknown trigger target → lists every known trigger.
- Async function registered with `@add_guard` / `@replace_guard` → pytransitions does not await guard conditions; an async guard always returns a coroutine (truthy) and silently blocks every transition.
- Sync function registered with `@register_background_task` → background tasks are scheduled on the event loop and must be coroutine functions.
- Handler signature does not accept the context argument → caught by `inspect.signature`.

Either failure runs before any hook is wired, so the machine isn't left half-modified. The instance itself is still consumed by the attempt though — fix the handlers and construct a fresh machine to load again. Retrying `load_hooks()` on the same instance raises `RuntimeError`.
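The aggregated validate-time path can be approximated with `inspect`. The sketch below is not the library's validator: `ValidationSketchError`, `accepts_context`, and `validate` are hypothetical names, and only guards and background tasks are checked.

```python
import inspect
from typing import Any, Callable


class ValidationSketchError(RuntimeError):
    """Hypothetical aggregated error carrying every problem found."""

    def __init__(self, errors: list[str]) -> None:
        super().__init__("; ".join(errors))
        self.errors = errors


def accepts_context(handler: Callable[..., Any]) -> bool:
    """True if the handler can be called with one positional argument."""
    try:
        inspect.signature(handler).bind(object())
    except TypeError:
        return False
    return True


def validate(
    guards: dict[str, Callable[..., Any]],
    background_tasks: list[Callable[..., Any]],
    known_triggers: set[str],
) -> None:
    errors: list[str] = []
    for trigger, handler in guards.items():
        if trigger not in known_triggers:
            errors.append(f"{handler.__name__}: unknown trigger {trigger!r}")
        if inspect.iscoroutinefunction(handler):
            errors.append(f"{handler.__name__}: guards must be sync")
        if not accepts_context(handler):
            errors.append(f"{handler.__name__}: must accept a context argument")
    for handler in background_tasks:
        if not inspect.iscoroutinefunction(handler):
            errors.append(f"{handler.__name__}: background tasks must be async def")
    if errors:
        raise ValidationSketchError(errors)  # one error listing every problem


async def async_guard(context: Any) -> bool:
    return True


def no_context_guard() -> bool:
    return True


def sync_task(context: Any) -> None:
    return None


reported: list[str] = []
try:
    validate(
        guards={"start": async_guard, "strat": no_context_guard},
        background_tasks=[sync_task],
        known_triggers={"start", "reset", "to_fault"},
    )
except ValidationSketchError as exc:
    reported = exc.errors

print(len(reported))
```

Collecting into a list and raising once is what lets a deployment fix every offending handler in a single pass.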

## 📖 API Reference

### StateMachine

```python
class StateMachine(HierarchicalGraphMachine):
    def __init__(
        self,
        states: Union[object, list[dict[str, Any]], None],
        *,
        transitions: Optional[list[dict[str, str]]] = None,
        history_size: Optional[int] = None,
        enable_last_state_recovery: bool = True,
        **kw: Any,
    )
```

**Parameters:**
- `states`: Either a container of StateGroups or a list of state dicts.
- `transitions`: List of transition dictionaries; may be omitted or empty.
- `history_size`: Max number of entries in transition history (1000 when left as `None`).
- `enable_last_state_recovery`: If True, machine can resume from last recorded state.

### Methods

**`spawn(coro: Coroutine) -> asyncio.Task`**
Start a background coroutine and track it. Auto-cancelled on fault/reset.

**`cancel_tasks() -> None`**
Cancel all tracked tasks and timeouts.

**`set_timeout(state_name: str, seconds: float, trigger_fn: Callable[[], str]) -> None`**
Schedule a trigger if state_name stays active too long.

**`record_last_state() -> None`**
Save current state for recovery.

**`get_last_state() -> Optional[str]`**
Return most recently recorded state.

**`async start() -> None`**
Run `@on_runtime_start` handlers in registration order (awaited if async), then fire the initial transition (`recover__<last_state>` if recovery is enabled and a state was recorded, else `start`), then schedule `@register_background_task` coroutines. A failure in `@on_runtime_start` aborts startup, cancels anything scheduled in the same call, and propagates.

**`async stop_background_tasks() -> None`**
Cancel every `@register_background_task` task. Named explicitly so it doesn't shadow `"stop"` if a consumer uses that as a trigger. Does NOT touch `cancel_tasks` / timeouts — `to_fault` and `stop_background_tasks` have independent lifecycles, so background tasks survive faults. Safe to call without a prior `start()`.

**`load_hooks(hooks_directory: Path, *, context_factory: Optional[Callable[[HookContext], T]] = None) -> None`**
Import every module under `hooks_directory/handlers/` so decorators self-register, validate every registration against this machine, and wire the result into pytransitions' callback / condition machinery. Missing directory is a silent no-op. Validation aggregates into a single `HookValidationError`. One-shot per machine. See [Extension Hooks](#-extension-hooks).

### Properties

**`history -> list[dict[str, Any]]`**
Full transition history with timestamps/durations.

**`get_last_history_entries(n: int) -> list[dict[str, Any]]`**
Return last n transitions.
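A bounded history with timestamps and durations can be modelled with `collections.deque`. This is a sketch under assumptions: `HistorySketch` and the entry keys (`state`, `timestamp`, `duration`) are hypothetical and may not match the fields the library records.

```python
import time
from collections import deque
from typing import Any, Optional


class HistorySketch:
    """Hypothetical bounded transition history."""

    def __init__(self, history_size: int = 1000) -> None:
        # deque(maxlen=...) silently drops the oldest entry once full
        self._entries: deque = deque(maxlen=history_size)
        self._entered_at: Optional[float] = None

    def record(self, state: str, now: Optional[float] = None) -> None:
        now = time.time() if now is None else now
        if self._entries and self._entered_at is not None:
            # close out the previous entry with the time spent in that state
            self._entries[-1]["duration"] = now - self._entered_at
        self._entries.append({"state": state, "timestamp": now})
        self._entered_at = now

    @property
    def history(self) -> list[dict[str, Any]]:
        return list(self._entries)

    def last(self, n: int) -> list[dict[str, Any]]:
        return list(self._entries)[-n:] if n > 0 else []


tracker = HistorySketch(history_size=2)
tracker.record("ready", now=0.0)
tracker.record("Running_picking", now=1.5)
tracker.record("Running_placing", now=4.0)
print(tracker.history)
```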

### Decorators

**`@on_enter_state(state: State)`**
Bind function to run on entry.

**`@on_exit_state(state: State)`**
Bind function to run on exit.

**`@auto_timeout(seconds: float, trigger: Trigger)`**
Auto-trigger if timeout expires.

**`@guard(*triggers: Trigger)`**
Guard transition; blocks if function returns False.

**`@on_state_change`**
Global callback `(old_state, new_state, trigger)` fired after each transition.

### Extension Hook API

Exported from `state_machine.hooks`. See [Extension Hooks](#-extension-hooks) for the full guide.

**`@dataclass(frozen=True) class HookContext`**
Argument passed to every hook handler. Carries `state_machine: StateMachine`. Consumers extend it with a subclass that adds their integration root and supply a `context_factory` to `load_hooks(...)`.

**`@add_on_enter_state(state)`** / **`@replace_on_enter_state(state)`**
Run a handler when `state` is entered. `add` composes with class-level `@on_enter_state` (hook first, then class). `replace` runs instead of the class binding. Sync or async; async hooks are scheduled on the running event loop and do not block the transition. If no event loop is running, the coroutine is closed and a warning is logged.

**`@add_on_exit_state(state)`** / **`@replace_on_exit_state(state)`**
Symmetric to entry hooks, fired on exit.

**`@add_guard(trigger)`** / **`@replace_guard(trigger)`**
Gate a trigger. `add` composes with class-level `@guard` (all must return `True`). `replace` runs instead of every class-level guard. **Sync only** — async guards are rejected at boot because pytransitions does not await condition callables.

**`@on_runtime_start`**
Run once on `state_machine.start()` in registration order. Sync or async. Any failure aborts startup before the initial transition fires and propagates from `start()`.

**`@register_background_task`**
Schedule a coroutine on the runtime loop after `@on_runtime_start` completes. Must be `async def`; sync functions are rejected at boot. Cancelled only by `stop_background_tasks()`: background tasks survive `to_fault` so that MQTT subscribers, telemetry loggers, and tower-light renderers keep running to observe and report the fault. Exceptions are logged and the machine continues.

**`HookValidationError(RuntimeError)`**
Raised by `load_hooks(...)` listing every failing handler. Read `.errors` for the list.

**`DuplicateHookError(Exception)`**
Raised during decorator import when two handlers register for the same `(decorator, target)` slot.

### RPC Bundle

```python
def build_state_machine_bundle(
    sm: StateMachine,
    *,
    include_state_actions: bool = True,
    include_history_action: bool = True,
    triggers: Optional[Sequence[str]] = None,
) -> RpcBundle
```

Builds an RPC bundle exposing the state machine via Connect-style RPCs:
- `GetState` - Returns current and last known state
- `GetHistory` - Returns transition history
- `Trigger_<TriggerName>` - One RPC per trigger (PascalCase naming)

The bundle can be registered with a `VentionApp` using `app.register_rpc_plugin(bundle)`.
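To anticipate the RPC names a bundle will expose, the naming pattern can be approximated as follows. `rpc_name_for_trigger` is a hypothetical helper; the single-word cases match the examples in this README (`Trigger_Start`, `Trigger_Activate`), while the handling of multi-word snake_case triggers is an assumption.

```python
def rpc_name_for_trigger(trigger_name: str) -> str:
    """Convert a snake_case trigger name to a Trigger_<PascalCase> RPC name."""
    pascal = "".join(part.capitalize() for part in trigger_name.split("_") if part)
    return f"Trigger_{pascal}"


for name in ("start", "activate", "finished_picking"):
    print(name, rpc_name_for_trigger(name))
```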

## 🔍 Troubleshooting & FAQ

- **Transitions blocked unexpectedly** → Check guard conditions.
- **Callbacks not firing** → Only successful transitions trigger them.
- **State not restored after restart** → Ensure `enable_last_state_recovery=True`.
- **RPC actions not available** → Ensure `app.finalize()` is called after registering bundles.
