Coverage for src / tracekit / analyzers / digital / clock.py: 92%

231 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Advanced clock recovery for digital signals. 

2 

3This module provides comprehensive clock recovery and analysis tools for digital 

4signals, including frequency detection, clock reconstruction, baud rate detection, 

5and jitter measurement. 

6 

7 

8Example: 

9 >>> from tracekit.analyzers.digital.clock import detect_clock_frequency, measure_clock_jitter, recover_clock 

10 >>> freq = detect_clock_frequency(data_trace, sample_rate=1e9) 

11 >>> print(f"Detected clock: {freq/1e6:.2f} MHz") 

12 >>> clock = recover_clock(data_trace, sample_rate=1e9, method='edge') 

13 >>> metrics = measure_clock_jitter(clock, sample_rate=1e9) 

14 

15References: 

16 Gardner, F.M.: "Phaselock Techniques" (3rd Ed), Wiley, 2005 

17 Lee, E.A. & Messerschmitt, D.G.: "Digital Communication" (2nd Ed), 1994 

18 IEEE 1241-2010: Standard for Terminology and Test Methods for ADCs 

19""" 

20 

21from __future__ import annotations 

22 

23from dataclasses import dataclass 

24from typing import TYPE_CHECKING, Any, ClassVar, Literal 

25 

26import numpy as np 

27from scipy import signal 

28 

29from tracekit.core.exceptions import InsufficientDataError, ValidationError 

30 

31if TYPE_CHECKING: 

32 from numpy.typing import NDArray 

33 

34 

@dataclass
class ClockMetrics:
    """Quality measurements describing a clock signal.

    All fields are plain floats; see the per-field comments for units.
    """

    # Detected clock frequency, in Hz.
    frequency: float
    # Mean clock period expressed in samples.
    period_samples: float
    # Mean clock period expressed in seconds.
    period_seconds: float
    # RMS jitter, in seconds.
    jitter_rms: float
    # Peak-to-peak jitter, in seconds.
    jitter_pp: float
    # Duty cycle, from 0.0 to 1.0.
    duty_cycle: float
    # Stability score, from 0.0 to 1.0.
    stability: float
    # Detection confidence, from 0.0 to 1.0.
    confidence: float

60 

61 

@dataclass
class BaudRateResult:
    """Outcome of a baud rate detection run."""

    # Detected baud rate, in bits per second.
    baud_rate: int
    # Length of one bit, in samples.
    bit_period_samples: float
    # Detection confidence, from 0.0 to 1.0.
    confidence: float
    # Name of the detection method that produced this result.
    method: str

79 

80 

class ClockRecovery:
    """Recover clock signal from data.

    This class provides multiple methods for clock recovery including edge-based,
    FFT-based, and autocorrelation-based detection, as well as PLL tracking and
    baud rate detection for asynchronous protocols.

    Can be initialized with or without sample_rate:
    - With sample_rate: ClockRecovery(sample_rate=1e9)
    - Without: ClockRecovery() - sample_rate extracted from trace metadata
    """

    # Standard baud rates for async protocols
    STANDARD_BAUD_RATES: ClassVar[list[int]] = [
        300,
        600,
        1200,
        2400,
        4800,
        9600,
        14400,
        19200,
        28800,
        38400,
        57600,
        115200,
        230400,
        460800,
        921600,
        1000000,
        2000000,
    ]

    def __init__(self, sample_rate: float | None = None):
        """Initialize with optional sample rate.

        Args:
            sample_rate: Sample rate in Hz. If None, will be extracted from trace metadata.

        Raises:
            ValidationError: If sample rate is provided and invalid.
        """
        if sample_rate is not None and sample_rate <= 0:
            raise ValidationError(f"Sample rate must be positive, got {sample_rate}")

        self.sample_rate: float | None = float(sample_rate) if sample_rate is not None else None

    def _get_sample_rate(self, trace: Any) -> float:
        """Extract sample rate from trace or use stored value.

        The rate given to the constructor always wins; trace metadata is only
        consulted when no rate was stored.

        Args:
            trace: A DigitalTrace/WaveformTrace with metadata, or a numpy array.

        Returns:
            Sample rate in Hz.

        Raises:
            ValidationError: If sample rate cannot be determined.
        """
        if self.sample_rate is not None:
            return self.sample_rate

        # A metadata object whose sample_rate is None is treated the same as
        # missing metadata, so the caller gets ValidationError, not TypeError.
        metadata = getattr(trace, "metadata", None)
        rate = getattr(metadata, "sample_rate", None)
        if rate is not None:
            return float(rate)

        raise ValidationError(
            "Sample rate not set and cannot be extracted from trace. "
            "Either provide sample_rate to constructor or use a trace with metadata."
        )

    def _get_trace_data(self, trace: Any) -> NDArray[np.float64]:
        """Extract numpy array from trace object.

        Args:
            trace: A DigitalTrace/WaveformTrace or numpy array.

        Returns:
            Numpy array of signal data.
        """
        # numpy arrays expose a ``data`` attribute too (their raw buffer), so
        # they must be recognised before the duck-typed ``.data`` lookup.
        if isinstance(trace, np.ndarray):
            payload = trace
        else:
            payload = getattr(trace, "data", trace)
        return np.asarray(payload, dtype=np.float64)

    def detect_frequency(
        self, trace: Any, method: Literal["edge", "fft", "autocorr"] = "edge"
    ) -> float:
        """Detect clock frequency from signal (supports DigitalTrace).

        This method supports both raw numpy arrays and DigitalTrace objects.
        Sample rate is extracted from trace metadata if not set in constructor.

        Args:
            trace: Signal trace data (DigitalTrace or numpy array).
            method: Detection method to use.

        Returns:
            Detected frequency in Hz.

        Example:
            >>> recovery = ClockRecovery()
            >>> freq = recovery.detect_frequency(digital_trace)
        """
        sample_rate = self._get_sample_rate(trace)
        data = self._get_trace_data(trace)

        # The internal detectors read self.sample_rate, so install the resolved
        # rate for the duration of the call and always restore the old value.
        old_rate = self.sample_rate
        self.sample_rate = sample_rate
        try:
            return self.detect_clock_frequency(data, method)
        finally:
            self.sample_rate = old_rate

    def detect_clock_frequency(
        self, trace: NDArray[np.float64], method: Literal["edge", "fft", "autocorr"] = "edge"
    ) -> float:
        """Detect clock frequency from signal.

        Detects the dominant clock frequency using the specified method.
        Each method has different strengths:
        - edge: Best for clean digital signals with clear transitions
        - fft: Best for noisy signals or periodic analog waveforms
        - autocorr: Best for periodic patterns with timing jitter

        Args:
            trace: Signal trace data.
            method: Detection method to use.

        Returns:
            Detected frequency in Hz.

        Raises:
            InsufficientDataError: If trace is too short.
            ValidationError: If method is invalid or detection fails.
        """
        if len(trace) < 10:
            raise InsufficientDataError("Trace must have at least 10 samples")

        if self.sample_rate is None:
            raise ValidationError(
                "Sample rate not set. Use detect_frequency() with trace metadata."
            )

        if method == "edge":
            return self._detect_frequency_edge(trace)
        if method == "fft":
            return self._detect_frequency_fft(trace)
        if method == "autocorr":
            return self._detect_frequency_autocorr(trace)
        raise ValidationError(f"Unknown method: {method}")

    def recover_clock(
        self, data_trace: NDArray[np.float64], method: Literal["edge", "pll", "fft"] = "edge"
    ) -> NDArray[np.float64]:
        """Recover clock signal from data.

        Reconstructs a 0/1 clock signal from the data trace. For the "edge" and
        "fft" methods the result is an ideal 50% duty-cycle square wave at the
        detected frequency that starts at phase zero on the first sample; it is
        not phase-aligned to the data edges. The "pll" method detects the
        frequency with the edge detector and then tracks the data edges.

        Args:
            data_trace: Data signal trace.
            method: Recovery method to use.

        Returns:
            Recovered clock trace (same length as input).

        Raises:
            InsufficientDataError: If trace is too short.
            ValidationError: If method is invalid or recovery fails.
        """
        if len(data_trace) < 10:
            raise InsufficientDataError("Trace must have at least 10 samples")

        if self.sample_rate is None:
            raise ValidationError("Sample rate not set")

        # The PLL needs an initial estimate; it uses the edge detector for that.
        detection_method = "edge" if method == "pll" else method
        freq = self.detect_clock_frequency(data_trace, method=detection_method)

        if not np.isfinite(freq) or freq <= 0:
            raise ValidationError("Failed to detect valid clock frequency")

        if method == "pll":
            return self._pll_track(data_trace, freq)

        # Ideal square wave (+1/-1) at the detected frequency, mapped to 0/1.
        sample_index = np.arange(len(data_trace))
        square = signal.square(2 * np.pi * freq * sample_index / self.sample_rate)
        return np.asarray((square + 1.0) / 2.0, dtype=np.float64)

    def detect_baud_rate(
        self, trace: NDArray[np.float64], candidates: list[int] | None = None
    ) -> BaudRateResult:
        """Auto-detect baud rate for async protocols.

        Detects the baud rate by analyzing bit timing. Works best with traces
        containing start bits or transitions between different bit values.
        The most common inter-edge interval is taken as one bit period and
        matched to the nearest candidate rate.

        Args:
            trace: Signal trace data.
            candidates: List of candidate baud rates to test. If None, uses
                standard rates.

        Returns:
            BaudRateResult with detected baud rate and confidence.

        Raises:
            InsufficientDataError: If trace is too short or not enough edges found.
            ValidationError: If sample rate is not set, or if candidates is
                empty or contains non-positive rates.
        """
        if len(trace) < 100:
            raise InsufficientDataError("Need at least 100 samples for baud rate detection")

        if self.sample_rate is None:
            raise ValidationError("Sample rate not set")

        if candidates is None:
            candidates = self.STANDARD_BAUD_RATES

        candidate_rates = np.asarray(candidates, dtype=np.float64)
        if candidate_rates.size == 0 or np.any(candidate_rates <= 0):
            raise ValidationError("candidates must be a non-empty list of positive baud rates")

        edges = self._detect_edges_simple(trace)
        if len(edges) < 3:
            raise InsufficientDataError("Not enough edges to detect baud rate")

        # Assumes at least some single-bit pulses exist, so the dominant
        # inter-edge interval approximates one bit period.
        bit_interval = self._dominant_interval(np.diff(edges))
        detected_freq = self.sample_rate / bit_interval

        # Snap to the closest candidate rate.
        errors = np.abs(candidate_rates - detected_freq)
        best_idx = int(np.argmin(errors))
        best_baud = candidate_rates[best_idx]

        # Confidence drops linearly with relative error, reaching 0 at 10%.
        relative_error = errors[best_idx] / best_baud
        confidence = max(0.0, 1.0 - relative_error * 10)

        return BaudRateResult(
            baud_rate=int(best_baud),
            bit_period_samples=float(self.sample_rate / best_baud),
            confidence=float(confidence),
            method="edge_histogram",
        )

    def measure_clock_jitter(self, clock_trace: NDArray[np.float64]) -> ClockMetrics:
        """Measure clock jitter and quality metrics.

        Analyzes a clock signal to measure jitter, duty cycle, and stability.
        Periods are measured from rising edge to rising edge.

        Args:
            clock_trace: Clock signal trace.

        Returns:
            ClockMetrics with comprehensive quality measurements.

        Raises:
            InsufficientDataError: If trace is too short or has too few edges.
            ValidationError: If sample rate is not set.
        """
        if len(clock_trace) < 10:
            raise InsufficientDataError("Trace must have at least 10 samples")

        if self.sample_rate is None:
            raise ValidationError("Sample rate not set")

        rising_edges = self._detect_edges_by_type(clock_trace, "rising")
        falling_edges = self._detect_edges_by_type(clock_trace, "falling")

        if len(rising_edges) < 3:
            raise InsufficientDataError("Need at least 3 rising edges for jitter measurement")

        # At least three rising edges guarantees at least two periods.
        periods = np.diff(rising_edges)

        mean_period_samples = float(np.mean(periods))
        mean_period_seconds = mean_period_samples / self.sample_rate
        frequency = 1.0 / mean_period_seconds

        # RMS jitter is the standard deviation of the periods.
        jitter_rms_samples = float(np.std(periods))
        jitter_rms = jitter_rms_samples / self.sample_rate

        # Peak-to-peak jitter is the spread between longest and shortest period.
        jitter_pp = float(np.ptp(periods)) / self.sample_rate

        duty_cycle = self._mean_duty_cycle(rising_edges, falling_edges, mean_period_samples)

        # Stability is the inverse of relative jitter, reaching 0 at 10% jitter.
        relative_jitter = (
            jitter_rms_samples / mean_period_samples if mean_period_samples > 0 else 1.0
        )
        stability = max(0.0, 1.0 - relative_jitter * 10)

        # Confidence grows with the number of periods (saturating at 100).
        confidence = min(1.0, len(periods) / 100.0) * stability

        return ClockMetrics(
            frequency=float(frequency),
            period_samples=float(mean_period_samples),
            period_seconds=float(mean_period_seconds),
            jitter_rms=float(jitter_rms),
            jitter_pp=float(jitter_pp),
            duty_cycle=float(np.clip(duty_cycle, 0.0, 1.0)),
            stability=float(stability),
            confidence=float(confidence),
        )

    @staticmethod
    def _dominant_interval(intervals: NDArray[Any]) -> float:
        """Return a representative value of the most common interval.

        The intervals are histogrammed into 50 bins; the result is the median
        of the intervals that fall into the fullest bin. Using the member
        values instead of the bin's left edge avoids underestimating the
        interval by up to one bin width.

        Args:
            intervals: Non-empty array of inter-edge intervals in samples.

        Returns:
            Dominant interval in samples.
        """
        counts, bin_edges = np.histogram(intervals, bins=50)
        mode_bin = int(np.argmax(counts))
        # digitize is 1-based and puts the maximum value past the last bin,
        # while histogram's last bin is closed on the right; clip realigns them.
        bin_of = np.clip(np.digitize(intervals, bin_edges) - 1, 0, len(counts) - 1)
        return float(np.median(intervals[bin_of == mode_bin]))

    @staticmethod
    def _mean_duty_cycle(
        rising_edges: NDArray[np.intp],
        falling_edges: NDArray[np.intp],
        mean_period_samples: float,
    ) -> float:
        """Return mean high time divided by the mean period.

        Every rising edge that is followed by a falling edge contributes one
        high-time sample; trailing rising edges without a following falling
        edge are ignored.

        Args:
            rising_edges: Sorted rising edge indices.
            falling_edges: Sorted falling edge indices.
            mean_period_samples: Mean clock period in samples.

        Returns:
            Unclipped duty cycle, or 0.5 if no rise/fall pair exists.
        """
        next_fall = np.searchsorted(falling_edges, rising_edges)
        has_fall = next_fall < len(falling_edges)
        if not np.any(has_fall):
            return 0.5  # Assume 50% if cannot measure
        high_times = falling_edges[next_fall[has_fall]] - rising_edges[has_fall]
        return float(np.mean(high_times)) / mean_period_samples

    def _detect_frequency_edge(self, trace: NDArray[np.float64]) -> float:
        """Detect frequency from the median spacing between edges.

        The median inter-edge interval is used as a robust estimate of the
        half-period (rising to falling or vice versa), so the full period is
        twice that interval.

        Args:
            trace: Signal trace.

        Returns:
            Detected frequency in Hz.

        Raises:
            ValidationError: If not enough edges found to detect frequency.
        """
        edges = self._detect_edges_simple(trace)

        if len(edges) < 3:
            raise ValidationError("Not enough edges to detect frequency")

        half_period_samples = np.median(np.diff(edges))
        return float(self.sample_rate / (2 * half_period_samples))

    def _detect_frequency_fft(self, trace: NDArray[np.float64]) -> float:
        """Detect frequency using FFT spectral analysis.

        Args:
            trace: Signal trace.

        Returns:
            Detected frequency in Hz.

        Raises:
            ValidationError: If sample rate is not set or the trace has no
                spectral content above DC.
        """
        if self.sample_rate is None:
            raise ValidationError("Sample rate not set")

        # Remove DC, then window to reduce spectral leakage.
        trace_ac = trace - np.mean(trace)
        trace_windowed = trace_ac * signal.windows.hann(len(trace_ac))

        magnitude = np.abs(np.fft.rfft(trace_windowed))
        freqs = np.fft.rfftfreq(len(trace_windowed), 1.0 / self.sample_rate)

        # Ignore DC and very low frequencies (below 10 Hz); if that would
        # exclude every bin, fall back to skipping only the DC bin.
        min_freq_hz = 10.0
        min_freq_idx = int(np.searchsorted(freqs, min_freq_hz))
        if min_freq_idx >= len(magnitude):
            min_freq_idx = 1

        peak_idx = min_freq_idx + int(np.argmax(magnitude[min_freq_idx:]))

        # A flat trace has an all-zero spectrum; argmax would then return an
        # arbitrary bin, so report the failure instead.
        if not magnitude[peak_idx] > 0:
            raise ValidationError("No spectral content detected in trace")

        return float(freqs[peak_idx])

    def _detect_frequency_autocorr(self, trace: NDArray[np.float64]) -> float:
        """Detect frequency using autocorrelation.

        The lag of the first autocorrelation peak (normalized height >= 0.3)
        after lag 0 is taken as the period.

        Args:
            trace: Signal trace.

        Returns:
            Detected frequency in Hz.

        Raises:
            ValidationError: If no periodic pattern detected or sample rate not set.
        """
        if self.sample_rate is None:
            raise ValidationError("Sample rate not set")

        trace_centered = trace - np.mean(trace)

        full_corr = signal.correlate(trace_centered, trace_centered, mode="full")
        autocorr = full_corr[len(full_corr) // 2 :]  # Keep only positive lags

        # Zero energy at lag 0 means a flat trace; normalizing would divide by zero.
        if not autocorr[0] > 0:
            raise ValidationError("No periodic pattern detected in autocorrelation")
        autocorr = autocorr / autocorr[0]

        peaks, _ = signal.find_peaks(autocorr, height=0.3)
        if len(peaks) == 0:
            raise ValidationError("No periodic pattern detected in autocorrelation")

        # First peak corresponds to one period.
        return float(self.sample_rate / peaks[0])

    def _pll_track(
        self, trace: NDArray[np.float64], initial_freq: float, bandwidth: float = 0.01
    ) -> NDArray[np.float64]:
        """Software PLL for phase tracking.

        Implements a simple digital PLL with a proportional + integral loop
        filter. On each data edge the sign of the local oscillator produces a
        fixed-size phase error; the integral path adjusts the per-sample phase
        increment and the proportional path nudges the phase directly.

        Args:
            trace: Input data trace.
            initial_freq: Initial frequency estimate in Hz.
            bandwidth: Loop bandwidth (0.0 to 1.0), lower = more filtering.

        Returns:
            Recovered clock signal (values 0.0 or 1.0, same length as trace).

        Raises:
            ValidationError: If sample rate is not set.
        """
        if self.sample_rate is None:
            raise ValidationError("Sample rate not set")

        n_samples = len(trace)
        clock = np.zeros(n_samples)

        # PLL state: oscillator phase and per-sample phase increment.
        phase = 0.0
        omega = 2 * np.pi * initial_freq / self.sample_rate

        # Loop filter gains (proportional + integral)
        kp = 2 * bandwidth
        ki = bandwidth**2

        threshold = (np.max(trace) + np.min(trace)) / 2.0
        two_pi = 2 * np.pi
        prev_sample = trace[0]

        for i in range(n_samples):
            current_sample = trace[i]
            oscillator = np.cos(phase)
            clock[i] = 1.0 if oscillator > 0 else 0.0

            # Bang-bang phase detector: only data edges produce an error, and
            # its sign is the sign of the oscillator at the edge.
            rose = prev_sample < threshold <= current_sample
            fell = prev_sample > threshold >= current_sample
            phase_error = np.sign(oscillator) * 0.1 if (rose or fell) else 0.0

            # Integral path updates the frequency; proportional path corrects
            # the phase directly for this sample.
            omega += ki * phase_error
            phase = (phase + omega + kp * phase_error) % two_pi

            prev_sample = current_sample

        return clock

    @staticmethod
    def _threshold_crossings(
        trace: NDArray[np.float64],
    ) -> tuple[NDArray[np.intp], NDArray[np.intp]]:
        """Find mid-level threshold crossings.

        The threshold is the midpoint between the trace minimum and maximum.

        Args:
            trace: Signal trace.

        Returns:
            Tuple ``(rising, falling)`` of indices of the sample immediately
            before each crossing.
        """
        threshold = (np.max(trace) + np.min(trace)) / 2.0
        before, after = trace[:-1], trace[1:]
        rising = np.flatnonzero((before < threshold) & (after >= threshold))
        falling = np.flatnonzero((before > threshold) & (after <= threshold))
        return rising, falling

    def _detect_edges_simple(self, trace: NDArray[np.float64]) -> NDArray[np.intp]:
        """Detect all edges in trace (both rising and falling).

        Args:
            trace: Signal trace.

        Returns:
            Sorted array of edge indices (index of the sample before each crossing).
        """
        rising, falling = self._threshold_crossings(trace)
        return np.sort(np.concatenate([rising, falling]))

    def _detect_edges_by_type(
        self, trace: NDArray[np.float64], edge_type: Literal["rising", "falling"]
    ) -> NDArray[np.intp]:
        """Detect edges of specific type.

        Args:
            trace: Signal trace.
            edge_type: Type of edge to detect; any value other than "rising"
                selects falling edges.

        Returns:
            Array of edge indices (index of the sample after each crossing).
        """
        rising, falling = self._threshold_crossings(trace)
        edges = rising if edge_type == "rising" else falling
        return edges + 1  # Return index after crossing

674 

675 

676# Convenience functions 

677 

678 

def detect_clock_frequency(
    trace: NDArray[np.float64],
    sample_rate: float,
    method: Literal["edge", "fft", "autocorr"] = "edge",
) -> float:
    """Detect the clock frequency of a signal in one call.

    Shortcut that builds a throwaway ClockRecovery for ``sample_rate`` and
    delegates to its ``detect_clock_frequency`` method.

    Args:
        trace: Signal trace data.
        sample_rate: Sample rate in Hz.
        method: Detection method ('edge', 'fft', or 'autocorr').

    Returns:
        Detected frequency in Hz.

    Example:
        >>> freq = detect_clock_frequency(data, sample_rate=1e9, method='edge')
        >>> print(f"Clock: {freq/1e6:.2f} MHz")
    """
    return ClockRecovery(sample_rate).detect_clock_frequency(trace, method)

705 

706 

def recover_clock(
    data_trace: NDArray[np.float64],
    sample_rate: float,
    method: Literal["edge", "pll", "fft"] = "edge",
) -> NDArray[np.float64]:
    """Recover a clock signal from a data trace in one call.

    Shortcut that builds a throwaway ClockRecovery for ``sample_rate`` and
    delegates to its ``recover_clock`` method.

    Args:
        data_trace: Data signal trace.
        sample_rate: Sample rate in Hz.
        method: Recovery method ('edge', 'pll', or 'fft').

    Returns:
        Recovered clock trace.

    Example:
        >>> clock = recover_clock(data, sample_rate=1e9, method='pll')
    """
    return ClockRecovery(sample_rate).recover_clock(data_trace, method)

732 

733 

def detect_baud_rate(
    trace: Any, sample_rate: float | None = None, candidates: list[int] | None = None
) -> int | BaudRateResult:
    """Auto-detect baud rate.

    Accepts either a trace object carrying ``metadata.sample_rate`` and
    ``data``, or raw samples together with an explicit ``sample_rate``.
    When the trace carries a metadata sample rate, that rate is used and the
    ``sample_rate`` argument is not consulted.

    Args:
        trace: Signal trace data (DigitalTrace or numpy array).
        sample_rate: Sample rate in Hz (optional if trace has metadata).
        candidates: List of candidate baud rates. If None, uses standard rates.

    Returns:
        The detected baud rate as a plain int when the trace supplied its own
        sample rate; otherwise the full BaudRateResult.

    Raises:
        ValidationError: If sample_rate is required but not provided.

    Example:
        >>> baud = detect_baud_rate(digital_trace)  # Uses metadata
        >>> result = detect_baud_rate(data_array, sample_rate=1e6)  # Explicit rate
    """
    carries_rate = hasattr(getattr(trace, "metadata", None), "sample_rate")

    if not carries_rate and sample_rate is None:
        raise ValidationError("sample_rate required when trace is not a DigitalTrace with metadata")

    if carries_rate:
        # Trace object path: unwrap the samples and return only the rate.
        samples = np.asarray(trace.data, dtype=np.float64)
        outcome = ClockRecovery(trace.metadata.sample_rate).detect_baud_rate(samples, candidates)
        return outcome.baud_rate

    # Raw-array path: return the full result object.
    samples = np.asarray(trace, dtype=np.float64)
    return ClockRecovery(sample_rate).detect_baud_rate(samples, candidates)

772 

773 

def measure_clock_jitter(clock_trace: NDArray[np.float64], sample_rate: float) -> ClockMetrics:
    """Measure clock jitter in one call.

    Shortcut that builds a throwaway ClockRecovery for ``sample_rate`` and
    delegates to its ``measure_clock_jitter`` method.

    Args:
        clock_trace: Clock signal trace.
        sample_rate: Sample rate in Hz.

    Returns:
        ClockMetrics with jitter and quality measurements.

    Example:
        >>> metrics = measure_clock_jitter(clock, sample_rate=1e9)
        >>> print(f"RMS jitter: {metrics.jitter_rms*1e12:.2f} ps")
    """
    return ClockRecovery(sample_rate).measure_clock_jitter(clock_trace)

795 

796 

# Names exported by ``from <module> import *``: the two result dataclasses,
# the ClockRecovery class, and the four module-level convenience functions.
__all__ = [
    "BaudRateResult",
    "ClockMetrics",
    "ClockRecovery",
    "detect_baud_rate",
    "detect_clock_frequency",
    "measure_clock_jitter",
    "recover_clock",
]