Coverage for src / tracekit / analyzers / validation.py: 89%
166 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Signal validation and suitability checking for measurements.
3This module provides helper functions to determine whether a signal is suitable
4for specific measurements before attempting them. This helps avoid NaN results
5and provides better user feedback.
7Example:
8 >>> from tracekit.analyzers.validation import is_suitable_for_frequency_measurement, get_valid_measurements
9 >>> suitable, reason = is_suitable_for_frequency_measurement(trace)
10 >>> if suitable:
11 ... freq = frequency(trace)
12 >>> valid_measurements = get_valid_measurements(trace)
13 >>> print(f"Applicable measurements: {', '.join(valid_measurements)}")
14"""
16from __future__ import annotations
18from typing import TYPE_CHECKING
20import numpy as np
22if TYPE_CHECKING:
23 from tracekit.core.types import WaveformTrace
def is_suitable_for_frequency_measurement(trace: WaveformTrace) -> tuple[bool, str]:
    """Decide whether a frequency measurement on ``trace`` is worth attempting.

    The checks run in order and the first failure wins: at least 3 samples,
    a standard deviation of at least 1e-12, at least 2 rising edges, and
    (when 3 or more rising edges exist) a period coefficient of variation
    of at most 20%.

    Args:
        trace: Input waveform trace.

    Returns:
        Tuple of (is_suitable, reason). When not suitable, reason names the
        check that failed.

    Example:
        >>> suitable, reason = is_suitable_for_frequency_measurement(trace)
        >>> if suitable:
        ...     freq = frequency(trace)
        ... else:
        ...     print(f"Cannot measure frequency: {reason}")
    """
    from tracekit.analyzers.waveform.measurements import _find_edges

    samples = trace.data
    sample_count = len(samples)
    if sample_count < 3:
        return False, f"Insufficient samples ({sample_count} < 3)"

    # A flat trace has no transitions to time.
    if np.std(samples) < 1e-12:
        return False, "Signal has no variation (DC or constant)"

    rising = _find_edges(trace, "rising")
    rising_count = len(rising)
    if rising_count < 2:
        return (
            False,
            f"Insufficient edges for periodic measurement (found {rising_count} rising edges, need at least 2)",
        )

    ok = (True, "Signal is suitable for frequency measurement")
    if rising_count < 3:
        # Only one period is available, so there is nothing to compare it to.
        return ok

    # Spread of the edge-to-edge spacing relative to its mean.
    periods = np.diff(rising * trace.metadata.time_base)
    mean_period = np.mean(periods)
    if mean_period > 0:
        period_cv = np.std(periods) / mean_period
    else:
        period_cv = float("inf")

    if period_cv > 0.2:
        return (
            False,
            f"Signal is not periodic (period variation: {period_cv * 100:.1f}% > 20%)",
        )
    return ok
def is_suitable_for_duty_cycle_measurement(trace: WaveformTrace) -> tuple[bool, str]:
    """Check if trace is suitable for duty cycle measurement.

    The trace must first pass ``is_suitable_for_frequency_measurement`` and
    must additionally contain at least one falling edge.

    Args:
        trace: Input waveform trace.

    Returns:
        Tuple of (is_suitable, reason). When the frequency check fails, its
        reason is passed through unchanged.

    Example:
        >>> suitable, reason = is_suitable_for_duty_cycle_measurement(trace)
        >>> if suitable:
        ...     dc = duty_cycle(trace)
    """
    from tracekit.analyzers.waveform.measurements import _find_edges

    # Duty cycle needs a periodic signal, so reuse the frequency checks.
    freq_suitable, freq_reason = is_suitable_for_frequency_measurement(trace)
    if not freq_suitable:
        return False, freq_reason

    # The frequency check only succeeds with at least two rising edges, so a
    # separate "no rising edges" test can never fire here; only the falling
    # edges still need to be verified.
    falling = _find_edges(trace, "falling")
    if len(falling) == 0:
        return False, "No falling edges detected"

    return True, "Signal is suitable for duty cycle measurement"
def is_suitable_for_rise_time_measurement(trace: WaveformTrace) -> tuple[bool, str]:
    """Decide whether a rise time measurement on ``trace`` is worth attempting.

    Requires at least 3 samples, a positive high-minus-low level difference,
    at least one rising edge, and at least 2 sample steps between the first
    upward crossing of the 10% level and the first later sample at or above
    the 90% level.

    Args:
        trace: Input waveform trace.

    Returns:
        Tuple of (is_suitable, reason).

    Example:
        >>> suitable, reason = is_suitable_for_rise_time_measurement(trace)
        >>> if suitable:
        ...     rt = rise_time(trace)
    """
    from tracekit.analyzers.waveform.measurements import _find_edges, _find_levels

    samples = trace.data
    sample_count = len(samples)
    if sample_count < 3:
        return False, f"Insufficient samples ({sample_count} < 3)"

    low_level, high_level = _find_levels(samples)
    swing = high_level - low_level
    if swing <= 0:
        return False, "Signal has no amplitude (flat or inverted)"

    if len(_find_edges(trace, "rising")) == 0:
        return False, "No rising edges detected"

    ok = (True, "Signal is suitable for rise time measurement")

    sample_rate = trace.metadata.sample_rate
    threshold_10 = low_level + 0.1 * swing
    threshold_90 = low_level + 0.9 * swing

    # Indices where the signal steps from below the 10% level to at/above it.
    crosses_up = (samples[:-1] < threshold_10) & (samples[1:] >= threshold_10)
    starts = np.nonzero(crosses_up)[0]
    if len(starts) == 0:
        return ok

    # Only the first transition is inspected.
    reached_high = samples[starts[0]:] >= threshold_90
    if not np.any(reached_high):
        return ok

    # Offset of the first sample at/above 90%, counted from the 10% crossing.
    transition_samples = np.argmax(reached_high)
    if transition_samples >= 2:
        return ok

    est_rise_time = transition_samples / sample_rate
    recommended_rate = 10 / est_rise_time
    return (
        False,
        f"Insufficient sample rate for transition (< 2 samples). "
        f"Recommend sample rate > {recommended_rate:.3e} Hz",
    )
def is_suitable_for_fall_time_measurement(trace: WaveformTrace) -> tuple[bool, str]:
    """Decide whether a fall time measurement on ``trace`` is worth attempting.

    Requires at least 3 samples, a positive high-minus-low level difference,
    and at least one falling edge.

    Args:
        trace: Input waveform trace.

    Returns:
        Tuple of (is_suitable, reason).
    """
    from tracekit.analyzers.waveform.measurements import _find_edges, _find_levels

    samples = trace.data
    sample_count = len(samples)
    if sample_count < 3:
        return False, f"Insufficient samples ({sample_count} < 3)"

    low_level, high_level = _find_levels(samples)
    swing = high_level - low_level
    if swing <= 0:
        return False, "Signal has no amplitude (flat or inverted)"

    if len(_find_edges(trace, "falling")) == 0:
        return False, "No falling edges detected"

    return True, "Signal is suitable for fall time measurement"
def is_suitable_for_jitter_measurement(trace: WaveformTrace) -> tuple[bool, str]:
    """Decide whether a jitter measurement on ``trace`` is worth attempting.

    The trace must pass ``is_suitable_for_frequency_measurement`` and contain
    at least 3 rising edges (two full periods).

    Args:
        trace: Input waveform trace.

    Returns:
        Tuple of (is_suitable, reason). When the frequency check fails, its
        reason is passed through unchanged.

    Example:
        >>> suitable, reason = is_suitable_for_jitter_measurement(trace)
        >>> if suitable:
        ...     jitter = rms_jitter(trace)
    """
    from tracekit.analyzers.waveform.measurements import _find_edges

    periodic_ok, periodic_reason = is_suitable_for_frequency_measurement(trace)
    if periodic_ok:
        # The frequency check accepts two edges; jitter needs one more.
        rising_count = len(_find_edges(trace, "rising"))
        if rising_count >= 3:
            return True, "Signal is suitable for jitter measurement"
        return (
            False,
            f"Insufficient edges for jitter measurement (found {rising_count}, need at least 3)",
        )

    return False, periodic_reason
def get_valid_measurements(trace: WaveformTrace) -> list[str]:
    """List the measurements whose suitability checks pass for this trace.

    Sample-count gates cover the basic statistics, the ``is_suitable_for_*``
    helpers cover edge- and period-based measurements, and direct edge/level
    lookups cover pulse width and the overshoot family.

    Args:
        trace: Input waveform trace.

    Returns:
        List of measurement function names (without parentheses).

    Example:
        >>> valid = get_valid_measurements(trace)
        >>> print(f"Applicable measurements: {', '.join(valid)}")
        >>> # Then apply only valid measurements
        >>> for meas_name in valid:
        ...     func = getattr(tk, meas_name)
        ...     result = func(trace)
    """
    from tracekit.analyzers.waveform.measurements import _find_edges, _find_levels

    names: list[str] = []

    # Basic statistics only need data to exist.
    if len(trace.data) > 0:
        names += ["mean", "rms"]
    if len(trace.data) >= 2:
        names.append("amplitude")

    # Each suitability check unlocks one or more measurement names, in order.
    gated = (
        (is_suitable_for_rise_time_measurement, ("rise_time",)),
        (is_suitable_for_fall_time_measurement, ("fall_time",)),
        (is_suitable_for_frequency_measurement, ("frequency", "period")),
        (is_suitable_for_duty_cycle_measurement, ("duty_cycle",)),
        (is_suitable_for_jitter_measurement, ("rms_jitter", "peak_to_peak_jitter")),
    )
    for check, unlocked in gated:
        passed, _reason = check(trace)
        if passed:
            names.extend(unlocked)

    # Pulse width needs both edge polarities but not periodicity.
    rising_count = len(_find_edges(trace, "rising"))
    falling_count = len(_find_edges(trace, "falling"))
    if rising_count > 0 and falling_count > 0:
        names.append("pulse_width")

    # The overshoot family needs a positive level difference.
    if len(trace.data) >= 3:
        low_level, high_level = _find_levels(trace.data)
        if high_level - low_level > 0:
            names.extend(["overshoot", "undershoot", "preshoot"])

    # Slew rate is available whenever either transition time is.
    if "rise_time" in names or "fall_time" in names:
        names.append("slew_rate")

    return names
def analyze_signal_characteristics(trace: WaveformTrace) -> dict[str, bool | int | str | list[str]]:
    """Perform comprehensive signal characteristic analysis.

    Determines signal type, edge counts, periodicity, and recommends
    applicable measurements. An empty trace yields the default result
    (``signal_type == "unknown"``, no recommendations) instead of raising.

    Args:
        trace: Input waveform trace.

    Returns:
        Dictionary containing:
            - sufficient_samples: bool - at least 16 samples
            - has_amplitude: bool - peak-to-peak range (max - min) > 1e-12
            - has_variation: bool - standard deviation > 1e-12
            - has_edges: bool - rising or falling edges detected
            - is_periodic: bool - at least 3 rising edges whose spacing
              varies by no more than 20%
            - edge_count: int - total edges (rising + falling)
            - rising_edge_count: int - number of rising edges
            - falling_edge_count: int - number of falling edges
            - signal_type: str - one of "unknown", "dc", "periodic_analog",
              "noise", "periodic_digital", "aperiodic_digital"
            - recommended_measurements: list[str] - suggested measurements

    Example:
        >>> chars = analyze_signal_characteristics(trace)
        >>> if chars['is_periodic']:
        ...     print("Signal is periodic")
        ...     print(f"Frequency measurement recommended: {'frequency' in chars['recommended_measurements']}")
    """
    data = trace.data
    n = len(data)

    characteristics: dict[str, bool | int | str | list[str]] = {
        "sufficient_samples": n >= 16,
        "has_amplitude": False,
        "has_variation": False,
        "has_edges": False,
        "is_periodic": False,
        "edge_count": 0,
        "rising_edge_count": 0,
        "falling_edge_count": 0,
        "signal_type": "unknown",
        "recommended_measurements": [],
    }

    # np.max / np.min raise ValueError on a zero-size array, so an empty
    # trace is reported with the defaults above.
    if n == 0:
        return characteristics

    # Cast to plain bool so the values match the documented types rather
    # than being numpy.bool_.
    has_variation = bool(np.std(data) > 1e-12)
    characteristics["has_variation"] = has_variation
    characteristics["has_amplitude"] = bool(np.max(data) - np.min(data) > 1e-12)

    if not has_variation:
        characteristics["signal_type"] = "dc"
        characteristics["recommended_measurements"] = ["mean", "rms"]
        return characteristics

    # Imported here because edge detection is only needed past this point.
    from tracekit.analyzers.waveform.measurements import _find_edges

    rising_edges = _find_edges(trace, "rising")
    falling_edges = _find_edges(trace, "falling")

    rising_edge_count = len(rising_edges)
    falling_edge_count = len(falling_edges)
    edge_count = rising_edge_count + falling_edge_count
    has_edges = edge_count > 0

    characteristics["rising_edge_count"] = rising_edge_count
    characteristics["falling_edge_count"] = falling_edge_count
    characteristics["edge_count"] = edge_count
    characteristics["has_edges"] = has_edges

    # Periodicity: spread of the rising-edge spacing relative to its mean.
    is_periodic = False
    if rising_edge_count >= 3:
        periods = np.diff(rising_edges)
        mean_period = np.mean(periods)
        period_cv = np.std(periods) / mean_period if mean_period > 0 else float("inf")
        # "<=" keeps this in step with the frequency suitability check,
        # which rejects a trace only when the variation exceeds 20%.
        is_periodic = bool(period_cv <= 0.2)
    characteristics["is_periodic"] = is_periodic

    # Classify signal type
    if not has_edges:
        if n >= 16:
            # With no edges, look for a dominant spectral line instead.
            # n >= 16 guarantees there are bins beyond the DC bin.
            spectrum = np.abs(np.fft.rfft(data - np.mean(data)))[1:]
            if np.max(spectrum) > 10 * np.mean(spectrum):
                characteristics["signal_type"] = "periodic_analog"
            else:
                characteristics["signal_type"] = "noise"
        else:
            characteristics["signal_type"] = "unknown"
    elif is_periodic:
        characteristics["signal_type"] = "periodic_digital"
    else:
        characteristics["signal_type"] = "aperiodic_digital"

    characteristics["recommended_measurements"] = get_valid_measurements(trace)

    return characteristics
def get_measurement_requirements(measurement_name: str) -> dict[str, str | int | list[str]]:
    """Get requirements for a specific measurement.

    The table is rebuilt on every call, so callers may mutate the returned
    dictionary freely. Names that are not in the table get a generic
    "not documented" entry rather than an error.

    Args:
        measurement_name: Name of the measurement function.

    Returns:
        Dictionary containing:
            - description: str - what the measurement computes
            - min_samples: int - minimum data points needed
            - required_signal_types: list[str] - suitable signal types
            - required_features: list[str] - required signal features
            - common_nan_causes: list[str] - common reasons for NaN

    Example:
        >>> reqs = get_measurement_requirements('frequency')
        >>> print(f"Minimum samples: {reqs['min_samples']}")
        >>> print(f"Required features: {', '.join(reqs['required_features'])}")
    """
    requirements: dict[str, dict[str, str | int | list[str]]] = {
        "frequency": {
            "description": "Measures the repetition rate of a periodic signal",
            "min_samples": 3,
            "required_signal_types": ["periodic_digital", "periodic_analog"],
            "required_features": ["edges", "periodic"],
            "common_nan_causes": [
                "DC signal (no transitions)",
                "Aperiodic signal (< 2 edges)",
                "Highly variable period (> 20% variation)",
            ],
        },
        "period": {
            "description": "Measures time between consecutive edges",
            "min_samples": 3,
            "required_signal_types": ["periodic_digital", "periodic_analog"],
            "required_features": ["edges", "periodic"],
            "common_nan_causes": [
                "DC signal",
                "Fewer than 2 edges detected",
                "Aperiodic signal",
            ],
        },
        "duty_cycle": {
            "description": "Measures ratio of high time to period",
            "min_samples": 3,
            "required_signal_types": ["periodic_digital"],
            "required_features": ["rising_edges", "falling_edges", "periodic"],
            "common_nan_causes": [
                "Non-periodic signal",
                "Missing rising or falling edges",
                "DC signal",
            ],
        },
        "rise_time": {
            "description": "Measures time for rising edge transition",
            "min_samples": 3,
            "required_signal_types": ["periodic_digital", "aperiodic_digital", "periodic_analog"],
            "required_features": ["rising_edges", "amplitude"],
            "common_nan_causes": [
                "No rising edges",
                "Insufficient sample rate",
                "DC signal",
            ],
        },
        "fall_time": {
            "description": "Measures time for falling edge transition",
            "min_samples": 3,
            "required_signal_types": ["periodic_digital", "aperiodic_digital", "periodic_analog"],
            "required_features": ["falling_edges", "amplitude"],
            "common_nan_causes": [
                "No falling edges",
                "Insufficient sample rate",
                "DC signal",
            ],
        },
        "pulse_width": {
            "description": "Measures duration of high or low pulse",
            "min_samples": 3,
            "required_signal_types": ["periodic_digital", "aperiodic_digital"],
            "required_features": ["rising_edges", "falling_edges"],
            "common_nan_causes": [
                "Missing edge pairs",
                "DC signal",
                "Incomplete pulses",
            ],
        },
        "amplitude": {
            "description": "Measures peak-to-peak voltage",
            "min_samples": 2,
            "required_signal_types": ["all"],
            "required_features": [],
            "common_nan_causes": ["Fewer than 2 samples"],
        },
        "mean": {
            "description": "Calculates DC level (average voltage)",
            "min_samples": 1,
            "required_signal_types": ["all"],
            "required_features": [],
            "common_nan_causes": ["No data"],
        },
        "rms": {
            "description": "Calculates root-mean-square voltage",
            "min_samples": 1,
            "required_signal_types": ["all"],
            "required_features": [],
            "common_nan_causes": ["No data"],
        },
        "overshoot": {
            "description": "Measures overshoot above high level",
            "min_samples": 3,
            "required_signal_types": ["periodic_digital", "aperiodic_digital"],
            "required_features": ["amplitude"],
            "common_nan_causes": ["No amplitude", "DC signal"],
        },
        "undershoot": {
            "description": "Measures undershoot below low level",
            "min_samples": 3,
            "required_signal_types": ["periodic_digital", "aperiodic_digital"],
            "required_features": ["amplitude"],
            "common_nan_causes": ["No amplitude", "DC signal"],
        },
        # get_valid_measurements() reports "preshoot" under the same
        # conditions as overshoot/undershoot, so it needs an entry here too.
        "preshoot": {
            "description": "Measures preshoot preceding a transition",
            "min_samples": 3,
            "required_signal_types": ["periodic_digital", "aperiodic_digital"],
            "required_features": ["amplitude"],
            "common_nan_causes": ["No amplitude", "DC signal"],
        },
        "slew_rate": {
            "description": "Measures dV/dt during transitions",
            "min_samples": 3,
            "required_signal_types": ["periodic_digital", "aperiodic_digital"],
            "required_features": ["edges", "amplitude"],
            "common_nan_causes": ["No edges", "No amplitude", "DC signal"],
        },
        "rms_jitter": {
            "description": "Measures timing uncertainty (RMS)",
            "min_samples": 3,
            "required_signal_types": ["periodic_digital"],
            "required_features": ["edges", "periodic"],
            "common_nan_causes": [
                "Fewer than 3 edges",
                "Non-periodic signal",
                "DC signal",
            ],
        },
        "peak_to_peak_jitter": {
            "description": "Measures peak-to-peak timing variation",
            "min_samples": 3,
            "required_signal_types": ["periodic_digital"],
            "required_features": ["edges", "periodic"],
            "common_nan_causes": [
                "Fewer than 3 edges",
                "Non-periodic signal",
                "DC signal",
            ],
        },
    }

    # Fallback for names that are not in the table above.
    default: dict[str, str | int | list[str]] = {
        "description": "Measurement not documented",
        "min_samples": 1,
        "required_signal_types": ["unknown"],
        "required_features": [],
        "common_nan_causes": ["Check measurement documentation"],
    }

    return requirements.get(measurement_name, default)
# Names exported by `from tracekit.analyzers.validation import *`,
# kept in alphabetical order.
__all__ = [
    "analyze_signal_characteristics",
    "get_measurement_requirements",
    "get_valid_measurements",
    "is_suitable_for_duty_cycle_measurement",
    "is_suitable_for_fall_time_measurement",
    "is_suitable_for_frequency_measurement",
    "is_suitable_for_jitter_measurement",
    "is_suitable_for_rise_time_measurement",
]