Coverage for src / tracekit / analyzers / digital / quality.py: 94%

282 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Signal quality analysis for digital signals. 

2 

3This module provides signal quality metrics including noise margin 

4calculation, setup/hold violation detection, and glitch detection. 

5 

6 

7Example: 

8 >>> from tracekit.analyzers.digital.quality import noise_margin, detect_glitches 

9 >>> margins = noise_margin(trace, family="TTL") 

10 >>> glitches = detect_glitches(trace, min_width=10e-9) 

11 

12References: 

13 JEDEC Standard No. 8C: High-Speed CMOS Interface 

14 Various IC manufacturer datasheets for logic family specifications 

15""" 

16 

17from __future__ import annotations 

18 

19from dataclasses import dataclass 

20from typing import TYPE_CHECKING, Literal 

21 

22import numpy as np 

23 

24from tracekit.analyzers.digital.extraction import LOGIC_FAMILIES 

25from tracekit.analyzers.digital.timing import hold_time, setup_time 

26from tracekit.core.exceptions import InsufficientDataError 

27from tracekit.core.types import DigitalTrace, WaveformTrace 

28 

29if TYPE_CHECKING: 

30 from typing import Any 

31 

32 from numpy.typing import NDArray 

33 

34 

35@dataclass 

36class NoiseMarginResult: 

37 """Result of noise margin calculation. 

38 

39 Attributes: 

40 nm_high: Noise margin high (VOH_min - VIH_min). 

41 nm_low: Noise margin low (VIL_max - VOL_max). 

42 logic_family: Logic family used for calculation. 

43 voh: Output high voltage (measured or spec). 

44 vol: Output low voltage (measured or spec). 

45 vih: Input high threshold (from spec). 

46 vil: Input low threshold (from spec). 

47 """ 

48 

49 nm_high: float 

50 nm_low: float 

51 logic_family: str 

52 voh: float 

53 vol: float 

54 vih: float 

55 vil: float 

56 

57 

58@dataclass 

59class Violation: 

60 """Represents a timing or signal violation. 

61 

62 Attributes: 

63 timestamp: Time of violation in seconds. 

64 violation_type: Type of violation. 

65 measured: Measured value. 

66 limit: Specification limit. 

67 margin: Margin to specification (negative = violation). 

68 end_timestamp: End time of violation (if applicable). 

69 """ 

70 

71 timestamp: float 

72 violation_type: str 

73 measured: float 

74 limit: float 

75 margin: float 

76 end_timestamp: float | None = None 

77 

78 

79@dataclass 

80class Glitch: 

81 """Represents a detected glitch. 

82 

83 Attributes: 

84 timestamp: Start time of glitch in seconds. 

85 width: Duration of glitch in seconds. 

86 polarity: "positive" (spike high) or "negative" (spike low). 

87 amplitude: Peak amplitude of glitch. 

88 """ 

89 

90 timestamp: float 

91 width: float 

92 polarity: Literal["positive", "negative"] 

93 amplitude: float 

94 

95 

96def noise_margin( 

97 trace: WaveformTrace, 

98 *, 

99 family: str = "LVCMOS_3V3", 

100 use_measured_levels: bool = True, 

101) -> NoiseMarginResult: 

102 """Calculate noise margins for a digital signal. 

103 

104 Computes noise margin high (NMH) and noise margin low (NML) based on 

105 measured signal levels or logic family specifications. 

106 

107 Args: 

108 trace: Input waveform trace. 

109 family: Logic family for threshold levels. 

110 Options: TTL, CMOS_5V, LVTTL, LVCMOS_3V3, LVCMOS_2V5, LVCMOS_1V8, LVCMOS_1V2 

111 use_measured_levels: If True, use measured VOH/VOL from signal. 

112 If False, use spec values from logic family. 

113 

114 Returns: 

115 NoiseMarginResult with calculated margins. 

116 

117 Raises: 

118 ValueError: If logic family is not recognized. 

119 

120 Example: 

121 >>> result = noise_margin(trace, family="TTL") 

122 >>> print(f"NMH: {result.nm_high:.3f} V") 

123 >>> print(f"NML: {result.nm_low:.3f} V") 

124 

125 References: 

126 JEDEC Standard No. 8C 

127 """ 

128 if family not in LOGIC_FAMILIES: 

129 available = ", ".join(LOGIC_FAMILIES.keys()) 

130 raise ValueError(f"Unknown logic family: {family}. Available: {available}") 

131 

132 specs = LOGIC_FAMILIES[family] 

133 vih = specs["VIH_min"] 

134 vil = specs["VIL_max"] 

135 

136 if use_measured_levels and len(trace.data) > 0: 

137 # Measure actual output levels from signal 

138 data = trace.data 

139 low, high = _find_logic_levels(data) 

140 voh = high 

141 vol = low 

142 else: 

143 # Use specification values 

144 voh = specs["VOH_min"] 

145 vol = specs["VOL_max"] 

146 

147 # Calculate noise margins 

148 # NMH = VOH - VIH (margin when output is high) 

149 # NML = VIL - VOL (margin when output is low) 

150 nm_high = voh - vih 

151 nm_low = vil - vol 

152 

153 return NoiseMarginResult( 

154 nm_high=nm_high, 

155 nm_low=nm_low, 

156 logic_family=family, 

157 voh=voh, 

158 vol=vol, 

159 vih=vih, 

160 vil=vil, 

161 ) 

162 

163 

164def detect_violations( 

165 data_trace: WaveformTrace | DigitalTrace, 

166 clock_trace: WaveformTrace | DigitalTrace, 

167 *, 

168 setup_spec: float, 

169 hold_spec: float, 

170 clock_edge: Literal["rising", "falling"] = "rising", 

171) -> list[Violation]: 

172 """Detect setup and hold time violations. 

173 

174 Compares measured setup and hold times to specifications and 

175 reports any violations with timestamps and margins. 

176 

177 Args: 

178 data_trace: Data signal trace. 

179 clock_trace: Clock signal trace. 

180 setup_spec: Required setup time in seconds. 

181 hold_spec: Required hold time in seconds. 

182 clock_edge: Clock edge to reference ("rising" or "falling"). 

183 

184 Returns: 

185 List of Violation objects for each detected violation. 

186 

187 Example: 

188 >>> violations = detect_violations( 

189 ... data_trace, clock_trace, 

190 ... setup_spec=2e-9, hold_spec=1e-9 

191 ... ) 

192 >>> for v in violations: 

193 ... print(f"{v.violation_type}: {v.margin*1e12:.0f} ps margin") 

194 

195 References: 

196 JEDEC Standard No. 65B 

197 """ 

198 violations: list[Violation] = [] 

199 

200 # Get all setup times 

201 setup_times = setup_time(data_trace, clock_trace, clock_edge=clock_edge, return_all=True) 

202 

203 if isinstance(setup_times, np.ndarray) and len(setup_times) > 0: 

204 clock_edges = _get_clock_edges(clock_trace, clock_edge) 

205 

206 for _i, (t_setup, clk_edge) in enumerate( 

207 zip(setup_times, clock_edges[: len(setup_times)], strict=False) 

208 ): 

209 margin = t_setup - setup_spec 

210 if margin < 0: # Violation 

211 violations.append( 

212 Violation( 

213 timestamp=clk_edge, 

214 violation_type="setup", 

215 measured=t_setup, 

216 limit=setup_spec, 

217 margin=margin, 

218 ) 

219 ) 

220 

221 # Get all hold times 

222 hold_times = hold_time(data_trace, clock_trace, clock_edge=clock_edge, return_all=True) 

223 

224 if isinstance(hold_times, np.ndarray) and len(hold_times) > 0: 

225 clock_edges = _get_clock_edges(clock_trace, clock_edge) 

226 

227 for _i, (t_hold, clk_edge) in enumerate( 

228 zip(hold_times, clock_edges[: len(hold_times)], strict=False) 

229 ): 

230 margin = t_hold - hold_spec 

231 if margin < 0: # Violation 

232 violations.append( 

233 Violation( 

234 timestamp=clk_edge, 

235 violation_type="hold", 

236 measured=t_hold, 

237 limit=hold_spec, 

238 margin=margin, 

239 ) 

240 ) 

241 

242 # Sort by timestamp 

243 violations.sort(key=lambda v: v.timestamp) 

244 

245 return violations 

246 

247 

248def detect_glitches( 

249 trace: WaveformTrace | DigitalTrace, 

250 *, 

251 min_width: float, 

252 threshold: float | None = None, 

253) -> list[Glitch]: 

254 """Detect glitches (pulses shorter than minimum width). 

255 

256 Identifies short pulses that violate minimum pulse width specifications, 

257 which may cause logic errors or be artifacts. 

258 

259 Args: 

260 trace: Input trace (analog or digital). 

261 min_width: Minimum valid pulse width in seconds. 

262 threshold: Threshold for digital conversion (analog traces only). 

263 If None, auto-detected from signal. 

264 

265 Returns: 

266 List of Glitch objects for each detected glitch. 

267 

268 Example: 

269 >>> glitches = detect_glitches(trace, min_width=10e-9) 

270 >>> for g in glitches: 

271 ... print(f"Glitch at {g.timestamp*1e6:.2f} us, width={g.width*1e9:.1f} ns") 

272 

273 References: 

274 Application note AN-905: Understanding Glitch Detection 

275 """ 

276 if isinstance(trace, DigitalTrace): 

277 # Already digital - use directly 

278 digital = trace.data 

279 sample_rate = trace.metadata.sample_rate 

280 threshold_used = 0.5 # Not used for amplitude calc on digital 

281 data = trace.data.astype(np.float64) 

282 else: 

283 # Analog trace - need to threshold 

284 data = trace.data 

285 sample_rate = trace.metadata.sample_rate 

286 

287 if len(data) < 3: 

288 return [] 

289 

290 # Find threshold 

291 low, high = _find_logic_levels(data) 

292 threshold_used = (low + high) / 2 if threshold is None else threshold 

293 

294 amplitude = high - low 

295 if amplitude <= 0: 295 ↛ 296line 295 didn't jump to line 296 because the condition on line 295 was never true

296 return [] 

297 

298 # Convert to binary 

299 digital = data >= threshold_used 

300 

301 if len(digital) < 3: 301 ↛ 302line 301 didn't jump to line 302 because the condition on line 301 was never true

302 return [] 

303 

304 sample_period = 1.0 / sample_rate 

305 

306 glitches: list[Glitch] = [] 

307 

308 # Find all pulse edges 

309 transitions = np.diff(digital.astype(np.int8)) 

310 rising_edges = np.where(transitions == 1)[0] 

311 falling_edges = np.where(transitions == -1)[0] 

312 

313 # Check positive pulses (rising to falling) 

314 for rising_idx in rising_edges: 

315 # Find next falling edge 

316 subsequent_falling = falling_edges[falling_edges > rising_idx] 

317 if len(subsequent_falling) > 0: 

318 falling_idx = subsequent_falling[0] 

319 width = (falling_idx - rising_idx) * sample_period 

320 

321 if width < min_width: 

322 # Calculate amplitude within pulse 

323 pulse_data = data[rising_idx : falling_idx + 1] 

324 if isinstance(trace, DigitalTrace): 

325 # For digital trace, amplitude is just 1.0 (logic high) 

326 pulse_amplitude = 1.0 

327 else: 

328 pulse_amplitude = float(np.max(pulse_data) - threshold_used) 

329 

330 glitches.append( 

331 Glitch( 

332 timestamp=rising_idx * sample_period, 

333 width=width, 

334 polarity="positive", 

335 amplitude=pulse_amplitude, 

336 ) 

337 ) 

338 

339 # Check negative pulses (falling to rising) 

340 for falling_idx in falling_edges: 

341 # Find next rising edge 

342 subsequent_rising = rising_edges[rising_edges > falling_idx] 

343 if len(subsequent_rising) > 0: 

344 rising_idx = subsequent_rising[0] 

345 width = (rising_idx - falling_idx) * sample_period 

346 

347 if width < min_width: 

348 # Calculate amplitude within pulse 

349 pulse_data = data[falling_idx : rising_idx + 1] 

350 if isinstance(trace, DigitalTrace): 350 ↛ 352line 350 didn't jump to line 352 because the condition on line 350 was never true

351 # For digital trace, amplitude is just 1.0 (logic low) 

352 pulse_amplitude = 1.0 

353 else: 

354 pulse_amplitude = float(threshold_used - np.min(pulse_data)) 

355 

356 glitches.append( 

357 Glitch( 

358 timestamp=falling_idx * sample_period, 

359 width=width, 

360 polarity="negative", 

361 amplitude=pulse_amplitude, 

362 ) 

363 ) 

364 

365 # Sort by timestamp 

366 glitches.sort(key=lambda g: g.timestamp) 

367 

368 return glitches 

369 

370 

371def signal_quality_summary( 

372 trace: WaveformTrace, 

373 *, 

374 family: str = "LVCMOS_3V3", 

375 min_pulse_width: float = 10e-9, 

376) -> dict: # type: ignore[type-arg] 

377 """Generate comprehensive signal quality summary. 

378 

379 Combines multiple quality metrics into a single report. 

380 

381 Args: 

382 trace: Input waveform trace. 

383 family: Logic family for noise margin calculation. 

384 min_pulse_width: Minimum pulse width for glitch detection. 

385 

386 Returns: 

387 Dictionary with quality metrics: 

388 - noise_margin: NoiseMarginResult 

389 - glitch_count: Number of detected glitches 

390 - glitches: List of Glitch objects 

391 - signal_levels: Measured low/high levels 

392 - transition_count: Number of transitions 

393 

394 Example: 

395 >>> summary = signal_quality_summary(trace) 

396 >>> print(f"NMH: {summary['noise_margin'].nm_high:.3f} V") 

397 >>> print(f"Glitches: {summary['glitch_count']}") 

398 """ 

399 # Noise margin analysis 

400 nm_result = noise_margin(trace, family=family) 

401 

402 # Glitch detection 

403 glitches = detect_glitches(trace, min_width=min_pulse_width) 

404 

405 # Signal levels 

406 low, high = _find_logic_levels(trace.data) 

407 

408 # Transition count 

409 threshold = (low + high) / 2 

410 digital = trace.data >= threshold 

411 transitions = np.sum(np.abs(np.diff(digital.astype(np.int8)))) 

412 

413 return { 

414 "noise_margin": nm_result, 

415 "glitch_count": len(glitches), 

416 "glitches": glitches, 

417 "signal_levels": {"low": low, "high": high}, 

418 "transition_count": int(transitions), 

419 } 

420 

421 

422# ============================================================================= 

423# Helper Functions 

424# ============================================================================= 

425 

426 

427def _find_logic_levels(data: NDArray[np.floating[Any]]) -> tuple[float, float]: 

428 """Find low and high logic levels from signal data. 

429 

430 Uses histogram analysis to identify stable high and low levels. 

431 

432 Args: 

433 data: Waveform data array. 

434 

435 Returns: 

436 Tuple of (low_level, high_level). 

437 """ 

438 if len(data) == 0: 

439 return 0.0, 0.0 

440 

441 # Use percentiles for robust level detection 

442 p10, p90 = np.percentile(data, [10, 90]) 

443 

444 try: 

445 # Refine using histogram peaks 

446 hist, bin_edges = np.histogram(data, bins=50) 

447 bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2 

448 

449 # Find peaks in lower and upper halves 

450 mid_idx = len(hist) // 2 

451 low_idx = np.argmax(hist[:mid_idx]) 

452 high_idx = mid_idx + np.argmax(hist[mid_idx:]) 

453 

454 low = bin_centers[low_idx] 

455 high = bin_centers[high_idx] 

456 

457 # Sanity check 

458 if high <= low: 458 ↛ 459line 458 didn't jump to line 459 because the condition on line 458 was never true

459 return float(p10), float(p90) 

460 

461 return float(low), float(high) 

462 except (ValueError, IndexError): 

463 return float(p10), float(p90) 

464 

465 

466def _get_clock_edges( 

467 trace: WaveformTrace | DigitalTrace, 

468 edge_type: Literal["rising", "falling"], 

469) -> NDArray[np.float64]: 

470 """Get clock edge timestamps. 

471 

472 Args: 

473 trace: Clock trace. 

474 edge_type: Type of edges to find. 

475 

476 Returns: 

477 Array of edge timestamps in seconds. 

478 """ 

479 data = trace.data.astype(np.float64) if isinstance(trace, DigitalTrace) else trace.data 

480 

481 if len(data) < 2: 

482 return np.array([], dtype=np.float64) 

483 

484 sample_period = trace.metadata.time_base 

485 

486 # Find threshold 

487 low, high = _find_logic_levels(data) 

488 threshold = (low + high) / 2 

489 

490 if edge_type == "rising": 

491 crossings = np.where((data[:-1] < threshold) & (data[1:] >= threshold))[0] 

492 else: 

493 crossings = np.where((data[:-1] >= threshold) & (data[1:] < threshold))[0] 

494 

495 # Convert to timestamps with interpolation 

496 timestamps = np.zeros(len(crossings), dtype=np.float64) 

497 

498 for i, idx in enumerate(crossings): 

499 base_time = idx * sample_period 

500 if idx < len(data) - 1: 500 ↛ 509line 500 didn't jump to line 509 because the condition on line 500 was always true

501 v1, v2 = data[idx], data[idx + 1] 

502 if abs(v2 - v1) > 1e-12: 502 ↛ 507line 502 didn't jump to line 507 because the condition on line 502 was always true

503 t_offset = (threshold - v1) / (v2 - v1) * sample_period 

504 t_offset = max(0, min(sample_period, t_offset)) 

505 timestamps[i] = base_time + t_offset 

506 else: 

507 timestamps[i] = base_time + sample_period / 2 

508 else: 

509 timestamps[i] = base_time 

510 

511 return timestamps 

512 

513 

514@dataclass 

515class MaskTestResult: 

516 """Result of mask testing. 

517 

518 Attributes: 

519 pass_fail: Overall pass/fail result. 

520 hit_count: Number of samples violating the mask. 

521 total_samples: Total number of samples tested. 

522 margin_top: Margin to top mask boundary in volts (minimum). 

523 margin_bottom: Margin to bottom mask boundary in volts (minimum). 

524 violations: List of violation timestamps and amplitudes. 

525 """ 

526 

527 pass_fail: bool 

528 hit_count: int 

529 total_samples: int 

530 margin_top: float 

531 margin_bottom: float 

532 violations: list[tuple[float, float]] # (time, voltage) pairs 

533 

534 

535@dataclass 

536class PLLRecoveryResult: 

537 """Result of PLL clock recovery. 

538 

539 Attributes: 

540 recovered_frequency: Recovered clock frequency in Hz. 

541 recovered_phase: Recovered phase trajectory (radians). 

542 vco_control: VCO control voltage trajectory. 

543 lock_status: True if PLL is locked. 

544 lock_time: Time to achieve lock in seconds (if locked). 

545 frequency_error: Final frequency error in Hz. 

546 """ 

547 

548 recovered_frequency: float 

549 recovered_phase: NDArray[np.float64] 

550 vco_control: NDArray[np.float64] 

551 lock_status: bool 

552 lock_time: float | None 

553 frequency_error: float 

554 

555 

556def mask_test( 

557 trace: WaveformTrace, 

558 mask: dict[str, NDArray[np.float64]] | str = "usb2", 

559 *, 

560 bit_period: float | None = None, 

561) -> MaskTestResult: 

562 """Test signal against compliance mask template. 

563 

564 Performs mask testing for signal integrity verification against 

565 predefined templates (USB, PCIe, etc.) or custom masks. 

566 

567 Args: 

568 trace: Input waveform trace. 

569 mask: Either mask name ("usb2", "pcie_gen3") or custom mask dict with: 

570 - "time_ui": Time coordinates in UI (0.0 to 2.0 for 2-UI mask) 

571 - "voltage_top": Upper voltage boundary 

572 - "voltage_bottom": Lower voltage boundary 

573 bit_period: Bit period in seconds (required if mask uses UI coordinates). 

574 

575 Returns: 

576 MaskTestResult with pass/fail and violation statistics. 

577 

578 Raises: 

579 ValueError: If bit_period is not provided or mask name is not recognized. 

580 

581 Example: 

582 >>> result = mask_test(signal_trace, mask="usb2", bit_period=3.33e-9) 

583 >>> print(f"Pass: {result.pass_fail}, Violations: {result.hit_count}") 

584 

585 References: 

586 USB 2.0 Specification, PCIe Base Specification 

587 """ 

588 # Load predefined mask if string 

589 mask_data = _get_predefined_mask(mask) if isinstance(mask, str) else mask 

590 

591 # Extract mask boundaries 

592 time_ui = mask_data["time_ui"] 

593 v_top = mask_data["voltage_top"] 

594 v_bottom = mask_data["voltage_bottom"] 

595 

596 # Get signal data 

597 data = trace.data 

598 n_samples = len(data) 

599 sample_rate = trace.metadata.sample_rate 

600 

601 if bit_period is None: 

602 raise ValueError("bit_period is required for mask testing with UI coordinates") 

603 

604 # Convert UI to sample indices 

605 samples_per_ui = bit_period * sample_rate 

606 time_samples = time_ui * samples_per_ui 

607 

608 # For simplicity, test over one or two bit periods 

609 # Align signal to start of bit period 

610 n_ui = int(np.max(time_ui)) # 1 or 2 UI mask 

611 

612 violations: list[tuple[float, float]] = [] 

613 hit_count = 0 

614 

615 # Test all complete bit periods in the signal 

616 n_periods = n_samples // int(samples_per_ui * n_ui) 

617 

618 for period_idx in range(n_periods): 

619 period_start_sample = int(period_idx * samples_per_ui * n_ui) 

620 

621 # Extract samples for this period 

622 for i, _t_ui in enumerate(time_ui): 

623 sample_idx = period_start_sample + int(time_samples[i]) 

624 

625 if sample_idx >= n_samples: 

626 break 

627 

628 voltage = data[sample_idx] 

629 

630 # Check if voltage violates mask 

631 if voltage > v_top[i] or voltage < v_bottom[i]: 

632 timestamp = sample_idx / sample_rate 

633 violations.append((timestamp, voltage)) 

634 hit_count += 1 

635 

636 # Calculate margins (minimum distance to mask boundaries) 

637 margin_top = float(np.inf) 

638 margin_bottom = float(np.inf) 

639 

640 for period_idx in range(n_periods): 

641 period_start_sample = int(period_idx * samples_per_ui * n_ui) 

642 

643 for i, _t_ui in enumerate(time_ui): 

644 sample_idx = period_start_sample + int(time_samples[i]) 

645 

646 if sample_idx >= n_samples: 

647 break 

648 

649 voltage = data[sample_idx] 

650 

651 # Margin to top 

652 margin_top = min(margin_top, v_top[i] - voltage) 

653 

654 # Margin to bottom 

655 margin_bottom = min(margin_bottom, voltage - v_bottom[i]) 

656 

657 # Pass if no hits 

658 pass_fail = hit_count == 0 

659 

660 return MaskTestResult( 

661 pass_fail=pass_fail, 

662 hit_count=hit_count, 

663 total_samples=n_periods * len(time_ui), 

664 margin_top=margin_top if margin_top != np.inf else 0.0, 

665 margin_bottom=margin_bottom if margin_bottom != np.inf else 0.0, 

666 violations=violations, 

667 ) 

668 

669 

670def _get_predefined_mask(mask_name: str) -> dict[str, NDArray[np.float64]]: 

671 """Get predefined mask template. 

672 

673 Args: 

674 mask_name: Name of standard mask ("usb2", "pcie_gen3", etc.). 

675 

676 Returns: 

677 Dictionary with time_ui, voltage_top, voltage_bottom arrays. 

678 

679 Raises: 

680 ValueError: If mask name is not recognized. 

681 """ 

682 if mask_name == "usb2": 

683 # USB 2.0 high-speed eye mask (simplified) 

684 # 2-UI mask, normalized to ±1V amplitude 

685 time_ui = np.array([0.0, 0.2, 0.4, 0.5, 0.6, 0.8, 1.0, 1.2, 1.4, 1.5, 1.6, 1.8, 2.0]) 

686 v_top = np.array([0.6, 0.6, 0.8, 0.9, 0.8, 0.6, 0.4, 0.6, 0.8, 0.9, 0.8, 0.6, 0.6]) 

687 v_bottom = np.array( 

688 [ 

689 -0.6, 

690 -0.6, 

691 -0.8, 

692 -0.9, 

693 -0.8, 

694 -0.6, 

695 -0.4, 

696 -0.6, 

697 -0.8, 

698 -0.9, 

699 -0.8, 

700 -0.6, 

701 -0.6, 

702 ] 

703 ) 

704 

705 elif mask_name == "pcie_gen3": 

706 # PCIe Gen 3 eye mask (simplified) 

707 time_ui = np.array([0.0, 0.15, 0.35, 0.5, 0.65, 0.85, 1.0]) 

708 v_top = np.array([0.5, 0.5, 0.7, 0.8, 0.7, 0.5, 0.5]) 

709 v_bottom = np.array([-0.5, -0.5, -0.7, -0.8, -0.7, -0.5, -0.5]) 

710 

711 else: 

712 raise ValueError( 

713 f"Unknown mask: {mask_name}. Available: usb2, pcie_gen3. Or provide custom mask dict." 

714 ) 

715 

716 return {"time_ui": time_ui, "voltage_top": v_top, "voltage_bottom": v_bottom} 

717 

718 

719def pll_clock_recovery( 

720 trace: WaveformTrace | DigitalTrace, 

721 *, 

722 nominal_frequency: float, 

723 loop_bandwidth: float = 1e6, 

724 damping: float = 0.707, 

725 vco_gain: float = 1e6, 

726) -> PLLRecoveryResult: 

727 """Recover clock using PLL emulation. 

728 

729 Emulates a second-order PLL to recover embedded clock from NRZ, 

730 NRZI, or Manchester-encoded data streams. 

731 

732 Args: 

733 trace: Input data trace. 

734 nominal_frequency: Nominal clock frequency in Hz. 

735 loop_bandwidth: PLL loop bandwidth in Hz (default 1 MHz). 

736 damping: Damping factor (default 0.707 for critical damping). 

737 vco_gain: VCO gain in Hz/V (default 1 MHz/V). 

738 

739 Returns: 

740 PLLRecoveryResult with recovered clock parameters. 

741 

742 Raises: 

743 InsufficientDataError: If trace has fewer than 100 samples. 

744 

745 Example: 

746 >>> result = pll_clock_recovery(data_trace, nominal_frequency=1e9) 

747 >>> print(f"Recovered: {result.recovered_frequency / 1e9:.3f} GHz") 

748 >>> print(f"Locked: {result.lock_status}") 

749 

750 References: 

751 Gardner, F. M. (2005). Phaselock Techniques, 3rd ed. 

752 """ 

753 data = trace.data.astype(np.float64) if isinstance(trace, DigitalTrace) else trace.data 

754 

755 sample_rate = trace.metadata.sample_rate 

756 n_samples = len(data) 

757 

758 if n_samples < 100: 

759 raise InsufficientDataError( 

760 "PLL recovery requires at least 100 samples", 

761 required=100, 

762 available=n_samples, 

763 analysis_type="pll_clock_recovery", 

764 ) 

765 

766 dt = 1.0 / sample_rate 

767 

768 # PLL parameters 

769 omega_n = 2 * np.pi * loop_bandwidth # Natural frequency 

770 K_vco = 2 * np.pi * vco_gain # VCO gain in rad/s/V 

771 

772 # Loop filter coefficients (2nd order) 

773 # Transfer function: F(s) = K1 + K2/s 

774 K1 = (2 * damping * omega_n) / K_vco 

775 K2 = (omega_n**2) / K_vco 

776 

777 # Initialize PLL state 

778 phase = np.zeros(n_samples) 

779 vco_control = np.zeros(n_samples) 

780 integrator = 0.0 

781 theta = 0.0 

782 

783 # Nominal phase increment per sample 

784 nominal_phase_inc = 2 * np.pi * nominal_frequency * dt 

785 

786 # Find edges for phase detection (simplified) 

787 threshold = (np.max(data) + np.min(data)) / 2 

788 edges = np.where(np.abs(np.diff(np.sign(data - threshold))) > 0)[0] 

789 

790 edge_idx = 0 

791 

792 # Run PLL loop 

793 for i in range(n_samples): 

794 # Phase detector: compare VCO phase to input transitions 

795 if edge_idx < len(edges) and i == edges[edge_idx]: 

796 # Edge detected - compute phase error 

797 # Phase error = expected phase - actual VCO phase 

798 expected_phase = (edges[edge_idx] * nominal_phase_inc) % (2 * np.pi) 

799 phase_error = expected_phase - (theta % (2 * np.pi)) 

800 

801 # Wrap to [-pi, pi] 

802 phase_error = (phase_error + np.pi) % (2 * np.pi) - np.pi 

803 

804 edge_idx += 1 

805 else: 

806 phase_error = 0.0 

807 

808 # Loop filter (proportional + integral) 

809 integrator += K2 * phase_error * dt 

810 vco_input = K1 * phase_error + integrator 

811 

812 # VCO: frequency = nominal + K_vco * control voltage 

813 vco_freq = nominal_frequency + K_vco * vco_input / (2 * np.pi) 

814 phase_increment = 2 * np.pi * vco_freq * dt 

815 

816 # Update phase 

817 theta += phase_increment 

818 

819 # Store results 

820 phase[i] = theta 

821 vco_control[i] = vco_input 

822 

823 # Analyze lock status 

824 # Consider locked if VCO control voltage is stable in last 20% 

825 lock_threshold = 0.1 # 10% variation 

826 last_20_percent = vco_control[int(0.8 * n_samples) :] 

827 

828 if len(last_20_percent) > 0: 828 ↛ 847line 828 didn't jump to line 847 because the condition on line 828 was always true

829 vco_std = np.std(last_20_percent) 

830 vco_mean = np.abs(np.mean(last_20_percent)) 

831 lock_status = vco_std < lock_threshold * max(vco_mean, 1.0) 

832 

833 # Find lock time (when variation drops below threshold) 

834 if lock_status: 834 ↛ 845line 834 didn't jump to line 845 because the condition on line 834 was always true

835 # Search for first point where subsequent variance is low 

836 window = int(0.1 * n_samples) # 10% window 

837 for i in range(window, n_samples - window): 837 ↛ 843line 837 didn't jump to line 843 because the loop on line 837 didn't complete

838 window_std = np.std(vco_control[i : i + window]) 

839 if window_std < lock_threshold: 839 ↛ 837line 839 didn't jump to line 837 because the condition on line 839 was always true

840 lock_time = i * dt 

841 break 

842 else: 

843 lock_time = None 

844 else: 

845 lock_time = None 

846 else: 

847 lock_status = False 

848 lock_time = None 

849 

850 # Recovered frequency from final VCO state 

851 final_vco = np.mean(vco_control[-int(0.1 * n_samples) :]) 

852 recovered_frequency = nominal_frequency + K_vco * final_vco / (2 * np.pi) 

853 

854 frequency_error = recovered_frequency - nominal_frequency 

855 

856 return PLLRecoveryResult( 

857 recovered_frequency=float(recovered_frequency), 

858 recovered_phase=phase, 

859 vco_control=vco_control, 

860 lock_status=lock_status, 

861 lock_time=lock_time, 

862 frequency_error=float(frequency_error), 

863 ) 

864 

865 

866__all__ = [ 

867 "Glitch", 

868 "MaskTestResult", 

869 "NoiseMarginResult", 

870 "PLLRecoveryResult", 

871 "Violation", 

872 "detect_glitches", 

873 "detect_violations", 

874 "mask_test", 

875 "noise_margin", 

876 "pll_clock_recovery", 

877 "signal_quality_summary", 

878]