# Coverage report for src/tracekit/config/pipeline.py: 89% of 393 statements
# (coverage.py v7.13.1, created at 2026-01-11 23:04 +0000)
1"""Pipeline configuration and execution system.
3This module provides YAML-based pipeline definitions, loading, execution,
4templates, composition, and conditional logic for analysis workflows with
5transaction semantics, type validation, circular detection, and expression
6language support.
7"""

from __future__ import annotations

import copy
import logging
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import TYPE_CHECKING, Any

import yaml

from tracekit.config.schema import validate_against_schema
from tracekit.core.exceptions import ConfigurationError

if TYPE_CHECKING:
    from collections.abc import Callable

logger = logging.getLogger(__name__)


@dataclass
class PipelineStep:
    """Single step in an analysis pipeline.

    Attributes:
        name: Step identifier.
        type: Step type (e.g., "input.file", "decoder.uart").
        params: Step parameters.
        inputs: Input mappings from previous steps.
        outputs: Output definitions.
        condition: Optional condition expression gating execution.
        if_steps: Steps to execute if the condition is true.
        elif_conditions: List of (condition, steps) pairs for elif branches.
        else_steps: Steps to execute if no condition matches.

    Example:
        >>> step = PipelineStep(
        ...     name="decode_uart",
        ...     type="decoder.uart",
        ...     params={"baud_rate": 115200},
        ... )
    """

    name: str
    type: str
    params: dict[str, Any] = field(default_factory=dict)
    inputs: dict[str, str] = field(default_factory=dict)
    outputs: dict[str, str] = field(default_factory=dict)
    condition: str | None = None
    if_steps: list[PipelineStep] = field(default_factory=list)
    elif_conditions: list[tuple[str, list[PipelineStep]]] = field(default_factory=list)
    else_steps: list[PipelineStep] = field(default_factory=list)


@dataclass
class PipelineDefinition:
    """Complete pipeline definition.

    Attributes:
        name: Pipeline identifier.
        version: Pipeline version.
        description: Human-readable description.
        steps: Ordered list of pipeline steps.
        parallel_groups: Groups of step names that can run in parallel.
        variables: Template variables for parameterization.
        includes: List of included sub-pipeline paths.
        source_file: Path of the file this definition was loaded from, if any.

    Example:
        >>> pipeline = PipelineDefinition(
        ...     name="uart_analysis",
        ...     steps=[load_step, decode_step, export_step],
        ... )
    """

    name: str
    version: str = "1.0.0"
    description: str = ""
    steps: list[PipelineStep] = field(default_factory=list)
    parallel_groups: list[list[str]] = field(default_factory=list)
    variables: dict[str, Any] = field(default_factory=dict)
    includes: list[str] = field(default_factory=list)
    source_file: str | None = None


@dataclass
class PipelineResult:
    """Result of pipeline execution.

    Attributes:
        pipeline_name: Name of the executed pipeline.
        outputs: Dictionary of output data from steps.
        step_results: Results from each step, keyed by step name.
        success: Whether the pipeline completed successfully.
        error: Error message if execution failed.
    """

    pipeline_name: str
    outputs: dict[str, Any] = field(default_factory=dict)
    step_results: dict[str, Any] = field(default_factory=dict)
    success: bool = True
    error: str | None = None


class PipelineValidationError(ConfigurationError):
    """Pipeline validation error with step information.

    Attributes:
        step_name: Name of the failing step.
        line: Line number in the pipeline file, if known.
        suggestion: Suggested fix.
    """

    def __init__(
        self,
        message: str,
        *,
        step_name: str | None = None,
        line: int | None = None,
        suggestion: str | None = None,
    ):
        self.step_name = step_name
        self.line = line
        self.suggestion = suggestion
        super().__init__(message)


class PipelineExecutionError(ConfigurationError):
    """Pipeline execution error.

    Attributes:
        step_name: Name of the failing step.
        traceback_str: Traceback string, if available.
    """

    def __init__(
        self,
        message: str,
        *,
        step_name: str | None = None,
        traceback_str: str | None = None,
    ):
        self.step_name = step_name
        self.traceback_str = traceback_str
        super().__init__(message)


class Pipeline:
    """Executable analysis pipeline.

    Loads, validates, and executes pipeline definitions with support
    for progress tracking, error handling, and rollback.

    Example:
        >>> pipeline = Pipeline.load("uart_analysis.yaml")
        >>> pipeline.on_progress(lambda step, pct: print(f"{step}: {pct}%"))
        >>> results = pipeline.execute()
    """

    def __init__(self, definition: PipelineDefinition):
        """Initialize pipeline from definition.

        Args:
            definition: Pipeline definition.
        """
        self.definition = definition
        self._progress_callbacks: list[Callable[[str, int], None]] = []
        self._step_handlers: dict[str, Callable[..., Any]] = {}
        self._state: dict[str, Any] = {}
        self._cleanups: list[Callable[[], None]] = []

    @classmethod
    def load(cls, path: str | Path, variables: dict[str, Any] | None = None) -> Pipeline:
        """Load pipeline from a YAML file.

        Args:
            path: Path to pipeline definition file.
            variables: Template variable values.

        Returns:
            Loaded and validated Pipeline.

        Example:
            >>> pipeline = Pipeline.load("pipeline.yaml")
            >>> pipeline = Pipeline.load("pipeline.yaml", {"input_file": "trace.bin"})
        """
        definition = load_pipeline(path, variables)
        return cls(definition)

    def on_progress(self, callback: Callable[[str, int], None]) -> None:
        """Register a progress callback.

        Args:
            callback: Function called with (step_name, percent_complete).

        Example:
            >>> pipeline.on_progress(lambda step, pct: print(f"{step}: {pct}%"))
        """
        self._progress_callbacks.append(callback)

    def register_handler(self, step_type: str, handler: Callable[..., Any]) -> None:
        """Register a handler for a step type.

        Args:
            step_type: Step type to handle (e.g., "decoder.uart").
            handler: Handler invoked with keyword arguments inputs, params,
                and step_name; it should return a dict of named outputs.
        """
        self._step_handlers[step_type] = handler
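
    # Example handler registration (a sketch; decode_frames is hypothetical,
    # but the keyword-argument contract mirrors the call in _execute_step):
    #
    #     def uart_handler(*, inputs, params, step_name):
    #         frames = decode_frames(inputs["waveform"], params["baud_rate"])
    #         return {"packets": frames}
    #
    #     pipeline.register_handler("decoder.uart", uart_handler)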

    def execute(self, dry_run: bool = False) -> PipelineResult:
        """Execute the pipeline with transaction semantics.

        The pipeline executes with ACID-like semantics: either all steps
        complete successfully, or the pipeline rolls back and cleanup is
        guaranteed.

        Args:
            dry_run: If True, validate steps without executing them.

        Returns:
            Pipeline execution result.

        Raises:
            PipelineExecutionError: If execution fails.

        Example:
            >>> result = pipeline.execute()
            >>> result = pipeline.execute(dry_run=True)  # Validate only
        """
        result = PipelineResult(pipeline_name=self.definition.name)
        self._state = {}
        self._cleanups = []
        committed = False

        try:
            total_steps = len(self.definition.steps)

            # Transaction begin
            logger.debug(f"Beginning pipeline transaction: {self.definition.name}")

            for i, step in enumerate(self.definition.steps):
                progress = int((i / total_steps) * 100)
                self._notify_progress(step.name, progress)

                if dry_run:
                    logger.info(f"[DRY RUN] Would execute step: {step.name}")
                    # In dry-run mode, only validate the step configuration
                    self._validate_step(step)
                    continue

                # Check condition with short-circuit evaluation
                if step.condition:
                    try:
                        should_execute = self._evaluate_condition(step.condition)
                        if not should_execute:
                            logger.info(f"Skipping step '{step.name}' (condition false)")
                            continue
                    except Exception as e:
                        logger.warning(f"Condition evaluation failed for '{step.name}': {e}")
                        continue

                # Execute step
                step_result = self._execute_step(step)
                result.step_results[step.name] = step_result

                # Store outputs in state with namespace isolation
                for output_name, output_key in step.outputs.items():
                    namespaced_key = f"{step.name}.{output_name}"
                    self._state[namespaced_key] = step_result.get(output_key)
                    logger.debug(f"Stored output: {namespaced_key}")

            # Transaction commit
            logger.debug(f"Committing pipeline transaction: {self.definition.name}")
            committed = True

            # Notify completion
            self._notify_progress("complete", 100)
            result.success = True
            result.outputs = dict(self._state)

        except Exception as e:
            result.success = False
            result.error = str(e)
            logger.error(f"Pipeline execution failed: {e}")

            # Transaction rollback
            if not committed:
                logger.warning(f"Rolling back pipeline transaction: {self.definition.name}")
                self._rollback()

            if not dry_run:
                raise PipelineExecutionError(
                    f"Pipeline '{self.definition.name}' failed",
                    step_name=step.name if "step" in locals() else None,
                    traceback_str=str(e),
                ) from e

        return result

    def _validate_step(self, step: PipelineStep) -> None:
        """Validate step configuration (used in dry-run).

        Args:
            step: Step to validate.

        Raises:
            PipelineValidationError: If the step configuration is invalid.
        """
        # Check required fields
        if not step.name:
            raise PipelineValidationError("Step name is required")
        if not step.type:
            raise PipelineValidationError(f"Step '{step.name}' missing type", step_name=step.name)

        # Validate input references
        for input_ref in step.inputs.values():
            if "." not in input_ref and input_ref not in self._state:
                logger.warning(f"Step '{step.name}' references undefined input: {input_ref}")

    def _rollback(self) -> None:
        """Roll back pipeline execution (clean up all resources).

        Guarantees cleanup of all resources allocated during execution,
        running in reverse order of allocation.
        """
        logger.info("Running rollback cleanup")
        self._run_cleanups()
        self._state.clear()
        logger.info("Rollback complete")

    def _execute_step(self, step: PipelineStep) -> dict[str, Any]:
        """Execute a single pipeline step.

        Args:
            step: Step to execute.

        Returns:
            Step result dictionary.

        Raises:
            PipelineExecutionError: If no handler is found for the step type.
        """
        logger.debug(f"Executing step: {step.name} (type={step.type})")

        # Resolve inputs from state
        resolved_inputs = {}
        for input_name, input_ref in step.inputs.items():
            if input_ref in self._state:
                resolved_inputs[input_name] = self._state[input_ref]
            else:
                logger.warning(f"Input '{input_ref}' not found in state")

        # Get handler; explicitly registered handlers take precedence
        handler = self._step_handlers.get(step.type)
        if handler is None:
            handler = self._get_default_handler(step.type)

        if handler is None:
            raise PipelineExecutionError(
                f"No handler for step type '{step.type}'", step_name=step.name
            )

        # Execute handler; non-dict results are wrapped under a "result" key
        result = handler(inputs=resolved_inputs, params=step.params, step_name=step.name)

        return result if isinstance(result, dict) else {"result": result}

    def _get_default_handler(self, step_type: str) -> Callable[..., Any] | None:
        """Get the default handler for a step type.

        Args:
            step_type: Step type.

        Returns:
            Handler function, or None if no built-in handler exists.
        """
        # Built-in handlers for common step types
        handlers = {
            "input.file": self._handle_input_file,
            "output.json": self._handle_output_json,
            "analysis.statistics": self._handle_statistics,
        }
        return handlers.get(step_type)

    def _handle_input_file(
        self,
        inputs: dict[str, Any],
        params: dict[str, Any],
        step_name: str,
    ) -> dict[str, Any]:
        """Handle file input step."""
        # Placeholder - the actual implementation would use loaders
        return {"waveform": params.get("path")}

    def _handle_output_json(
        self,
        inputs: dict[str, Any],
        params: dict[str, Any],
        step_name: str,
    ) -> dict[str, Any]:
        """Handle JSON output step."""
        import json

        path = params.get("path", "output.json")
        data = inputs.get("data", inputs)
        with open(path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, default=str)
        return {"path": path}

    def _handle_statistics(
        self,
        inputs: dict[str, Any],
        params: dict[str, Any],
        step_name: str,
    ) -> dict[str, Any]:
        """Handle statistics step."""
        return {"statistics": {"count": len(inputs)}}

    def _evaluate_condition(self, condition: str) -> bool:
        """Evaluate a condition expression using the expression language.

        Supports:
        - Comparison operators: ==, !=, <, >, <=, >=
        - Logical operators: and, or, not
        - Field access: data.confidence, step_name.output_name
        - Short-circuit evaluation of and/or

        Args:
            condition: Condition expression string.

        Returns:
            Evaluation result; False if evaluation fails.

        Example:
            >>> self._evaluate_condition("data.confidence > 0.8")
            >>> self._evaluate_condition("decode_uart.packets > 0 and data.valid == True")
        """
        try:
            # Parse and evaluate expression with short-circuit support
            return self._eval_expression(condition)
        except Exception as e:
            logger.warning(f"Condition evaluation failed: {condition} - {e}")
            return False

    def _eval_expression(self, expr: str) -> bool:
        """Evaluate an expression with short-circuit logic.

        Args:
            expr: Expression string.

        Returns:
            Boolean result.
        """
        # Handle logical operators with short-circuit evaluation
        if " or " in expr:
            parts = expr.split(" or ", 1)
            left = self._eval_expression(parts[0].strip())
            if left:  # Short-circuit: if left is True, don't evaluate right
                logger.debug(f"Short-circuit OR: left={left}, skipping right")
                return True
            return self._eval_expression(parts[1].strip())

        if " and " in expr:
            parts = expr.split(" and ", 1)
            left = self._eval_expression(parts[0].strip())
            if not left:  # Short-circuit: if left is False, don't evaluate right
                logger.debug(f"Short-circuit AND: left={left}, skipping right")
                return False
            return self._eval_expression(parts[1].strip())

        if expr.startswith("not "):
            return not self._eval_expression(expr[4:].strip())

        # Evaluate comparison
        return self._eval_comparison(expr)
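
    # Precedence note (observed behavior of the parser above, not a documented
    # grammar): splitting on " or " before " and " makes "and" bind tighter
    # than "or", as in Python, so "a or b and c" is read as "a or (b and c)".
    # Parentheses are not handled by this mini-parser.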

    def _eval_comparison(self, expr: str) -> bool:
        """Evaluate a comparison expression.

        Args:
            expr: Comparison expression.

        Returns:
            Boolean result.
        """
        # Two-character operators come first so "<=" is not misread as "<"
        operators = ["<=", ">=", "==", "!=", "<", ">"]

        for op in operators:
            if op in expr:
                left, right = expr.split(op, 1)
                left_val = self._resolve_value(left.strip())
                right_val = self._resolve_value(right.strip())

                if op == "==":
                    return left_val == right_val  # type: ignore[no-any-return]
                elif op == "!=":
                    return left_val != right_val  # type: ignore[no-any-return]
                elif op == "<":
                    return left_val < right_val  # type: ignore[no-any-return]
                elif op == ">":
                    return left_val > right_val  # type: ignore[no-any-return]
                elif op == "<=":
                    return left_val <= right_val  # type: ignore[no-any-return]
                elif op == ">=":
                    return left_val >= right_val  # type: ignore[no-any-return]

        # No comparison operator found; try as boolean
        return bool(self._resolve_value(expr.strip()))

    def _resolve_value(self, value_str: str) -> Any:
        """Resolve a value from a string (state lookup or literal parse).

        Args:
            value_str: Value string (field reference or literal).

        Returns:
            Resolved value.
        """
        value_str = value_str.strip()

        # Check if it's a field reference in state
        if value_str in self._state:
            return self._state[value_str]

        # Try to parse as a literal
        # String literals
        if (value_str.startswith('"') and value_str.endswith('"')) or (
            value_str.startswith("'") and value_str.endswith("'")
        ):
            return value_str[1:-1]

        # Boolean literals
        if value_str.lower() == "true":
            return True
        if value_str.lower() == "false":
            return False

        # None/null literal
        if value_str.lower() in ("none", "null"):
            return None

        # Numeric literals
        try:
            if "." in value_str:
                return float(value_str)
            return int(value_str)
        except ValueError:
            pass

        # Fall back to the raw string
        logger.warning(f"Could not resolve value: {value_str}, returning as string")
        return value_str

    def _notify_progress(self, step: str, percent: int) -> None:
        """Notify progress callbacks."""
        for callback in self._progress_callbacks:
            try:
                callback(step, percent)
            except Exception as e:
                logger.warning(f"Progress callback failed: {e}")

    def _run_cleanups(self) -> None:
        """Run registered cleanup functions in reverse (LIFO) order."""
        for cleanup in reversed(self._cleanups):
            try:
                cleanup()
            except Exception as e:
                logger.warning(f"Cleanup failed: {e}")


def load_pipeline(path: str | Path, variables: dict[str, Any] | None = None) -> PipelineDefinition:
    """Load pipeline definition from a file.

    Args:
        path: Path to YAML file.
        variables: Template variable values.

    Returns:
        Pipeline definition.

    Raises:
        PipelineValidationError: If validation fails.
    """
    path = Path(path)

    if not path.exists():
        raise PipelineValidationError(
            f"Pipeline file not found: {path}", suggestion="Check file path"
        )

    try:
        with open(path, encoding="utf-8") as f:
            content = f.read()

        # Apply template variables
        if variables:
            content = _substitute_variables(content, variables)

        data = yaml.safe_load(content)
    except yaml.YAMLError as e:
        raise PipelineValidationError(
            f"YAML parse error in {path}", suggestion="Check YAML syntax"
        ) from e

    # Guard against empty or non-mapping documents (yaml.safe_load returns
    # None for an empty file, which would break the key access below)
    if not isinstance(data, dict):
        raise PipelineValidationError(
            f"Pipeline file is empty or not a mapping: {path}",
            suggestion="Top level of the file must be a YAML mapping",
        )

    # Handle nested 'pipeline' key
    if "pipeline" in data:
        data = data["pipeline"]

    # Validate against schema
    try:
        validate_against_schema(data, "pipeline")
    except Exception as e:
        raise PipelineValidationError(
            f"Pipeline validation failed for {path}", suggestion=str(e)
        ) from e

    # Parse steps
    steps = []
    for step_data in data.get("steps", []):
        step = _parse_step(step_data)
        steps.append(step)

    return PipelineDefinition(
        name=data.get("name", path.stem),
        version=data.get("version", "1.0.0"),
        description=data.get("description", ""),
        steps=steps,
        parallel_groups=data.get("parallel_groups", []),
        variables=variables or {},
        includes=data.get("includes", []),
        source_file=str(path),
    )


def _parse_step(data: dict[str, Any]) -> PipelineStep:
    """Parse a step from a dictionary."""
    step = PipelineStep(
        name=data.get("name", "unnamed"),
        type=data.get("type", "unknown"),
        params=data.get("params", {}),
        inputs=data.get("inputs", {}),
        outputs=data.get("outputs", {}),
        condition=data.get("condition"),
    )

    # Parse conditional branches
    if "if_steps" in data:
        step.if_steps = [_parse_step(s) for s in data["if_steps"]]
    if "elif_conditions" in data:
        for elif_data in data["elif_conditions"]:
            cond = elif_data.get("condition")
            steps = [_parse_step(s) for s in elif_data.get("steps", [])]
            step.elif_conditions.append((cond, steps))
    if "else_steps" in data:
        step.else_steps = [_parse_step(s) for s in data["else_steps"]]

    return step
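

# Illustrative conditional-step YAML accepted by _parse_step (a sketch; key
# names mirror the PipelineStep fields, and "control.branch" is a
# hypothetical step type). Note that Pipeline.execute() as written only
# gates a step on its top-level "condition"; the parsed branch lists are
# carried on the step object for handlers to interpret:
#
#   - name: branch_on_quality
#     type: control.branch
#     condition: "decode.confidence > 0.8"
#     if_steps:
#       - {name: export, type: output.json, params: {path: "out.json"}}
#     elif_conditions:
#       - condition: "decode.confidence > 0.5"
#         steps:
#           - {name: retry, type: decoder.uart, params: {baud_rate: 9600}}
#     else_steps:
#       - {name: report, type: analysis.statistics}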


def _substitute_variables(content: str, variables: dict[str, Any], max_depth: int = 3) -> str:
    """Substitute template variables in content, with nested substitution.

    Supports nested substitution up to 3 levels deep (CFG-011):
    - Level 1: ${VAR1} -> "value1"
    - Level 2: ${VAR2} where VAR2 = "${VAR1}" -> "value1"
    - Level 3: ${VAR3} where VAR3 = "${VAR2}" -> "value1"

    Args:
        content: String content with ${VAR_NAME} placeholders.
        variables: Variable name to value mapping.
        max_depth: Maximum nested substitution depth (default 3).

    Returns:
        Content with variables substituted.

    Raises:
        PipelineValidationError: If the nested substitution depth is exceeded.

    Example:
        >>> vars = {"BASE": "trace", "FILE": "${BASE}.bin"}
        >>> _substitute_variables("path: ${FILE}", vars)
        'path: trace.bin'
    """
    pattern = re.compile(r"\$\{(\w+)\}")
    depth = 0

    for depth in range(max_depth):  # noqa: B007
        prev_content = content
        substitutions_made = False

        # Find all matches in the current content
        matches = list(pattern.finditer(content))
        for match in matches:
            var_name = match.group(1)
            if var_name in variables:
                value = str(variables[var_name])
                content = content.replace(match.group(0), value)
                substitutions_made = True

        # No more substitutions possible
        if content == prev_content or not substitutions_made:
            break

        # Stop early if no unresolved variables remain after this pass
        remaining = pattern.findall(content)
        if not remaining:
            break

    # Check for unresolved variables
    remaining_vars = pattern.findall(content)
    if remaining_vars:
        if depth >= max_depth - 1:
            raise PipelineValidationError(
                f"Nested substitution depth exceeded {max_depth} levels",
                suggestion=f"Reduce nesting or increase max_depth. Unresolved: {remaining_vars}",
            )
        else:
            # Some variables are undefined
            undefined = [v for v in remaining_vars if v not in variables]
            if undefined:
                logger.warning(f"Undefined variables: {undefined}")

    logger.debug(f"Variable substitution completed in {depth + 1} passes")
    return content


def resolve_includes(
    pipeline: PipelineDefinition,
    base_path: Path,
    *,
    max_depth: int = 5,
    namespace_isolation: bool = True,
    _visited: set[str] | None = None,
    _depth: int = 0,
) -> PipelineDefinition:
    """Resolve pipeline includes (composition) with circular-include detection.

    Supports include depth up to 5 levels, traversing the dependency graph
    for cycle detection, and provides namespace isolation for included
    pipelines.

    Args:
        pipeline: Pipeline with includes.
        base_path: Base path for resolving relative includes.
        max_depth: Maximum include depth (default 5).
        namespace_isolation: If True, prefix included steps with a namespace.
        _visited: Set of visited pipeline files for cycle detection (DFS).
        _depth: Current recursion depth (for tracking).

    Returns:
        Pipeline with includes resolved.

    Raises:
        PipelineValidationError: On circular includes or exceeded depth.

    Example:
        >>> pipeline = load_pipeline("main.yaml")
        >>> resolved = resolve_includes(pipeline, Path("."))
    """
    if _visited is None:
        _visited = set()

    if not pipeline.includes:
        return pipeline

    # Normalize source file path for comparison
    source_key = str(Path(pipeline.source_file).resolve()) if pipeline.source_file else None

    # Cycle detection using DFS with a visited set
    if source_key and source_key in _visited:
        cycle_list = [*list(_visited), source_key]
        cycle = " → ".join([Path(p).name for p in cycle_list])
        raise PipelineValidationError(
            f"Circular pipeline include detected: {cycle}",
            suggestion=f"Remove circular dependency from {Path(source_key).name}",
        )

    # Depth limit check
    if _depth >= max_depth:
        chain = " → ".join(
            [Path(p).name for p in _visited] + [Path(source_key).name if source_key else "?"]
        )
        raise PipelineValidationError(
            f"Pipeline include depth exceeded maximum of {max_depth}",
            suggestion=f"Reduce nesting. Current chain: {chain}",
        )

    if source_key:
        _visited.add(source_key)

    # Merge included pipelines
    merged_steps = []
    for include_path in pipeline.includes:
        include_full = base_path / include_path

        if not include_full.exists():
            logger.warning(f"Included pipeline not found: {include_path}")
            continue

        try:
            # Load included pipeline
            included = load_pipeline(include_full, pipeline.variables)

            # Recursively resolve nested includes
            resolved = resolve_includes(
                included,
                include_full.parent,
                max_depth=max_depth,
                namespace_isolation=namespace_isolation,
                _visited=_visited.copy(),  # Copy to avoid mutation across branches
                _depth=_depth + 1,
            )

            # Apply namespace isolation
            if namespace_isolation:
                namespace = Path(include_path).stem  # Use filename as namespace
                namespaced_steps = _apply_namespace(resolved.steps, namespace)
                merged_steps.extend(namespaced_steps)
                logger.debug(f"Included pipeline '{namespace}' with {len(namespaced_steps)} steps")
            else:
                merged_steps.extend(resolved.steps)

        except Exception as e:
            logger.error(f"Failed to include pipeline {include_path}: {e}")
            raise PipelineValidationError(
                f"Failed to include pipeline: {include_path}",
                suggestion=f"Check file exists and is valid YAML: {e}",
            ) from e

    # Add main pipeline steps after includes
    merged_steps.extend(pipeline.steps)

    return PipelineDefinition(
        name=pipeline.name,
        version=pipeline.version,
        description=pipeline.description,
        steps=merged_steps,
        parallel_groups=pipeline.parallel_groups,
        variables=pipeline.variables,
        includes=[],  # Clear after resolution
        source_file=pipeline.source_file,
    )


def _apply_namespace(steps: list[PipelineStep], namespace: str) -> list[PipelineStep]:
    """Apply a namespace prefix to pipeline steps.

    Args:
        steps: Steps to namespace.
        namespace: Namespace prefix.

    Returns:
        Namespaced steps.

    Example:
        >>> steps = [PipelineStep(name="decode", ...)]
        >>> _apply_namespace(steps, "uart")
        [PipelineStep(name="uart.decode", ...)]
    """
    namespaced = []
    for step in steps:
        # Create a copy with a namespaced name
        namespaced_step = PipelineStep(
            name=f"{namespace}.{step.name}",
            type=step.type,
            params=step.params.copy(),
            inputs=step.inputs.copy(),
            outputs={k: f"{namespace}.{v}" if "." not in v else v for k, v in step.outputs.items()},
            condition=step.condition,
            if_steps=step.if_steps.copy() if step.if_steps else [],
            elif_conditions=step.elif_conditions.copy() if step.elif_conditions else [],
            else_steps=step.else_steps.copy() if step.else_steps else [],
        )
        namespaced.append(namespaced_step)
    return namespaced


class PipelineTemplate:
    """Parameterized pipeline template.

    Provides a pipeline definition with parameter placeholders that
    can be instantiated with different values.

    Example:
        >>> template = PipelineTemplate.load("analysis_template.yaml")
        >>> pipeline = template.instantiate(sample_rate=1e9, protocol="uart")
    """

    def __init__(self, definition: PipelineDefinition, parameters: dict[str, dict[str, Any]]):
        """Initialize template.

        Args:
            definition: Base pipeline definition.
            parameters: Parameter definitions with type, default, required.
        """
        self.definition = definition
        self.parameters = parameters

    @classmethod
    def load(cls, path: str | Path) -> PipelineTemplate:
        """Load a template from file.

        Args:
            path: Path to template file.

        Returns:
            Loaded template.
        """
        path = Path(path)

        with open(path, encoding="utf-8") as f:
            data = yaml.safe_load(f)

        # Extract parameter definitions
        params = data.get("parameters", {})
        parameter_defs = {}
        for name, spec in params.items():
            parameter_defs[name] = {
                "type": spec.get("type", "string"),
                "default": spec.get("default"),
                "required": spec.get("required", False),
                "description": spec.get("description", ""),
            }

        # Load the pipeline section without variable substitution
        definition = PipelineDefinition(
            name=data.get("pipeline", {}).get("name", path.stem),
            version=data.get("pipeline", {}).get("version", "1.0.0"),
            description=data.get("pipeline", {}).get("description", ""),
            steps=[_parse_step(s) for s in data.get("pipeline", {}).get("steps", [])],
            source_file=str(path),
        )

        return cls(definition, parameter_defs)

    def instantiate(self, **kwargs: Any) -> Pipeline:
        """Create a pipeline instance with parameter values and type validation.

        Validates all parameters against their type specifications before
        instantiation. Supported types: int, float, string, bool, list, dict.

        Args:
            **kwargs: Parameter values.

        Returns:
            Instantiated Pipeline.

        Raises:
            PipelineValidationError: If required parameters are missing or a
                type mismatch is found.

        Example:
            >>> template = PipelineTemplate.load("analysis.yaml")
            >>> pipeline = template.instantiate(sample_rate=1000000, protocol="uart")
        """
        # Collect required and provided parameters
        required_params = [
            name for name, spec in self.parameters.items() if spec.get("required", False)
        ]
        provided_params = list(kwargs.keys())
        missing_params = [p for p in required_params if p not in kwargs]

        # Check for missing required parameters
        if missing_params:
            raise PipelineValidationError(
                f"Missing required parameters: {missing_params}",
                suggestion=f"Required: {required_params}, provided: {provided_params}",
            )

        # Validate parameters with type checking
        variables = {}
        type_errors = []

        for name, spec in self.parameters.items():
            if name in kwargs:
                value = kwargs[name]
                expected_type = spec.get("type", "string")

                # Type validation with detailed error reporting
                if not _validate_type(value, expected_type):
                    type_errors.append(
                        f"{name}: expects {expected_type}, got {type(value).__name__} ('{value}')"
                    )
                    continue

                variables[name] = value
            elif spec.get("required", False):
                # Already caught above, but kept as a defensive check
                raise PipelineValidationError(
                    f"Required parameter '{name}' not provided",
                    suggestion=f"Provide value for '{name}'",
                )
            elif "default" in spec:
                default_val = spec["default"]
                # Validate default value type
                expected_type = spec.get("type", "string")
                if not _validate_type(default_val, expected_type):
                    logger.warning(f"Default value for '{name}' doesn't match type {expected_type}")
                variables[name] = default_val

        # Report all type errors at once
        if type_errors:
            raise PipelineValidationError(
                f"Type validation failed for {len(type_errors)} parameter(s)",
                suggestion="Fix parameter types:\n - " + "\n - ".join(type_errors),
            )

        # Create a copy of the definition with substituted values
        definition_copy = copy.deepcopy(self.definition)
        definition_copy.variables = variables

        # Substitute in step params
        for step in definition_copy.steps:
            step.params = _substitute_dict_variables(step.params, variables)

        logger.info(
            f"Instantiated pipeline template '{self.definition.name}' with {len(variables)} variables"
        )
        return Pipeline(definition_copy)


def _validate_type(value: Any, expected_type: str) -> bool:
    """Validate that a value matches the expected type name."""
    type_map = {
        "string": str,
        "int": int,
        "integer": int,
        "float": float,
        "number": (int, float),
        "bool": bool,
        "boolean": bool,
        "list": list,
        "array": list,
        "dict": dict,
        "object": dict,
    }
    expected = type_map.get(expected_type, str)
    return isinstance(value, expected)  # type: ignore[arg-type]
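

# Note: because bool is a subclass of int in Python, _validate_type(True, "int")
# and _validate_type(True, "number") both return True; a caller that needs a
# strict int would have to exclude bool explicitly.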


def _substitute_dict_variables(d: dict[str, Any], variables: dict[str, Any]) -> dict[str, Any]:
    """Recursively substitute variables in a dictionary."""
    result = {}
    for key, value in d.items():
        if isinstance(value, str):
            result[key] = _substitute_variables(value, variables)
        elif isinstance(value, dict):
            result[key] = _substitute_dict_variables(value, variables)  # type: ignore[assignment]
        elif isinstance(value, list):
            result[key] = [  # type: ignore[assignment]
                _substitute_dict_variables(v, variables)
                if isinstance(v, dict)
                else _substitute_variables(v, variables)
                if isinstance(v, str)
                else v
                for v in value
            ]
        else:
            result[key] = value
    return result
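
# For example (a doctest-style sketch):
#   _substitute_dict_variables({"path": "${F}", "depth": 2}, {"F": "trace.bin"})
#   -> {"path": "trace.bin", "depth": 2}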


__all__ = [
    "Pipeline",
    "PipelineDefinition",
    "PipelineExecutionError",
    "PipelineResult",
    "PipelineStep",
    "PipelineTemplate",
    "PipelineValidationError",
    "load_pipeline",
    "resolve_includes",
]