Coverage for src/tracekit/batch/analyze.py: 94%
47 statements
« prev ^ index » next    coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
"""Multi-file batch analysis with parallel execution support.

This module provides parallel batch processing of signal files using
concurrent.futures for efficient multi-core utilization.
"""
8from collections.abc import Callable
9from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
10from pathlib import Path
11from typing import Any
13import pandas as pd
16def batch_analyze(
17 files: list[str | Path],
18 analysis_fn: Callable[[str | Path], dict[str, Any]],
19 *,
20 parallel: bool = False,
21 workers: int | None = None,
22 progress_callback: Callable[[int, int, str], None] | None = None,
23 use_threads: bool = False,
24 **config: Any,
25) -> pd.DataFrame:
26 """Analyze multiple files with the same analysis configuration.
28 : Multi-file analysis with parallel execution support
29 via concurrent.futures. Returns aggregated results as a DataFrame for
30 easy statistical analysis and export.
32 Args:
33 files: List of file paths to analyze
34 analysis_fn: Analysis function to apply to each file.
35 Must accept a file path and return a dict of results.
36 parallel: Enable parallel processing (default: False)
37 workers: Number of parallel workers (default: CPU count)
38 progress_callback: Optional callback for progress updates.
39 Called with (current, total, filename) after each file.
40 use_threads: Use ThreadPoolExecutor instead of ProcessPoolExecutor
41 (useful for I/O-bound tasks, default: False)
42 **config: Additional keyword arguments passed to analysis_fn
44 Returns:
45 DataFrame with one row per file, columns from analysis results.
46 Always includes a 'file' column with the input filename.
48 Examples:
49 >>> import tracekit as tk
50 >>> import glob
51 >>> files = glob.glob('captures/*.wfm')
52 >>> results = tk.batch_analyze(
53 ... files,
54 ... analysis_fn=tk.characterize_buffer,
55 ... parallel=True,
56 ... workers=4
57 ... )
58 >>> print(results[['file', 'rise_time', 'fall_time', 'status']])
59 >>> results.to_csv('batch_results.csv')
61 Notes:
62 - Use parallel=True for CPU-bound analysis functions
63 - Use use_threads=True for I/O-bound operations (file loading)
64 - Progress callback is called from worker threads/processes
65 - All exceptions during analysis are caught and stored in 'error' column
67 References:
68 BATCH-001: Multi-File Analysis
69 """
70 if not files:
71 return pd.DataFrame()
73 # Wrapper to include config in analysis calls
74 def _wrapped_analysis(filepath: str | Path) -> dict[str, Any]:
75 try:
76 result = analysis_fn(filepath, **config)
77 # Ensure result is a dict
78 if not isinstance(result, dict):
79 result = {"result": result} # type: ignore[unreachable]
80 result["file"] = str(filepath)
81 result["error"] = None
82 return result
83 except Exception as e:
84 # Return error info on failure
85 return {
86 "file": str(filepath),
87 "error": str(e),
88 }
90 results: list[dict[str, Any]] = []
91 total = len(files)
93 if parallel:
94 # Use concurrent.futures for parallel execution
95 executor_class = ThreadPoolExecutor if use_threads else ProcessPoolExecutor
96 with executor_class(max_workers=workers) as executor:
97 # Submit all tasks
98 future_to_file = {executor.submit(_wrapped_analysis, f): f for f in files}
100 # Process results as they complete
101 for i, future in enumerate(as_completed(future_to_file), 1):
102 filepath = future_to_file[future]
103 try:
104 result = future.result()
105 results.append(result)
107 if progress_callback:
108 progress_callback(i, total, str(filepath))
109 except Exception as e:
110 # Catch execution errors
111 results.append(
112 {
113 "file": str(filepath),
114 "error": f"Execution error: {e}",
115 }
116 )
118 else:
119 # Sequential processing
120 for i, filepath in enumerate(files, 1):
121 result = _wrapped_analysis(filepath)
122 results.append(result)
124 if progress_callback:
125 progress_callback(i, total, str(filepath))
127 # Convert to DataFrame
128 df = pd.DataFrame(results)
130 # Reorder columns: file first, error last
131 cols = df.columns.tolist()
132 if "file" in cols: 132 ↛ 135line 132 didn't jump to line 135 because the condition on line 132 was always true
133 cols.remove("file")
134 cols = ["file", *cols]
135 if "error" in cols: 135 ↛ 139line 135 didn't jump to line 139 because the condition on line 135 was always true
136 cols.remove("error")
137 cols = [*cols, "error"]
139 return df[cols]