Coverage for src/tracekit/reporting/analyze.py: 73%

148 statements · coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Comprehensive analysis report system main entry point. 

2 

3This module provides the primary `analyze()` function for running 

4comprehensive analysis on any supported input data type. 

5""" 

6 

7from __future__ import annotations 

8 

9import logging 

10import time 

11from collections.abc import Callable 

12from datetime import datetime 

13from pathlib import Path 

14from typing import TYPE_CHECKING, Any 

15 

16from tracekit.reporting.config import ( 

17 AnalysisConfig, 

18 AnalysisDomain, 

19 AnalysisError, 

20 AnalysisResult, 

21 InputType, 

22 ProgressInfo, 

23 get_available_analyses, 

24) 

25from tracekit.reporting.output import OutputManager 

26 

27if TYPE_CHECKING: 

28 from tracekit.core.types import Trace 

29 

30logger = logging.getLogger(__name__) 

31 

32 

33class UnsupportedFormatError(Exception): 

34 """Raised when input file format is not recognized.""" 

35 

36 pass 

37 

38 

39def analyze( 

40 input_path: str | Path | None = None, 

41 data: Trace | bytes | list[Any] | None = None, 

42 *, 

43 output_dir: str | Path | None = None, 

44 config: AnalysisConfig | None = None, 

45 progress_callback: Callable[[ProgressInfo], None] | None = None, 

46) -> AnalysisResult: 

47 """Run comprehensive analysis on data. 

48 

49 Provide EITHER input_path (file) OR data (in-memory), not both. 

50 

51 Args: 

52 input_path: Path to input data file (any supported format). 

53 data: In-memory data (Trace, bytes, list of packets). 

54 output_dir: Base directory for output. Default: input file's directory 

55 or current directory for in-memory data. 

56 config: Analysis configuration. Default: analyze all applicable domains. 

57 progress_callback: Called with progress updates during analysis. 

58 

59 Returns: 

60 AnalysisResult with paths to all outputs and summary statistics. 

61 

    Raises:
        FileNotFoundError: If the input file does not exist.
        ValueError: If neither or both of input_path and data are provided.
        UnsupportedFormatError: If the input file's extension is not a
            supported format.

    Examples:
        # From file
        result = analyze("capture.wfm")
        print(result.output_dir)  # 20260101_120000_capture_analysis/

        # From in-memory data
        result = analyze(data=my_waveform_trace, output_dir="/reports")

        # With configuration
        config = AnalysisConfig(domains=[AnalysisDomain.SPECTRAL])
        result = analyze("capture.wfm", config=config)

        # With progress callback
        def on_progress(info):
            print(f"{info.domain}: {info.percent}%")
        result = analyze("capture.wfm", progress_callback=on_progress)
    """

    # Validate inputs
    if input_path is None and data is None:
        raise ValueError("Either input_path or data must be provided")
    if input_path is not None and data is not None:
        raise ValueError("Provide input_path OR data, not both")

    # Use default config if not provided
    if config is None:
        config = AnalysisConfig()

    # Track timing
    start_time = time.time()

    # Determine input name and type
    if input_path is not None:
        input_path = Path(input_path)
        if not input_path.exists():
            raise FileNotFoundError(f"Input file not found: {input_path}")
        input_name = input_path.stem
        input_type = _detect_input_type_from_file(input_path)
        loaded_data = _load_input_file(input_path, input_type)
    else:
        input_name = "memory_data"
        input_type = _detect_input_type_from_data(data)
        loaded_data = data

    # Determine output directory
    if output_dir is None:
        if input_path is not None:
            base_dir = input_path.parent
        else:
            base_dir = Path.cwd()
    else:
        base_dir = Path(output_dir)

    # Create output manager with timestamp
    timestamp = datetime.now()
    output_manager = OutputManager(base_dir, input_name, timestamp)
    output_manager.create()

    # Report progress: starting
    _report_progress(
        progress_callback,
        phase="initializing",
        domain=None,
        function=None,
        percent=0.0,
        message="Initializing analysis",
        elapsed=time.time() - start_time,
    )

    # Determine applicable domains
    applicable_domains = get_available_analyses(input_type)
    enabled_domains = [d for d in applicable_domains if config.is_domain_enabled(d)]

    logger.info(f"Running analysis on {input_name} ({input_type.value})")
    logger.info(f"Enabled domains: {[d.value for d in enabled_domains]}")

    # Execute analysis engine
    from tracekit.reporting.engine import AnalysisEngine

    engine = AnalysisEngine(config)
    engine_result = engine.run(
        input_path=input_path,
        data=loaded_data,
        progress_callback=progress_callback,
    )

    # Generate plots
    plot_paths: list[Path] = []
    if config.generate_plots:
        _report_progress(
            progress_callback,
            phase="plotting",
            domain=None,
            function=None,
            percent=70.0,
            message="Generating visualizations",
            elapsed=time.time() - start_time,
        )

        from tracekit.reporting.plots import PlotGenerator

        plot_gen = PlotGenerator(config)
        for domain, results in engine_result["results"].items():
            domain_plots = plot_gen.generate_plots(domain, results, output_manager)
            plot_paths.extend(domain_plots)

    # Save data outputs
    _report_progress(
        progress_callback,
        phase="saving",
        domain=None,
        function=None,
        percent=85.0,
        message="Saving analysis results",
        elapsed=time.time() - start_time,
    )

    # Save summary data
    summary_data = {
        "input": {
            "name": input_name,
            "type": input_type.value,
            "path": str(input_path) if input_path else None,
        },
        "timestamp": timestamp.isoformat(),
        "duration_seconds": time.time() - start_time,
        "stats": engine_result["stats"],
        "domains": {d.value: r for d, r in engine_result["results"].items()},
    }

    summary_json = output_manager.save_json("summary", summary_data)
    summary_yaml = None
    if "yaml" in config.output_formats:
        summary_yaml = output_manager.save_yaml("summary", summary_data)

    # Save metadata
    metadata = {
        "tracekit_version": _get_version(),
        "analysis_version": "2.0",
        "timestamp": timestamp.isoformat(),
        "input_file": str(input_path) if input_path else None,
        "input_type": input_type.value,
        "duration_seconds": time.time() - start_time,
        "total_analyses": engine_result["stats"]["total_analyses"],
        "successful": engine_result["stats"]["successful_analyses"],
        "failed": engine_result["stats"]["failed_analyses"],
        "skipped": engine_result["stats"].get("skipped_analyses", 0),
    }
    metadata_json = output_manager.save_json("metadata", metadata)

    # Save configuration
    config_data = {
        "domains": [d.value for d in enabled_domains],
        "generate_plots": config.generate_plots,
        "plot_format": config.plot_format,
        "plot_dpi": config.plot_dpi,
        "output_formats": config.output_formats,
        "index_formats": config.index_formats,
    }
    config_yaml = output_manager.save_yaml("config", config_data)

    # Save domain results
    domain_dirs: dict[AnalysisDomain, Path] = {}
    for domain, results in engine_result["results"].items():
        domain_dir = output_manager.create_domain_dir(domain)
        domain_dirs[domain] = domain_dir
        output_manager.save_json("results", results, subdir=domain.value)

    # Save errors if any
    error_log: Path | None = None
    errors: list[AnalysisError] = engine_result["errors"]
    if errors:
        error_list = [
            {
                "domain": e.domain.value,
                "function": e.function,
                "error_type": e.error_type,
                "error_message": e.error_message,
                "duration_ms": e.duration_ms,
            }
            for e in errors
        ]
        error_data = {"errors": error_list, "count": len(error_list)}
        error_log = output_manager.save_json("failed_analyses", error_data, subdir="errors")

    # Build AnalysisResult for index generation
    partial_result = AnalysisResult(
        output_dir=output_manager.root,
        index_html=None,
        index_md=None,
        index_pdf=None,
        summary_json=summary_json,
        summary_yaml=summary_yaml,
        metadata_json=metadata_json,
        config_yaml=config_yaml,
        domain_dirs=domain_dirs,
        plot_paths=plot_paths,
        error_log=error_log,
        input_file=str(input_path) if input_path else None,
        input_type=input_type,
        total_analyses=engine_result["stats"]["total_analyses"],
        successful_analyses=engine_result["stats"]["successful_analyses"],
        failed_analyses=engine_result["stats"]["failed_analyses"],
        skipped_analyses=engine_result["stats"].get("skipped_analyses", 0),
        duration_seconds=time.time() - start_time,
        domain_summaries=engine_result["results"],
        errors=errors,
    )

    # Generate index files
    _report_progress(
        progress_callback,
        phase="indexing",
        domain=None,
        function=None,
        percent=95.0,
        message="Generating index files",
        elapsed=time.time() - start_time,
    )

    from tracekit.reporting.index import IndexGenerator

    index_gen = IndexGenerator(output_manager)
    index_paths = index_gen.generate(partial_result, config.index_formats)

    # Complete result
    result = AnalysisResult(
        output_dir=output_manager.root,
        index_html=index_paths.get("html"),
        index_md=index_paths.get("md"),
        index_pdf=index_paths.get("pdf"),
        summary_json=summary_json,
        summary_yaml=summary_yaml,
        metadata_json=metadata_json,
        config_yaml=config_yaml,
        domain_dirs=domain_dirs,
        plot_paths=plot_paths,
        error_log=error_log,
        input_file=str(input_path) if input_path else None,
        input_type=input_type,
        total_analyses=engine_result["stats"]["total_analyses"],
        successful_analyses=engine_result["stats"]["successful_analyses"],
        failed_analyses=engine_result["stats"]["failed_analyses"],
        skipped_analyses=engine_result["stats"].get("skipped_analyses", 0),
        duration_seconds=time.time() - start_time,
        domain_summaries=engine_result["results"],
        errors=errors,
    )

    # Report completion
    _report_progress(
        progress_callback,
        phase="complete",
        domain=None,
        function=None,
        percent=100.0,
        message=f"Analysis complete: {result.successful_analyses}/{result.total_analyses} successful",
        elapsed=time.time() - start_time,
    )

    logger.info(f"Analysis complete. Output: {result.output_dir}")
    return result

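# --- Illustrative sketch (not part of the original module) -----------------
# One way a caller might drive analyze() and inspect the resulting
# AnalysisResult, using only fields constructed above. The file name
# "capture.wfm" is a placeholder.
def _example_consume_result() -> None:
    def on_progress(info: ProgressInfo) -> None:
        print(f"[{info.phase}] {info.percent:.0f}% {info.message}")

    result = analyze("capture.wfm", progress_callback=on_progress)
    print(f"Report written to {result.output_dir}")
    if result.failed_analyses:
        for err in result.errors:
            print(f"FAILED {err.domain.value}.{err.function}: {err.error_message}")
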

def _detect_input_type_from_file(path: Path) -> InputType:
    """Detect input type from file extension."""
    suffix = path.suffix.lower()

    waveform_extensions = {".wfm", ".csv", ".npz", ".hdf5", ".h5", ".wav", ".tdms"}
    digital_extensions = {".vcd", ".sr"}
    binary_extensions = {".bin", ".raw"}
    pcap_extensions = {".pcap", ".pcapng"}
    sparams_extensions = {".s1p", ".s2p", ".s3p", ".s4p", ".s5p", ".s6p", ".s7p", ".s8p"}

    if suffix in waveform_extensions:
        return InputType.WAVEFORM
    elif suffix in digital_extensions:
        return InputType.DIGITAL
    elif suffix in binary_extensions:
        return InputType.BINARY
    elif suffix in pcap_extensions:
        return InputType.PCAP
    elif suffix in sparams_extensions:
        return InputType.SPARAMS
    else:
        raise UnsupportedFormatError(f"Unsupported file format: {suffix}")

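# --- Illustrative sketch (not part of the original module) -----------------
# Detection is purely extension-based, so renaming a file changes its
# detected type. The file names below are placeholders.
def _example_detect_from_file() -> None:
    assert _detect_input_type_from_file(Path("scope.wfm")) is InputType.WAVEFORM
    assert _detect_input_type_from_file(Path("bus.vcd")) is InputType.DIGITAL
    assert _detect_input_type_from_file(Path("link.pcapng")) is InputType.PCAP
    try:
        _detect_input_type_from_file(Path("notes.txt"))
    except UnsupportedFormatError as exc:
        print(exc)  # Unsupported file format: .txt
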

def _detect_input_type_from_data(data: Any) -> InputType:
    """Detect input type from in-memory data."""
    # Check for Trace object (time + voltage = waveform)
    # Check this BEFORE SParameterData to avoid MagicMock false positives
    if hasattr(data, "time") and hasattr(data, "voltage"):
        # Verify these are not just mock/placeholder attributes
        try:
            _ = data.time
            _ = data.voltage
            return InputType.WAVEFORM
        except (AttributeError, TypeError):
            pass

    # Check for SParameterData
    if hasattr(data, "s_matrix") and hasattr(data, "frequencies"):
        return InputType.SPARAMS

    # Check for bytes
    if isinstance(data, bytes | bytearray):
        return InputType.BINARY

    # Check for list of packets
    if isinstance(data, list) and len(data) > 0:
        first = data[0]
        if hasattr(first, "timestamp") or isinstance(first, dict):
            return InputType.PACKETS

    # Default to waveform
    return InputType.WAVEFORM

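# --- Illustrative sketch (not part of the original module) -----------------
# Duck-typed detection is order-sensitive: an object exposing both waveform
# and S-parameter attributes is classified as WAVEFORM because that check
# runs first.
def _example_detect_from_data() -> None:
    assert _detect_input_type_from_data(b"\x00\x01") is InputType.BINARY
    assert _detect_input_type_from_data([{"timestamp": 0.0}]) is InputType.PACKETS
    # An empty list falls through every check and hits the WAVEFORM default.
    assert _detect_input_type_from_data([]) is InputType.WAVEFORM
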

def _load_input_file(path: Path, input_type: InputType) -> Any:
    """Load input file based on type."""
    try:
        from tracekit.loaders import load

        if input_type == InputType.WAVEFORM:
            return load(path)
        elif input_type == InputType.DIGITAL:
            # Use VCD/SR loader
            from tracekit.loaders.vcd import load_vcd

            return load_vcd(path)
        elif input_type == InputType.BINARY:
            return path.read_bytes()
        elif input_type == InputType.PCAP:
            from tracekit.loaders.pcap import load_pcap

            return load_pcap(path)
        elif input_type == InputType.SPARAMS:
            from tracekit.analyzers.signal_integrity.sparams import load_touchstone

            return load_touchstone(path)
        else:
            return load(path)
    except ImportError as e:
        logger.warning(f"Loader not available: {e}")
        # Fall back to raw bytes
        return path.read_bytes()

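# --- Illustrative sketch (not part of the original module) -----------------
# Because loaders are imported lazily inside the try block, a missing
# optional dependency degrades to raw bytes rather than failing the run;
# BINARY inputs come back as bytes either way.
def _example_load_binary(path: Path) -> None:
    payload = _load_input_file(path, InputType.BINARY)
    assert isinstance(payload, bytes)
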

def _report_progress(
    callback: Callable[[ProgressInfo], None] | None,
    phase: str,
    domain: AnalysisDomain | None,
    function: str | None,
    percent: float,
    message: str,
    elapsed: float,
) -> None:
    """Report progress to callback if provided."""
    if callback is not None:
        info = ProgressInfo(
            phase=phase,
            domain=domain,
            function=function,
            percent=percent,
            message=message,
            elapsed_seconds=elapsed,
            estimated_remaining_seconds=None,
        )
        callback(info)


def _get_version() -> str:
    """Get TraceKit version."""
    try:
        from tracekit import __version__

        return __version__
    except ImportError:
        return "unknown"


__all__ = [
    "UnsupportedFormatError",
    "analyze",
]