Coverage for src / tracekit / analyzers / digital / thresholds.py: 98%
226 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Time-varying and multi-level threshold support for digital signal analysis.
3 - RE-THR-001: Time-Varying Threshold Support
4 - RE-THR-002: Multi-Level Logic Support
6This module provides adaptive thresholding for signals with varying DC offset
7or amplitude, and support for multi-level logic standards beyond simple
8high/low states.
9"""
11from __future__ import annotations
13from dataclasses import dataclass, field
14from typing import TYPE_CHECKING, Literal
16import numpy as np
18if TYPE_CHECKING:
19 from numpy.typing import NDArray
22@dataclass
23class ThresholdConfig:
24 """Configuration for threshold detection.
26 Implements RE-THR-001, RE-THR-002: Threshold configuration.
28 Attributes:
29 threshold_type: Type of thresholding ('fixed', 'adaptive', 'multi_level').
30 fixed_threshold: Fixed threshold value (for 'fixed' type).
31 window_size: Window size for adaptive thresholding.
32 percentile: Percentile for adaptive threshold calculation.
33 levels: Voltage levels for multi-level logic.
34 hysteresis: Hysteresis margin to prevent oscillation.
35 """
37 threshold_type: Literal["fixed", "adaptive", "multi_level"] = "fixed"
38 fixed_threshold: float = 0.5
39 window_size: int = 1024
40 percentile: float = 50.0
41 levels: list[float] = field(default_factory=lambda: [0.0, 1.0])
42 hysteresis: float = 0.05
45@dataclass
46class AdaptiveThresholdResult:
47 """Result of adaptive threshold calculation.
49 Implements RE-THR-001: Time-varying threshold result.
51 Attributes:
52 thresholds: Array of threshold values at each sample.
53 binary_output: Digitized signal.
54 crossings: Indices of threshold crossings.
55 dc_offset: Estimated DC offset over time.
56 amplitude: Estimated signal amplitude over time.
57 """
59 thresholds: NDArray[np.float64]
60 binary_output: NDArray[np.uint8]
61 crossings: list[int]
62 dc_offset: NDArray[np.float64]
63 amplitude: NDArray[np.float64]
66@dataclass
67class MultiLevelResult:
68 """Result of multi-level logic detection.
70 Implements RE-THR-002: Multi-level detection result.
72 Attributes:
73 levels: Detected logic levels at each sample.
74 level_values: Voltage levels used.
75 transitions: List of (index, from_level, to_level) transitions.
76 level_histogram: Count of samples at each level.
77 eye_heights: Eye height for each transition.
78 """
80 levels: NDArray[np.int32]
81 level_values: list[float]
82 transitions: list[tuple[int, int, int]]
83 level_histogram: dict[int, int]
84 eye_heights: list[float]
87class AdaptiveThresholder:
88 """Apply time-varying thresholds to signals.
90 Implements RE-THR-001: Time-Varying Threshold Support.
92 Tracks DC offset and amplitude changes to maintain accurate
93 thresholding despite signal drift.
95 Example:
96 >>> thresholder = AdaptiveThresholder(window_size=1000)
97 >>> result = thresholder.apply(analog_signal)
98 >>> digital = result.binary_output
99 """
101 def __init__(
102 self,
103 window_size: int = 1024,
104 percentile: float = 50.0,
105 method: Literal["median", "mean", "envelope", "otsu"] = "median",
106 hysteresis: float = 0.05,
107 ) -> None:
108 """Initialize adaptive thresholder.
110 Args:
111 window_size: Size of sliding window for adaptation.
112 percentile: Percentile for threshold calculation.
113 method: Thresholding method.
114 hysteresis: Hysteresis margin. This is used as an absolute value
115 when amplitude-relative calculation would be too small. For
116 signals with small amplitude variations (e.g., oscillating
117 around a threshold), this value is applied directly.
118 """
119 self.window_size = window_size
120 self.percentile = percentile
121 self.method = method
122 self.hysteresis = hysteresis
124 def apply(self, signal: NDArray[np.float64]) -> AdaptiveThresholdResult:
125 """Apply adaptive thresholding to signal.
127 Implements RE-THR-001: Adaptive threshold application.
129 Args:
130 signal: Input analog signal.
132 Returns:
133 AdaptiveThresholdResult with thresholds and digitized output.
135 Example:
136 >>> result = thresholder.apply(analog_waveform)
137 >>> plt.plot(result.binary_output)
138 """
139 n_samples = len(signal)
141 # Estimate DC offset and amplitude over time
142 dc_offset = np.zeros(n_samples)
143 amplitude = np.zeros(n_samples)
144 thresholds = np.zeros(n_samples)
146 half_window = self.window_size // 2
148 for i in range(n_samples):
149 # Window bounds
150 start = max(0, i - half_window)
151 end = min(n_samples, i + half_window)
152 window = signal[start:end]
154 if self.method == "median":
155 dc_offset[i] = np.median(window)
156 amplitude[i] = np.percentile(window, 95) - np.percentile(window, 5)
157 thresholds[i] = dc_offset[i]
159 elif self.method == "mean":
160 dc_offset[i] = np.mean(window)
161 amplitude[i] = np.std(window) * 4 # Approximate peak-to-peak
162 thresholds[i] = dc_offset[i]
164 elif self.method == "envelope":
165 # Use min/max envelope
166 high = np.max(window)
167 low = np.min(window)
168 dc_offset[i] = (high + low) / 2
169 amplitude[i] = high - low
170 thresholds[i] = dc_offset[i]
172 elif self.method == "otsu": 172 ↛ 148line 172 didn't jump to line 148 because the condition on line 172 was always true
173 # Simplified Otsu's method
174 threshold = self._otsu_threshold(window)
175 thresholds[i] = threshold
176 dc_offset[i] = threshold
177 amplitude[i] = np.max(window) - np.min(window)
179 # Apply hysteresis
180 binary_output, crossings = self._apply_with_hysteresis(signal, thresholds, amplitude)
182 return AdaptiveThresholdResult(
183 thresholds=thresholds,
184 binary_output=binary_output,
185 crossings=crossings,
186 dc_offset=dc_offset,
187 amplitude=amplitude,
188 )
190 def calculate_threshold_profile(self, signal: NDArray[np.float64]) -> NDArray[np.float64]:
191 """Calculate threshold values without applying.
193 Implements RE-THR-001: Threshold profile calculation.
195 Args:
196 signal: Input signal.
198 Returns:
199 Array of threshold values.
200 """
201 result = self.apply(signal)
202 return result.thresholds
204 def _apply_with_hysteresis(
205 self,
206 signal: NDArray[np.float64],
207 thresholds: NDArray[np.float64],
208 amplitude: NDArray[np.float64],
209 ) -> tuple[NDArray[np.uint8], list[int]]:
210 """Apply thresholding with hysteresis.
212 The hysteresis prevents rapid oscillation when the signal hovers near
213 the threshold. The margin is calculated as:
214 - If amplitude is significant: hyst_margin = amplitude * hysteresis
215 - If amplitude is small: hyst_margin = hysteresis (used as absolute value)
217 This ensures hysteresis remains effective even for signals with very
218 small amplitude variations.
220 Args:
221 signal: Input signal.
222 thresholds: Threshold values.
223 amplitude: Signal amplitude at each point.
225 Returns:
226 Tuple of (binary_output, crossings).
227 """
228 n_samples = len(signal)
229 binary = np.zeros(n_samples, dtype=np.uint8)
230 crossings = []
232 # Initial state
233 current_state = 1 if signal[0] > thresholds[0] else 0
234 binary[0] = current_state
236 for i in range(1, n_samples):
237 threshold = thresholds[i]
238 amp = amplitude[i]
240 # Calculate hysteresis margin:
241 # - Use amplitude-relative margin for signals with significant amplitude
242 # - Use absolute hysteresis value when amplitude is small
243 # This prevents oscillation for signals hovering around the threshold
244 amplitude_relative_margin = amp * self.hysteresis
245 absolute_margin = self.hysteresis
247 # Use the larger of the two to ensure effective hysteresis
248 # When amplitude is large (e.g., > 1.0), amplitude-relative dominates
249 # When amplitude is small (e.g., 0.02), absolute hysteresis dominates
250 hyst_margin = max(amplitude_relative_margin, absolute_margin)
252 if current_state == 0:
253 # Currently low, need signal above threshold + hysteresis to go high
254 if signal[i] > threshold + hyst_margin:
255 current_state = 1
256 crossings.append(i)
257 else:
258 # Currently high, need signal below threshold - hysteresis to go low
259 if signal[i] < threshold - hyst_margin:
260 current_state = 0
261 crossings.append(i)
263 binary[i] = current_state
265 return binary, crossings
267 def _otsu_threshold(self, data: NDArray[np.float64]) -> float:
268 """Calculate Otsu's threshold.
270 Args:
271 data: Data window.
273 Returns:
274 Optimal threshold value.
275 """
276 # Simplified Otsu's method
277 hist, bin_edges = np.histogram(data, bins=50)
278 bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2
280 total = hist.sum()
281 if total == 0: 281 ↛ 282line 281 didn't jump to line 282 because the condition on line 281 was never true
282 return float(np.mean(data))
284 current_max = 0
285 threshold = bin_centers[0]
287 sum_total = np.sum(bin_centers * hist)
288 sum_background = 0
289 weight_background = 0
291 for i in range(len(hist)): 291 ↛ 313line 291 didn't jump to line 313 because the loop on line 291 didn't complete
292 weight_background += hist[i]
293 if weight_background == 0:
294 continue
296 weight_foreground = total - weight_background
297 if weight_foreground == 0:
298 break
300 sum_background += bin_centers[i] * hist[i]
302 mean_background = sum_background / weight_background
303 mean_foreground = (sum_total - sum_background) / weight_foreground
305 variance_between = (
306 weight_background * weight_foreground * (mean_background - mean_foreground) ** 2
307 )
309 if variance_between > current_max:
310 current_max = variance_between
311 threshold = bin_centers[i]
313 return float(threshold)
316class MultiLevelDetector:
317 """Detect multi-level logic signals.
319 Implements RE-THR-002: Multi-Level Logic Support.
321 Supports PAM-2, PAM-4, PAM-8, and custom multi-level signaling
322 where signals encode multiple bits per symbol.
324 Example:
325 >>> detector = MultiLevelDetector(levels=4) # PAM-4
326 >>> result = detector.detect(signal)
327 >>> symbols = result.levels
328 """
330 def __init__(
331 self,
332 levels: int | list[float] = 2,
333 auto_detect_levels: bool = True,
334 hysteresis: float = 0.1,
335 ) -> None:
336 """Initialize multi-level detector.
338 Args:
339 levels: Number of levels or explicit voltage levels.
340 auto_detect_levels: Automatically detect level voltages.
341 hysteresis: Hysteresis fraction between levels.
342 """
343 if isinstance(levels, int):
344 self.n_levels = levels
345 self.level_values = None
346 else:
347 self.n_levels = len(levels)
348 self.level_values = sorted(levels)
350 self.auto_detect_levels = auto_detect_levels
351 self.hysteresis = hysteresis
353 def detect(self, signal: NDArray[np.float64]) -> MultiLevelResult:
354 """Detect multi-level logic in signal.
356 Implements RE-THR-002: Multi-level detection workflow.
358 Args:
359 signal: Input analog signal.
361 Returns:
362 MultiLevelResult with detected levels.
364 Example:
365 >>> result = detector.detect(pam4_signal)
366 >>> print(f"Detected {len(result.level_values)} levels")
367 """
368 # Auto-detect level values if needed
369 if self.level_values is None or self.auto_detect_levels:
370 level_values = self._detect_levels(signal)
371 else:
372 level_values = self.level_values
374 # Calculate decision thresholds between levels
375 thresholds = [
376 (level_values[i] + level_values[i + 1]) / 2 for i in range(len(level_values) - 1)
377 ]
379 # Apply hysteresis-aware level detection
380 levels, transitions = self._detect_with_hysteresis(signal, level_values, thresholds)
382 # Calculate level histogram
383 level_histogram = {}
384 for level in range(len(level_values)):
385 level_histogram[level] = int(np.sum(levels == level))
387 # Calculate eye heights
388 eye_heights = self._calculate_eye_heights(signal, level_values)
390 return MultiLevelResult(
391 levels=levels,
392 level_values=level_values,
393 transitions=transitions,
394 level_histogram=level_histogram,
395 eye_heights=eye_heights,
396 )
398 def detect_levels_from_histogram(
399 self, signal: NDArray[np.float64], n_levels: int | None = None
400 ) -> list[float]:
401 """Detect logic levels from signal histogram.
403 Implements RE-THR-002: Level detection.
405 Args:
406 signal: Input signal.
407 n_levels: Expected number of levels (auto-detect if None).
409 Returns:
410 List of detected voltage levels.
411 """
412 if n_levels is None: 412 ↛ 413line 412 didn't jump to line 413 because the condition on line 412 was never true
413 n_levels = self.n_levels
415 return self._detect_levels(signal, n_levels)
417 def calculate_eye_diagram(
418 self,
419 signal: NDArray[np.float64],
420 samples_per_symbol: int,
421 n_symbols: int = 100,
422 ) -> NDArray[np.float64]:
423 """Calculate eye diagram data for multi-level signal.
425 Implements RE-THR-002: Eye diagram support.
427 Args:
428 signal: Input signal.
429 samples_per_symbol: Samples per symbol period.
430 n_symbols: Number of symbols to overlay.
432 Returns:
433 2D array of overlaid symbol waveforms.
434 """
435 n_available = len(signal) // samples_per_symbol
436 n_symbols = min(n_symbols, n_available)
438 # Create 2D array with overlaid symbols
439 eye_data = np.zeros((n_symbols, samples_per_symbol * 2))
441 for i in range(n_symbols):
442 start = i * samples_per_symbol
443 end = start + samples_per_symbol * 2
445 if end <= len(signal):
446 eye_data[i] = signal[start:end]
448 return eye_data
450 def _detect_levels(
451 self, signal: NDArray[np.float64], n_levels: int | None = None
452 ) -> list[float]:
453 """Detect voltage levels using clustering.
455 Args:
456 signal: Input signal.
457 n_levels: Expected number of levels.
459 Returns:
460 List of level voltage values.
461 """
462 if n_levels is None:
463 n_levels = self.n_levels
465 # Use histogram-based clustering
466 hist, bin_edges = np.histogram(signal, bins=100)
467 bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2
469 # Find peaks in histogram
470 peaks = []
471 for i in range(1, len(hist) - 1):
472 if hist[i] > hist[i - 1] and hist[i] > hist[i + 1]:
473 peaks.append((hist[i], bin_centers[i]))
475 # Sort by frequency and take top n_levels
476 peaks.sort(reverse=True)
477 level_values = sorted([p[1] for p in peaks[:n_levels]])
479 # If not enough peaks found, use evenly spaced levels
480 if len(level_values) < n_levels:
481 min_val = np.min(signal)
482 max_val = np.max(signal)
483 level_values = list(np.linspace(min_val, max_val, n_levels))
485 return level_values
487 def _detect_with_hysteresis(
488 self,
489 signal: NDArray[np.float64],
490 level_values: list[float],
491 thresholds: list[float],
492 ) -> tuple[NDArray[np.int32], list[tuple[int, int, int]]]:
493 """Detect levels with hysteresis.
495 Args:
496 signal: Input signal.
497 level_values: Voltage levels.
498 thresholds: Decision thresholds.
500 Returns:
501 Tuple of (level_array, transitions).
502 """
503 n_samples = len(signal)
504 levels = np.zeros(n_samples, dtype=np.int32)
505 transitions = []
507 # Calculate hysteresis margins
508 margins = []
509 for i in range(len(level_values) - 1):
510 margin = (level_values[i + 1] - level_values[i]) * self.hysteresis
511 margins.append(margin)
513 # Initial level
514 current_level = self._find_closest_level(signal[0], level_values)
515 levels[0] = current_level
517 for i in range(1, n_samples):
518 new_level = current_level
520 # Check for transitions
521 if current_level < len(level_values) - 1:
522 # Can go up
523 upper_threshold = thresholds[current_level] + margins[current_level]
524 if signal[i] > upper_threshold:
525 new_level = current_level + 1
527 if current_level > 0:
528 # Can go down
529 lower_threshold = thresholds[current_level - 1] - margins[current_level - 1]
530 if signal[i] < lower_threshold:
531 new_level = current_level - 1
533 if new_level != current_level:
534 transitions.append((i, current_level, new_level))
535 current_level = new_level
537 levels[i] = current_level
539 return levels, transitions
541 def _find_closest_level(self, value: float, level_values: list[float]) -> int:
542 """Find closest level to value.
544 Args:
545 value: Sample value.
546 level_values: Level voltages.
548 Returns:
549 Level index.
550 """
551 distances = [abs(value - lv) for lv in level_values]
552 return int(np.argmin(distances))
554 def _calculate_eye_heights(
555 self, signal: NDArray[np.float64], level_values: list[float]
556 ) -> list[float]:
557 """Calculate eye heights between levels.
559 Args:
560 signal: Input signal.
561 level_values: Level voltages.
563 Returns:
564 List of eye heights for each level transition.
565 """
566 eye_heights = []
568 for i in range(len(level_values) - 1):
569 lower = level_values[i]
570 upper = level_values[i + 1]
572 # Find samples near each level
573 lower_samples = signal[np.abs(signal - lower) < (upper - lower) * 0.2]
574 upper_samples = signal[np.abs(signal - upper) < (upper - lower) * 0.2]
576 if len(lower_samples) > 0 and len(upper_samples) > 0:
577 # Eye height is gap between worst cases
578 worst_low = np.max(lower_samples)
579 worst_high = np.min(upper_samples)
580 eye_height = worst_high - worst_low
581 else:
582 eye_height = upper - lower
584 eye_heights.append(max(0, eye_height))
586 return eye_heights
589# =============================================================================
590# Convenience functions
591# =============================================================================
594def apply_adaptive_threshold(
595 signal: NDArray[np.float64],
596 window_size: int = 1024,
597 method: Literal["median", "mean", "envelope", "otsu"] = "median",
598 hysteresis: float = 0.05,
599) -> AdaptiveThresholdResult:
600 """Apply adaptive thresholding to a signal.
602 Implements RE-THR-001: Time-Varying Threshold Support.
604 Args:
605 signal: Input analog signal.
606 window_size: Adaptive window size.
607 method: Thresholding method.
608 hysteresis: Hysteresis margin.
610 Returns:
611 AdaptiveThresholdResult with thresholds and binary output.
613 Example:
614 >>> result = apply_adaptive_threshold(noisy_signal)
615 >>> digital = result.binary_output
616 """
617 thresholder = AdaptiveThresholder(
618 window_size=window_size,
619 method=method,
620 hysteresis=hysteresis,
621 )
622 return thresholder.apply(signal)
625def detect_multi_level(
626 signal: NDArray[np.float64],
627 n_levels: int = 4,
628 auto_detect: bool = True,
629 hysteresis: float = 0.1,
630) -> MultiLevelResult:
631 """Detect multi-level logic in signal.
633 Implements RE-THR-002: Multi-Level Logic Support.
635 Args:
636 signal: Input analog signal.
637 n_levels: Expected number of levels.
638 auto_detect: Automatically detect level voltages.
639 hysteresis: Hysteresis between levels.
641 Returns:
642 MultiLevelResult with detected levels.
644 Example:
645 >>> result = detect_multi_level(pam4_signal, n_levels=4)
646 >>> symbols = result.levels
647 """
648 detector = MultiLevelDetector(
649 levels=n_levels,
650 auto_detect_levels=auto_detect,
651 hysteresis=hysteresis,
652 )
653 return detector.detect(signal)
656def calculate_threshold_snr(
657 signal: NDArray[np.float64],
658 threshold: float | NDArray[np.float64],
659) -> float:
660 """Calculate signal-to-noise ratio at threshold.
662 Implements RE-THR-001: Threshold quality metric.
664 Args:
665 signal: Input signal.
666 threshold: Threshold value(s).
668 Returns:
669 Estimated SNR in dB.
670 """
671 if isinstance(threshold, np.ndarray):
672 threshold = float(np.mean(threshold))
674 # Separate high and low samples
675 high_samples = signal[signal > threshold]
676 low_samples = signal[signal <= threshold]
678 if len(high_samples) == 0 or len(low_samples) == 0:
679 return 0.0
681 # Calculate signal power (difference between means)
682 signal_power = (np.mean(high_samples) - np.mean(low_samples)) ** 2
684 # Calculate noise power (variance around means)
685 noise_power = (np.var(high_samples) + np.var(low_samples)) / 2
687 if noise_power == 0:
688 return 100.0 # Very high SNR
690 snr_linear = signal_power / noise_power
691 snr_db = 10 * np.log10(snr_linear)
693 return float(snr_db)
696__all__ = [
697 "AdaptiveThresholdResult",
698 # Classes
699 "AdaptiveThresholder",
700 "MultiLevelDetector",
701 "MultiLevelResult",
702 # Data classes
703 "ThresholdConfig",
704 # Functions
705 "apply_adaptive_threshold",
706 "calculate_threshold_snr",
707 "detect_multi_level",
708]