Coverage for agentos/workflow/__init__.py: 0%

510 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 16:36 +0800

1""" 

2AgentOS Workflow DSL — Declarative multi-agent workflow definition language. 

3 

4v1.14.4: YAML/JSON-based DSL for defining complex multi-agent pipelines with 

5 sequential, parallel, conditional, loop, and fan-out/fan-in patterns. 

6 

7Key features: 

8- YAML/JSON declarative workflow definitions 

9- Topology validation and cycle detection 

10- Sequential, parallel, conditional, loop, sub-workflow patterns 

11- Built-in fan-out/fan-in via agentos.core.parallel 

12- Workflow execution engine with real-time progress 

13- Dry-run mode for validation without execution 

14- Visual DAG export (Mermaid/Graphviz) 

15- Error recovery strategies (retry, fallback, skip, escalate) 

16""" 

17 

18import asyncio 

19import json 

20import logging 

21from abc import ABC, abstractmethod 

22from dataclasses import dataclass, field 

23from enum import Enum, auto 

24from pathlib import Path 

25from typing import Any, AsyncIterator, Callable, Dict, List, Optional, Set, Type, Union 

26 

27import yaml 

28 

29logger = logging.getLogger(__name__) 

30 

31 

32# --------------------------------------------------------------------------- 

33# Enums 

34# --------------------------------------------------------------------------- 

35 

class StepType(Enum):
    """Kinds of node a workflow tree can contain.

    The string values are the spellings used for a step's ``type`` in
    workflow documents.
    """

    # One task handed to one agent.
    TASK = "task"
    # Children run one after another.
    SEQUENTIAL = "sequential"
    # Children run concurrently.
    PARALLEL = "parallel"
    # A condition selects which branch runs.
    CONDITIONAL = "conditional"
    # Children repeat until the loop condition stops them.
    LOOP = "loop"
    # A nested workflow.
    SUB_WORKFLOW = "sub"
    # Wait for all branches to complete.
    JOIN = "join"
    # Fan-out to multiple agents.
    SPLIT = "split"

46 

47 

class ExecutionStatus(Enum):
    """Lifecycle states a step result can be in."""

    # Not started / in flight.
    PENDING = "pending"
    RUNNING = "running"

    # Terminal outcomes.
    SUCCESS = "success"
    FAILED = "failed"
    SKIPPED = "skipped"
    CANCELLED = "cancelled"

    # Between attempts of a retried step.
    RETRYING = "retrying"

56 

57 

class ErrorStrategy(Enum):
    """What to do when a step fails (a step's ``on_error`` setting)."""

    # Retry the step.
    RETRY = "retry"
    # Execute the step's fallback step instead.
    FALLBACK = "fallback"
    # Skip the step and continue.
    SKIP = "skip"
    # Fail the workflow.
    ESCALATE = "escalate"
    # Pause for human intervention.
    PAUSE = "pause"

64 

65 

class ConditionOperator(Enum):
    """Comparison operators accepted in a condition's ``op`` entry."""

    # Equality.
    EQUALS = "eq"
    NOT_EQUALS = "neq"

    # Containment and membership.
    CONTAINS = "contains"
    IN = "in"

    # Numeric ordering.
    GREATER = "gt"
    LESS = "lt"

    # Regular-expression match.
    MATCHES = "matches"

    # Presence checks.
    EXISTS = "exists"
    EMPTY = "empty"

76 

77 

78# --------------------------------------------------------------------------- 

79# Data structures 

80# --------------------------------------------------------------------------- 

81 

@dataclass
class WorkflowContext:
    """Runtime context shared across workflow steps.

    Attributes:
        variables: Nested dict of workflow variables, addressed with
            dot-notation through :meth:`get` and :meth:`set`.
        history: One dict per recorded step execution.
        errors: One dict per recorded step failure.
        metrics: Named numeric measurements.
        metadata: Free-form string annotations.
    """

    variables: Dict[str, Any] = field(default_factory=dict)
    history: List[Dict[str, Any]] = field(default_factory=list)
    errors: List[Dict[str, Any]] = field(default_factory=list)
    metrics: Dict[str, float] = field(default_factory=dict)
    metadata: Dict[str, str] = field(default_factory=dict)

    def get(self, key: str, default: Any = None) -> Any:
        """Get a variable, supporting dot-notation (e.g., 'result.output.text').

        Args:
            key: Dot-separated path into ``variables``.
            default: Returned as-is when any segment of the path is missing
                or when the path runs through a non-dict value.

        Returns:
            The value stored at ``key``, or ``default``.
        """
        # A private sentinel separates "key absent" from a stored value and
        # stops the walk from descending into ``default`` when it is a dict.
        missing = object()
        current: Any = self.variables
        for part in key.split("."):
            if not isinstance(current, dict):
                return default
            current = current.get(part, missing)
            if current is missing:
                return default
        return current

    def set(self, key: str, value: Any) -> None:
        """Set a variable, supporting dot-notation for nested dicts.

        Missing intermediate dicts are created on the way down.

        Args:
            key: Dot-separated path into ``variables``.
            value: Value to store at the final path segment.

        Raises:
            TypeError: If the path runs through an existing value that is
                not a dict.
        """
        *parents, leaf = key.split(".")
        current: Any = self.variables
        for part in parents:
            if not isinstance(current, dict):
                raise TypeError(
                    f"Cannot set '{key}': '{part}' is nested under a non-dict value"
                )
            if part not in current:
                current[part] = {}
            current = current[part]
        if not isinstance(current, dict):
            raise TypeError(f"Cannot set '{key}': parent of '{leaf}' is not a dict")
        current[leaf] = value

111 

112 

@dataclass
class StepResult:
    """Outcome of running one workflow step.

    Attributes:
        step_id: ID of the step this result belongs to.
        status: Execution status of the step.
        output: Whatever the step produced, if anything.
        error: Failure description, or ``None`` when no error was recorded.
        duration: Elapsed time recorded for the step.
        retries: Number of retries performed.
        metadata: Free-form extra data attached to the result.
    """

    # Identity and outcome.
    step_id: str
    status: ExecutionStatus

    # Payload.
    output: Any = None
    error: Optional[str] = None

    # Bookkeeping.
    duration: float = 0.0
    retries: int = 0
    metadata: Dict[str, Any] = field(default_factory=dict)

123 

124 

@dataclass
class WorkflowStep:
    """One node of a workflow tree.

    Only the fields relevant to a step's ``type`` are consulted; the rest
    keep their defaults.
    """

    # --- Identity ---------------------------------------------------------
    id: str
    type: StepType
    name: str = ""
    description: str = ""

    # --- Execution --------------------------------------------------------
    # ID of the agent a task is dispatched to.
    agent: Optional[str] = None
    # Task payload template.
    task: Optional[str] = None
    # Nested steps.
    children: List["WorkflowStep"] = field(default_factory=list)

    # --- Conditional ------------------------------------------------------
    # Condition dict, and the steps to run per branch name.
    condition: Optional[Dict[str, Any]] = None
    branches: Dict[str, List["WorkflowStep"]] = field(default_factory=dict)

    # --- Loop -------------------------------------------------------------
    # Upper bound on iterations, and the condition checked between them.
    max_iterations: int = 100
    loop_condition: Optional[Dict[str, Any]] = None

    # --- Error handling ---------------------------------------------------
    on_error: ErrorStrategy = ErrorStrategy.ESCALATE
    max_retries: int = 3
    # Base delay between retries.
    retry_delay: float = 1.0
    # Step to run instead when the FALLBACK strategy applies.
    fallback_step: Optional["WorkflowStep"] = None

    # --- Timing -----------------------------------------------------------
    timeout: float = 300.0
    # IDs of the steps this one depends on.
    depends_on: List[str] = field(default_factory=list)

    # --- Metadata ---------------------------------------------------------
    tags: List[str] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)

159 

160 

@dataclass
class WorkflowDefinition:
    """Top-level workflow definition.

    Attributes:
        name: Workflow name.
        version: Version label of the definition.
        description: Free-form description.
        root: Root step of the workflow tree, or ``None`` if not set.
        variables: Initial workflow variables.
        agents: Per-agent configuration keyed by agent ID.
        defaults: Default settings for the workflow.
        metadata: Free-form extra data.
    """

    name: str
    version: str = "1.0"
    description: str = ""
    root: Optional[WorkflowStep] = None
    variables: Dict[str, Any] = field(default_factory=dict)
    agents: Dict[str, Dict[str, Any]] = field(default_factory=dict)
    defaults: Dict[str, Any] = field(default_factory=dict)
    metadata: Dict[str, Any] = field(default_factory=dict)

    @property
    def steps(self) -> List[WorkflowStep]:
        """Compatibility: return steps list from root tree.

        Steps are listed parent-first by following ``children`` only;
        branch and fallback steps are not included.
        """
        if self.root is None:
            return []
        result: List[WorkflowStep] = []

        def _collect(s: WorkflowStep) -> None:
            result.append(s)
            for c in s.children or []:
                _collect(c)

        _collect(self.root)
        return result

    def validate(self) -> List[str]:
        """Validate workflow structure and return a list of issues.

        Checks performed while walking children, branches and fallback
        steps from the root:

        * step IDs are unique;
        * conditional steps carry a condition;
        * task steps name an agent;
        * every ``depends_on`` entry names a step visited earlier in the
          walk, and no step depends on itself.

        Returns:
            Human-readable problem descriptions; empty when none were found.
        """
        issues: List[str] = []
        if self.root is None:
            issues.append("Workflow has no root step")
            return issues

        step_ids: Set[str] = set()
        # Identity of step objects already walked. Without it, a step object
        # that is reachable from itself would recurse without end.
        walked: Set[int] = set()

        def validate_step(step: WorkflowStep) -> None:
            if step.id in step_ids:
                issues.append(f"Duplicate step ID: {step.id}")
            step_ids.add(step.id)

            if id(step) in walked:
                return
            walked.add(id(step))

            if step.type == StepType.CONDITIONAL and not step.condition:
                issues.append(f"Conditional step '{step.id}' has no condition")
            if step.type == StepType.TASK and not step.agent:
                issues.append(f"Task step '{step.id}' has no agent assigned")

            # Validate depends_on references
            for dep in step.depends_on:
                if dep == step.id:
                    issues.append(f"Step '{step.id}' depends on itself")
                elif dep not in step_ids:
                    issues.append(f"Step '{step.id}' depends on unknown step '{dep}'")

            for child in step.children:
                validate_step(child)
            for branch_steps in step.branches.values():
                for s in branch_steps:
                    validate_step(s)
            if step.fallback_step:
                validate_step(step.fallback_step)

        validate_step(self.root)
        return issues

    def to_mermaid(self) -> str:
        """Export workflow as a Mermaid flowchart.

        Each step is declared once as ``id<shape>"label"<shape>``, with the
        shape chosen by step type and the label taken from the step's name
        (or its ID when unnamed). Children are linked with plain arrows and
        branch steps with arrows labelled by the branch name.

        Returns:
            The flowchart source, starting with ``graph TD``.
        """
        # Opening/closing delimiters for a quoted node label, per step type.
        shapes = {
            StepType.TASK: ('["', '"]'),
            StepType.PARALLEL: ('[["', '"]]'),
            StepType.CONDITIONAL: ('{"', '"}'),
            StepType.LOOP: ('(["', '"])'),
            StepType.JOIN: ('(("', '"))'),
            StepType.SPLIT: ('{{"', '"}}'),
        }
        default_shape = ('["', '"]')

        lines = ["graph TD"]
        declared: Set[str] = set()

        def add_step(
            step: WorkflowStep,
            parent_id: Optional[str] = None,
            edge_label: Optional[str] = None,
        ) -> None:
            first_visit = step.id not in declared
            if first_visit:
                declared.add(step.id)
                opener, closer = shapes.get(step.type, default_shape)
                # A raw double quote would end the quoted label early.
                label = (step.name or step.id).replace('"', "#quot;")
                lines.append(f"    {step.id}{opener}{label}{closer}")

            if parent_id:
                if edge_label:
                    lines.append(f"    {parent_id} -- {edge_label} --> {step.id}")
                else:
                    lines.append(f"    {parent_id} --> {step.id}")

            if not first_visit:
                # Already expanded; the edge above is all that was new.
                return

            if step.type == StepType.CONDITIONAL:
                for branch_name, branch_steps in step.branches.items():
                    for s in branch_steps:
                        add_step(s, step.id, branch_name)
            else:
                for child in step.children:
                    add_step(child, step.id)

        if self.root:
            add_step(self.root)
        return "\n".join(lines)

250 

251 

252# --------------------------------------------------------------------------- 

253# Condition evaluator 

254# --------------------------------------------------------------------------- 

255 

class ConditionEvaluator:
    """Evaluate conditions against the workflow context."""

    @staticmethod
    def evaluate(condition: Dict[str, Any], ctx: WorkflowContext) -> bool:
        """Evaluate a condition dict against context.

        A condition is either a combinator (``{"and": [...]}``,
        ``{"or": [...]}``, ``{"not": {...}}``) or a single comparison with
        the keys ``field`` (context variable path), ``op`` (a
        ``ConditionOperator`` value, default ``"eq"``) and ``value``.

        Args:
            condition: The condition to evaluate. An empty or missing
                condition counts as true.
            ctx: Context whose variables the condition reads.

        Returns:
            Whether the condition holds.

        Raises:
            ValueError: If ``op`` is not a known operator value.
        """
        import re

        if not condition:
            return True

        # Support AND/OR/NOT combinators
        if "and" in condition:
            return all(
                ConditionEvaluator.evaluate(sub, ctx) for sub in condition["and"]
            )
        if "or" in condition:
            return any(
                ConditionEvaluator.evaluate(sub, ctx) for sub in condition["or"]
            )
        if "not" in condition:
            return not ConditionEvaluator.evaluate(condition["not"], ctx)

        # Single condition
        field_name = condition.get("field", "")
        operator = ConditionOperator(condition.get("op", "eq"))
        value = condition.get("value")
        actual = ctx.get(field_name)

        if operator == ConditionOperator.EQUALS:
            return actual == value
        if operator == ConditionOperator.NOT_EQUALS:
            return actual != value
        if operator == ConditionOperator.CONTAINS:
            if actual is None or value is None:
                return False
            if isinstance(actual, (list, tuple, set, frozenset, dict)):
                # Collections are tested by element (or key) membership.
                try:
                    return value in actual
                except TypeError:
                    # Unhashable value looked up in a set/dict.
                    return False
            # Everything else is compared as text, so non-string values
            # such as numbers no longer raise.
            return str(value) in str(actual)
        if operator in (ConditionOperator.GREATER, ConditionOperator.LESS):
            try:
                left, right = float(actual), float(value)
            except (TypeError, ValueError):
                return False
            if operator == ConditionOperator.GREATER:
                return left > right
            return left < right
        if operator == ConditionOperator.IN:
            if not isinstance(value, (list, tuple, set)):
                return False
            try:
                return actual in value
            except TypeError:
                # Unhashable actual value looked up in a set.
                return False
        if operator == ConditionOperator.MATCHES:
            try:
                return bool(re.search(str(value), str(actual)))
            except re.error:
                return False
        if operator == ConditionOperator.EXISTS:
            return actual is not None
        if operator == ConditionOperator.EMPTY:
            return actual is None or actual == "" or actual == [] or actual == {}

        return False

317 

318 

319# --------------------------------------------------------------------------- 

320# Workflow Engine 

321# --------------------------------------------------------------------------- 

322 

class WorkflowEngine:
    """Executes a WorkflowDefinition with real-time progress tracking.

    Each step is run at most once per execution (results are cached by step
    ID), except that loop bodies and retried steps are re-run on purpose.
    Failures are converted into ``FAILED`` results and then passed through
    the step's ``on_error`` strategy.
    """

    # Step types whose runner never reads ``children``. For these, children
    # are run after the step itself as its successors, which is the shape
    # the parser and the ``sequential`` template build for step chains.
    _CHAINED_TYPES = (StepType.TASK, StepType.CONDITIONAL, StepType.JOIN)

    def __init__(
        self,
        agent_dispatcher: Optional[Callable] = None,
        max_parallelism: int = 10,
    ):
        """Create an engine.

        Args:
            agent_dispatcher: Awaitable callable invoked as
                ``dispatcher(agent_id, payload, ctx)`` for every task step.
                Defaults to a dispatcher that only logs.
            max_parallelism: Maximum number of task dispatches in flight
                at once.
        """
        self._dispatcher = agent_dispatcher or self._default_dispatcher
        self._max_parallelism = max_parallelism
        self._ctx: Optional[WorkflowContext] = None
        self._results: Dict[str, StepResult] = {}
        self._progress_callbacks: List[Callable] = []
        self._cancelled = False
        self._semaphore = asyncio.Semaphore(max_parallelism)

    def on_progress(self, callback: Callable[[StepResult], None]) -> None:
        """Register a progress callback, called with each finished step's result."""
        self._progress_callbacks.append(callback)

    async def execute(self, workflow: WorkflowDefinition) -> WorkflowContext:
        """Execute a workflow and return the final context.

        Raises:
            ValueError: If ``workflow.validate()`` reports any issue.
        """
        issues = workflow.validate()
        if issues:
            raise ValueError(f"Workflow validation failed: {issues}")

        self._ctx = WorkflowContext(variables=dict(workflow.variables))
        self._results = {}
        self._cancelled = False
        # A fresh semaphore per run, created inside the running event loop.
        self._semaphore = asyncio.Semaphore(self._max_parallelism)

        if workflow.root:
            await self._execute_step(workflow.root, self._ctx)

        return self._ctx

    async def dry_run(self, workflow: WorkflowDefinition) -> Dict[str, Any]:
        """Validate a workflow without executing it.

        Returns:
            A dict with ``valid``, ``issues``, ``steps`` (step count) and
            ``mermaid`` (flowchart source).
        """
        issues = workflow.validate()
        return {
            "valid": len(issues) == 0,
            "issues": issues,
            "steps": self._count_steps(workflow.root) if workflow.root else 0,
            "mermaid": workflow.to_mermaid(),
        }

    def cancel(self) -> None:
        """Cancel the running workflow; steps not yet started return CANCELLED."""
        self._cancelled = True

    async def _execute_step(
        self, step: WorkflowStep, ctx: WorkflowContext
    ) -> StepResult:
        """Run one step with caching, error recovery, recording and chaining."""
        if self._cancelled:
            return StepResult(step.id, ExecutionStatus.CANCELLED)

        if step.id in self._results:
            return self._results[step.id]

        logger.info(f"[Workflow] Executing step '{step.id}' ({step.type.value})")
        result = await self._attempt_step(step, ctx)

        # Error recovery applies to failures only; running it after a
        # success would skip, retry or replace work that already succeeded.
        if result.status == ExecutionStatus.FAILED:
            result = await self._handle_error(step, ctx, result)

        self._results[step.id] = result
        ctx.history.append({
            "step_id": step.id,
            "status": result.status.value,
            "output": str(result.output)[:200] if result.output is not None else None,
            "error": result.error,
            "duration": result.duration,
        })
        self._notify(result)

        return await self._run_successors(step, ctx, result)

    async def _attempt_step(
        self, step: WorkflowStep, ctx: WorkflowContext
    ) -> StepResult:
        """Run a step's type-specific runner once, turning exceptions into FAILED."""
        import time

        runners = {
            StepType.TASK: self._run_task,
            StepType.SEQUENTIAL: self._run_sequential,
            StepType.PARALLEL: self._run_parallel,
            StepType.CONDITIONAL: self._run_conditional,
            StepType.LOOP: self._run_loop,
            StepType.SUB_WORKFLOW: self._run_sub_workflow,
            StepType.JOIN: self._run_join,
            StepType.SPLIT: self._run_split,
        }
        started = time.monotonic()
        result = StepResult(step.id, ExecutionStatus.RUNNING)

        try:
            runner = runners.get(step.type)
            if runner is None:
                result.status = ExecutionStatus.SUCCESS
            else:
                result = await runner(step, ctx)
        except asyncio.TimeoutError:
            result.status = ExecutionStatus.FAILED
            result.error = f"Step '{step.id}' timed out after {step.timeout}s"
        except Exception as e:
            result.status = ExecutionStatus.FAILED
            result.error = str(e)
            logger.exception(f"[Workflow] Step '{step.id}' failed: {e}")

        if not result.duration:
            # Runners that do not time themselves, and failed attempts.
            result.duration = time.monotonic() - started
        return result

    async def _run_successors(
        self, step: WorkflowStep, ctx: WorkflowContext, result: StepResult
    ) -> StepResult:
        """Run the children of a chained step after the step itself.

        Successors run only after a SUCCESS or SKIPPED outcome. If one of
        them fails, a FAILED result for ``step`` is returned so the failure
        reaches the caller; the step's own recorded result is unchanged.
        """
        if step.type not in self._CHAINED_TYPES or not step.children:
            return result
        if result.status not in (ExecutionStatus.SUCCESS, ExecutionStatus.SKIPPED):
            return result

        for successor in step.children:
            outcome = await self._execute_step(successor, ctx)
            if outcome.status == ExecutionStatus.FAILED:
                return StepResult(
                    step.id,
                    ExecutionStatus.FAILED,
                    output=result.output,
                    error=f"Child '{successor.id}' failed: {outcome.error}",
                    duration=result.duration,
                    retries=result.retries,
                )
        return result

    def _notify(self, result: StepResult) -> None:
        """Call every progress callback; a failing callback is logged, not raised."""
        for cb in self._progress_callbacks:
            try:
                cb(result)
            except Exception:
                logger.exception(
                    f"[Workflow] Progress callback failed for step '{result.step_id}'"
                )

    def _forget_descendants(self, step: WorkflowStep) -> None:
        """Drop cached results of every step nested under ``step``.

        Needed before re-running a step (loop iteration, retry); otherwise
        the nested steps would be answered from the cache instead of run.
        """
        seen: Set[int] = set()
        pending = self._nested_steps(step)
        while pending:
            node = pending.pop()
            if id(node) in seen:
                continue
            seen.add(id(node))
            self._results.pop(node.id, None)
            pending.extend(self._nested_steps(node))

    @staticmethod
    def _nested_steps(step: WorkflowStep) -> List[WorkflowStep]:
        """Return the steps directly nested in ``step``: children, branches, fallback."""
        nested = list(step.children)
        for branch_steps in step.branches.values():
            nested.extend(branch_steps)
        if step.fallback_step:
            nested.append(step.fallback_step)
        return nested

    async def _run_task(self, step: WorkflowStep, ctx: WorkflowContext) -> StepResult:
        """Execute a single agent task.

        The payload template is resolved against the context, the dispatcher
        is awaited under the step's timeout, and the output is stored at
        ``steps.<id>.output``.

        Raises:
            asyncio.TimeoutError: If the dispatcher exceeds ``step.timeout``.
        """
        import time

        # Resolve template variables in task payload
        payload = step.task or ""
        if "{{" in payload:
            payload = self._resolve_template(payload, ctx)

        # The parallelism limit is taken here, around the dispatch itself,
        # so that a container step never holds a slot while waiting for its
        # own children to obtain one.
        async with self._semaphore:
            t0 = time.monotonic()
            output = await asyncio.wait_for(
                self._dispatcher(step.agent, payload, ctx),
                timeout=step.timeout,
            )
            elapsed = time.monotonic() - t0

        ctx.set(f"steps.{step.id}.output", output)
        return StepResult(
            step.id, ExecutionStatus.SUCCESS,
            output=output,
            duration=elapsed,
        )

    async def _run_sequential(
        self, step: WorkflowStep, ctx: WorkflowContext
    ) -> StepResult:
        """Run children in sequence.

        Stops with FAILED at the first failed child when this step's
        strategy is ESCALATE; otherwise failed children are passed over.
        """
        for child in step.children:
            result = await self._execute_step(child, ctx)
            if result.status == ExecutionStatus.FAILED and step.on_error == ErrorStrategy.ESCALATE:
                return StepResult(step.id, ExecutionStatus.FAILED,
                                  error=f"Child '{child.id}' failed: {result.error}")
        return StepResult(step.id, ExecutionStatus.SUCCESS)

    async def _run_parallel(
        self, step: WorkflowStep, ctx: WorkflowContext
    ) -> StepResult:
        """Run children concurrently and collect their outputs by child ID.

        The outputs are stored at ``steps.<id>.outputs``. As in
        ``_run_sequential``, a failed child fails this step only when its
        strategy is ESCALATE.
        """
        children = step.children
        results = await asyncio.gather(
            *(self._execute_step(child, ctx) for child in children),
            return_exceptions=True,
        )

        outputs: Dict[str, Any] = {}
        failed_ids: List[str] = []
        for child, result in zip(children, results):
            # BaseException also covers a cancelled child, which gather
            # hands back as an exception object rather than a StepResult.
            if isinstance(result, BaseException):
                outputs[child.id] = {"error": str(result)}
                failed_ids.append(child.id)
            else:
                outputs[child.id] = result.output
                if result.status == ExecutionStatus.FAILED:
                    failed_ids.append(child.id)

        ctx.set(f"steps.{step.id}.outputs", outputs)
        if failed_ids and step.on_error == ErrorStrategy.ESCALATE:
            return StepResult(
                step.id, ExecutionStatus.FAILED, output=outputs,
                error=f"Parallel children failed: {', '.join(failed_ids)}",
            )
        return StepResult(step.id, ExecutionStatus.SUCCESS, output=outputs)

    async def _run_conditional(
        self, step: WorkflowStep, ctx: WorkflowContext
    ) -> StepResult:
        """Evaluate condition and execute the matching branch.

        The branch named ``"true"`` or ``"false"`` is chosen by the
        condition's result; if that branch is absent or empty, the
        ``"default"`` branch is used instead.
        """
        if not step.condition:
            return StepResult(step.id, ExecutionStatus.SKIPPED,
                              error="No condition defined")

        matched = ConditionEvaluator.evaluate(step.condition, ctx)
        branch_key = "true" if matched else "false"
        branch_steps = step.branches.get(branch_key, [])
        if not branch_steps:
            branch_steps = step.branches.get("default", [])

        for child in branch_steps:
            result = await self._execute_step(child, ctx)
            if result.status == ExecutionStatus.FAILED:
                return StepResult(step.id, ExecutionStatus.FAILED,
                                  error=f"Branch child '{child.id}' failed: {result.error}")

        return StepResult(step.id, ExecutionStatus.SUCCESS,
                          output={"branch": branch_key})

    async def _run_loop(self, step: WorkflowStep, ctx: WorkflowContext) -> StepResult:
        """Execute children in a loop until condition is false.

        The body always runs once before the condition is first checked,
        and never more than ``step.max_iterations`` times. The index of the
        iteration just completed is stored at ``steps.<id>.iteration``.
        """
        iteration = 0
        while iteration < step.max_iterations:
            if self._cancelled:
                return StepResult(step.id, ExecutionStatus.CANCELLED)

            # Clear the previous iteration's cached results so the body
            # actually runs again.
            self._forget_descendants(step)

            for child in step.children:
                result = await self._execute_step(child, ctx)
                if result.status == ExecutionStatus.FAILED:
                    return StepResult(step.id, ExecutionStatus.FAILED,
                                      error=f"Loop iteration {iteration}: child '{child.id}' failed")

            ctx.set(f"steps.{step.id}.iteration", iteration)
            iteration += 1

            # Check loop condition
            if step.loop_condition and not ConditionEvaluator.evaluate(step.loop_condition, ctx):
                break

        return StepResult(step.id, ExecutionStatus.SUCCESS,
                          output={"iterations": iteration})

    async def _run_sub_workflow(self, step: WorkflowStep, ctx: WorkflowContext) -> StepResult:
        """Execute a nested sub-workflow by running its children in order."""
        for child in step.children:
            result = await self._execute_step(child, ctx)
            if result.status == ExecutionStatus.FAILED:
                return StepResult(step.id, ExecutionStatus.FAILED,
                                  error=f"Sub-workflow child '{child.id}' failed")
        return StepResult(step.id, ExecutionStatus.SUCCESS)

    async def _run_join(self, step: WorkflowStep, ctx: WorkflowContext) -> StepResult:
        """Join point: performs no work and reports SUCCESS.

        NOTE(review): nothing in this class consults ``depends_on``; confirm
        whether dependency ordering is meant to be enforced elsewhere.
        """
        return StepResult(step.id, ExecutionStatus.SUCCESS)

    async def _run_split(self, step: WorkflowStep, ctx: WorkflowContext) -> StepResult:
        """Fan-out to multiple agents; runs the children like a parallel step."""
        return await self._run_parallel(step, ctx)

    async def _handle_error(
        self, step: WorkflowStep, ctx: WorkflowContext, result: StepResult
    ) -> StepResult:
        """Apply the step's error recovery strategy to a failed result.

        * RETRY: re-run the step up to ``max_retries`` times, waiting
          ``retry_delay * 2**n`` before retry ``n`` (counted from 0).
        * FALLBACK: run ``fallback_step`` and return its result.
        * SKIP: mark the result SKIPPED.
        * PAUSE / ESCALATE: return the failure unchanged.

        Every failed attempt is appended to ``ctx.errors``. A result that
        is not FAILED is returned untouched.
        """
        if result.status != ExecutionStatus.FAILED:
            return result

        ctx.errors.append({"step_id": step.id, "error": result.error})

        if step.on_error == ErrorStrategy.RETRY:
            attempts = result.retries
            while (
                result.status == ExecutionStatus.FAILED
                and attempts < step.max_retries
                and not self._cancelled
            ):
                logger.info(f"[Workflow] Retrying step '{step.id}' ({attempts+1}/{step.max_retries})")
                await asyncio.sleep(step.retry_delay * (2 ** attempts))
                attempts += 1
                # Nested steps must run again rather than replay the cache.
                self._forget_descendants(step)
                result = await self._attempt_step(step, ctx)
                result.retries = attempts
                if result.status == ExecutionStatus.FAILED:
                    ctx.errors.append({"step_id": step.id, "error": result.error})
            return result

        if step.on_error == ErrorStrategy.FALLBACK and step.fallback_step:
            logger.info(f"[Workflow] Executing fallback for step '{step.id}'")
            return await self._execute_step(step.fallback_step, ctx)

        if step.on_error == ErrorStrategy.SKIP:
            result.status = ExecutionStatus.SKIPPED
            return result

        if step.on_error == ErrorStrategy.PAUSE:
            logger.warning(f"[Workflow] Paused at step '{step.id}': {result.error}")
            # In production, this would notify the HITL system
            return result

        # Default: escalate
        return result

    @staticmethod
    async def _default_dispatcher(agent_id: str, task: str, ctx: WorkflowContext) -> str:
        """Default task dispatcher: logs the task and returns a mock result."""
        logger.info(f"[Workflow] Dispatch to '{agent_id}': {task[:100]}")
        return f"Task dispatched to {agent_id}: {task[:50]}"

    @staticmethod
    def _resolve_template(template: str, ctx: WorkflowContext) -> str:
        """Resolve {{ variable }} placeholders in a template string.

        Each placeholder is replaced by ``str()`` of the context variable
        it names, or by ``<name not found>`` when the variable is missing.
        """
        import re

        def replacer(match):
            key = match.group(1).strip()
            return str(ctx.get(key, f"<{key} not found>"))

        return re.sub(r"\{\{\s*(.*?)\s*\}\}", replacer, template)

    @staticmethod
    def _count_steps(step: Optional[WorkflowStep]) -> int:
        """Count ``step`` plus everything nested in it (children, branches, fallback)."""
        if step is None:
            return 0
        count = 1
        for child in step.children:
            count += WorkflowEngine._count_steps(child)
        for branch_steps in step.branches.values():
            for s in branch_steps:
                count += WorkflowEngine._count_steps(s)
        if step.fallback_step:
            count += WorkflowEngine._count_steps(step.fallback_step)
        return count

608 

609 

610# --------------------------------------------------------------------------- 

611# Workflow YAML/JSON Parser 

612# --------------------------------------------------------------------------- 

613 

class WorkflowParser:
    """Parse YAML/JSON files into WorkflowDefinition objects, and serialize them back."""

    @staticmethod
    def parse_file(filepath: Union[str, Path]) -> WorkflowDefinition:
        """Parse a .yaml/.yml/.json file into a WorkflowDefinition.

        Files with a ``.json`` suffix (any letter case) are read as JSON;
        everything else is read as YAML.

        Raises:
            ValueError: If the document is not a mapping or defines an
                empty ``steps`` list.
        """
        path = Path(filepath)
        with open(path, "r", encoding="utf-8") as f:
            if path.suffix.lower() == ".json":
                data = json.load(f)
            else:
                data = yaml.safe_load(f)
        return WorkflowParser.parse_dict(data)

    @staticmethod
    def parse_str(text: str) -> WorkflowDefinition:
        """Parse a YAML/JSON string into a WorkflowDefinition.

        JSON is tried first; text that is not valid JSON is parsed as YAML.

        Raises:
            ValueError: If the document is not a mapping or defines an
                empty ``steps`` list.
        """
        try:
            data = json.loads(text)
        except json.JSONDecodeError:
            data = yaml.safe_load(text)
        return WorkflowParser.parse_dict(data)

    @staticmethod
    def parse_dict(data: Dict[str, Any]) -> WorkflowDefinition:
        """Parse a dict into a WorkflowDefinition.

        Missing keys fall back to defaults. Mapping-valued keys that are
        present but empty (``variables:`` with no value in YAML) are treated
        as empty mappings.

        Raises:
            ValueError: If ``data`` is not a dict (for example an empty
                document), or if ``steps`` is present but empty.
        """
        if not isinstance(data, dict):
            raise ValueError(
                f"Workflow definition must be a mapping, got {type(data).__name__}"
            )

        wf = WorkflowDefinition(
            name=data.get("name", "unnamed"),
            version=data.get("version", "1.0"),
            description=data.get("description", ""),
            variables=data.get("variables") or {},
            agents=data.get("agents") or {},
            defaults=data.get("defaults") or {},
            metadata=data.get("metadata") or {},
        )

        if "steps" in data:
            wf.root = WorkflowParser._parse_steps(data["steps"])

        return wf

    @staticmethod
    def _parse_steps(steps: List[Dict[str, Any]]) -> WorkflowStep:
        """Parse a list of step dicts into a tree and return its root.

        Normally the steps are chained: each step becomes the only child of
        the one before it, and the first step is the root. A chain cannot
        give a successor to a step that already has children of its own, so
        when any step but the last has children, all steps are instead
        placed in order under a generated SEQUENTIAL root with ID
        ``__root__``. A single step dict is accepted in place of a list.

        Raises:
            ValueError: If ``steps`` is empty.
        """
        if not steps:
            raise ValueError("No steps defined")
        if isinstance(steps, dict):
            steps = [steps]

        parsed = [WorkflowParser._parse_step(s) for s in steps]

        if any(step.children for step in parsed[:-1]):
            # Chaining here would silently drop the following step.
            return WorkflowStep(
                id="__root__",
                type=StepType.SEQUENTIAL,
                name="Implicit sequence",
                children=parsed,
            )

        # Build parent-child relationships
        for current, following in zip(parsed, parsed[1:]):
            current.children = [following]

        return parsed[0]

    @staticmethod
    def _parse_step(data: Dict[str, Any]) -> WorkflowStep:
        """Parse a single step dict, including nested children, branches and fallback.

        Raises:
            ValueError: If ``type`` or ``on_error`` is not a known value.
        """
        step = WorkflowStep(
            id=data.get("id", ""),
            type=StepType(data.get("type", "task")),
            name=data.get("name", ""),
            description=data.get("description", ""),
            agent=data.get("agent"),
            task=data.get("task"),
            timeout=data.get("timeout", 300.0),
            depends_on=data.get("depends_on") or [],
            tags=data.get("tags") or [],
            metadata=data.get("metadata") or {},
        )

        if "condition" in data:
            step.condition = data["condition"]

        if "branches" in data:
            for branch_name, branch_steps in (data["branches"] or {}).items():
                step.branches[branch_name] = [
                    WorkflowParser._parse_step(s) for s in branch_steps or []
                ]

        if "children" in data:
            step.children = [
                WorkflowParser._parse_step(c) for c in data["children"] or []
            ]

        if "on_error" in data:
            step.on_error = ErrorStrategy(data["on_error"])
        if "max_retries" in data:
            step.max_retries = data["max_retries"]
        if "retry_delay" in data:
            step.retry_delay = data["retry_delay"]
        if "fallback" in data:
            step.fallback_step = WorkflowParser._parse_step(data["fallback"])

        if step.type == StepType.LOOP:
            step.max_iterations = data.get("max_iterations", 100)
        if "loop_condition" in data:
            step.loop_condition = data["loop_condition"]

        return step

    @staticmethod
    def to_yaml(workflow: WorkflowDefinition) -> str:
        """Serialize a WorkflowDefinition to YAML string."""
        return yaml.dump(WorkflowParser._to_dict(workflow), default_flow_style=False)

    @staticmethod
    def to_json(workflow: WorkflowDefinition) -> str:
        """Serialize a WorkflowDefinition to JSON string."""
        return json.dumps(WorkflowParser._to_dict(workflow), indent=2)

    @staticmethod
    def _to_dict(wf: WorkflowDefinition) -> Dict[str, Any]:
        """Convert a definition to a plain dict; the tree is nested under one root step."""
        data = {
            "name": wf.name,
            "version": wf.version,
            "description": wf.description,
            "variables": wf.variables,
            "agents": wf.agents,
            "defaults": wf.defaults,
            "metadata": wf.metadata,
        }
        if wf.root:
            data["steps"] = [WorkflowParser._step_to_dict(wf.root)]
        return data

    @staticmethod
    def _step_to_dict(step: WorkflowStep) -> Dict[str, Any]:
        """Convert a step and everything nested in it to a plain dict.

        Optional settings are written only when set or when they differ
        from the ``WorkflowStep`` defaults.
        """
        d = {
            "id": step.id,
            "type": step.type.value,
            "name": step.name,
            "description": step.description,
            "agent": step.agent,
            "task": step.task,
            "timeout": step.timeout,
            "depends_on": step.depends_on,
            "tags": step.tags,
            "metadata": step.metadata,
        }
        if step.condition:
            d["condition"] = step.condition
        if step.branches:
            d["branches"] = {
                k: [WorkflowParser._step_to_dict(s) for s in v]
                for k, v in step.branches.items()
            }
        if step.children:
            d["children"] = [WorkflowParser._step_to_dict(c) for c in step.children]
        if step.on_error != ErrorStrategy.ESCALATE:
            d["on_error"] = step.on_error.value
        if step.max_retries != 3:
            d["max_retries"] = step.max_retries
        if step.retry_delay != 1.0:
            d["retry_delay"] = step.retry_delay
        if step.fallback_step:
            d["fallback"] = WorkflowParser._step_to_dict(step.fallback_step)
        if step.type == StepType.LOOP:
            d["max_iterations"] = step.max_iterations
        if step.loop_condition:
            d["loop_condition"] = step.loop_condition
        return d

775 

776 

777# --------------------------------------------------------------------------- 

778# Pre-built workflow templates 

779# --------------------------------------------------------------------------- 

780 

class WorkflowTemplates:
    """Library of common workflow patterns."""

    @staticmethod
    def sequential(name: str, agent_ids: List[str], task_template: str) -> WorkflowDefinition:
        """Create a sequential pipeline: Agent1 → Agent2 → Agent3.

        One task step ``step_<agent_id>`` is built per agent and the steps
        are chained: each is the only child of the one before it, and the
        first is the workflow root. An empty ``agent_ids`` yields a
        workflow with no root.

        NOTE(review): the pipeline only runs end to end if the engine
        executes the children of a task step; confirm against the engine.
        """
        root = None
        prev = None
        for agent_id in agent_ids:
            step = WorkflowStep(
                id=f"step_{agent_id}",
                type=StepType.TASK,
                name=f"Task by {agent_id}",
                agent=agent_id,
                task=task_template,
            )
            if prev:
                prev.children = [step]
            if root is None:
                root = step
            prev = step

        return WorkflowDefinition(name=name, root=root)

    @staticmethod
    def parallel_broadcast(
        name: str, agent_ids: List[str], task_template: str
    ) -> WorkflowDefinition:
        """Create a parallel broadcast: all agents run simultaneously.

        The root is a PARALLEL step ``broadcast`` holding one task step
        ``step_<agent_id>`` per agent, all with the same task template.
        """
        children = [
            WorkflowStep(
                id=f"step_{agent_id}",
                type=StepType.TASK,
                name=f"Broadcast to {agent_id}",
                agent=agent_id,
                task=task_template,
            )
            for agent_id in agent_ids
        ]
        root = WorkflowStep(
            id="broadcast",
            type=StepType.PARALLEL,
            name="Parallel broadcast",
            children=children,
        )
        return WorkflowDefinition(name=name, root=root)

    @staticmethod
    def map_reduce(
        name: str,
        mapper_agents: List[str],
        reducer_agent: str,
        map_task: str,
        reduce_task: str,
    ) -> WorkflowDefinition:
        """Map-Reduce pattern: parallel map → single reduce.

        The root is a SEQUENTIAL step ``map_reduce`` with two children: the
        PARALLEL step ``map_phase`` (one ``map_<agent_id>`` task per
        mapper) followed by the ``reduce_phase`` task.
        """
        map_steps = [
            WorkflowStep(
                id=f"map_{agent_id}",
                type=StepType.TASK,
                name=f"Map by {agent_id}",
                agent=agent_id,
                task=map_task,
            )
            for agent_id in mapper_agents
        ]
        map_phase = WorkflowStep(
            id="map_phase",
            type=StepType.PARALLEL,
            name="Map phase",
            children=map_steps,
        )
        reduce_step = WorkflowStep(
            id="reduce_phase",
            type=StepType.TASK,
            name=f"Reduce by {reducer_agent}",
            agent=reducer_agent,
            task=reduce_task,
        )
        # The reduce step sits beside the map phase, not inside it: assigning
        # it to the map phase's children would replace the mappers.
        root = WorkflowStep(
            id="map_reduce",
            type=StepType.SEQUENTIAL,
            name="Map-reduce",
            children=[map_phase, reduce_step],
        )

        return WorkflowDefinition(name=name, root=root)

    @staticmethod
    def conditional_branch(
        name: str,
        condition_field: str,
        true_agent: str,
        false_agent: str,
        task_template: str,
    ) -> WorkflowDefinition:
        """Conditional branching: if condition → agentA else → agentB.

        The condition holds when the context variable ``condition_field``
        equals ``True``. Both branches use the same task template.
        """
        true_step = WorkflowStep(
            id="true_branch",
            type=StepType.TASK,
            name=f"True: {true_agent}",
            agent=true_agent,
            task=task_template,
        )
        false_step = WorkflowStep(
            id="false_branch",
            type=StepType.TASK,
            name=f"False: {false_agent}",
            agent=false_agent,
            task=task_template,
        )
        root = WorkflowStep(
            id="condition",
            type=StepType.CONDITIONAL,
            name="Condition check",
            condition={"field": condition_field, "op": "eq", "value": True},
            branches={"true": [true_step], "false": [false_step]},
        )
        return WorkflowDefinition(name=name, root=root)

    @staticmethod
    def retry_loop(
        name: str, agent_id: str, task: str, max_retries: int = 3
    ) -> WorkflowDefinition:
        """Task with automatic retry on failure.

        The single task step ``retry_task`` uses the RETRY strategy with
        the given ``max_retries`` and a retry delay of 2.0.
        """
        step = WorkflowStep(
            id="retry_task",
            type=StepType.TASK,
            name="Task with retry",
            agent=agent_id,
            task=task,
            on_error=ErrorStrategy.RETRY,
            max_retries=max_retries,
            retry_delay=2.0,
        )
        return WorkflowDefinition(name=name, root=step)

912 

913 

914# --------------------------------------------------------------------------- 

915# Export 

916# --------------------------------------------------------------------------- 

917 

# Public API of the module, grouped by role.
__all__ = [
    # Enumerations
    "StepType",
    "ExecutionStatus",
    "ErrorStrategy",
    "ConditionOperator",
    # Data structures
    "WorkflowContext",
    "StepResult",
    "WorkflowStep",
    "WorkflowDefinition",
    # Execution
    "WorkflowEngine",
    "ConditionEvaluator",
    # Parsing and serialization
    "WorkflowParser",
    # Ready-made workflow patterns
    "WorkflowTemplates",
]