Skip to content

Human-in-the-Loop

MeshFlow's HITL system pauses workflow execution at designated nodes, persists a checkpoint, and resumes cleanly after a human provides a decision — across process restarts and arbitrary time delays.

from meshflow.core.state import StateGraph, node, interrupt, Command

@node
def review_node(state: dict) -> dict:
    if state.get("confidence", 1.0) < 0.8:
        interrupt("Please review the draft and approve or reject.")
    return {"approved": True}

HumanInLoopConfig in WorkflowDefinition

Configure HITL at the policy level so it applies automatically to any node whose risk tier meets the threshold:

from meshflow.core.schemas import HumanInLoopConfig, RiskTier, Policy

policy = Policy(
    human_in_loop=HumanInLoopConfig(
        enabled          = True,
        tier_threshold   = RiskTier.IRREVERSIBLE,  # pause on irreversible nodes
        timeout_s        = 86400.0,                # 24-hour durable pause
        approval_webhook = "https://hooks.example.com/meshflow",
    )
)

Or in YAML:

policy:
  mode: regulated
  budget_usd: 2.00
  human_approval_tier: irreversible   # none | read_only | internal | external_io | irreversible

nodes:
  approval:
    kind: human   # always pauses; never needs a risk tier

interrupt() in StateGraph

Call interrupt(value) inside any node to pause graph execution. The value is surfaced to the caller as InterruptedError.value.

from meshflow.core.state import StateGraph, node, interrupt, Command

@node
async def human_review(state: dict) -> dict:
    if state.get("needs_review"):
        interrupt({
            "prompt": "Review the draft and approve or reject.",
            "draft":  state.get("draft", ""),
        })
    return {"approved": True}

Resuming with Command

compiled = graph.compile(checkpointer=saver)

# First run — raises InterruptedError at "human_review"
try:
    result = await compiled.run({"query": "...", "needs_review": True})
except InterruptedError as exc:
    print("Paused at:", exc.node)          # "human_review"
    print("Payload:",   exc.value)         # {"prompt": "...", "draft": "..."}
    saved_state = exc.state

# Human reviews and decides; then resume:
result = await compiled.run(
    saved_state,
    resume=Command(
        resume = "approved",               # replaces the interrupt payload
        goto   = None,                     # None = continue from interrupted node
        update = {"approved": True},       # extra state updates to apply first
    ),
)

Command Fields

Field Type Description
resume Any Value that replaces the interrupt payload
goto str \| None Jump to a specific node instead of the interrupted one
update dict Extra state updates merged before resuming

HITL in WorkflowDefinition

For WorkflowDefinition-based workflows, use kind: human nodes or rely on automatic tier-based pausing:

name: contract_review
version: "1"

policy:
  mode: regulated
  budget_usd: 1.00
  human_approval_tier: irreversible

nodes:
  drafter:
    kind: native
    role: executor
    risk: read_only

  legal_approval:
    kind: human           # always pauses here

  publisher:
    kind: native
    role: executor
    risk: irreversible    # pauses automatically due to human_approval_tier

edges:
  - drafter -> legal_approval
  - legal_approval -> publisher

Resuming a WorkflowDefinition Run

from meshflow.core.workflow import WorkflowDefinition, HumanDecision
from meshflow.core.ledger import ReplayLedger

wf     = WorkflowDefinition.from_yaml("contract_review.yaml")
result = await mesh.run_workflow(wf, task="Review NDA v3", ledger_db="runs.db")
# result.paused_nodes == ["legal_approval"]

decision = HumanDecision(approved=True, comment="LGTM", decided_by="jane.doe@example.com")
ledger   = ReplayLedger("runs.db")
runtime  = mesh._make_runtime(run_id=result.run_id)
final    = await wf.resume(run_id=result.run_id, decision=decision, ledger=ledger, runtime=runtime)
assert final.completed is True

Webhook Notifications for Pending Approvals

When approval_webhook is set in HumanInLoopConfig, MeshFlow POSTs a JSON payload to that URL when any node pauses for human input:

{
  "run_id":     "abc-123",
  "node_id":    "legal_approval",
  "workflow":   "contract_review",
  "payload":    {"prompt": "Review NDA v3", "confidence": 0.72},
  "approve_url": "https://your-app.com/meshflow/approve?run_id=abc-123",
  "reject_url":  "https://your-app.com/meshflow/reject?run_id=abc-123"
}

meshflow approve CLI

Approve or reject a paused run without writing code:

# Approve
meshflow approve --run-id abc-123 --db runs.db --comment "LGTM"

# Reject
meshflow approve --run-id abc-123 --db runs.db --approved false --comment "Needs revision"

# List all paused runs
meshflow approve list --db runs.db

Full HITL Workflow Example

import asyncio
from typing import Annotated, TypedDict
from meshflow.core.state import (
    StateGraph, END, node, interrupt, Command, last, SqliteSaver
)

class ContractState(TypedDict):
    contract_text: str
    summary:       Annotated[str, last]
    approved:      Annotated[bool, last]
    final_text:    Annotated[str, last]

@node
async def summarize(state: dict) -> dict:
    text = state["contract_text"]
    return {"summary": f"Summary of {len(text)}-char contract: ..."}

@node
async def legal_review(state: dict) -> dict:
    # Always require human review for contracts
    interrupt({
        "action":  "Please review and approve or reject this summary.",
        "summary": state.get("summary", ""),
    })
    # Execution resumes here after Command(resume=...) is passed
    return {}

@node
async def finalize(state: dict) -> dict:
    if not state.get("approved", False):
        return {"final_text": "[REJECTED]"}
    return {"final_text": f"APPROVED: {state['summary']}"}

def route_after_review(state: dict) -> str:
    return "finalize"    # always continue; finalize reads state["approved"]

graph = StateGraph(ContractState)
graph.add_node("summarize",    summarize)
graph.add_node("legal_review", legal_review)
graph.add_node("finalize",     finalize)
graph.add_edge("summarize",    "legal_review")
graph.add_conditional_edges("legal_review", route_after_review, {"finalize": "finalize"})
graph.add_edge("finalize",     END)
graph.set_entry_point("summarize")

saver    = SqliteSaver("contracts.db")
compiled = graph.compile(checkpointer=saver)

initial = {
    "contract_text": "This agreement is between Party A and Party B...",
    "summary":       "",
    "approved":      False,
    "final_text":    "",
}

# --- First run: pauses at legal_review ---
try:
    result = await compiled.run(initial, config={"thread_id": "contract-001"})
except InterruptedError as exc:
    print("Paused:", exc.value["action"])
    paused_state = exc.state

    # Simulate human approval
    result = await compiled.run(
        paused_state,
        config  = {"thread_id": "contract-001"},
        resume  = Command(resume="approved", update={"approved": True}),
    )
    print(result["final_text"])   # "APPROVED: Summary of ..."