Coverage for src/tracekit/config/pipeline.py: 89%

393 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Pipeline configuration and execution system. 

2 

3This module provides YAML-based pipeline definitions, loading, execution, 

4templates, composition, and conditional logic for analysis workflows with 

5transaction semantics, type validation, circular detection, and expression 

6language support. 

7""" 

8 

9from __future__ import annotations 

10 

11import copy 

12import logging 

13import re 

14from dataclasses import dataclass, field 

15from pathlib import Path 

16from typing import TYPE_CHECKING, Any 

17 

18import yaml 

19 

20from tracekit.config.schema import validate_against_schema 

21from tracekit.core.exceptions import ConfigurationError 

22 

23if TYPE_CHECKING: 

24 from collections.abc import Callable 

25 

26logger = logging.getLogger(__name__) 

27 

28 

@dataclass
class PipelineStep:
    """Single step in an analysis pipeline.

    Attributes:
        name: Step identifier
        type: Step type (e.g., "input.file", "decoder.uart")
        params: Step parameters
        inputs: Input mappings from previous steps
        outputs: Output definitions
        condition: Optional condition for execution
        if_steps: Steps to execute if condition is true
        elif_conditions: List of (condition, steps) for elif branches
        else_steps: Steps to execute if condition is false

    Example:
        >>> step = PipelineStep(
        ...     name="decode_uart",
        ...     type="decoder.uart",
        ...     params={"baud_rate": 115200}
        ... )
    """

    name: str
    type: str
    # Mutable defaults use default_factory so instances never share state.
    params: dict[str, Any] = field(default_factory=dict)
    inputs: dict[str, str] = field(default_factory=dict)
    outputs: dict[str, str] = field(default_factory=dict)
    condition: str | None = None
    # Nested steps for conditional branches; populated by _parse_step.
    if_steps: list[PipelineStep] = field(default_factory=list)
    elif_conditions: list[tuple[str, list[PipelineStep]]] = field(default_factory=list)
    else_steps: list[PipelineStep] = field(default_factory=list)

61 

62 

@dataclass
class PipelineDefinition:
    """Complete pipeline definition.

    Attributes:
        name: Pipeline identifier
        version: Pipeline version
        description: Human-readable description
        steps: Ordered list of pipeline steps
        parallel_groups: Groups of steps that can run in parallel
        variables: Template variables for parameterization
        includes: List of included sub-pipelines
        source_file: Path this definition was loaded from, when known
            (used by resolve_includes for cycle detection)

    Example:
        >>> pipeline = PipelineDefinition(
        ...     name="uart_analysis",
        ...     steps=[load_step, decode_step, export_step]
        ... )
    """

    name: str
    version: str = "1.0.0"
    description: str = ""
    steps: list[PipelineStep] = field(default_factory=list)
    parallel_groups: list[list[str]] = field(default_factory=list)
    variables: dict[str, Any] = field(default_factory=dict)
    includes: list[str] = field(default_factory=list)
    source_file: str | None = None

91 

92 

@dataclass
class PipelineResult:
    """Result of pipeline execution.

    Attributes:
        pipeline_name: Name of executed pipeline
        outputs: Dictionary of output data from steps (namespaced
            "step.output" keys)
        step_results: Results from each step
        success: Whether pipeline completed successfully
        error: Error if failed
    """

    pipeline_name: str
    outputs: dict[str, Any] = field(default_factory=dict)
    step_results: dict[str, Any] = field(default_factory=dict)
    # Defaults to True; Pipeline.execute() flips it to False on failure.
    success: bool = True
    error: str | None = None

110 

111 

class PipelineValidationError(ConfigurationError):
    """Raised when a pipeline definition fails validation.

    Attributes:
        step_name: Name of failing step
        line: Line number in the definition file, when known
        suggestion: Suggested fix
    """

    def __init__(
        self,
        message: str,
        *,
        step_name: str | None = None,
        line: int | None = None,
        suggestion: str | None = None,
    ):
        super().__init__(message)
        self.step_name = step_name
        self.line = line
        self.suggestion = suggestion

132 

133 

class PipelineExecutionError(ConfigurationError):
    """Raised when a pipeline step fails at execution time.

    Attributes:
        step_name: Name of failing step
        traceback_str: Traceback string if available
    """

    def __init__(
        self,
        message: str,
        *,
        step_name: str | None = None,
        traceback_str: str | None = None,
    ):
        super().__init__(message)
        self.step_name = step_name
        self.traceback_str = traceback_str

152 

153 

class Pipeline:
    """Executable analysis pipeline.

    Loads, validates, and executes pipeline definitions with support
    for progress tracking, error handling, and rollback.

    Example:
        >>> pipeline = Pipeline.load("uart_analysis.yaml")
        >>> pipeline.on_progress(lambda step, pct: print(f"{step}: {pct}%"))
        >>> results = pipeline.execute()
    """

    def __init__(self, definition: PipelineDefinition):
        """Initialize pipeline from definition.

        Args:
            definition: Pipeline definition
        """
        self.definition = definition
        # Progress observers registered via on_progress().
        self._progress_callbacks: list[Callable[[str, int], None]] = []
        # step type -> handler, registered via register_handler();
        # execution falls back to built-ins from _get_default_handler().
        self._step_handlers: dict[str, Callable[..., Any]] = {}
        # Namespaced step outputs ("step.output") accumulated by execute().
        self._state: dict[str, Any] = {}
        # Cleanup callables, run in reverse order on rollback.
        self._cleanups: list[Callable[[], None]] = []

177 

178 @classmethod 

179 def load(cls, path: str | Path, variables: dict[str, Any] | None = None) -> Pipeline: 

180 """Load pipeline from YAML file. 

181 

182 Args: 

183 path: Path to pipeline definition file 

184 variables: Template variable values 

185 

186 Returns: 

187 Loaded and validated Pipeline 

188 

189 Example: 

190 >>> pipeline = Pipeline.load("pipeline.yaml") 

191 >>> pipeline = Pipeline.load("pipeline.yaml", {"input_file": "trace.bin"}) 

192 """ 

193 definition = load_pipeline(path, variables) 

194 return cls(definition) 

195 

    def on_progress(self, callback: Callable[[str, int], None]) -> None:
        """Register progress callback.

        Callbacks are invoked by _notify_progress during execute();
        a failing callback is logged, never fatal.

        Args:
            callback: Function called with (step_name, percent_complete)

        Example:
            >>> pipeline.on_progress(lambda step, pct: print(f"{step}: {pct}%"))
        """
        self._progress_callbacks.append(callback)

206 

    def register_handler(self, step_type: str, handler: Callable[..., Any]) -> None:
        """Register (or replace) the handler for a step type.

        Registered handlers take precedence over the built-in defaults.

        Args:
            step_type: Step type to handle
            handler: Handler function; called with keyword arguments
                ``inputs``, ``params`` and ``step_name``
        """
        self._step_handlers[step_type] = handler

215 

216 def execute(self, dry_run: bool = False) -> PipelineResult: 

217 """Execute the pipeline with transaction semantics. 

218 

219 The pipeline executes with ACID-like semantics: 

220 - All steps complete successfully, or 

221 - Pipeline rolls back and cleanup is guaranteed 

222 

223 Args: 

224 dry_run: If True, validate without executing (dry-run mode) 

225 

226 Returns: 

227 Pipeline execution result 

228 

229 Raises: 

230 PipelineExecutionError: If execution fails 

231 

232 Example: 

233 >>> result = pipeline.execute() 

234 >>> result = pipeline.execute(dry_run=True) # Validate only 

235 """ 

236 result = PipelineResult(pipeline_name=self.definition.name) 

237 self._state = {} 

238 self._cleanups = [] 

239 committed = False 

240 

241 try: 

242 total_steps = len(self.definition.steps) 

243 

244 # Transaction begin 

245 logger.debug(f"Beginning pipeline transaction: {self.definition.name}") 

246 

247 for i, step in enumerate(self.definition.steps): 

248 progress = int((i / total_steps) * 100) 

249 self._notify_progress(step.name, progress) 

250 

251 if dry_run: 

252 logger.info(f"[DRY RUN] Would execute step: {step.name}") 

253 # In dry-run, validate step configuration 

254 self._validate_step(step) 

255 continue 

256 

257 # Check condition with short-circuit evaluation 

258 if step.condition: 

259 try: 

260 should_execute = self._evaluate_condition(step.condition) 

261 if not should_execute: 

262 logger.info(f"Skipping step '{step.name}' (condition false)") 

263 continue 

264 except Exception as e: 

265 logger.warning(f"Condition evaluation failed for '{step.name}': {e}") 

266 continue 

267 

268 # Execute step 

269 step_result = self._execute_step(step) 

270 result.step_results[step.name] = step_result 

271 

272 # Store outputs in state with namespace isolation 

273 for output_name, output_key in step.outputs.items(): 

274 namespaced_key = f"{step.name}.{output_name}" 

275 self._state[namespaced_key] = step_result.get(output_key) 

276 logger.debug(f"Stored output: {namespaced_key}") 

277 

278 # Transaction commit 

279 logger.debug(f"Committing pipeline transaction: {self.definition.name}") 

280 committed = True 

281 

282 # Notify completion 

283 self._notify_progress("complete", 100) 

284 result.success = True 

285 result.outputs = dict(self._state) 

286 

287 except Exception as e: 

288 result.success = False 

289 result.error = str(e) 

290 logger.error(f"Pipeline execution failed: {e}") 

291 

292 # Transaction rollback 

293 if not committed: 293 ↛ 297line 293 didn't jump to line 297 because the condition on line 293 was always true

294 logger.warning(f"Rolling back pipeline transaction: {self.definition.name}") 

295 self._rollback() 

296 

297 if not dry_run: 297 ↛ 304line 297 didn't jump to line 304 because the condition on line 297 was always true

298 raise PipelineExecutionError( 

299 f"Pipeline '{self.definition.name}' failed", 

300 step_name=step.name if "step" in dir() else None, 

301 traceback_str=str(e), 

302 ) from e 

303 

304 return result 

305 

306 def _validate_step(self, step: PipelineStep) -> None: 

307 """Validate step configuration (used in dry-run). 

308 

309 Args: 

310 step: Step to validate 

311 

312 Raises: 

313 PipelineValidationError: If step configuration invalid 

314 """ 

315 # Check required fields 

316 if not step.name: 316 ↛ 317line 316 didn't jump to line 317 because the condition on line 316 was never true

317 raise PipelineValidationError("Step name is required") 

318 if not step.type: 318 ↛ 319line 318 didn't jump to line 319 because the condition on line 318 was never true

319 raise PipelineValidationError(f"Step '{step.name}' missing type", step_name=step.name) 

320 

321 # Validate input references 

322 for input_ref in step.inputs.values(): 322 ↛ 323line 322 didn't jump to line 323 because the loop on line 322 never started

323 if "." not in input_ref and input_ref not in self._state: 

324 logger.warning(f"Step '{step.name}' references undefined input: {input_ref}") 

325 

    def _rollback(self) -> None:
        """Rollback pipeline execution (cleanup all resources).

        Guaranteed cleanup of all resources allocated during execution.
        Runs in reverse order of allocation.
        """
        logger.info("Running rollback cleanup")
        # Cleanups run newest-first (see _run_cleanups); state is then
        # discarded so partial outputs cannot leak to callers.
        self._run_cleanups()
        self._state.clear()
        logger.info("Rollback complete")

336 

337 def _execute_step(self, step: PipelineStep) -> dict[str, Any]: 

338 """Execute a single pipeline step. 

339 

340 Args: 

341 step: Step to execute 

342 

343 Returns: 

344 Step result dictionary 

345 

346 Raises: 

347 PipelineExecutionError: If no handler found for step type. 

348 """ 

349 logger.debug(f"Executing step: {step.name} (type={step.type})") 

350 

351 # Resolve inputs from state 

352 resolved_inputs = {} 

353 for input_name, input_ref in step.inputs.items(): 

354 if input_ref in self._state: 

355 resolved_inputs[input_name] = self._state[input_ref] 

356 else: 

357 logger.warning(f"Input '{input_ref}' not found in state") 

358 

359 # Get handler 

360 handler = self._step_handlers.get(step.type) 

361 if handler is None: 

362 handler = self._get_default_handler(step.type) 

363 

364 if handler is None: 

365 raise PipelineExecutionError( 

366 f"No handler for step type '{step.type}'", step_name=step.name 

367 ) 

368 

369 # Execute handler 

370 result = handler(inputs=resolved_inputs, params=step.params, step_name=step.name) 

371 

372 return result if isinstance(result, dict) else {"result": result} 

373 

374 def _get_default_handler(self, step_type: str) -> Callable[..., Any] | None: 

375 """Get default handler for step type. 

376 

377 Args: 

378 step_type: Step type 

379 

380 Returns: 

381 Handler function or None 

382 """ 

383 # Built-in handlers for common step types 

384 handlers = { 

385 "input.file": self._handle_input_file, 

386 "output.json": self._handle_output_json, 

387 "analysis.statistics": self._handle_statistics, 

388 } 

389 return handlers.get(step_type) 

390 

391 def _handle_input_file( 

392 self, 

393 inputs: dict[str, Any], 

394 params: dict[str, Any], 

395 step_name: str, 

396 ) -> dict[str, Any]: 

397 """Handle file input step.""" 

398 # Placeholder - actual implementation would use loaders 

399 return {"waveform": params.get("path")} 

400 

401 def _handle_output_json( 

402 self, 

403 inputs: dict[str, Any], 

404 params: dict[str, Any], 

405 step_name: str, 

406 ) -> dict[str, Any]: 

407 """Handle JSON output step.""" 

408 import json 

409 

410 path = params.get("path", "output.json") 

411 data = inputs.get("data", inputs) 

412 with open(path, "w") as f: 

413 json.dump(data, f, indent=2, default=str) 

414 return {"path": path} 

415 

416 def _handle_statistics( 

417 self, 

418 inputs: dict[str, Any], 

419 params: dict[str, Any], 

420 step_name: str, 

421 ) -> dict[str, Any]: 

422 """Handle statistics step.""" 

423 return {"statistics": {"count": len(inputs)}} 

424 

    def _evaluate_condition(self, condition: str) -> bool:
        """Evaluate condition expression using expression language.

        Supports:
        - Comparison operators: ==, !=, <, >, <=, >=
        - Logical operators: and, or, not
        - Field access: data.confidence, step_name.output_name
        - Short-circuit evaluation (and/or operators)

        Args:
            condition: Condition expression string

        Returns:
            Evaluation result; False when evaluation raises (fail-closed,
            so a broken condition skips the step instead of aborting)

        Example:
            >>> self._evaluate_condition("data.confidence > 0.8")
            >>> self._evaluate_condition("decode_uart.packets > 0 and data.valid == True")
        """
        try:
            # Parse and evaluate expression with short-circuit support
            return self._eval_expression(condition)
        except Exception as e:
            logger.warning(f"Condition evaluation failed: {condition} - {e}")
            return False

450 

451 def _eval_expression(self, expr: str) -> bool: 

452 """Evaluate expression with short-circuit logic. 

453 

454 Args: 

455 expr: Expression string 

456 

457 Returns: 

458 Boolean result 

459 """ 

460 # Handle logical operators with short-circuit evaluation 

461 if " or " in expr: 

462 parts = expr.split(" or ", 1) 

463 left = self._eval_expression(parts[0].strip()) 

464 if left: # Short-circuit: if left is True, don't evaluate right 

465 logger.debug(f"Short-circuit OR: left={left}, skipping right") 

466 return True 

467 return self._eval_expression(parts[1].strip()) 

468 

469 if " and " in expr: 

470 parts = expr.split(" and ", 1) 

471 left = self._eval_expression(parts[0].strip()) 

472 if not left: # Short-circuit: if left is False, don't evaluate right 

473 logger.debug(f"Short-circuit AND: left={left}, skipping right") 

474 return False 

475 return self._eval_expression(parts[1].strip()) 

476 

477 if expr.startswith("not "): 

478 return not self._eval_expression(expr[4:].strip()) 

479 

480 # Evaluate comparison 

481 return self._eval_comparison(expr) 

482 

483 def _eval_comparison(self, expr: str) -> bool: 

484 """Evaluate comparison expression. 

485 

486 Args: 

487 expr: Comparison expression 

488 

489 Returns: 

490 Boolean result 

491 """ 

492 operators = ["<=", ">=", "==", "!=", "<", ">"] 

493 

494 for op in operators: 

495 if op in expr: 

496 left, right = expr.split(op, 1) 

497 left_val = self._resolve_value(left.strip()) 

498 right_val = self._resolve_value(right.strip()) 

499 

500 if op == "==": 

501 return left_val == right_val # type: ignore[no-any-return] 

502 elif op == "!=": 502 ↛ 503line 502 didn't jump to line 503 because the condition on line 502 was never true

503 return left_val != right_val # type: ignore[no-any-return] 

504 elif op == "<": 

505 return left_val < right_val # type: ignore[no-any-return] 

506 elif op == ">": 506 ↛ 508line 506 didn't jump to line 508 because the condition on line 506 was always true

507 return left_val > right_val # type: ignore[no-any-return] 

508 elif op == "<=": 

509 return left_val <= right_val # type: ignore[no-any-return] 

510 elif op == ">=": 

511 return left_val >= right_val # type: ignore[no-any-return] 

512 

513 # No comparison operator found, try as boolean 

514 return bool(self._resolve_value(expr.strip())) 

515 

516 def _resolve_value(self, value_str: str) -> Any: 

517 """Resolve value from string (lookup in state or parse literal). 

518 

519 Args: 

520 value_str: Value string (field reference or literal) 

521 

522 Returns: 

523 Resolved value 

524 """ 

525 value_str = value_str.strip() 

526 

527 # Check if it's a field reference in state 

528 if value_str in self._state: 

529 return self._state[value_str] 

530 

531 # Try to parse as literal 

532 # String literals 

533 if (value_str.startswith('"') and value_str.endswith('"')) or ( 

534 value_str.startswith("'") and value_str.endswith("'") 

535 ): 

536 return value_str[1:-1] 

537 

538 # Boolean literals 

539 if value_str.lower() == "true": 

540 return True 

541 if value_str.lower() == "false": 

542 return False 

543 

544 # None/null literal 

545 if value_str.lower() in ("none", "null"): 

546 return None 

547 

548 # Try numeric literals 

549 try: 

550 if "." in value_str: 

551 return float(value_str) 

552 return int(value_str) 

553 except ValueError: 

554 pass 

555 

556 # Return as string if can't resolve 

557 logger.warning(f"Could not resolve value: {value_str}, returning as string") 

558 return value_str 

559 

560 def _notify_progress(self, step: str, percent: int) -> None: 

561 """Notify progress callbacks.""" 

562 for callback in self._progress_callbacks: 

563 try: 

564 callback(step, percent) 

565 except Exception as e: 

566 logger.warning(f"Progress callback failed: {e}") 

567 

568 def _run_cleanups(self) -> None: 

569 """Run registered cleanup functions.""" 

570 for cleanup in reversed(self._cleanups): 570 ↛ 571line 570 didn't jump to line 571 because the loop on line 570 never started

571 try: 

572 cleanup() 

573 except Exception as e: 

574 logger.warning(f"Cleanup failed: {e}") 

575 

576 

def load_pipeline(path: str | Path, variables: dict[str, Any] | None = None) -> PipelineDefinition:
    """Load pipeline definition from file.

    Args:
        path: Path to YAML file
        variables: Template variable values

    Returns:
        Pipeline definition

    Raises:
        PipelineValidationError: If the file is missing, unparsable,
            empty/non-mapping, or fails schema validation
    """
    path = Path(path)

    if not path.exists():
        raise PipelineValidationError(
            f"Pipeline file not found: {path}", suggestion="Check file path"
        )

    try:
        with open(path, encoding="utf-8") as f:
            content = f.read()

        # Apply template variables before parsing, so values may contain
        # YAML fragments.
        if variables:
            content = _substitute_variables(content, variables)

        data = yaml.safe_load(content)
    except yaml.YAMLError as e:
        raise PipelineValidationError(
            f"YAML parse error in {path}", suggestion="Check YAML syntax"
        ) from e

    # yaml.safe_load returns None for an empty document (and scalars/lists
    # for non-mapping documents); previously this crashed with a TypeError
    # on the "pipeline" membership test below.
    if not isinstance(data, dict):
        raise PipelineValidationError(
            f"Pipeline file is empty or not a mapping: {path}",
            suggestion="Pipeline YAML must contain a top-level mapping",
        )

    # Handle nested 'pipeline' key
    if "pipeline" in data:
        data = data["pipeline"]

    # Validate against schema
    try:
        validate_against_schema(data, "pipeline")
    except Exception as e:
        raise PipelineValidationError(
            f"Pipeline validation failed for {path}", suggestion=str(e)
        ) from e

    # Parse steps
    steps = [_parse_step(step_data) for step_data in data.get("steps", [])]

    return PipelineDefinition(
        name=data.get("name", path.stem),
        version=data.get("version", "1.0.0"),
        description=data.get("description", ""),
        steps=steps,
        parallel_groups=data.get("parallel_groups", []),
        variables=variables or {},
        includes=data.get("includes", []),
        source_file=str(path),
    )

639 

640 

def _parse_step(data: dict[str, Any]) -> PipelineStep:
    """Build a PipelineStep (including nested conditional branches) from a dict."""
    step = PipelineStep(
        name=data.get("name", "unnamed"),
        type=data.get("type", "unknown"),
        params=data.get("params", {}),
        inputs=data.get("inputs", {}),
        outputs=data.get("outputs", {}),
        condition=data.get("condition"),
    )

    # Recursively parse conditional branch bodies, when present.
    if "if_steps" in data:
        step.if_steps = [_parse_step(child) for child in data["if_steps"]]
    if "elif_conditions" in data:
        step.elif_conditions = [
            (
                branch.get("condition"),
                [_parse_step(child) for child in branch.get("steps", [])],
            )
            for branch in data["elif_conditions"]
        ]
    if "else_steps" in data:
        step.else_steps = [_parse_step(child) for child in data["else_steps"]]

    return step

664 

665 

def _substitute_variables(content: str, variables: dict[str, Any], max_depth: int = 3) -> str:
    """Substitute template variables in content with nested substitution.

    Supports nested substitution up to 3 levels deep (CFG-011):
    - Level 1: ${VAR1} -> "value1"
    - Level 2: ${VAR2} where VAR2 = "${VAR1}" -> "value1"
    - Level 3: ${VAR3} where VAR3 = "${VAR2}" -> "value1"

    Args:
        content: String content with ${VAR_NAME} placeholders
        variables: Variable name to value mapping
        max_depth: Maximum nested substitution depth (default 3.)

    Returns:
        Content with variables substituted

    Raises:
        PipelineValidationError: If nested substitution depth exceeded

    Example:
        >>> vars = {"BASE": "trace", "FILE": "${BASE}.bin"}
        >>> _substitute_variables("path: ${FILE}", vars)
        'path: trace.bin'
    """
    pattern = re.compile(r"\$\{(\w+)\}")
    # depth is read after the loop (depth-exceeded check); initialized here
    # so it is defined even when max_depth is 0.
    depth = 0

    for depth in range(max_depth):  # noqa: B007
        prev_content = content
        substitutions_made = False

        # Find all matches in current content
        matches = list(pattern.finditer(content))
        for match in matches:
            var_name = match.group(1)
            if var_name in variables:
                value = str(variables[var_name])
                # NOTE: str.replace substitutes every occurrence of this
                # placeholder, including occurrences introduced by earlier
                # substitutions in the same pass.
                content = content.replace(match.group(0), value)
                substitutions_made = True

        # No more substitutions possible (fixed point reached, e.g. only
        # undefined or self-referential placeholders remain)
        if content == prev_content or not substitutions_made:
            break

        # Check if we still have unresolved variables after this pass
        remaining = pattern.findall(content)
        if not remaining:
            break

    # Check for unresolved variables
    remaining_vars = pattern.findall(content)
    if remaining_vars:
        # Reached the final pass with placeholders still present: treat as
        # a nesting overflow rather than silently returning unresolved text.
        if depth >= max_depth - 1:
            raise PipelineValidationError(
                f"Nested substitution depth exceeded {max_depth} levels",
                suggestion=f"Reduce nesting or increase max_depth. Unresolved: {remaining_vars}",
            )
        else:
            # Some variables are undefined — warn but return content as-is.
            undefined = [v for v in remaining_vars if v not in variables]
            if undefined:
                logger.warning(f"Undefined variables: {undefined}")

    logger.debug(f"Variable substitution completed in {depth + 1} passes")
    return content

731 

732 

def resolve_includes(
    pipeline: PipelineDefinition,
    base_path: Path,
    *,
    max_depth: int = 5,
    namespace_isolation: bool = True,
    _visited: set[str] | None = None,
    _depth: int = 0,
) -> PipelineDefinition:
    """Resolve pipeline includes (composition) with circular detection.

    Supports include depth up to 5 levels with dependency graph traversal
    for cycle detection. Provides namespace isolation for included pipelines.

    Args:
        pipeline: Pipeline with includes
        base_path: Base path for resolving relative includes
        max_depth: Maximum include depth (default 5.)
        namespace_isolation: If True, prefix included steps with namespace
        _visited: Set of visited pipelines for cycle detection (DFS)
        _depth: Current depth (for tracking)

    Returns:
        Pipeline with includes resolved

    Raises:
        PipelineValidationError: If circular includes or depth exceeded

    Example:
        >>> pipeline = load_pipeline("main.yaml")
        >>> resolved = resolve_includes(pipeline, Path("."))
    """
    if _visited is None:
        _visited = set()

    # Nothing to merge: return the pipeline unchanged.
    if not pipeline.includes:
        return pipeline

    # Normalize source file path for comparison
    source_key = str(Path(pipeline.source_file).resolve()) if pipeline.source_file else None

    # Cycle detection using DFS with visited set
    if source_key and source_key in _visited:
        cycle_list = [*list(_visited), source_key]
        cycle = " → ".join([Path(p).name for p in cycle_list])
        raise PipelineValidationError(
            f"Circular pipeline include detected: {cycle}",
            suggestion=f"Remove circular dependency from {Path(source_key).name}",
        )

    # Depth limit check
    if _depth >= max_depth:
        chain = " → ".join(
            [Path(p).name for p in _visited] + [Path(source_key).name if source_key else "?"]
        )
        raise PipelineValidationError(
            f"Pipeline include depth exceeded maximum of {max_depth}",
            suggestion=f"Reduce nesting. Current chain: {chain}",
        )

    if source_key:
        _visited.add(source_key)

    # Merge included pipelines
    merged_steps = []
    for include_path in pipeline.includes:
        include_full = base_path / include_path

        # Missing include files are skipped with a warning (best-effort
        # merge), not treated as fatal.
        if not include_full.exists():
            logger.warning(f"Included pipeline not found: {include_path}")
            continue

        try:
            # Load included pipeline; the parent's variables are passed
            # down so includes see the same template values.
            included = load_pipeline(include_full, pipeline.variables)

            # Recursively resolve nested includes
            resolved = resolve_includes(
                included,
                include_full.parent,
                max_depth=max_depth,
                namespace_isolation=namespace_isolation,
                _visited=_visited.copy(),  # Copy to avoid mutation across branches
                _depth=_depth + 1,
            )

            # Apply namespace isolation
            if namespace_isolation:
                namespace = Path(include_path).stem  # Use filename as namespace
                namespaced_steps = _apply_namespace(resolved.steps, namespace)
                merged_steps.extend(namespaced_steps)
                logger.debug(f"Included pipeline '{namespace}' with {len(namespaced_steps)} steps")
            else:
                merged_steps.extend(resolved.steps)

        except Exception as e:
            logger.error(f"Failed to include pipeline {include_path}: {e}")
            raise PipelineValidationError(
                f"Failed to include pipeline: {include_path}",
                suggestion=f"Check file exists and is valid YAML: {e}",
            ) from e

    # Add main pipeline steps after includes
    merged_steps.extend(pipeline.steps)

    return PipelineDefinition(
        name=pipeline.name,
        version=pipeline.version,
        description=pipeline.description,
        steps=merged_steps,
        parallel_groups=pipeline.parallel_groups,
        variables=pipeline.variables,
        includes=[],  # Clear after resolution
        source_file=pipeline.source_file,
    )

848 

849 

def _apply_namespace(steps: list[PipelineStep], namespace: str) -> list[PipelineStep]:
    """Apply namespace prefix to pipeline steps.

    Step names become "<namespace>.<name>"; output values without a dot
    get the same prefix. Params, inputs, conditions and conditional
    branch lists are shallow-copied unchanged.

    NOTE(review): nested if/elif/else steps are copied without being
    renamed, and input references are not prefixed — confirm whether
    that is intended.

    Args:
        steps: Steps to namespace
        namespace: Namespace prefix

    Returns:
        Namespaced steps

    Example:
        >>> steps = [PipelineStep(name="decode", ...)]
        >>> _apply_namespace(steps, "uart")
        [PipelineStep(name="uart.decode", ...)]
    """

    def rename(step: PipelineStep) -> PipelineStep:
        # Outputs that already contain a dot are considered namespaced.
        prefixed_outputs = {
            key: value if "." in value else f"{namespace}.{value}"
            for key, value in step.outputs.items()
        }
        return PipelineStep(
            name=f"{namespace}.{step.name}",
            type=step.type,
            params=step.params.copy(),
            inputs=step.inputs.copy(),
            outputs=prefixed_outputs,
            condition=step.condition,
            if_steps=list(step.if_steps) if step.if_steps else [],
            elif_conditions=list(step.elif_conditions) if step.elif_conditions else [],
            else_steps=list(step.else_steps) if step.else_steps else [],
        )

    return [rename(step) for step in steps]

881 

882 

883class PipelineTemplate: 

884 """Parameterized pipeline template. 

885 

886 Provides pipeline definition with parameter placeholders that 

887 can be instantiated with different values. 

888 

889 Example: 

890 >>> template = PipelineTemplate.load("analysis_template.yaml") 

891 >>> pipeline = template.instantiate(sample_rate=1e9, protocol="uart") 

892 """ 

893 

    def __init__(self, definition: PipelineDefinition, parameters: dict[str, dict[str, Any]]):
        """Initialize template.

        Args:
            definition: Base pipeline definition
            parameters: Parameter definitions keyed by name; each spec
                may carry "type", "default", "required", "description"
        """
        self.definition = definition
        self.parameters = parameters

903 

904 @classmethod 

905 def load(cls, path: str | Path) -> PipelineTemplate: 

906 """Load template from file. 

907 

908 Args: 

909 path: Path to template file 

910 

911 Returns: 

912 Loaded template 

913 """ 

914 path = Path(path) 

915 

916 with open(path, encoding="utf-8") as f: 

917 data = yaml.safe_load(f) 

918 

919 # Extract parameter definitions 

920 params = data.get("parameters", {}) 

921 parameter_defs = {} 

922 for name, spec in params.items(): 

923 parameter_defs[name] = { 

924 "type": spec.get("type", "string"), 

925 "default": spec.get("default"), 

926 "required": spec.get("required", False), 

927 "description": spec.get("description", ""), 

928 } 

929 

930 # Load pipeline without variable substitution 

931 definition = PipelineDefinition( 

932 name=data.get("pipeline", {}).get("name", path.stem), 

933 version=data.get("pipeline", {}).get("version", "1.0.0"), 

934 description=data.get("pipeline", {}).get("description", ""), 

935 steps=[_parse_step(s) for s in data.get("pipeline", {}).get("steps", [])], 

936 source_file=str(path), 

937 ) 

938 

939 return cls(definition, parameter_defs) 

940 

941 def instantiate(self, **kwargs: Any) -> Pipeline: 

942 """Create pipeline instance with parameter values and type validation. 

943 

944 Validates all parameters against their type specifications before 

945 instantiation. Supports: int, float, string, bool, list, dict. 

946 

947 Args: 

948 **kwargs: Parameter values 

949 

950 Returns: 

951 Instantiated Pipeline 

952 

953 Raises: 

954 PipelineValidationError: If required parameters missing or type mismatch 

955 

956 Example: 

957 >>> template = PipelineTemplate.load("analysis.yaml") 

958 >>> pipeline = template.instantiate(sample_rate=1000000, protocol="uart") 

959 """ 

960 # Collect required and provided parameters 

961 required_params = [ 

962 name for name, spec in self.parameters.items() if spec.get("required", False) 

963 ] 

964 provided_params = list(kwargs.keys()) 

965 missing_params = [p for p in required_params if p not in kwargs] 

966 

967 # Check for missing required parameters 

968 if missing_params: 

969 raise PipelineValidationError( 

970 f"Missing required parameters: {missing_params}", 

971 suggestion=f"Required: {required_params}, provided: {provided_params}", 

972 ) 

973 

974 # Validate parameters with type checking 

975 variables = {} 

976 type_errors = [] 

977 

978 for name, spec in self.parameters.items(): 

979 if name in kwargs: 

980 value = kwargs[name] 

981 expected_type = spec.get("type", "string") 

982 

983 # Type validation with detailed error reporting 

984 if not _validate_type(value, expected_type): 

985 type_errors.append( 

986 f"{name}: expects {expected_type}, got {type(value).__name__} ('{value}')" 

987 ) 

988 continue 

989 

990 variables[name] = value 

991 elif spec.get("required", False): 991 ↛ 993line 991 didn't jump to line 993 because the condition on line 991 was never true

992 # Already caught above, but defensive check 

993 raise PipelineValidationError( 

994 f"Required parameter '{name}' not provided", 

995 suggestion=f"Provide value for '{name}'", 

996 ) 

997 elif "default" in spec: 997 ↛ 978line 997 didn't jump to line 978 because the condition on line 997 was always true

998 default_val = spec["default"] 

999 # Validate default value type 

1000 expected_type = spec.get("type", "string") 

1001 if not _validate_type(default_val, expected_type): 1001 ↛ 1002line 1001 didn't jump to line 1002 because the condition on line 1001 was never true

1002 logger.warning(f"Default value for '{name}' doesn't match type {expected_type}") 

1003 variables[name] = default_val 

1004 

1005 # Report all type errors at once 

1006 if type_errors: 

1007 raise PipelineValidationError( 

1008 f"Type validation failed for {len(type_errors)} parameter(s)", 

1009 suggestion="Fix parameter types:\n - " + "\n - ".join(type_errors), 

1010 ) 

1011 

1012 # Create copy of definition with substituted values 

1013 definition_copy = copy.deepcopy(self.definition) 

1014 definition_copy.variables = variables 

1015 

1016 # Substitute in step params 

1017 for step in definition_copy.steps: 

1018 step.params = _substitute_dict_variables(step.params, variables) 

1019 

1020 logger.info( 

1021 f"Instantiated pipeline template '{self.definition.name}' with {len(variables)} variables" 

1022 ) 

1023 return Pipeline(definition_copy) 

1024 

1025 

1026def _validate_type(value: Any, expected_type: str) -> bool: 

1027 """Validate value matches expected type.""" 

1028 type_map = { 

1029 "string": str, 

1030 "int": int, 

1031 "integer": int, 

1032 "float": float, 

1033 "number": (int, float), 

1034 "bool": bool, 

1035 "boolean": bool, 

1036 "list": list, 

1037 "array": list, 

1038 "dict": dict, 

1039 "object": dict, 

1040 } 

1041 expected = type_map.get(expected_type, str) 

1042 return isinstance(value, expected) # type: ignore[arg-type] 

1043 

1044 

def _substitute_dict_variables(d: dict[str, Any], variables: dict[str, Any]) -> dict[str, Any]:
    """Recursively substitute variables in a dictionary.

    Strings are passed through ``_substitute_variables``; nested dicts and
    lists are recursed into, including lists nested inside lists. All other
    values pass through unchanged.

    Args:
        d: Dictionary whose values may contain variable references.
        variables: Variable name to value mapping.

    Returns:
        A new dictionary with substitutions applied; ``d`` is not mutated.
    """
    return {key: _substitute_value(value, variables) for key, value in d.items()}


def _substitute_value(value: Any, variables: dict[str, Any]) -> Any:
    """Substitute variables in a single value of any supported shape."""
    if isinstance(value, str):
        return _substitute_variables(value, variables)
    if isinstance(value, dict):
        return _substitute_dict_variables(value, variables)
    if isinstance(value, list):
        # Recurse per element so arbitrarily nested lists are handled too.
        return [_substitute_value(v, variables) for v in value]
    return value

1065 

1066 

# Explicit public API of this module (classes, exceptions, and loaders).
__all__ = [
    "Pipeline",
    "PipelineDefinition",
    "PipelineExecutionError",
    "PipelineResult",
    "PipelineStep",
    "PipelineTemplate",
    "PipelineValidationError",
    "load_pipeline",
    "resolve_includes",
]