Coverage for src/tracekit/batch/advanced.py: 79%
184 statements
coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Advanced batch processing with checkpointing and error handling.
3This module provides enhanced batch processing capabilities including
4checkpointing for long-running jobs, resume functionality, and sophisticated
5error handling.
6"""
8from __future__ import annotations
10import concurrent.futures
11import json
12import threading
13import traceback
14from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
15from dataclasses import asdict, dataclass, field
16from pathlib import Path
17from typing import TYPE_CHECKING, Any, Literal
19import pandas as pd
21if TYPE_CHECKING:
22 from collections.abc import Callable
24try:
25 from tqdm import tqdm # type: ignore[import-untyped]
27 HAS_TQDM = True
28except ImportError:
29 HAS_TQDM = False
class TimeoutError(Exception):
    """Raised when a function execution exceeds timeout.

    NOTE(review): This shadows the builtin ``TimeoutError`` (and will shadow
    it for any ``import *`` consumer, since it is listed in ``__all__``).
    Nothing in this module actually raises it — timeouts surface as
    ``FileResult.timed_out`` or ``concurrent.futures.TimeoutError`` instead.
    Confirm it is part of the public contract before renaming or removing.
    """

    pass
@dataclass
class BatchConfig:
    """Configuration for advanced batch processing.

    Attributes:
        on_error: Error handling strategy:
            - 'skip': Skip failed files, continue processing
            - 'stop': Stop processing on first error
            - 'warn': Log warning but continue (default)
        checkpoint_dir: Directory to save checkpoints. If None, no checkpointing.
        checkpoint_interval: Save checkpoint every N files (default: 10).
        max_workers: Maximum number of parallel workers. None uses CPU count.
        memory_limit: Maximum memory per worker in MB (not enforced, for documentation).
        timeout_per_file: Timeout in seconds per file. None for no timeout.
            Enforced by running the analysis in a worker thread and abandoning
            it once the timeout elapses (see ``_run_with_timeout``); CPU-bound
            work that never releases the GIL cannot be truly interrupted.
        use_threads: Use threads instead of processes for parallelization.
        progress_bar: Show progress bar (requires tqdm).

    Example:
        >>> config = BatchConfig(
        ...     on_error='skip',
        ...     checkpoint_dir='./checkpoints',
        ...     checkpoint_interval=5,
        ...     max_workers=4
        ... )

    References:
        API-012: Advanced Batch Control
    """

    on_error: Literal["skip", "stop", "warn"] = "warn"
    checkpoint_dir: Path | str | None = None
    checkpoint_interval: int = 10
    max_workers: int | None = None
    memory_limit: float | None = None  # MB, advisory only -- never enforced
    timeout_per_file: float | None = None  # seconds, best-effort thread-join timeout
    use_threads: bool = False
    progress_bar: bool = True
@dataclass
class FileResult:
    """Result from processing a single file.

    Attributes:
        file: Path to the file.
        success: Whether processing succeeded.
        result: Analysis result dictionary if successful.
        error: Error message if failed.
        traceback: Full traceback if failed.
        duration: Processing time in seconds.
        timed_out: Whether processing was terminated due to timeout.

    Example:
        >>> result = FileResult(
        ...     file='trace001.wfm',
        ...     success=True,
        ...     result={'rise_time': 1.2e-9},
        ...     duration=0.5
        ... )

    References:
        API-012: Advanced Batch Control
    """

    file: str
    success: bool = True
    # Non-dict analysis returns are wrapped as {"result": value} by the processor.
    result: dict[str, Any] = field(default_factory=dict)
    error: str | None = None
    traceback: str | None = None
    duration: float = 0.0  # wall-clock seconds, measured around the analysis call
    timed_out: bool = False
@dataclass
class BatchCheckpoint:
    """Checkpoint state for batch processing.

    Attributes:
        completed_files: List of successfully completed files.
        failed_files: List of failed file paths.
        results: List of FileResult objects.
        total_files: Total number of files in batch.
        config: Batch configuration used.

    Example:
        >>> checkpoint = BatchCheckpoint(
        ...     completed_files=['file1.wfm', 'file2.wfm'],
        ...     total_files=10,
        ...     config=config
        ... )

    References:
        API-012: Advanced Batch Control
    """

    completed_files: list[str] = field(default_factory=list)
    failed_files: list[str] = field(default_factory=list)
    results: list[FileResult] = field(default_factory=list)
    total_files: int = 0
    config: BatchConfig | None = None

    def save(self, checkpoint_path: Path) -> None:
        """Save checkpoint to JSON file.

        Parent directories are created as needed. The file is always written
        as UTF-8 so checkpoints are portable across platforms regardless of
        the locale's default encoding.

        Args:
            checkpoint_path: Path to save checkpoint file.

        Example:
            >>> checkpoint.save(Path('checkpoints/batch_001.json'))
        """
        checkpoint_path.parent.mkdir(parents=True, exist_ok=True)

        # Convert to serializable format
        config_dict = None
        if self.config:
            config_dict = asdict(self.config)
            # Convert Path to str for JSON serialization
            if config_dict.get("checkpoint_dir") is not None:
                config_dict["checkpoint_dir"] = str(config_dict["checkpoint_dir"])

        data = {
            "completed_files": self.completed_files,
            "failed_files": self.failed_files,
            "results": [asdict(r) for r in self.results],
            "total_files": self.total_files,
            "config": config_dict,
        }

        # Explicit encoding: default open() encoding is platform-dependent.
        with open(checkpoint_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)

    @classmethod
    def load(cls, checkpoint_path: Path) -> BatchCheckpoint:
        """Load checkpoint from JSON file.

        Args:
            checkpoint_path: Path to checkpoint file.

        Returns:
            Loaded BatchCheckpoint object.

        Raises:
            FileNotFoundError: If the checkpoint file does not exist.
            json.JSONDecodeError: If the file is not valid JSON.

        Example:
            >>> checkpoint = BatchCheckpoint.load(Path('checkpoints/batch_001.json'))
        """
        # Read as UTF-8 to match save(); the locale default may differ.
        with open(checkpoint_path, encoding="utf-8") as f:
            data = json.load(f)

        # Reconstruct FileResult objects
        results = [FileResult(**r) for r in data.get("results", [])]

        # Reconstruct BatchConfig if present
        config = None
        if data.get("config"):
            config_data = data["config"]
            # Convert checkpoint_dir back to Path if it exists
            if config_data.get("checkpoint_dir"):
                config_data["checkpoint_dir"] = Path(config_data["checkpoint_dir"])
            config = BatchConfig(**config_data)

        return cls(
            completed_files=data.get("completed_files", []),
            failed_files=data.get("failed_files", []),
            results=results,
            total_files=data.get("total_files", 0),
            config=config,
        )
208def _run_with_timeout(
209 func: Callable[..., Any],
210 args: tuple[Any, ...],
211 kwargs: dict[str, Any],
212 timeout: float,
213) -> tuple[Any, bool]:
214 """Run a function with true timeout enforcement using threading.
216 This function wraps the target function in a separate thread and uses
217 threading.Timer to interrupt it if it exceeds the timeout. This provides
218 actual timeout enforcement rather than just post-hoc checking.
220 Args:
221 func: Function to execute.
222 args: Positional arguments for the function.
223 kwargs: Keyword arguments for the function.
224 timeout: Timeout in seconds.
226 Returns:
227 Tuple of (result, timed_out) where result is the function return value
228 (or None if timed out) and timed_out indicates whether timeout occurred.
230 Note:
231 This uses concurrent.futures.ThreadPoolExecutor internally which can
232 only interrupt I/O-bound operations. CPU-bound functions in Python
233 cannot be truly interrupted due to the GIL. For CPU-bound timeouts,
234 consider using ProcessPoolExecutor with timeout on future.result().
235 """
236 result_container: dict[str, Any] = {"result": None, "error": None}
238 def target() -> None:
239 try:
240 result_container["result"] = func(*args, **kwargs)
241 except Exception as e:
242 result_container["error"] = e
244 # Use a thread with explicit timeout
245 thread = threading.Thread(target=target, daemon=True)
246 thread.start()
247 thread.join(timeout=timeout)
249 if thread.is_alive():
250 # Thread is still running - timeout occurred
251 # Note: We can't truly kill the thread, but we can mark it as timed out
252 # and move on. The daemon=True ensures it won't block process exit.
253 return None, True
255 if result_container["error"] is not None:
256 raise result_container["error"]
258 return result_container["result"], False
class AdvancedBatchProcessor:
    """Advanced batch processor with checkpointing and error handling.

    Provides robust batch processing with checkpoint/resume capability,
    per-file error isolation, progress tracking, and resource limits.

    Timeout enforcement:
        When `timeout_per_file` is configured, each analysis call is run in a
        worker thread that is abandoned once the timeout elapses (see
        `_run_with_timeout`), plus a secondary timeout on result retrieval
        from the executor. This means:
        - Long-running operations are abandoned, not forcibly killed
        - Results will be marked with `timed_out=True`
        - Processing continues to the next file
        CPU-bound work that never releases the GIL keeps running in the
        background after its timeout is reported.

    Example:
        >>> from tracekit.batch.advanced import AdvancedBatchProcessor, BatchConfig
        >>> config = BatchConfig(
        ...     on_error='skip',
        ...     checkpoint_dir='./checkpoints',
        ...     max_workers=4,
        ...     timeout_per_file=60.0  # Enforced timeout
        ... )
        >>> processor = AdvancedBatchProcessor(config)
        >>> results = processor.process(files, analysis_fn)

    References:
        API-012: Advanced Batch Control
    """

    def __init__(self, config: BatchConfig | None = None) -> None:
        """Initialize batch processor.

        Args:
            config: Batch configuration. Uses defaults if None.
        """
        self.config = config or BatchConfig()
        # Progress state; populated by process() / _resume_or_start().
        self.checkpoint: BatchCheckpoint | None = None

    def process(
        self,
        files: list[str | Path],
        analysis_fn: Callable[[str | Path], dict[str, Any]],
        *,
        checkpoint_name: str = "batch_checkpoint",
        **kwargs: Any,
    ) -> pd.DataFrame:
        """Process files with checkpointing and error handling.

        Args:
            files: List of file paths to process.
            analysis_fn: Analysis function to apply to each file.
            checkpoint_name: Name for checkpoint file (default: 'batch_checkpoint').
            **kwargs: Additional arguments passed to analysis_fn.

        Returns:
            DataFrame with results and error information.

        Example:
            >>> results = processor.process(
            ...     files=['trace1.wfm', 'trace2.wfm'],
            ...     analysis_fn=analyze_trace
            ... )

        References:
            API-012: Advanced Batch Control
        """
        # Try to resume from checkpoint (no-op when checkpointing is disabled
        # or no checkpoint file exists yet).
        remaining_files = self._resume_or_start(files, checkpoint_name)

        # Initialize checkpoint if not resumed
        if self.checkpoint is None:
            self.checkpoint = BatchCheckpoint(
                total_files=len(files),
                config=self.config,
            )

        # Process remaining files
        self._process_files(remaining_files, analysis_fn, checkpoint_name, **kwargs)

        # Convert results to DataFrame
        return self._results_to_dataframe()

    def _resume_or_start(self, files: list[str | Path], checkpoint_name: str) -> list[str | Path]:
        """Try to resume from checkpoint or start fresh.

        Side effect: on resume, sets ``self.checkpoint`` to the loaded state.

        Args:
            files: Full list of files to process.
            checkpoint_name: Name of checkpoint file.

        Returns:
            List of files remaining to be processed.
        """
        if self.config.checkpoint_dir is None:
            return files

        checkpoint_dir = Path(self.config.checkpoint_dir)
        checkpoint_path = checkpoint_dir / f"{checkpoint_name}.json"

        if checkpoint_path.exists():
            # Load checkpoint
            self.checkpoint = BatchCheckpoint.load(checkpoint_path)

            # Determine remaining files: anything already completed OR failed
            # is skipped on resume (failed files are NOT retried).
            completed_set = set(self.checkpoint.completed_files)
            failed_set = set(self.checkpoint.failed_files)
            processed_set = completed_set | failed_set

            # Compare by string form since checkpoints store paths as str.
            remaining = [str(f) for f in files if str(f) not in processed_set]

            print(
                f"Resuming from checkpoint: "
                f"{len(self.checkpoint.completed_files)} completed, "
                f"{len(self.checkpoint.failed_files)} failed, "
                f"{len(remaining)} remaining"
            )

            return [Path(f) for f in remaining]

        return files

    def _process_files(
        self,
        files: list[str | Path],
        analysis_fn: Callable[[str | Path], dict[str, Any]],
        checkpoint_name: str,
        **kwargs: Any,
    ) -> None:
        """Process files with parallel execution and checkpointing.

        Args:
            files: Files to process.
            analysis_fn: Analysis function.
            checkpoint_name: Checkpoint file name.
            **kwargs: Additional arguments for analysis_fn.

        Raises:
            RuntimeError: If processing is stopped due to error and on_error is "stop".
        """
        if not files:
            return

        # Create progress bar if requested
        pbar = None
        if self.config.progress_bar and HAS_TQDM:
            total = self.checkpoint.total_files if self.checkpoint else len(files)
            # On resume, start the bar at the already-processed count.
            initial = (
                len(self.checkpoint.completed_files) + len(self.checkpoint.failed_files)
                if self.checkpoint
                else 0
            )
            pbar = tqdm(total=total, initial=initial, desc="Processing files")

        # Wrapper for file processing with error handling and timeout.
        # NOTE(review): this is a nested function, which cannot be pickled;
        # with the default ProcessPoolExecutor (use_threads=False) submitting
        # it should raise a pickling error. Only use_threads=True appears
        # workable as written -- confirm and fix (e.g. module-level worker).
        def _process_one(file_path: str | Path) -> FileResult:
            import time

            start_time = time.time()
            timed_out = False

            try:
                # Apply timeout if configured
                if self.config.timeout_per_file is not None:
                    result, timed_out = _run_with_timeout(
                        analysis_fn,
                        (file_path,),
                        kwargs,
                        self.config.timeout_per_file,
                    )
                    if timed_out:
                        duration = time.time() - start_time
                        return FileResult(
                            file=str(file_path),
                            success=False,
                            error=f"Processing timed out after {self.config.timeout_per_file}s",
                            duration=duration,
                            timed_out=True,
                        )
                else:
                    result = analysis_fn(file_path, **kwargs)

                duration = time.time() - start_time
                return FileResult(
                    file=str(file_path),
                    success=True,
                    # Non-dict returns are wrapped so the DataFrame stays flat.
                    result=result if isinstance(result, dict) else {"result": result},
                    duration=duration,
                )
            except Exception as e:
                # Per-file error isolation: any exception becomes a failed
                # FileResult rather than aborting the whole batch here.
                duration = time.time() - start_time
                return FileResult(
                    file=str(file_path),
                    success=False,
                    error=str(e),
                    traceback=traceback.format_exc(),
                    duration=duration,
                )

        # Process files
        processed_count = 0
        executor_class = ThreadPoolExecutor if self.config.use_threads else ProcessPoolExecutor

        with executor_class(max_workers=self.config.max_workers) as executor:
            # Submit all tasks
            futures = {executor.submit(_process_one, f): f for f in files}

            # Process results as they complete
            # Use future.result() with timeout for additional enforcement layer
            for future in as_completed(futures):
                file_path = futures[future]
                try:
                    # Apply timeout on result retrieval as backup enforcement.
                    # NOTE(review): as_completed() only yields futures that
                    # have already finished, so this retrieval timeout can
                    # never actually fire -- the backup layer looks inert.
                    retrieval_timeout = (
                        self.config.timeout_per_file * 1.1 if self.config.timeout_per_file else None
                    )
                    file_result = future.result(timeout=retrieval_timeout)
                except concurrent.futures.TimeoutError:
                    # Backup timeout triggered - future.result() timed out
                    file_result = FileResult(
                        file=str(file_path),
                        success=False,
                        error=f"Processing timed out (backup enforcement) "
                        f"after {self.config.timeout_per_file}s",
                        timed_out=True,
                    )
                except Exception as e:
                    # Unexpected error during result retrieval
                    file_result = FileResult(
                        file=str(file_path),
                        success=False,
                        error=f"Error retrieving result: {e}",
                        traceback=traceback.format_exc(),
                    )

                # Update checkpoint
                if self.checkpoint:
                    self.checkpoint.results.append(file_result)
                    if file_result.success:
                        self.checkpoint.completed_files.append(file_result.file)
                    else:
                        self.checkpoint.failed_files.append(file_result.file)

                # Handle errors according to the configured strategy
                if not file_result.success:
                    if self.config.on_error == "stop":
                        if pbar:
                            pbar.close()
                        raise RuntimeError(
                            f"Processing stopped due to error in {file_result.file}: "
                            f"{file_result.error}"
                        )
                    elif self.config.on_error == "warn":
                        timeout_note = " (TIMEOUT)" if file_result.timed_out else ""
                        print(
                            f"Warning: Error processing {file_result.file}{timeout_note}: "
                            f"{file_result.error}"
                        )
                    # on_error == "skip": record the failure silently above.

                # Update progress
                processed_count += 1
                if pbar:
                    pbar.update(1)

                # Save checkpoint periodically
                if (
                    self.config.checkpoint_dir
                    and processed_count % self.config.checkpoint_interval == 0
                ):
                    self._save_checkpoint(checkpoint_name)

        if pbar:
            pbar.close()

        # Final checkpoint save
        if self.config.checkpoint_dir:
            self._save_checkpoint(checkpoint_name)

    def _save_checkpoint(self, checkpoint_name: str) -> None:
        """Save current checkpoint.

        No-op unless both a checkpoint and a checkpoint_dir are configured.

        Args:
            checkpoint_name: Name for checkpoint file.
        """
        if self.checkpoint and self.config.checkpoint_dir:
            checkpoint_dir = Path(self.config.checkpoint_dir)
            checkpoint_path = checkpoint_dir / f"{checkpoint_name}.json"
            self.checkpoint.save(checkpoint_path)

    def _results_to_dataframe(self) -> pd.DataFrame:
        """Convert checkpoint results to DataFrame.

        Returns:
            DataFrame with one row per processed file: 'file', 'success',
            'timed_out', 'duration', the analysis result keys (successes),
            and 'error'/'traceback' (failures). Empty if nothing was run.
        """
        if not self.checkpoint or not self.checkpoint.results:
            return pd.DataFrame()

        # Build rows
        rows = []
        for file_result in self.checkpoint.results:
            row = {
                "file": file_result.file,
                "success": file_result.success,
                "timed_out": file_result.timed_out,
            }

            if file_result.success:
                # Flatten the analysis result dict into the row.
                row.update(file_result.result)
                row["error"] = None
            else:
                row["error"] = file_result.error
                row["traceback"] = file_result.traceback

            row["duration"] = file_result.duration
            rows.append(row)

        return pd.DataFrame(rows)
def resume_batch(
    checkpoint_dir: str | Path, checkpoint_name: str = "batch_checkpoint"
) -> BatchCheckpoint:
    """Load the checkpoint for a batch job so its state can be inspected.

    Convenience wrapper around :meth:`BatchCheckpoint.load` that builds the
    checkpoint path from a directory and a name.

    Args:
        checkpoint_dir: Directory containing checkpoint.
        checkpoint_name: Name of checkpoint file.

    Returns:
        Loaded checkpoint.

    Example:
        >>> checkpoint = resume_batch('./checkpoints')
        >>> print(f"Completed: {len(checkpoint.completed_files)}")

    References:
        API-012: Advanced Batch Control
    """
    path = Path(checkpoint_dir).joinpath(f"{checkpoint_name}.json")
    return BatchCheckpoint.load(path)
# Public API of this module (API-012: Advanced Batch Control).
__all__ = [
    "AdvancedBatchProcessor",
    "BatchCheckpoint",
    "BatchConfig",
    "FileResult",
    "TimeoutError",
    "resume_batch",
]