Coverage for src/tracekit/cli/batch.py: 99%

116 statements  

coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""TraceKit Batch Command implementing CLI-004. 

2 

3Provides CLI for batch processing multiple files with parallel execution support. 

4 

5 

6Example: 

7 $ tracekit batch '*.wfm' --analysis characterize 

8 $ tracekit batch 'test_*.wfm' --analysis decode --parallel 4 

9 $ tracekit batch 'captures/*.wfm' --analysis spectrum --save-summary results.csv 

10""" 

11 

12from __future__ import annotations 

13 

14import csv 

15import glob 

16import logging 

17from pathlib import Path 

18from typing import Any 

19 

20import click 

21 

22from tracekit.cli.main import format_output 

23 

24logger = logging.getLogger("tracekit.cli.batch") 

25 

26 

27@click.command() # type: ignore[misc] 

28@click.argument("pattern") # type: ignore[misc] 

29@click.option( # type: ignore[misc] 

30 "--analysis", 

31 type=click.Choice(["characterize", "decode", "spectrum"], case_sensitive=False), 

32 required=True, 

33 help="Type of analysis to perform on each file.", 

34) 

35@click.option( # type: ignore[misc] 

36 "--parallel", 

37 type=int, 

38 default=1, 

39 help="Number of files to process concurrently (default: 1).", 

40) 

41@click.option( # type: ignore[misc] 

42 "--output", 

43 type=click.Choice(["json", "csv", "html", "table"], case_sensitive=False), 

44 default="table", 

45 help="Output format (default: table).", 

46) 

47@click.option( # type: ignore[misc] 

48 "--save-summary", 

49 type=click.Path(), 

50 default=None, 

51 help="Save aggregated results to file (CSV format).", 

52) 

53@click.option( # type: ignore[misc] 

54 "--continue-on-error", 

55 is_flag=True, 

56 help="Continue processing even if individual files fail.", 

57) 

58@click.pass_context # type: ignore[misc] 

59def batch( 

60 ctx: click.Context, 

61 pattern: str, 

62 analysis: str, 

63 parallel: int, 

64 output: str, 

65 save_summary: str | None, 

66 continue_on_error: bool, 

67) -> None: 

68 """Batch process multiple files. 

69 

70 Processes all files matching the given pattern with the specified analysis. 

71 Supports parallel processing for faster execution on multi-core systems. 

72 

73 Args: 

74 ctx: Click context object. 

75 pattern: Glob pattern to match files. 

76 analysis: Type of analysis (characterize, decode, spectrum). 

77 parallel: Number of parallel workers. 

78 output: Output format (json, csv, html, table). 

79 save_summary: Path to save CSV summary file. 

80 continue_on_error: Continue processing if individual files fail. 

81 

82 Raises: 

83 Exception: If batch processing fails or no files found. 

84 

85 Examples: 

86 

87 \b 

88 # Process all WFM files with characterization 

89 $ tracekit batch '*.wfm' --analysis characterize 

90 

91 \b 

92 # Parallel processing with 4 workers 

93 $ tracekit batch 'test_run_*/*.wfm' \\ 

94 --analysis characterize \\ 

95 --parallel 4 \\ 

96 --save-summary results.csv 

97 

98 \b 

99 # Decode all captures, continue on errors 

100 $ tracekit batch 'captures/*.wfm' \\ 

101 --analysis decode \\ 

102 --continue-on-error 

103 """ 

    verbose = ctx.obj.get("verbose", 0)

    if verbose:
        logger.info(f"Batch processing pattern: {pattern}")
        logger.info(f"Analysis type: {analysis}")
        logger.info(f"Parallel workers: {parallel}")

    try:
        # Expand glob pattern
        files = glob.glob(pattern, recursive=True)  # noqa: PTH207

        if not files:
            click.echo(f"No files matched pattern: {pattern}", err=True)
            ctx.exit(1)

        logger.info(f"Found {len(files)} files to process")

        # Perform batch analysis
        results = _perform_batch_analysis(
            files=files,
            analysis_type=analysis,
            parallel=parallel,
            continue_on_error=continue_on_error,
            verbose=verbose,
        )

        # Save summary if requested
        if save_summary:
            _save_summary(results, save_summary)
            logger.info(f"Summary saved to {save_summary}")

        # Output aggregated results
        summary = _generate_summary(results)
        formatted = format_output(summary, output)
        click.echo(formatted)

    except click.exceptions.Exit:
        # ctx.exit() raises click.exceptions.Exit, which subclasses
        # RuntimeError; re-raise it so the broad handler below does not
        # swallow the intended exit and report it as a second failure.
        raise
    except Exception as e:
        logger.error(f"Batch processing failed: {e}")
        if verbose > 1:
            raise
        click.echo(f"Error: {e}", err=True)
        ctx.exit(1)


def _perform_batch_analysis(
    files: list[str],
    analysis_type: str,
    parallel: int,
    continue_on_error: bool,
    verbose: int,
) -> list[dict[str, Any]]:
    """Perform batch analysis on multiple files.

    Uses concurrent.futures for parallel processing when parallel > 1.

    Args:
        files: List of file paths to process.
        analysis_type: Type of analysis to perform.
        parallel: Number of parallel workers.
        continue_on_error: Whether to continue on errors.
        verbose: Verbosity level.

    Returns:
        List of result dictionaries, one per file.

    Raises:
        Exception: If analysis fails and continue_on_error is False.
    """
    from concurrent.futures import ThreadPoolExecutor, as_completed
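
    # Note (editorial): a thread pool is a reasonable default here on the
    # assumption that per-file work is dominated by file I/O and by NumPy
    # routines that release the GIL; analyses that are CPU-bound in pure
    # Python would need a process pool to achieve real parallelism.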

    def analyze_single_file(file_path: str) -> dict[str, Any]:
        """Analyze a single file and return results.

        Args:
            file_path: Path to waveform file to analyze.

        Returns:
            Dictionary containing analysis results.
        """
        import numpy as np

        from tracekit.analyzers.waveform.measurements import fall_time, rise_time
        from tracekit.analyzers.waveform.spectral import fft, thd
        from tracekit.inference import detect_protocol
        from tracekit.loaders import load

        # Load trace
        trace = load(file_path)
        sample_rate = trace.metadata.sample_rate

        # Base result
        result: dict[str, Any] = {
            "file": Path(file_path).name,
            "status": "success",
            "analysis_type": analysis_type,
            "samples": len(trace.data),  # type: ignore[union-attr]
            "sample_rate": f"{sample_rate / 1e6:.1f} MHz",
        }

        # Add analysis-specific results
        if analysis_type == "characterize":
            # Pass WaveformTrace directly to functions (they expect WaveformTrace)
            rt = rise_time(trace)  # type: ignore[arg-type]
            ft = fall_time(trace)  # type: ignore[arg-type]
            result.update(
                {
                    "rise_time": f"{rt * 1e9:.2f} ns" if not np.isnan(rt) else "N/A",
                    "fall_time": f"{ft * 1e9:.2f} ns" if not np.isnan(ft) else "N/A",
                }
            )
        elif analysis_type == "decode":
            detected = detect_protocol(trace)  # type: ignore[arg-type]
            result.update(
                {
                    "protocol": detected.get("protocol", "unknown"),
                    "confidence": f"{detected.get('confidence', 0) * 100:.0f}%",
                }
            )
        elif analysis_type == "spectrum":  # coverage: partial branch; condition was always true in this run
            # Pass WaveformTrace directly to FFT functions
            freqs, mags = fft(trace)  # type: ignore[misc, arg-type]
            if len(mags) > 0:
                peak_idx = int(np.argmax(mags))
                peak_freq = freqs[peak_idx]
            else:
                peak_freq = 0.0
            thd_val = thd(trace)  # type: ignore[arg-type]
            result.update(
                {
                    "peak_frequency": f"{peak_freq / 1e6:.3f} MHz",
                    "thd": f"{thd_val:.1f} dB" if not np.isnan(thd_val) else "N/A",
                }
            )

        return result
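
    # Example (illustrative, hypothetical values): a successful "characterize"
    # run on one file yields a row shaped like:
    #   {"file": "cap_001.wfm", "status": "success",
    #    "analysis_type": "characterize", "samples": 100000,
    #    "sample_rate": "250.0 MHz", "rise_time": "3.20 ns",
    #    "fall_time": "3.45 ns"}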

    results: list[dict[str, Any]] = []

    if parallel > 1:
        # Parallel processing using ThreadPoolExecutor
        with ThreadPoolExecutor(max_workers=parallel) as executor:
            future_to_file = {executor.submit(analyze_single_file, f): f for f in files}

            for i, future in enumerate(as_completed(future_to_file), 1):
                file_path = future_to_file[future]
                if verbose:
                    logger.info(f"[{i}/{len(files)}] Completed {Path(file_path).name}")

                try:
                    result = future.result()
                    results.append(result)
                except Exception as e:
                    logger.error(f"Failed to process {file_path}: {e}")
                    if continue_on_error:
                        results.append(
                            {
                                "file": Path(file_path).name,
                                "status": "error",
                                "error": str(e),
                            }
                        )
                    else:
                        raise
    else:
        # Sequential processing
        for i, file_path in enumerate(files, 1):
            if verbose:
                logger.info(f"[{i}/{len(files)}] Processing {file_path}")

            try:
                result = analyze_single_file(file_path)
                results.append(result)
            except Exception as e:
                logger.error(f"Failed to process {file_path}: {e}")
                if continue_on_error:
                    results.append(
                        {
                            "file": Path(file_path).name,
                            "status": "error",
                            "error": str(e),
                        }
                    )
                else:
                    raise

    return results


def _generate_summary(results: list[dict[str, Any]]) -> dict[str, Any]:
    """Generate summary statistics from batch results.

    Args:
        results: List of individual file results.

    Returns:
        Summary dictionary with aggregated statistics.
    """
    total = len(results)
    successful = sum(1 for r in results if r.get("status") == "success")
    failed = total - successful

    summary: dict[str, Any] = {
        "total_files": total,
        "successful": successful,
        "failed": failed,
        "success_rate": f"{successful / total * 100:.1f}%" if total > 0 else "N/A",
    }

    # Add analysis-specific aggregations
    if results and successful > 0:
        summary["note"] = "Detailed per-file results available in JSON/CSV output"

    return summary
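
# Example (illustrative): for a two-file run where one file failed,
# _generate_summary would return:
#   {"total_files": 2, "successful": 1, "failed": 1, "success_rate": "50.0%",
#    "note": "Detailed per-file results available in JSON/CSV output"}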


def _save_summary(results: list[dict[str, Any]], output_path: str) -> None:
    """Save batch results to CSV file.

    Args:
        results: List of result dictionaries.
        output_path: Path to save CSV file.
    """
    if not results:
        return

    # Get all unique keys across all results; success and error rows carry
    # different fields, and DictWriter leaves any missing fields blank.
    all_keys: set[str] = set()
    for result in results:
        all_keys.update(result.keys())

    fieldnames = sorted(all_keys)

    with open(output_path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(results)
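

# ---------------------------------------------------------------------------
# Editorial sketch (not part of the original module): one way to exercise the
# command is Click's test runner. The obj={"verbose": 0} argument stands in
# for the parent CLI's context object, which this command reads via ctx.obj;
# the pattern and analysis values are placeholders.
#
#     from click.testing import CliRunner
#     from tracekit.cli.batch import batch
#
#     runner = CliRunner()
#     result = runner.invoke(
#         batch, ["*.wfm", "--analysis", "characterize"], obj={"verbose": 0}
#     )
#     print(result.exit_code)
#     print(result.output)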