Coverage for src/tracekit/pipeline/parallel.py: 84%

86 statements

coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Parallel pipeline execution with automatic dependency analysis. 

2 

3This module provides a parallel-capable Pipeline that analyzes stage dependencies 

4and executes independent stages concurrently using thread or process pools. 

5 

6The ParallelPipeline maintains full API compatibility with the standard Pipeline 

7while providing linear speedup for independent transformations. 

8 

9Example: 

10 >>> from tracekit.pipeline import ParallelPipeline 

11 >>> # Create pipeline with independent branches 

12 >>> pipeline = ParallelPipeline([ 

13 ... ('filter1', LowPassFilter(cutoff=1e6)), 

14 ... ('filter2', HighPassFilter(cutoff=1e5)), 

15 ... ('merge', MergeTransformer()) 

16 ... ]) 

17 >>> # Independent filters run in parallel 

18 >>> result = pipeline.transform(trace) 

19 

20Performance: 

21 For N independent stages, ParallelPipeline provides up to Nx speedup 

22 compared to sequential execution. Overhead is ~10ms for thread pool, 

23 ~50ms for process pool. 

24 

25References: 

26 IEEE 1057-2017: Parallel processing for digitizer characterization 

27 sklearn.pipeline.Pipeline: Sequential pipeline pattern 

28""" 

from __future__ import annotations

import os
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from typing import TYPE_CHECKING, Literal

from .pipeline import Pipeline

if TYPE_CHECKING:
    from collections.abc import Sequence

    from ..core.types import WaveformTrace
    from .base import TraceTransformer


class ParallelPipeline(Pipeline):
    """Pipeline with parallel execution of independent stages.

    Analyzes dependencies between pipeline stages and executes independent
    stages concurrently. Automatically determines the dependency graph from
    stage inputs/outputs.

    Stages are independent if they:
    1. Don't modify shared state
    2. Only depend on the input trace (not other stage outputs)
    3. Can be executed in any order

    Note that the current dependency analysis is conservative and treats every
    pipeline as a sequential chain; see _analyze_dependencies for details.

    Attributes:
        steps: List of (name, transformer) tuples defining the pipeline stages.
        named_steps: Dictionary mapping step names to transformers.
        executor_type: Type of executor ('thread' or 'process').
        max_workers: Maximum number of parallel workers (None = auto).

    Example - Independent Filters:
        >>> # These filters are independent - each takes only the input trace
        >>> pipeline = ParallelPipeline([
        ...     ('lowpass', LowPassFilter(cutoff=1e6)),
        ...     ('highpass', HighPassFilter(cutoff=1e5)),
        ...     ('bandpass', BandPassFilter(low=1e5, high=1e6)),
        ... ], executor_type='thread', max_workers=3)
        >>> result = pipeline.transform(trace)  # All 3 run in parallel

    Example - Mixed Sequential and Parallel:
        >>> # First stage sequential, then parallel analysis
        >>> pipeline = ParallelPipeline([
        ...     ('preprocess', Normalize()),      # Sequential
        ...     ('fft', FFT()),                   # Parallel (from preprocessed)
        ...     ('wavelet', WaveletTransform()),  # Parallel (from preprocessed)
        ...     ('merge', CombineResults())       # Sequential (waits for fft+wavelet)
        ... ])

    Example - Process Pool for CPU-Intensive Tasks:
        >>> # Use process pool for heavy computation
        >>> pipeline = ParallelPipeline([
        ...     ('fft1', FFT(nfft=65536)),
        ...     ('fft2', FFT(nfft=32768)),
        ...     ('fft3', FFT(nfft=16384)),
        ... ], executor_type='process', max_workers=None)  # Auto worker count

    Performance Characteristics:
        Thread pool:
            - Best for I/O-bound tasks (file loading, network)
            - Low overhead (~10 ms startup)
            - Shared memory (no serialization)
            - Limited by the GIL for CPU-bound tasks

        Process pool:
            - Best for CPU-bound tasks (FFT, filtering, analysis)
            - Higher overhead (~50 ms startup)
            - Requires picklable transformers
            - True parallelism (bypasses the GIL)

    References:
        concurrent.futures.ThreadPoolExecutor
        concurrent.futures.ProcessPoolExecutor
    """

    def __init__(
        self,
        steps: Sequence[tuple[str, TraceTransformer]],
        executor_type: Literal["thread", "process"] = "thread",
        max_workers: int | None = None,
    ) -> None:
        """Initialize parallel pipeline with executor configuration.

        Args:
            steps: Sequence of (name, transformer) tuples. Each transformer
                must be a TraceTransformer instance.
            executor_type: Type of executor to use:
                - 'thread': ThreadPoolExecutor (default, low overhead)
                - 'process': ProcessPoolExecutor (true parallelism, higher overhead)
            max_workers: Maximum number of parallel workers. If None, uses
                automatic selection:
                - Thread pool: min(32, num_cpu + 4)
                - Process pool: num_cpu

        Raises:
            TypeError: If any step is not a TraceTransformer.
            ValueError: If step names are not unique, steps is empty, or
                executor_type is invalid.

        Example:
            >>> # Auto worker count (recommended)
            >>> pipeline = ParallelPipeline([
            ...     ('stage1', Transformer1()),
            ...     ('stage2', Transformer2())
            ... ])
            >>> # Explicit worker count
            >>> pipeline = ParallelPipeline([
            ...     ('stage1', Transformer1()),
            ...     ('stage2', Transformer2())
            ... ], max_workers=4)
        """
        # Initialize parent Pipeline
        super().__init__(steps)

        # Validate executor type
        if executor_type not in ("thread", "process"):
            raise ValueError(f"executor_type must be 'thread' or 'process', got '{executor_type}'")

        self.executor_type = executor_type
        self.max_workers = max_workers
        self._dependency_graph: dict[str, list[str]] = {}
        self._execution_order: list[list[str]] = []

        # Analyze dependencies on initialization
        self._analyze_dependencies()

    def _analyze_dependencies(self) -> None:
        """Analyze dependencies between pipeline stages.

        Builds a dependency graph by examining which stages depend on outputs
        from other stages. Currently uses a conservative approach:

        - First stage has no dependencies
        - All other stages depend on the previous stage (sequential)
        - Future enhancement: analyze transformer inputs to detect true dependencies

        This conservative approach ensures correctness while still allowing
        parallel execution when stages are explicitly independent.

        The dependency analysis produces an execution order as a list of
        "generations" (lists of stage names). Stages in the same generation
        can be executed in parallel.

        Example:
            For pipeline: [filter1, filter2, merge]
            If all stages are sequential:
                execution_order = [['filter1'], ['filter2'], ['merge']]
            If filter1 and filter2 are independent:
                execution_order = [['filter1', 'filter2'], ['merge']]
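
        Example - the chain this method actually builds (runnable sketch,
        reproducing the loop below for three hypothetical stage names):

            >>> names = ['filter1', 'filter2', 'merge']
            >>> {n: ([] if i == 0 else [names[i - 1]]) for i, n in enumerate(names)}
            {'filter1': [], 'filter2': ['filter1'], 'merge': ['filter2']}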

        NOTE: The current implementation is conservative and assumes sequential
        dependencies. Future versions will support dependency hints via
        transformer metadata to enable automatic parallelization.

        References:
            FUTURE-002: Advanced dependency analysis
        """
        # Build dependency graph (conservative: each stage depends on the previous)
        self._dependency_graph = {}

        for i, (name, _transformer) in enumerate(self.steps):
            if i == 0:
                # First stage has no dependencies
                self._dependency_graph[name] = []
            else:
                # Each stage depends on the previous stage
                prev_name = self.steps[i - 1][0]
                self._dependency_graph[name] = [prev_name]

        # Build execution order (topological sort by generations)
        self._execution_order = self._compute_execution_order()

    def _compute_execution_order(self) -> list[list[str]]:
        """Compute the execution order as a list of parallel generations.

        Uses topological sort to group stages into generations where each
        generation contains stages that can execute in parallel.

        Returns:
            List of generations, where each generation is a list of stage names
            that can execute in parallel.

        Raises:
            ValueError: If circular dependencies are detected.

        Example:
            For graph: {A: [], B: [A], C: [A], D: [B, C]}
            Returns: [[A], [B, C], [D]]
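
            The same grouping can be cross-checked against the standard
            library (runnable sketch; graphlib requires Python 3.9+):

            >>> from graphlib import TopologicalSorter
            >>> ts = TopologicalSorter({'A': set(), 'B': {'A'}, 'C': {'A'}, 'D': {'B', 'C'}})
            >>> ts.prepare()
            >>> gens = []
            >>> while ts.is_active():
            ...     ready = sorted(ts.get_ready())
            ...     gens.append(ready)
            ...     ts.done(*ready)
            >>> gens
            [['A'], ['B', 'C'], ['D']]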

219 """ 

220 # Copy dependency graph (we'll modify it) 

221 deps = {name: set(deps) for name, deps in self._dependency_graph.items()} 

222 generations: list[list[str]] = [] 

223 

224 # All stage names 

225 all_stages = set(self._dependency_graph.keys()) 

226 completed: set[str] = set() 

227 

228 # Build generations 

229 while completed != all_stages: 

230 # Find all stages with no remaining dependencies 

231 ready = [name for name in all_stages - completed if not deps[name]] 

232 

233 if not ready: 233 ↛ 235line 233 didn't jump to line 235 because the condition on line 233 was never true

234 # Cycle detected (shouldn't happen with valid pipeline) 

235 raise ValueError("Circular dependency detected in pipeline") 

236 

237 generations.append(ready) 

238 completed.update(ready) 

239 

240 # Remove completed stages from dependencies 

241 for name in all_stages - completed: 

242 deps[name] -= set(ready) 

243 

244 return generations 

245 

    def _get_max_workers(self) -> int:
        """Get the maximum number of workers to use.

        Returns:
            Number of workers, using automatic selection if max_workers is None.

        Example:
            >>> pipeline._get_max_workers()
            12  # On an 8-core machine with the thread executor: min(32, 8 + 4)
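
            The automatic defaults mirror the concurrent.futures conventions
            and can be reproduced directly (runnable sketch, assuming an
            8-core machine):

            >>> cpu = 8
            >>> min(32, cpu + 4)  # thread pool default
            12
            >>> cpu               # process pool default
            8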

255 """ 

256 if self.max_workers is not None: 

257 return self.max_workers 

258 

259 # Automatic worker count selection 

260 cpu_count = os.cpu_count() or 4 

261 

262 if self.executor_type == "thread": 

263 # Thread pool: min(32, cpu_count + 4) - default from ThreadPoolExecutor 

264 return min(32, cpu_count + 4) 

265 else: 

266 # Process pool: cpu_count - default from ProcessPoolExecutor 

267 return cpu_count 

268 

    def transform(self, trace: WaveformTrace) -> WaveformTrace:
        """Transform trace through the pipeline with parallel execution.

        Executes stages in parallel when possible, according to the dependency
        graph. Stages in the same generation run concurrently.

        Args:
            trace: Input WaveformTrace to transform.

        Returns:
            Transformed WaveformTrace after passing through all stages.

        Example:
            >>> result = pipeline.transform(trace)
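
            The fan-out/fan-in pattern used below for a multi-stage generation
            can be sketched with plain functions and the stdlib executor
            (runnable; `len` stands in for a transformer here):

            >>> from concurrent.futures import ThreadPoolExecutor
            >>> ex = ThreadPoolExecutor(max_workers=2)
            >>> futures = {name: ex.submit(len, name) for name in ('fft', 'wavelet')}
            >>> {name: f.result() for name, f in futures.items()}
            {'fft': 3, 'wavelet': 7}
            >>> ex.shutdown()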

        Performance:
            For N independent stages with execution time T each:
                - Sequential: N * T
                - Parallel: T + overhead (~10-50 ms)
                - Speedup: ~Nx (minus overhead)
        """
        current = trace
        self._intermediate_results.clear()

        # Choose executor based on configuration
        executor_class = (
            ThreadPoolExecutor if self.executor_type == "thread" else ProcessPoolExecutor
        )

        with executor_class(max_workers=self._get_max_workers()) as executor:
            # Execute each generation in parallel
            for generation in self._execution_order:
                if len(generation) == 1:
                    # Single stage - execute directly (no pool overhead). Under
                    # the conservative analysis every generation has exactly one
                    # stage, so the parallel branch below is currently never
                    # exercised (per the coverage report).
                    name = generation[0]
                    transformer = self.named_steps[name]
                    current = transformer.transform(current)
                    self._intermediate_results[name] = current
                else:
                    # Multiple stages - execute in parallel
                    futures = {}
                    for name in generation:
                        transformer = self.named_steps[name]
                        future = executor.submit(transformer.transform, current)
                        futures[name] = future

                    # Collect results
                    results = {}
                    for name, future in futures.items():
                        result = future.result()
                        results[name] = result
                        self._intermediate_results[name] = result

                    # For conservative sequential execution, current is the last
                    # result. True parallel execution with a merge stage would
                    # combine these results instead.
                    current = results[generation[-1]]

        return current

    def fit(self, trace: WaveformTrace) -> ParallelPipeline:
        """Fit all transformers in the pipeline.

        Fits each transformer sequentially on the output of the previous stage.
        Fitting is always sequential because it may modify transformer state.

        Args:
            trace: Reference WaveformTrace to fit to.

        Returns:
            Self for method chaining.

        Example:
            >>> pipeline = ParallelPipeline([
            ...     ('normalize', AdaptiveNormalizer()),
            ...     ('filter', AdaptiveFilter())
            ... ])
            >>> pipeline.fit(reference_trace)

        NOTE: fit() is always sequential because transformers may have
        interdependent learned parameters. Use transform() for parallel execution.
        """
        # Fitting is always sequential - reuse the parent implementation
        super().fit(trace)
        return self

    def clone(self) -> ParallelPipeline:
        """Create a copy of this parallel pipeline.

        Returns:
            New ParallelPipeline instance with cloned transformers and the same
            executor configuration.

        Example:
            >>> pipeline_copy = pipeline.clone()
        """
        cloned_steps = [(name, transformer.clone()) for name, transformer in self.steps]
        return ParallelPipeline(
            cloned_steps, executor_type=self.executor_type, max_workers=self.max_workers
        )

    def get_dependency_graph(self) -> dict[str, list[str]]:
        """Get the dependency graph for pipeline stages.

        Returns:
            Dictionary mapping stage names to lists of dependency stage names.

        Example:
            >>> graph = pipeline.get_dependency_graph()
            >>> print(graph)
            {'filter1': [], 'filter2': ['filter1'], 'merge': ['filter2']}

        References:
            API-006: Pipeline Dependency Analysis
        """
        return self._dependency_graph.copy()

    def get_execution_order(self) -> list[list[str]]:
        """Get the execution order as parallel generations.

        Returns:
            List of generations, where each generation is a list of stage names
            that will execute in parallel.

        Example:
            >>> order = pipeline.get_execution_order()
            >>> print(order)
            [['filter1'], ['filter2'], ['merge']]
            >>> # One stage per generation under the conservative analysis;
            >>> # truly independent stages would instead group as
            >>> # [['filter1', 'filter2'], ['merge']]

        References:
            API-006: Pipeline Dependency Analysis
        """
        return [gen.copy() for gen in self._execution_order]

    def set_parallel_config(
        self,
        executor_type: Literal["thread", "process"] | None = None,
        max_workers: int | None = None,
    ) -> ParallelPipeline:
        """Update the parallel execution configuration.

        Args:
            executor_type: New executor type ('thread' or 'process'). If None, keeps current.
            max_workers: New max worker count. If None, keeps current (may be auto).

        Returns:
            Self for method chaining.

        Raises:
            ValueError: If executor_type is invalid.

        Example:
            >>> # Switch to process pool with 4 workers
            >>> pipeline.set_parallel_config(executor_type='process', max_workers=4)
            >>> # Change only the executor; max_workers=None leaves the count as-is
            >>> pipeline.set_parallel_config(executor_type='thread')

        References:
            API-006: Pipeline Parallel Configuration
        """
        if executor_type is not None:
            if executor_type not in ("thread", "process"):
                raise ValueError(
                    f"executor_type must be 'thread' or 'process', got '{executor_type}'"
                )
            self.executor_type = executor_type

        if max_workers is not None:
            self.max_workers = max_workers

        return self

    def __repr__(self) -> str:
        """String representation of the parallel pipeline."""
        step_strs = [
            f"('{name}', {transformer.__class__.__name__})" for name, transformer in self.steps
        ]
        config_str = f"executor={self.executor_type}, workers={self.max_workers or 'auto'}"
        return "ParallelPipeline([\n    " + ",\n    ".join(step_strs) + f"\n], {config_str})"


__all__ = ["ParallelPipeline"]