Coverage for src / tracekit / optimization / parallel.py: 100%

126 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Parallel processing utilities for optimization and analysis. 

2 

3This module provides utilities for efficient parallel execution of analysis tasks 

4using both thread and process-based parallelism. 

5""" 

6 

7from __future__ import annotations 

8 

9import logging 

10from collections.abc import Callable 

11from concurrent.futures import ( 

12 ProcessPoolExecutor, 

13 ThreadPoolExecutor, 

14 as_completed, 

15) 

16from dataclasses import dataclass 

17from typing import TYPE_CHECKING, Any 

18 

19import numpy as np 

20 

21from tracekit.core.exceptions import AnalysisError 

22 

23if TYPE_CHECKING: 

24 from collections.abc import Iterable 

25 

26 from numpy.typing import NDArray 

27 

28logger = logging.getLogger(__name__) 

29 

30 

31@dataclass 

32class ParallelResult[R]: 

33 """Result from parallel execution. 

34 

35 Attributes: 

36 results: List of results from all tasks. 

37 execution_time: Total execution time in seconds. 

38 success_count: Number of successfully completed tasks. 

39 error_count: Number of failed tasks. 

40 errors: List of exceptions encountered. 

41 

42 Example: 

43 >>> result = parallel_map(fn, items) 

44 >>> print(f"Completed {result.success_count}/{len(items)}") 

45 """ 

46 

47 results: list[R] 

48 execution_time: float 

49 success_count: int 

50 error_count: int 

51 errors: list[Exception] | None = None 

52 

53 

54@dataclass 

55class WorkerPool: 

56 """Configuration for worker pool management. 

57 

58 Attributes: 

59 max_workers: Maximum number of workers. 

60 use_threads: Use threads (True) or processes (False). 

61 timeout: Timeout per task in seconds. 

62 chunk_size: Number of items per worker chunk. 

63 

64 Example: 

65 >>> pool = WorkerPool(max_workers=4, use_threads=True, timeout=30) 

66 """ 

67 

68 max_workers: int = 4 

69 use_threads: bool = True 

70 timeout: float | None = None 

71 chunk_size: int = 1 

72 

73 

74def get_optimal_workers(max_workers: int | None = None) -> int: 

75 """Get optimal number of workers for current system. 

76 

77 Uses CPU count by default, respecting max_workers limit. 

78 

79 Args: 

80 max_workers: Maximum workers to use. None for all CPUs. 

81 

82 Returns: 

83 Optimal number of workers. 

84 

85 Example: 

86 >>> workers = get_optimal_workers(max_workers=8) 

87 """ 

88 import os 

89 

90 cpu_count = os.cpu_count() or 1 

91 if max_workers is None: 

92 return cpu_count 

93 return min(max_workers, cpu_count) 

94 

95 

96def parallel_map[T, R]( 

97 func: Callable[[T], R], 

98 iterable: Iterable[T], 

99 *, 

100 max_workers: int | None = None, 

101 use_threads: bool = True, 

102 timeout: float | None = None, 

103 collect_errors: bool = True, 

104) -> ParallelResult[R]: 

105 """Apply function to items in parallel. 

106 

107 Maps a function over an iterable using either threads or processes. 

108 

109 Args: 

110 func: Function to apply to each item. 

111 iterable: Items to process. 

112 max_workers: Maximum concurrent workers. 

113 use_threads: Use threads (True) or processes (False). 

114 timeout: Timeout per task in seconds. 

115 collect_errors: Collect errors instead of raising. 

116 

117 Returns: 

118 ParallelResult with results and execution stats. 

119 

120 Raises: 

121 AnalysisError: If collect_errors=False and a task fails. 

122 

123 Example: 

124 >>> def process_item(x): 

125 ... return x * 2 

126 >>> result = parallel_map(process_item, range(100)) 

127 >>> print(f"Completed: {result.success_count}") 

128 

129 References: 

130 OPT-001: Parallel Execution Framework 

131 """ 

132 import time 

133 

134 items = list(iterable) 

135 if not items: 

136 return ParallelResult(results=[], execution_time=0.0, success_count=0, error_count=0) 

137 

138 executor_class = ThreadPoolExecutor if use_threads else ProcessPoolExecutor 

139 max_workers = get_optimal_workers(max_workers) 

140 

141 start_time = time.time() 

142 results: list[R] = [None] * len(items) # type: ignore[list-item] 

143 errors: list[Exception] = [] 

144 success_count = 0 

145 error_count = 0 

146 

147 with executor_class(max_workers=max_workers) as executor: 

148 futures = {executor.submit(func, item): i for i, item in enumerate(items)} 

149 

150 for future in as_completed(futures, timeout=timeout): 

151 idx = futures[future] 

152 try: 

153 results[idx] = future.result() 

154 success_count += 1 

155 except Exception as e: 

156 error_count += 1 

157 errors.append(e) 

158 

159 if not collect_errors: 

160 execution_time = time.time() - start_time 

161 raise AnalysisError(f"Task {idx} failed: {e!s}") from e 

162 

163 execution_time = time.time() - start_time 

164 

165 return ParallelResult( 

166 results=results, 

167 execution_time=execution_time, 

168 success_count=success_count, 

169 error_count=error_count, 

170 errors=errors if errors else None, 

171 ) 

172 

173 

174def parallel_reduce[T, R]( 

175 func: Callable[[T], R], 

176 iterable: Iterable[T], 

177 reducer: Callable[[list[R]], Any], 

178 *, 

179 max_workers: int | None = None, 

180 use_threads: bool = True, 

181 timeout: float | None = None, 

182) -> Any: 

183 """Map and reduce results in parallel. 

184 

185 Applies function to items in parallel, then reduces results. 

186 

187 Args: 

188 func: Function to apply to each item. 

189 iterable: Items to process. 

190 reducer: Function to reduce list of results. 

191 max_workers: Maximum concurrent workers. 

192 use_threads: Use threads (True) or processes (False). 

193 timeout: Timeout per task in seconds. 

194 

195 Returns: 

196 Reduced result. 

197 

198 Example: 

199 >>> def compute(x): 

200 ... return x * 2 

201 >>> result = parallel_reduce( 

202 ... compute, 

203 ... range(100), 

204 ... reducer=lambda x: sum(x) 

205 ... ) 

206 

207 References: 

208 OPT-001: Parallel Execution Framework 

209 """ 

210 result = parallel_map( 

211 func, 

212 iterable, 

213 max_workers=max_workers, 

214 use_threads=use_threads, 

215 timeout=timeout, 

216 collect_errors=False, 

217 ) 

218 

219 return reducer(result.results) 

220 

221 

222def batch_parallel_map[T, R]( 

223 func: Callable[[list[T]], list[R]], 

224 iterable: Iterable[T], 

225 *, 

226 batch_size: int = 100, 

227 max_workers: int | None = None, 

228 use_threads: bool = True, 

229 timeout: float | None = None, 

230) -> ParallelResult[R]: 

231 """Apply function to batches of items in parallel. 

232 

233 Processes items in batches, useful when function benefits from 

234 batch processing. 

235 

236 Args: 

237 func: Function accepting list of items. 

238 iterable: Items to process. 

239 batch_size: Number of items per batch. 

240 max_workers: Maximum concurrent workers. 

241 use_threads: Use threads (True) or processes (False). 

242 timeout: Timeout per batch in seconds. 

243 

244 Returns: 

245 ParallelResult with flattened results. 

246 

247 Example: 

248 >>> def process_batch(items): 

249 ... return [x * 2 for x in items] 

250 >>> result = batch_parallel_map( 

251 ... process_batch, 

252 ... range(1000), 

253 ... batch_size=100 

254 ... ) 

255 

256 References: 

257 OPT-001: Parallel Execution Framework 

258 """ 

259 import time 

260 

261 items = list(iterable) 

262 if not items: 

263 return ParallelResult(results=[], execution_time=0.0, success_count=0, error_count=0) 

264 

265 # Create batches 

266 batches = [items[i : i + batch_size] for i in range(0, len(items), batch_size)] 

267 

268 start_time = time.time() 

269 executor_class = ThreadPoolExecutor if use_threads else ProcessPoolExecutor 

270 max_workers = get_optimal_workers(max_workers) 

271 

272 all_results: list[R] = [] 

273 errors: list[Exception] = [] 

274 success_count = 0 

275 error_count = 0 

276 

277 with executor_class(max_workers=max_workers) as executor: 

278 futures = {executor.submit(func, batch): i for i, batch in enumerate(batches)} 

279 

280 for future in as_completed(futures, timeout=timeout): 

281 try: 

282 batch_results = future.result() 

283 all_results.extend(batch_results) 

284 success_count += 1 

285 except Exception as e: 

286 error_count += 1 

287 errors.append(e) 

288 

289 execution_time = time.time() - start_time 

290 

291 return ParallelResult( 

292 results=all_results, 

293 execution_time=execution_time, 

294 success_count=success_count, 

295 error_count=error_count, 

296 errors=errors if errors else None, 

297 ) 

298 

299 

300def parallel_filter[T]( 

301 func: Callable[[T], bool], 

302 iterable: Iterable[T], 

303 *, 

304 max_workers: int | None = None, 

305 use_threads: bool = True, 

306 timeout: float | None = None, 

307) -> ParallelResult[T]: 

308 """Filter items in parallel. 

309 

310 Applies predicate to items in parallel, filtering results. 

311 

312 Args: 

313 func: Predicate function returning True to keep item. 

314 iterable: Items to filter. 

315 max_workers: Maximum concurrent workers. 

316 use_threads: Use threads (True) or processes (False). 

317 timeout: Timeout per task in seconds. 

318 

319 Returns: 

320 ParallelResult with filtered items. 

321 

322 Example: 

323 >>> def is_even(x): 

324 ... return x % 2 == 0 

325 >>> result = parallel_filter(is_even, range(100)) 

326 

327 References: 

328 OPT-001: Parallel Execution Framework 

329 """ 

330 import time 

331 

332 items = list(iterable) 

333 if not items: 

334 return ParallelResult(results=[], execution_time=0.0, success_count=0, error_count=0) 

335 

336 executor_class = ThreadPoolExecutor if use_threads else ProcessPoolExecutor 

337 max_workers = get_optimal_workers(max_workers) 

338 

339 start_time = time.time() 

340 results: list[T] = [] 

341 errors: list[Exception] = [] 

342 success_count = 0 

343 error_count = 0 

344 

345 with executor_class(max_workers=max_workers) as executor: 

346 futures = {executor.submit(func, item): item for item in items} 

347 

348 for future in as_completed(futures, timeout=timeout): 

349 item = futures[future] 

350 try: 

351 if future.result(): 

352 results.append(item) 

353 success_count += 1 

354 except Exception as e: 

355 error_count += 1 

356 errors.append(e) 

357 

358 execution_time = time.time() - start_time 

359 

360 return ParallelResult( 

361 results=results, 

362 execution_time=execution_time, 

363 success_count=success_count, 

364 error_count=error_count, 

365 errors=errors if errors else None, 

366 ) 

367 

368 

def chunked_parallel_map(
    func: Callable[[NDArray[np.float64]], NDArray[np.float64]],
    data: NDArray[np.float64],
    *,
    chunk_size: int = 10000,
    max_workers: int | None = None,
    use_threads: bool = True,
    timeout: float | None = None,
) -> NDArray[np.float64]:
    """Apply function to chunks of array data in parallel.

    Useful for processing large arrays where parallelization overhead
    is justified. Output chunks are concatenated in their original
    positional order even when workers finish out of order.

    Args:
        func: Function accepting a 1D array chunk.
        data: Array to process.
        chunk_size: Number of samples per chunk.
        max_workers: Maximum concurrent workers (capped at CPU count).
        use_threads: Use threads (True) or processes (False).
        timeout: Overall deadline in seconds for *all* chunks to finish
            (bounds the ``as_completed`` loop, not each chunk).

    Returns:
        Processed array (chunks concatenated in input order).

    Raises:
        AnalysisError: If processing of any chunk fails.
        TimeoutError: If ``timeout`` elapses before all chunks finish.

    Example:
        >>> def process_chunk(chunk):
        ...     return np.fft.fft(chunk)
        >>> result = chunked_parallel_map(process_chunk, data, chunk_size=1000)

    References:
        OPT-001: Parallel Execution Framework
    """
    if len(data) == 0:
        return np.array([])

    # Small input: parallelization overhead is not justified.
    if len(data) <= chunk_size:
        return func(data)

    # Contiguous views over the input, chunk_size samples each.
    chunks = [data[i : i + chunk_size] for i in range(0, len(data), chunk_size)]

    executor_class = ThreadPoolExecutor if use_threads else ProcessPoolExecutor
    max_workers = get_optimal_workers(max_workers)

    # BUGFIX: results must be stored by chunk index. Appending in
    # completion order (as before) could concatenate chunks out of
    # order whenever workers finish in a different sequence.
    results: list[NDArray[np.float64] | None] = [None] * len(chunks)

    with executor_class(max_workers=max_workers) as executor:
        futures = {executor.submit(func, chunk): i for i, chunk in enumerate(chunks)}

        for future in as_completed(futures, timeout=timeout):
            idx = futures[future]
            try:
                results[idx] = future.result()
            except Exception as e:
                raise AnalysisError(f"Chunk processing failed: {e!s}") from e

    # Every slot is filled here: any failure raised above.
    return np.concatenate(results)  # type: ignore[arg-type]

429 

430 

# Public API of this module (kept alphabetical within each group).
__all__ = [
    "ParallelResult",
    "WorkerPool",
    "batch_parallel_map",
    "chunked_parallel_map",
    "get_optimal_workers",
    "parallel_filter",
    "parallel_map",
    "parallel_reduce",
]