Metadata-Version: 2.4
Name: workflow-middleware
Version: 0.1.0
Summary: Turn any LangChain create_agent into a multi-agent workflow orchestrator (Claude Code Workflow, ported).
Project-URL: Homepage, https://github.com/emanueleielo/workflow-middleware
Project-URL: Documentation, https://github.com/emanueleielo/workflow-middleware#readme
Project-URL: Repository, https://github.com/emanueleielo/workflow-middleware
Project-URL: Issues, https://github.com/emanueleielo/workflow-middleware/issues
Author-email: Emanuele Ielo <emanuele@oraion.com>
License-Expression: MIT
License-File: LICENSE
Keywords: agent,claude,cost-optimization,langchain,langgraph,llm,middleware,multi-agent,orchestration,workflow
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Scientific/Engineering :: Artificial Intelligence
Classifier: Typing :: Typed
Requires-Python: >=3.11
Requires-Dist: langchain-core>=0.3
Requires-Dist: langchain>=1.2
Requires-Dist: langgraph<2,>=1.0
Requires-Dist: pydantic>=2
Provides-Extra: anthropic
Requires-Dist: langchain-anthropic>=1.4; extra == 'anthropic'
Provides-Extra: dev
Requires-Dist: mypy>=1.13; extra == 'dev'
Requires-Dist: pytest-asyncio>=0.24; extra == 'dev'
Requires-Dist: pytest>=8; extra == 'dev'
Requires-Dist: ruff>=0.8; extra == 'dev'
Description-Content-Type: text/markdown

<h1 align="center">
  <br>
  <code>workflow-middleware</code>
  <br>
</h1>

<h3 align="center">Claude Code's Workflow tool as a drop-in LangChain middleware.</h3>

<p align="center">
  <a href="https://pypi.org/project/workflow-middleware"><img src="https://img.shields.io/pypi/v/workflow-middleware?style=flat-square&color=blue" alt="PyPI"></a>
  <a href="LICENSE"><img src="https://img.shields.io/badge/license-MIT-blue?style=flat-square" alt="License"></a>
  <a href="https://www.python.org/"><img src="https://img.shields.io/badge/python-3.11+-3776AB?style=flat-square&logo=python&logoColor=white" alt="Python"></a>
  <a href="https://github.com/langchain-ai/langchain"><img src="https://img.shields.io/badge/LangChain-middleware-1C3C3C?style=flat-square" alt="LangChain"></a>
</p>

<p align="center">
  <a href="#what-it-is">What it is</a> &bull;
  <a href="#how-it-works">How it works</a> &bull;
  <a href="#quick-start">Quick start</a> &bull;
  <a href="#live-demo--deep-research">Demo</a> &bull;
  <a href="#the-primitives">Primitives</a> &bull;
  <a href="#configuration">Configuration</a> &bull;
  <a href="#caveats">Caveats</a>
</p>

<p align="center">
  <img src="examples/deep-research-live/docs/deep-research.png" alt="Deep Research Live — graph and phase rail built live from real agent-spawn events" width="100%">
  <br>
  <em><a href="examples/deep-research-live/">Deep Research Live</a> — the orchestrator authors a workflow at runtime; the graph and phase rail are drawn live from real agent-spawn events.</em>
</p>

---

## What It Is

A **1:1 port of [Claude Code's Workflow tool](https://docs.claude.com/en/docs/claude-code)** as a LangChain middleware. Drop it onto any `create_agent` and the agent gains a single tool — `run_workflow` — that lets it **write a Python orchestration script and execute it**: fan out mini-agents (leaves), read their structured results, and summarise.

Like Claude Code, the orchestrator is a **real agent** (a full loop, not a single LLM call): it writes the script, runs it, reads the structured result, decides whether more work is needed, and summarises. The leaves are `create_agent` instances that **inherit the orchestrator's tools and model** by default — so the workflows the agent can write are specialised by the tools you give it. Same engine, different capabilities.

It is **opt-in, never forced**: the middleware appends discretionary guidance to the system prompt, but the model decides whether a task is worth orchestrating. On trivial tasks it just answers directly.

> [!WARNING]
> **This package executes a Python script written by the LLM.** It runs in your
> own process by default. The runtime guard (reduced builtins + an AST guard
> that blocks `__class__` / `__subclasses__` / `getattr`-style dunder escapes)
> stops the common breakouts, but it is **not OS-level isolation** — it does not
> bound CPU/memory and is not hardened against a determined adversary. Use it
> with an orchestrator model *you* control; do **not** run scripts from
> untrusted sources without a real sandbox. See **[SECURITY.md](SECURITY.md)**.

---

## How It Works

The model never sees `@task` or `@entrypoint`. It writes a plain script — a `meta = {...}` literal then `async def main(): ...` — using **only the injected primitives**. The runtime executes that script on top of the **LangGraph Functional API**:

```mermaid
flowchart TD
    A["Orchestrator (create_agent)"] -->|"writes a Python script"| B["run_workflow tool"]
    B --> C["executor: AST-validate meta<br>exec with reduced builtins"]
    C --> D["@entrypoint runs main()"]
    D -->|"agent(...) calls"| E["@task-wrapped leaf"]
    E --> F["Leaf agent<br>(inherits tools + model)"]
    F -->|"final text / structured object"| D
    D -->|"result + summary"| A

    style A fill:#fff,stroke:#333,color:#333
    style F fill:#2d6a4f,color:#fff
    style C fill:#c0392b,color:#fff
```

1. **Model writes the script.** A `meta` dict literal (name, description, phases) then `async def main()` using `agent / parallel / pipeline / phase / log / args / budget`.
2. **Executor validates and execs it.** `meta` is extracted via AST and `ast.literal_eval` (literal-only — names, calls, f-strings, comprehensions, starred-unpacking and conditional expressions are rejected). The body is `exec`'d with **injected globals** and a **reduced `__builtins__`** (no `__import__`, `open`, `eval`, `exec`, `compile`, `getattr`, `setattr`, `type`, no `os` / `sys`), and a **static AST guard rejects any dunder access** (`__class__` / `__subclasses__` / `__globals__` …) so the common sandbox-escape chains raise `SecurityError` before the script runs. The three obvious nondeterministic entry points (clock reader, RNG, no-arg date/datetime) are intercepted **by name** and raise — a **best-effort guard** against accidental nondeterminism that would break resume, not a sandbox (see [Caveats](#caveats)).
3. **`main()` runs under the Functional API.** It executes inside an `@entrypoint` with an in-memory checkpointer and a `thread_id`. Each `agent()` call routes through a `@task`, so leaf results are **memoized and checkpointed** — an interrupted run replays completed leaves from the checkpoint on resume.
4. **Leaves run as `create_agent` instances** that inherit the orchestrator's tools and model. With a `schema`, the leaf uses `response_format` and returns the validated object; otherwise it returns its final text. Token usage is aggregated for the budget.
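The `meta` extraction in step 2 can be approximated with the standard library alone. The sketch below is illustrative rather than the package's code: `extract_meta` is a hypothetical name, and the block only shows why a non-literal `meta` is refused. `ast.literal_eval` accepts only literal structures (constants, lists, tuples, sets, dicts) and raises `ValueError` on names, calls, f-strings and comprehensions, so the metadata can be read without running the script.

```python
import ast


def extract_meta(source: str) -> dict:
    """Read the top-level `meta = {...}` literal without executing the script.

    Hypothetical helper illustrating the technique; not the package's API.
    """
    tree = ast.parse(source)
    for node in tree.body:
        # Only a plain, module-level `meta = <expr>` assignment counts.
        if (
            isinstance(node, ast.Assign)
            and len(node.targets) == 1
            and isinstance(node.targets[0], ast.Name)
            and node.targets[0].id == "meta"
        ):
            # literal_eval walks the node and raises ValueError on anything non-literal.
            value = ast.literal_eval(node.value)
            if not isinstance(value, dict):
                raise ValueError("meta must be a dict literal")
            return value
    raise ValueError("script has no top-level `meta = {...}` assignment")


good_script = '''
meta = {"name": "demo", "description": "d", "phases": [{"title": "Review"}]}

async def main():
    return 1
'''
print(extract_meta(good_script)["name"])  # demo

# An f-string, a call and a comprehension are all rejected.
for bad in ('meta = {"name": f"run-{1}"}', 'meta = {"n": len("x")}', 'meta = {"p": [i for i in range(3)]}'):
    try:
        extract_meta(bad)
    except ValueError as exc:
        print("rejected:", type(exc).__name__)
```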

The concurrency cap is implemented by this package itself: an `asyncio.Semaphore` sized to `min(16, cpu_count() - 2)` by default (override it with `max_concurrency`), plus a lifetime cap of 1000 leaf spawns per run (`max_agents_total`). The semaphore lives host-side on the `Runtime` and never crosses the `@task` boundary.

---

## Quick Start

### Install

```bash
pip install workflow-middleware

# Or from source
pip install git+https://github.com/emanueleielo/workflow-middleware.git
```

### Offline — no keys, `FakeLeafBackend`

You can drive the runtime directly with a deterministic offline backend — no model, no network, no API key. This is `examples/basic_usage.py`:

```python
import asyncio
from workflow_middleware import FakeLeafBackend, WorkflowConfig, run_workflow_script

SCRIPT = '''\
meta = {
    "name": "demo-fanout",
    "description": "Fan out three reviewers and collect their findings",
    "phases": [{"title": "Review", "detail": "one reviewer per dimension"}],
}

async def main():
    phase("Review")
    findings = await parallel([
        (lambda d=d: agent(f"Review for {d}", label=d))
        for d in ("correctness", "security", "perf")
    ])
    return {"findings": [f for f in findings if f]}
'''

async def run() -> None:
    result, summary = await run_workflow_script(
        SCRIPT,
        args={"target": "auth.py"},
        backend=FakeLeafBackend(),
        config=WorkflowConfig(),
        thread_id="demo-1",
    )
    print("result:", result)
    print("summary:", summary)

if __name__ == "__main__":
    asyncio.run(run())
```

This exercises meta extraction, the `parallel` barrier, the error→`None` filter, and budget aggregation — and runs green with zero env vars.

### Real model — plug-and-play middleware

Attach `WorkflowMiddleware` to a real `create_agent`. The leaves inherit the orchestrator's tools and model. This is `examples/real_usage.py`:

```python
from langchain.agents import create_agent
from workflow_middleware import WorkflowMiddleware, WorkflowConfig

agent_graph = create_agent(
    "claude-sonnet-4-5",          # any model id; needs ANTHROPIC_API_KEY
    tools=[],                     # add the orchestrator's tools here; leaves inherit them
    middleware=[WorkflowMiddleware(WorkflowConfig(include_patterns=True))],
)

if __name__ == "__main__":
    out = agent_graph.invoke({"messages": [
        {"role": "user", "content": "Thoroughly audit auth.py for security bugs using a workflow."}
    ]})
    print(out["messages"][-1].content)
```

The middleware appends the guidance, the model decides to call `run_workflow`, the leaves inherit tools + model, and the structured result is summarised back to you.

---

## Live Demo — Deep Research

A full browser demo lives in **[`examples/deep-research-live/`](examples/deep-research-live/)** (screenshot above): you type a research question, a real **orchestrator agent** authors a workflow script, **leaf agents** run live **DuckDuckGo** searches, and a graph is drawn **live from real agent-spawn events** while a cited markdown report streams back. It runs in a keyless `fake` mode (canned script, fully offline) or a real `anthropic` mode (Claude + live web search).

```bash
cd examples/deep-research-live
uv venv --python 3.13 .venv && source .venv/bin/activate
uv pip install -r requirements.txt          # installs the package via -e ../..

# Keyless — animates the whole UI offline, no API key, no network:
DEEP_RESEARCH_MODE=fake python -m server.app

# Real — orchestrator authors the script, leaves do real searches (needs a key):
cp .env.example .env        # then set ANTHROPIC_API_KEY=...
python -m server.app
```

Then open **<http://127.0.0.1:8000>** and ask a question. Full walkthrough and the wire protocol are in the demo's [README](examples/deep-research-live/README.md) and [PROTOCOL.md](examples/deep-research-live/PROTOCOL.md).

---

## The Primitives

The model's script sees **only** these. No imports, no filesystem, no `@task`/`@entrypoint`.

| Primitive | Signature | Semantics |
|---|---|---|
| `agent` | `await agent(prompt, schema=None, agent_type=None, label=None)` | Run one leaf agent. Acquires the concurrency semaphore, checks the lifetime cap, calls the backend, aggregates tokens. Returns the validated object when `schema` is given, else the final text. A bare `await agent(...)` **propagates** errors. |
| `parallel` | `await parallel(thunks)` | **BARRIER.** Gather all thunks; a thunk that raises becomes `None` in its slot (filter it out). |
| `pipeline` | `await pipeline(items, *stages)` | **NO barrier.** Each item runs independently through every stage as `stage(prev, item, index)`. A stage that raises drops *that* item to `None` and skips its remaining stages. |
| `phase` | `phase(title)` | Record a progress group (for the summary / streaming). |
| `log` | `log(message)` | Append a progress line. |
| `args` | `args` | The value passed verbatim to `run_workflow` (the orchestrator-supplied input). |
| `budget` | `budget` | Token-ceiling handle: `budget.total`, `budget.spent()`, `budget.remaining()`. Once `spent >= total`, the next `agent()` raises `BudgetExceeded`. `total=None` ⇒ unlimited. |

Past the lifetime cap (`max_agents_total`, default 1000) `agent()` raises `AgentCapExceeded`. The **error→`None`** policy is applied by `parallel` / `pipeline` only — not by `agent` itself.
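The barrier versus no-barrier distinction is easiest to see in code. Below is a simplified re-implementation with `asyncio.gather`, not the package's source. Passing `None` as `prev` to the first stage is an assumption, since the table does not say what the first stage receives.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def parallel(thunks: list[Callable[[], Awaitable[Any]]]) -> list[Any]:
    """Barrier: wait for every thunk; a thunk that raises leaves None in its slot."""
    async def guarded(thunk):
        try:
            return await thunk()
        except Exception:
            return None
    return await asyncio.gather(*(guarded(t) for t in thunks))


async def pipeline(items: list[Any], *stages: Callable[..., Awaitable[Any]]) -> list[Any]:
    """No barrier: each item moves through all stages on its own as stage(prev, item, index)."""
    async def run_item(item, index):
        prev = None  # assumption: the first stage receives None as `prev`
        for stage in stages:
            try:
                prev = await stage(prev, item, index)
            except Exception:
                return None  # drop this item and skip its remaining stages
        return prev
    return await asyncio.gather(*(run_item(item, i) for i, item in enumerate(items)))


async def demo():
    async def review(dim):
        if dim == "perf":
            raise RuntimeError("leaf failed")
        return f"{dim}: ok"

    findings = await parallel([lambda d=d: review(d) for d in ("correctness", "security", "perf")])

    async def draft(prev, item, index):
        return f"draft-{item}"

    async def check(prev, item, index):
        if item == "b":
            raise ValueError("bad draft")
        return prev.upper()

    checked = await pipeline(["a", "b"], draft, check)
    return findings, checked


findings, checked = asyncio.run(demo())
print(findings)  # ['correctness: ok', 'security: ok', None]
print(checked)   # ['DRAFT-A', None]
```

Because `pipeline` only gathers at the very end, item `"a"` can reach `check` while item `"b"` is still in `draft`; `parallel` instead returns nothing until every thunk has settled.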

---

## Opt-In Behaviour

Two surfaces, by design (matching Claude Code):

- **Tool description** (`RUN_WORKFLOW_DESCRIPTION`) keeps a hard gate: it tells the model to call `run_workflow` *only when explicitly opted in*.
- **System-prompt guidance** (`WORKFLOW_GUIDANCE`) is **discretionary**: appended to the orchestrator's prompt so the model knows the capability exists, but it chooses whether to use it.

The guidance is appended, never overwritten — the original system message is preserved with a `"\n\n"` separator. Toggle and tune it via `WorkflowConfig`:

```python
from workflow_middleware import WorkflowConfig

config = WorkflowConfig(
    inject_workflow_guidance=True,   # False => system message untouched
    include_patterns=True,           # also append the reusable PATTERNS library
    system_prompt="...",             # override the appended text entirely
)
```

---

## Run & Event Tracking

Every completed `run_workflow` execution is recorded on the middleware **instance**, keyed by thread. This is the same instance-level pattern `AdvisorMiddleware` uses: LangChain 1.2.x ignores `Command(update={...})` returned from middleware, so the run summary is stored host-side rather than in graph state. After each run, the middleware appends one [`WorkflowEvent`](workflow_middleware/state.py) and bumps a lifetime run counter. Each event records the executed script's `name`, the number of `agents_spawned`, the aggregated `tokens_spent`, the `thread_id`, and whether `main()` returned without raising (`ok`).

Read the history back off the instance you attached:

```python
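from langchain.agents import create_agent
from workflow_middleware import WorkflowConfig, WorkflowMiddleware

mw = WorkflowMiddleware(WorkflowConfig())
agent_graph = create_agent("claude-sonnet-4-5", tools=[], middleware=[mw])  # add your own tools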

agent_graph.invoke({"messages": [{"role": "user", "content": "Audit auth.py with a workflow."}]})

for event in mw.get_events():        # list[WorkflowEvent] for the current thread
    print(event["name"], event["agents_spawned"], event["tokens_spent"], event["ok"])

print("workflows run this session:", mw.get_total_runs())
```

`get_events(thread_id=None)` returns the events for a thread (defaulting to the current one); `get_total_runs(thread_id=None)` returns the lifetime count. `WorkflowEvent` and `WorkflowState` are exported from the package top level.

---

## Configuration

### `WorkflowConfig`

| Parameter | Type | Default | Description |
|---|---|---|---|
| `leaf_model` | `str \| BaseChatModel \| None` | `None` | Model for leaf agents. `None` ⇒ inherit the orchestrator's model. |
| `max_concurrency` | `int \| None` | `None` | Cap on concurrent `agent()` calls. `None` ⇒ `min(16, cpu_count()-2)` resolved at runtime. |
| `max_agents_total` | `int` | `1000` | Lifetime cap on total `agent()` spawns per run. |
| `budget` | `BudgetConfig` | `BudgetConfig()` | Token budget controls. |
| `inject_workflow_guidance` | `bool` | `True` | Append `WORKFLOW_GUIDANCE` to the system prompt in `wrap_model_call`. |
| `include_patterns` | `bool` | `False` | Also append the `PATTERNS` library to the injected guidance. |
| `system_prompt` | `str \| None` | `None` | Override text appended instead of `WORKFLOW_GUIDANCE`. |
| `leaf_prompt_addendum` | `str \| None` | `None` | Override `LEAF_PROMPT_ADDENDUM` appended to each leaf. |
| `sandbox` | `SandboxConfig` | `SandboxConfig()` | OS-isolation options (Phase 4 stub — see Caveats). |

### `BudgetConfig`

| Parameter | Type | Default | Description |
|---|---|---|---|
| `total` | `int \| None` | `None` | Lifetime token ceiling across all leaves in one run. `None` ⇒ unlimited. |

### `SandboxConfig`

| Parameter | Type | Default | Description |
|---|---|---|---|
| `enabled` | `bool` | `False` | When `True`, route exec through the sandbox seam. **Currently a no-op passthrough.** |
| `backend` | `"none" \| "srt"` | `"none"` | Isolation backend. `"srt"` (Seatbelt/bubblewrap) reserved for Phase 4. |

---

## Leaf Backends

`agent()` calls go through a `LeafBackend` — a small protocol with a single `async def run(...) -> LeafResult`.

- **`FakeLeafBackend`** — deterministic, offline, no keys. Echoes a stable transform of the prompt and, when `schema` is a pydantic model, fills required fields with deterministic placeholders so structured paths run offline. Used by the examples and tests.
- **`LangChainLeafBackend`** — the real one. Builds a leaf via `create_agent` that **inherits** the orchestrator's tools and model, appends `LEAF_PROMPT_ADDENDUM` to the inherited system prompt, passes `response_format=schema` when given (reading `result["structured_response"]`), extracts `result["messages"][-1].text` otherwise, and aggregates `usage_metadata` for the budget. Optional named subagents map `agent_type` → a spec that inherits omitted keys.

`LeafResult` is a plain `@dataclass` (`text`, `structured`, `tokens`) — msgpack-safe, so it can cross the `@task` checkpoint boundary.
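The backend contract can be sketched with `typing.Protocol` and a dataclass. The README elides the real parameters of `run(...)`, so the `(prompt, schema)` signature, the field defaults and `UppercaseBackend` below are all hypothetical. JSON stands in for msgpack (which is not in the standard library) to show that a `LeafResult` reduces to plain values.

```python
import asyncio
import json
from dataclasses import asdict, dataclass
from typing import Any, Protocol


@dataclass
class LeafResult:
    """Plain fields only, so it serializes cleanly across a checkpoint boundary."""
    text: str
    structured: Any = None  # assumption: defaults are illustrative
    tokens: int = 0


class LeafBackend(Protocol):
    # The README elides run()'s real parameters; (prompt, schema) is an assumption.
    async def run(self, prompt: str, schema: Any = None) -> LeafResult: ...


class UppercaseBackend:
    """Hypothetical offline backend: a stable transform of the prompt, no model call."""

    async def run(self, prompt: str, schema: Any = None) -> LeafResult:
        return LeafResult(text=prompt.upper(), tokens=len(prompt.split()))


async def call_leaf(backend: LeafBackend, prompt: str) -> LeafResult:
    return await backend.run(prompt)


result = asyncio.run(call_leaf(UppercaseBackend(), "review auth.py for security"))
print(result.text)               # REVIEW AUTH.PY FOR SECURITY
print(json.dumps(asdict(result)))
```

A structural `Protocol` means any object with a matching `run` coroutine qualifies; no base class or registration is needed in this sketch.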

---

## Caveats

**The in-process guard stops common escapes — it is not OS isolation.** The script runs with reduced builtins (`getattr` / `setattr` / `type` / `__import__` / `open` / `eval` / `exec` all removed) **and** a static AST guard that rejects any dunder attribute or name (`__class__`, `__bases__`, `__subclasses__`, `__globals__`, `__builtins__`, …). Together these block the classic `().__class__.__bases__[0].__subclasses__()` and `getattr(type(x), "__globals__")` breakout chains — a dunder reference raises `SecurityError` *before* the script runs. This is a real barrier against the one-line escapes, **but it does not bound CPU/memory/recursion and is not a substitute for a process sandbox.**
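A static dunder guard of this kind can be written with `ast.walk`. The sketch below is an approximation with a stand-in `SecurityError`, not the package's validator. A string such as `"__globals__"` is a constant rather than an attribute or name, so an AST check alone would not see it; that is why removing `getattr` from the builtins is the other half of the barrier.

```python
import ast


class SecurityError(Exception):
    """Stand-in for the package's exception of the same name."""


def _is_dunder(name: str) -> bool:
    return len(name) > 4 and name.startswith("__") and name.endswith("__")


def check_no_dunders(source: str) -> None:
    """Reject any dunder attribute or bare name before the script runs (sketch)."""
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Attribute) and _is_dunder(node.attr):
            raise SecurityError(f"line {node.lineno}: attribute {node.attr!r} is not allowed")
        if isinstance(node, ast.Name) and _is_dunder(node.id):
            raise SecurityError(f"line {node.lineno}: name {node.id!r} is not allowed")


escape = "subs = ().__class__.__bases__[0].__subclasses__()"
try:
    check_no_dunders(escape)
except SecurityError as exc:
    print("blocked:", exc)

check_no_dunders("async def main():\n    return len(args)")  # ordinary code passes
```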

**The OS sandbox is a documented stub.** `sandbox.py` is a **no-op passthrough** today. The executor already routes its compile/exec step through `sandbox.run_in_sandbox(...)`, so the OS-isolation boundary exists as a future seam — but no OS isolation is enforced yet; the model-written code runs **in-process**. The planned `srt` (Seatbelt/bubblewrap) isolation plus the IPC bridge is a future phase. Until that lands, **keep `sandbox.enabled=False`** and only run scripts whose orchestrator you control; with `enabled=True` and a non-`none` backend, `run_in_sandbox` raises `NotImplementedError`. See [SECURITY.md](SECURITY.md).
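The seam's documented behaviour fits in one function. `SandboxSettings` and `run_in_sandbox_sketch` are hypothetical names (the real `run_in_sandbox` signature is not given here), and treating `enabled=True` with `backend="none"` as a passthrough is an assumption drawn from the `SandboxConfig` table.

```python
from dataclasses import dataclass
from typing import Any, Callable, Literal


@dataclass
class SandboxSettings:
    """Mirrors the two documented SandboxConfig fields; the class name is hypothetical."""
    enabled: bool = False
    backend: Literal["none", "srt"] = "none"


def run_in_sandbox_sketch(settings: SandboxSettings, run: Callable[[], Any]) -> Any:
    """Passthrough today; refuse any isolation backend that is not implemented."""
    if settings.enabled and settings.backend != "none":
        raise NotImplementedError(f"sandbox backend {settings.backend!r} is not implemented yet")
    # Disabled, or enabled with backend="none": run in-process, exactly as without the seam.
    return run()


print(run_in_sandbox_sketch(SandboxSettings(), lambda: 2 + 2))  # 4
try:
    run_in_sandbox_sketch(SandboxSettings(enabled=True, backend="srt"), lambda: 2 + 2)
except NotImplementedError as exc:
    print(exc)
```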

**What is real today:** the model-written-script engine on the Functional API, the seven primitives, the AST `meta` validator, the reduced builtins and best-effort determinism guard, both leaf backends, the budget and concurrency caps, the run/event tracking on the middleware instance (see [Run & event tracking](#run--event-tracking)), and the opt-in middleware injection. **What is not:** OS isolation (the stub above), the live `/workflows` TUI (only event/log streaming exists today), and prompt-cache economics.

**The determinism guard is best-effort, not a security boundary.** Resume requires that the same script with the same args replays the same sequence of `agent()` calls, so reading the clock, drawing randomness, or constructing a no-arg date in the body is forbidden; pass timestamps in via `args` instead and stamp results after the workflow returns. The runtime intercepts those three obvious entry points **by name** and raises on them, which stops the *accidental* nondeterminism that would silently break resume. It is **not a sandbox**: a deliberately crafted script can still reach the real clock via object introspection, so do not treat the raise as a wall. Hard determinism and isolation guarantees need the OS-level sandbox (the stub above, planned for Phase 4). Memoization applies to *interrupt→resume within a thread*; it does not promise that re-running a finished thread skips work.
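A minimal sketch of reduced-builtins execution with by-name interception, plus the recommended pattern of passing a timestamp through `args`. The allow-list, the stub names `now` and `random`, and `NondeterminismError` are all hypothetical, since the README does not list which names the runtime intercepts or which exception it raises. Like the real guard, this only catches the obvious calls; it is not a sandbox.

```python
import builtins
from datetime import datetime, timezone


class NondeterminismError(RuntimeError):
    """Hypothetical error; the README does not name the exception the runtime raises."""


def _blocked(name: str):
    def stub(*_args, **_kwargs):
        raise NondeterminismError(f"{name}() breaks resume; pass the value in via args")
    return stub


# Hypothetical allow-list: no __import__, open, eval, exec, compile, getattr, setattr or type.
_ALLOWED = ("len", "range", "min", "max", "sorted", "enumerate", "zip",
            "str", "int", "float", "bool", "list", "dict", "isinstance",
            "Exception", "ValueError")
SAFE_BUILTINS = {name: getattr(builtins, name) for name in _ALLOWED}


def exec_body(source: str, args: dict) -> dict:
    """Exec a script body with reduced builtins and by-name stubs (sketch)."""
    env = {
        "__builtins__": SAFE_BUILTINS,
        "args": args,
        # Hypothetical names shadowed by raising stubs.
        "now": _blocked("now"),
        "random": _blocked("random"),
    }
    exec(compile(source, "<workflow>", "exec"), env)
    return env


# Recommended pattern: read the clock host-side and pass the value in via args.
started = datetime.now(timezone.utc).isoformat()
env = exec_body("report = {'target': args['target'], 'as_of': args['as_of']}",
                {"target": "auth.py", "as_of": started})
print(env["report"]["as_of"] == started)  # True
```

With `__import__` missing from the builtins, `import os` fails with `ImportError`, and a call to `open` fails with `NameError` because the name simply does not exist in the script's namespace.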

---

## Architecture

```
workflow_middleware/
├── __init__.py        # Public API: WorkflowMiddleware, WorkflowConfig, run_workflow_script, ...
├── config.py          # WorkflowConfig + BudgetConfig + SandboxConfig dataclasses
├── state.py           # WorkflowState + WorkflowEvent TypedDicts
├── prompts.py         # RUN_WORKFLOW_DESCRIPTION, WORKFLOW_GUIDANCE, LEAF_PROMPT_ADDENDUM, ...
├── runtime.py         # Runtime + the injected primitives + Budget
├── executor.py        # AST meta-extraction, reduced-builtins exec, @entrypoint/@task wiring
├── leaves.py          # LeafBackend protocol, FakeLeafBackend, LangChainLeafBackend
├── sandbox.py         # Phase 4 OS-isolation seam (no-op stub today)
├── middleware.py      # WorkflowMiddleware: run_workflow tool + opt-in guidance
└── py.typed           # PEP 561 type marker
```

---

## Development

```bash
# Install with dev dependencies
pip install -e ".[dev,anthropic]"

# Run tests
pytest

# Lint
ruff check workflow_middleware/

# Type check
mypy workflow_middleware/
```

---

## License

[MIT](LICENSE) — Emanuele Ielo
