Coverage for src/tracekit/optimization/parallel.py: 100%
126 statements
coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Parallel processing utilities for optimization and analysis.
3This module provides utilities for efficient parallel execution of analysis tasks
4using both thread and process-based parallelism.
5"""
7from __future__ import annotations
9import logging
10from collections.abc import Callable
11from concurrent.futures import (
12 ProcessPoolExecutor,
13 ThreadPoolExecutor,
14 as_completed,
15)
16from dataclasses import dataclass
17from typing import TYPE_CHECKING, Any
19import numpy as np
21from tracekit.core.exceptions import AnalysisError
23if TYPE_CHECKING:
24 from collections.abc import Iterable
26 from numpy.typing import NDArray
28logger = logging.getLogger(__name__)


@dataclass
class ParallelResult[R]:
    """Result from parallel execution.

    Attributes:
        results: List of results from all tasks.
        execution_time: Total execution time in seconds.
        success_count: Number of successfully completed tasks.
        error_count: Number of failed tasks.
        errors: List of exceptions encountered.

    Example:
        >>> result = parallel_map(fn, items)
        >>> print(f"Completed {result.success_count}/{len(items)}")
    """

    results: list[R]
    execution_time: float
    success_count: int
    error_count: int
    errors: list[Exception] | None = None


@dataclass
class WorkerPool:
    """Configuration for worker pool management.

    Attributes:
        max_workers: Maximum number of workers.
        use_threads: Use threads (True) or processes (False).
        timeout: Timeout per task in seconds.
        chunk_size: Number of items per worker chunk.

    Example:
        >>> pool = WorkerPool(max_workers=4, use_threads=True, timeout=30)
    """

    max_workers: int = 4
    use_threads: bool = True
    timeout: float | None = None
    chunk_size: int = 1


def get_optimal_workers(max_workers: int | None = None) -> int:
    """Get optimal number of workers for the current system.

    Uses CPU count by default, respecting the max_workers limit.

    Args:
        max_workers: Maximum workers to use. None for all CPUs.

    Returns:
        Optimal number of workers.

    Example:
        >>> workers = get_optimal_workers(max_workers=8)
    """
    import os

    cpu_count = os.cpu_count() or 1
    if max_workers is None:
        return cpu_count
    return min(max_workers, cpu_count)


def parallel_map[T, R](
    func: Callable[[T], R],
    iterable: Iterable[T],
    *,
    max_workers: int | None = None,
    use_threads: bool = True,
    timeout: float | None = None,
    collect_errors: bool = True,
) -> ParallelResult[R]:
    """Apply function to items in parallel.

    Maps a function over an iterable using either threads or processes.

    Args:
        func: Function to apply to each item.
        iterable: Items to process.
        max_workers: Maximum concurrent workers.
        use_threads: Use threads (True) or processes (False).
        timeout: Overall timeout in seconds for all tasks to complete.
        collect_errors: Collect errors instead of raising.

    Returns:
        ParallelResult with results and execution stats. Results preserve
        input order; when collect_errors=True a failed task leaves a None
        placeholder at its index.

    Raises:
        AnalysisError: If collect_errors=False and a task fails.

    Example:
        >>> def process_item(x):
        ...     return x * 2
        >>> result = parallel_map(process_item, range(100))
        >>> print(f"Completed: {result.success_count}")

    References:
        OPT-001: Parallel Execution Framework
    """
    import time

    items = list(iterable)
    if not items:
        return ParallelResult(results=[], execution_time=0.0, success_count=0, error_count=0)

    executor_class = ThreadPoolExecutor if use_threads else ProcessPoolExecutor
    max_workers = get_optimal_workers(max_workers)

    start_time = time.time()
    results: list[R] = [None] * len(items)  # type: ignore[list-item]
    errors: list[Exception] = []
    success_count = 0
    error_count = 0

    with executor_class(max_workers=max_workers) as executor:
        futures = {executor.submit(func, item): i for i, item in enumerate(items)}

        for future in as_completed(futures, timeout=timeout):
            idx = futures[future]
            try:
                results[idx] = future.result()
                success_count += 1
            except Exception as e:
                error_count += 1
                errors.append(e)

                if not collect_errors:
                    raise AnalysisError(f"Task {idx} failed: {e!s}") from e

    execution_time = time.time() - start_time

    return ParallelResult(
        results=results,
        execution_time=execution_time,
        success_count=success_count,
        error_count=error_count,
        errors=errors if errors else None,
    )
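

# Illustrative usage sketch (added for clarity; the helper name and toy workload
# are assumptions, not part of the original module). It exercises parallel_map
# with a small thread pool and checks the documented ordering behaviour.
def _example_parallel_map() -> None:
    def double(x: int) -> int:
        return x * 2

    result = parallel_map(double, range(8), max_workers=2, use_threads=True)
    # Results preserve input order because each future is keyed by its index.
    assert result.results == [0, 2, 4, 6, 8, 10, 12, 14]
    assert result.success_count == 8
    assert result.error_count == 0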


def parallel_reduce[T, R](
    func: Callable[[T], R],
    iterable: Iterable[T],
    reducer: Callable[[list[R]], Any],
    *,
    max_workers: int | None = None,
    use_threads: bool = True,
    timeout: float | None = None,
) -> Any:
    """Map and reduce results in parallel.

    Applies function to items in parallel, then reduces results.

    Args:
        func: Function to apply to each item.
        iterable: Items to process.
        reducer: Function to reduce the list of results.
        max_workers: Maximum concurrent workers.
        use_threads: Use threads (True) or processes (False).
        timeout: Overall timeout in seconds for all tasks to complete.

    Returns:
        Reduced result.

    Raises:
        AnalysisError: If any task fails (errors are not collected).

    Example:
        >>> def compute(x):
        ...     return x * 2
        >>> result = parallel_reduce(
        ...     compute,
        ...     range(100),
        ...     reducer=lambda x: sum(x),
        ... )

    References:
        OPT-001: Parallel Execution Framework
    """
    result = parallel_map(
        func,
        iterable,
        max_workers=max_workers,
        use_threads=use_threads,
        timeout=timeout,
        collect_errors=False,
    )

    return reducer(result.results)


def batch_parallel_map[T, R](
    func: Callable[[list[T]], list[R]],
    iterable: Iterable[T],
    *,
    batch_size: int = 100,
    max_workers: int | None = None,
    use_threads: bool = True,
    timeout: float | None = None,
) -> ParallelResult[R]:
    """Apply function to batches of items in parallel.

    Processes items in batches, which is useful when the function benefits
    from batch processing.

    Args:
        func: Function accepting a list of items.
        iterable: Items to process.
        batch_size: Number of items per batch.
        max_workers: Maximum concurrent workers.
        use_threads: Use threads (True) or processes (False).
        timeout: Overall timeout in seconds for all batches to complete.

    Returns:
        ParallelResult with flattened results in batch order. Note that
        success_count and error_count refer to batches, not individual items.

    Example:
        >>> def process_batch(items):
        ...     return [x * 2 for x in items]
        >>> result = batch_parallel_map(
        ...     process_batch,
        ...     range(1000),
        ...     batch_size=100,
        ... )

    References:
        OPT-001: Parallel Execution Framework
    """
    import time

    items = list(iterable)
    if not items:
        return ParallelResult(results=[], execution_time=0.0, success_count=0, error_count=0)

    # Create batches
    batches = [items[i : i + batch_size] for i in range(0, len(items), batch_size)]

    start_time = time.time()
    executor_class = ThreadPoolExecutor if use_threads else ProcessPoolExecutor
    max_workers = get_optimal_workers(max_workers)

    batch_results: list[list[R] | None] = [None] * len(batches)
    errors: list[Exception] = []
    success_count = 0
    error_count = 0

    with executor_class(max_workers=max_workers) as executor:
        futures = {executor.submit(func, batch): i for i, batch in enumerate(batches)}

        for future in as_completed(futures, timeout=timeout):
            idx = futures[future]
            try:
                # Store by batch index so the flattened output preserves input
                # order even though batches complete in arbitrary order.
                batch_results[idx] = future.result()
                success_count += 1
            except Exception as e:
                error_count += 1
                errors.append(e)

    all_results: list[R] = []
    for batch_result in batch_results:
        if batch_result is not None:
            all_results.extend(batch_result)

    execution_time = time.time() - start_time

    return ParallelResult(
        results=all_results,
        execution_time=execution_time,
        success_count=success_count,
        error_count=error_count,
        errors=errors if errors else None,
    )
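

# Illustrative usage sketch (added for clarity; the helper name and toy batch
# function are assumptions, not part of the original module). It batches 1,000
# integers into 10 batches of 100 and checks the per-batch accounting.
def _example_batch_parallel_map() -> None:
    def double_batch(items: list[int]) -> list[int]:
        return [x * 2 for x in items]

    result = batch_parallel_map(double_batch, range(1000), batch_size=100, max_workers=4)
    # success_count / error_count refer to batches, not individual items.
    assert result.success_count == 10
    assert sorted(result.results) == [x * 2 for x in range(1000)]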


def parallel_filter[T](
    func: Callable[[T], bool],
    iterable: Iterable[T],
    *,
    max_workers: int | None = None,
    use_threads: bool = True,
    timeout: float | None = None,
) -> ParallelResult[T]:
    """Filter items in parallel.

    Applies the predicate to items in parallel, keeping items for which it
    returns True.

    Args:
        func: Predicate function returning True to keep an item.
        iterable: Items to filter.
        max_workers: Maximum concurrent workers.
        use_threads: Use threads (True) or processes (False).
        timeout: Overall timeout in seconds for all tasks to complete.

    Returns:
        ParallelResult with the kept items. Items are collected in completion
        order, which is not necessarily the input order.

    Example:
        >>> def is_even(x):
        ...     return x % 2 == 0
        >>> result = parallel_filter(is_even, range(100))

    References:
        OPT-001: Parallel Execution Framework
    """
    import time

    items = list(iterable)
    if not items:
        return ParallelResult(results=[], execution_time=0.0, success_count=0, error_count=0)

    executor_class = ThreadPoolExecutor if use_threads else ProcessPoolExecutor
    max_workers = get_optimal_workers(max_workers)

    start_time = time.time()
    results: list[T] = []
    errors: list[Exception] = []
    success_count = 0
    error_count = 0

    with executor_class(max_workers=max_workers) as executor:
        futures = {executor.submit(func, item): item for item in items}

        for future in as_completed(futures, timeout=timeout):
            item = futures[future]
            try:
                if future.result():
                    results.append(item)
                success_count += 1
            except Exception as e:
                error_count += 1
                errors.append(e)

    execution_time = time.time() - start_time

    return ParallelResult(
        results=results,
        execution_time=execution_time,
        success_count=success_count,
        error_count=error_count,
        errors=errors if errors else None,
    )
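

# Illustrative usage sketch (added for clarity; the helper name is an assumption,
# not part of the original module). It highlights that parallel_filter returns
# the kept items in completion order rather than input order.
def _example_parallel_filter() -> None:
    def is_even(x: int) -> bool:
        return x % 2 == 0

    result = parallel_filter(is_even, range(20), max_workers=2)
    # Compare order-insensitively: kept items arrive in completion order.
    assert sorted(result.results) == [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
    assert result.error_count == 0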


def chunked_parallel_map(
    func: Callable[[NDArray[np.float64]], NDArray[np.float64]],
    data: NDArray[np.float64],
    *,
    chunk_size: int = 10000,
    max_workers: int | None = None,
    use_threads: bool = True,
    timeout: float | None = None,
) -> NDArray[np.float64]:
    """Apply function to chunks of array data in parallel.

    Useful for processing large arrays where the parallelization overhead
    is justified.

    Args:
        func: Function accepting a 1D array chunk.
        data: Array to process.
        chunk_size: Number of samples per chunk.
        max_workers: Maximum concurrent workers.
        use_threads: Use threads (True) or processes (False).
        timeout: Overall timeout in seconds for all chunks to complete.

    Returns:
        Processed array (chunks concatenated in their original order).

    Raises:
        AnalysisError: If processing fails.

    Example:
        >>> def process_chunk(chunk):
        ...     return np.fft.fft(chunk)
        >>> result = chunked_parallel_map(process_chunk, data, chunk_size=1000)

    References:
        OPT-001: Parallel Execution Framework
    """
    if len(data) == 0:
        return np.array([])

    if len(data) <= chunk_size:
        return func(data)

    # Create chunks
    chunks = [data[i : i + chunk_size] for i in range(0, len(data), chunk_size)]

    executor_class = ThreadPoolExecutor if use_threads else ProcessPoolExecutor
    max_workers = get_optimal_workers(max_workers)

    indexed_results: list[tuple[int, NDArray[np.float64]]] = []

    with executor_class(max_workers=max_workers) as executor:
        futures = {executor.submit(func, chunk): i for i, chunk in enumerate(chunks)}

        for future in as_completed(futures, timeout=timeout):
            idx = futures[future]
            try:
                indexed_results.append((idx, future.result()))
            except Exception as e:
                raise AnalysisError(f"Chunk processing failed: {e!s}") from e

    # Restore chunk order before concatenating: futures complete in arbitrary order.
    indexed_results.sort(key=lambda pair: pair[0])
    return np.concatenate([result for _, result in indexed_results])
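

# Illustrative usage sketch (added for clarity; the helper name and sample data
# are assumptions, not part of the original module). It processes a small array
# in chunks and verifies the result with order-insensitive aggregates.
def _example_chunked_parallel_map() -> None:
    data = np.arange(1000, dtype=np.float64)

    def scale(chunk: NDArray[np.float64]) -> NDArray[np.float64]:
        return chunk * 2.0

    out = chunked_parallel_map(scale, data, chunk_size=256, max_workers=2)
    assert out.size == data.size
    assert np.isclose(out.sum(), (data * 2.0).sum())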


__all__ = [
    "ParallelResult",
    "WorkerPool",
    "batch_parallel_map",
    "chunked_parallel_map",
    "get_optimal_workers",
    "parallel_filter",
    "parallel_map",
    "parallel_reduce",
]
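

# Illustrative end-of-module sketch (added for clarity; nothing below is part of
# the original module, and wiring WorkerPool into parallel_map is an assumption
# about intended usage, since none of the helpers above accepts a WorkerPool
# directly).
def _example_worker_pool_usage() -> None:
    pool = WorkerPool(max_workers=4, use_threads=True, timeout=None)

    result = parallel_map(
        str.upper,
        ["alpha", "beta", "gamma"],
        max_workers=pool.max_workers,
        use_threads=pool.use_threads,
        timeout=pool.timeout,
    )
    assert result.results == ["ALPHA", "BETA", "GAMMA"]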