Coverage for src / tracekit / discovery / quality_validator.py: 87%

189 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Data quality assessment for signal analysis. 

2 

3This module assesses whether captured data is sufficient and of adequate 

4quality for meaningful analysis. 

5 

6 

7Example: 

8 >>> from tracekit.discovery import assess_data_quality 

9 >>> quality = assess_data_quality(trace) 

10 >>> print(f"Status: {quality.status}") 

11 >>> for metric in quality.metrics: 

12 ... print(f"{metric.name}: {metric.status}") 

13 

14References: 

15 IEEE 1241-2010: ADC Terminology and Test Methods 

16""" 

17 

18from __future__ import annotations 

19 

20from dataclasses import dataclass, field 

21from typing import TYPE_CHECKING, Any, Literal 

22 

23import numpy as np 

24 

25from tracekit.analyzers.statistics.basic import basic_stats 

26from tracekit.core.types import DigitalTrace, WaveformTrace 

27 

28if TYPE_CHECKING: 

29 from numpy.typing import NDArray 

30 

# Tri-state verdict shared by individual metrics and the overall assessment.
QualityStatus = Literal["PASS", "WARNING", "FAIL"]
# Analysis scenarios; each implies different sample-rate/duration/noise thresholds.
AnalysisScenario = Literal["protocol_decode", "timing_analysis", "fft", "eye_diagram", "general"]

33 

34 

@dataclass
class QualityMetric:
    """Result of one individual quality check.

    Attributes:
        name: Human-readable metric name (e.g., "Sample Rate", "Resolution").
        status: PASS, WARNING, or FAIL for this metric alone.
        passed: True when the metric meets the minimum requirement.
        current_value: The measured value.
        required_value: The value required for the chosen scenario.
        unit: Unit shared by ``current_value`` and ``required_value``.
        margin_percent: Headroom relative to the requirement (positive = good).
        explanation: Plain-language reason when the metric did not pass.
        recommendation: Concrete action to bring the metric into spec.

    Example:
        >>> metric = QualityMetric(
        ...     name="Sample Rate",
        ...     status="WARNING",
        ...     passed=False,
        ...     current_value=50.0,
        ...     required_value=100.0,
        ...     unit="MS/s"
        ... )
    """

    name: str
    status: QualityStatus
    passed: bool
    current_value: float
    required_value: float
    unit: str
    margin_percent: float = 0.0
    explanation: str = ""
    recommendation: str = ""

70 

71 

@dataclass
class DataQuality:
    """Aggregate result of a data-quality assessment.

    Attributes:
        status: Overall verdict (PASS, WARNING, FAIL) across all metrics.
        confidence: Confidence in the assessment, from 0.0 to 1.0.
        metrics: The individual metric results that produced ``status``.
        improvement_suggestions: Actionable suggestions when quality is poor.

    Example:
        >>> quality = assess_data_quality(trace)
        >>> if quality.status != "PASS":
        ...     print("Quality issues detected:")
        ...     for metric in quality.metrics:
        ...         if not metric.passed:
        ...             print(f" - {metric.name}: {metric.explanation}")
    """

    status: QualityStatus
    confidence: float
    metrics: list[QualityMetric] = field(default_factory=list)
    improvement_suggestions: list[dict[str, str]] = field(default_factory=list)

95 

96 

def assess_data_quality(
    trace: WaveformTrace | DigitalTrace,
    *,
    scenario: AnalysisScenario = "general",
    protocol_params: dict[str, Any] | None = None,
    strict_mode: bool = False,
) -> DataQuality:
    """Assess whether captured data is adequate for analysis.

    Checks sample rate, resolution (SNR), capture duration, and noise level
    against thresholds chosen for the requested analysis scenario.

    Args:
        trace: Input waveform or digital trace.
        scenario: Analysis scenario used to pick quality thresholds.
        protocol_params: Protocol-specific parameters (e.g., clock frequency).
        strict_mode: When True, any warning downgrades the result to FAIL.

    Returns:
        DataQuality with an overall status, a confidence score, the four
        individual metrics, and actionable improvement suggestions.

    Raises:
        ValueError: If the trace contains no samples.

    Example:
        >>> quality = assess_data_quality(trace, scenario='protocol_decode')
        >>> print(f"Overall: {quality.status} (confidence: {quality.confidence:.2f})")
        >>> for metric in quality.metrics:
        ...     if metric.status != 'PASS':
        ...         print(f"Issue: {metric.name} - {metric.explanation}")
        ...         print(f"Fix: {metric.recommendation}")

    References:
        DISC-009: Data Quality Assessment
    """
    if len(trace) == 0:
        raise ValueError("Cannot assess quality of empty trace")

    # Normalize to a float signal; digital traces are cast up so the
    # statistics helpers behave uniformly on either kind.
    is_analog = isinstance(trace, WaveformTrace)
    data = trace.data if is_analog else trace.data.astype(np.float64)
    sample_rate = trace.metadata.sample_rate

    stats = basic_stats(data)
    voltage_swing = stats["max"] - stats["min"]
    params = protocol_params or {}

    # Evaluate the four quality dimensions.
    metrics: list[QualityMetric] = [
        _assess_sample_rate(sample_rate, data, stats, scenario, params),
        _assess_resolution(data, voltage_swing, stats, is_analog, scenario),
        _assess_duration(len(data), sample_rate, data, scenario, params),
        _assess_noise(data, voltage_swing, stats, scenario),
    ]

    # Overall verdict: any FAIL (or any WARNING under strict mode) fails;
    # otherwise warnings degrade the verdict; all-clear is PASS.
    statuses = {m.status for m in metrics}
    if "FAIL" in statuses or (strict_mode and "WARNING" in statuses):
        overall_status: QualityStatus = "FAIL"
    elif "WARNING" in statuses:
        overall_status = "WARNING"
    else:
        overall_status = "PASS"

    # Confidence scales linearly from 0.5 (nothing passed) to 1.0 (all passed).
    passed_count = sum(1 for m in metrics if m.passed)
    confidence = round(0.5 + (passed_count / len(metrics)) * 0.5, 2)

    # One suggestion per failing metric that has a concrete recommendation.
    suggestions = [
        {
            "action": m.recommendation,
            "expected_benefit": f"Improves {m.name.lower()} to required level",
            "difficulty_level": "Easy"
            if "setting" in m.recommendation.lower()
            else "Medium",
        }
        for m in metrics
        if not m.passed and m.recommendation
    ]

    return DataQuality(
        status=overall_status,
        confidence=confidence,
        metrics=metrics,
        improvement_suggestions=suggestions,
    )

208 

209 

def _assess_sample_rate(
    sample_rate: float,
    data: NDArray[np.floating[Any]],
    stats: dict[str, float],
    scenario: AnalysisScenario,
    protocol_params: dict[str, Any],
) -> QualityMetric:
    """Assess sample rate adequacy for the given scenario.

    Args:
        sample_rate: Sample rate in Hz.
        data: Signal data array.
        stats: Basic statistics (must contain "mean").
        scenario: Analysis scenario.
        protocol_params: Protocol parameters; ``clock_freq_mhz`` overrides
            the frequency estimate for protocol decoding.

    Returns:
        QualityMetric for sample rate (values reported in MS/s).
    """
    # Estimate the fundamental frequency from mean-level crossings:
    # two crossings per cycle, so freq = fs / (2 * avg half-period).
    centered = data - stats["mean"]
    crossings = np.where(np.diff(np.sign(centered)) != 0)[0]

    signal_freq = 0
    if len(crossings) >= 2:
        avg_half_period = np.mean(np.diff(crossings))
        if avg_half_period > 0:
            signal_freq = sample_rate / (avg_half_period * 2)

    if scenario == "protocol_decode" and "clock_freq_mhz" in protocol_params:
        # Protocol decoding wants 10x the explicitly stated clock rate.
        required_rate = protocol_params["clock_freq_mhz"] * 1e6 * 10
    else:
        # Scenario table: (oversampling factor, fallback rate in S/s when
        # no signal frequency could be estimated).
        factor, fallback = {
            "protocol_decode": (10, 10e6),
            "timing_analysis": (100, 100e6),
            "fft": (2.4, 10e6),  # Nyquist + 20%
            "eye_diagram": (50, 100e6),  # needs high oversampling
        }.get(scenario, (10, 10e6))
        required_rate = signal_freq * factor if signal_freq > 0 else fallback

    margin_percent = ((sample_rate - required_rate) / required_rate) * 100

    explanation = ""
    recommendation = ""
    if margin_percent >= 0:
        status: QualityStatus = "PASS"
        passed = True
    elif margin_percent >= -20:
        # Within 20% of the requirement: usable but risky.
        status = "WARNING"
        passed = False
        explanation = f"Sample rate is {abs(margin_percent):.0f}% below recommended"
        recommendation = (
            f"Increase sample rate to {required_rate / 1e6:.0f} MS/s "
            f"(currently {sample_rate / 1e6:.0f} MS/s)"
        )
    else:
        status = "FAIL"
        passed = False
        explanation = f"Sample rate is critically low ({abs(margin_percent):.0f}% below required)"
        recommendation = f"Increase sample rate to at least {required_rate / 1e6:.0f} MS/s"

    return QualityMetric(
        name="Sample Rate",
        status=status,
        passed=passed,
        current_value=sample_rate / 1e6,
        required_value=required_rate / 1e6,
        unit="MS/s",
        margin_percent=margin_percent,
        explanation=explanation,
        recommendation=recommendation,
    )

293 

294 

def _assess_resolution(
    data: NDArray[np.floating[Any]],
    voltage_swing: float,
    stats: dict[str, float],
    is_analog: bool,
    scenario: AnalysisScenario,
) -> QualityMetric:
    """Assess vertical resolution adequacy via estimated SNR.

    Args:
        data: Signal data array.
        voltage_swing: Peak-to-peak voltage.
        stats: Basic statistics (must contain "std").
        is_analog: Whether signal is analog (currently unused; kept for
            interface stability).
        scenario: Analysis scenario.

    Returns:
        QualityMetric for resolution, expressed in dB of SNR.
    """
    # Approximate SNR by treating the standard deviation as noise RMS
    # against half the peak-to-peak amplitude; +1e-12 guards divide-by-zero.
    #
    # Bug fix: previously ``snr_db`` was only assigned when
    # voltage_swing > 0 (the else branch was ``pass``), so a flat signal
    # raised NameError below. A flat signal now reports 0 dB and takes the
    # FAIL path. A dead, discarded ENOB expression was also removed.
    if voltage_swing > 0:
        noise_rms = stats["std"]
        snr_linear = (voltage_swing / 2) / (noise_rms + 1e-12)
        snr_db = 20 * np.log10(snr_linear) if snr_linear > 0 else 0.0
    else:
        snr_db = 0.0  # No measurable swing -> no usable resolution.

    # Scenario-specific SNR requirement.
    if scenario in ("protocol_decode", "timing_analysis"):
        required_snr = 20.0  # dB
    elif scenario in ("fft", "eye_diagram"):
        required_snr = 40.0  # dB
    else:
        required_snr = 20.0  # dB

    current_snr = snr_db
    margin_percent = ((current_snr - required_snr) / required_snr) * 100

    # Determine status: WARNING band is 80-100% of the required SNR.
    if current_snr >= required_snr:
        status: QualityStatus = "PASS"
        passed = True
        explanation = ""
        recommendation = ""
    elif current_snr >= required_snr * 0.8:
        status = "WARNING"
        passed = False
        explanation = f"SNR is {abs(margin_percent):.0f}% below recommended ({current_snr:.1f} dB)"
        recommendation = "Reduce noise sources or increase signal amplitude"
    else:
        status = "FAIL"
        passed = False
        explanation = f"SNR is critically low ({current_snr:.1f} dB, need {required_snr:.0f} dB)"
        recommendation = "Significantly improve signal quality or use higher resolution capture"

    return QualityMetric(
        name="Resolution",
        status=status,
        passed=passed,
        current_value=current_snr,
        required_value=required_snr,
        unit="dB SNR",
        margin_percent=margin_percent,
        explanation=explanation,
        recommendation=recommendation,
    )

364 

365 

def _assess_duration(
    n_samples: int,
    sample_rate: float,
    data: NDArray[np.floating[Any]],
    scenario: AnalysisScenario,
    protocol_params: dict[str, Any],
) -> QualityMetric:
    """Assess whether the capture is long enough for the scenario.

    Args:
        n_samples: Number of samples.
        sample_rate: Sample rate in Hz.
        data: Signal data array.
        scenario: Analysis scenario.
        protocol_params: Protocol-specific parameters (currently unused;
            kept for interface stability).

    Returns:
        QualityMetric for duration (values reported in ms).
    """
    duration_sec = n_samples / sample_rate

    # Estimate the signal period from mean-level crossings (two per cycle).
    centered = data - np.mean(data)
    crossings = np.where(np.diff(np.sign(centered)) != 0)[0]

    if len(crossings) >= 2:
        avg_half_period = np.mean(np.diff(crossings))
        signal_period = (avg_half_period * 2) / sample_rate
        num_periods = duration_sec / signal_period if signal_period > 0 else 0
    else:
        # No crossings detected: assume the capture spans ~10 periods.
        num_periods = 0
        signal_period = duration_sec / 10

    # Required number of captured periods per scenario; eye diagrams need
    # many UIs, FFT only needs enough for frequency resolution.
    required_periods = {"fft": 10, "eye_diagram": 1000}.get(scenario, 100)

    required_duration = required_periods * signal_period
    if required_duration > 0:
        margin_percent = ((duration_sec - required_duration) / required_duration) * 100
    else:
        margin_percent = 100

    explanation = ""
    recommendation = ""
    if num_periods >= required_periods or margin_percent >= 0:
        status: QualityStatus = "PASS"
        passed = True
    elif num_periods >= required_periods * 0.5:
        status = "WARNING"
        passed = False
        explanation = f"Captured only {num_periods:.0f} signal periods, recommended minimum is {required_periods}"
        recommendation = (
            f"Increase capture duration to at least {required_duration * 1e3:.1f} ms "
            f"(currently {duration_sec * 1e3:.1f} ms)"
        )
    else:
        status = "FAIL"
        passed = False
        explanation = f"Capture duration is critically short ({num_periods:.0f} periods)"
        recommendation = f"Increase capture duration to at least {required_duration * 1e3:.1f} ms"

    return QualityMetric(
        name="Duration",
        status=status,
        passed=passed,
        current_value=duration_sec * 1e3,
        required_value=required_duration * 1e3,
        unit="ms",
        margin_percent=margin_percent,
        explanation=explanation,
        recommendation=recommendation,
    )

444 

445 

def _assess_noise(
    data: NDArray[np.floating[Any]],
    voltage_swing: float,
    stats: dict[str, float],
    scenario: AnalysisScenario,
) -> QualityMetric:
    """Assess the noise level relative to the signal swing.

    Args:
        data: Signal data array.
        voltage_swing: Peak-to-peak voltage.
        stats: Basic statistics (must contain "std").
        scenario: Analysis scenario.

    Returns:
        QualityMetric for noise, as a percentage of the signal swing.
    """
    # A flat signal has no swing to compare against; report a clean pass.
    if voltage_swing == 0:
        return QualityMetric(
            name="Noise Level",
            status="PASS",
            passed=True,
            current_value=0.0,
            required_value=0.0,
            unit="% of swing",
            margin_percent=100.0,
        )

    # Treat the standard deviation as the noise RMS.
    noise_percent = (stats["std"] / voltage_swing) * 100

    # Spectral/eye analyses tolerate less noise than logic-level work.
    max_noise_percent = 5.0 if scenario in ("fft", "eye_diagram") else 10.0

    margin_percent = ((max_noise_percent - noise_percent) / max_noise_percent) * 100

    explanation = ""
    recommendation = ""
    if noise_percent <= max_noise_percent:
        status: QualityStatus = "PASS"
        passed = True
    elif noise_percent <= max_noise_percent * 1.5:
        # Up to 1.5x the limit: degraded but possibly workable.
        status = "WARNING"
        passed = False
        explanation = f"Noise level is {noise_percent:.1f}% of signal swing (max recommended: {max_noise_percent:.0f}%)"
        recommendation = "Reduce noise sources, check grounding, or use averaging"
    else:
        status = "FAIL"
        passed = False
        explanation = f"Noise level is critically high ({noise_percent:.1f}% of swing)"
        recommendation = (
            "Significantly reduce noise through better probing, shielding, or bandwidth limiting"
        )

    return QualityMetric(
        name="Noise Level",
        status=status,
        passed=passed,
        current_value=noise_percent,
        required_value=max_noise_percent,
        unit="% of swing",
        margin_percent=margin_percent,
        explanation=explanation,
        recommendation=recommendation,
    )

519 

520 

# Explicit public API; the underscore-prefixed assessment helpers are internal.
__all__ = [
    "AnalysisScenario",
    "DataQuality",
    "QualityMetric",
    "QualityStatus",
    "assess_data_quality",
]

527]