Coverage for src / tracekit / batch / advanced.py: 79%

184 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Advanced batch processing with checkpointing and error handling. 

2 

3This module provides enhanced batch processing capabilities including 

4checkpointing for long-running jobs, resume functionality, and sophisticated 

5error handling. 

6""" 

7 

8from __future__ import annotations 

9 

10import concurrent.futures 

11import json 

12import threading 

13import traceback 

14from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed 

15from dataclasses import asdict, dataclass, field 

16from pathlib import Path 

17from typing import TYPE_CHECKING, Any, Literal 

18 

19import pandas as pd 

20 

21if TYPE_CHECKING: 

22 from collections.abc import Callable 

23 

24try: 

25 from tqdm import tqdm # type: ignore[import-untyped] 

26 

27 HAS_TQDM = True 

28except ImportError: 

29 HAS_TQDM = False 

30 

31 

32class TimeoutError(Exception): 

33 """Raised when a function execution exceeds timeout.""" 

34 

35 pass 

36 

37 

38@dataclass 

39class BatchConfig: 

40 """Configuration for advanced batch processing. 

41 

42 Attributes: 

43 on_error: Error handling strategy: 

44 - 'skip': Skip failed files, continue processing 

45 - 'stop': Stop processing on first error 

46 - 'warn': Log warning but continue (default) 

47 checkpoint_dir: Directory to save checkpoints. If None, no checkpointing. 

48 checkpoint_interval: Save checkpoint every N files (default: 10). 

49 max_workers: Maximum number of parallel workers. None uses CPU count. 

50 memory_limit: Maximum memory per worker in MB (not enforced, for documentation). 

51 timeout_per_file: Timeout in seconds per file. None for no timeout. 

52 When specified, uses threading.Timer for true timeout enforcement 

53 that interrupts long-running operations. 

54 use_threads: Use threads instead of processes for parallelization. 

55 progress_bar: Show progress bar (requires tqdm). 

56 

57 Example: 

58 >>> config = BatchConfig( 

59 ... on_error='skip', 

60 ... checkpoint_dir='./checkpoints', 

61 ... checkpoint_interval=5, 

62 ... max_workers=4 

63 ... ) 

64 

65 References: 

66 API-012: Advanced Batch Control 

67 """ 

68 

69 on_error: Literal["skip", "stop", "warn"] = "warn" 

70 checkpoint_dir: Path | str | None = None 

71 checkpoint_interval: int = 10 

72 max_workers: int | None = None 

73 memory_limit: float | None = None # MB, not enforced 

74 timeout_per_file: float | None = None # seconds, enforced via threading.Timer 

75 use_threads: bool = False 

76 progress_bar: bool = True 

77 

78 

79@dataclass 

80class FileResult: 

81 """Result from processing a single file. 

82 

83 Attributes: 

84 file: Path to the file. 

85 success: Whether processing succeeded. 

86 result: Analysis result dictionary if successful. 

87 error: Error message if failed. 

88 traceback: Full traceback if failed. 

89 duration: Processing time in seconds. 

90 timed_out: Whether processing was terminated due to timeout. 

91 

92 Example: 

93 >>> result = FileResult( 

94 ... file='trace001.wfm', 

95 ... success=True, 

96 ... result={'rise_time': 1.2e-9}, 

97 ... duration=0.5 

98 ... ) 

99 

100 References: 

101 API-012: Advanced Batch Control 

102 """ 

103 

104 file: str 

105 success: bool = True 

106 result: dict[str, Any] = field(default_factory=dict) 

107 error: str | None = None 

108 traceback: str | None = None 

109 duration: float = 0.0 

110 timed_out: bool = False 

111 

112 

113@dataclass 

114class BatchCheckpoint: 

115 """Checkpoint state for batch processing. 

116 

117 Attributes: 

118 completed_files: List of successfully completed files. 

119 failed_files: List of failed file paths. 

120 results: List of FileResult objects. 

121 total_files: Total number of files in batch. 

122 config: Batch configuration used. 

123 

124 Example: 

125 >>> checkpoint = BatchCheckpoint( 

126 ... completed_files=['file1.wfm', 'file2.wfm'], 

127 ... total_files=10, 

128 ... config=config 

129 ... ) 

130 

131 References: 

132 API-012: Advanced Batch Control 

133 """ 

134 

135 completed_files: list[str] = field(default_factory=list) 

136 failed_files: list[str] = field(default_factory=list) 

137 results: list[FileResult] = field(default_factory=list) 

138 total_files: int = 0 

139 config: BatchConfig | None = None 

140 

141 def save(self, checkpoint_path: Path) -> None: 

142 """Save checkpoint to JSON file. 

143 

144 Args: 

145 checkpoint_path: Path to save checkpoint file. 

146 

147 Example: 

148 >>> checkpoint.save(Path('checkpoints/batch_001.json')) 

149 """ 

150 checkpoint_path.parent.mkdir(parents=True, exist_ok=True) 

151 

152 # Convert to serializable format 

153 config_dict = None 

154 if self.config: 

155 config_dict = asdict(self.config) 

156 # Convert Path to str for JSON serialization 

157 if config_dict.get("checkpoint_dir") is not None: 

158 config_dict["checkpoint_dir"] = str(config_dict["checkpoint_dir"]) 

159 

160 data = { 

161 "completed_files": self.completed_files, 

162 "failed_files": self.failed_files, 

163 "results": [asdict(r) for r in self.results], 

164 "total_files": self.total_files, 

165 "config": config_dict, 

166 } 

167 

168 with open(checkpoint_path, "w") as f: 

169 json.dump(data, f, indent=2) 

170 

171 @classmethod 

172 def load(cls, checkpoint_path: Path) -> BatchCheckpoint: 

173 """Load checkpoint from JSON file. 

174 

175 Args: 

176 checkpoint_path: Path to checkpoint file. 

177 

178 Returns: 

179 Loaded BatchCheckpoint object. 

180 

181 Example: 

182 >>> checkpoint = BatchCheckpoint.load(Path('checkpoints/batch_001.json')) 

183 """ 

184 with open(checkpoint_path) as f: 

185 data = json.load(f) 

186 

187 # Reconstruct FileResult objects 

188 results = [FileResult(**r) for r in data.get("results", [])] 

189 

190 # Reconstruct BatchConfig if present 

191 config = None 

192 if data.get("config"): 

193 config_data = data["config"] 

194 # Convert checkpoint_dir back to Path if it exists 

195 if config_data.get("checkpoint_dir"): 

196 config_data["checkpoint_dir"] = Path(config_data["checkpoint_dir"]) 

197 config = BatchConfig(**config_data) 

198 

199 return cls( 

200 completed_files=data.get("completed_files", []), 

201 failed_files=data.get("failed_files", []), 

202 results=results, 

203 total_files=data.get("total_files", 0), 

204 config=config, 

205 ) 

206 

207 

208def _run_with_timeout( 

209 func: Callable[..., Any], 

210 args: tuple[Any, ...], 

211 kwargs: dict[str, Any], 

212 timeout: float, 

213) -> tuple[Any, bool]: 

214 """Run a function with true timeout enforcement using threading. 

215 

216 This function wraps the target function in a separate thread and uses 

217 threading.Timer to interrupt it if it exceeds the timeout. This provides 

218 actual timeout enforcement rather than just post-hoc checking. 

219 

220 Args: 

221 func: Function to execute. 

222 args: Positional arguments for the function. 

223 kwargs: Keyword arguments for the function. 

224 timeout: Timeout in seconds. 

225 

226 Returns: 

227 Tuple of (result, timed_out) where result is the function return value 

228 (or None if timed out) and timed_out indicates whether timeout occurred. 

229 

230 Note: 

231 This uses concurrent.futures.ThreadPoolExecutor internally which can 

232 only interrupt I/O-bound operations. CPU-bound functions in Python 

233 cannot be truly interrupted due to the GIL. For CPU-bound timeouts, 

234 consider using ProcessPoolExecutor with timeout on future.result(). 

235 """ 

236 result_container: dict[str, Any] = {"result": None, "error": None} 

237 

238 def target() -> None: 

239 try: 

240 result_container["result"] = func(*args, **kwargs) 

241 except Exception as e: 

242 result_container["error"] = e 

243 

244 # Use a thread with explicit timeout 

245 thread = threading.Thread(target=target, daemon=True) 

246 thread.start() 

247 thread.join(timeout=timeout) 

248 

249 if thread.is_alive(): 

250 # Thread is still running - timeout occurred 

251 # Note: We can't truly kill the thread, but we can mark it as timed out 

252 # and move on. The daemon=True ensures it won't block process exit. 

253 return None, True 

254 

255 if result_container["error"] is not None: 

256 raise result_container["error"] 

257 

258 return result_container["result"], False 

259 

260 

261class AdvancedBatchProcessor: 

262 """Advanced batch processor with checkpointing and error handling. 

263 

264 Provides robust batch processing with checkpoint/resume capability, 

265 per-file error isolation, progress tracking, and resource limits. 

266 

267 Timeout enforcement: 

268 When `timeout_per_file` is configured, this processor uses actual 

269 timeout enforcement (via threading.Timer or concurrent.futures timeout) 

270 rather than just post-hoc duration checking. This means: 

271 - Long-running operations will be interrupted 

272 - Results will be marked with `timed_out=True` 

273 - Processing continues to the next file 

274 

275 Example: 

276 >>> from tracekit.batch.advanced import AdvancedBatchProcessor, BatchConfig 

277 >>> config = BatchConfig( 

278 ... on_error='skip', 

279 ... checkpoint_dir='./checkpoints', 

280 ... max_workers=4, 

281 ... timeout_per_file=60.0 # Enforced timeout 

282 ... ) 

283 >>> processor = AdvancedBatchProcessor(config) 

284 >>> results = processor.process(files, analysis_fn) 

285 

286 References: 

287 API-012: Advanced Batch Control 

288 """ 

289 

290 def __init__(self, config: BatchConfig | None = None) -> None: 

291 """Initialize batch processor. 

292 

293 Args: 

294 config: Batch configuration. Uses defaults if None. 

295 """ 

296 self.config = config or BatchConfig() 

297 self.checkpoint: BatchCheckpoint | None = None 

298 

299 def process( 

300 self, 

301 files: list[str | Path], 

302 analysis_fn: Callable[[str | Path], dict[str, Any]], 

303 *, 

304 checkpoint_name: str = "batch_checkpoint", 

305 **kwargs: Any, 

306 ) -> pd.DataFrame: 

307 """Process files with checkpointing and error handling. 

308 

309 Args: 

310 files: List of file paths to process. 

311 analysis_fn: Analysis function to apply to each file. 

312 checkpoint_name: Name for checkpoint file (default: 'batch_checkpoint'). 

313 **kwargs: Additional arguments passed to analysis_fn. 

314 

315 Returns: 

316 DataFrame with results and error information. 

317 

318 Example: 

319 >>> results = processor.process( 

320 ... files=['trace1.wfm', 'trace2.wfm'], 

321 ... analysis_fn=analyze_trace 

322 ... ) 

323 

324 References: 

325 API-012: Advanced Batch Control 

326 """ 

327 # Try to resume from checkpoint 

328 remaining_files = self._resume_or_start(files, checkpoint_name) 

329 

330 # Initialize checkpoint if not resumed 

331 if self.checkpoint is None: 331 ↛ 338line 331 didn't jump to line 338 because the condition on line 331 was always true

332 self.checkpoint = BatchCheckpoint( 

333 total_files=len(files), 

334 config=self.config, 

335 ) 

336 

337 # Process remaining files 

338 self._process_files(remaining_files, analysis_fn, checkpoint_name, **kwargs) 

339 

340 # Convert results to DataFrame 

341 return self._results_to_dataframe() 

342 

343 def _resume_or_start(self, files: list[str | Path], checkpoint_name: str) -> list[str | Path]: 

344 """Try to resume from checkpoint or start fresh. 

345 

346 Args: 

347 files: Full list of files to process. 

348 checkpoint_name: Name of checkpoint file. 

349 

350 Returns: 

351 List of files remaining to be processed. 

352 """ 

353 if self.config.checkpoint_dir is None: 

354 return files 

355 

356 checkpoint_dir = Path(self.config.checkpoint_dir) 

357 checkpoint_path = checkpoint_dir / f"{checkpoint_name}.json" 

358 

359 if checkpoint_path.exists(): 359 ↛ 361line 359 didn't jump to line 361 because the condition on line 359 was never true

360 # Load checkpoint 

361 self.checkpoint = BatchCheckpoint.load(checkpoint_path) 

362 

363 # Determine remaining files 

364 completed_set = set(self.checkpoint.completed_files) 

365 failed_set = set(self.checkpoint.failed_files) 

366 processed_set = completed_set | failed_set 

367 

368 remaining = [str(f) for f in files if str(f) not in processed_set] 

369 

370 print( 

371 f"Resuming from checkpoint: " 

372 f"{len(self.checkpoint.completed_files)} completed, " 

373 f"{len(self.checkpoint.failed_files)} failed, " 

374 f"{len(remaining)} remaining" 

375 ) 

376 

377 return [Path(f) for f in remaining] 

378 

379 return files 

380 

381 def _process_files( 

382 self, 

383 files: list[str | Path], 

384 analysis_fn: Callable[[str | Path], dict[str, Any]], 

385 checkpoint_name: str, 

386 **kwargs: Any, 

387 ) -> None: 

388 """Process files with parallel execution and checkpointing. 

389 

390 Args: 

391 files: Files to process. 

392 analysis_fn: Analysis function. 

393 checkpoint_name: Checkpoint file name. 

394 **kwargs: Additional arguments for analysis_fn. 

395 

396 Raises: 

397 RuntimeError: If processing is stopped due to error and on_error is "stop". 

398 """ 

399 if not files: 

400 return 

401 

402 # Create progress bar if requested 

403 pbar = None 

404 if self.config.progress_bar and HAS_TQDM: 404 ↛ 405line 404 didn't jump to line 405 because the condition on line 404 was never true

405 total = self.checkpoint.total_files if self.checkpoint else len(files) 

406 initial = ( 

407 len(self.checkpoint.completed_files) + len(self.checkpoint.failed_files) 

408 if self.checkpoint 

409 else 0 

410 ) 

411 pbar = tqdm(total=total, initial=initial, desc="Processing files") 

412 

413 # Wrapper for file processing with error handling and timeout 

414 def _process_one(file_path: str | Path) -> FileResult: 

415 import time 

416 

417 start_time = time.time() 

418 timed_out = False 

419 

420 try: 

421 # Apply timeout if configured 

422 if self.config.timeout_per_file is not None: 422 ↛ 423line 422 didn't jump to line 423 because the condition on line 422 was never true

423 result, timed_out = _run_with_timeout( 

424 analysis_fn, 

425 (file_path,), 

426 kwargs, 

427 self.config.timeout_per_file, 

428 ) 

429 if timed_out: 

430 duration = time.time() - start_time 

431 return FileResult( 

432 file=str(file_path), 

433 success=False, 

434 error=f"Processing timed out after {self.config.timeout_per_file}s", 

435 duration=duration, 

436 timed_out=True, 

437 ) 

438 else: 

439 result = analysis_fn(file_path, **kwargs) 

440 

441 duration = time.time() - start_time 

442 return FileResult( 

443 file=str(file_path), 

444 success=True, 

445 result=result if isinstance(result, dict) else {"result": result}, 

446 duration=duration, 

447 ) 

448 except Exception as e: 

449 duration = time.time() - start_time 

450 return FileResult( 

451 file=str(file_path), 

452 success=False, 

453 error=str(e), 

454 traceback=traceback.format_exc(), 

455 duration=duration, 

456 ) 

457 

458 # Process files 

459 processed_count = 0 

460 executor_class = ThreadPoolExecutor if self.config.use_threads else ProcessPoolExecutor 

461 

462 with executor_class(max_workers=self.config.max_workers) as executor: 

463 # Submit all tasks 

464 futures = {executor.submit(_process_one, f): f for f in files} 

465 

466 # Process results as they complete 

467 # Use future.result() with timeout for additional enforcement layer 

468 for future in as_completed(futures): 

469 file_path = futures[future] 

470 try: 

471 # Apply timeout on result retrieval as backup enforcement 

472 # This catches cases where the thread-based timeout didn't work 

473 # (e.g., CPU-bound operations that don't release the GIL) 

474 retrieval_timeout = ( 

475 self.config.timeout_per_file * 1.1 if self.config.timeout_per_file else None 

476 ) 

477 file_result = future.result(timeout=retrieval_timeout) 

478 except concurrent.futures.TimeoutError: 

479 # Backup timeout triggered - future.result() timed out 

480 file_result = FileResult( 

481 file=str(file_path), 

482 success=False, 

483 error=f"Processing timed out (backup enforcement) " 

484 f"after {self.config.timeout_per_file}s", 

485 timed_out=True, 

486 ) 

487 except Exception as e: 

488 # Unexpected error during result retrieval 

489 file_result = FileResult( 

490 file=str(file_path), 

491 success=False, 

492 error=f"Error retrieving result: {e}", 

493 traceback=traceback.format_exc(), 

494 ) 

495 

496 # Update checkpoint 

497 if self.checkpoint: 497 ↛ 505line 497 didn't jump to line 505 because the condition on line 497 was always true

498 self.checkpoint.results.append(file_result) 

499 if file_result.success: 

500 self.checkpoint.completed_files.append(file_result.file) 

501 else: 

502 self.checkpoint.failed_files.append(file_result.file) 

503 

504 # Handle errors 

505 if not file_result.success: 

506 if self.config.on_error == "stop": 

507 if pbar: 507 ↛ 508line 507 didn't jump to line 508 because the condition on line 507 was never true

508 pbar.close() 

509 raise RuntimeError( 

510 f"Processing stopped due to error in {file_result.file}: " 

511 f"{file_result.error}" 

512 ) 

513 elif self.config.on_error == "warn": 

514 timeout_note = " (TIMEOUT)" if file_result.timed_out else "" 

515 print( 

516 f"Warning: Error processing {file_result.file}{timeout_note}: " 

517 f"{file_result.error}" 

518 ) 

519 

520 # Update progress 

521 processed_count += 1 

522 if pbar: 522 ↛ 523line 522 didn't jump to line 523 because the condition on line 522 was never true

523 pbar.update(1) 

524 

525 # Save checkpoint periodically 

526 if ( 

527 self.config.checkpoint_dir 

528 and processed_count % self.config.checkpoint_interval == 0 

529 ): 

530 self._save_checkpoint(checkpoint_name) 

531 

532 if pbar: 532 ↛ 533line 532 didn't jump to line 533 because the condition on line 532 was never true

533 pbar.close() 

534 

535 # Final checkpoint save 

536 if self.config.checkpoint_dir: 

537 self._save_checkpoint(checkpoint_name) 

538 

539 def _save_checkpoint(self, checkpoint_name: str) -> None: 

540 """Save current checkpoint. 

541 

542 Args: 

543 checkpoint_name: Name for checkpoint file. 

544 """ 

545 if self.checkpoint and self.config.checkpoint_dir: 545 ↛ exitline 545 didn't return from function '_save_checkpoint' because the condition on line 545 was always true

546 checkpoint_dir = Path(self.config.checkpoint_dir) 

547 checkpoint_path = checkpoint_dir / f"{checkpoint_name}.json" 

548 self.checkpoint.save(checkpoint_path) 

549 

550 def _results_to_dataframe(self) -> pd.DataFrame: 

551 """Convert checkpoint results to DataFrame. 

552 

553 Returns: 

554 DataFrame with results and metadata. 

555 """ 

556 if not self.checkpoint or not self.checkpoint.results: 

557 return pd.DataFrame() 

558 

559 # Build rows 

560 rows = [] 

561 for file_result in self.checkpoint.results: 

562 row = { 

563 "file": file_result.file, 

564 "success": file_result.success, 

565 "timed_out": file_result.timed_out, 

566 } 

567 

568 if file_result.success: 

569 row.update(file_result.result) 

570 row["error"] = None 

571 else: 

572 row["error"] = file_result.error 

573 row["traceback"] = file_result.traceback 

574 

575 row["duration"] = file_result.duration 

576 rows.append(row) 

577 

578 return pd.DataFrame(rows) 

579 

580 

581def resume_batch( 

582 checkpoint_dir: str | Path, checkpoint_name: str = "batch_checkpoint" 

583) -> BatchCheckpoint: 

584 """Resume a batch job from checkpoint directory. 

585 

586 Convenience function to load checkpoint and inspect state. 

587 

588 Args: 

589 checkpoint_dir: Directory containing checkpoint. 

590 checkpoint_name: Name of checkpoint file. 

591 

592 Returns: 

593 Loaded checkpoint. 

594 

595 Example: 

596 >>> checkpoint = resume_batch('./checkpoints') 

597 >>> print(f"Completed: {len(checkpoint.completed_files)}") 

598 

599 References: 

600 API-012: Advanced Batch Control 

601 """ 

602 checkpoint_path = Path(checkpoint_dir) / f"{checkpoint_name}.json" 

603 return BatchCheckpoint.load(checkpoint_path) 

604 

605 

606__all__ = [ 

607 "AdvancedBatchProcessor", 

608 "BatchCheckpoint", 

609 "BatchConfig", 

610 "FileResult", 

611 "TimeoutError", 

612 "resume_batch", 

613]