Coverage for src / tracekit / reporting / analyze.py: 73%
148 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
"""Comprehensive analysis report system main entry point.

This module provides the primary `analyze()` function for running
comprehensive analysis on any supported input data type.
"""
7from __future__ import annotations
9import logging
10import time
11from collections.abc import Callable
12from datetime import datetime
13from pathlib import Path
14from typing import TYPE_CHECKING, Any
16from tracekit.reporting.config import (
17 AnalysisConfig,
18 AnalysisDomain,
19 AnalysisError,
20 AnalysisResult,
21 InputType,
22 ProgressInfo,
23 get_available_analyses,
24)
25from tracekit.reporting.output import OutputManager
27if TYPE_CHECKING:
28 from tracekit.core.types import Trace
30logger = logging.getLogger(__name__)
class UnsupportedFormatError(Exception):
    """Raised when the input file's format cannot be recognized."""
def analyze(
    input_path: str | Path | None = None,
    data: Trace | bytes | list[Any] | None = None,
    *,
    output_dir: str | Path | None = None,
    config: AnalysisConfig | None = None,
    progress_callback: Callable[[ProgressInfo], None] | None = None,
) -> AnalysisResult:
    """Run comprehensive analysis on data.

    Provide EITHER input_path (file) OR data (in-memory), not both.

    Args:
        input_path: Path to input data file (any supported format).
        data: In-memory data (Trace, bytes, list of packets).
        output_dir: Base directory for output. Default: input file's directory
            or current directory for in-memory data.
        config: Analysis configuration. Default: analyze all applicable domains.
        progress_callback: Called with progress updates during analysis.

    Returns:
        AnalysisResult with paths to all outputs and summary statistics.

    Raises:
        ValueError: If neither or both input_path and data are provided.
        FileNotFoundError: If the input file does not exist.
        UnsupportedFormatError: If the input file extension is not recognized.

    Examples:
        # From file
        result = analyze("capture.wfm")
        print(result.output_dir)  # 20260101_120000_capture_analysis/

        # From in-memory data
        result = analyze(data=my_waveform_trace, output_dir="/reports")

        # With configuration
        config = AnalysisConfig(domains=[AnalysisDomain.SPECTRAL])
        result = analyze("capture.wfm", config=config)

        # With progress callback
        def on_progress(info):
            print(f"{info.domain}: {info.percent}%")
        result = analyze("capture.wfm", progress_callback=on_progress)
    """
    # Enforce the either/or input contract up front.
    if input_path is None and data is None:
        raise ValueError("Either input_path or data must be provided")
    if input_path is not None and data is not None:
        raise ValueError("Provide input_path OR data, not both")

    # Use default config if not provided.
    if config is None:
        config = AnalysisConfig()

    # Track timing.
    start_time = time.time()

    # Determine input name/type and load the data.
    if input_path is not None:
        input_path = Path(input_path)
        if not input_path.exists():
            raise FileNotFoundError(f"Input file not found: {input_path}")
        input_name = input_path.stem
        input_type = _detect_input_type_from_file(input_path)
        loaded_data = _load_input_file(input_path, input_type)
    else:
        input_name = "memory_data"
        input_type = _detect_input_type_from_data(data)
        loaded_data = data

    # Output directory: explicit argument > input file's parent > CWD.
    if output_dir is None:
        base_dir = input_path.parent if input_path is not None else Path.cwd()
    else:
        base_dir = Path(output_dir)

    # Create output manager with timestamp.
    # NOTE(review): naive local time — confirm UTC is not expected downstream.
    timestamp = datetime.now()
    output_manager = OutputManager(base_dir, input_name, timestamp)
    output_manager.create()

    # Report progress: starting.
    _report_progress(
        progress_callback,
        phase="initializing",
        domain=None,
        function=None,
        percent=0.0,
        message="Initializing analysis",
        elapsed=time.time() - start_time,
    )

    # Determine applicable domains.
    applicable_domains = get_available_analyses(input_type)
    enabled_domains = [d for d in applicable_domains if config.is_domain_enabled(d)]

    # Lazy %-args defer formatting until the record is actually emitted.
    logger.info("Running analysis on %s (%s)", input_name, input_type.value)
    logger.info("Enabled domains: %s", [d.value for d in enabled_domains])

    # Deferred import avoids circular imports at module load time.
    from tracekit.reporting.engine import AnalysisEngine

    engine = AnalysisEngine(config)
    engine_result = engine.run(
        input_path=input_path,
        data=loaded_data,
        progress_callback=progress_callback,
    )
    stats = engine_result["stats"]

    # Generate plots.
    plot_paths: list[Path] = []
    if config.generate_plots:
        _report_progress(
            progress_callback,
            phase="plotting",
            domain=None,
            function=None,
            percent=70.0,
            message="Generating visualizations",
            elapsed=time.time() - start_time,
        )

        from tracekit.reporting.plots import PlotGenerator

        plot_gen = PlotGenerator(config)
        for domain, results in engine_result["results"].items():
            plot_paths.extend(plot_gen.generate_plots(domain, results, output_manager))

    # Save data outputs.
    _report_progress(
        progress_callback,
        phase="saving",
        domain=None,
        function=None,
        percent=85.0,
        message="Saving analysis results",
        elapsed=time.time() - start_time,
    )

    # Save summary data.
    summary_data = {
        "input": {
            "name": input_name,
            "type": input_type.value,
            "path": str(input_path) if input_path else None,
        },
        "timestamp": timestamp.isoformat(),
        "duration_seconds": time.time() - start_time,
        "stats": stats,
        "domains": {d.value: r for d, r in engine_result["results"].items()},
    }

    summary_json = output_manager.save_json("summary", summary_data)
    summary_yaml = None
    if "yaml" in config.output_formats:
        summary_yaml = output_manager.save_yaml("summary", summary_data)

    # Save metadata.
    metadata = {
        "tracekit_version": _get_version(),
        "analysis_version": "2.0",
        "timestamp": timestamp.isoformat(),
        "input_file": str(input_path) if input_path else None,
        "input_type": input_type.value,
        "duration_seconds": time.time() - start_time,
        "total_analyses": stats["total_analyses"],
        "successful": stats["successful_analyses"],
        "failed": stats["failed_analyses"],
        "skipped": stats.get("skipped_analyses", 0),
    }
    metadata_json = output_manager.save_json("metadata", metadata)

    # Save configuration.
    config_data = {
        "domains": [d.value for d in enabled_domains],
        "generate_plots": config.generate_plots,
        "plot_format": config.plot_format,
        "plot_dpi": config.plot_dpi,
        "output_formats": config.output_formats,
        "index_formats": config.index_formats,
    }
    config_yaml = output_manager.save_yaml("config", config_data)

    # Save domain results.
    domain_dirs: dict[AnalysisDomain, Path] = {}
    for domain, results in engine_result["results"].items():
        domain_dirs[domain] = output_manager.create_domain_dir(domain)
        output_manager.save_json("results", results, subdir=domain.value)

    # Save errors if any.
    error_log: Path | None = None
    errors: list[AnalysisError] = engine_result["errors"]
    if errors:
        error_list = [
            {
                "domain": e.domain.value,
                "function": e.function,
                "error_type": e.error_type,
                "error_message": e.error_message,
                "duration_ms": e.duration_ms,
            }
            for e in errors
        ]
        error_data = {"errors": error_list, "count": len(error_list)}
        error_log = output_manager.save_json("failed_analyses", error_data, subdir="errors")

    # Fields shared by the pre-index and final AnalysisResult. Built once so
    # the two constructions cannot drift apart; duration_seconds is passed
    # separately because it must be re-measured at each construction.
    common_fields: dict[str, Any] = {
        "output_dir": output_manager.root,
        "summary_json": summary_json,
        "summary_yaml": summary_yaml,
        "metadata_json": metadata_json,
        "config_yaml": config_yaml,
        "domain_dirs": domain_dirs,
        "plot_paths": plot_paths,
        "error_log": error_log,
        "input_file": str(input_path) if input_path else None,
        "input_type": input_type,
        "total_analyses": stats["total_analyses"],
        "successful_analyses": stats["successful_analyses"],
        "failed_analyses": stats["failed_analyses"],
        "skipped_analyses": stats.get("skipped_analyses", 0),
        "domain_summaries": engine_result["results"],
        "errors": errors,
    }

    # Build a provisional AnalysisResult (no index paths yet) for index generation.
    partial_result = AnalysisResult(
        index_html=None,
        index_md=None,
        index_pdf=None,
        duration_seconds=time.time() - start_time,
        **common_fields,
    )

    # Generate index files.
    _report_progress(
        progress_callback,
        phase="indexing",
        domain=None,
        function=None,
        percent=95.0,
        message="Generating index files",
        elapsed=time.time() - start_time,
    )

    from tracekit.reporting.index import IndexGenerator

    index_gen = IndexGenerator(output_manager)
    index_paths = index_gen.generate(partial_result, config.index_formats)

    # Complete result, now including the generated index paths.
    result = AnalysisResult(
        index_html=index_paths.get("html"),
        index_md=index_paths.get("md"),
        index_pdf=index_paths.get("pdf"),
        duration_seconds=time.time() - start_time,
        **common_fields,
    )

    # Report completion.
    _report_progress(
        progress_callback,
        phase="complete",
        domain=None,
        function=None,
        percent=100.0,
        message=f"Analysis complete: {result.successful_analyses}/{result.total_analyses} successful",
        elapsed=time.time() - start_time,
    )

    logger.info("Analysis complete. Output: %s", result.output_dir)
    return result
def _detect_input_type_from_file(path: Path) -> InputType:
    """Detect input type from file extension.

    Args:
        path: Input file path; only the suffix is examined.

    Returns:
        The InputType matching the file extension.

    Raises:
        UnsupportedFormatError: If the extension is not recognized.
    """
    suffix = path.suffix.lower()

    waveform_extensions = {".wfm", ".csv", ".npz", ".hdf5", ".h5", ".wav", ".tdms"}
    digital_extensions = {".vcd", ".sr"}
    binary_extensions = {".bin", ".raw"}
    pcap_extensions = {".pcap", ".pcapng"}

    if suffix in waveform_extensions:
        return InputType.WAVEFORM
    if suffix in digital_extensions:
        return InputType.DIGITAL
    if suffix in binary_extensions:
        return InputType.BINARY
    if suffix in pcap_extensions:
        return InputType.PCAP
    # Touchstone S-parameter files use ".sNp" where N is the port count.
    # Accept any port count (.s1p, .s2p, ..., .s16p, ...) rather than a
    # hard-coded .s1p-.s8p set.
    if suffix.startswith(".s") and suffix.endswith("p") and suffix[2:-1].isdigit():
        return InputType.SPARAMS
    raise UnsupportedFormatError(f"Unsupported file format: {suffix}")
def _detect_input_type_from_data(data: Any) -> InputType:
    """Classify in-memory data into an InputType.

    Probes, in priority order: waveform Trace, S-parameter data, raw
    bytes, then packet lists; anything else defaults to WAVEFORM.
    """
    # Trace-like objects expose both .time and .voltage. Probe these FIRST
    # (before S-parameters) to avoid MagicMock false positives, and actually
    # read the attributes to make sure they are not mere placeholders.
    if hasattr(data, "time") and hasattr(data, "voltage"):
        try:
            _ = data.time
            _ = data.voltage
        except (AttributeError, TypeError):
            pass
        else:
            return InputType.WAVEFORM

    # S-parameter data carries a scattering matrix plus a frequency axis.
    if hasattr(data, "s_matrix") and hasattr(data, "frequencies"):
        return InputType.SPARAMS

    # Raw byte buffers.
    if isinstance(data, bytes | bytearray):
        return InputType.BINARY

    # A non-empty list whose head looks like a packet (dict or has .timestamp).
    if isinstance(data, list) and data:
        head = data[0]
        if hasattr(head, "timestamp") or isinstance(head, dict):
            return InputType.PACKETS

    # Default to waveform.
    return InputType.WAVEFORM
384def _load_input_file(path: Path, input_type: InputType) -> Any:
385 """Load input file based on type."""
386 try:
387 from tracekit.loaders import load
389 if input_type == InputType.WAVEFORM:
390 return load(path)
391 elif input_type == InputType.DIGITAL:
392 # Use VCD/SR loader
393 from tracekit.loaders.vcd import load_vcd
395 return load_vcd(path)
396 elif input_type == InputType.BINARY:
397 return path.read_bytes()
398 elif input_type == InputType.PCAP:
399 from tracekit.loaders.pcap import load_pcap
401 return load_pcap(path)
402 elif input_type == InputType.SPARAMS:
403 from tracekit.analyzers.signal_integrity.sparams import load_touchstone
405 return load_touchstone(path)
406 else:
407 return load(path)
408 except ImportError as e:
409 logger.warning(f"Loader not available: {e}")
410 # Fall back to raw bytes
411 return path.read_bytes()
def _report_progress(
    callback: Callable[[ProgressInfo], None] | None,
    phase: str,
    domain: AnalysisDomain | None,
    function: str | None,
    percent: float,
    message: str,
    elapsed: float,
) -> None:
    """Forward a ProgressInfo snapshot to *callback*, if one was supplied."""
    if callback is None:
        return
    callback(
        ProgressInfo(
            phase=phase,
            domain=domain,
            function=function,
            percent=percent,
            message=message,
            elapsed_seconds=elapsed,
            estimated_remaining_seconds=None,
        )
    )
def _get_version() -> str:
    """Return the installed TraceKit version string, or "unknown"."""
    try:
        from tracekit import __version__ as version
    except ImportError:
        return "unknown"
    return version
# Public API: only the exception type and the entry point are exported.
__all__ = [
    "UnsupportedFormatError",
    "analyze",
]