Coverage for src / tracekit / analyzers / digital / extraction.py: 100%
99 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Digital signal extraction and edge detection.
3This module provides functions for extracting digital signals from
4analog waveforms and detecting edge transitions.
7Example:
8 >>> from tracekit.analyzers.digital import to_digital, detect_edges
9 >>> digital = to_digital(analog_trace, threshold=1.4)
10 >>> edges = detect_edges(digital, edge_type="rising")
11"""
13from __future__ import annotations
15from typing import TYPE_CHECKING, Any, Literal
17import numpy as np
19from tracekit.core.exceptions import InsufficientDataError
20from tracekit.core.types import DigitalTrace, WaveformTrace
22if TYPE_CHECKING:
23 from numpy.typing import NDArray
25# Standard logic family threshold constants
26# Reference: Various IC manufacturer datasheets
27LOGIC_FAMILIES: dict[str, dict[str, float]] = {
28 "TTL": {
29 "VIL_max": 0.8, # Maximum input low voltage
30 "VIH_min": 2.0, # Minimum input high voltage
31 "VOL_max": 0.4, # Maximum output low voltage
32 "VOH_min": 2.4, # Minimum output high voltage
33 "VCC": 5.0,
34 },
35 "CMOS_5V": {
36 "VIL_max": 1.5,
37 "VIH_min": 3.5,
38 "VOL_max": 0.1,
39 "VOH_min": 4.9,
40 "VCC": 5.0,
41 },
42 "LVTTL": {
43 "VIL_max": 0.8,
44 "VIH_min": 1.5,
45 "VOL_max": 0.4,
46 "VOH_min": 2.4,
47 "VCC": 3.3,
48 },
49 "LVCMOS_3V3": {
50 "VIL_max": 0.3 * 3.3, # 30% of VCC
51 "VIH_min": 0.7 * 3.3, # 70% of VCC
52 "VOL_max": 0.1,
53 "VOH_min": 3.2,
54 "VCC": 3.3,
55 },
56 "LVCMOS_2V5": {
57 "VIL_max": 0.3 * 2.5,
58 "VIH_min": 0.7 * 2.5,
59 "VOL_max": 0.1,
60 "VOH_min": 2.4,
61 "VCC": 2.5,
62 },
63 "LVCMOS_1V8": {
64 "VIL_max": 0.3 * 1.8,
65 "VIH_min": 0.7 * 1.8,
66 "VOL_max": 0.1,
67 "VOH_min": 1.7,
68 "VCC": 1.8,
69 },
70 "LVCMOS_1V2": {
71 "VIL_max": 0.3 * 1.2,
72 "VIH_min": 0.7 * 1.2,
73 "VOL_max": 0.1,
74 "VOH_min": 1.1,
75 "VCC": 1.2,
76 },
77}
80def to_digital(
81 trace: WaveformTrace,
82 *,
83 threshold: float | Literal["auto"] = "auto",
84 hysteresis: float | tuple[float, float] | None = None,
85) -> DigitalTrace:
86 """Extract digital signal from analog waveform.
88 Converts an analog waveform to a digital (boolean) signal using
89 threshold comparison.
91 Args:
92 trace: Input analog waveform trace.
93 threshold: Voltage threshold for conversion. Can be:
94 - A float value for fixed threshold
95 - "auto" for adaptive threshold (midpoint of 10th-90th percentile)
96 hysteresis: Hysteresis for noise immunity. Can be:
97 - None: No hysteresis
98 - A float: Symmetric hysteresis band around threshold
99 - A tuple (low, high): Explicit low and high thresholds
101 Returns:
102 DigitalTrace with boolean data and detected edges.
104 Raises:
105 InsufficientDataError: If trace has insufficient data.
107 Example:
108 >>> digital = to_digital(analog_trace, threshold=1.4)
109 >>> print(f"High samples: {digital.data.sum()}")
111 >>> # With hysteresis for noisy signals
112 >>> digital = to_digital(analog_trace, threshold=1.4, hysteresis=0.2)
114 References:
115 TTL Logic thresholds: VIL_max=0.8V, VIH_min=2.0V
116 """
117 if len(trace.data) < 2:
118 raise InsufficientDataError(
119 "Trace too short for digital extraction",
120 required=2,
121 available=len(trace.data),
122 analysis_type="digital_extraction",
123 )
125 # Convert memoryview to ndarray if needed
126 data = np.asarray(trace.data)
128 # Determine threshold
129 if threshold == "auto":
130 # Adaptive threshold: midpoint of 10th-90th percentile
131 p10, p90 = np.percentile(data, [10, 90])
132 thresh_value = (p10 + p90) / 2.0
133 else:
134 thresh_value = float(threshold)
136 # Apply threshold with or without hysteresis
137 if hysteresis is not None:
138 if isinstance(hysteresis, tuple):
139 thresh_low, thresh_high = hysteresis
140 else:
141 thresh_low = thresh_value - hysteresis / 2
142 thresh_high = thresh_value + hysteresis / 2
143 digital_data = _apply_hysteresis(data, thresh_low, thresh_high)
144 else:
145 digital_data = data >= thresh_value
147 # Detect edges
148 edges = _detect_edges_internal(data, digital_data, trace.metadata.sample_rate, thresh_value)
150 return DigitalTrace(
151 data=digital_data,
152 metadata=trace.metadata,
153 edges=edges,
154 )
157def _apply_hysteresis(
158 data: NDArray[np.floating[Any]],
159 thresh_low: float,
160 thresh_high: float,
161) -> NDArray[np.bool_]:
162 """Apply Schmitt trigger-style hysteresis thresholding.
164 Args:
165 data: Input analog data.
166 thresh_low: Lower threshold (switch to low when below).
167 thresh_high: Upper threshold (switch to high when above).
169 Returns:
170 Boolean array with hysteresis applied.
171 """
172 result = np.zeros(len(data), dtype=np.bool_)
174 # Initial state based on first sample
175 state = data[0] >= (thresh_low + thresh_high) / 2
177 for i, value in enumerate(data):
178 if state:
179 # Currently high, switch low if below thresh_low
180 if value < thresh_low:
181 state = False
182 # Currently low, switch high if above thresh_high
183 elif value >= thresh_high:
184 state = True
185 result[i] = state
187 return result
190def detect_edges(
191 trace: WaveformTrace | DigitalTrace,
192 *,
193 edge_type: Literal["rising", "falling", "both"] = "both",
194 threshold: float | Literal["auto"] = "auto",
195) -> NDArray[np.float64]:
196 """Detect edge transitions in a signal.
198 Finds rising and/or falling edges with sub-sample timestamp
199 interpolation for improved accuracy.
201 Args:
202 trace: Input waveform (analog or digital).
203 edge_type: Type of edges to detect:
204 - "rising": Low-to-high transitions
205 - "falling": High-to-low transitions
206 - "both": All transitions
207 threshold: Threshold for edge detection (only for analog traces).
209 Returns:
210 Array of edge timestamps in seconds.
212 Raises:
213 InsufficientDataError: If trace has insufficient data.
215 Example:
216 >>> edges = detect_edges(trace, edge_type="rising")
217 >>> print(f"Found {len(edges)} rising edges")
218 """
219 if len(trace.data) < 2:
220 raise InsufficientDataError(
221 "Trace too short for edge detection",
222 required=2,
223 available=len(trace.data),
224 analysis_type="edge_detection",
225 )
227 # Convert to digital if analog
228 digital = to_digital(trace, threshold=threshold) if isinstance(trace, WaveformTrace) else trace
230 # Find transitions - ensure we have a numpy array
231 data = np.asarray(digital.data)
233 transitions = np.diff(data.astype(np.int8))
235 # Get edge indices
236 if edge_type == "rising":
237 edge_indices = np.where(transitions == 1)[0]
238 elif edge_type == "falling":
239 edge_indices = np.where(transitions == -1)[0]
240 else: # both
241 edge_indices = np.where(transitions != 0)[0]
243 # Convert indices to timestamps
244 sample_period = digital.metadata.time_base
245 timestamps = edge_indices.astype(np.float64) * sample_period
247 # Sub-sample interpolation for analog traces
248 if isinstance(trace, WaveformTrace) and threshold != "auto":
249 thresh_value = float(threshold)
250 timestamps = _interpolate_edges(trace.data, edge_indices, sample_period, thresh_value)
252 return timestamps
255def _detect_edges_internal(
256 analog_data: NDArray[np.floating[Any]],
257 digital_data: NDArray[np.bool_],
258 sample_rate: float,
259 threshold: float,
260) -> list[tuple[float, bool]]:
261 """Detect edges and return as (timestamp, is_rising) tuples.
263 Args:
264 analog_data: Original analog data for interpolation.
265 digital_data: Thresholded digital data.
266 sample_rate: Sample rate in Hz.
267 threshold: Threshold used for conversion.
269 Returns:
270 List of (timestamp, is_rising) tuples.
271 """
272 sample_period = 1.0 / sample_rate
273 transitions = np.diff(digital_data.astype(np.int8))
275 edges: list[tuple[float, bool]] = []
277 # Rising edges
278 rising_indices = np.where(transitions == 1)[0]
279 for idx in rising_indices:
280 # Sub-sample interpolation
281 if 0 < idx < len(analog_data) - 1:
282 t = _interpolate_crossing(
283 analog_data[idx], analog_data[idx + 1], threshold, sample_period
284 )
285 timestamp = idx * sample_period + t
286 else:
287 timestamp = idx * sample_period
288 edges.append((timestamp, True))
290 # Falling edges
291 falling_indices = np.where(transitions == -1)[0]
292 for idx in falling_indices:
293 if 0 < idx < len(analog_data) - 1:
294 t = _interpolate_crossing(
295 analog_data[idx], analog_data[idx + 1], threshold, sample_period
296 )
297 timestamp = idx * sample_period + t
298 else:
299 timestamp = idx * sample_period
300 edges.append((timestamp, False))
302 # Sort by timestamp
303 edges.sort(key=lambda x: x[0])
305 return edges
308def _interpolate_edges(
309 data: NDArray[np.floating[Any]],
310 edge_indices: NDArray[np.intp],
311 sample_period: float,
312 threshold: float,
313) -> NDArray[np.float64]:
314 """Interpolate edge timestamps for sub-sample accuracy.
316 Uses linear interpolation between samples to estimate the
317 exact crossing point.
319 Args:
320 data: Analog data array.
321 edge_indices: Indices of detected edges.
322 sample_period: Time between samples.
323 threshold: Threshold level.
325 Returns:
326 Array of interpolated timestamps.
327 """
328 timestamps = np.zeros(len(edge_indices), dtype=np.float64)
330 for i, idx in enumerate(edge_indices):
331 base_time = idx * sample_period
333 if 0 < idx < len(data) - 1:
334 # Linear interpolation between samples
335 t = _interpolate_crossing(data[idx], data[idx + 1], threshold, sample_period)
336 timestamps[i] = base_time + t
337 else:
338 timestamps[i] = base_time
340 return timestamps
343def _interpolate_crossing(
344 v1: float,
345 v2: float,
346 threshold: float,
347 sample_period: float,
348) -> float:
349 """Linearly interpolate threshold crossing time.
351 Args:
352 v1: Voltage at sample before crossing.
353 v2: Voltage at sample after crossing.
354 threshold: Threshold level.
355 sample_period: Time between samples.
357 Returns:
358 Time offset from v1 to crossing point.
359 """
360 dv = v2 - v1
361 if abs(dv) < 1e-12:
362 return sample_period / 2 # Midpoint if no change
364 # Linear interpolation: t = (threshold - v1) / (v2 - v1) * period
365 t = (threshold - v1) / dv * sample_period
366 return max(0.0, min(sample_period, t))
369def get_logic_threshold(
370 family: str,
371 threshold_type: Literal["midpoint", "VIH", "VIL"] = "midpoint",
372) -> float:
373 """Get threshold voltage for a logic family.
375 Args:
376 family: Logic family name (e.g., "TTL", "LVCMOS_3V3").
377 threshold_type: Type of threshold:
378 - "midpoint": Midpoint between VIL_max and VIH_min
379 - "VIH": Minimum input high voltage
380 - "VIL": Maximum input low voltage
382 Returns:
383 Threshold voltage.
385 Raises:
386 ValueError: If family or threshold_type is unknown.
388 Example:
389 >>> get_logic_threshold("TTL", "midpoint")
390 1.4
391 """
392 if family not in LOGIC_FAMILIES:
393 available = ", ".join(LOGIC_FAMILIES.keys())
394 raise ValueError(f"Unknown logic family: {family}. Available: {available}")
396 levels = LOGIC_FAMILIES[family]
398 if threshold_type == "midpoint":
399 return (levels["VIL_max"] + levels["VIH_min"]) / 2
400 elif threshold_type == "VIH":
401 return levels["VIH_min"]
402 elif threshold_type == "VIL":
403 return levels["VIL_max"]
404 else:
405 raise ValueError(f"Unknown threshold_type: {threshold_type}")
408__all__ = [
409 "LOGIC_FAMILIES",
410 "detect_edges",
411 "get_logic_threshold",
412 "to_digital",
413]