Coverage for src/tracekit/cli/batch.py: 99% (116 statements)
1"""TraceKit Batch Command implementing CLI-004.
3Provides CLI for batch processing multiple files with parallel execution support.
6Example:
7 $ tracekit batch '*.wfm' --analysis characterize
8 $ tracekit batch 'test_*.wfm' --analysis decode --parallel 4
9 $ tracekit batch 'captures/*.wfm' --analysis spectrum --save-summary results.csv
10"""
from __future__ import annotations

import csv
import glob
import logging
from pathlib import Path
from typing import Any

import click

from tracekit.cli.main import format_output

logger = logging.getLogger("tracekit.cli.batch")
@click.command()  # type: ignore[misc]
@click.argument("pattern")  # type: ignore[misc]
@click.option(  # type: ignore[misc]
    "--analysis",
    type=click.Choice(["characterize", "decode", "spectrum"], case_sensitive=False),
    required=True,
    help="Type of analysis to perform on each file.",
)
@click.option(  # type: ignore[misc]
    "--parallel",
    type=int,
    default=1,
    help="Number of files to process concurrently (default: 1).",
)
@click.option(  # type: ignore[misc]
    "--output",
    type=click.Choice(["json", "csv", "html", "table"], case_sensitive=False),
    default="table",
    help="Output format (default: table).",
)
@click.option(  # type: ignore[misc]
    "--save-summary",
    type=click.Path(),
    default=None,
    help="Save aggregated results to file (CSV format).",
)
@click.option(  # type: ignore[misc]
    "--continue-on-error",
    is_flag=True,
    help="Continue processing even if individual files fail.",
)
@click.pass_context  # type: ignore[misc]
def batch(
    ctx: click.Context,
    pattern: str,
    analysis: str,
    parallel: int,
    output: str,
    save_summary: str | None,
    continue_on_error: bool,
) -> None:
68 """Batch process multiple files.
70 Processes all files matching the given pattern with the specified analysis.
71 Supports parallel processing for faster execution on multi-core systems.
73 Args:
74 ctx: Click context object.
75 pattern: Glob pattern to match files.
76 analysis: Type of analysis (characterize, decode, spectrum).
77 parallel: Number of parallel workers.
78 output: Output format (json, csv, html, table).
79 save_summary: Path to save CSV summary file.
80 continue_on_error: Continue processing if individual files fail.
82 Raises:
83 Exception: If batch processing fails or no files found.
85 Examples:
87 \b
88 # Process all WFM files with characterization
89 $ tracekit batch '*.wfm' --analysis characterize
91 \b
92 # Parallel processing with 4 workers
93 $ tracekit batch 'test_run_*/*.wfm' \\
94 --analysis characterize \\
95 --parallel 4 \\
96 --save-summary results.csv
98 \b
99 # Decode all captures, continue on errors
100 $ tracekit batch 'captures/*.wfm' \\
101 --analysis decode \\
102 --continue-on-error
103 """
    verbose = ctx.obj.get("verbose", 0)

    if verbose:
        logger.info(f"Batch processing pattern: {pattern}")
        logger.info(f"Analysis type: {analysis}")
        logger.info(f"Parallel workers: {parallel}")

    try:
        # Expand glob pattern
        files = glob.glob(pattern, recursive=True)  # noqa: PTH207

        if not files:
            click.echo(f"No files matched pattern: {pattern}", err=True)
            ctx.exit(1)

        logger.info(f"Found {len(files)} files to process")

        # Perform batch analysis
        results = _perform_batch_analysis(
            files=files,
            analysis_type=analysis,
            parallel=parallel,
            continue_on_error=continue_on_error,
            verbose=verbose,
        )

        # Save summary if requested
        if save_summary:
            _save_summary(results, save_summary)
            logger.info(f"Summary saved to {save_summary}")

        # Output aggregated results
        summary = _generate_summary(results)
        formatted = format_output(summary, output)
        click.echo(formatted)

    except Exception as e:
        logger.error(f"Batch processing failed: {e}")
        if verbose > 1:
            raise
        click.echo(f"Error: {e}", err=True)
        ctx.exit(1)
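
# A testing sketch using Click's CliRunner; the obj dict mirrors the
# "verbose" key the command reads from ctx.obj (an assumption about what
# tracekit's main CLI places on the context):
#
#     from click.testing import CliRunner
#
#     runner = CliRunner()
#     result = runner.invoke(
#         batch, ["*.wfm", "--analysis", "characterize"], obj={"verbose": 0}
#     )
#     assert result.exit_code in (0, 1)  # 1 when no files match the pattern
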
def _perform_batch_analysis(
    files: list[str],
    analysis_type: str,
    parallel: int,
    continue_on_error: bool,
    verbose: int,
) -> list[dict[str, Any]]:
    """Perform batch analysis on multiple files.

    Uses concurrent.futures for parallel processing when parallel > 1.

    Args:
        files: List of file paths to process.
        analysis_type: Type of analysis to perform.
        parallel: Number of parallel workers.
        continue_on_error: Whether to continue on errors.
        verbose: Verbosity level.

    Returns:
        List of result dictionaries, one per file.

    Raises:
        Exception: If analysis fails and continue_on_error is False.
    """
    from concurrent.futures import ThreadPoolExecutor, as_completed

    def analyze_single_file(file_path: str) -> dict[str, Any]:
        """Analyze a single file and return results.

        Args:
            file_path: Path to waveform file to analyze.

        Returns:
            Dictionary containing analysis results.
        """
        import numpy as np

        from tracekit.analyzers.waveform.measurements import fall_time, rise_time
        from tracekit.analyzers.waveform.spectral import fft, thd
        from tracekit.inference import detect_protocol
        from tracekit.loaders import load

        # Load trace
        trace = load(file_path)
        sample_rate = trace.metadata.sample_rate

        # Base result
        result: dict[str, Any] = {
            "file": str(Path(file_path).name),
            "status": "success",
            "analysis_type": analysis_type,
            "samples": len(trace.data),  # type: ignore[union-attr]
            "sample_rate": f"{sample_rate / 1e6:.1f} MHz",
        }

        # Add analysis-specific results
        if analysis_type == "characterize":
            # Pass WaveformTrace directly to functions (they expect WaveformTrace)
            rt = rise_time(trace)  # type: ignore[arg-type]
            ft = fall_time(trace)  # type: ignore[arg-type]
            result.update(
                {
                    "rise_time": f"{rt * 1e9:.2f} ns" if not np.isnan(rt) else "N/A",
                    "fall_time": f"{ft * 1e9:.2f} ns" if not np.isnan(ft) else "N/A",
                }
            )
        elif analysis_type == "decode":
            detected = detect_protocol(trace)  # type: ignore[arg-type]
            result.update(
                {
                    "protocol": detected.get("protocol", "unknown"),
                    "confidence": f"{detected.get('confidence', 0) * 100:.0f}%",
                }
            )
        elif analysis_type == "spectrum":
            # Pass WaveformTrace directly to FFT functions
            freqs, mags = fft(trace)  # type: ignore[misc, arg-type]
            if len(mags) > 0:
                peak_idx = int(np.argmax(mags))
                peak_freq = freqs[peak_idx]
            else:
                peak_freq = 0.0
            thd_val = thd(trace)  # type: ignore[arg-type]
            result.update(
                {
                    "peak_frequency": f"{peak_freq / 1e6:.3f} MHz",
                    "thd": f"{thd_val:.1f} dB" if not np.isnan(thd_val) else "N/A",
                }
            )

        return result
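
    # Illustrative per-file result for --analysis characterize (values are
    # invented; the key set matches what analyze_single_file builds above):
    #
    #     {"file": "capture_001.wfm", "status": "success",
    #      "analysis_type": "characterize", "samples": 100000,
    #      "sample_rate": "250.0 MHz", "rise_time": "3.21 ns",
    #      "fall_time": "3.40 ns"}
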
    results: list[dict[str, Any]] = []

    if parallel > 1:
        # Parallel processing using ThreadPoolExecutor
        with ThreadPoolExecutor(max_workers=parallel) as executor:
            future_to_file = {executor.submit(analyze_single_file, f): f for f in files}

            for i, future in enumerate(as_completed(future_to_file), 1):
                file_path = future_to_file[future]
                if verbose:
                    logger.info(f"[{i}/{len(files)}] Completed {Path(file_path).name}")

                try:
                    result = future.result()
                    results.append(result)
                except Exception as e:
                    logger.error(f"Failed to process {file_path}: {e}")
                    if continue_on_error:
                        results.append(
                            {
                                "file": str(Path(file_path).name),
                                "status": "error",
                                "error": str(e),
                            }
                        )
                    else:
                        raise
    else:
        # Sequential processing
        for i, file_path in enumerate(files, 1):
            if verbose:
                logger.info(f"[{i}/{len(files)}] Processing {file_path}")

            try:
                result = analyze_single_file(file_path)
                results.append(result)
            except Exception as e:
                logger.error(f"Failed to process {file_path}: {e}")
                if continue_on_error:
                    results.append(
                        {
                            "file": str(Path(file_path).name),
                            "status": "error",
                            "error": str(e),
                        }
                    )
                else:
                    raise

    return results
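
# A direct-call sketch for _perform_batch_analysis, e.g. from a unit test
# (file names here are hypothetical):
#
#     results = _perform_batch_analysis(
#         files=["capture_001.wfm", "capture_002.wfm"],
#         analysis_type="characterize",
#         parallel=2,
#         continue_on_error=True,
#         verbose=0,
#     )
#     assert all(r["status"] in ("success", "error") for r in results)
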
def _generate_summary(results: list[dict[str, Any]]) -> dict[str, Any]:
    """Generate summary statistics from batch results.

    Args:
        results: List of individual file results.

    Returns:
        Summary dictionary with aggregated statistics.
    """
    total = len(results)
    successful = sum(1 for r in results if r.get("status") == "success")
    failed = total - successful

    summary: dict[str, Any] = {
        "total_files": total,
        "successful": successful,
        "failed": failed,
        "success_rate": f"{successful / total * 100:.1f}%" if total > 0 else "N/A",
    }

    # Point readers at the per-file details rather than aggregating them here
    if results and successful > 0:
        summary["note"] = "Detailed per-file results available in JSON/CSV output"

    return summary
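
# Example _generate_summary output for a 10-file run with one failure
# (illustrative numbers):
#
#     {"total_files": 10, "successful": 9, "failed": 1,
#      "success_rate": "90.0%",
#      "note": "Detailed per-file results available in JSON/CSV output"}
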
def _save_summary(results: list[dict[str, Any]], output_path: str) -> None:
    """Save batch results to CSV file.

    Args:
        results: List of result dictionaries.
        output_path: Path to save CSV file.
    """
    if not results:
        return

    # Get all unique keys across all results
    all_keys: set[str] = set()
    for result in results:
        all_keys.update(result.keys())

    fieldnames = sorted(all_keys)

    with open(output_path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(results)
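
# This command is registered on TraceKit's top-level Click group; a minimal
# sketch, assuming the group object in tracekit.cli.main is named `cli`:
#
#     from tracekit.cli.main import cli
#
#     cli.add_command(batch)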