Coverage for src / tracekit / triggering / edge.py: 100%
90 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
"""Edge triggering for TraceKit.

Provides edge detection with configurable thresholds, hysteresis,
and edge polarity (rising, falling, or both).

Example:
    >>> from tracekit.triggering.edge import EdgeTrigger, find_rising_edges
    >>> # Object-oriented approach
    >>> trigger = EdgeTrigger(level=1.5, edge="rising", hysteresis=0.1)
    >>> events = trigger.find_events(trace)
    >>> # Functional approach
    >>> timestamps = find_rising_edges(trace, level=1.5)
"""
15from __future__ import annotations
17from typing import TYPE_CHECKING, Any, Literal
19import numpy as np
21from tracekit.core.types import DigitalTrace, WaveformTrace
22from tracekit.triggering.base import (
23 Trigger,
24 TriggerEvent,
25 TriggerType,
26 interpolate_crossing,
27)
29if TYPE_CHECKING:
30 from numpy.typing import NDArray
class EdgeTrigger(Trigger):
    """Edge trigger with threshold and optional hysteresis.

    Detects signal crossings of a threshold level with configurable
    hysteresis for noise immunity.

    Attributes:
        level: Trigger threshold level.
        edge: Edge type - "rising", "falling", or "either".
        hysteresis: Hysteresis band (Schmitt trigger style).

    Note:
        The two detection modes report ``sample_index`` differently. The
        simple mode (``hysteresis <= 0``) stores the index of the sample
        *before* the crossing; the hysteresis mode stores the index of the
        first sample *at or beyond* the armed threshold. In both modes the
        event's ``level`` field holds the value of the sample after the
        crossing.
    """

    def __init__(
        self,
        level: float,
        edge: Literal["rising", "falling", "either"] = "rising",
        hysteresis: float = 0.0,
    ) -> None:
        """Initialize edge trigger.

        Args:
            level: Trigger threshold level in signal units (e.g., volts).
            edge: Edge polarity to trigger on.
            hysteresis: Hysteresis band width. Trigger requires signal to
                cross level +/- hysteresis/2 before retriggering. Values
                less than or equal to zero select plain threshold crossing.
        """
        self.level = level
        self.edge = edge
        self.hysteresis = hysteresis

    def find_events(
        self,
        trace: WaveformTrace | DigitalTrace,
    ) -> list[TriggerEvent]:
        """Find all edge events in the trace.

        Args:
            trace: Input waveform or digital trace.

        Returns:
            List of trigger events for each detected edge.
        """
        if isinstance(trace, DigitalTrace):
            # Digital samples are cast to float64 so they can be compared
            # against the threshold like analog samples.
            data = trace.data.astype(np.float64)
        else:
            data = trace.data

        sample_period = trace.metadata.time_base

        if self.hysteresis > 0:
            # Schmitt trigger mode
            return self._find_edges_with_hysteresis(data, sample_period)

        # Simple threshold crossing
        return self._find_edges_simple(data, sample_period)

    def _find_edges_simple(
        self,
        data: NDArray[np.floating[Any]],
        sample_period: float,
    ) -> list[TriggerEvent]:
        """Find edges using simple threshold crossing.

        A rising edge is a sample strictly below ``level`` followed by a
        sample at or above it; a falling edge is the reverse.

        Args:
            data: Sample values.
            sample_period: Sample spacing passed to ``interpolate_crossing``.

        Returns:
            Detected events. When ``edge`` is "either" the list is sorted by
            timestamp; otherwise events appear in sample order.
        """
        events: list[TriggerEvent] = []

        below = data < self.level
        above = data >= self.level

        if self.edge in ("rising", "either"):
            # Rising: below -> above
            rising_idx = np.where(below[:-1] & above[1:])[0]
            for idx in rising_idx:
                timestamp = interpolate_crossing(data, idx, self.level, sample_period, rising=True)
                events.append(
                    TriggerEvent(
                        timestamp=timestamp,
                        sample_index=int(idx),
                        event_type=TriggerType.RISING_EDGE,
                        level=float(data[idx + 1]),
                    )
                )

        if self.edge in ("falling", "either"):
            # Falling: above -> below
            falling_idx = np.where(above[:-1] & below[1:])[0]
            for idx in falling_idx:
                timestamp = interpolate_crossing(data, idx, self.level, sample_period, rising=False)
                events.append(
                    TriggerEvent(
                        timestamp=timestamp,
                        sample_index=int(idx),
                        event_type=TriggerType.FALLING_EDGE,
                        level=float(data[idx + 1]),
                    )
                )

        # Rising and falling events were collected in two separate passes,
        # so interleave them chronologically.
        if self.edge == "either":
            events.sort(key=lambda e: e.timestamp)

        return events

    def _find_edges_with_hysteresis(
        self,
        data: NDArray[np.floating[Any]],
        sample_period: float,
    ) -> list[TriggerEvent]:
        """Find edges using Schmitt trigger with hysteresis.

        The state flips to "high" when a sample reaches
        ``level + hysteresis / 2`` and back to "low" when a sample reaches
        ``level - hysteresis / 2``. State changes are always tracked, but an
        event is only recorded when its polarity matches ``edge``.

        Args:
            data: Sample values.
            sample_period: Sample spacing passed to ``interpolate_crossing``.

        Returns:
            Detected events in sample order; empty for an empty trace.
        """
        events: list[TriggerEvent] = []

        # An empty trace has no first sample to seed the state machine with;
        # report no edges instead of raising IndexError on data[0].
        if len(data) == 0:
            return events

        high_thresh = self.level + self.hysteresis / 2
        low_thresh = self.level - self.hysteresis / 2

        want_rising = self.edge in ("rising", "either")
        want_falling = self.edge in ("falling", "either")

        # State machine: track if we're currently "high" or "low".
        # The initial state is decided against the nominal level, not the
        # hysteresis thresholds.
        state = "low" if data[0] < self.level else "high"

        for i in range(1, len(data)):
            if state == "low" and data[i] >= high_thresh:
                # Rising edge detected
                state = "high"
                if want_rising:
                    timestamp = interpolate_crossing(
                        data, i - 1, high_thresh, sample_period, rising=True
                    )
                    events.append(
                        TriggerEvent(
                            timestamp=timestamp,
                            sample_index=i,
                            event_type=TriggerType.RISING_EDGE,
                            level=float(data[i]),
                        )
                    )

            elif state == "high" and data[i] <= low_thresh:
                # Falling edge detected
                state = "low"
                if want_falling:
                    timestamp = interpolate_crossing(
                        data, i - 1, low_thresh, sample_period, rising=False
                    )
                    events.append(
                        TriggerEvent(
                            timestamp=timestamp,
                            sample_index=i,
                            event_type=TriggerType.FALLING_EDGE,
                            level=float(data[i]),
                        )
                    )

        return events
def find_rising_edges(
    trace: WaveformTrace,
    level: float | None = None,
    *,
    hysteresis: float = 0.0,
    return_indices: bool = False,
) -> NDArray[np.float64] | NDArray[np.int64]:
    """Find all rising edge timestamps or indices.

    Args:
        trace: Input waveform trace.
        level: Trigger threshold. If None, uses signal midpoint.
        hysteresis: Hysteresis band for noise immunity.
        return_indices: If True, return sample indices instead of timestamps.

    Returns:
        Array of timestamps (seconds) or sample indices. The array is empty
        when the trace contains no samples.

    Example:
        >>> edges = find_rising_edges(trace, level=1.5)
        >>> print(f"Found {len(edges)} rising edges")
    """
    if np.size(trace.data) == 0:
        # No samples: there is no midpoint to derive and no edge to find.
        if return_indices:
            return np.array([], dtype=np.int64)
        return np.array([], dtype=np.float64)

    if level is None:
        # Convert to Python floats before adding so that narrow integer
        # sample types cannot wrap around in the sum.
        level = (float(np.min(trace.data)) + float(np.max(trace.data))) / 2

    trigger = EdgeTrigger(level=level, edge="rising", hysteresis=hysteresis)
    events = trigger.find_events(trace)

    if return_indices:
        return np.array([e.sample_index for e in events], dtype=np.int64)
    return np.array([e.timestamp for e in events], dtype=np.float64)
def find_falling_edges(
    trace: WaveformTrace,
    level: float | None = None,
    *,
    hysteresis: float = 0.0,
    return_indices: bool = False,
) -> NDArray[np.float64] | NDArray[np.int64]:
    """Find all falling edge timestamps or indices.

    Args:
        trace: Input waveform trace.
        level: Trigger threshold. If None, uses signal midpoint.
        hysteresis: Hysteresis band for noise immunity.
        return_indices: If True, return sample indices instead of timestamps.

    Returns:
        Array of timestamps (seconds) or sample indices. The array is empty
        when the trace contains no samples.

    Example:
        >>> edges = find_falling_edges(trace, level=1.5)
    """
    if np.size(trace.data) == 0:
        # No samples: there is no midpoint to derive and no edge to find.
        if return_indices:
            return np.array([], dtype=np.int64)
        return np.array([], dtype=np.float64)

    if level is None:
        # Convert to Python floats before adding so that narrow integer
        # sample types cannot wrap around in the sum.
        level = (float(np.min(trace.data)) + float(np.max(trace.data))) / 2

    trigger = EdgeTrigger(level=level, edge="falling", hysteresis=hysteresis)
    events = trigger.find_events(trace)

    if return_indices:
        return np.array([e.sample_index for e in events], dtype=np.int64)
    return np.array([e.timestamp for e in events], dtype=np.float64)
def find_all_edges(
    trace: WaveformTrace,
    level: float | None = None,
    *,
    hysteresis: float = 0.0,
) -> tuple[NDArray[np.float64], NDArray[np.bool_]]:
    """Find all edges (rising and falling) with polarity.

    Args:
        trace: Input waveform trace.
        level: Trigger threshold. If None, uses signal midpoint.
        hysteresis: Hysteresis band for noise immunity.

    Returns:
        Tuple of (timestamps, is_rising) where is_rising is True for
        rising edges and False for falling edges. Both arrays are empty
        when the trace contains no samples.

    Example:
        >>> timestamps, is_rising = find_all_edges(trace, level=1.5)
        >>> rising = timestamps[is_rising]
        >>> falling = timestamps[~is_rising]
    """
    if np.size(trace.data) == 0:
        # No samples: there is no midpoint to derive and no edge to find.
        return np.array([], dtype=np.float64), np.array([], dtype=np.bool_)

    if level is None:
        # Convert to Python floats before adding so that narrow integer
        # sample types cannot wrap around in the sum.
        level = (float(np.min(trace.data)) + float(np.max(trace.data))) / 2

    trigger = EdgeTrigger(level=level, edge="either", hysteresis=hysteresis)
    events = trigger.find_events(trace)

    timestamps = np.array([e.timestamp for e in events], dtype=np.float64)
    is_rising = np.array(
        [e.event_type == TriggerType.RISING_EDGE for e in events], dtype=np.bool_
    )

    return timestamps, is_rising
def edge_count(
    trace: WaveformTrace,
    level: float | None = None,
    edge: Literal["rising", "falling", "either"] = "either",
    *,
    hysteresis: float = 0.0,
) -> int:
    """Count edges in a trace.

    Args:
        trace: Input waveform trace.
        level: Trigger threshold. If None, uses signal midpoint.
        edge: Edge type to count.
        hysteresis: Hysteresis band for noise immunity.

    Returns:
        Number of edges found; 0 when the trace contains no samples.

    Example:
        >>> n_rising = edge_count(trace, level=1.5, edge="rising")
    """
    if np.size(trace.data) == 0:
        # No samples: there is no midpoint to derive and no edge to count.
        return 0

    if level is None:
        # Convert to Python floats before adding so that narrow integer
        # sample types cannot wrap around in the sum.
        level = (float(np.min(trace.data)) + float(np.max(trace.data))) / 2

    trigger = EdgeTrigger(level=level, edge=edge, hysteresis=hysteresis)
    return trigger.count_events(trace)
def edge_rate(
    trace: WaveformTrace,
    level: float | None = None,
    edge: Literal["rising", "falling", "either"] = "either",
    *,
    hysteresis: float = 0.0,
) -> float:
    """Return the number of edges per second of trace duration.

    Args:
        trace: Input waveform trace.
        level: Trigger threshold, forwarded to ``edge_count`` (which falls
            back to the signal midpoint when this is None).
        edge: Edge type to count.
        hysteresis: Hysteresis band for noise immunity.

    Returns:
        Edge rate in Hz, or 0.0 when the trace duration is not positive.

    Example:
        >>> rate = edge_rate(trace, level=1.5, edge="rising")
        >>> print(f"Toggle rate: {rate} Hz")
    """
    # Count first, then read the duration, so any error raised while
    # counting surfaces regardless of the duration value.
    n_edges = edge_count(trace, level, edge, hysteresis=hysteresis)
    span = trace.duration

    # A zero or negative duration cannot yield a meaningful rate.
    return 0.0 if span <= 0 else n_edges / span
# Public API of this module: the names exported by
# ``from tracekit.triggering.edge import *``.
__all__ = [
    "EdgeTrigger",
    "edge_count",
    "edge_rate",
    "find_all_edges",
    "find_falling_edges",
    "find_rising_edges",
]