Coverage for src / tracekit / analyzers / digital / edges.py: 90%
199 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Edge detection with sub-sample precision and timing analysis.
3This module provides edge detection with interpolation for sub-sample precision,
4timing measurements between edges, and timing constraint validation for digital
5signal analysis.
8Example:
9 >>> import numpy as np
10 >>> from tracekit.analyzers.digital.edges import detect_edges, measure_edge_timing
11 >>> # Generate test signal
12 >>> signal = np.array([0, 0, 0.5, 1.0, 1.0, 1.0, 0.5, 0, 0])
13 >>> # Detect edges
14 >>> edges = detect_edges(signal, edge_type='both', sample_rate=100e6)
15 >>> # Measure timing
16 >>> timing = measure_edge_timing(edges, sample_rate=100e6)
17"""
19from __future__ import annotations
21from dataclasses import dataclass
22from typing import TYPE_CHECKING, Literal
24import numpy as np
26from tracekit.core.memoize import memoize_analysis
28if TYPE_CHECKING:
29 from numpy.typing import NDArray
32@dataclass
33class Edge:
34 """A detected edge in the signal.
36 Attributes:
37 sample_index: Sample index where edge was detected.
38 time: Interpolated edge time in seconds.
39 edge_type: Type of edge ('rising' or 'falling').
40 amplitude: Transition amplitude in signal units (volts).
41 slew_rate: Edge slew rate (signal units per second).
42 quality: Edge quality classification.
43 """
45 sample_index: int
46 time: float # Interpolated time
47 edge_type: Literal["rising", "falling"]
48 amplitude: float # Transition amplitude
49 slew_rate: float # V/s or samples/s
50 quality: Literal["clean", "slow", "noisy", "glitch"]
53@dataclass
54class EdgeTiming:
55 """Timing measurements from edge analysis.
57 Attributes:
58 periods: Array of edge-to-edge periods in seconds.
59 mean_period: Mean period in seconds.
60 std_period: Standard deviation of period in seconds.
61 min_period: Minimum period in seconds.
62 max_period: Maximum period in seconds.
63 duty_cycles: Array of duty cycle ratios (0-1).
64 mean_duty_cycle: Mean duty cycle ratio.
65 jitter_rms: RMS jitter in seconds.
66 jitter_pp: Peak-to-peak jitter in seconds.
67 """
69 periods: NDArray[np.float64] # Edge-to-edge periods
70 mean_period: float
71 std_period: float
72 min_period: float
73 max_period: float
74 duty_cycles: NDArray[np.float64]
75 mean_duty_cycle: float
76 jitter_rms: float
77 jitter_pp: float
80@dataclass
81class TimingConstraint:
82 """Timing constraint for validation.
84 Attributes:
85 name: Descriptive name for the constraint.
86 min_time: Minimum allowed time in seconds (None for no minimum).
87 max_time: Maximum allowed time in seconds (None for no maximum).
88 reference: Which edges to check ('rising', 'falling', or 'both').
89 """
91 name: str
92 min_time: float | None = None
93 max_time: float | None = None
94 reference: str | None = None # 'rising', 'falling', 'both'
97@dataclass
98class TimingViolation:
99 """A timing constraint violation.
101 Attributes:
102 constraint: The violated constraint.
103 measured_time: The measured time that violated the constraint.
104 edge_index: Index of the edge that violated the constraint.
105 sample_index: Sample index where violation occurred.
106 """
108 constraint: TimingConstraint
109 measured_time: float
110 edge_index: int
111 sample_index: int
114@memoize_analysis(maxsize=32)
115def detect_edges(
116 trace: NDArray[np.float64],
117 edge_type: Literal["rising", "falling", "both"] = "both",
118 threshold: float | Literal["auto"] = "auto",
119 hysteresis: float = 0.0,
120 sample_rate: float = 1.0,
121) -> list[Edge]:
122 """Detect signal edges with configurable threshold.
124 Detects rising and/or falling edges in a digital or analog signal with
125 optional hysteresis for noise immunity.
127 Args:
128 trace: Input signal trace (analog or digital).
129 edge_type: Type of edges to detect ('rising', 'falling', or 'both').
130 threshold: Detection threshold. 'auto' computes from signal midpoint.
131 hysteresis: Hysteresis amount for noise immunity (signal units).
132 sample_rate: Sample rate in Hz for time calculation.
134 Returns:
135 List of Edge objects with detected edges.
137 Example:
138 >>> signal = np.array([0, 0, 1, 1, 0, 0])
139 >>> edges = detect_edges(signal, edge_type='rising')
140 >>> len(edges)
141 1
142 """
143 if len(trace) < 2:
144 return []
146 trace = np.asarray(trace)
148 # Compute threshold if auto
149 thresh_val: float
150 if threshold == "auto":
151 thresh_val = float((np.max(trace) + np.min(trace)) / 2.0)
152 else:
153 thresh_val = threshold
155 # Apply hysteresis if specified
156 if hysteresis > 0:
157 thresh_high = thresh_val + hysteresis / 2.0
158 thresh_low = thresh_val - hysteresis / 2.0
159 else:
160 thresh_high = thresh_val
161 thresh_low = thresh_val
163 edges: list[Edge] = []
164 time_base = 1.0 / sample_rate
166 # State machine for hysteresis
167 state = trace[0] > thresh_val # Initial state
169 for i in range(1, len(trace)):
170 prev_val = trace[i - 1]
171 curr_val = trace[i]
173 # Detect transitions with hysteresis
174 if not state and curr_val > thresh_high:
175 # Rising edge
176 if edge_type in ["rising", "both"]:
177 # Interpolate edge time
178 interp_time = interpolate_edge_time(trace, i - 1, method="linear")
179 time = (i - 1 + interp_time) * time_base
181 # Calculate edge properties
182 amplitude = curr_val - prev_val
183 slew_rate = amplitude * sample_rate
185 # Classify quality (simple heuristic)
186 quality = classify_edge_quality(trace, i, sample_rate)
188 edges.append(
189 Edge(
190 sample_index=i,
191 time=time,
192 edge_type="rising",
193 amplitude=abs(amplitude),
194 slew_rate=slew_rate,
195 quality=quality,
196 )
197 )
198 state = True
200 elif state and curr_val < thresh_low:
201 # Falling edge
202 if edge_type in ["falling", "both"]:
203 # Interpolate edge time
204 interp_time = interpolate_edge_time(trace, i - 1, method="linear")
205 time = (i - 1 + interp_time) * time_base
207 # Calculate edge properties
208 amplitude = prev_val - curr_val
209 slew_rate = -amplitude * sample_rate
211 # Classify quality (simple heuristic)
212 quality = classify_edge_quality(trace, i, sample_rate)
214 edges.append(
215 Edge(
216 sample_index=i,
217 time=time,
218 edge_type="falling",
219 amplitude=abs(amplitude),
220 slew_rate=slew_rate,
221 quality=quality,
222 )
223 )
224 state = False
226 return edges
229def interpolate_edge_time(
230 trace: NDArray[np.float64], sample_index: int, method: Literal["linear", "quadratic"] = "linear"
231) -> float:
232 """Interpolate edge time for sub-sample precision.
234 Uses linear or quadratic interpolation to estimate the fractional sample
235 position where an edge crosses the threshold.
237 Args:
238 trace: Input signal trace.
239 sample_index: Sample index just before the edge.
240 method: Interpolation method ('linear' or 'quadratic').
242 Returns:
243 Fractional sample offset (0.0 to 1.0) from sample_index.
245 Example:
246 >>> trace = np.array([0, 0.3, 0.8, 1.0])
247 >>> offset = interpolate_edge_time(trace, 1, method='linear')
248 """
249 if sample_index < 0 or sample_index >= len(trace) - 1:
250 return 0.0
252 if method == "linear":
253 # Linear interpolation between two points
254 v0 = trace[sample_index]
255 v1 = trace[sample_index + 1]
257 if abs(v1 - v0) < 1e-10:
258 return 0.5 # Avoid division by zero
260 # Find midpoint crossing
261 threshold = (v0 + v1) / 2.0
262 fraction = (threshold - v0) / (v1 - v0)
264 # Clamp to valid range
265 return float(np.clip(fraction, 0.0, 1.0))
267 elif method == "quadratic": 267 ↛ exitline 267 didn't return from function 'interpolate_edge_time' because the condition on line 267 was always true
268 # Quadratic interpolation using 3 points
269 if sample_index < 1 or sample_index >= len(trace) - 1: 269 ↛ 271line 269 didn't jump to line 271 because the condition on line 269 was never true
270 # Fall back to linear
271 return interpolate_edge_time(trace, sample_index, method="linear")
273 # Use points before, at, and after edge
274 _v_prev = trace[sample_index - 1]
275 v0 = trace[sample_index]
276 v1 = trace[sample_index + 1]
278 # Fit parabola and find threshold crossing
279 # Simplified: use linear for now (full quadratic fit is complex)
280 return interpolate_edge_time(trace, sample_index, method="linear")
283def measure_edge_timing(edges: list[Edge], sample_rate: float = 1.0) -> EdgeTiming:
284 """Measure timing between edges.
286 Computes period, duty cycle, and jitter statistics from a list of detected edges.
288 Args:
289 edges: List of Edge objects from detect_edges().
290 sample_rate: Sample rate in Hz (for time base).
292 Returns:
293 EdgeTiming object with timing measurements.
295 Example:
296 >>> edges = detect_edges(signal, edge_type='both', sample_rate=100e6)
297 >>> timing = measure_edge_timing(edges, sample_rate=100e6)
298 """
299 if len(edges) < 2:
300 # Not enough edges for timing analysis
301 return EdgeTiming(
302 periods=np.array([]),
303 mean_period=0.0,
304 std_period=0.0,
305 min_period=0.0,
306 max_period=0.0,
307 duty_cycles=np.array([]),
308 mean_duty_cycle=0.0,
309 jitter_rms=0.0,
310 jitter_pp=0.0,
311 )
313 # Calculate periods (time between consecutive edges)
314 edge_times = np.array([e.time for e in edges])
315 periods = np.diff(edge_times)
317 # Calculate duty cycles (ratio of high time to period)
318 duty_cycles = []
319 rising_edges = [e for e in edges if e.edge_type == "rising"]
320 falling_edges = [e for e in edges if e.edge_type == "falling"]
322 # Match rising and falling edges to compute duty cycles
323 for i in range(min(len(rising_edges), len(falling_edges))):
324 rise_time = rising_edges[i].time
325 fall_time = falling_edges[i].time
327 # Find next edge of opposite type
328 if i + 1 < len(rising_edges):
329 next_rise = rising_edges[i + 1].time
330 period = next_rise - rise_time
331 if period > 0: 331 ↛ 323line 331 didn't jump to line 323 because the condition on line 331 was always true
332 high_time = fall_time - rise_time
333 duty_cycle = high_time / period
334 duty_cycles.append(np.clip(duty_cycle, 0.0, 1.0))
336 duty_cycles_arr = np.array(duty_cycles) if duty_cycles else np.array([])
338 # Calculate jitter
339 if len(periods) > 1:
340 mean_period = np.mean(periods)
341 jitter_rms = np.std(periods)
342 jitter_pp = np.max(periods) - np.min(periods)
343 else:
344 mean_period = periods[0] if len(periods) > 0 else 0.0
345 jitter_rms = 0.0
346 jitter_pp = 0.0
348 return EdgeTiming(
349 periods=periods,
350 mean_period=float(mean_period),
351 std_period=float(np.std(periods)) if len(periods) > 0 else 0.0,
352 min_period=float(np.min(periods)) if len(periods) > 0 else 0.0,
353 max_period=float(np.max(periods)) if len(periods) > 0 else 0.0,
354 duty_cycles=duty_cycles_arr,
355 mean_duty_cycle=float(np.mean(duty_cycles_arr)) if len(duty_cycles_arr) > 0 else 0.0,
356 jitter_rms=float(jitter_rms),
357 jitter_pp=float(jitter_pp),
358 )
361def check_timing_constraints(
362 edges: list[Edge], constraints: list[TimingConstraint], sample_rate: float = 1.0
363) -> list[TimingViolation]:
364 """Check edges against timing constraints.
366 Validates edge timing against specified constraints and reports violations.
368 Args:
369 edges: List of Edge objects to check.
370 constraints: List of TimingConstraint objects defining limits.
371 sample_rate: Sample rate in Hz.
373 Returns:
374 List of TimingViolation objects for any violations found.
376 Example:
377 >>> constraint = TimingConstraint(name="min_period", min_time=10e-9)
378 >>> violations = check_timing_constraints(edges, [constraint])
379 """
380 violations: list[TimingViolation] = []
382 if len(edges) < 2:
383 return violations
385 # Calculate periods between edges
386 for i in range(len(edges) - 1):
387 edge_time = edges[i].time
388 next_time = edges[i + 1].time
389 period = next_time - edge_time
391 for constraint in constraints:
392 # Check if constraint applies to this edge type
393 if constraint.reference:
394 if constraint.reference == "rising" and edges[i].edge_type != "rising":
395 continue
396 if constraint.reference == "falling" and edges[i].edge_type != "falling":
397 continue
399 # Check timing constraints
400 violated = False
402 if constraint.min_time is not None and period < constraint.min_time:
403 violated = True
405 if constraint.max_time is not None and period > constraint.max_time:
406 violated = True
408 if violated:
409 violations.append(
410 TimingViolation(
411 constraint=constraint,
412 measured_time=period,
413 edge_index=i,
414 sample_index=edges[i].sample_index,
415 )
416 )
418 return violations
421def classify_edge_quality(
422 trace: NDArray[np.float64], edge_index: int, sample_rate: float
423) -> Literal["clean", "slow", "noisy", "glitch"]:
424 """Classify edge quality.
426 Analyzes the edge transition to classify its quality based on slew rate,
427 noise, and duration.
429 Args:
430 trace: Input signal trace.
431 edge_index: Sample index of the edge.
432 sample_rate: Sample rate in Hz.
434 Returns:
435 Quality classification: 'clean', 'slow', 'noisy', or 'glitch'.
437 Example:
438 >>> quality = classify_edge_quality(trace, 10, 100e6)
439 """
440 if edge_index < 1 or edge_index >= len(trace) - 1:
441 return "clean"
443 # Get window around edge
444 window_size = min(10, edge_index, len(trace) - edge_index - 1)
445 window = trace[edge_index - window_size : edge_index + window_size + 1]
447 # Calculate transition amplitude
448 v_before = trace[edge_index - 1]
449 v_after = trace[edge_index]
450 amplitude = abs(v_after - v_before)
452 # Check for glitch (very short duration)
453 if window_size < 3:
454 return "glitch"
456 # Calculate noise (std dev in window)
457 noise = np.std(window)
459 # Calculate slew rate
460 _slew_rate = amplitude * sample_rate
462 # Simple heuristic classification
463 signal_range = np.max(trace) - np.min(trace)
465 if amplitude < signal_range * 0.1:
466 return "glitch"
468 if noise > amplitude * 0.2: 468 ↛ 472line 468 didn't jump to line 472 because the condition on line 468 was always true
469 return "noisy"
471 # Check if transition is slow (takes many samples)
472 transition_samples = 0
473 _threshold = (v_before + v_after) / 2.0
475 for i in range(max(0, edge_index - window_size), min(len(trace), edge_index + window_size)):
476 val = trace[i]
477 if v_before < v_after: # Rising
478 if v_before <= val <= v_after:
479 transition_samples += 1
480 else: # Falling
481 if v_after <= val <= v_before:
482 transition_samples += 1
484 if transition_samples > 5:
485 return "slow"
487 return "clean"
490class EdgeDetector:
491 """Object-oriented wrapper for edge detection functionality.
493 Provides a class-based interface for edge detection operations,
494 wrapping the functional API for consistency with test expectations.
498 Example:
499 >>> detector = EdgeDetector()
500 >>> rising, falling = detector.detect_all_edges(signal_data)
501 """
503 def __init__(
504 self,
505 threshold: float | Literal["auto"] = "auto",
506 hysteresis: float = 0.0,
507 sample_rate: float = 1.0,
508 min_pulse_width: int | None = None,
509 ):
510 """Initialize edge detector.
512 Args:
513 threshold: Detection threshold. 'auto' computes from signal midpoint.
514 hysteresis: Hysteresis amount for noise immunity (signal units).
515 sample_rate: Sample rate in Hz for time calculation.
516 min_pulse_width: Minimum pulse width in samples to filter noise.
517 """
518 self.threshold = threshold
519 self.hysteresis = hysteresis
520 self.sample_rate = sample_rate
521 self.min_pulse_width = min_pulse_width
523 def detect_all_edges(
524 self, trace: NDArray[np.float64]
525 ) -> tuple[NDArray[np.intp], NDArray[np.intp]]:
526 """Detect all rising and falling edges.
528 Args:
529 trace: Input signal trace (analog or digital).
531 Returns:
532 Tuple of (rising_edge_indices, falling_edge_indices).
534 Example:
535 >>> detector = EdgeDetector(sample_rate=100e6)
536 >>> rising, falling = detector.detect_all_edges(signal)
537 """
538 edges = detect_edges(
539 trace,
540 edge_type="both",
541 threshold=self.threshold,
542 hysteresis=self.hysteresis,
543 sample_rate=self.sample_rate,
544 )
546 # Filter by min_pulse_width if specified
547 if self.min_pulse_width is not None and len(edges) > 1:
548 filtered_edges = []
549 for i, edge in enumerate(edges):
550 if i == 0:
551 filtered_edges.append(edge)
552 continue
553 # Check distance to previous edge
554 dist = edge.sample_index - edges[i - 1].sample_index
555 if dist >= self.min_pulse_width: 555 ↛ 549line 555 didn't jump to line 549 because the condition on line 555 was always true
556 filtered_edges.append(edge)
557 edges = filtered_edges
559 rising_indices = np.array(
560 [e.sample_index for e in edges if e.edge_type == "rising"], dtype=np.int64
561 )
562 falling_indices = np.array(
563 [e.sample_index for e in edges if e.edge_type == "falling"], dtype=np.int64
564 )
566 return rising_indices, falling_indices
568 def detect_rising_edges(self, trace: NDArray[np.float64]) -> list[Edge]:
569 """Detect only rising edges.
571 Args:
572 trace: Input signal trace.
574 Returns:
575 List of Edge objects for rising edges.
576 """
577 return detect_edges(
578 trace,
579 edge_type="rising",
580 threshold=self.threshold,
581 hysteresis=self.hysteresis,
582 sample_rate=self.sample_rate,
583 )
585 def detect_falling_edges(self, trace: NDArray[np.float64]) -> list[Edge]:
586 """Detect only falling edges.
588 Args:
589 trace: Input signal trace.
591 Returns:
592 List of Edge objects for falling edges.
593 """
594 return detect_edges(
595 trace,
596 edge_type="falling",
597 threshold=self.threshold,
598 hysteresis=self.hysteresis,
599 sample_rate=self.sample_rate,
600 )
602 def measure_timing(self, trace: NDArray[np.float64]) -> EdgeTiming:
603 """Detect edges and measure timing.
605 Args:
606 trace: Input signal trace.
608 Returns:
609 EdgeTiming object with timing measurements.
610 """
611 edges = detect_edges(
612 trace,
613 edge_type="both",
614 threshold=self.threshold,
615 hysteresis=self.hysteresis,
616 sample_rate=self.sample_rate,
617 )
618 return measure_edge_timing(edges, self.sample_rate)
621__all__ = [
622 "Edge",
623 "EdgeDetector",
624 "EdgeTiming",
625 "TimingConstraint",
626 "TimingViolation",
627 "check_timing_constraints",
628 "classify_edge_quality",
629 "detect_edges",
630 "interpolate_edge_time",
631 "measure_edge_timing",
632]