Coverage for agentos/workflow/__init__.py: 0%
510 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 16:36 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 16:36 +0800
"""
AgentOS Workflow DSL — Declarative multi-agent workflow definition language.

v1.14.4: YAML/JSON-based DSL for defining complex multi-agent pipelines with
  sequential, parallel, conditional, loop, and fan-out/fan-in patterns.

Key features:
- YAML/JSON declarative workflow definitions
- Topology validation and cycle detection
- Sequential, parallel, conditional, loop, sub-workflow patterns
- Built-in fan-out/fan-in via agentos.core.parallel
- Workflow execution engine with real-time progress
- Dry-run mode for validation without execution
- Visual DAG export (Mermaid/Graphviz)
- Error recovery strategies (retry, fallback, skip, escalate)
"""
import asyncio
import json
import logging
import re
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum, auto
from pathlib import Path
from typing import Any, AsyncIterator, Callable, Dict, List, Optional, Set, Type, Union

import yaml
# Module-level logger shared by the engine and parser in this module.
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Enums
# ---------------------------------------------------------------------------
class StepType(Enum):
    """Types of workflow steps."""

    TASK = "task"  # Single agent task
    SEQUENTIAL = "sequential"  # Run children in sequence
    PARALLEL = "parallel"  # Run children in parallel
    CONDITIONAL = "conditional"  # Branch based on condition
    LOOP = "loop"  # Repeat children until condition
    SUB_WORKFLOW = "sub"  # Nested workflow
    JOIN = "join"  # Wait for all branches to complete
    SPLIT = "split"  # Fan-out to multiple agents
class ExecutionStatus(Enum):
    """Status values recorded on a step result."""

    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    SKIPPED = "skipped"
    CANCELLED = "cancelled"
    RETRYING = "retrying"
class ErrorStrategy(Enum):
    """Recovery strategies that can be configured on a step via ``on_error``."""

    RETRY = "retry"  # Retry the step
    FALLBACK = "fallback"  # Execute fallback step
    SKIP = "skip"  # Skip and continue
    ESCALATE = "escalate"  # Fail the workflow
    PAUSE = "pause"  # Pause for human intervention
class ConditionOperator(Enum):
    """Comparison operators accepted in the ``op`` key of a condition dict."""

    EQUALS = "eq"
    NOT_EQUALS = "neq"
    CONTAINS = "contains"
    GREATER = "gt"
    LESS = "lt"
    IN = "in"
    MATCHES = "matches"  # regex
    EXISTS = "exists"
    EMPTY = "empty"
# ---------------------------------------------------------------------------
# Data structures
# ---------------------------------------------------------------------------
@dataclass
class WorkflowContext:
    """Runtime context shared across workflow steps."""

    variables: Dict[str, Any] = field(default_factory=dict)
    history: List[Dict[str, Any]] = field(default_factory=list)
    errors: List[Dict[str, Any]] = field(default_factory=list)
    metrics: Dict[str, float] = field(default_factory=dict)
    metadata: Dict[str, str] = field(default_factory=dict)

    def get(self, key: str, default: Any = None) -> Any:
        """Get a variable, supporting dot-notation (e.g., 'result.output.text').

        Args:
            key: Variable name; dots descend into nested dicts.
            default: Value returned when any part of the path is missing or
                an intermediate value is not a dict.

        Returns:
            The stored value, or ``default`` when the path cannot be resolved.
        """
        current: Any = self.variables
        for part in key.split("."):
            # Return the default as soon as the path breaks. Feeding it back
            # into the traversal would let a dict default be searched for the
            # remaining parts of the key.
            if not isinstance(current, dict) or part not in current:
                return default
            current = current[part]
        return current

    def set(self, key: str, value: Any) -> None:
        """Set a variable, supporting dot-notation for nested dicts.

        Missing intermediate dicts are created on the way down.

        Args:
            key: Variable name; dots descend into nested dicts.
            value: Value stored at the final path component.

        Raises:
            TypeError: If an existing intermediate value is not a dict.
        """
        *parents, leaf = key.split(".")
        current: Any = self.variables
        for depth, part in enumerate(parents):
            nxt = current.setdefault(part, {})
            if not isinstance(nxt, dict):
                path = ".".join(parents[: depth + 1])
                raise TypeError(
                    f"Cannot set '{key}': '{path}' holds a "
                    f"{type(nxt).__name__}, not a dict"
                )
            current = nxt
        current[leaf] = value
@dataclass
class StepResult:
    """Result of a workflow step execution."""

    step_id: str  # ID of the step that produced this result
    status: ExecutionStatus
    output: Any = None
    error: Optional[str] = None  # Error message when the step failed
    duration: float = 0.0  # Elapsed seconds
    retries: int = 0  # Number of retry attempts performed
    metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class WorkflowStep:
    """A single step in a workflow DAG."""

    id: str
    type: StepType
    name: str = ""
    description: str = ""

    # Execution
    agent: Optional[str] = None  # agent_id to dispatch to
    task: Optional[str] = None  # task payload template
    children: List["WorkflowStep"] = field(default_factory=list)

    # Conditional
    condition: Optional[Dict[str, Any]] = None
    branches: Dict[str, List["WorkflowStep"]] = field(default_factory=dict)

    # Loop
    max_iterations: int = 100
    loop_condition: Optional[Dict[str, Any]] = None

    # Error handling
    on_error: ErrorStrategy = ErrorStrategy.ESCALATE
    max_retries: int = 3
    retry_delay: float = 1.0  # base delay in seconds between retries
    fallback_step: Optional["WorkflowStep"] = None

    # Timing
    timeout: float = 300.0  # seconds
    depends_on: List[str] = field(default_factory=list)

    # Metadata
    tags: List[str] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class WorkflowDefinition:
    """Top-level workflow definition."""

    name: str
    version: str = "1.0"
    description: str = ""
    root: Optional[WorkflowStep] = None
    variables: Dict[str, Any] = field(default_factory=dict)
    agents: Dict[str, Dict[str, Any]] = field(default_factory=dict)
    defaults: Dict[str, Any] = field(default_factory=dict)
    metadata: Dict[str, Any] = field(default_factory=dict)

    @property
    def steps(self) -> List[WorkflowStep]:
        """Compatibility: return steps list from root tree.

        Steps are collected depth-first through ``children`` only; branch
        steps and fallback steps are not included.
        """
        if self.root is None:
            return []
        collected: List[WorkflowStep] = []

        def _collect(step: WorkflowStep) -> None:
            collected.append(step)
            for child in step.children or []:
                _collect(child)

        _collect(self.root)
        return collected

    @staticmethod
    def _nested_steps(step: WorkflowStep) -> List[WorkflowStep]:
        """Return the steps directly nested under ``step``: children, branch steps, fallback."""
        nested: List[WorkflowStep] = list(step.children or [])
        for branch_steps in step.branches.values():
            nested.extend(branch_steps)
        if step.fallback_step:
            nested.append(step.fallback_step)
        return nested

    def validate(self) -> List[str]:
        """Validate workflow structure and return a list of issues.

        Checks performed: a root step exists, step IDs are unique, conditional
        steps have a condition, task steps have an agent, every ``depends_on``
        entry names a step that exists somewhere in the workflow, and the
        step graph does not loop back onto itself.

        Returns:
            Human-readable issue descriptions; an empty list means valid.
        """
        issues: List[str] = []
        if not self.root:
            issues.append("Workflow has no root step")
            return issues

        # Pass 1: gather every step ID up front so that a dependency on a
        # step defined later in the tree is not reported as unknown.
        known_ids: Set[str] = set()
        visited_objects: Set[int] = set()
        pending: List[WorkflowStep] = [self.root]
        while pending:
            current = pending.pop()
            if id(current) in visited_objects:
                continue
            visited_objects.add(id(current))
            known_ids.add(current.id)
            pending.extend(self._nested_steps(current))

        # Pass 2: per-step checks, in the same traversal order as before.
        seen_ids: Set[str] = set()
        active_path: Set[int] = set()  # step objects on the current recursion path

        def validate_step(step: WorkflowStep) -> None:
            if id(step) in active_path:
                # The step contains itself; recursing further would never end.
                issues.append(f"Cycle detected at step '{step.id}'")
                return

            if step.id in seen_ids:
                issues.append(f"Duplicate step ID: {step.id}")
            seen_ids.add(step.id)

            if step.type == StepType.CONDITIONAL and not step.condition:
                issues.append(f"Conditional step '{step.id}' has no condition")
            if step.type == StepType.TASK and not step.agent:
                issues.append(f"Task step '{step.id}' has no agent assigned")

            # Validate depends_on references
            for dep in step.depends_on:
                if dep not in known_ids:
                    issues.append(f"Step '{step.id}' depends on unknown step '{dep}'")

            active_path.add(id(step))
            for nested in self._nested_steps(step):
                validate_step(nested)
            active_path.discard(id(step))

        validate_step(self.root)
        return issues

    def to_mermaid(self) -> str:
        """Export workflow as a Mermaid flowchart.

        Each step is declared once as ``id<shape>"label"<shape>`` with a shape
        chosen by step type. Conditional steps link to their branch steps with
        edges labelled by branch name; all other steps link to their children.

        Returns:
            Mermaid ``graph TD`` source text.
        """
        # (opening, closing) delimiters around the quoted label, per step type.
        shapes = {
            StepType.TASK: ('["', '"]'),
            StepType.PARALLEL: ('[["', '"]]'),
            StepType.CONDITIONAL: ('{"', '"}'),
            StepType.LOOP: ('(["', '"])'),
            StepType.JOIN: ('(("', '"))'),
            StepType.SPLIT: ('>"', '"]'),
        }
        default_shape = ('["', '"]')
        lines = ["graph TD"]
        declared: Set[str] = set()

        def add_step(
            step: WorkflowStep,
            parent_id: Optional[str] = None,
            edge_label: Optional[str] = None,
        ) -> None:
            first_visit = step.id not in declared
            if first_visit:
                declared.add(step.id)
                opener, closer = shapes.get(step.type, default_shape)
                # A literal double quote would terminate the quoted label.
                label = (step.name or step.id).replace('"', "#quot;")
                lines.append(f"    {step.id}{opener}{label}{closer}")

            if parent_id:
                if edge_label:
                    lines.append(f"    {parent_id} -- {edge_label} --> {step.id}")
                else:
                    lines.append(f"    {parent_id} --> {step.id}")

            if not first_visit:
                # Already expanded; only the extra incoming edge was needed.
                return

            if step.type == StepType.CONDITIONAL:
                for branch_name, branch_steps in step.branches.items():
                    for branch_step in branch_steps:
                        add_step(branch_step, step.id, branch_name)
            else:
                for child in step.children:
                    add_step(child, step.id)

        if self.root:
            add_step(self.root)
        return "\n".join(lines)
# ---------------------------------------------------------------------------
# Condition evaluator
# ---------------------------------------------------------------------------
class ConditionEvaluator:
    """Evaluate conditions against the workflow context."""

    @staticmethod
    def evaluate(condition: Dict[str, Any], ctx: WorkflowContext) -> bool:
        """Evaluate a condition dict against context.

        A condition is either a combinator (``{"and": [...]}``,
        ``{"or": [...]}``, ``{"not": {...}}``) or a single comparison with
        the keys ``field`` (context variable name), ``op`` (a
        ``ConditionOperator`` value, default ``"eq"``) and ``value``.

        Args:
            condition: Condition dict; an empty or ``None`` condition is true.
            ctx: Context whose variables are read via ``ctx.get``.

        Returns:
            Whether the condition holds.

        Raises:
            ValueError: If ``op`` is not a valid ``ConditionOperator`` value.
        """
        if not condition:
            return True

        # Support AND/OR/NOT combinators
        if "and" in condition:
            return all(
                ConditionEvaluator.evaluate(sub, ctx) for sub in condition["and"]
            )
        if "or" in condition:
            return any(
                ConditionEvaluator.evaluate(sub, ctx) for sub in condition["or"]
            )
        if "not" in condition:
            return not ConditionEvaluator.evaluate(condition["not"], ctx)

        # Single condition
        field_name = condition.get("field", "")
        expected = condition.get("value")
        actual = ctx.get(field_name)
        operator = ConditionOperator(condition.get("op", "eq"))

        if operator is ConditionOperator.EQUALS:
            return actual == expected
        if operator is ConditionOperator.NOT_EQUALS:
            return actual != expected
        if operator is ConditionOperator.CONTAINS:
            if actual is None:
                return False
            if isinstance(actual, (list, tuple, set, frozenset, dict)):
                # Containers: test membership, not a substring of their repr.
                try:
                    return expected in actual
                except TypeError:  # unhashable value against a set/dict
                    return False
            # Stringify both sides so a non-string value cannot raise.
            return str(expected) in str(actual)
        if operator in (ConditionOperator.GREATER, ConditionOperator.LESS):
            try:
                left, right = float(actual), float(expected)
            except (TypeError, ValueError):
                return False
            return left > right if operator is ConditionOperator.GREATER else left < right
        if operator is ConditionOperator.IN:
            if not isinstance(expected, (list, tuple, set)):
                return False
            try:
                return actual in expected
            except TypeError:  # unhashable actual against a set
                return False
        if operator is ConditionOperator.MATCHES:
            try:
                return bool(re.search(str(expected), str(actual)))
            except re.error:
                return False
        if operator is ConditionOperator.EXISTS:
            return actual is not None
        if operator is ConditionOperator.EMPTY:
            if actual is None:
                return True
            if isinstance(actual, (str, list, tuple, dict, set, frozenset)):
                return len(actual) == 0
            return False

        return False
# ---------------------------------------------------------------------------
# Workflow Engine
# ---------------------------------------------------------------------------
class WorkflowEngine:
    """Executes a WorkflowDefinition with real-time progress tracking.

    Results are cached per step ID for the duration of one ``execute`` call,
    so a step reached twice runs once. Loop iterations and retries drop the
    relevant cached entries so that their bodies really run again.
    """

    # Matches ``{{ name }}`` placeholders in task payload templates.
    _TEMPLATE_PATTERN = re.compile(r"\{\{\s*(.*?)\s*\}\}")

    def __init__(
        self,
        agent_dispatcher: Optional[Callable] = None,
        max_parallelism: int = 10,
    ):
        """Create an engine.

        Args:
            agent_dispatcher: Awaitable callable invoked as
                ``dispatcher(agent_id, payload, ctx)`` for every task step.
                Defaults to a dispatcher that only logs and returns a mock
                string.
            max_parallelism: Maximum number of agent dispatches in flight at
                the same time.
        """
        self._dispatcher = agent_dispatcher or self._default_dispatcher
        self._max_parallelism = max_parallelism
        self._ctx: Optional[WorkflowContext] = None
        self._results: Dict[str, StepResult] = {}
        self._progress_callbacks: List[Callable] = []
        self._cancelled = False
        self._semaphore = asyncio.Semaphore(max_parallelism)

    def on_progress(self, callback: Callable[[StepResult], None]) -> None:
        """Register a progress callback, called with each recorded StepResult."""
        self._progress_callbacks.append(callback)

    async def execute(self, workflow: WorkflowDefinition) -> WorkflowContext:
        """Execute a workflow and return the final context.

        Raises:
            ValueError: If ``workflow.validate()`` reports any issue.
        """
        issues = workflow.validate()
        if issues:
            raise ValueError(f"Workflow validation failed: {issues}")

        self._ctx = WorkflowContext(variables=dict(workflow.variables))
        self._results = {}
        self._cancelled = False

        if workflow.root:
            await self._execute_step(workflow.root, self._ctx)

        return self._ctx

    async def dry_run(self, workflow: WorkflowDefinition) -> Dict[str, Any]:
        """Validate a workflow without executing it.

        Returns:
            Dict with ``valid``, ``issues``, ``steps`` (step count) and
            ``mermaid`` (flowchart source).
        """
        issues = workflow.validate()
        return {
            "valid": len(issues) == 0,
            "issues": issues,
            "steps": self._count_steps(workflow.root) if workflow.root else 0,
            "mermaid": workflow.to_mermaid(),
        }

    def cancel(self) -> None:
        """Cancel the running workflow; steps not yet started report CANCELLED."""
        self._cancelled = True

    async def _execute_step(
        self, step: WorkflowStep, ctx: WorkflowContext
    ) -> StepResult:
        """Run one step with caching, error recovery, history and callbacks.

        For TASK steps, any ``children`` are treated as the steps that follow
        the task (the chain shape built by ``WorkflowParser._parse_steps`` and
        ``WorkflowTemplates.sequential``) and are run in order once the task
        itself has succeeded or been skipped.
        """
        if self._cancelled:
            return StepResult(step.id, ExecutionStatus.CANCELLED)

        cached = self._results.get(step.id)
        if cached is not None:
            return cached

        logger.info("[Workflow] Executing step '%s' (%s)", step.id, step.type.value)
        result = await self._run_step_once(step, ctx)

        # Error recovery applies to failures only. Running it unconditionally
        # would record a bogus error entry for every successful step and turn
        # successes into SKIPPED/retried steps.
        if result.status == ExecutionStatus.FAILED:
            result = await self._handle_error(step, ctx, result)

        self._results[step.id] = result
        ctx.history.append({
            "step_id": step.id,
            "status": result.status.value,
            "output": str(result.output)[:200] if result.output else None,
            "error": result.error,
            "duration": result.duration,
        })

        for callback in self._progress_callbacks:
            try:
                callback(result)
            except Exception:
                # A broken observer must not abort the workflow, but it
                # should not disappear silently either.
                logger.exception(
                    "[Workflow] Progress callback failed for step '%s'", step.id
                )

        # Continue the chain hanging off a task step.
        if step.type == StepType.TASK and result.status in (
            ExecutionStatus.SUCCESS,
            ExecutionStatus.SKIPPED,
        ):
            for chained in step.children:
                chained_result = await self._execute_step(chained, ctx)
                if chained_result.status == ExecutionStatus.FAILED:
                    # Surface the downstream failure to the caller while the
                    # cache keeps this step's own result.
                    return StepResult(
                        step.id,
                        ExecutionStatus.FAILED,
                        output=result.output,
                        error=f"Chained step '{chained.id}' failed: {chained_result.error}",
                        duration=result.duration,
                        retries=result.retries,
                    )

        return result

    async def _run_step_once(
        self, step: WorkflowStep, ctx: WorkflowContext
    ) -> StepResult:
        """Dispatch a step to its type-specific runner, turning exceptions into FAILED results."""
        runners = {
            StepType.TASK: self._run_task,
            StepType.SEQUENTIAL: self._run_sequential,
            StepType.PARALLEL: self._run_parallel,
            StepType.CONDITIONAL: self._run_conditional,
            StepType.LOOP: self._run_loop,
            StepType.SUB_WORKFLOW: self._run_sub_workflow,
            StepType.JOIN: self._run_join,
            StepType.SPLIT: self._run_split,
        }
        started = time.perf_counter()
        result = StepResult(step.id, ExecutionStatus.RUNNING)

        try:
            runner = runners.get(step.type)
            if runner is None:
                result.status = ExecutionStatus.SUCCESS
            else:
                result = await runner(step, ctx)
        except asyncio.TimeoutError:
            result.status = ExecutionStatus.FAILED
            result.error = f"Step '{step.id}' timed out after {step.timeout}s"
        except Exception as exc:
            result.status = ExecutionStatus.FAILED
            # Some exceptions stringify to ""; fall back to the class name.
            result.error = str(exc) or type(exc).__name__
            logger.exception("[Workflow] Step '%s' failed: %s", step.id, exc)

        if not result.duration:
            result.duration = time.perf_counter() - started
        return result

    def _forget_results(self, step: WorkflowStep, failed_only: bool = False) -> None:
        """Drop cached results for ``step`` and everything nested under it.

        Args:
            step: Root of the subtree to forget.
            failed_only: When true, keep cached results that are not FAILED.
        """
        pending = [step]
        seen: Set[int] = set()
        while pending:
            current = pending.pop()
            if id(current) in seen:
                continue
            seen.add(id(current))

            cached = self._results.get(current.id)
            if cached is not None and (
                not failed_only or cached.status == ExecutionStatus.FAILED
            ):
                del self._results[current.id]

            pending.extend(current.children or [])
            for branch_steps in current.branches.values():
                pending.extend(branch_steps)
            if current.fallback_step:
                pending.append(current.fallback_step)

    async def _run_task(self, step: WorkflowStep, ctx: WorkflowContext) -> StepResult:
        """Execute a single agent task.

        The dispatcher output is stored in the context under
        ``steps.<step id>.output``.

        Raises:
            asyncio.TimeoutError: If the dispatcher exceeds ``step.timeout``.
        """
        started = time.perf_counter()

        # Resolve template variables in task payload
        payload = step.task or ""
        if "{{" in payload:
            payload = self._resolve_template(payload, ctx)

        # The limit is taken around the dispatch itself, not around a whole
        # child subtree, so nested parallel steps cannot starve each other
        # of permits.
        async with self._semaphore:
            output = await asyncio.wait_for(
                self._dispatcher(step.agent, payload, ctx),
                timeout=step.timeout,
            )

        ctx.set(f"steps.{step.id}.output", output)
        return StepResult(
            step.id,
            ExecutionStatus.SUCCESS,
            output=output,
            duration=time.perf_counter() - started,
        )

    async def _run_sequential(
        self, step: WorkflowStep, ctx: WorkflowContext
    ) -> StepResult:
        """Run children in sequence.

        A failed child fails this step only when its ``on_error`` is
        ESCALATE; otherwise the remaining children still run.
        """
        for child in step.children:
            result = await self._execute_step(child, ctx)
            if (
                result.status == ExecutionStatus.FAILED
                and step.on_error == ErrorStrategy.ESCALATE
            ):
                return StepResult(
                    step.id,
                    ExecutionStatus.FAILED,
                    error=f"Child '{child.id}' failed: {result.error}",
                )
        return StepResult(step.id, ExecutionStatus.SUCCESS)

    async def _run_parallel(
        self, step: WorkflowStep, ctx: WorkflowContext
    ) -> StepResult:
        """Run children concurrently and collect their outputs by child ID.

        The outputs dict is stored under ``steps.<step id>.outputs``. As in
        ``_run_sequential``, a failed child fails this step only when its
        ``on_error`` is ESCALATE.
        """
        results = await asyncio.gather(
            *(self._execute_step(child, ctx) for child in step.children),
            return_exceptions=True,
        )

        outputs: Dict[str, Any] = {}
        failed: List[str] = []
        for child, result in zip(step.children, results):
            # BaseException also covers CancelledError, which is not an
            # Exception subclass and has no ``output`` attribute.
            if isinstance(result, BaseException):
                outputs[child.id] = {"error": str(result)}
                failed.append(child.id)
            else:
                outputs[child.id] = result.output
                if result.status == ExecutionStatus.FAILED:
                    failed.append(child.id)

        ctx.set(f"steps.{step.id}.outputs", outputs)

        if failed and step.on_error == ErrorStrategy.ESCALATE:
            return StepResult(
                step.id,
                ExecutionStatus.FAILED,
                output=outputs,
                error=f"Parallel children failed: {', '.join(failed)}",
            )
        return StepResult(step.id, ExecutionStatus.SUCCESS, output=outputs)

    async def _run_conditional(
        self, step: WorkflowStep, ctx: WorkflowContext
    ) -> StepResult:
        """Evaluate condition and execute the matching branch.

        The branch named ``"true"`` or ``"false"`` is used; when it is absent
        or empty the ``"default"`` branch is used instead.
        """
        if not step.condition:
            return StepResult(
                step.id, ExecutionStatus.SKIPPED, error="No condition defined"
            )

        matched = ConditionEvaluator.evaluate(step.condition, ctx)
        branch_key = "true" if matched else "false"
        branch_steps = step.branches.get(branch_key, []) or step.branches.get(
            "default", []
        )

        for child in branch_steps:
            result = await self._execute_step(child, ctx)
            if result.status == ExecutionStatus.FAILED:
                return StepResult(
                    step.id,
                    ExecutionStatus.FAILED,
                    error=f"Branch child '{child.id}' failed: {result.error}",
                )

        return StepResult(
            step.id, ExecutionStatus.SUCCESS, output={"branch": branch_key}
        )

    async def _run_loop(self, step: WorkflowStep, ctx: WorkflowContext) -> StepResult:
        """Execute children in a loop until condition is false.

        The body always runs at least once; ``loop_condition`` is checked
        after each iteration. The zero-based index of the last completed
        iteration is stored under ``steps.<step id>.iteration``.
        """
        iteration = 0
        while iteration < step.max_iterations:
            if self._cancelled:
                return StepResult(step.id, ExecutionStatus.CANCELLED)

            if iteration:
                # Results are cached by step ID; without this every iteration
                # after the first would replay the cache and do no work.
                for child in step.children:
                    self._forget_results(child)

            for child in step.children:
                result = await self._execute_step(child, ctx)
                if result.status == ExecutionStatus.FAILED:
                    return StepResult(
                        step.id,
                        ExecutionStatus.FAILED,
                        error=f"Loop iteration {iteration}: child '{child.id}' failed",
                    )

            ctx.set(f"steps.{step.id}.iteration", iteration)
            iteration += 1

            # Check loop condition
            if step.loop_condition and not ConditionEvaluator.evaluate(
                step.loop_condition, ctx
            ):
                break

        return StepResult(
            step.id, ExecutionStatus.SUCCESS, output={"iterations": iteration}
        )

    async def _run_sub_workflow(
        self, step: WorkflowStep, ctx: WorkflowContext
    ) -> StepResult:
        """Execute a nested sub-workflow by running its children in order."""
        for child in step.children:
            result = await self._execute_step(child, ctx)
            if result.status == ExecutionStatus.FAILED:
                return StepResult(
                    step.id,
                    ExecutionStatus.FAILED,
                    error=f"Sub-workflow child '{child.id}' failed",
                )
        return StepResult(step.id, ExecutionStatus.SUCCESS)

    async def _run_join(self, step: WorkflowStep, ctx: WorkflowContext) -> StepResult:
        """Join point; currently reports success immediately.

        NOTE(review): no ``depends_on`` resolution is visible in this class,
        so nothing here actually waits for the listed dependencies. Confirm
        where that is meant to happen.
        """
        return StepResult(step.id, ExecutionStatus.SUCCESS)

    async def _run_split(self, step: WorkflowStep, ctx: WorkflowContext) -> StepResult:
        """Fan-out to multiple agents; same behaviour as a parallel step."""
        return await self._run_parallel(step, ctx)

    async def _handle_error(
        self, step: WorkflowStep, ctx: WorkflowContext, result: StepResult
    ) -> StepResult:
        """Apply the step's error recovery strategy to a failed result.

        RETRY re-runs the step up to ``max_retries`` times with exponential
        back-off based on ``retry_delay``. FALLBACK runs ``fallback_step``
        and returns its result. SKIP marks the result SKIPPED. PAUSE and
        ESCALATE return the failure unchanged.

        Args:
            step: The step that produced ``result``.
            ctx: Workflow context; failures are appended to ``ctx.errors``.
            result: Result to recover from; returned unchanged unless FAILED.

        Returns:
            The final result after recovery.
        """
        if result.status != ExecutionStatus.FAILED:
            return result

        ctx.errors.append({"step_id": step.id, "error": result.error})
        strategy = step.on_error

        if strategy == ErrorStrategy.RETRY:
            # The attempt counter lives here. Re-entering _execute_step would
            # build a fresh StepResult with retries=0, so the budget would
            # never be consumed.
            attempt = result.retries
            while (
                result.status == ExecutionStatus.FAILED
                and attempt < step.max_retries
                and not self._cancelled
            ):
                logger.info(
                    "[Workflow] Retrying step '%s' (%d/%d)",
                    step.id, attempt + 1, step.max_retries,
                )
                await asyncio.sleep(step.retry_delay * (2 ** attempt))
                attempt += 1
                # Let failed descendants run again; keep the ones that passed.
                self._forget_results(step, failed_only=True)
                result = await self._run_step_once(step, ctx)
                result.retries = attempt
                if result.status == ExecutionStatus.FAILED:
                    ctx.errors.append({"step_id": step.id, "error": result.error})
            return result

        if strategy == ErrorStrategy.FALLBACK and step.fallback_step:
            logger.info("[Workflow] Executing fallback for step '%s'", step.id)
            return await self._execute_step(step.fallback_step, ctx)

        if strategy == ErrorStrategy.SKIP:
            result.status = ExecutionStatus.SKIPPED
            return result

        if strategy == ErrorStrategy.PAUSE:
            logger.warning("[Workflow] Paused at step '%s': %s", step.id, result.error)
            # In production, this would notify the HITL system
            return result

        # Default: escalate
        return result

    @staticmethod
    async def _default_dispatcher(agent_id: str, task: str, ctx: WorkflowContext) -> str:
        """Default task dispatcher — logs and returns mock result."""
        logger.info("[Workflow] Dispatch to '%s': %s", agent_id, task[:100])
        return f"Task dispatched to {agent_id}: {task[:50]}"

    @staticmethod
    def _resolve_template(template: str, ctx: WorkflowContext) -> str:
        """Resolve {{ variable }} placeholders in a template string.

        Unknown variables are rendered as ``<name not found>``.
        """
        def replacer(match):
            key = match.group(1).strip()
            return str(ctx.get(key, f"<{key} not found>"))

        return WorkflowEngine._TEMPLATE_PATTERN.sub(replacer, template)

    @staticmethod
    def _count_steps(step: Optional[WorkflowStep]) -> int:
        """Count ``step`` plus all nested children, branch steps and fallbacks."""
        if step is None:
            return 0
        total = 1
        total += sum(WorkflowEngine._count_steps(child) for child in step.children)
        for branch_steps in step.branches.values():
            total += sum(WorkflowEngine._count_steps(s) for s in branch_steps)
        if step.fallback_step:
            total += WorkflowEngine._count_steps(step.fallback_step)
        return total
# ---------------------------------------------------------------------------
# Workflow YAML/JSON Parser
# ---------------------------------------------------------------------------
class WorkflowParser:
    """Parse YAML/JSON files into WorkflowDefinition objects."""

    @staticmethod
    def parse_file(filepath: Union[str, Path]) -> WorkflowDefinition:
        """Parse a .yaml/.yml/.json file into a WorkflowDefinition.

        Files with a ``.json`` suffix (any letter case) are read as JSON;
        everything else is read with ``yaml.safe_load``.
        """
        path = Path(filepath)
        with open(path, "r", encoding="utf-8") as handle:
            if path.suffix.lower() == ".json":
                data = json.load(handle)
            else:
                data = yaml.safe_load(handle)
        return WorkflowParser.parse_dict(data)

    @staticmethod
    def parse_str(text: str) -> WorkflowDefinition:
        """Parse a YAML/JSON string into a WorkflowDefinition.

        JSON is tried first; on a JSON decode error the text is parsed as YAML.
        """
        try:
            data = json.loads(text)
        except json.JSONDecodeError:
            data = yaml.safe_load(text)
        return WorkflowParser.parse_dict(data)

    @staticmethod
    def parse_dict(data: Dict[str, Any]) -> WorkflowDefinition:
        """Parse a dict into a WorkflowDefinition.

        Raises:
            ValueError: If ``data`` is not a mapping (for example an empty
                document, which loads as ``None``), or if ``steps`` is
                present but empty.
        """
        if not isinstance(data, dict):
            raise ValueError(
                f"Workflow definition must be a mapping, got {type(data).__name__}"
            )

        # ``or`` guards against sections written as an explicit null.
        workflow = WorkflowDefinition(
            name=data.get("name", "unnamed"),
            version=data.get("version", "1.0"),
            description=data.get("description", ""),
            variables=data.get("variables") or {},
            agents=data.get("agents") or {},
            defaults=data.get("defaults") or {},
            metadata=data.get("metadata") or {},
        )

        if "steps" in data:
            workflow.root = WorkflowParser._parse_steps(data["steps"])

        return workflow

    @staticmethod
    def _parse_steps(steps: List[Dict[str, Any]]) -> WorkflowStep:
        """Parse a list of step dicts into a tree. First step is root.

        Each step that has no children of its own gets the next step in the
        list as its only child, forming a chain.

        Raises:
            ValueError: If ``steps`` is empty.
        """
        if not steps:
            raise ValueError("No steps defined")

        parsed = [WorkflowParser._parse_step(raw) for raw in steps]

        # Build parent-child relationships
        # NOTE(review): a step that already declares children is not linked
        # to its successor, so the steps listed after it are unreachable from
        # the returned root. Confirm whether that is intended.
        for current, following in zip(parsed, parsed[1:]):
            if not current.children:
                current.children = [following]

        return parsed[0]

    @staticmethod
    def _parse_step(data: Dict[str, Any]) -> WorkflowStep:
        """Parse a single step dict, recursing into branches, children and fallback.

        Raises:
            ValueError: If ``type`` or ``on_error`` is not a known enum value.
        """
        step = WorkflowStep(
            id=data.get("id", ""),
            type=StepType(data.get("type", "task")),
            name=data.get("name", ""),
            description=data.get("description", ""),
            agent=data.get("agent"),
            task=data.get("task"),
            timeout=data.get("timeout", 300.0),
            depends_on=data.get("depends_on", []),
            tags=data.get("tags", []),
            metadata=data.get("metadata", {}),
        )

        if "condition" in data:
            step.condition = data["condition"]

        if "branches" in data:
            for branch_name, branch_steps in data["branches"].items():
                step.branches[branch_name] = [
                    WorkflowParser._parse_step(raw) for raw in branch_steps
                ]

        if "children" in data:
            step.children = [
                WorkflowParser._parse_step(raw) for raw in data["children"]
            ]

        if "on_error" in data:
            step.on_error = ErrorStrategy(data["on_error"])
        if "max_retries" in data:
            step.max_retries = data["max_retries"]
        if "retry_delay" in data:
            step.retry_delay = data["retry_delay"]
        if "fallback" in data:
            step.fallback_step = WorkflowParser._parse_step(data["fallback"])

        # Loop settings are only read for loop steps.
        if step.type == StepType.LOOP:
            step.max_iterations = data.get("max_iterations", 100)
            if "loop_condition" in data:
                step.loop_condition = data["loop_condition"]

        return step

    @staticmethod
    def to_yaml(workflow: WorkflowDefinition) -> str:
        """Serialize a WorkflowDefinition to YAML string."""
        return yaml.dump(WorkflowParser._to_dict(workflow), default_flow_style=False)

    @staticmethod
    def to_json(workflow: WorkflowDefinition) -> str:
        """Serialize a WorkflowDefinition to JSON string."""
        return json.dumps(WorkflowParser._to_dict(workflow), indent=2)

    @staticmethod
    def _to_dict(wf: WorkflowDefinition) -> Dict[str, Any]:
        """Convert a definition to a plain dict; the root step becomes the single ``steps`` entry."""
        data: Dict[str, Any] = {
            "name": wf.name,
            "version": wf.version,
            "description": wf.description,
            "variables": wf.variables,
            "agents": wf.agents,
            "defaults": wf.defaults,
            "metadata": wf.metadata,
        }
        if wf.root:
            data["steps"] = [WorkflowParser._step_to_dict(wf.root)]
        return data

    @staticmethod
    def _step_to_dict(step: WorkflowStep) -> Dict[str, Any]:
        """Convert a step (and everything nested in it) to a plain dict.

        Error-handling fields are written only when they differ from the
        ``WorkflowStep`` defaults.
        """
        d: Dict[str, Any] = {
            "id": step.id,
            "type": step.type.value,
            "name": step.name,
            "description": step.description,
            "agent": step.agent,
            "task": step.task,
            "timeout": step.timeout,
            "depends_on": step.depends_on,
            "tags": step.tags,
            "metadata": step.metadata,
        }
        if step.condition:
            d["condition"] = step.condition
        if step.branches:
            d["branches"] = {
                branch_name: [WorkflowParser._step_to_dict(s) for s in branch_steps]
                for branch_name, branch_steps in step.branches.items()
            }
        if step.children:
            d["children"] = [WorkflowParser._step_to_dict(c) for c in step.children]
        if step.on_error != ErrorStrategy.ESCALATE:
            d["on_error"] = step.on_error.value
        if step.max_retries != 3:
            d["max_retries"] = step.max_retries
        if step.retry_delay != 1.0:
            d["retry_delay"] = step.retry_delay
        if step.fallback_step:
            d["fallback"] = WorkflowParser._step_to_dict(step.fallback_step)
        if step.type == StepType.LOOP:
            d["max_iterations"] = step.max_iterations
            if step.loop_condition:
                d["loop_condition"] = step.loop_condition
        return d
# ---------------------------------------------------------------------------
# Pre-built workflow templates
# ---------------------------------------------------------------------------
class WorkflowTemplates:
    """Library of common workflow patterns."""

    @staticmethod
    def sequential(name: str, agent_ids: List[str], task_template: str) -> WorkflowDefinition:
        """Create a sequential pipeline: Agent1 → Agent2 → Agent3.

        Each agent gets a task step with ID ``step_<agent_id>``; every step is
        the single child of the previous one. An empty ``agent_ids`` yields a
        definition without a root step.
        """
        root: Optional[WorkflowStep] = None
        previous: Optional[WorkflowStep] = None
        for agent_id in agent_ids:
            step = WorkflowStep(
                id=f"step_{agent_id}",
                type=StepType.TASK,
                name=f"Task by {agent_id}",
                agent=agent_id,
                task=task_template,
            )
            if previous is None:
                root = step
            else:
                previous.children = [step]
            previous = step

        return WorkflowDefinition(name=name, root=root)

    @staticmethod
    def parallel_broadcast(
        name: str, agent_ids: List[str], task_template: str
    ) -> WorkflowDefinition:
        """Create a parallel broadcast: all agents run simultaneously."""
        children = [
            WorkflowStep(
                id=f"step_{agent_id}",
                type=StepType.TASK,
                name=f"Broadcast to {agent_id}",
                agent=agent_id,
                task=task_template,
            )
            for agent_id in agent_ids
        ]
        root = WorkflowStep(
            id="broadcast",
            type=StepType.PARALLEL,
            name="Parallel broadcast",
            children=children,
        )
        return WorkflowDefinition(name=name, root=root)

    @staticmethod
    def map_reduce(
        name: str,
        mapper_agents: List[str],
        reducer_agent: str,
        map_task: str,
        reduce_task: str,
    ) -> WorkflowDefinition:
        """Map-Reduce pattern: parallel map → single reduce.

        The root is a sequential step ``map_reduce`` holding the parallel
        ``map_phase`` (one ``map_<agent_id>`` task per mapper) followed by the
        ``reduce_phase`` task.
        """
        map_steps = [
            WorkflowStep(
                id=f"map_{agent_id}",
                type=StepType.TASK,
                name=f"Map by {agent_id}",
                agent=agent_id,
                task=map_task,
            )
            for agent_id in mapper_agents
        ]
        map_phase = WorkflowStep(
            id="map_phase",
            type=StepType.PARALLEL,
            name="Map phase",
            children=map_steps,
        )
        reduce_step = WorkflowStep(
            id="reduce_phase",
            type=StepType.TASK,
            name=f"Reduce by {reducer_agent}",
            agent=reducer_agent,
            task=reduce_task,
        )
        # The reduce step must follow the map phase, not replace its
        # children: assigning it to ``map_phase.children`` would discard
        # every mapper.
        root = WorkflowStep(
            id="map_reduce",
            type=StepType.SEQUENTIAL,
            name="Map-Reduce",
            children=[map_phase, reduce_step],
        )
        return WorkflowDefinition(name=name, root=root)

    @staticmethod
    def conditional_branch(
        name: str,
        condition_field: str,
        true_agent: str,
        false_agent: str,
        task_template: str,
    ) -> WorkflowDefinition:
        """Conditional branching: if condition → agentA else → agentB.

        The condition holds when the context variable ``condition_field``
        equals ``True``.
        """
        true_step = WorkflowStep(
            id="true_branch",
            type=StepType.TASK,
            name=f"True: {true_agent}",
            agent=true_agent,
            task=task_template,
        )
        false_step = WorkflowStep(
            id="false_branch",
            type=StepType.TASK,
            name=f"False: {false_agent}",
            agent=false_agent,
            task=task_template,
        )
        root = WorkflowStep(
            id="condition",
            type=StepType.CONDITIONAL,
            name="Condition check",
            condition={"field": condition_field, "op": "eq", "value": True},
            branches={"true": [true_step], "false": [false_step]},
        )
        return WorkflowDefinition(name=name, root=root)

    @staticmethod
    def retry_loop(
        name: str, agent_id: str, task: str, max_retries: int = 3
    ) -> WorkflowDefinition:
        """Task with automatic retry on failure (base retry delay 2.0 seconds)."""
        step = WorkflowStep(
            id="retry_task",
            type=StepType.TASK,
            name="Task with retry",
            agent=agent_id,
            task=task,
            on_error=ErrorStrategy.RETRY,
            max_retries=max_retries,
            retry_delay=2.0,
        )
        return WorkflowDefinition(name=name, root=step)
# ---------------------------------------------------------------------------
# Export
# ---------------------------------------------------------------------------
# Public API of this module.
__all__ = [
    # Enums
    "StepType",
    "ExecutionStatus",
    "ErrorStrategy",
    "ConditionOperator",
    # Data
    "WorkflowContext",
    "StepResult",
    "WorkflowStep",
    "WorkflowDefinition",
    # Engine
    "WorkflowEngine",
    "ConditionEvaluator",
    # Parser
    "WorkflowParser",
    # Templates
    "WorkflowTemplates",
]