Skip to content

YAML Workflows

WorkflowDefinition is a governed, graph-topological workflow that can be loaded from YAML, executed with full audit trails, and round-tripped back to YAML for version control.

from meshflow.core.workflow import WorkflowDefinition
from meshflow.core.mesh import Mesh

wf     = WorkflowDefinition.from_yaml("pipeline.yaml")
mesh   = Mesh()
result = await mesh.run_workflow(wf, task="Summarise Q3 earnings")
print(result.output)

from_yaml(path, node_registry=None)

Loads a WorkflowDefinition from a YAML file. The file's SHA-256 is stored on wf.yaml_sha256 for exact-replay version pinning.

registry = {
    "crews.market_research": my_crewai_crew,
    "graphs.fact_check":     my_langgraph_graph,
    "agents.custom_fn":      my_python_callable,
}
wf = WorkflowDefinition.from_yaml("mesh.yaml", node_registry=registry)

Full YAML Schema

name: research_pipeline        # workflow name (required)
version: "2"                   # arbitrary version string

metadata:                      # free-form user metadata
  owner: platform-team
  ticket: PLAT-1234

policy:
  mode: standard               # dev | standard | regulated | legal-critical | hipaa
  budget_usd: 2.00             # hard spend cap; aborts if exceeded
  budget_tokens: 500000
  max_steps: 30
  timeout_s: 300
  enable_guardian: true        # prompt-injection scanner on every node
  enable_collusion_audit: true
  enable_uncertainty: true
  human_approval_tier: irreversible  # none | read_only | internal | external_io | irreversible
  max_forecast_usd: 1.50       # pre-run cost gate; 0 = disabled
  max_replans: 3               # dynamic replanning limit

nodes:
  planner:
    kind: native               # native | python | crewai | langgraph | autogen | human | http | subgraph
    role: planner              # planner | researcher | executor | critic
    model: claude-sonnet-4-6   # override model tier
    risk: read_only            # read_only | internal | external_io | irreversible
    timeout_s: 60
    retry_on_fail: true
    max_retries: 2
    output_schema:             # JSON schema for structured output validation
      type: object
      required: [plan]
      properties:
        plan: {type: string}

  researcher:
    kind: python
    ref: agents.research_fn    # resolved via node_registry

  fact_check:
    kind: langgraph
    ref: graphs.fact_check

  approval:
    kind: human                # always pauses for human input

  publisher:
    kind: native
    role: executor

edges:
  - planner -> researcher                  # shorthand
  - researcher -> fact_check
  - from: fact_check                       # long form — supports conditions
    to: approval
    condition: "confidence < 0.8"          # Python expression; available: output, content, confidence
  - from: fact_check
    to: publisher
    condition: "confidence >= 0.8"

loop_edges:                    # back-edges for iterative refinement
  - from: fact_check
    to: researcher
    condition: "confidence < 0.5"
    max_iterations: 5

entry: planner                 # explicit entry node (default: first node declared)
terminal:                      # nodes that end the workflow
  - publisher

compliance:                    # optional real-time compliance enforcement
  frameworks: [hipaa, sox]
  block_on_violation: true

context_bus:                   # fan-in merge strategies for parallel branches
  merge_strategies:
    summary: append            # overwrite | append | select_highest_confidence | logical_and | logical_or

to_yaml(path=None)

Round-trip export — works on any WorkflowDefinition whether built from YAML or the Python API.

wf = WorkflowDefinition.from_yaml("pipeline.yaml")
# ... modify nodes or edges ...
yaml_str = wf.to_yaml()                  # in-memory string
wf.to_yaml("pipeline_v2.yaml")           # write to disk

CLI

# Run a workflow YAML against a task
meshflow run pipeline.yaml --task "Summarise Q3 earnings"

# Diff two YAML versions
meshflow diff pipeline_v1.yaml pipeline_v2.yaml

Crew YAML (kind: crew)

Wrap a CrewAI crew in the MeshFlow governance plane by referencing it via node_registry:

nodes:
  market_research:
    kind: crewai
    ref: crews.market_research   # resolved via node_registry at load time
    risk: external_io
import my_crews
wf = WorkflowDefinition.from_yaml("crew_pipeline.yaml", {
    "crews.market_research": my_crews.market_research_crew,
})

@workflow Decorator

Makes any factory function portable and CI-diffable:

from meshflow.core.workflow_decorator import workflow
from meshflow.core.workflow import WorkflowDefinition
from meshflow.core.node import MeshNode, NodeKind

@workflow
def research_pipeline():
    wf = WorkflowDefinition(name="research", version="2")
    wf.add_node(MeshNode(id="planner",    kind=NodeKind.NATIVE))
    wf.add_node(MeshNode(id="researcher", kind=NodeKind.NATIVE))
    wf.add_node(MeshNode(id="writer",     kind=NodeKind.NATIVE))
    wf.add_edge("planner", "researcher")
    wf.add_edge("researcher", "writer")
    wf.set_terminal("writer")
    return wf

# Export to YAML (versionable in git)
research_pipeline.to_yaml("pipelines/research.yaml")

# Round-trip load
wf = research_pipeline.load("pipelines/research.yaml")

# CI diff between versions
diff = research_pipeline.diff("pipelines/v1/research.yaml", "pipelines/v2/research.yaml")
print(diff.summary())
if diff.has_breaking_changes:
    raise SystemExit("Breaking pipeline changes detected")

# Get the live WorkflowDefinition
wf = research_pipeline()        # or research_pipeline.build()

WorkflowProxy Methods

Method Description
to_yaml(path=None) Export YAML string; optionally write to file
load(path, node_registry=None) Load WorkflowDefinition from YAML
diff(path_a, path_b) Compare two YAML versions; returns DiffResult
schema() Return node topology as JSON-serialisable dict
build() Materialise the WorkflowDefinition (alias for calling the proxy)

Fan-Out / Fan-In Example

Parallel branches with automatic join:

name: parallel_research
version: "1"

policy:
  budget_usd: 3.00
  max_steps: 20

nodes:
  planner:      {kind: native, role: planner}
  branch_eu:    {kind: python, ref: agents.eu_research}
  branch_us:    {kind: python, ref: agents.us_research}
  branch_apac:  {kind: python, ref: agents.apac_research}
  synthesizer:  {kind: native, role: executor}

edges:
  - planner -> branch_eu
  - planner -> branch_us
  - planner -> branch_apac
  - branch_eu   -> synthesizer
  - branch_us   -> synthesizer
  - branch_apac -> synthesizer

context_bus:
  merge_strategies:
    findings: append   # all three branches' findings are concatenated

terminal:
  - synthesizer

branch_eu, branch_us, and branch_apac run concurrently via asyncio.gather. synthesizer runs after all three complete.