Coverage for src / tracekit / discovery / quality_validator.py: 87%
189 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Data quality assessment for signal analysis.
3This module assesses whether captured data is sufficient and of adequate
4quality for meaningful analysis.
7Example:
8 >>> from tracekit.discovery import assess_data_quality
9 >>> quality = assess_data_quality(trace)
10 >>> print(f"Status: {quality.status}")
11 >>> for metric in quality.metrics:
12 ... print(f"{metric.name}: {metric.status}")
14References:
15 IEEE 1241-2010: ADC Terminology and Test Methods
16"""
18from __future__ import annotations
20from dataclasses import dataclass, field
21from typing import TYPE_CHECKING, Any, Literal
23import numpy as np
25from tracekit.analyzers.statistics.basic import basic_stats
26from tracekit.core.types import DigitalTrace, WaveformTrace
28if TYPE_CHECKING:
29 from numpy.typing import NDArray
# Tri-state verdict used for both individual metrics and the overall result.
QualityStatus = Literal["PASS", "WARNING", "FAIL"]
# Supported analysis scenarios; each scenario applies different quality thresholds.
AnalysisScenario = Literal["protocol_decode", "timing_analysis", "fft", "eye_diagram", "general"]
@dataclass
class QualityMetric:
    """Individual quality metric result.

    One instance describes a single assessed dimension of the capture
    (sample rate, resolution, duration, or noise level).

    Attributes:
        name: Metric name (e.g., "Sample Rate", "Resolution").
        status: Quality status (PASS, WARNING, FAIL).
        passed: Whether metric passes minimum requirements.
        current_value: Measured value.
        required_value: Required value for this scenario.
        unit: Unit of measurement (applies to both current and required values).
        margin_percent: Margin relative to requirement (positive = good).
        explanation: Plain-language explanation if failed; empty when passed.
        recommendation: Actionable recommendation to fix issue; empty when passed.

    Example:
        >>> metric = QualityMetric(
        ...     name="Sample Rate",
        ...     status="WARNING",
        ...     passed=False,
        ...     current_value=50.0,
        ...     required_value=100.0,
        ...     unit="MS/s"
        ... )
    """

    name: str
    status: QualityStatus
    passed: bool
    current_value: float
    required_value: float
    unit: str
    margin_percent: float = 0.0
    explanation: str = ""
    recommendation: str = ""
@dataclass
class DataQuality:
    """Overall data quality assessment result.

    Aggregates the individual :class:`QualityMetric` results produced by
    :func:`assess_data_quality` into a single verdict.

    Attributes:
        status: Overall quality status (PASS, WARNING, FAIL).
        confidence: Assessment confidence (0.0-1.0); higher when more
            metrics pass.
        metrics: List of individual quality metrics.
        improvement_suggestions: Suggested improvements if quality is poor.
            Each entry is a dict with "action", "expected_benefit", and
            "difficulty_level" keys.

    Example:
        >>> quality = assess_data_quality(trace)
        >>> if quality.status != "PASS":
        ...     print("Quality issues detected:")
        ...     for metric in quality.metrics:
        ...         if not metric.passed:
        ...             print(f"  - {metric.name}: {metric.explanation}")
    """

    status: QualityStatus
    confidence: float
    metrics: list[QualityMetric] = field(default_factory=list)
    improvement_suggestions: list[dict[str, str]] = field(default_factory=list)
def assess_data_quality(
    trace: WaveformTrace | DigitalTrace,
    *,
    scenario: AnalysisScenario = "general",
    protocol_params: dict[str, Any] | None = None,
    strict_mode: bool = False,
) -> DataQuality:
    """Assess whether captured data is adequate for analysis.

    Evaluates sample rate, resolution, duration, and noise level against
    scenario-specific requirements.

    Args:
        trace: Input waveform or digital trace.
        scenario: Analysis scenario for scenario-specific thresholds.
        protocol_params: Protocol-specific parameters (e.g., clock frequency).
        strict_mode: If True, fail on any warnings.

    Returns:
        DataQuality assessment with overall status and individual metrics.

    Raises:
        ValueError: If trace is empty or invalid.

    Example:
        >>> quality = assess_data_quality(trace, scenario='protocol_decode')
        >>> print(f"Overall: {quality.status} (confidence: {quality.confidence:.2f})")
        >>> for metric in quality.metrics:
        ...     if metric.status != 'PASS':
        ...         print(f"Issue: {metric.name} - {metric.explanation}")
        ...         print(f"Fix: {metric.recommendation}")

    References:
        DISC-009: Data Quality Assessment
    """
    if len(trace) == 0:
        raise ValueError("Cannot assess quality of empty trace")

    # Extract samples; digital traces are promoted to float64 so the
    # statistical helpers below work on either kind of trace.
    if isinstance(trace, WaveformTrace):
        samples = trace.data
        rate = trace.metadata.sample_rate
        analog = True
    else:
        samples = trace.data.astype(np.float64)
        rate = trace.metadata.sample_rate
        analog = False

    stats = basic_stats(samples)
    swing = stats["max"] - stats["min"]
    params = protocol_params if protocol_params is not None else {}

    # Run the four per-dimension assessments in a fixed, documented order.
    metrics: list[QualityMetric] = [
        _assess_sample_rate(rate, samples, stats, scenario, params),
        _assess_resolution(samples, swing, stats, analog, scenario),
        _assess_duration(len(samples), rate, samples, scenario, params),
        _assess_noise(samples, swing, stats, scenario),
    ]

    # Any FAIL (or any WARNING in strict mode) fails the whole assessment.
    has_fail = any(m.status == "FAIL" for m in metrics)
    has_warning = any(m.status == "WARNING" for m in metrics)
    if has_fail or (strict_mode and has_warning):
        overall: QualityStatus = "FAIL"
    elif has_warning:
        overall = "WARNING"
    else:
        overall = "PASS"

    # Confidence scales from 0.5 (nothing passed) to 1.0 (all passed).
    n_passed = sum(1 for m in metrics if m.passed)
    confidence = round(0.5 + (n_passed / len(metrics)) * 0.5, 2)

    # Turn failed metrics with a recommendation into actionable suggestions.
    # Recommendations mentioning a "setting" are considered easy to apply.
    suggestions = [
        {
            "action": m.recommendation,
            "expected_benefit": f"Improves {m.name.lower()} to required level",
            "difficulty_level": "Easy"
            if "setting" in m.recommendation.lower()
            else "Medium",
        }
        for m in metrics
        if not m.passed and m.recommendation
    ]

    return DataQuality(
        status=overall,
        confidence=confidence,
        metrics=metrics,
        improvement_suggestions=suggestions,
    )
def _assess_sample_rate(
    sample_rate: float,
    data: NDArray[np.floating[Any]],
    stats: dict[str, float],
    scenario: AnalysisScenario,
    protocol_params: dict[str, Any],
) -> QualityMetric:
    """Assess sample rate adequacy.

    Args:
        sample_rate: Sample rate in Hz.
        data: Signal data array.
        stats: Basic statistics.
        scenario: Analysis scenario.
        protocol_params: Protocol-specific parameters.

    Returns:
        QualityMetric for sample rate.
    """
    # Rough fundamental-frequency estimate: average spacing of mean
    # crossings is half a period.
    centered = data - stats["mean"]
    edges = np.where(np.diff(np.sign(centered)) != 0)[0]
    if len(edges) >= 2:
        half_period = np.mean(np.diff(edges))
        est_freq = sample_rate / (half_period * 2) if half_period > 0 else 0
    else:
        est_freq = 0

    # Scenario-specific oversampling requirement, with fixed fallbacks
    # when no frequency could be estimated.
    if scenario == "protocol_decode":
        # Need 10x the bit rate; prefer the declared clock if given.
        if "clock_freq_mhz" in protocol_params:
            required = protocol_params["clock_freq_mhz"] * 1e6 * 10
        elif est_freq > 0:
            required = est_freq * 10
        else:
            required = 10e6  # Default 10 MS/s minimum
    elif scenario == "timing_analysis":
        # Need 100x the edge rate.
        required = est_freq * 100 if est_freq > 0 else 100e6
    elif scenario == "fft":
        # Nyquist + 20%.
        required = est_freq * 2.4 if est_freq > 0 else 10e6
    elif scenario == "eye_diagram":
        # Need high oversampling.
        required = est_freq * 50 if est_freq > 0 else 100e6
    else:  # general
        # At least 10x signal frequency.
        required = est_freq * 10 if est_freq > 0 else 10e6

    margin = ((sample_rate - required) / required) * 100

    status: QualityStatus
    passed = margin >= 0
    explanation = ""
    recommendation = ""
    if passed:
        status = "PASS"
    elif margin >= -20:
        # Within 20% of the requirement: warn but keep going.
        status = "WARNING"
        explanation = f"Sample rate is {abs(margin):.0f}% below recommended"
        recommendation = (
            f"Increase sample rate to {required / 1e6:.0f} MS/s "
            f"(currently {sample_rate / 1e6:.0f} MS/s)"
        )
    else:
        status = "FAIL"
        explanation = f"Sample rate is critically low ({abs(margin):.0f}% below required)"
        recommendation = f"Increase sample rate to at least {required / 1e6:.0f} MS/s"

    return QualityMetric(
        name="Sample Rate",
        status=status,
        passed=passed,
        current_value=sample_rate / 1e6,
        required_value=required / 1e6,
        unit="MS/s",
        margin_percent=margin,
        explanation=explanation,
        recommendation=recommendation,
    )
def _assess_resolution(
    data: NDArray[np.floating[Any]],
    voltage_swing: float,
    stats: dict[str, float],
    is_analog: bool,
    scenario: AnalysisScenario,
) -> QualityMetric:
    """Assess vertical resolution adequacy via estimated SNR.

    Args:
        data: Signal data array.
        voltage_swing: Peak-to-peak voltage.
        stats: Basic statistics.
        is_analog: Whether signal is analog (currently unused; kept for
            interface stability).
        scenario: Analysis scenario.

    Returns:
        QualityMetric for resolution, expressed as dB SNR.
    """
    # Approximate SNR from the half-swing amplitude vs. RMS noise level.
    # The 1e-12 floor prevents division by zero for noiseless signals.
    if voltage_swing > 0:
        noise_rms = stats["std"]
        snr_linear = (voltage_swing / 2) / (noise_rms + 1e-12)
        snr_db = 20 * np.log10(snr_linear) if snr_linear > 0 else 0.0
    else:
        # BUG FIX: previously this branch left snr_db unbound, raising
        # NameError below for zero-swing (flat) signals. Treat a flat
        # signal as 0 dB SNR so the metric reports FAIL instead of crashing.
        snr_db = 0.0

    # Determine required SNR for the scenario; spectral and eye-diagram
    # analyses need a cleaner signal than decode/timing work.
    if scenario in ("fft", "eye_diagram"):
        required_snr = 40.0  # dB
    else:  # protocol_decode, timing_analysis, general
        required_snr = 20.0  # dB

    current_snr = snr_db
    margin_percent = ((current_snr - required_snr) / required_snr) * 100

    # Determine status: PASS at/above requirement, WARNING within 20%
    # below it, FAIL otherwise.
    if current_snr >= required_snr:
        status: QualityStatus = "PASS"
        passed = True
        explanation = ""
        recommendation = ""
    elif current_snr >= required_snr * 0.8:
        status = "WARNING"
        passed = False
        explanation = f"SNR is {abs(margin_percent):.0f}% below recommended ({current_snr:.1f} dB)"
        recommendation = "Reduce noise sources or increase signal amplitude"
    else:
        status = "FAIL"
        passed = False
        explanation = f"SNR is critically low ({current_snr:.1f} dB, need {required_snr:.0f} dB)"
        recommendation = "Significantly improve signal quality or use higher resolution capture"

    return QualityMetric(
        name="Resolution",
        status=status,
        passed=passed,
        current_value=current_snr,
        required_value=required_snr,
        unit="dB SNR",
        margin_percent=margin_percent,
        explanation=explanation,
        recommendation=recommendation,
    )
def _assess_duration(
    n_samples: int,
    sample_rate: float,
    data: NDArray[np.floating[Any]],
    scenario: AnalysisScenario,
    protocol_params: dict[str, Any],
) -> QualityMetric:
    """Assess capture duration adequacy.

    Args:
        n_samples: Number of samples.
        sample_rate: Sample rate in Hz.
        data: Signal data array.
        scenario: Analysis scenario.
        protocol_params: Protocol-specific parameters.

    Returns:
        QualityMetric for duration, expressed in milliseconds.
    """
    duration = n_samples / sample_rate

    # Estimate the signal period: average mean-crossing spacing is half
    # a period.
    crossings = np.where(np.diff(np.sign(data - np.mean(data))) != 0)[0]
    if len(crossings) >= 2:
        half_period = np.mean(np.diff(crossings))
        period = (half_period * 2) / sample_rate
        n_periods = duration / period if period > 0 else 0
    else:
        n_periods = 0
        period = duration / 10  # Fallback: assume at least 10 periods

    # Required periods per scenario: FFT needs few (frequency resolution),
    # eye diagrams need many UIs, everything else needs 100.
    needed_periods = {"fft": 10, "eye_diagram": 1000}.get(scenario, 100)

    needed_duration = needed_periods * period
    if needed_duration > 0:
        margin = ((duration - needed_duration) / needed_duration) * 100
    else:
        margin = 100

    status: QualityStatus
    passed = n_periods >= needed_periods or margin >= 0
    explanation = ""
    recommendation = ""
    if passed:
        status = "PASS"
    elif n_periods >= needed_periods * 0.5:
        # At least half the required periods: warn, don't fail.
        status = "WARNING"
        explanation = (
            f"Captured only {n_periods:.0f} signal periods, "
            f"recommended minimum is {needed_periods}"
        )
        recommendation = (
            f"Increase capture duration to at least {needed_duration * 1e3:.1f} ms "
            f"(currently {duration * 1e3:.1f} ms)"
        )
    else:
        status = "FAIL"
        explanation = f"Capture duration is critically short ({n_periods:.0f} periods)"
        recommendation = f"Increase capture duration to at least {needed_duration * 1e3:.1f} ms"

    return QualityMetric(
        name="Duration",
        status=status,
        passed=passed,
        current_value=duration * 1e3,
        required_value=needed_duration * 1e3,
        unit="ms",
        margin_percent=margin,
        explanation=explanation,
        recommendation=recommendation,
    )
def _assess_noise(
    data: NDArray[np.floating[Any]],
    voltage_swing: float,
    stats: dict[str, float],
    scenario: AnalysisScenario,
) -> QualityMetric:
    """Assess noise level relative to signal swing.

    Args:
        data: Signal data array.
        voltage_swing: Peak-to-peak voltage.
        stats: Basic statistics.
        scenario: Analysis scenario.

    Returns:
        QualityMetric for noise level as a percentage of swing.
    """
    # Guard: a flat signal has no swing to compare noise against.
    if voltage_swing == 0:
        return QualityMetric(
            name="Noise Level",
            status="PASS",
            passed=True,
            current_value=0.0,
            required_value=0.0,
            unit="% of swing",
            margin_percent=100.0,
        )

    # RMS noise expressed as a percentage of peak-to-peak swing.
    noise_pct = (stats["std"] / voltage_swing) * 100

    # Spectral/eye analysis tolerates half the noise of decode/timing work.
    limit = 5.0 if scenario in ("fft", "eye_diagram") else 10.0

    margin = ((limit - noise_pct) / limit) * 100

    status: QualityStatus
    passed = noise_pct <= limit
    explanation = ""
    recommendation = ""
    if passed:
        status = "PASS"
    elif noise_pct <= limit * 1.5:
        # Up to 50% over budget: warn with remediation hints.
        status = "WARNING"
        explanation = (
            f"Noise level is {noise_pct:.1f}% of signal swing "
            f"(max recommended: {limit:.0f}%)"
        )
        recommendation = "Reduce noise sources, check grounding, or use averaging"
    else:
        status = "FAIL"
        explanation = f"Noise level is critically high ({noise_pct:.1f}% of swing)"
        recommendation = (
            "Significantly reduce noise through better probing, shielding, or bandwidth limiting"
        )

    return QualityMetric(
        name="Noise Level",
        status=status,
        passed=passed,
        current_value=noise_pct,
        required_value=limit,
        unit="% of swing",
        margin_percent=margin,
        explanation=explanation,
        recommendation=recommendation,
    )
# Public API of this module.
__all__ = [
    "AnalysisScenario",
    "DataQuality",
    "QualityMetric",
    "QualityStatus",
    "assess_data_quality",
]