Coverage for src / tracekit / analyzers / digital / quality.py: 94%
282 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
"""Signal quality analysis for digital signals.

This module provides signal quality metrics including noise margin
calculation, setup/hold violation detection, and glitch detection.

Example:
    >>> from tracekit.analyzers.digital.quality import noise_margin, detect_glitches
    >>> margins = noise_margin(trace, family="TTL")
    >>> glitches = detect_glitches(trace, min_width=10e-9)

References:
    JEDEC Standard No. 8C: High-Speed CMOS Interface
    Various IC manufacturer datasheets for logic family specifications
"""
17from __future__ import annotations
19from dataclasses import dataclass
20from typing import TYPE_CHECKING, Literal
22import numpy as np
24from tracekit.analyzers.digital.extraction import LOGIC_FAMILIES
25from tracekit.analyzers.digital.timing import hold_time, setup_time
26from tracekit.core.exceptions import InsufficientDataError
27from tracekit.core.types import DigitalTrace, WaveformTrace
29if TYPE_CHECKING:
30 from typing import Any
32 from numpy.typing import NDArray
@dataclass
class NoiseMarginResult:
    """Result of noise margin calculation.

    Noise margins quantify how much noise a signal can tolerate before a
    receiver would misread a logic level.

    Attributes:
        nm_high: Noise margin high, VOH - VIH, in volts.
        nm_low: Noise margin low, VIL - VOL, in volts.
        logic_family: Logic family used for calculation (e.g. "TTL").
        voh: Output high voltage (measured or spec).
        vol: Output low voltage (measured or spec).
        vih: Input high threshold (from spec).
        vil: Input low threshold (from spec).
    """

    nm_high: float
    nm_low: float
    logic_family: str
    voh: float
    vol: float
    vih: float
    vil: float
@dataclass
class Violation:
    """Represents a timing or signal violation.

    Attributes:
        timestamp: Time of violation in seconds.
        violation_type: Type of violation (e.g. "setup" or "hold").
        measured: Measured value.
        limit: Specification limit.
        margin: Margin to specification (negative = violation).
        end_timestamp: End time of violation (if applicable).
    """

    timestamp: float
    violation_type: str
    measured: float
    limit: float
    margin: float
    end_timestamp: float | None = None
@dataclass
class Glitch:
    """Represents a detected glitch (pulse narrower than a minimum width).

    Attributes:
        timestamp: Start time of glitch in seconds.
        width: Duration of glitch in seconds.
        polarity: "positive" (spike high) or "negative" (spike low).
        amplitude: Peak amplitude of glitch relative to the detection
            threshold for analog traces; 1.0 for digital traces.
    """

    timestamp: float
    width: float
    polarity: Literal["positive", "negative"]
    amplitude: float
def noise_margin(
    trace: WaveformTrace,
    *,
    family: str = "LVCMOS_3V3",
    use_measured_levels: bool = True,
) -> NoiseMarginResult:
    """Calculate noise margins (NMH and NML) for a digital signal.

    NMH = VOH - VIH (margin when the output drives high) and
    NML = VIL - VOL (margin when the output drives low), using either
    measured output levels or the logic-family specification values.

    Args:
        trace: Input waveform trace.
        family: Logic family for threshold levels.
            Options: TTL, CMOS_5V, LVTTL, LVCMOS_3V3, LVCMOS_2V5, LVCMOS_1V8, LVCMOS_1V2
        use_measured_levels: If True, use measured VOH/VOL from signal.
            If False, use spec values from logic family.

    Returns:
        NoiseMarginResult with calculated margins.

    Raises:
        ValueError: If logic family is not recognized.

    Example:
        >>> result = noise_margin(trace, family="TTL")
        >>> print(f"NMH: {result.nm_high:.3f} V")
        >>> print(f"NML: {result.nm_low:.3f} V")

    References:
        JEDEC Standard No. 8C
    """
    if family not in LOGIC_FAMILIES:
        available = ", ".join(LOGIC_FAMILIES.keys())
        raise ValueError(f"Unknown logic family: {family}. Available: {available}")

    spec = LOGIC_FAMILIES[family]
    vih_spec = spec["VIH_min"]
    vil_spec = spec["VIL_max"]

    if use_measured_levels and len(trace.data) > 0:
        # Prefer the actual output levels observed in the signal.
        vol_level, voh_level = _find_logic_levels(trace.data)
    else:
        # Fall back to the worst-case specification values.
        voh_level = spec["VOH_min"]
        vol_level = spec["VOL_max"]

    return NoiseMarginResult(
        nm_high=voh_level - vih_spec,
        nm_low=vil_spec - vol_level,
        logic_family=family,
        voh=voh_level,
        vol=vol_level,
        vih=vih_spec,
        vil=vil_spec,
    )
def detect_violations(
    data_trace: WaveformTrace | DigitalTrace,
    clock_trace: WaveformTrace | DigitalTrace,
    *,
    setup_spec: float,
    hold_spec: float,
    clock_edge: Literal["rising", "falling"] = "rising",
) -> list[Violation]:
    """Detect setup and hold time violations.

    Compares measured setup and hold times against their specifications
    and reports every shortfall with timestamp and (negative) margin.

    Args:
        data_trace: Data signal trace.
        clock_trace: Clock signal trace.
        setup_spec: Required setup time in seconds.
        hold_spec: Required hold time in seconds.
        clock_edge: Clock edge to reference ("rising" or "falling").

    Returns:
        List of Violation objects for each detected violation, sorted by time.

    Example:
        >>> violations = detect_violations(
        ...     data_trace, clock_trace,
        ...     setup_spec=2e-9, hold_spec=1e-9
        ... )
        >>> for v in violations:
        ...     print(f"{v.violation_type}: {v.margin*1e12:.0f} ps margin")

    References:
        JEDEC Standard No. 65B
    """
    found: list[Violation] = []

    # Setup and hold checks share the same shape; drive both from one table.
    checks = (
        ("setup", setup_time(data_trace, clock_trace, clock_edge=clock_edge, return_all=True), setup_spec),
        ("hold", hold_time(data_trace, clock_trace, clock_edge=clock_edge, return_all=True), hold_spec),
    )

    for kind, measured_times, spec in checks:
        if not (isinstance(measured_times, np.ndarray) and len(measured_times) > 0):
            continue

        edges = _get_clock_edges(clock_trace, clock_edge)

        for t_measured, edge_time in zip(
            measured_times, edges[: len(measured_times)], strict=False
        ):
            slack = t_measured - spec
            if slack < 0:  # Below spec: record a violation.
                found.append(
                    Violation(
                        timestamp=edge_time,
                        violation_type=kind,
                        measured=t_measured,
                        limit=spec,
                        margin=slack,
                    )
                )

    # Present violations in chronological order.
    found.sort(key=lambda v: v.timestamp)

    return found
def detect_glitches(
    trace: WaveformTrace | DigitalTrace,
    *,
    min_width: float,
    threshold: float | None = None,
) -> list[Glitch]:
    """Detect glitches (pulses shorter than minimum width).

    Identifies short pulses that violate minimum pulse width specifications,
    which may cause logic errors or be artifacts.

    Args:
        trace: Input trace (analog or digital).
        min_width: Minimum valid pulse width in seconds.
        threshold: Threshold for digital conversion (analog traces only).
            If None, auto-detected from signal.

    Returns:
        List of Glitch objects for each detected glitch, sorted by time.

    Example:
        >>> glitches = detect_glitches(trace, min_width=10e-9)
        >>> for g in glitches:
        ...     print(f"Glitch at {g.timestamp*1e6:.2f} us, width={g.width*1e9:.1f} ns")

    References:
        Application note AN-905: Understanding Glitch Detection
    """
    is_digital = isinstance(trace, DigitalTrace)
    sample_rate = trace.metadata.sample_rate

    if is_digital:
        # Already binary; float copy kept only for symmetry with analog path.
        digital = trace.data
        data = trace.data.astype(np.float64)
        threshold_used = 0.5  # Not used for amplitude calc on digital
    else:
        data = trace.data
        if len(data) < 3:
            return []

        low, high = _find_logic_levels(data)
        threshold_used = (low + high) / 2 if threshold is None else threshold

        if high - low <= 0:
            # Degenerate signal with no detectable swing.
            return []

        digital = data >= threshold_used

    if len(digital) < 3:
        return []

    dt = 1.0 / sample_rate
    found: list[Glitch] = []

    # Locate all edges once, then pair them up per polarity.
    transitions = np.diff(digital.astype(np.int8))
    rising = np.where(transitions == 1)[0]
    falling = np.where(transitions == -1)[0]

    # Positive glitches: rising edge followed too quickly by a falling edge.
    for start in rising:
        later = falling[falling > start]
        if len(later) == 0:
            continue
        end = later[0]
        width = (end - start) * dt
        if width >= min_width:
            continue
        if is_digital:
            amp = 1.0  # Logic high
        else:
            amp = float(np.max(data[start : end + 1]) - threshold_used)
        found.append(
            Glitch(timestamp=start * dt, width=width, polarity="positive", amplitude=amp)
        )

    # Negative glitches: falling edge followed too quickly by a rising edge.
    for start in falling:
        later = rising[rising > start]
        if len(later) == 0:
            continue
        end = later[0]
        width = (end - start) * dt
        if width >= min_width:
            continue
        if is_digital:
            amp = 1.0  # Logic low
        else:
            amp = float(threshold_used - np.min(data[start : end + 1]))
        found.append(
            Glitch(timestamp=start * dt, width=width, polarity="negative", amplitude=amp)
        )

    found.sort(key=lambda g: g.timestamp)

    return found
def signal_quality_summary(
    trace: WaveformTrace,
    *,
    family: str = "LVCMOS_3V3",
    min_pulse_width: float = 10e-9,
) -> dict:  # type: ignore[type-arg]
    """Generate comprehensive signal quality summary.

    Combines noise-margin analysis, glitch detection, level measurement,
    and transition counting into one report.

    Args:
        trace: Input waveform trace.
        family: Logic family for noise margin calculation.
        min_pulse_width: Minimum pulse width for glitch detection.

    Returns:
        Dictionary with quality metrics:
        - noise_margin: NoiseMarginResult
        - glitch_count: Number of detected glitches
        - glitches: List of Glitch objects
        - signal_levels: Measured low/high levels
        - transition_count: Number of transitions

    Example:
        >>> summary = signal_quality_summary(trace)
        >>> print(f"NMH: {summary['noise_margin'].nm_high:.3f} V")
        >>> print(f"Glitches: {summary['glitch_count']}")
    """
    margins = noise_margin(trace, family=family)
    found_glitches = detect_glitches(trace, min_width=min_pulse_width)

    # Measure logic levels and count mid-level crossings.
    low, high = _find_logic_levels(trace.data)
    binary = trace.data >= (low + high) / 2
    n_transitions = int(np.sum(np.abs(np.diff(binary.astype(np.int8)))))

    return {
        "noise_margin": margins,
        "glitch_count": len(found_glitches),
        "glitches": found_glitches,
        "signal_levels": {"low": low, "high": high},
        "transition_count": n_transitions,
    }
422# =============================================================================
423# Helper Functions
424# =============================================================================
def _find_logic_levels(data: NDArray[np.floating[Any]]) -> tuple[float, float]:
    """Estimate low and high logic levels from waveform samples.

    Locates the dominant histogram peak in each half of the value range;
    falls back to robust percentiles if the peaks are degenerate.

    Args:
        data: Waveform data array.

    Returns:
        Tuple of (low_level, high_level).
    """
    if len(data) == 0:
        return 0.0, 0.0

    # Percentile-based fallback, robust to outliers.
    fallback_low, fallback_high = np.percentile(data, [10, 90])

    try:
        counts, edges = np.histogram(data, bins=50)
        centers = (edges[:-1] + edges[1:]) / 2

        # Dominant bin in each half of the range is taken as the level.
        half = len(counts) // 2
        low_peak = centers[np.argmax(counts[:half])]
        high_peak = centers[half + np.argmax(counts[half:])]

        if high_peak <= low_peak:
            # Peaks collapsed onto each other; distrust the histogram.
            return float(fallback_low), float(fallback_high)

        return float(low_peak), float(high_peak)
    except (ValueError, IndexError):
        return float(fallback_low), float(fallback_high)
def _get_clock_edges(
    trace: WaveformTrace | DigitalTrace,
    edge_type: Literal["rising", "falling"],
) -> NDArray[np.float64]:
    """Locate clock edge timestamps with sub-sample interpolation.

    Args:
        trace: Clock trace.
        edge_type: Type of edges to find.

    Returns:
        Array of edge timestamps in seconds.
    """
    samples = trace.data.astype(np.float64) if isinstance(trace, DigitalTrace) else trace.data

    if len(samples) < 2:
        return np.array([], dtype=np.float64)

    dt = trace.metadata.time_base

    # Threshold at the midpoint between measured logic levels.
    low, high = _find_logic_levels(samples)
    threshold = (low + high) / 2

    before, after = samples[:-1], samples[1:]
    if edge_type == "rising":
        crossings = np.where((before < threshold) & (after >= threshold))[0]
    else:
        crossings = np.where((before >= threshold) & (after < threshold))[0]

    times = np.zeros(len(crossings), dtype=np.float64)

    for i, idx in enumerate(crossings):
        t0 = idx * dt
        if idx >= len(samples) - 1:
            # No next sample to interpolate against.
            times[i] = t0
            continue

        v1, v2 = samples[idx], samples[idx + 1]
        if abs(v2 - v1) > 1e-12:
            # Linearly interpolate the crossing, clamped to one sample period.
            frac = (threshold - v1) / (v2 - v1) * dt
            times[i] = t0 + max(0, min(dt, frac))
        else:
            # Flat segment: place the edge at the interval midpoint.
            times[i] = t0 + dt / 2

    return times
@dataclass
class MaskTestResult:
    """Result of mask testing.

    Attributes:
        pass_fail: Overall pass/fail result (True when hit_count is 0).
        hit_count: Number of samples violating the mask.
        total_samples: Total number of samples tested.
        margin_top: Minimum margin to top mask boundary in volts.
        margin_bottom: Minimum margin to bottom mask boundary in volts.
        violations: List of (time, voltage) pairs for violating samples.
    """

    pass_fail: bool
    hit_count: int
    total_samples: int
    margin_top: float
    margin_bottom: float
    violations: list[tuple[float, float]]  # (time, voltage) pairs


@dataclass
class PLLRecoveryResult:
    """Result of PLL clock recovery.

    Attributes:
        recovered_frequency: Recovered clock frequency in Hz.
        recovered_phase: Recovered phase trajectory (radians).
        vco_control: VCO control voltage trajectory.
        lock_status: True if PLL is locked.
        lock_time: Time to achieve lock in seconds (if locked).
        frequency_error: Final frequency error in Hz.
    """

    recovered_frequency: float
    recovered_phase: NDArray[np.float64]
    vco_control: NDArray[np.float64]
    lock_status: bool
    lock_time: float | None
    frequency_error: float


def mask_test(
    trace: WaveformTrace,
    mask: dict[str, NDArray[np.float64]] | str = "usb2",
    *,
    bit_period: float | None = None,
) -> MaskTestResult:
    """Test signal against compliance mask template.

    Performs mask testing for signal integrity verification against
    predefined templates (USB, PCIe, etc.) or custom masks.

    Args:
        trace: Input waveform trace.
        mask: Either mask name ("usb2", "pcie_gen3") or custom mask dict with:
            - "time_ui": Time coordinates in UI (0.0 to 2.0 for 2-UI mask)
            - "voltage_top": Upper voltage boundary
            - "voltage_bottom": Lower voltage boundary
        bit_period: Bit period in seconds (required if mask uses UI coordinates).

    Returns:
        MaskTestResult with pass/fail and violation statistics.

    Raises:
        ValueError: If bit_period is not provided or mask name is not recognized.

    Example:
        >>> result = mask_test(signal_trace, mask="usb2", bit_period=3.33e-9)
        >>> print(f"Pass: {result.pass_fail}, Violations: {result.hit_count}")

    References:
        USB 2.0 Specification, PCIe Base Specification
    """
    # Load predefined mask if string
    mask_data = _get_predefined_mask(mask) if isinstance(mask, str) else mask

    # Extract mask boundaries
    time_ui = mask_data["time_ui"]
    v_top = mask_data["voltage_top"]
    v_bottom = mask_data["voltage_bottom"]

    # Get signal data
    data = trace.data
    n_samples = len(data)
    sample_rate = trace.metadata.sample_rate

    if bit_period is None:
        raise ValueError("bit_period is required for mask testing with UI coordinates")

    # Convert UI to sample indices
    samples_per_ui = bit_period * sample_rate
    time_samples = time_ui * samples_per_ui

    # Mask span in UI (1 or 2); clamped to >= 1 so a sub-UI custom mask
    # cannot produce a zero-length period (ZeroDivisionError below).
    n_ui = max(int(np.max(time_ui)), 1)

    violations: list[tuple[float, float]] = []
    hit_count = 0
    margin_top = float(np.inf)
    margin_bottom = float(np.inf)

    # Test all complete bit periods in the signal. Guard the division so a
    # sample rate too low to resolve one period yields zero periods, not a crash.
    period_len = int(samples_per_ui * n_ui)
    n_periods = n_samples // period_len if period_len > 0 else 0

    # Single pass over the data: record violations and track the minimum
    # distance to each boundary together (previously two identical scans).
    for period_idx in range(n_periods):
        period_start_sample = int(period_idx * samples_per_ui * n_ui)

        for i in range(len(time_ui)):
            sample_idx = period_start_sample + int(time_samples[i])

            if sample_idx >= n_samples:
                break

            voltage = data[sample_idx]

            # Track worst-case margins to both boundaries.
            margin_top = min(margin_top, v_top[i] - voltage)
            margin_bottom = min(margin_bottom, voltage - v_bottom[i])

            # Check if voltage violates mask
            if voltage > v_top[i] or voltage < v_bottom[i]:
                violations.append((sample_idx / sample_rate, voltage))
                hit_count += 1

    # Pass if no hits
    return MaskTestResult(
        pass_fail=hit_count == 0,
        hit_count=hit_count,
        total_samples=n_periods * len(time_ui),
        margin_top=margin_top if margin_top != np.inf else 0.0,
        margin_bottom=margin_bottom if margin_bottom != np.inf else 0.0,
        violations=violations,
    )
def _get_predefined_mask(mask_name: str) -> dict[str, NDArray[np.float64]]:
    """Look up a predefined eye-mask template by name.

    Args:
        mask_name: Name of standard mask ("usb2", "pcie_gen3", etc.).

    Returns:
        Dictionary with time_ui, voltage_top, voltage_bottom arrays.

    Raises:
        ValueError: If mask name is not recognized.
    """
    if mask_name == "usb2":
        # USB 2.0 high-speed eye mask (simplified): 2-UI span, normalized to
        # +/-1 V amplitude. The bottom boundary mirrors the top exactly.
        time_ui = np.array([0.0, 0.2, 0.4, 0.5, 0.6, 0.8, 1.0, 1.2, 1.4, 1.5, 1.6, 1.8, 2.0])
        v_top = np.array([0.6, 0.6, 0.8, 0.9, 0.8, 0.6, 0.4, 0.6, 0.8, 0.9, 0.8, 0.6, 0.6])
        v_bottom = -v_top
    elif mask_name == "pcie_gen3":
        # PCIe Gen 3 eye mask (simplified): 1-UI span, symmetric boundaries.
        time_ui = np.array([0.0, 0.15, 0.35, 0.5, 0.65, 0.85, 1.0])
        v_top = np.array([0.5, 0.5, 0.7, 0.8, 0.7, 0.5, 0.5])
        v_bottom = -v_top
    else:
        raise ValueError(
            f"Unknown mask: {mask_name}. Available: usb2, pcie_gen3. Or provide custom mask dict."
        )

    return {"time_ui": time_ui, "voltage_top": v_top, "voltage_bottom": v_bottom}
def pll_clock_recovery(
    trace: WaveformTrace | DigitalTrace,
    *,
    nominal_frequency: float,
    loop_bandwidth: float = 1e6,
    damping: float = 0.707,
    vco_gain: float = 1e6,
) -> PLLRecoveryResult:
    """Recover clock using PLL emulation.

    Emulates a second-order PLL to recover embedded clock from NRZ,
    NRZI, or Manchester-encoded data streams.

    Args:
        trace: Input data trace.
        nominal_frequency: Nominal clock frequency in Hz.
        loop_bandwidth: PLL loop bandwidth in Hz (default 1 MHz).
        damping: Damping factor (default 0.707 for critical damping).
        vco_gain: VCO gain in Hz/V (default 1 MHz/V).

    Returns:
        PLLRecoveryResult with recovered clock parameters.

    Raises:
        InsufficientDataError: If trace has fewer than 100 samples.

    Example:
        >>> result = pll_clock_recovery(data_trace, nominal_frequency=1e9)
        >>> print(f"Recovered: {result.recovered_frequency / 1e9:.3f} GHz")
        >>> print(f"Locked: {result.lock_status}")

    References:
        Gardner, F. M. (2005). Phaselock Techniques, 3rd ed.
    """
    samples = trace.data.astype(np.float64) if isinstance(trace, DigitalTrace) else trace.data
    sample_rate = trace.metadata.sample_rate
    n_samples = len(samples)

    if n_samples < 100:
        raise InsufficientDataError(
            "PLL recovery requires at least 100 samples",
            required=100,
            available=n_samples,
            analysis_type="pll_clock_recovery",
        )

    dt = 1.0 / sample_rate

    # Second-order loop constants in rad/s.
    omega_n = 2 * np.pi * loop_bandwidth  # Natural frequency
    k_vco = 2 * np.pi * vco_gain  # VCO gain in rad/s/V

    # PI loop filter F(s) = Kp + Ki/s.
    kp = (2 * damping * omega_n) / k_vco
    ki = (omega_n**2) / k_vco

    # Per-sample PLL state trajectories.
    phase = np.zeros(n_samples)
    vco_control = np.zeros(n_samples)
    integ = 0.0
    vco_phase = 0.0

    # Ideal phase advance per sample at the nominal frequency.
    nominal_phase_inc = 2 * np.pi * nominal_frequency * dt

    # Data transitions serve as the phase reference (simplified detector).
    mid = (np.max(samples) + np.min(samples)) / 2
    edges = np.where(np.abs(np.diff(np.sign(samples - mid))) > 0)[0]
    edge_idx = 0

    for i in range(n_samples):
        # Phase detector: on each edge, compare the ideal clock phase
        # against the VCO phase; elsewhere the error is zero.
        if edge_idx < len(edges) and i == edges[edge_idx]:
            expected = (edges[edge_idx] * nominal_phase_inc) % (2 * np.pi)
            err = expected - (vco_phase % (2 * np.pi))
            # Wrap the error to [-pi, pi].
            err = (err + np.pi) % (2 * np.pi) - np.pi
            edge_idx += 1
        else:
            err = 0.0

        # Loop filter: proportional term plus accumulated integral term.
        integ += ki * err * dt
        ctrl = kp * err + integ

        # VCO advances at nominal frequency plus the control-voltage offset.
        vco_freq = nominal_frequency + k_vco * ctrl / (2 * np.pi)
        vco_phase += 2 * np.pi * vco_freq * dt

        phase[i] = vco_phase
        vco_control[i] = ctrl

    # Lock detection: control voltage must be stable over the final 20%.
    lock_threshold = 0.1  # 10% variation
    tail = vco_control[int(0.8 * n_samples) :]

    lock_status = False
    lock_time: float | None = None
    if len(tail) > 0:
        tail_std = np.std(tail)
        tail_mean = np.abs(np.mean(tail))
        lock_status = tail_std < lock_threshold * max(tail_mean, 1.0)

        if lock_status:
            # Lock time = earliest point whose following 10% window is quiet.
            window = int(0.1 * n_samples)
            for start in range(window, n_samples - window):
                if np.std(vco_control[start : start + window]) < lock_threshold:
                    lock_time = start * dt
                    break

    # Recovered frequency derived from the mean VCO state over the last 10%.
    final_ctrl = np.mean(vco_control[-int(0.1 * n_samples) :])
    recovered_frequency = nominal_frequency + k_vco * final_ctrl / (2 * np.pi)

    return PLLRecoveryResult(
        recovered_frequency=float(recovered_frequency),
        recovered_phase=phase,
        vco_control=vco_control,
        lock_status=lock_status,
        lock_time=lock_time,
        frequency_error=float(recovered_frequency - nominal_frequency),
    )
# Public API: analysis entry points plus their result dataclasses.
__all__ = [
    "Glitch",
    "MaskTestResult",
    "NoiseMarginResult",
    "PLLRecoveryResult",
    "Violation",
    "detect_glitches",
    "detect_violations",
    "mask_test",
    "noise_margin",
    "pll_clock_recovery",
    "signal_quality_summary",
]