Coverage for src / tracekit / analyzers / digital / thresholds.py: 98%

226 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Time-varying and multi-level threshold support for digital signal analysis. 

2 

3 - RE-THR-001: Time-Varying Threshold Support 

4 - RE-THR-002: Multi-Level Logic Support 

5 

6This module provides adaptive thresholding for signals with varying DC offset 

7or amplitude, and support for multi-level logic standards beyond simple 

8high/low states. 

9""" 

10 

11from __future__ import annotations 

12 

13from dataclasses import dataclass, field 

14from typing import TYPE_CHECKING, Literal 

15 

16import numpy as np 

17 

18if TYPE_CHECKING: 

19 from numpy.typing import NDArray 

20 

21 

22@dataclass 

23class ThresholdConfig: 

24 """Configuration for threshold detection. 

25 

26 Implements RE-THR-001, RE-THR-002: Threshold configuration. 

27 

28 Attributes: 

29 threshold_type: Type of thresholding ('fixed', 'adaptive', 'multi_level'). 

30 fixed_threshold: Fixed threshold value (for 'fixed' type). 

31 window_size: Window size for adaptive thresholding. 

32 percentile: Percentile for adaptive threshold calculation. 

33 levels: Voltage levels for multi-level logic. 

34 hysteresis: Hysteresis margin to prevent oscillation. 

35 """ 

36 

37 threshold_type: Literal["fixed", "adaptive", "multi_level"] = "fixed" 

38 fixed_threshold: float = 0.5 

39 window_size: int = 1024 

40 percentile: float = 50.0 

41 levels: list[float] = field(default_factory=lambda: [0.0, 1.0]) 

42 hysteresis: float = 0.05 

43 

44 

45@dataclass 

46class AdaptiveThresholdResult: 

47 """Result of adaptive threshold calculation. 

48 

49 Implements RE-THR-001: Time-varying threshold result. 

50 

51 Attributes: 

52 thresholds: Array of threshold values at each sample. 

53 binary_output: Digitized signal. 

54 crossings: Indices of threshold crossings. 

55 dc_offset: Estimated DC offset over time. 

56 amplitude: Estimated signal amplitude over time. 

57 """ 

58 

59 thresholds: NDArray[np.float64] 

60 binary_output: NDArray[np.uint8] 

61 crossings: list[int] 

62 dc_offset: NDArray[np.float64] 

63 amplitude: NDArray[np.float64] 

64 

65 

66@dataclass 

67class MultiLevelResult: 

68 """Result of multi-level logic detection. 

69 

70 Implements RE-THR-002: Multi-level detection result. 

71 

72 Attributes: 

73 levels: Detected logic levels at each sample. 

74 level_values: Voltage levels used. 

75 transitions: List of (index, from_level, to_level) transitions. 

76 level_histogram: Count of samples at each level. 

77 eye_heights: Eye height for each transition. 

78 """ 

79 

80 levels: NDArray[np.int32] 

81 level_values: list[float] 

82 transitions: list[tuple[int, int, int]] 

83 level_histogram: dict[int, int] 

84 eye_heights: list[float] 

85 

86 

87class AdaptiveThresholder: 

88 """Apply time-varying thresholds to signals. 

89 

90 Implements RE-THR-001: Time-Varying Threshold Support. 

91 

92 Tracks DC offset and amplitude changes to maintain accurate 

93 thresholding despite signal drift. 

94 

95 Example: 

96 >>> thresholder = AdaptiveThresholder(window_size=1000) 

97 >>> result = thresholder.apply(analog_signal) 

98 >>> digital = result.binary_output 

99 """ 

100 

101 def __init__( 

102 self, 

103 window_size: int = 1024, 

104 percentile: float = 50.0, 

105 method: Literal["median", "mean", "envelope", "otsu"] = "median", 

106 hysteresis: float = 0.05, 

107 ) -> None: 

108 """Initialize adaptive thresholder. 

109 

110 Args: 

111 window_size: Size of sliding window for adaptation. 

112 percentile: Percentile for threshold calculation. 

113 method: Thresholding method. 

114 hysteresis: Hysteresis margin. This is used as an absolute value 

115 when amplitude-relative calculation would be too small. For 

116 signals with small amplitude variations (e.g., oscillating 

117 around a threshold), this value is applied directly. 

118 """ 

119 self.window_size = window_size 

120 self.percentile = percentile 

121 self.method = method 

122 self.hysteresis = hysteresis 

123 

124 def apply(self, signal: NDArray[np.float64]) -> AdaptiveThresholdResult: 

125 """Apply adaptive thresholding to signal. 

126 

127 Implements RE-THR-001: Adaptive threshold application. 

128 

129 Args: 

130 signal: Input analog signal. 

131 

132 Returns: 

133 AdaptiveThresholdResult with thresholds and digitized output. 

134 

135 Example: 

136 >>> result = thresholder.apply(analog_waveform) 

137 >>> plt.plot(result.binary_output) 

138 """ 

139 n_samples = len(signal) 

140 

141 # Estimate DC offset and amplitude over time 

142 dc_offset = np.zeros(n_samples) 

143 amplitude = np.zeros(n_samples) 

144 thresholds = np.zeros(n_samples) 

145 

146 half_window = self.window_size // 2 

147 

148 for i in range(n_samples): 

149 # Window bounds 

150 start = max(0, i - half_window) 

151 end = min(n_samples, i + half_window) 

152 window = signal[start:end] 

153 

154 if self.method == "median": 

155 dc_offset[i] = np.median(window) 

156 amplitude[i] = np.percentile(window, 95) - np.percentile(window, 5) 

157 thresholds[i] = dc_offset[i] 

158 

159 elif self.method == "mean": 

160 dc_offset[i] = np.mean(window) 

161 amplitude[i] = np.std(window) * 4 # Approximate peak-to-peak 

162 thresholds[i] = dc_offset[i] 

163 

164 elif self.method == "envelope": 

165 # Use min/max envelope 

166 high = np.max(window) 

167 low = np.min(window) 

168 dc_offset[i] = (high + low) / 2 

169 amplitude[i] = high - low 

170 thresholds[i] = dc_offset[i] 

171 

172 elif self.method == "otsu": 172 ↛ 148line 172 didn't jump to line 148 because the condition on line 172 was always true

173 # Simplified Otsu's method 

174 threshold = self._otsu_threshold(window) 

175 thresholds[i] = threshold 

176 dc_offset[i] = threshold 

177 amplitude[i] = np.max(window) - np.min(window) 

178 

179 # Apply hysteresis 

180 binary_output, crossings = self._apply_with_hysteresis(signal, thresholds, amplitude) 

181 

182 return AdaptiveThresholdResult( 

183 thresholds=thresholds, 

184 binary_output=binary_output, 

185 crossings=crossings, 

186 dc_offset=dc_offset, 

187 amplitude=amplitude, 

188 ) 

189 

190 def calculate_threshold_profile(self, signal: NDArray[np.float64]) -> NDArray[np.float64]: 

191 """Calculate threshold values without applying. 

192 

193 Implements RE-THR-001: Threshold profile calculation. 

194 

195 Args: 

196 signal: Input signal. 

197 

198 Returns: 

199 Array of threshold values. 

200 """ 

201 result = self.apply(signal) 

202 return result.thresholds 

203 

204 def _apply_with_hysteresis( 

205 self, 

206 signal: NDArray[np.float64], 

207 thresholds: NDArray[np.float64], 

208 amplitude: NDArray[np.float64], 

209 ) -> tuple[NDArray[np.uint8], list[int]]: 

210 """Apply thresholding with hysteresis. 

211 

212 The hysteresis prevents rapid oscillation when the signal hovers near 

213 the threshold. The margin is calculated as: 

214 - If amplitude is significant: hyst_margin = amplitude * hysteresis 

215 - If amplitude is small: hyst_margin = hysteresis (used as absolute value) 

216 

217 This ensures hysteresis remains effective even for signals with very 

218 small amplitude variations. 

219 

220 Args: 

221 signal: Input signal. 

222 thresholds: Threshold values. 

223 amplitude: Signal amplitude at each point. 

224 

225 Returns: 

226 Tuple of (binary_output, crossings). 

227 """ 

228 n_samples = len(signal) 

229 binary = np.zeros(n_samples, dtype=np.uint8) 

230 crossings = [] 

231 

232 # Initial state 

233 current_state = 1 if signal[0] > thresholds[0] else 0 

234 binary[0] = current_state 

235 

236 for i in range(1, n_samples): 

237 threshold = thresholds[i] 

238 amp = amplitude[i] 

239 

240 # Calculate hysteresis margin: 

241 # - Use amplitude-relative margin for signals with significant amplitude 

242 # - Use absolute hysteresis value when amplitude is small 

243 # This prevents oscillation for signals hovering around the threshold 

244 amplitude_relative_margin = amp * self.hysteresis 

245 absolute_margin = self.hysteresis 

246 

247 # Use the larger of the two to ensure effective hysteresis 

248 # When amplitude is large (e.g., > 1.0), amplitude-relative dominates 

249 # When amplitude is small (e.g., 0.02), absolute hysteresis dominates 

250 hyst_margin = max(amplitude_relative_margin, absolute_margin) 

251 

252 if current_state == 0: 

253 # Currently low, need signal above threshold + hysteresis to go high 

254 if signal[i] > threshold + hyst_margin: 

255 current_state = 1 

256 crossings.append(i) 

257 else: 

258 # Currently high, need signal below threshold - hysteresis to go low 

259 if signal[i] < threshold - hyst_margin: 

260 current_state = 0 

261 crossings.append(i) 

262 

263 binary[i] = current_state 

264 

265 return binary, crossings 

266 

267 def _otsu_threshold(self, data: NDArray[np.float64]) -> float: 

268 """Calculate Otsu's threshold. 

269 

270 Args: 

271 data: Data window. 

272 

273 Returns: 

274 Optimal threshold value. 

275 """ 

276 # Simplified Otsu's method 

277 hist, bin_edges = np.histogram(data, bins=50) 

278 bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2 

279 

280 total = hist.sum() 

281 if total == 0: 281 ↛ 282line 281 didn't jump to line 282 because the condition on line 281 was never true

282 return float(np.mean(data)) 

283 

284 current_max = 0 

285 threshold = bin_centers[0] 

286 

287 sum_total = np.sum(bin_centers * hist) 

288 sum_background = 0 

289 weight_background = 0 

290 

291 for i in range(len(hist)): 291 ↛ 313line 291 didn't jump to line 313 because the loop on line 291 didn't complete

292 weight_background += hist[i] 

293 if weight_background == 0: 

294 continue 

295 

296 weight_foreground = total - weight_background 

297 if weight_foreground == 0: 

298 break 

299 

300 sum_background += bin_centers[i] * hist[i] 

301 

302 mean_background = sum_background / weight_background 

303 mean_foreground = (sum_total - sum_background) / weight_foreground 

304 

305 variance_between = ( 

306 weight_background * weight_foreground * (mean_background - mean_foreground) ** 2 

307 ) 

308 

309 if variance_between > current_max: 

310 current_max = variance_between 

311 threshold = bin_centers[i] 

312 

313 return float(threshold) 

314 

315 

316class MultiLevelDetector: 

317 """Detect multi-level logic signals. 

318 

319 Implements RE-THR-002: Multi-Level Logic Support. 

320 

321 Supports PAM-2, PAM-4, PAM-8, and custom multi-level signaling 

322 where signals encode multiple bits per symbol. 

323 

324 Example: 

325 >>> detector = MultiLevelDetector(levels=4) # PAM-4 

326 >>> result = detector.detect(signal) 

327 >>> symbols = result.levels 

328 """ 

329 

330 def __init__( 

331 self, 

332 levels: int | list[float] = 2, 

333 auto_detect_levels: bool = True, 

334 hysteresis: float = 0.1, 

335 ) -> None: 

336 """Initialize multi-level detector. 

337 

338 Args: 

339 levels: Number of levels or explicit voltage levels. 

340 auto_detect_levels: Automatically detect level voltages. 

341 hysteresis: Hysteresis fraction between levels. 

342 """ 

343 if isinstance(levels, int): 

344 self.n_levels = levels 

345 self.level_values = None 

346 else: 

347 self.n_levels = len(levels) 

348 self.level_values = sorted(levels) 

349 

350 self.auto_detect_levels = auto_detect_levels 

351 self.hysteresis = hysteresis 

352 

353 def detect(self, signal: NDArray[np.float64]) -> MultiLevelResult: 

354 """Detect multi-level logic in signal. 

355 

356 Implements RE-THR-002: Multi-level detection workflow. 

357 

358 Args: 

359 signal: Input analog signal. 

360 

361 Returns: 

362 MultiLevelResult with detected levels. 

363 

364 Example: 

365 >>> result = detector.detect(pam4_signal) 

366 >>> print(f"Detected {len(result.level_values)} levels") 

367 """ 

368 # Auto-detect level values if needed 

369 if self.level_values is None or self.auto_detect_levels: 

370 level_values = self._detect_levels(signal) 

371 else: 

372 level_values = self.level_values 

373 

374 # Calculate decision thresholds between levels 

375 thresholds = [ 

376 (level_values[i] + level_values[i + 1]) / 2 for i in range(len(level_values) - 1) 

377 ] 

378 

379 # Apply hysteresis-aware level detection 

380 levels, transitions = self._detect_with_hysteresis(signal, level_values, thresholds) 

381 

382 # Calculate level histogram 

383 level_histogram = {} 

384 for level in range(len(level_values)): 

385 level_histogram[level] = int(np.sum(levels == level)) 

386 

387 # Calculate eye heights 

388 eye_heights = self._calculate_eye_heights(signal, level_values) 

389 

390 return MultiLevelResult( 

391 levels=levels, 

392 level_values=level_values, 

393 transitions=transitions, 

394 level_histogram=level_histogram, 

395 eye_heights=eye_heights, 

396 ) 

397 

398 def detect_levels_from_histogram( 

399 self, signal: NDArray[np.float64], n_levels: int | None = None 

400 ) -> list[float]: 

401 """Detect logic levels from signal histogram. 

402 

403 Implements RE-THR-002: Level detection. 

404 

405 Args: 

406 signal: Input signal. 

407 n_levels: Expected number of levels (auto-detect if None). 

408 

409 Returns: 

410 List of detected voltage levels. 

411 """ 

412 if n_levels is None: 412 ↛ 413line 412 didn't jump to line 413 because the condition on line 412 was never true

413 n_levels = self.n_levels 

414 

415 return self._detect_levels(signal, n_levels) 

416 

417 def calculate_eye_diagram( 

418 self, 

419 signal: NDArray[np.float64], 

420 samples_per_symbol: int, 

421 n_symbols: int = 100, 

422 ) -> NDArray[np.float64]: 

423 """Calculate eye diagram data for multi-level signal. 

424 

425 Implements RE-THR-002: Eye diagram support. 

426 

427 Args: 

428 signal: Input signal. 

429 samples_per_symbol: Samples per symbol period. 

430 n_symbols: Number of symbols to overlay. 

431 

432 Returns: 

433 2D array of overlaid symbol waveforms. 

434 """ 

435 n_available = len(signal) // samples_per_symbol 

436 n_symbols = min(n_symbols, n_available) 

437 

438 # Create 2D array with overlaid symbols 

439 eye_data = np.zeros((n_symbols, samples_per_symbol * 2)) 

440 

441 for i in range(n_symbols): 

442 start = i * samples_per_symbol 

443 end = start + samples_per_symbol * 2 

444 

445 if end <= len(signal): 

446 eye_data[i] = signal[start:end] 

447 

448 return eye_data 

449 

450 def _detect_levels( 

451 self, signal: NDArray[np.float64], n_levels: int | None = None 

452 ) -> list[float]: 

453 """Detect voltage levels using clustering. 

454 

455 Args: 

456 signal: Input signal. 

457 n_levels: Expected number of levels. 

458 

459 Returns: 

460 List of level voltage values. 

461 """ 

462 if n_levels is None: 

463 n_levels = self.n_levels 

464 

465 # Use histogram-based clustering 

466 hist, bin_edges = np.histogram(signal, bins=100) 

467 bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2 

468 

469 # Find peaks in histogram 

470 peaks = [] 

471 for i in range(1, len(hist) - 1): 

472 if hist[i] > hist[i - 1] and hist[i] > hist[i + 1]: 

473 peaks.append((hist[i], bin_centers[i])) 

474 

475 # Sort by frequency and take top n_levels 

476 peaks.sort(reverse=True) 

477 level_values = sorted([p[1] for p in peaks[:n_levels]]) 

478 

479 # If not enough peaks found, use evenly spaced levels 

480 if len(level_values) < n_levels: 

481 min_val = np.min(signal) 

482 max_val = np.max(signal) 

483 level_values = list(np.linspace(min_val, max_val, n_levels)) 

484 

485 return level_values 

486 

487 def _detect_with_hysteresis( 

488 self, 

489 signal: NDArray[np.float64], 

490 level_values: list[float], 

491 thresholds: list[float], 

492 ) -> tuple[NDArray[np.int32], list[tuple[int, int, int]]]: 

493 """Detect levels with hysteresis. 

494 

495 Args: 

496 signal: Input signal. 

497 level_values: Voltage levels. 

498 thresholds: Decision thresholds. 

499 

500 Returns: 

501 Tuple of (level_array, transitions). 

502 """ 

503 n_samples = len(signal) 

504 levels = np.zeros(n_samples, dtype=np.int32) 

505 transitions = [] 

506 

507 # Calculate hysteresis margins 

508 margins = [] 

509 for i in range(len(level_values) - 1): 

510 margin = (level_values[i + 1] - level_values[i]) * self.hysteresis 

511 margins.append(margin) 

512 

513 # Initial level 

514 current_level = self._find_closest_level(signal[0], level_values) 

515 levels[0] = current_level 

516 

517 for i in range(1, n_samples): 

518 new_level = current_level 

519 

520 # Check for transitions 

521 if current_level < len(level_values) - 1: 

522 # Can go up 

523 upper_threshold = thresholds[current_level] + margins[current_level] 

524 if signal[i] > upper_threshold: 

525 new_level = current_level + 1 

526 

527 if current_level > 0: 

528 # Can go down 

529 lower_threshold = thresholds[current_level - 1] - margins[current_level - 1] 

530 if signal[i] < lower_threshold: 

531 new_level = current_level - 1 

532 

533 if new_level != current_level: 

534 transitions.append((i, current_level, new_level)) 

535 current_level = new_level 

536 

537 levels[i] = current_level 

538 

539 return levels, transitions 

540 

541 def _find_closest_level(self, value: float, level_values: list[float]) -> int: 

542 """Find closest level to value. 

543 

544 Args: 

545 value: Sample value. 

546 level_values: Level voltages. 

547 

548 Returns: 

549 Level index. 

550 """ 

551 distances = [abs(value - lv) for lv in level_values] 

552 return int(np.argmin(distances)) 

553 

554 def _calculate_eye_heights( 

555 self, signal: NDArray[np.float64], level_values: list[float] 

556 ) -> list[float]: 

557 """Calculate eye heights between levels. 

558 

559 Args: 

560 signal: Input signal. 

561 level_values: Level voltages. 

562 

563 Returns: 

564 List of eye heights for each level transition. 

565 """ 

566 eye_heights = [] 

567 

568 for i in range(len(level_values) - 1): 

569 lower = level_values[i] 

570 upper = level_values[i + 1] 

571 

572 # Find samples near each level 

573 lower_samples = signal[np.abs(signal - lower) < (upper - lower) * 0.2] 

574 upper_samples = signal[np.abs(signal - upper) < (upper - lower) * 0.2] 

575 

576 if len(lower_samples) > 0 and len(upper_samples) > 0: 

577 # Eye height is gap between worst cases 

578 worst_low = np.max(lower_samples) 

579 worst_high = np.min(upper_samples) 

580 eye_height = worst_high - worst_low 

581 else: 

582 eye_height = upper - lower 

583 

584 eye_heights.append(max(0, eye_height)) 

585 

586 return eye_heights 

587 

588 

589# ============================================================================= 

590# Convenience functions 

591# ============================================================================= 

592 

593 

594def apply_adaptive_threshold( 

595 signal: NDArray[np.float64], 

596 window_size: int = 1024, 

597 method: Literal["median", "mean", "envelope", "otsu"] = "median", 

598 hysteresis: float = 0.05, 

599) -> AdaptiveThresholdResult: 

600 """Apply adaptive thresholding to a signal. 

601 

602 Implements RE-THR-001: Time-Varying Threshold Support. 

603 

604 Args: 

605 signal: Input analog signal. 

606 window_size: Adaptive window size. 

607 method: Thresholding method. 

608 hysteresis: Hysteresis margin. 

609 

610 Returns: 

611 AdaptiveThresholdResult with thresholds and binary output. 

612 

613 Example: 

614 >>> result = apply_adaptive_threshold(noisy_signal) 

615 >>> digital = result.binary_output 

616 """ 

617 thresholder = AdaptiveThresholder( 

618 window_size=window_size, 

619 method=method, 

620 hysteresis=hysteresis, 

621 ) 

622 return thresholder.apply(signal) 

623 

624 

625def detect_multi_level( 

626 signal: NDArray[np.float64], 

627 n_levels: int = 4, 

628 auto_detect: bool = True, 

629 hysteresis: float = 0.1, 

630) -> MultiLevelResult: 

631 """Detect multi-level logic in signal. 

632 

633 Implements RE-THR-002: Multi-Level Logic Support. 

634 

635 Args: 

636 signal: Input analog signal. 

637 n_levels: Expected number of levels. 

638 auto_detect: Automatically detect level voltages. 

639 hysteresis: Hysteresis between levels. 

640 

641 Returns: 

642 MultiLevelResult with detected levels. 

643 

644 Example: 

645 >>> result = detect_multi_level(pam4_signal, n_levels=4) 

646 >>> symbols = result.levels 

647 """ 

648 detector = MultiLevelDetector( 

649 levels=n_levels, 

650 auto_detect_levels=auto_detect, 

651 hysteresis=hysteresis, 

652 ) 

653 return detector.detect(signal) 

654 

655 

656def calculate_threshold_snr( 

657 signal: NDArray[np.float64], 

658 threshold: float | NDArray[np.float64], 

659) -> float: 

660 """Calculate signal-to-noise ratio at threshold. 

661 

662 Implements RE-THR-001: Threshold quality metric. 

663 

664 Args: 

665 signal: Input signal. 

666 threshold: Threshold value(s). 

667 

668 Returns: 

669 Estimated SNR in dB. 

670 """ 

671 if isinstance(threshold, np.ndarray): 

672 threshold = float(np.mean(threshold)) 

673 

674 # Separate high and low samples 

675 high_samples = signal[signal > threshold] 

676 low_samples = signal[signal <= threshold] 

677 

678 if len(high_samples) == 0 or len(low_samples) == 0: 

679 return 0.0 

680 

681 # Calculate signal power (difference between means) 

682 signal_power = (np.mean(high_samples) - np.mean(low_samples)) ** 2 

683 

684 # Calculate noise power (variance around means) 

685 noise_power = (np.var(high_samples) + np.var(low_samples)) / 2 

686 

687 if noise_power == 0: 

688 return 100.0 # Very high SNR 

689 

690 snr_linear = signal_power / noise_power 

691 snr_db = 10 * np.log10(snr_linear) 

692 

693 return float(snr_db) 

694 

695 

696__all__ = [ 

697 "AdaptiveThresholdResult", 

698 # Classes 

699 "AdaptiveThresholder", 

700 "MultiLevelDetector", 

701 "MultiLevelResult", 

702 # Data classes 

703 "ThresholdConfig", 

704 # Functions 

705 "apply_adaptive_threshold", 

706 "calculate_threshold_snr", 

707 "detect_multi_level", 

708]