Coverage for src / tracekit / pipeline / parallel.py: 84%
86 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Parallel pipeline execution with automatic dependency analysis.
3This module provides a parallel-capable Pipeline that analyzes stage dependencies
4and executes independent stages concurrently using thread or process pools.
6The ParallelPipeline maintains full API compatibility with the standard Pipeline
7while providing linear speedup for independent transformations.
9Example:
10 >>> from tracekit.pipeline import ParallelPipeline
11 >>> # Create pipeline with independent branches
12 >>> pipeline = ParallelPipeline([
13 ... ('filter1', LowPassFilter(cutoff=1e6)),
14 ... ('filter2', HighPassFilter(cutoff=1e5)),
15 ... ('merge', MergeTransformer())
16 ... ])
17 >>> # Independent filters run in parallel
18 >>> result = pipeline.transform(trace)
20Performance:
21 For N independent stages, ParallelPipeline provides up to Nx speedup
22 compared to sequential execution. Overhead is ~10ms for thread pool,
23 ~50ms for process pool.
25References:
26 IEEE 1057-2017: Parallel processing for digitizer characterization
27 sklearn.pipeline.Pipeline: Sequential pipeline pattern
28"""
30from __future__ import annotations
32import os
33from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
34from typing import TYPE_CHECKING, Literal
36from .pipeline import Pipeline
38if TYPE_CHECKING:
39 from collections.abc import Sequence
41 from ..core.types import WaveformTrace
42 from .base import TraceTransformer
45class ParallelPipeline(Pipeline):
46 """Pipeline with parallel execution of independent stages.
48 Analyzes dependencies between pipeline stages and executes independent
49 stages concurrently. Automatically determines dependency graph from
50 stage inputs/outputs.
52 Stages are independent if they:
53 1. Don't modify shared state
54 2. Only depend on the input trace (not other stage outputs)
55 3. Can be executed in any order
57 Attributes:
58 steps: List of (name, transformer) tuples defining the pipeline stages.
59 named_steps: Dictionary mapping step names to transformers.
60 executor_type: Type of executor ('thread' or 'process').
61 max_workers: Maximum number of parallel workers (None = auto).
63 Example - Independent Filters:
64 >>> # These filters are independent - they both take the input trace
65 >>> pipeline = ParallelPipeline([
66 ... ('lowpass', LowPassFilter(cutoff=1e6)),
67 ... ('highpass', HighPassFilter(cutoff=1e5)),
68 ... ('bandpass', BandPassFilter(low=1e5, high=1e6)),
69 ... ], executor_type='thread', max_workers=3)
70 >>> result = pipeline.transform(trace) # All 3 run in parallel
72 Example - Mixed Sequential and Parallel:
73 >>> # First stage sequential, then parallel analysis
74 >>> pipeline = ParallelPipeline([
75 ... ('preprocess', Normalize()), # Sequential
76 ... ('fft', FFT()), # Parallel (from preprocessed)
77 ... ('wavelet', WaveletTransform()), # Parallel (from preprocessed)
78 ... ('merge', CombineResults()) # Sequential (waits for fft+wavelet)
79 ... ])
81 Example - Process Pool for CPU-Intensive Tasks:
82 >>> # Use process pool for heavy computation
83 >>> pipeline = ParallelPipeline([
84 ... ('fft1', FFT(nfft=65536)),
85 ... ('fft2', FFT(nfft=32768)),
86 ... ('fft3', FFT(nfft=16384)),
87 ... ], executor_type='process', max_workers=None) # Auto worker count
89 Performance Characteristics:
90 Thread pool:
91 - Best for I/O-bound tasks (file loading, network)
92 - Low overhead (~10ms startup)
93 - Shared memory (no serialization)
94 - Limited by GIL for CPU-bound tasks
96 Process pool:
97 - Best for CPU-bound tasks (FFT, filtering, analysis)
98 - Higher overhead (~50ms startup)
99 - Requires picklable transformers
100 - True parallelism (bypasses GIL)
102 References:
103 concurrent.futures.ThreadPoolExecutor
104 concurrent.futures.ProcessPoolExecutor
105 """
107 def __init__(
108 self,
109 steps: Sequence[tuple[str, TraceTransformer]],
110 executor_type: Literal["thread", "process"] = "thread",
111 max_workers: int | None = None,
112 ) -> None:
113 """Initialize parallel pipeline with executor configuration.
115 Args:
116 steps: Sequence of (name, transformer) tuples. Each transformer
117 must be a TraceTransformer instance.
118 executor_type: Type of executor to use:
119 - 'thread': ThreadPoolExecutor (default, low overhead)
120 - 'process': ProcessPoolExecutor (true parallelism, higher overhead)
121 max_workers: Maximum number of parallel workers. If None, uses
122 automatic selection:
123 - Thread pool: min(32, num_cpu + 4)
124 - Process pool: num_cpu
126 Raises:
127 TypeError: If any step is not a TraceTransformer.
128 ValueError: If step names are not unique, empty, or executor_type invalid.
130 Example:
131 >>> # Auto worker count (recommended)
132 >>> pipeline = ParallelPipeline([
133 ... ('stage1', Transformer1()),
134 ... ('stage2', Transformer2())
135 ... ])
136 >>> # Explicit worker count
137 >>> pipeline = ParallelPipeline([
138 ... ('stage1', Transformer1()),
139 ... ('stage2', Transformer2())
140 ... ], max_workers=4)
141 """
142 # Initialize parent Pipeline
143 super().__init__(steps)
145 # Validate executor type
146 if executor_type not in ("thread", "process"):
147 raise ValueError(f"executor_type must be 'thread' or 'process', got '{executor_type}'")
149 self.executor_type = executor_type
150 self.max_workers = max_workers
151 self._dependency_graph: dict[str, list[str]] = {}
152 self._execution_order: list[list[str]] = []
154 # Analyze dependencies on initialization
155 self._analyze_dependencies()
157 def _analyze_dependencies(self) -> None:
158 """Analyze dependencies between pipeline stages.
160 Builds a dependency graph by examining which stages depend on outputs
161 from other stages. Currently uses a conservative approach:
163 - First stage has no dependencies
164 - All other stages depend on the previous stage (sequential)
165 - Future enhancement: analyze transformer inputs to detect true dependencies
167 This conservative approach ensures correctness while still allowing
168 parallel execution when stages are explicitly independent.
170 The dependency analysis produces an execution order as a list of
171 "generations" (lists of stage names). Stages in the same generation
172 can be executed in parallel.
174 Example:
175 For pipeline: [filter1, filter2, merge]
176 If all stages are sequential:
177 execution_order = [['filter1'], ['filter2'], ['merge']]
178 If filter1 and filter2 are independent:
179 execution_order = [['filter1', 'filter2'], ['merge']]
181 NOTE: Current implementation is conservative and assumes sequential
182 dependencies. Future versions will support dependency hints via
183 transformer metadata to enable automatic parallelization.
185 References:
186 FUTURE-002: Advanced dependency analysis
187 """
188 # Build dependency graph (conservative: each stage depends on previous)
189 self._dependency_graph = {}
191 for i, (name, _transformer) in enumerate(self.steps):
192 if i == 0:
193 # First stage has no dependencies
194 self._dependency_graph[name] = []
195 else:
196 # Each stage depends on the previous stage
197 prev_name = self.steps[i - 1][0]
198 self._dependency_graph[name] = [prev_name]
200 # Build execution order (topological sort by generations)
201 self._execution_order = self._compute_execution_order()
203 def _compute_execution_order(self) -> list[list[str]]:
204 """Compute execution order as list of parallel generations.
206 Uses topological sort to group stages into generations where each
207 generation contains stages that can execute in parallel.
209 Returns:
210 List of generations, where each generation is a list of stage names
211 that can execute in parallel.
213 Raises:
214 ValueError: If circular dependencies detected.
216 Example:
217 For graph: {A: [], B: [A], C: [A], D: [B, C]}
218 Returns: [[A], [B, C], [D]]
219 """
220 # Copy dependency graph (we'll modify it)
221 deps = {name: set(deps) for name, deps in self._dependency_graph.items()}
222 generations: list[list[str]] = []
224 # All stage names
225 all_stages = set(self._dependency_graph.keys())
226 completed: set[str] = set()
228 # Build generations
229 while completed != all_stages:
230 # Find all stages with no remaining dependencies
231 ready = [name for name in all_stages - completed if not deps[name]]
233 if not ready: 233 ↛ 235line 233 didn't jump to line 235 because the condition on line 233 was never true
234 # Cycle detected (shouldn't happen with valid pipeline)
235 raise ValueError("Circular dependency detected in pipeline")
237 generations.append(ready)
238 completed.update(ready)
240 # Remove completed stages from dependencies
241 for name in all_stages - completed:
242 deps[name] -= set(ready)
244 return generations
246 def _get_max_workers(self) -> int:
247 """Get the maximum number of workers to use.
249 Returns:
250 Number of workers, using automatic selection if max_workers is None.
252 Example:
253 >>> pipeline._get_max_workers()
254 8 # On 8-core machine with thread executor
255 """
256 if self.max_workers is not None:
257 return self.max_workers
259 # Automatic worker count selection
260 cpu_count = os.cpu_count() or 4
262 if self.executor_type == "thread":
263 # Thread pool: min(32, cpu_count + 4) - default from ThreadPoolExecutor
264 return min(32, cpu_count + 4)
265 else:
266 # Process pool: cpu_count - default from ProcessPoolExecutor
267 return cpu_count
269 def transform(self, trace: WaveformTrace) -> WaveformTrace:
270 """Transform trace through pipeline with parallel execution.
272 Executes stages in parallel when possible, according to the dependency
273 graph. Stages in the same generation run concurrently.
275 Args:
276 trace: Input WaveformTrace to transform.
278 Returns:
279 Transformed WaveformTrace after passing through all stages.
281 Example:
282 >>> result = pipeline.transform(trace)
284 Performance:
285 For N independent stages with T execution time each:
286 - Sequential: N * T
287 - Parallel: T + overhead (~10-50ms)
288 - Speedup: ~Nx (minus overhead)
289 """
290 current = trace
291 self._intermediate_results.clear()
293 # Choose executor based on configuration
294 executor_class = (
295 ThreadPoolExecutor if self.executor_type == "thread" else ProcessPoolExecutor
296 )
298 with executor_class(max_workers=self._get_max_workers()) as executor:
299 # Execute each generation in parallel
300 for generation in self._execution_order:
301 if len(generation) == 1: 301 ↛ 309line 301 didn't jump to line 309 because the condition on line 301 was always true
302 # Single stage - execute directly (no overhead)
303 name = generation[0]
304 transformer = self.named_steps[name]
305 current = transformer.transform(current)
306 self._intermediate_results[name] = current
307 else:
308 # Multiple stages - execute in parallel
309 futures = {}
310 for name in generation:
311 transformer = self.named_steps[name]
312 future = executor.submit(transformer.transform, current)
313 futures[name] = future
315 # Collect results
316 results = {}
317 for name, future in futures.items():
318 result = future.result()
319 results[name] = result
320 self._intermediate_results[name] = result
322 # For conservative sequential execution, current is the last result
323 # For true parallel execution with merge, this would be handled differently
324 current = results[generation[-1]]
326 return current
328 def fit(self, trace: WaveformTrace) -> ParallelPipeline:
329 """Fit all transformers in the pipeline.
331 Fits each transformer sequentially on the output of the previous stage.
332 Fitting is always sequential because it may modify transformer state.
334 Args:
335 trace: Reference WaveformTrace to fit to.
337 Returns:
338 Self for method chaining.
340 Example:
341 >>> pipeline = ParallelPipeline([
342 ... ('normalize', AdaptiveNormalizer()),
343 ... ('filter', AdaptiveFilter())
344 ... ])
345 >>> pipeline.fit(reference_trace)
347 NOTE: fit() is always sequential because transformers may have
348 interdependent learned parameters. Use transform() for parallel execution.
349 """
350 # Fitting is always sequential - reuse parent implementation
351 super().fit(trace)
352 return self
354 def clone(self) -> ParallelPipeline:
355 """Create a copy of this parallel pipeline.
357 Returns:
358 New ParallelPipeline instance with cloned transformers and same configuration.
360 Example:
361 >>> pipeline_copy = pipeline.clone()
362 """
363 cloned_steps = [(name, transformer.clone()) for name, transformer in self.steps]
364 return ParallelPipeline(
365 cloned_steps, executor_type=self.executor_type, max_workers=self.max_workers
366 )
368 def get_dependency_graph(self) -> dict[str, list[str]]:
369 """Get the dependency graph for pipeline stages.
371 Returns:
372 Dictionary mapping stage names to list of dependency stage names.
374 Example:
375 >>> graph = pipeline.get_dependency_graph()
376 >>> print(graph)
377 {'filter1': [], 'filter2': ['filter1'], 'merge': ['filter2']}
379 References:
380 API-006: Pipeline Dependency Analysis
381 """
382 return self._dependency_graph.copy()
384 def get_execution_order(self) -> list[list[str]]:
385 """Get the execution order as parallel generations.
387 Returns:
388 List of generations, where each generation is a list of stage names
389 that will execute in parallel.
391 Example:
392 >>> order = pipeline.get_execution_order()
393 >>> print(order)
394 [['filter1', 'filter2'], ['merge']]
395 >>> # filter1 and filter2 run in parallel, then merge
397 References:
398 API-006: Pipeline Dependency Analysis
399 """
400 return [gen.copy() for gen in self._execution_order]
402 def set_parallel_config(
403 self,
404 executor_type: Literal["thread", "process"] | None = None,
405 max_workers: int | None = None,
406 ) -> ParallelPipeline:
407 """Update parallel execution configuration.
409 Args:
410 executor_type: New executor type ('thread' or 'process'). If None, keeps current.
411 max_workers: New max worker count. If None, keeps current (may be auto).
413 Returns:
414 Self for method chaining.
416 Raises:
417 ValueError: If executor_type is invalid.
419 Example:
420 >>> # Switch to process pool with 4 workers
421 >>> pipeline.set_parallel_config(executor_type='process', max_workers=4)
422 >>> # Switch to auto worker count
423 >>> pipeline.set_parallel_config(max_workers=None)
425 References:
426 API-006: Pipeline Parallel Configuration
427 """
428 if executor_type is not None:
429 if executor_type not in ("thread", "process"):
430 raise ValueError(
431 f"executor_type must be 'thread' or 'process', got '{executor_type}'"
432 )
433 self.executor_type = executor_type
435 if max_workers is not None:
436 self.max_workers = max_workers
438 return self
440 def __repr__(self) -> str:
441 """String representation of the parallel pipeline."""
442 step_strs = [
443 f"('{name}', {transformer.__class__.__name__})" for name, transformer in self.steps
444 ]
445 config_str = f"executor={self.executor_type}, workers={self.max_workers or 'auto'}"
446 return "ParallelPipeline([\n " + ",\n ".join(step_strs) + f"\n], {config_str})"
449__all__ = ["ParallelPipeline"]