Coverage for src / tracekit / analyzers / validation.py: 89%

166 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Signal validation and suitability checking for measurements. 

2 

3This module provides helper functions to determine whether a signal is suitable 

4for specific measurements before attempting them. This helps avoid NaN results 

5and provides better user feedback. 

6 

7Example: 

8 >>> from tracekit.analyzers.validation import is_suitable_for_frequency, get_valid_measurements 

9 >>> suitable, reason = is_suitable_for_frequency(trace) 

10 >>> if suitable: 

11 ... freq = frequency(trace) 

12 >>> valid_measurements = get_valid_measurements(trace) 

13 >>> print(f"Applicable measurements: {', '.join(valid_measurements)}") 

14""" 

15 

16from __future__ import annotations 

17 

18from typing import TYPE_CHECKING 

19 

20import numpy as np 

21 

22if TYPE_CHECKING: 

23 from tracekit.core.types import WaveformTrace 

24 

25 

26def is_suitable_for_frequency_measurement(trace: WaveformTrace) -> tuple[bool, str]: 

27 """Check if trace is suitable for frequency measurement. 

28 

29 Args: 

30 trace: Input waveform trace. 

31 

32 Returns: 

33 Tuple of (is_suitable, reason). If not suitable, reason explains why. 

34 

35 Example: 

36 >>> suitable, reason = is_suitable_for_frequency_measurement(trace) 

37 >>> if suitable: 

38 ... freq = frequency(trace) 

39 ... else: 

40 ... print(f"Cannot measure frequency: {reason}") 

41 """ 

42 from tracekit.analyzers.waveform.measurements import _find_edges 

43 

44 data = trace.data 

45 n = len(data) 

46 

47 # Check minimum samples 

48 if n < 3: 

49 return False, f"Insufficient samples ({n} < 3)" 

50 

51 # Check for variation (DC signal) 

52 if np.std(data) < 1e-12: 

53 return False, "Signal has no variation (DC or constant)" 

54 

55 # Check for edges 

56 rising_edges = _find_edges(trace, "rising") 

57 if len(rising_edges) < 2: 

58 return ( 

59 False, 

60 f"Insufficient edges for periodic measurement (found {len(rising_edges)} rising edges, need at least 2)", 

61 ) 

62 

63 # Check period consistency (is it periodic?) 

64 if len(rising_edges) >= 3: 64 ↛ 75line 64 didn't jump to line 75 because the condition on line 64 was always true

65 edge_times = rising_edges * trace.metadata.time_base 

66 periods = np.diff(edge_times) 

67 period_cv = np.std(periods) / np.mean(periods) if np.mean(periods) > 0 else float("inf") 

68 

69 if period_cv > 0.2: 

70 return ( 

71 False, 

72 f"Signal is not periodic (period variation: {period_cv * 100:.1f}% > 20%)", 

73 ) 

74 

75 return True, "Signal is suitable for frequency measurement" 

76 

77 

78def is_suitable_for_duty_cycle_measurement(trace: WaveformTrace) -> tuple[bool, str]: 

79 """Check if trace is suitable for duty cycle measurement. 

80 

81 Args: 

82 trace: Input waveform trace. 

83 

84 Returns: 

85 Tuple of (is_suitable, reason). 

86 

87 Example: 

88 >>> suitable, reason = is_suitable_for_duty_cycle_measurement(trace) 

89 >>> if suitable: 

90 ... dc = duty_cycle(trace) 

91 """ 

92 from tracekit.analyzers.waveform.measurements import _find_edges 

93 

94 # Check if suitable for frequency first (duty cycle needs periodic signal) 

95 freq_suitable, freq_reason = is_suitable_for_frequency_measurement(trace) 

96 if not freq_suitable: 

97 return False, freq_reason 

98 

99 # Need both rising and falling edges 

100 rising = _find_edges(trace, "rising") 

101 falling = _find_edges(trace, "falling") 

102 

103 if len(rising) == 0: 103 ↛ 104line 103 didn't jump to line 104 because the condition on line 103 was never true

104 return False, "No rising edges detected" 

105 

106 if len(falling) == 0: 106 ↛ 107line 106 didn't jump to line 107 because the condition on line 106 was never true

107 return False, "No falling edges detected" 

108 

109 return True, "Signal is suitable for duty cycle measurement" 

110 

111 

112def is_suitable_for_rise_time_measurement(trace: WaveformTrace) -> tuple[bool, str]: 

113 """Check if trace is suitable for rise time measurement. 

114 

115 Args: 

116 trace: Input waveform trace. 

117 

118 Returns: 

119 Tuple of (is_suitable, reason). 

120 

121 Example: 

122 >>> suitable, reason = is_suitable_for_rise_time_measurement(trace) 

123 >>> if suitable: 

124 ... rt = rise_time(trace) 

125 """ 

126 from tracekit.analyzers.waveform.measurements import _find_edges, _find_levels 

127 

128 data = trace.data 

129 n = len(data) 

130 

131 if n < 3: 

132 return False, f"Insufficient samples ({n} < 3)" 

133 

134 # Check for amplitude 

135 low, high = _find_levels(data) 

136 amplitude = high - low 

137 

138 if amplitude <= 0: 138 ↛ 139line 138 didn't jump to line 139 because the condition on line 138 was never true

139 return False, "Signal has no amplitude (flat or inverted)" 

140 

141 # Check for rising edges 

142 rising_edges = _find_edges(trace, "rising") 

143 

144 if len(rising_edges) == 0: 

145 return False, "No rising edges detected" 

146 

147 # Check sample rate vs transition time 

148 # Find a rising transition and count samples across it 

149 sample_rate = trace.metadata.sample_rate 

150 low_ref = low + 0.1 * amplitude 

151 high_ref = low + 0.9 * amplitude 

152 

153 # Find first rising transition 

154 crossings = np.where((data[:-1] < low_ref) & (data[1:] >= low_ref))[0] 

155 

156 if len(crossings) > 0: 156 ↛ 175line 156 didn't jump to line 175 because the condition on line 156 was always true

157 idx = crossings[0] 

158 # Count samples from 10% to 90% 

159 remaining = data[idx:] 

160 above_high = remaining >= high_ref 

161 

162 if np.any(above_high): 162 ↛ 175line 162 didn't jump to line 175 because the condition on line 162 was always true

163 end_offset = np.argmax(above_high) 

164 samples_in_transition = end_offset 

165 

166 if samples_in_transition < 2: 

167 est_rise_time = samples_in_transition / sample_rate 

168 recommended_rate = 10 / est_rise_time 

169 return ( 

170 False, 

171 f"Insufficient sample rate for transition (< 2 samples). " 

172 f"Recommend sample rate > {recommended_rate:.3e} Hz", 

173 ) 

174 

175 return True, "Signal is suitable for rise time measurement" 

176 

177 

178def is_suitable_for_fall_time_measurement(trace: WaveformTrace) -> tuple[bool, str]: 

179 """Check if trace is suitable for fall time measurement. 

180 

181 Args: 

182 trace: Input waveform trace. 

183 

184 Returns: 

185 Tuple of (is_suitable, reason). 

186 """ 

187 from tracekit.analyzers.waveform.measurements import _find_edges, _find_levels 

188 

189 data = trace.data 

190 n = len(data) 

191 

192 if n < 3: 

193 return False, f"Insufficient samples ({n} < 3)" 

194 

195 low, high = _find_levels(data) 

196 amplitude = high - low 

197 

198 if amplitude <= 0: 198 ↛ 199line 198 didn't jump to line 199 because the condition on line 198 was never true

199 return False, "Signal has no amplitude (flat or inverted)" 

200 

201 # Check for falling edges 

202 falling_edges = _find_edges(trace, "falling") 

203 

204 if len(falling_edges) == 0: 

205 return False, "No falling edges detected" 

206 

207 return True, "Signal is suitable for fall time measurement" 

208 

209 

210def is_suitable_for_jitter_measurement(trace: WaveformTrace) -> tuple[bool, str]: 

211 """Check if trace is suitable for jitter measurement. 

212 

213 Args: 

214 trace: Input waveform trace. 

215 

216 Returns: 

217 Tuple of (is_suitable, reason). 

218 

219 Example: 

220 >>> suitable, reason = is_suitable_for_jitter_measurement(trace) 

221 >>> if suitable: 

222 ... jitter = rms_jitter(trace) 

223 """ 

224 from tracekit.analyzers.waveform.measurements import _find_edges 

225 

226 # Jitter needs periodic signal 

227 freq_suitable, freq_reason = is_suitable_for_frequency_measurement(trace) 

228 if not freq_suitable: 

229 return False, freq_reason 

230 

231 # Need at least 3 edges (2 periods minimum) 

232 edges = _find_edges(trace, "rising") 

233 

234 if len(edges) < 3: 234 ↛ 235line 234 didn't jump to line 235 because the condition on line 234 was never true

235 return ( 

236 False, 

237 f"Insufficient edges for jitter measurement (found {len(edges)}, need at least 3)", 

238 ) 

239 

240 return True, "Signal is suitable for jitter measurement" 

241 

242 

243def get_valid_measurements(trace: WaveformTrace) -> list[str]: 

244 """Get list of measurements that are suitable for this trace. 

245 

246 Analyzes the signal characteristics and returns the names of all 

247 measurement functions that should return valid (non-NaN) results. 

248 

249 Args: 

250 trace: Input waveform trace. 

251 

252 Returns: 

253 List of measurement function names (without parentheses). 

254 

255 Example: 

256 >>> valid = get_valid_measurements(trace) 

257 >>> print(f"Applicable measurements: {', '.join(valid)}") 

258 >>> # Then apply only valid measurements 

259 >>> for meas_name in valid: 

260 ... func = getattr(tk, meas_name) 

261 ... result = func(trace) 

262 """ 

263 valid = [] 

264 

265 # These almost always work (just need data) 

266 if len(trace.data) > 0: 

267 valid.extend(["mean", "rms"]) 

268 

269 if len(trace.data) >= 2: 

270 valid.append("amplitude") 

271 

272 # Check edge-based measurements 

273 suitable, _ = is_suitable_for_rise_time_measurement(trace) 

274 if suitable: 

275 valid.append("rise_time") 

276 

277 suitable, _ = is_suitable_for_fall_time_measurement(trace) 

278 if suitable: 

279 valid.append("fall_time") 

280 

281 # Check frequency/period 

282 suitable, _ = is_suitable_for_frequency_measurement(trace) 

283 if suitable: 

284 valid.extend(["frequency", "period"]) 

285 

286 # Check duty cycle 

287 suitable, _ = is_suitable_for_duty_cycle_measurement(trace) 

288 if suitable: 

289 valid.append("duty_cycle") 

290 

291 # Check jitter 

292 suitable, _ = is_suitable_for_jitter_measurement(trace) 

293 if suitable: 

294 valid.extend(["rms_jitter", "peak_to_peak_jitter"]) 

295 

296 # Pulse width - needs edges but not necessarily periodic 

297 from tracekit.analyzers.waveform.measurements import _find_edges 

298 

299 rising = _find_edges(trace, "rising") 

300 falling = _find_edges(trace, "falling") 

301 

302 if len(rising) > 0 and len(falling) > 0: 

303 valid.append("pulse_width") 

304 

305 # Overshoot/undershoot - check amplitude 

306 from tracekit.analyzers.waveform.measurements import _find_levels 

307 

308 if len(trace.data) >= 3: 

309 low, high = _find_levels(trace.data) 

310 if high - low > 0: 310 ↛ 314line 310 didn't jump to line 314 because the condition on line 310 was always true

311 valid.extend(["overshoot", "undershoot", "preshoot"]) 

312 

313 # Slew rate - similar to rise/fall time 

314 if "rise_time" in valid or "fall_time" in valid: 

315 valid.append("slew_rate") 

316 

317 return valid 

318 

319 

320def analyze_signal_characteristics(trace: WaveformTrace) -> dict[str, bool | int | str | list[str]]: 

321 """Perform comprehensive signal characteristic analysis. 

322 

323 Determines signal type, edge counts, periodicity, and recommends 

324 applicable measurements. 

325 

326 Args: 

327 trace: Input waveform trace. 

328 

329 Returns: 

330 Dictionary containing: 

331 - sufficient_samples: bool - at least 16 samples 

332 - has_amplitude: bool - signal has variation 

333 - has_variation: bool - standard deviation > 0 

334 - has_edges: bool - rising or falling edges detected 

335 - is_periodic: bool - signal appears periodic 

336 - edge_count: int - total edges (rising + falling) 

337 - rising_edge_count: int - number of rising edges 

338 - falling_edge_count: int - number of falling edges 

339 - signal_type: str - classified type (dc, periodic_digital, etc.) 

340 - recommended_measurements: list[str] - suggested measurements 

341 

342 Example: 

343 >>> chars = analyze_signal_characteristics(trace) 

344 >>> if chars['is_periodic']: 

345 ... print("Signal is periodic") 

346 ... print(f"Frequency measurement recommended: {'frequency' in chars['recommended_measurements']}") 

347 """ 

348 from tracekit.analyzers.waveform.measurements import _find_edges 

349 

350 data = trace.data 

351 n = len(data) 

352 

353 characteristics: dict[str, bool | int | str | list[str]] = { 

354 "sufficient_samples": n >= 16, 

355 "has_amplitude": False, 

356 "has_variation": False, 

357 "has_edges": False, 

358 "is_periodic": False, 

359 "edge_count": 0, 

360 "rising_edge_count": 0, 

361 "falling_edge_count": 0, 

362 "signal_type": "unknown", 

363 "recommended_measurements": [], 

364 } 

365 

366 # Check variation 

367 std = np.std(data) 

368 characteristics["has_variation"] = std > 1e-12 

369 

370 # Check amplitude 

371 amplitude = np.max(data) - np.min(data) 

372 characteristics["has_amplitude"] = amplitude > 1e-12 

373 

374 if not characteristics["has_variation"]: 

375 characteristics["signal_type"] = "dc" 

376 characteristics["recommended_measurements"] = ["mean", "rms"] 

377 return characteristics 

378 

379 # Count edges 

380 rising_edges = _find_edges(trace, "rising") 

381 falling_edges = _find_edges(trace, "falling") 

382 

383 rising_edge_count = len(rising_edges) 

384 falling_edge_count = len(falling_edges) 

385 edge_count = rising_edge_count + falling_edge_count 

386 

387 characteristics["rising_edge_count"] = rising_edge_count 

388 characteristics["falling_edge_count"] = falling_edge_count 

389 characteristics["edge_count"] = edge_count 

390 characteristics["has_edges"] = edge_count > 0 

391 

392 # Check periodicity 

393 if len(rising_edges) >= 3: 

394 periods = np.diff(rising_edges) 

395 period_cv = np.std(periods) / np.mean(periods) if np.mean(periods) > 0 else float("inf") 

396 

397 if period_cv < 0.2: # Less than 20% variation 

398 characteristics["is_periodic"] = True 

399 

400 # Classify signal type 

401 if not characteristics["has_edges"]: 401 ↛ 403line 401 didn't jump to line 403 because the condition on line 401 was never true

402 # No edges - check if analog periodic 

403 if n >= 16: 

404 fft_result = np.abs(np.fft.rfft(data - np.mean(data))) 

405 peak_power = np.max(fft_result[1:]) if len(fft_result) > 1 else 0 

406 avg_power = np.mean(fft_result[1:]) if len(fft_result) > 1 else 0 

407 

408 if peak_power > 10 * avg_power: 

409 characteristics["signal_type"] = "periodic_analog" 

410 else: 

411 characteristics["signal_type"] = "noise" 

412 else: 

413 characteristics["signal_type"] = "unknown" 

414 elif characteristics["is_periodic"]: 

415 characteristics["signal_type"] = "periodic_digital" 

416 else: 

417 characteristics["signal_type"] = "aperiodic_digital" 

418 

419 # Recommend measurements 

420 recommended = get_valid_measurements(trace) 

421 characteristics["recommended_measurements"] = recommended 

422 

423 return characteristics 

424 

425 

426def get_measurement_requirements(measurement_name: str) -> dict[str, str | int | list[str]]: 

427 """Get requirements for a specific measurement. 

428 

429 Args: 

430 measurement_name: Name of the measurement function. 

431 

432 Returns: 

433 Dictionary containing: 

434 - description: str - what the measurement computes 

435 - min_samples: int - minimum data points needed 

436 - required_signal_types: list[str] - suitable signal types 

437 - required_features: list[str] - required signal features 

438 - common_nan_causes: list[str] - common reasons for NaN 

439 

440 Example: 

441 >>> reqs = get_measurement_requirements('frequency') 

442 >>> print(f"Minimum samples: {reqs['min_samples']}") 

443 >>> print(f"Required features: {', '.join(reqs['required_features'])}") 

444 """ 

445 requirements = { 

446 "frequency": { 

447 "description": "Measures the repetition rate of a periodic signal", 

448 "min_samples": 3, 

449 "required_signal_types": ["periodic_digital", "periodic_analog"], 

450 "required_features": ["edges", "periodic"], 

451 "common_nan_causes": [ 

452 "DC signal (no transitions)", 

453 "Aperiodic signal (< 2 edges)", 

454 "Highly variable period (> 20% variation)", 

455 ], 

456 }, 

457 "period": { 

458 "description": "Measures time between consecutive edges", 

459 "min_samples": 3, 

460 "required_signal_types": ["periodic_digital", "periodic_analog"], 

461 "required_features": ["edges", "periodic"], 

462 "common_nan_causes": [ 

463 "DC signal", 

464 "Fewer than 2 edges detected", 

465 "Aperiodic signal", 

466 ], 

467 }, 

468 "duty_cycle": { 

469 "description": "Measures ratio of high time to period", 

470 "min_samples": 3, 

471 "required_signal_types": ["periodic_digital"], 

472 "required_features": ["rising_edges", "falling_edges", "periodic"], 

473 "common_nan_causes": [ 

474 "Non-periodic signal", 

475 "Missing rising or falling edges", 

476 "DC signal", 

477 ], 

478 }, 

479 "rise_time": { 

480 "description": "Measures time for rising edge transition", 

481 "min_samples": 3, 

482 "required_signal_types": ["periodic_digital", "aperiodic_digital", "periodic_analog"], 

483 "required_features": ["rising_edges", "amplitude"], 

484 "common_nan_causes": [ 

485 "No rising edges", 

486 "Insufficient sample rate", 

487 "DC signal", 

488 ], 

489 }, 

490 "fall_time": { 

491 "description": "Measures time for falling edge transition", 

492 "min_samples": 3, 

493 "required_signal_types": ["periodic_digital", "aperiodic_digital", "periodic_analog"], 

494 "required_features": ["falling_edges", "amplitude"], 

495 "common_nan_causes": [ 

496 "No falling edges", 

497 "Insufficient sample rate", 

498 "DC signal", 

499 ], 

500 }, 

501 "pulse_width": { 

502 "description": "Measures duration of high or low pulse", 

503 "min_samples": 3, 

504 "required_signal_types": ["periodic_digital", "aperiodic_digital"], 

505 "required_features": ["rising_edges", "falling_edges"], 

506 "common_nan_causes": [ 

507 "Missing edge pairs", 

508 "DC signal", 

509 "Incomplete pulses", 

510 ], 

511 }, 

512 "amplitude": { 

513 "description": "Measures peak-to-peak voltage", 

514 "min_samples": 2, 

515 "required_signal_types": ["all"], 

516 "required_features": [], 

517 "common_nan_causes": ["Fewer than 2 samples"], 

518 }, 

519 "mean": { 

520 "description": "Calculates DC level (average voltage)", 

521 "min_samples": 1, 

522 "required_signal_types": ["all"], 

523 "required_features": [], 

524 "common_nan_causes": ["No data"], 

525 }, 

526 "rms": { 

527 "description": "Calculates root-mean-square voltage", 

528 "min_samples": 1, 

529 "required_signal_types": ["all"], 

530 "required_features": [], 

531 "common_nan_causes": ["No data"], 

532 }, 

533 "overshoot": { 

534 "description": "Measures overshoot above high level", 

535 "min_samples": 3, 

536 "required_signal_types": ["periodic_digital", "aperiodic_digital"], 

537 "required_features": ["amplitude"], 

538 "common_nan_causes": ["No amplitude", "DC signal"], 

539 }, 

540 "undershoot": { 

541 "description": "Measures undershoot below low level", 

542 "min_samples": 3, 

543 "required_signal_types": ["periodic_digital", "aperiodic_digital"], 

544 "required_features": ["amplitude"], 

545 "common_nan_causes": ["No amplitude", "DC signal"], 

546 }, 

547 "slew_rate": { 

548 "description": "Measures dV/dt during transitions", 

549 "min_samples": 3, 

550 "required_signal_types": ["periodic_digital", "aperiodic_digital"], 

551 "required_features": ["edges", "amplitude"], 

552 "common_nan_causes": ["No edges", "No amplitude", "DC signal"], 

553 }, 

554 "rms_jitter": { 

555 "description": "Measures timing uncertainty (RMS)", 

556 "min_samples": 3, 

557 "required_signal_types": ["periodic_digital"], 

558 "required_features": ["edges", "periodic"], 

559 "common_nan_causes": [ 

560 "Fewer than 3 edges", 

561 "Non-periodic signal", 

562 "DC signal", 

563 ], 

564 }, 

565 "peak_to_peak_jitter": { 

566 "description": "Measures peak-to-peak timing variation", 

567 "min_samples": 3, 

568 "required_signal_types": ["periodic_digital"], 

569 "required_features": ["edges", "periodic"], 

570 "common_nan_causes": [ 

571 "Fewer than 3 edges", 

572 "Non-periodic signal", 

573 "DC signal", 

574 ], 

575 }, 

576 } 

577 

578 default = { 

579 "description": "Measurement not documented", 

580 "min_samples": 1, 

581 "required_signal_types": ["unknown"], 

582 "required_features": [], 

583 "common_nan_causes": ["Check measurement documentation"], 

584 } 

585 

586 return requirements.get(measurement_name, default) # type: ignore[return-value] 

587 

588 

589__all__ = [ 

590 "analyze_signal_characteristics", 

591 "get_measurement_requirements", 

592 "get_valid_measurements", 

593 "is_suitable_for_duty_cycle_measurement", 

594 "is_suitable_for_fall_time_measurement", 

595 "is_suitable_for_frequency_measurement", 

596 "is_suitable_for_jitter_measurement", 

597 "is_suitable_for_rise_time_measurement", 

598]