Coverage for src / tracekit / analyzers / digital / timing.py: 91%

358 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Advanced timing measurements for digital signals. 

2 

3This module provides IEEE 181-2011 and JEDEC compliant timing measurements 

4including propagation delay, setup/hold time, slew rate, phase, and skew. 

5 

6 

7Example: 

8 >>> from tracekit.analyzers.digital.timing import propagation_delay, setup_time 

9 >>> delay = propagation_delay(trace1, trace2) 

10 >>> t_setup = setup_time(data_trace, clock_trace, clock_edge="rising") 

11 

12References: 

13 IEEE 181-2011: Standard for Transitional Waveform Definitions 

14 IEEE 2414-2020: Standard for Jitter and Phase Noise 

15 JEDEC Standard No. 65B: High-Speed Interface Timing 

16""" 

17 

18from __future__ import annotations 

19 

20from dataclasses import dataclass 

21from typing import TYPE_CHECKING, Literal 

22 

23import numpy as np 

24 

25from tracekit.core.exceptions import InsufficientDataError 

26from tracekit.core.types import DigitalTrace, WaveformTrace 

27 

28if TYPE_CHECKING: 

29 from numpy.typing import NDArray 

30 

31 

32@dataclass 

33class ClockRecoveryResult: 

34 """Result of clock recovery analysis. 

35 

36 Attributes: 

37 frequency: Recovered clock frequency in Hz. 

38 period: Recovered clock period in seconds. 

39 method: Method used for recovery ("fft" or "edge"). 

40 confidence: Confidence score (0.0 to 1.0). 

41 jitter_rms: RMS jitter in seconds (edge method only). 

42 jitter_pp: Peak-to-peak jitter in seconds (edge method only). 

43 """ 

44 

45 frequency: float 

46 period: float 

47 method: str 

48 confidence: float 

49 jitter_rms: float | None = None 

50 jitter_pp: float | None = None 

51 

52 

53@dataclass 

54class TimingViolation: 

55 """Represents a timing violation. 

56 

57 Attributes: 

58 timestamp: Time of violation in seconds. 

59 violation_type: Type of violation ("setup" or "hold"). 

60 measured: Measured time in seconds. 

61 required: Required time (specification) in seconds. 

62 margin: Margin to specification (negative = violation). 

63 """ 

64 

65 timestamp: float 

66 violation_type: str 

67 measured: float 

68 required: float 

69 margin: float 

70 

71 

72@dataclass 

73class RMSJitterResult: 

74 """Result of RMS jitter measurement. 

75 

76 Attributes: 

77 rms: RMS jitter in seconds. 

78 mean: Mean period in seconds. 

79 samples: Number of edges used. 

80 uncertainty: Measurement uncertainty (1-sigma) in seconds. 

81 edge_type: Type of edges used. 

82 

83 References: 

84 IEEE 2414-2020 Section 5.1 

85 TIM-007 

86 """ 

87 

88 rms: float 

89 mean: float 

90 samples: int 

91 uncertainty: float 

92 edge_type: str 

93 

94 

95def propagation_delay( 

96 input_trace: WaveformTrace | DigitalTrace, 

97 output_trace: WaveformTrace | DigitalTrace, 

98 *, 

99 ref_level: float = 0.5, 

100 edge_type: Literal["rising", "falling", "both"] = "rising", 

101 return_all: bool = False, 

102) -> float | NDArray[np.float64]: 

103 """Measure propagation delay between two signals. 

104 

105 Computes the time delay from input edge to corresponding output edge 

106 at the specified reference level per IEEE 181-2011. 

107 

108 Args: 

109 input_trace: Input signal trace. 

110 output_trace: Output signal trace. 

111 ref_level: Reference level as fraction (0.0 to 1.0). Default 0.5 (50%). 

112 edge_type: Type of edges to measure: 

113 - "rising": Low-to-high transitions 

114 - "falling": High-to-low transitions 

115 - "both": All transitions 

116 return_all: If True, return array of all delays. If False, return mean. 

117 

118 Returns: 

119 Propagation delay in seconds (mean if return_all=False), or array of delays. 

120 

121 Raises: 

122 InsufficientDataError: If traces have insufficient edges. 

123 

124 Example: 

125 >>> delay = propagation_delay(input_trace, output_trace) 

126 >>> print(f"Propagation delay: {delay * 1e9:.2f} ns") 

127 

128 References: 

129 IEEE 181-2011 Section 5.6 

130 """ 

131 # Get edge timestamps for both signals 

132 input_edges = _get_edge_timestamps(input_trace, edge_type, ref_level) 

133 output_edges = _get_edge_timestamps(output_trace, edge_type, ref_level) 

134 

135 if len(input_edges) == 0: 

136 raise InsufficientDataError( 

137 "No edges found in input trace", 

138 required=1, 

139 available=0, 

140 analysis_type="propagation_delay", 

141 ) 

142 

143 if len(output_edges) == 0: 

144 raise InsufficientDataError( 

145 "No edges found in output trace", 

146 required=1, 

147 available=0, 

148 analysis_type="propagation_delay", 

149 ) 

150 

151 # Match input edges to nearest subsequent output edges 

152 delays: list[float] = [] 

153 

154 for in_edge in input_edges: 

155 # Find output edges after this input edge 

156 subsequent_outputs = output_edges[output_edges > in_edge] 

157 if len(subsequent_outputs) > 0: 

158 # Use nearest subsequent output edge 

159 delay = subsequent_outputs[0] - in_edge 

160 if delay > 0: 160 ↛ 154line 160 didn't jump to line 154 because the condition on line 160 was always true

161 delays.append(delay) 

162 

163 if len(delays) == 0: 

164 if return_all: 

165 return np.array([], dtype=np.float64) 

166 return np.nan 

167 

168 delays_arr = np.array(delays, dtype=np.float64) 

169 

170 if return_all: 

171 return delays_arr 

172 return float(np.mean(delays_arr)) 

173 

174 

175def setup_time( 

176 data_trace: WaveformTrace | DigitalTrace, 

177 clock_trace: WaveformTrace | DigitalTrace, 

178 *, 

179 clock_edge: Literal["rising", "falling"] = "rising", 

180 data_stable_level: float = 0.5, 

181 return_all: bool = False, 

182) -> float | NDArray[np.float64]: 

183 """Measure setup time between data and clock signals. 

184 

185 Computes the time from when data becomes stable to the clock edge 

186 per JEDEC timing standards. 

187 

188 Args: 

189 data_trace: Data signal trace. 

190 clock_trace: Clock signal trace. 

191 clock_edge: Type of clock edge to reference ("rising" or "falling"). 

192 data_stable_level: Reference level for data stability (0.0 to 1.0). 

193 return_all: If True, return array of all setup times. If False, return mean. 

194 

195 Returns: 

196 Setup time in seconds (positive = data stable before clock). 

197 

198 Example: 

199 >>> t_setup = setup_time(data_trace, clock_trace, clock_edge="rising") 

200 >>> print(f"Setup time: {t_setup * 1e9:.2f} ns") 

201 

202 References: 

203 JEDEC Standard No. 65B 

204 """ 

205 # Get clock edges 

206 clock_edges = _get_edge_timestamps(clock_trace, clock_edge, 0.5) 

207 

208 if len(clock_edges) == 0: 

209 if return_all: 

210 return np.array([], dtype=np.float64) 

211 return np.nan 

212 

213 # Get all data edges (both rising and falling) 

214 data_edges = _get_edge_timestamps(data_trace, "both", data_stable_level) 

215 

216 if len(data_edges) == 0: 

217 if return_all: 

218 return np.array([], dtype=np.float64) 

219 return np.nan 

220 

221 # For each clock edge, find the most recent data edge 

222 setup_times: list[float] = [] 

223 

224 for clk_edge in clock_edges: 

225 # Find data edges before this clock edge 

226 prior_data_edges = data_edges[data_edges < clk_edge] 

227 if len(prior_data_edges) > 0: 

228 # Setup time = clock edge - last data edge 

229 setup = clk_edge - prior_data_edges[-1] 

230 setup_times.append(setup) 

231 

232 if len(setup_times) == 0: 232 ↛ 233line 232 didn't jump to line 233 because the condition on line 232 was never true

233 if return_all: 

234 return np.array([], dtype=np.float64) 

235 return np.nan 

236 

237 result = np.array(setup_times, dtype=np.float64) 

238 

239 if return_all: 

240 return result 

241 return float(np.mean(result)) 

242 

243 

244def hold_time( 

245 data_trace: WaveformTrace | DigitalTrace, 

246 clock_trace: WaveformTrace | DigitalTrace, 

247 *, 

248 clock_edge: Literal["rising", "falling"] = "rising", 

249 data_stable_level: float = 0.5, 

250 return_all: bool = False, 

251) -> float | NDArray[np.float64]: 

252 """Measure hold time between clock and data signals. 

253 

254 Computes the time from clock edge to when data changes 

255 per JEDEC timing standards. 

256 

257 Args: 

258 data_trace: Data signal trace. 

259 clock_trace: Clock signal trace. 

260 clock_edge: Type of clock edge to reference ("rising" or "falling"). 

261 data_stable_level: Reference level for data transition (0.0 to 1.0). 

262 return_all: If True, return array of all hold times. If False, return mean. 

263 

264 Returns: 

265 Hold time in seconds (positive = data stable after clock). 

266 

267 Example: 

268 >>> t_hold = hold_time(data_trace, clock_trace, clock_edge="rising") 

269 >>> print(f"Hold time: {t_hold * 1e9:.2f} ns") 

270 

271 References: 

272 JEDEC Standard No. 65B 

273 """ 

274 # Get clock edges 

275 clock_edges = _get_edge_timestamps(clock_trace, clock_edge, 0.5) 

276 

277 if len(clock_edges) == 0: 

278 if return_all: 

279 return np.array([], dtype=np.float64) 

280 return np.nan 

281 

282 # Get all data edges 

283 data_edges = _get_edge_timestamps(data_trace, "both", data_stable_level) 

284 

285 if len(data_edges) == 0: 

286 if return_all: 

287 return np.array([], dtype=np.float64) 

288 return np.nan 

289 

290 # For each clock edge, find the next data edge 

291 hold_times: list[float] = [] 

292 

293 for clk_edge in clock_edges: 

294 # Find data edges after this clock edge 

295 subsequent_data_edges = data_edges[data_edges > clk_edge] 

296 if len(subsequent_data_edges) > 0: 

297 # Hold time = next data edge - clock edge 

298 hold = subsequent_data_edges[0] - clk_edge 

299 hold_times.append(hold) 

300 

301 if len(hold_times) == 0: 

302 if return_all: 302 ↛ 304line 302 didn't jump to line 304 because the condition on line 302 was always true

303 return np.array([], dtype=np.float64) 

304 return np.nan 

305 

306 result = np.array(hold_times, dtype=np.float64) 

307 

308 if return_all: 

309 return result 

310 return float(np.mean(result)) 

311 

312 

313def slew_rate( 

314 trace: WaveformTrace, 

315 *, 

316 ref_levels: tuple[float, float] = (0.2, 0.8), 

317 edge_type: Literal["rising", "falling", "both"] = "rising", 

318 return_all: bool = False, 

319) -> float | NDArray[np.float64]: 

320 """Measure slew rate (dV/dt) during signal transitions. 

321 

322 Computes the rate of voltage change during edge transitions 

323 per IEEE 181-2011. 

324 

325 Args: 

326 trace: Input waveform trace. 

327 ref_levels: Reference levels as fractions (default 20%-80%). 

328 edge_type: Type of edges to measure. 

329 return_all: If True, return array of all slew rates. If False, return mean. 

330 

331 Returns: 

332 Slew rate in V/s (positive for rising, negative for falling). 

333 

334 Example: 

335 >>> sr = slew_rate(trace) 

336 >>> print(f"Slew rate: {sr / 1e6:.2f} V/us") 

337 

338 References: 

339 IEEE 181-2011 Section 5.2 

340 """ 

341 if len(trace.data) < 3: 

342 if return_all: 342 ↛ 343line 342 didn't jump to line 343 because the condition on line 342 was never true

343 return np.array([], dtype=np.float64) 

344 return np.nan 

345 

346 data = trace.data 

347 sample_period = trace.metadata.time_base 

348 

349 # Find signal levels 

350 low, high = _find_levels(data) 

351 amplitude = high - low 

352 

353 if amplitude <= 0: 353 ↛ 354line 353 didn't jump to line 354 because the condition on line 353 was never true

354 if return_all: 

355 return np.array([], dtype=np.float64) 

356 return np.nan 

357 

358 # Calculate reference voltages 

359 v_low = low + ref_levels[0] * amplitude 

360 v_high = low + ref_levels[1] * amplitude 

361 dv = v_high - v_low 

362 

363 slew_rates: list[float] = [] 

364 

365 if edge_type in ("rising", "both"): 

366 # Find rising transitions 

367 rising_start = np.where((data[:-1] < v_low) & (data[1:] >= v_low))[0] 

368 

369 for start_idx in rising_start: 

370 # Find where signal reaches v_high 

371 remaining = data[start_idx:] 

372 above_high = remaining >= v_high 

373 

374 if np.any(above_high): 374 ↛ 369line 374 didn't jump to line 369 because the condition on line 374 was always true

375 end_offset = np.argmax(above_high) 

376 dt = end_offset * sample_period 

377 if dt > 0: 377 ↛ 369line 377 didn't jump to line 369 because the condition on line 377 was always true

378 slew_rates.append(float(dv / dt)) 

379 

380 if edge_type in ("falling", "both"): 

381 # Find falling transitions 

382 falling_start = np.where((data[:-1] > v_high) & (data[1:] <= v_high))[0] 

383 

384 for start_idx in falling_start: 

385 # Find where signal reaches v_low 

386 remaining = data[start_idx:] 

387 below_low = remaining <= v_low 

388 

389 if np.any(below_low): 389 ↛ 384line 389 didn't jump to line 384 because the condition on line 389 was always true

390 end_offset = np.argmax(below_low) 

391 dt = end_offset * sample_period 

392 if dt > 0: 392 ↛ 384line 392 didn't jump to line 384 because the condition on line 392 was always true

393 slew_rates.append(float(-dv / dt)) # Negative for falling 

394 

395 if len(slew_rates) == 0: 

396 if return_all: 396 ↛ 397line 396 didn't jump to line 397 because the condition on line 396 was never true

397 return np.array([], dtype=np.float64) 

398 return np.nan 

399 

400 result = np.array(slew_rates, dtype=np.float64) 

401 

402 if return_all: 

403 return result 

404 return float(np.mean(result)) 

405 

406 

407def phase( 

408 trace1: WaveformTrace, 

409 trace2: WaveformTrace, 

410 *, 

411 method: Literal["edge", "fft"] = "edge", 

412 unit: Literal["degrees", "radians"] = "degrees", 

413) -> float: 

414 """Measure phase difference between two signals. 

415 

416 Computes the phase relationship between two waveforms using 

417 either edge-based or FFT-based methods. 

418 

419 Args: 

420 trace1: Reference signal trace. 

421 trace2: Signal to measure phase relative to reference. 

422 method: Measurement method: 

423 - "edge": Edge-to-edge timing (default, more accurate for digital) 

424 - "fft": Cross-spectral phase (better for analog/noisy signals) 

425 unit: Output unit ("degrees" or "radians"). 

426 

427 Returns: 

428 Phase difference in specified units. Positive = trace2 leads trace1. 

429 

430 Raises: 

431 ValueError: If method is not recognized. 

432 

433 Example: 

434 >>> phase_deg = phase(ref_trace, sig_trace) 

435 >>> print(f"Phase: {phase_deg:.1f} degrees") 

436 

437 References: 

438 IEEE 181-2011 Section 5.8 

439 """ 

440 if method == "edge": 

441 return _phase_edge(trace1, trace2, unit) 

442 elif method == "fft": 

443 return _phase_fft(trace1, trace2, unit) 

444 else: 

445 raise ValueError(f"Unknown method: {method}") 

446 

447 

448def _phase_edge( 

449 trace1: WaveformTrace, 

450 trace2: WaveformTrace, 

451 unit: Literal["degrees", "radians"], 

452) -> float: 

453 """Compute phase using edge timing.""" 

454 # Get rising edges for both signals 

455 edges1 = _get_edge_timestamps(trace1, "rising", 0.5) 

456 edges2 = _get_edge_timestamps(trace2, "rising", 0.5) 

457 

458 if len(edges1) < 2 or len(edges2) < 2: 

459 return np.nan # type: ignore[no-any-return] 

460 

461 # Calculate period from first signal 

462 period1 = np.mean(np.diff(edges1)) 

463 

464 if period1 <= 0: 464 ↛ 465line 464 didn't jump to line 465 because the condition on line 464 was never true

465 return np.nan # type: ignore[no-any-return] 

466 

467 # Calculate phase from edge differences 

468 phase_times: list[float] = [] 

469 

470 for e1 in edges1: 

471 # Find nearest edge in trace2 

472 diffs = edges2 - e1 

473 # Find closest edge (could be before or after) 

474 idx = np.argmin(np.abs(diffs)) 

475 phase_times.append(diffs[idx]) 

476 

477 if len(phase_times) == 0: 477 ↛ 478line 477 didn't jump to line 478 because the condition on line 477 was never true

478 return np.nan # type: ignore[no-any-return] 

479 

480 mean_phase_time = np.mean(phase_times) 

481 

482 # Convert to phase angle 

483 phase_rad = 2 * np.pi * mean_phase_time / period1 

484 

485 # Normalize to [-pi, pi] 

486 phase_rad = (phase_rad + np.pi) % (2 * np.pi) - np.pi 

487 

488 if unit == "degrees": 

489 return float(np.degrees(phase_rad)) 

490 return float(phase_rad) 

491 

492 

493def _phase_fft( 

494 trace1: WaveformTrace, 

495 trace2: WaveformTrace, 

496 unit: Literal["degrees", "radians"], 

497) -> float: 

498 """Compute phase using FFT cross-spectral analysis.""" 

499 data1 = trace1.data - np.mean(trace1.data) 

500 data2 = trace2.data - np.mean(trace2.data) 

501 

502 # Ensure same length 

503 n = min(len(data1), len(data2)) 

504 data1 = data1[:n] 

505 data2 = data2[:n] 

506 

507 if n < 16: 

508 return np.nan # type: ignore[no-any-return] 

509 

510 # Compute FFTs 

511 fft1 = np.fft.rfft(data1) 

512 fft2 = np.fft.rfft(data2) 

513 

514 # Cross-spectrum 

515 cross = fft2 * np.conj(fft1) 

516 

517 # Find fundamental frequency (strongest component after DC) 

518 magnitudes = np.abs(cross) 

519 fund_idx = np.argmax(magnitudes[1:]) + 1 

520 

521 # Phase at fundamental 

522 phase_rad = np.angle(cross[fund_idx]) 

523 

524 if unit == "degrees": 524 ↛ 526line 524 didn't jump to line 526 because the condition on line 524 was always true

525 return float(np.degrees(phase_rad)) 

526 return float(phase_rad) 

527 

528 

529def skew( 

530 traces: list[WaveformTrace | DigitalTrace], 

531 *, 

532 reference_idx: int = 0, 

533 edge_type: Literal["rising", "falling"] = "rising", 

534) -> dict[str, float | NDArray[np.float64]]: 

535 """Measure timing skew between multiple signals. 

536 

537 Computes the timing offset of each signal relative to a reference 

538 per IEEE 181-2011. 

539 

540 Args: 

541 traces: List of signal traces to compare. 

542 reference_idx: Index of reference signal (default 0). 

543 edge_type: Type of edges to use for comparison. 

544 

545 Returns: 

546 Dictionary with skew statistics: 

547 - skew_values: Array of skew for each non-reference trace 

548 - min: Minimum skew 

549 - max: Maximum skew 

550 - mean: Mean skew 

551 - range: Max - min (total skew spread) 

552 

553 Raises: 

554 ValueError: If fewer than 2 traces or reference_idx out of range. 

555 

556 Example: 

557 >>> result = skew([clk1, clk2, clk3]) 

558 >>> print(f"Max skew: {result['max'] * 1e12:.0f} ps") 

559 

560 References: 

561 IEEE 181-2011 Section 5.7 

562 """ 

563 if len(traces) < 2: 

564 raise ValueError("Need at least 2 traces for skew measurement") 

565 

566 if reference_idx >= len(traces): 

567 raise ValueError(f"reference_idx {reference_idx} out of range") 

568 

569 # Get reference edges 

570 ref_trace = traces[reference_idx] 

571 ref_edges = _get_edge_timestamps(ref_trace, edge_type, 0.5) 

572 

573 if len(ref_edges) == 0: 

574 return { 

575 "skew_values": np.array([], dtype=np.float64), 

576 "min": float(np.nan), 

577 "max": float(np.nan), 

578 "mean": float(np.nan), 

579 "range": float(np.nan), 

580 } 

581 

582 # Compute skew for all traces (including reference which has 0 skew) 

583 all_skews: list[float] = [] 

584 skew_values: list[float] = [] 

585 

586 for i, trace in enumerate(traces): 

587 if i == reference_idx: 

588 # Reference has zero skew by definition 

589 all_skews.append(0.0) 

590 continue 

591 

592 trace_edges = _get_edge_timestamps(trace, edge_type, 0.5) 

593 

594 if len(trace_edges) == 0: 594 ↛ 595line 594 didn't jump to line 595 because the condition on line 594 was never true

595 skew_val = np.nan 

596 else: 

597 # Match edges and compute skew 

598 edge_skews = [] 

599 for ref_edge in ref_edges: 

600 # Find nearest edge in this trace 

601 diffs = np.abs(trace_edges - ref_edge) 

602 nearest_idx = np.argmin(diffs) 

603 skew_val_edge = trace_edges[nearest_idx] - ref_edge 

604 edge_skews.append(skew_val_edge) 

605 

606 skew_val = float(np.mean(edge_skews)) if len(edge_skews) > 0 else np.nan 

607 

608 skew_values.append(skew_val) 

609 all_skews.append(skew_val) 

610 

611 skew_arr = np.array(skew_values, dtype=np.float64) 

612 all_skews_arr = np.array(all_skews, dtype=np.float64) 

613 valid_all_skews = all_skews_arr[~np.isnan(all_skews_arr)] 

614 

615 if len(valid_all_skews) == 0: 615 ↛ 616line 615 didn't jump to line 616 because the condition on line 615 was never true

616 return { 

617 "skew_values": skew_arr, 

618 "min": np.nan, 

619 "max": np.nan, 

620 "mean": np.nan, 

621 "range": np.nan, 

622 } 

623 

624 # Compute statistics across ALL traces (including reference) 

625 return { 

626 "skew_values": skew_arr, 

627 "min": float(np.min(valid_all_skews)), 

628 "max": float(np.max(valid_all_skews)), 

629 "mean": float(np.mean(valid_all_skews)), 

630 "range": float(np.max(valid_all_skews) - np.min(valid_all_skews)), 

631 } 

632 

633 

634def recover_clock_fft( 

635 trace: WaveformTrace | DigitalTrace, 

636 *, 

637 min_freq: float | None = None, 

638 max_freq: float | None = None, 

639) -> ClockRecoveryResult: 

640 """Recover clock frequency using FFT peak detection. 

641 

642 Detects the dominant frequency component in the signal using 

643 FFT analysis, suitable for periodic digital signals. 

644 

645 Args: 

646 trace: Input trace (analog or digital). 

647 min_freq: Minimum frequency to consider (Hz). Default: sample_rate/1000. 

648 max_freq: Maximum frequency to consider (Hz). Default: sample_rate/2. 

649 

650 Returns: 

651 ClockRecoveryResult with recovered frequency and confidence. 

652 

653 Raises: 

654 InsufficientDataError: If trace has fewer than 16 samples. 

655 

656 Example: 

657 >>> result = recover_clock_fft(trace) 

658 >>> print(f"Clock: {result.frequency / 1e6:.3f} MHz") 

659 

660 References: 

661 IEEE 1241-2010 Section 4.1 

662 """ 

663 data = trace.data.astype(np.float64) if isinstance(trace, DigitalTrace) else trace.data 

664 

665 n = len(data) 

666 sample_rate = trace.metadata.sample_rate 

667 

668 if n < 16: 

669 raise InsufficientDataError( 

670 "FFT clock recovery requires at least 16 samples", 

671 required=16, 

672 available=n, 

673 analysis_type="clock_recovery_fft", 

674 ) 

675 

676 # Set frequency range defaults 

677 if min_freq is None: 

678 min_freq = sample_rate / 1000 

679 if max_freq is None: 

680 max_freq = sample_rate / 2 

681 

682 # Remove DC and compute FFT 

683 data_centered = data - np.mean(data) 

684 nfft = int(2 ** np.ceil(np.log2(n))) 

685 spectrum = np.fft.rfft(data_centered, n=nfft) 

686 freq = np.fft.rfftfreq(nfft, d=1.0 / sample_rate) 

687 magnitude = np.abs(spectrum) 

688 

689 # Apply frequency range mask 

690 mask = (freq >= min_freq) & (freq <= max_freq) 

691 valid_indices = np.where(mask)[0] 

692 

693 if len(valid_indices) == 0: 

694 return ClockRecoveryResult( 

695 frequency=np.nan, 

696 period=np.nan, 

697 method="fft", 

698 confidence=0.0, 

699 ) 

700 

701 # Find peak in valid range 

702 local_peak_idx = np.argmax(magnitude[valid_indices]) 

703 peak_idx = valid_indices[local_peak_idx] 

704 peak_freq = freq[peak_idx] 

705 peak_mag = magnitude[peak_idx] 

706 

707 # Calculate confidence (ratio of peak to RMS of spectrum) 

708 rms_mag = np.sqrt(np.mean(magnitude[valid_indices] ** 2)) 

709 confidence = min(1.0, (peak_mag / rms_mag - 1) / 10) if rms_mag > 0 else 0.0 

710 

711 # Parabolic interpolation for more accurate frequency 

712 if 0 < peak_idx < len(magnitude) - 1: 

713 alpha = magnitude[peak_idx - 1] 

714 beta = magnitude[peak_idx] 

715 gamma = magnitude[peak_idx + 1] 

716 

717 if beta > alpha and beta > gamma: 

718 freq_resolution = sample_rate / nfft 

719 delta = 0.5 * (alpha - gamma) / (alpha - 2 * beta + gamma + 1e-12) 

720 peak_freq = peak_freq + delta * freq_resolution 

721 

722 period = 1.0 / peak_freq if peak_freq > 0 else np.nan 

723 

724 return ClockRecoveryResult( 

725 frequency=float(peak_freq), 

726 period=float(period), 

727 method="fft", 

728 confidence=float(confidence), 

729 ) 

730 

731 

732def recover_clock_edge( 

733 trace: WaveformTrace | DigitalTrace, 

734 *, 

735 edge_type: Literal["rising", "falling"] = "rising", 

736 threshold: float | None = None, 

737) -> ClockRecoveryResult: 

738 """Recover clock frequency from edge timestamps. 

739 

740 Computes clock frequency from edge-to-edge timing, also 

741 providing jitter statistics. 

742 

743 Args: 

744 trace: Input trace (analog or digital). 

745 edge_type: Type of edges to use ("rising" or "falling"). 

746 threshold: Threshold for edge detection (analog traces only). 

747 

748 Returns: 

749 ClockRecoveryResult with frequency and jitter statistics. 

750 

751 Example: 

752 >>> result = recover_clock_edge(trace) 

753 >>> print(f"Clock: {result.frequency / 1e6:.3f} MHz") 

754 >>> print(f"Jitter RMS: {result.jitter_rms * 1e12:.1f} ps") 

755 

756 References: 

757 IEEE 2414-2020 Section 4 

758 """ 

759 # Get edge timestamps 

760 ref_level = 0.5 if threshold is None else threshold 

761 edges = _get_edge_timestamps(trace, edge_type, ref_level) 

762 

763 if len(edges) < 3: 

764 return ClockRecoveryResult( 

765 frequency=np.nan, 

766 period=np.nan, 

767 method="edge", 

768 confidence=0.0, 

769 ) 

770 

771 # Compute periods 

772 periods = np.diff(edges) 

773 

774 if len(periods) == 0: 774 ↛ 775line 774 didn't jump to line 775 because the condition on line 774 was never true

775 return ClockRecoveryResult( 

776 frequency=np.nan, 

777 period=np.nan, 

778 method="edge", 

779 confidence=0.0, 

780 ) 

781 

782 # Calculate statistics 

783 mean_period = float(np.mean(periods)) 

784 std_period = float(np.std(periods)) 

785 frequency = 1.0 / mean_period if mean_period > 0 else np.nan 

786 

787 # Jitter statistics 

788 jitter_rms = std_period 

789 jitter_pp = float(np.max(periods) - np.min(periods)) 

790 

791 # Confidence based on period consistency (low jitter = high confidence) 

792 if mean_period > 0: 792 ↛ 796line 792 didn't jump to line 796 because the condition on line 792 was always true

793 cv = std_period / mean_period # Coefficient of variation 

794 confidence = max(0.0, min(1.0, 1.0 - cv * 10)) 

795 else: 

796 confidence = 0.0 

797 

798 return ClockRecoveryResult( 

799 frequency=float(frequency), 

800 period=mean_period, 

801 method="edge", 

802 confidence=float(confidence), 

803 jitter_rms=jitter_rms, 

804 jitter_pp=jitter_pp, 

805 ) 

806 

807 

808# ============================================================================= 

809# Helper Functions 

810# ============================================================================= 

811 

812 

813def _get_edge_timestamps( 

814 trace: WaveformTrace | DigitalTrace, 

815 edge_type: Literal["rising", "falling", "both"], 

816 ref_level: float = 0.5, 

817) -> NDArray[np.float64]: 

818 """Get edge timestamps from a trace. 

819 

820 Args: 

821 trace: Input trace. 

822 edge_type: Type of edges to find. 

823 ref_level: Reference level for analog traces (0.0 to 1.0). 

824 

825 Returns: 

826 Array of edge timestamps in seconds. 

827 """ 

828 if isinstance(trace, DigitalTrace): 

829 data = trace.data.astype(np.float64) 

830 sample_rate = trace.metadata.sample_rate 

831 else: 

832 data = trace.data 

833 sample_rate = trace.metadata.sample_rate 

834 

835 if len(data) < 2: 

836 return np.array([], dtype=np.float64) 

837 

838 sample_period = 1.0 / sample_rate 

839 

840 # Find threshold level 

841 low, high = _find_levels(data) 

842 threshold = low + ref_level * (high - low) 

843 

844 timestamps: list[float] = [] 

845 

846 if edge_type in ("rising", "both"): 

847 crossings = np.where((data[:-1] < threshold) & (data[1:] >= threshold))[0] 

848 for idx in crossings: 

849 # Linear interpolation 

850 if idx < len(data) - 1: 850 ↛ 848line 850 didn't jump to line 848 because the condition on line 850 was always true

851 v1, v2 = data[idx], data[idx + 1] 

852 if abs(v2 - v1) > 1e-12: 852 ↛ 856line 852 didn't jump to line 856 because the condition on line 852 was always true

853 t_offset = (threshold - v1) / (v2 - v1) * sample_period 

854 t_offset = max(0, min(sample_period, t_offset)) 

855 else: 

856 t_offset = sample_period / 2 

857 timestamps.append(idx * sample_period + t_offset) 

858 

859 if edge_type in ("falling", "both"): 

860 crossings = np.where((data[:-1] >= threshold) & (data[1:] < threshold))[0] 

861 for idx in crossings: 

862 if idx < len(data) - 1: 862 ↛ 861line 862 didn't jump to line 861 because the condition on line 862 was always true

863 v1, v2 = data[idx], data[idx + 1] 

864 if abs(v2 - v1) > 1e-12: 864 ↛ 868line 864 didn't jump to line 868 because the condition on line 864 was always true

865 t_offset = (threshold - v1) / (v2 - v1) * sample_period 

866 t_offset = max(0, min(sample_period, t_offset)) 

867 else: 

868 t_offset = sample_period / 2 

869 timestamps.append(idx * sample_period + t_offset) 

870 

871 timestamps.sort() 

872 return np.array(timestamps, dtype=np.float64) 

873 

874 

875def _find_levels(data: NDArray[np.float64]) -> tuple[float, float]: 

876 """Find low and high levels using histogram method. 

877 

878 Args: 

879 data: Waveform data array. 

880 

881 Returns: 

882 Tuple of (low_level, high_level). 

883 """ 

884 # Use percentiles for robust level detection 

885 p10, p90 = np.percentile(data, [10, 90]) 

886 

887 # Refine using histogram peaks 

888 try: 

889 hist, bin_edges = np.histogram(data, bins=50) 

890 bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2 

891 

892 # Find peaks in lower and upper halves 

893 mid_idx = len(hist) // 2 

894 low_idx = np.argmax(hist[:mid_idx]) 

895 high_idx = mid_idx + np.argmax(hist[mid_idx:]) 

896 

897 low = bin_centers[low_idx] 

898 high = bin_centers[high_idx] 

899 

900 # Sanity check 

901 if high <= low: 901 ↛ 902line 901 didn't jump to line 902 because the condition on line 901 was never true

902 return float(p10), float(p90) 

903 

904 return float(low), float(high) 

905 except (ValueError, IndexError): 

906 return float(p10), float(p90) 

907 

908 

909def rms_jitter( 

910 trace: WaveformTrace | DigitalTrace, 

911 *, 

912 edge_type: Literal["rising", "falling", "both"] = "rising", 

913 threshold: float = 0.5, 

914) -> RMSJitterResult: 

915 """Measure RMS jitter from edge timing variations. 

916 

917 Computes root-mean-square jitter as the standard deviation of edge 

918 timing variations per IEEE 2414-2020. RMS jitter characterizes the 

919 random component of timing uncertainty. 

920 

921 Args: 

922 trace: Input trace (analog or digital). 

923 edge_type: Type of edges to measure ("rising", "falling", or "both"). 

924 threshold: Threshold for edge detection (0.0 to 1.0). 

925 

926 Returns: 

927 RMSJitterResult containing RMS jitter and statistics. 

928 

929 Example: 

930 >>> result = rms_jitter(clock_trace) 

931 >>> print(f"RMS jitter: {result.rms * 1e12:.2f} ps") 

932 >>> print(f"Uncertainty: +/- {result.uncertainty * 1e12:.2f} ps") 

933 

934 References: 

935 IEEE 2414-2020 Section 5.1 

936 TIM-007 

937 """ 

938 # Get edge timestamps 

939 edges = _get_edge_timestamps(trace, edge_type, threshold) 

940 

941 if len(edges) < 3: 

942 return RMSJitterResult( 

943 rms=np.nan, 

944 mean=np.nan, 

945 samples=0, 

946 uncertainty=np.nan, 

947 edge_type=edge_type, 

948 ) 

949 

950 # Calculate periods 

951 periods = np.diff(edges) 

952 

953 if len(periods) < 2: 953 ↛ 954line 953 didn't jump to line 954 because the condition on line 953 was never true

954 return RMSJitterResult( 

955 rms=np.nan, 

956 mean=np.nan, 

957 samples=len(edges), 

958 uncertainty=np.nan, 

959 edge_type=edge_type, 

960 ) 

961 

962 # RMS jitter is the standard deviation of periods 

963 mean_period = float(np.mean(periods)) 

964 jitter_rms = float(np.std(periods, ddof=1)) 

965 

966 # Measurement uncertainty (1-sigma) 

967 # For N samples, uncertainty of std estimate is std / sqrt(2*(N-1)) 

968 n = len(periods) 

969 uncertainty = jitter_rms / np.sqrt(2 * (n - 1)) if n > 1 else np.nan 

970 

971 return RMSJitterResult( 

972 rms=jitter_rms, 

973 mean=mean_period, 

974 samples=n, 

975 uncertainty=uncertainty, 

976 edge_type=edge_type, 

977 ) 

978 

979 

980def peak_to_peak_jitter( 

981 trace: WaveformTrace | DigitalTrace, 

982 *, 

983 edge_type: Literal["rising", "falling", "both"] = "rising", 

984 threshold: float = 0.5, 

985) -> float: 

986 """Measure peak-to-peak jitter from edge timing variations. 

987 

988 Pk-Pk jitter is the maximum range of edge timing deviations from 

989 the ideal periodic timing, measured over the observation window. 

990 

991 Args: 

992 trace: Input trace (analog or digital). 

993 edge_type: Type of edges to measure ("rising", "falling", or "both"). 

994 threshold: Threshold for edge detection (0.0 to 1.0). 

995 

996 Returns: 

997 Peak-to-peak jitter in seconds. 

998 

999 Example: 

1000 >>> jitter_pp = peak_to_peak_jitter(clock_trace) 

1001 >>> print(f"Pk-Pk jitter: {jitter_pp * 1e12:.2f} ps") 

1002 

1003 References: 

1004 IEEE 2414-2020 Section 5.2 

1005 TIM-008 

1006 """ 

1007 # Get edge timestamps 

1008 edges = _get_edge_timestamps(trace, edge_type, threshold) 

1009 

1010 if len(edges) < 3: 

1011 return np.nan # type: ignore[no-any-return] 

1012 

1013 # Calculate periods 

1014 periods = np.diff(edges) 

1015 

1016 if len(periods) < 2: 1016 ↛ 1017line 1016 didn't jump to line 1017 because the condition on line 1016 was never true

1017 return np.nan # type: ignore[no-any-return] 

1018 

1019 # Pk-Pk jitter is the range of period variations 

1020 jitter_pp = float(np.max(periods) - np.min(periods)) 

1021 

1022 return jitter_pp 

1023 

1024 

1025def time_interval_error( 

1026 trace: WaveformTrace | DigitalTrace, 

1027 *, 

1028 edge_type: Literal["rising", "falling"] = "rising", 

1029 nominal_period: float | None = None, 

1030 threshold: float = 0.5, 

1031) -> NDArray[np.float64]: 

1032 """Measure Time Interval Error (TIE) from clock signal. 

1033 

1034 TIE is the deviation of each edge from its ideal position based on 

1035 the recovered clock period. Provides a time series of jitter values 

1036 for trend analysis and decomposition. 

1037 

1038 Args: 

1039 trace: Input trace (analog or digital). 

1040 edge_type: Type of edges to measure ("rising" or "falling"). 

1041 nominal_period: Expected period in seconds. If None, computed from data. 

1042 threshold: Threshold for edge detection (0.0 to 1.0). 

1043 

1044 Returns: 

1045 Array of TIE values in seconds, one per edge. 

1046 

1047 Raises: 

1048 InsufficientDataError: If trace has fewer than 3 edges. 

1049 

1050 Example: 

1051 >>> tie = time_interval_error(clock_trace) 

1052 >>> plt.plot(tie * 1e12) 

1053 >>> plt.ylabel("TIE (ps)") 

1054 >>> plt.xlabel("Edge number") 

1055 

1056 References: 

1057 IEEE 2414-2020 Section 5.1 

1058 TIM-009 

1059 """ 

1060 # Get edge timestamps 

1061 edges = _get_edge_timestamps(trace, edge_type, threshold) 

1062 

1063 if len(edges) < 3: 

1064 raise InsufficientDataError( 

1065 "TIE measurement requires at least 3 edges", 

1066 required=3, 

1067 available=len(edges), 

1068 analysis_type="time_interval_error", 

1069 ) 

1070 

1071 # Calculate actual periods 

1072 periods = np.diff(edges) 

1073 

1074 # Use mean period if nominal not provided 

1075 if nominal_period is None: 

1076 nominal_period = np.mean(periods) 

1077 

1078 # Calculate ideal edge positions 

1079 n_edges = len(edges) 

1080 start_time = edges[0] 

1081 ideal_positions = start_time + np.arange(n_edges) * nominal_period 

1082 

1083 # TIE is actual - ideal 

1084 tie: NDArray[np.float64] = edges - ideal_positions 

1085 

1086 return tie 

1087 

1088 

1089__all__ = [ 

1090 "ClockRecoveryResult", 

1091 "RMSJitterResult", 

1092 "TimingViolation", 

1093 "hold_time", 

1094 "peak_to_peak_jitter", 

1095 "phase", 

1096 "propagation_delay", 

1097 "recover_clock_edge", 

1098 "recover_clock_fft", 

1099 "rms_jitter", 

1100 "setup_time", 

1101 "skew", 

1102 "slew_rate", 

1103 "time_interval_error", 

1104]