Coverage for src / tracekit / analyzers / digital / edges.py: 90%

199 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Edge detection with sub-sample precision and timing analysis. 

2 

3This module provides edge detection with interpolation for sub-sample precision, 

4timing measurements between edges, and timing constraint validation for digital 

5signal analysis. 

6 

7 

8Example: 

9 >>> import numpy as np 

10 >>> from tracekit.analyzers.digital.edges import detect_edges, measure_edge_timing 

11 >>> # Generate test signal 

12 >>> signal = np.array([0, 0, 0.5, 1.0, 1.0, 1.0, 0.5, 0, 0]) 

13 >>> # Detect edges 

14 >>> edges = detect_edges(signal, edge_type='both', sample_rate=100e6) 

15 >>> # Measure timing 

16 >>> timing = measure_edge_timing(edges, sample_rate=100e6) 

17""" 

18 

19from __future__ import annotations 

20 

21from dataclasses import dataclass 

22from typing import TYPE_CHECKING, Literal 

23 

24import numpy as np 

25 

26from tracekit.core.memoize import memoize_analysis 

27 

28if TYPE_CHECKING: 

29 from numpy.typing import NDArray 

30 

31 

@dataclass
class Edge:
    """One threshold crossing found in a sampled signal.

    Attributes:
        sample_index: Index of the sample at which the edge was registered.
        time: Edge time in seconds, including the interpolated sub-sample part.
        edge_type: Direction of the transition, ``'rising'`` or ``'falling'``.
        amplitude: Size of the transition in signal units (e.g. volts).
        slew_rate: Rate of change across the edge, in signal units per second.
        quality: Heuristic quality label assigned to the edge.
    """

    sample_index: int
    time: float  # seconds, sub-sample resolution
    edge_type: Literal["rising", "falling"]
    amplitude: float  # signal units
    slew_rate: float  # signal units per second
    quality: Literal["clean", "slow", "noisy", "glitch"]

51 

52 

@dataclass
class EdgeTiming:
    """Summary statistics derived from a sequence of detected edges.

    Attributes:
        periods: Time differences between consecutive edges, in seconds.
        mean_period: Average of ``periods`` in seconds.
        std_period: Standard deviation of ``periods`` in seconds.
        min_period: Smallest entry of ``periods`` in seconds.
        max_period: Largest entry of ``periods`` in seconds.
        duty_cycles: Per-cycle duty cycle ratios, each between 0 and 1.
        mean_duty_cycle: Average of ``duty_cycles``.
        jitter_rms: RMS jitter in seconds.
        jitter_pp: Peak-to-peak jitter in seconds.
    """

    # Period statistics (seconds)
    periods: NDArray[np.float64]
    mean_period: float
    std_period: float
    min_period: float
    max_period: float
    # Duty cycle statistics (ratios)
    duty_cycles: NDArray[np.float64]
    mean_duty_cycle: float
    # Jitter statistics (seconds)
    jitter_rms: float
    jitter_pp: float

78 

79 

@dataclass
class TimingConstraint:
    """Limits that an edge-to-edge time must satisfy.

    Attributes:
        name: Human-readable label for the constraint.
        min_time: Lower bound in seconds; ``None`` disables the lower bound.
        max_time: Upper bound in seconds; ``None`` disables the upper bound.
        reference: Edge type the constraint applies to: ``'rising'``,
            ``'falling'`` or ``'both'``.
    """

    name: str
    min_time: float | None = None
    max_time: float | None = None
    reference: str | None = None  # one of 'rising', 'falling', 'both'

95 

96 

@dataclass
class TimingViolation:
    """Record of one measurement that broke a timing constraint.

    Attributes:
        constraint: The constraint that was not met.
        measured_time: The measured time that fell outside the limits.
        edge_index: Position, in the edge list, of the offending edge.
        sample_index: Sample index at which the violation occurred.
    """

    constraint: TimingConstraint
    measured_time: float
    edge_index: int  # index into the list of edges
    sample_index: int  # index into the sampled trace

112 

113 

def _crossing_fraction(prev_val: float, curr_val: float, level: float) -> float:
    """Locate, by linear interpolation, where a sample pair reaches ``level``.

    Args:
        prev_val: Signal value at the earlier sample.
        curr_val: Signal value at the later sample.
        level: Level whose crossing position is wanted.

    Returns:
        Fractional offset (0.0 to 1.0) from the earlier sample. 0.5 is
        returned when the position is undefined (flat or non-finite pair).
    """
    span = curr_val - prev_val
    if span == 0.0 or not np.isfinite(span):
        return 0.5
    fraction = (level - prev_val) / span
    # Guard against rounding pushing the result marginally outside the interval.
    return float(min(1.0, max(0.0, fraction)))


@memoize_analysis(maxsize=32)
def detect_edges(
    trace: NDArray[np.float64],
    edge_type: Literal["rising", "falling", "both"] = "both",
    threshold: float | Literal["auto"] = "auto",
    hysteresis: float = 0.0,
    sample_rate: float = 1.0,
) -> list[Edge]:
    """Detect signal edges with configurable threshold.

    Walks the trace with a two-state (low/high) machine. A rising edge is
    registered when the signal, currently low, exceeds the upper threshold;
    a falling edge when the signal, currently high, drops below the lower
    threshold. Without hysteresis both thresholds equal ``threshold``.

    The edge time is the linearly interpolated instant at which the signal
    reached the threshold that triggered the state change, so it reflects the
    actual crossing position between the two samples instead of always
    landing half-way between them.

    Args:
        trace: Input signal trace (analog or digital). It is converted to
            float64 so boolean and unsigned integer traces subtract safely.
        edge_type: Type of edges to report ('rising', 'falling', or 'both').
            The state machine always tracks both directions; this only
            selects which transitions are returned.
        threshold: Detection threshold. 'auto' uses the midpoint between the
            trace minimum and maximum.
        hysteresis: Total width of the hysteresis band (signal units),
            centred on the threshold. Values <= 0 disable hysteresis.
        sample_rate: Sample rate in Hz for time calculation.

    Returns:
        List of Edge objects in order of occurrence. ``sample_index`` is the
        first sample beyond the triggering threshold; ``slew_rate`` is signed
        (negative for falling edges) while ``amplitude`` is non-negative.

    Example:
        >>> signal = np.array([0, 0, 1, 1, 0, 0])
        >>> edges = detect_edges(signal, edge_type='rising')
        >>> len(edges)
        1
    """
    if len(trace) < 2:
        return []

    samples = np.asarray(trace, dtype=np.float64)

    if threshold == "auto":
        thresh_val = float((np.max(samples) + np.min(samples)) / 2.0)
    else:
        thresh_val = float(threshold)

    half_band = hysteresis / 2.0 if hysteresis > 0 else 0.0
    thresh_high = thresh_val + half_band
    thresh_low = thresh_val - half_band

    want_rising = edge_type in ("rising", "both")
    want_falling = edge_type in ("falling", "both")

    edges: list[Edge] = []
    time_base = 1.0 / sample_rate

    # The initial state is decided against the nominal threshold.
    state = bool(samples[0] > thresh_val)

    for i in range(1, len(samples)):
        prev_val = float(samples[i - 1])
        curr_val = float(samples[i])

        if not state and curr_val > thresh_high:
            state = True
            kind: Literal["rising", "falling"] = "rising"
            level = thresh_high
            wanted = want_rising
        elif state and curr_val < thresh_low:
            state = False
            kind = "falling"
            level = thresh_low
            wanted = want_falling
        else:
            continue

        # The state is updated even for edges the caller did not ask for, so
        # the next opposite transition is still recognised.
        if not wanted:
            continue

        step = curr_val - prev_val  # signed: positive rising, negative falling
        offset = _crossing_fraction(prev_val, curr_val, level)

        edges.append(
            Edge(
                sample_index=i,
                time=(i - 1 + offset) * time_base,
                edge_type=kind,
                amplitude=abs(step),
                slew_rate=step * sample_rate,
                quality=classify_edge_quality(samples, i, sample_rate),
            )
        )

    return edges

227 

228 

def interpolate_edge_time(
    trace: NDArray[np.float64],
    sample_index: int,
    method: Literal["linear", "quadratic"] = "linear",
    threshold: float | None = None,
) -> float:
    """Interpolate edge time for sub-sample precision.

    Estimates the fractional position, between ``sample_index`` and the next
    sample, at which the signal reaches a given level.

    Args:
        trace: Input signal trace.
        sample_index: Sample index just before the edge.
        method: Interpolation method ('linear' or 'quadratic'). A true
            quadratic fit is not implemented; 'quadratic' currently gives the
            same result as 'linear'.
        threshold: Level whose crossing is located. ``None`` (the default)
            uses the midpoint of the two samples, which for linear
            interpolation is always half-way between them.

    Returns:
        Fractional sample offset (0.0 to 1.0) from sample_index. 0.0 is
        returned when ``sample_index`` has no following sample, and 0.5 when
        the two samples are (numerically) equal.

    Raises:
        ValueError: If ``method`` is not 'linear' or 'quadratic'.

    Example:
        >>> trace = np.array([0, 0.3, 0.8, 1.0])
        >>> offset = interpolate_edge_time(trace, 1, method='linear', threshold=0.5)
    """
    if method not in ("linear", "quadratic"):
        raise ValueError(f"Unknown interpolation method: {method!r}")

    if sample_index < 0 or sample_index >= len(trace) - 1:
        return 0.0

    v0 = trace[sample_index]
    v1 = trace[sample_index + 1]

    if abs(v1 - v0) < 1e-10:
        return 0.5  # Flat segment: avoid division by zero

    level = (v0 + v1) / 2.0 if threshold is None else threshold
    fraction = (level - v0) / (v1 - v0)

    # A level outside [v0, v1] maps to the nearer end of the interval.
    return float(np.clip(fraction, 0.0, 1.0))

281 

282 

def measure_edge_timing(edges: list[Edge], sample_rate: float = 1.0) -> EdgeTiming:
    """Measure timing between edges.

    Computes period, duty cycle, and jitter statistics from a list of
    detected edges.

    Periods are the time differences between consecutive entries of
    ``edges`` (whatever their direction). A duty cycle is produced for every
    pair of consecutive rising edges that has a falling edge between them:
    the high time (rising edge to that falling edge) divided by the
    rising-to-rising time. Each rising edge is matched with the first falling
    edge that follows it, so a capture that begins with a falling edge is
    measured correctly instead of yielding zero duty cycles.

    Args:
        edges: List of Edge objects from detect_edges(), in time order.
        sample_rate: Sample rate in Hz. Not used by the computation (edge
            times are already in seconds); kept for API compatibility.

    Returns:
        EdgeTiming object with timing measurements. All statistics are zero
        and both arrays are empty when fewer than two edges are given.

    Example:
        >>> edges = detect_edges(signal, edge_type='both', sample_rate=100e6)
        >>> timing = measure_edge_timing(edges, sample_rate=100e6)
    """
    if len(edges) < 2:
        # Not enough edges for timing analysis
        return EdgeTiming(
            periods=np.array([]),
            mean_period=0.0,
            std_period=0.0,
            min_period=0.0,
            max_period=0.0,
            duty_cycles=np.array([]),
            mean_duty_cycle=0.0,
            jitter_rms=0.0,
            jitter_pp=0.0,
        )

    # At least two edges from here on, so ``periods`` is never empty.
    periods = np.diff(np.array([e.time for e in edges]))

    rising_times = [e.time for e in edges if e.edge_type == "rising"]
    falling_times = [e.time for e in edges if e.edge_type == "falling"]

    duty_cycles: list[float] = []
    fall_idx = 0
    for rise, next_rise in zip(rising_times, rising_times[1:]):
        cycle = next_rise - rise
        if cycle <= 0:
            continue
        # Skip falling edges that precede this rising edge.
        while fall_idx < len(falling_times) and falling_times[fall_idx] < rise:
            fall_idx += 1
        # Only a falling edge inside this cycle defines its high time.
        if fall_idx < len(falling_times) and falling_times[fall_idx] < next_rise:
            duty_cycles.append((falling_times[fall_idx] - rise) / cycle)

    duty_cycles_arr = np.array(duty_cycles, dtype=np.float64)

    std_period = float(np.std(periods))

    return EdgeTiming(
        periods=periods,
        mean_period=float(np.mean(periods)),
        std_period=std_period,
        min_period=float(np.min(periods)),
        max_period=float(np.max(periods)),
        duty_cycles=duty_cycles_arr,
        mean_duty_cycle=float(np.mean(duty_cycles_arr)) if len(duty_cycles_arr) > 0 else 0.0,
        # RMS jitter is the standard deviation of the periods; both jitter
        # figures are zero when there is only a single period.
        jitter_rms=std_period,
        jitter_pp=float(np.max(periods) - np.min(periods)),
    )

359 

360 

def check_timing_constraints(
    edges: list[Edge], constraints: list[TimingConstraint], sample_rate: float = 1.0
) -> list[TimingViolation]:
    """Report every edge-to-edge interval that breaks a timing constraint.

    Each interval between an edge and its successor is tested against every
    constraint. A constraint whose ``reference`` is 'rising' or 'falling' is
    only applied to intervals that start on an edge of that type.

    Args:
        edges: Edge objects to examine.
        constraints: TimingConstraint objects giving the allowed limits.
        sample_rate: Sample rate in Hz.

    Returns:
        One TimingViolation per (interval, constraint) pair that is out of
        limits; empty when fewer than two edges are supplied.

    Example:
        >>> constraint = TimingConstraint(name="min_period", min_time=10e-9)
        >>> violations = check_timing_constraints(edges, [constraint])
    """
    found: list[TimingViolation] = []

    if len(edges) < 2:
        return found

    for idx, (current, following) in enumerate(zip(edges, edges[1:])):
        interval = following.time - current.time

        for rule in constraints:
            # Direction-specific rules only apply to matching starting edges.
            if rule.reference in ("rising", "falling") and current.edge_type != rule.reference:
                continue

            too_short = rule.min_time is not None and interval < rule.min_time
            too_long = rule.max_time is not None and interval > rule.max_time

            if too_short or too_long:
                found.append(
                    TimingViolation(
                        constraint=rule,
                        measured_time=interval,
                        edge_index=idx,
                        sample_index=current.sample_index,
                    )
                )

    return found

419 

420 

def classify_edge_quality(
    trace: NDArray[np.float64], edge_index: int, sample_rate: float
) -> Literal["clean", "slow", "noisy", "glitch"]:
    """Classify edge quality.

    Inspects up to 10 samples on either side of the edge and labels it:

    * ``'glitch'`` - fewer than 3 samples of context are available on one
      side, or the sample-to-sample step at the edge is below 10 % of the
      full trace range.
    * ``'noisy'`` - the estimated noise exceeds 20 % of the edge step.
    * ``'slow'`` - the transition spends more than 5 consecutive samples
      between 10 % and 90 % of the window's value range.
    * ``'clean'`` - none of the above, or the edge sits on the first or last
      sample so no context exists.

    Noise is estimated from the sample-to-sample differences inside the
    window using the median absolute deviation, which ignores the few large
    differences that belong to the transition itself. (Taking the standard
    deviation of the raw window would include the step and label every
    full-swing edge as noisy.)

    Args:
        trace: Input signal trace.
        edge_index: Sample index of the edge (first sample after the crossing).
        sample_rate: Sample rate in Hz. Not used by the classification; kept
            for API compatibility.

    Returns:
        Quality classification: 'clean', 'slow', 'noisy', or 'glitch'.

    Example:
        >>> quality = classify_edge_quality(trace, 10, 100e6)
    """
    n_samples = len(trace)
    if edge_index < 1 or edge_index >= n_samples - 1:
        return "clean"

    # Symmetric window, shrunk near the ends of the trace.
    window_size = min(10, edge_index, n_samples - edge_index - 1)
    if window_size < 3:
        return "glitch"

    data = np.asarray(trace, dtype=np.float64)
    first = edge_index - window_size
    last = edge_index + window_size
    window = data[first : last + 1]

    amplitude = abs(float(data[edge_index] - data[edge_index - 1]))
    signal_range = float(np.max(data) - np.min(data))

    if amplitude < signal_range * 0.1:
        return "glitch"

    # Robust noise estimate: MAD of first differences. 1.4826 converts a MAD
    # to a standard deviation for Gaussian noise, and the sqrt(2) undoes the
    # variance doubling caused by differencing.
    steps = np.diff(window)
    spread = float(np.median(np.abs(steps - np.median(steps))))
    noise = 1.4826 * spread / np.sqrt(2.0)

    if noise > amplitude * 0.2:
        return "noisy"

    # Count the contiguous run of samples around the edge that lie strictly
    # inside the 10 %-90 % band of the window's value range.
    low = float(np.min(window))
    high = float(np.max(window))
    margin = 0.1 * (high - low)
    band_low = low + margin
    band_high = high - margin

    transition_samples = 0
    pos = edge_index - 1
    while pos >= first and band_low < data[pos] < band_high:
        transition_samples += 1
        pos -= 1
    pos = edge_index
    while pos <= last and band_low < data[pos] < band_high:
        transition_samples += 1
        pos += 1

    if transition_samples > 5:
        return "slow"

    return "clean"

488 

489 

class EdgeDetector:
    """Object-oriented wrapper for edge detection functionality.

    Stores the detection settings once and applies them to every call, as a
    class-based alternative to the module-level functions.

    When ``min_pulse_width`` is set, pulses narrower than that many samples
    are removed (both of their edges) before results are returned by any of
    the methods.

    Example:
        >>> detector = EdgeDetector()
        >>> rising, falling = detector.detect_all_edges(signal_data)
    """

    def __init__(
        self,
        threshold: float | Literal["auto"] = "auto",
        hysteresis: float = 0.0,
        sample_rate: float = 1.0,
        min_pulse_width: int | None = None,
    ):
        """Initialize edge detector.

        Args:
            threshold: Detection threshold. 'auto' computes from signal midpoint.
            hysteresis: Hysteresis amount for noise immunity (signal units).
            sample_rate: Sample rate in Hz for time calculation.
            min_pulse_width: Minimum pulse width in samples to filter noise.
                ``None`` disables the filter.
        """
        self.threshold = threshold
        self.hysteresis = hysteresis
        self.sample_rate = sample_rate
        self.min_pulse_width = min_pulse_width

    @staticmethod
    def _drop_short_pulses(edges: list[Edge], min_width: int) -> list[Edge]:
        """Remove pulses shorter than ``min_width`` samples.

        A pulse is bounded by two consecutive edges. When an edge follows the
        most recently kept edge by fewer than ``min_width`` samples, both are
        discarded, so the surviving edges still alternate in direction. The
        input list is left unmodified.
        """
        kept: list[Edge] = []
        for edge in edges:
            if kept and edge.sample_index - kept[-1].sample_index < min_width:
                kept.pop()  # drop the leading edge of the short pulse as well
            else:
                kept.append(edge)
        return kept

    def _detect(self, edge_type: Literal["rising", "falling", "both"]) -> dict:
        """Return the keyword arguments shared by every detect_edges() call."""
        return {
            "edge_type": edge_type,
            "threshold": self.threshold,
            "hysteresis": self.hysteresis,
            "sample_rate": self.sample_rate,
        }

    def _filtered_edges(self, trace: NDArray[np.float64]) -> list[Edge]:
        """Detect edges of both directions and apply the pulse-width filter."""
        edges = detect_edges(trace, **self._detect("both"))
        if self.min_pulse_width is not None and len(edges) > 1:
            edges = self._drop_short_pulses(edges, self.min_pulse_width)
        return edges

    def detect_all_edges(
        self, trace: NDArray[np.float64]
    ) -> tuple[NDArray[np.intp], NDArray[np.intp]]:
        """Detect all rising and falling edges.

        Args:
            trace: Input signal trace (analog or digital).

        Returns:
            Tuple of (rising_edge_indices, falling_edge_indices).

        Example:
            >>> detector = EdgeDetector(sample_rate=100e6)
            >>> rising, falling = detector.detect_all_edges(signal)
        """
        edges = self._filtered_edges(trace)

        rising_indices = np.array(
            [e.sample_index for e in edges if e.edge_type == "rising"], dtype=np.int64
        )
        falling_indices = np.array(
            [e.sample_index for e in edges if e.edge_type == "falling"], dtype=np.int64
        )

        return rising_indices, falling_indices

    def detect_rising_edges(self, trace: NDArray[np.float64]) -> list[Edge]:
        """Detect only rising edges.

        Args:
            trace: Input signal trace.

        Returns:
            List of Edge objects for rising edges.
        """
        if self.min_pulse_width is None:
            return detect_edges(trace, **self._detect("rising"))
        # The pulse filter needs both directions to pair edges up.
        return [e for e in self._filtered_edges(trace) if e.edge_type == "rising"]

    def detect_falling_edges(self, trace: NDArray[np.float64]) -> list[Edge]:
        """Detect only falling edges.

        Args:
            trace: Input signal trace.

        Returns:
            List of Edge objects for falling edges.
        """
        if self.min_pulse_width is None:
            return detect_edges(trace, **self._detect("falling"))
        # The pulse filter needs both directions to pair edges up.
        return [e for e in self._filtered_edges(trace) if e.edge_type == "falling"]

    def measure_timing(self, trace: NDArray[np.float64]) -> EdgeTiming:
        """Detect edges and measure timing.

        Args:
            trace: Input signal trace.

        Returns:
            EdgeTiming object with timing measurements.
        """
        return measure_edge_timing(self._filtered_edges(trace), self.sample_rate)

619 

620 

# Names exported by ``from ... import *``.
__all__ = [
    "Edge",
    "EdgeDetector",
    "EdgeTiming",
    "TimingConstraint",
    "TimingViolation",
    "check_timing_constraints",
    "classify_edge_quality",
    "detect_edges",
    "interpolate_edge_time",
    "measure_edge_timing",
]