Metadata-Version: 2.4
Name: nodus-workflow
Version: 0.1.0
Summary: Declarative DAG workflows with WAIT/RESUME, priority scheduling, and distributed rehydration
Author: Shawn Knight
License: MIT
Project-URL: Homepage, https://github.com/Masterplanner25/nodus-workflow
Project-URL: Repository, https://github.com/Masterplanner25/nodus-workflow
Requires-Python: >=3.11
Description-Content-Type: text/markdown
License-File: LICENSE
Provides-Extra: persistence
Requires-Dist: sqlalchemy>=2.0.0; extra == "persistence"
Provides-Extra: dev
Requires-Dist: pytest>=8.0; extra == "dev"
Requires-Dist: pytest-asyncio>=0.21; extra == "dev"
Dynamic: license-file

# nodus-workflow

**Declarative DAG workflows with WAIT/RESUME, priority scheduling, and
distributed rehydration for Nodus AI systems.**

Standalone workflow primitives: define DAGs, execute them with priority-queued
scheduling, suspend nodes on events, and rehydrate WAITING runs after process
restart. No required external dependencies — pure stdlib.

> **Note on naming:** This is the standalone `nodus-workflow` package
> (`C:\dev\nodus-workflow`). The nodus-lang runtime also ships an in-tree
> `nodus_workflow` package (`src/nodus_workflow/`) with a full HTTP/CLI
> server, SQLite store, and nodus-lang integration. The two are distinct.

> **Status:** v0.1.0 — prepared, not yet published.

---

## Install

```bash
pip install nodus-workflow
```

---

## What it provides

| Component | Purpose |
|---|---|
| `FlowDefinition` | DAG: nodes, edges, default retry, timeout |
| `FlowNode` | One node with handler_id, config, optional retry override |
| `FlowEdge` | Directed edge with optional condition function |
| `FlowRun` / `InMemoryRunStore` | Run state + thread-safe in-memory store |
| `SchedulerEngine` | Priority queue (high/normal/low) + WAIT/RESUME |
| `FlowExecutor` | Orchestrates start(), resume(), handler registration |
| `FlowRehydrator` | Re-registers WAITING runs after process restart |
| `WorkflowWaitSignal` | Raise inside a handler to suspend node execution |

---

## Quick start

```python
import asyncio
from nodus_workflow import (
    FlowDefinition, FlowNode, FlowEdge, FlowExecutor,
    InMemoryRunStore, SchedulerEngine,
)

# Define handlers
async def fetch_data(ctx):
    return {"data": "fetched"}

async def process_data(ctx):
    return {"processed": ctx["state"].get("data")}

# Build the DAG
flow = FlowDefinition(
    name="my-pipeline",
    nodes=[
        FlowNode(id="fetch",   handler_id="fetch_data"),
        FlowNode(id="process", handler_id="process_data"),
    ],
    edges=[
        FlowEdge(from_node="fetch", to_node="process"),
    ],
)

# Execute
store = InMemoryRunStore()
scheduler = SchedulerEngine()
executor = FlowExecutor(store=store, scheduler=scheduler)
executor.register_handler("fetch_data",   fetch_data)
executor.register_handler("process_data", process_data)

run = await executor.start(flow, initial_state={})
print(run.status)   # FlowStatus.COMPLETED
```

---

## WAIT/RESUME semantics

```python
from nodus_workflow import WorkflowWaitSignal

async def approval_node(ctx):
    raise WorkflowWaitSignal(
        event_type="approval.granted",
        correlation_key=ctx["run_id"],
    )

# Later, when the event fires:
await executor.resume(run_id, event_payload={"approver": "alice"})
```

When a node raises `WorkflowWaitSignal`, the run transitions to `WAITING`
and is parked in the scheduler until `notify_event` or `resume` is called.

---

## SchedulerEngine

```python
from nodus_workflow import SchedulerEngine
from nodus_workflow.run import FlowStatus

scheduler = SchedulerEngine()

# Schedule with priority
scheduler.schedule(run_id, priority="high")      # high / normal / low
scheduler.schedule(run_id, priority="normal")

next_run_id = scheduler.pop()   # returns highest-priority pending run | None

# WAIT/RESUME
scheduler.wait_for_event(run_id, event_type="approval.granted", key="k")
scheduler.notify_event(event_type="approval.granted", key="k")  # re-queues run
scheduler.cancel_wait(run_id)
```

---

## FlowRehydrator

```python
from nodus_workflow import FlowRehydrator, InMemoryRunStore

store = InMemoryRunStore()
rehydrator = FlowRehydrator(store=store, scheduler=scheduler)

# On process startup — re-register all WAITING runs
rehydrator.rehydrate()
```

---

## FlowStatus transitions

```
PENDING → RUNNING → WAITING → (event fires) → EXECUTING → COMPLETED
                                                          ↘ FAILED
```

---

## Design

- **No required dependencies.** Pure stdlib (`asyncio`, `threading`, `heapq`,
  `dataclasses`, `datetime`, `uuid`).
- **Protocol-based handlers.** Any async callable `(context: dict) → dict`
  satisfies `NodeHandler`.
- **Thread-safe.** `SchedulerEngine` and `InMemoryRunStore` use `threading.Lock`.
- **Separate from nodus-lang.** No nodus-lang import required — use as a
  standalone workflow engine or integrate with any Python application.

---

## Development

```bash
pip install -e ".[dev]"
pytest tests/ -q
```

---

## License

MIT — see [LICENSE](LICENSE).
