Coverage for src / tracekit / triggering / pulse.py: 80%
160 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Pulse width and glitch triggering for TraceKit.
3Provides pulse width triggering, glitch detection, and runt pulse
4detection for signal integrity analysis.
6Example:
7 >>> from tracekit.triggering.pulse import PulseWidthTrigger, find_glitches
8 >>> # Find pulses between 100ns and 200ns
9 >>> trigger = PulseWidthTrigger(level=1.5, min_width=100e-9, max_width=200e-9)
10 >>> events = trigger.find_events(trace)
11 >>> # Find glitches shorter than 50ns
12 >>> glitches = find_glitches(trace, max_width=50e-9)
13"""
15from __future__ import annotations
17from dataclasses import dataclass
18from typing import Literal
20import numpy as np
22from tracekit.core.exceptions import AnalysisError
23from tracekit.core.types import DigitalTrace, WaveformTrace
24from tracekit.triggering.base import (
25 Trigger,
26 TriggerEvent,
27 TriggerType,
28 interpolate_crossing,
29)
@dataclass
class PulseInfo:
    """Record describing a single pulse located in a trace.

    Attributes:
        start_time: Time at which the pulse begins, in seconds.
        end_time: Time at which the pulse ends, in seconds.
        width: Duration of the pulse, in seconds.
        polarity: Direction of the pulse, either "positive" or "negative".
        start_index: Index of the sample where the pulse begins.
        end_index: Index of the sample where the pulse ends.
        amplitude: Peak amplitude observed during the pulse.
    """

    # Field order is part of the positional constructor signature; keep it.
    start_time: float
    end_time: float
    width: float
    polarity: Literal["positive", "negative"]
    start_index: int
    end_index: int
    amplitude: float
class PulseWidthTrigger(Trigger):
    """Pulse width trigger for detecting pulses in a width range.

    A pulse is the span between one crossing of the threshold and the next
    crossing in the opposite direction. Pulses whose width lies inside the
    inclusive range ``[min_width, max_width]`` produce trigger events.

    Attributes:
        level: Threshold level for pulse detection.
        polarity: Pulse polarity - "positive", "negative", or "either".
        min_width: Minimum pulse width (None for no minimum).
        max_width: Maximum pulse width (None for no maximum).
    """

    def __init__(
        self,
        level: float,
        polarity: Literal["positive", "negative", "either"] = "positive",
        min_width: float | None = None,
        max_width: float | None = None,
    ) -> None:
        """Initialize pulse width trigger.

        Args:
            level: Threshold level for pulse detection.
            polarity: Pulse polarity to detect.
            min_width: Minimum pulse width in seconds.
            max_width: Maximum pulse width in seconds.

        Raises:
            AnalysisError: If min_width is greater than max_width.
        """
        # Validate before storing anything so a rejected configuration never
        # leaves a half-initialised object behind.
        if min_width is not None and max_width is not None and min_width > max_width:
            raise AnalysisError("min_width cannot be greater than max_width")

        self.level = level
        self.polarity = polarity
        self.min_width = min_width
        self.max_width = max_width

    def find_events(
        self,
        trace: WaveformTrace | DigitalTrace,
    ) -> list[TriggerEvent]:
        """Find pulses matching the width criteria.

        Args:
            trace: Input trace.

        Returns:
            List of trigger events for matching pulses, ordered by pulse start
            time. Each event's ``level`` carries the pulse's peak amplitude
            (not the threshold), ``duration`` carries the pulse width, and
            ``data`` holds the polarity plus the pulse end time and end index.
        """
        events: list[TriggerEvent] = []
        for pulse in self._find_all_pulses(trace):
            # Both bounds are inclusive: only strictly narrower / strictly
            # wider pulses are rejected.
            if self.min_width is not None and pulse.width < self.min_width:
                continue
            if self.max_width is not None and pulse.width > self.max_width:
                continue

            events.append(
                TriggerEvent(
                    timestamp=pulse.start_time,
                    sample_index=pulse.start_index,
                    event_type=TriggerType.PULSE_WIDTH,
                    level=pulse.amplitude,
                    duration=pulse.width,
                    data={
                        "polarity": pulse.polarity,
                        "end_time": pulse.end_time,
                        "end_index": pulse.end_index,
                    },
                )
            )

        return events

    def _find_all_pulses(
        self,
        trace: WaveformTrace | DigitalTrace,
    ) -> list[PulseInfo]:
        """Find all pulses in the trace, regardless of width.

        Args:
            trace: Input trace. For a DigitalTrace the samples are cast to
                float64 and a fixed threshold of 0.5 is used instead of
                ``self.level``.

        Returns:
            Pulses of the configured polarity, sorted by start time.
        """
        if isinstance(trace, DigitalTrace):
            data = trace.data.astype(np.float64)
            level = 0.5
        else:
            data = trace.data
            level = self.level

        sample_period = trace.metadata.time_base

        # Classify every sample against the threshold. ``below`` is computed
        # separately (not as ~above) so NaN samples belong to neither side.
        above = data >= level
        below = data < level

        # Index i marks an edge between samples i and i + 1.
        rising = np.where(below[:-1] & above[1:])[0]
        falling = np.where(above[:-1] & below[1:])[0]

        pulses: list[PulseInfo] = []
        if self.polarity in ("positive", "either"):
            # Positive pulses: rising edge followed by the next falling edge.
            pulses.extend(
                self._pair_edges(data, level, sample_period, rising, falling, "positive")
            )
        if self.polarity in ("negative", "either"):
            # Negative pulses: falling edge followed by the next rising edge.
            pulses.extend(
                self._pair_edges(data, level, sample_period, falling, rising, "negative")
            )

        pulses.sort(key=lambda p: p.start_time)
        return pulses

    @staticmethod
    def _pair_edges(
        data: np.ndarray,
        level: float,
        sample_period: float,
        leading: np.ndarray,
        trailing: np.ndarray,
        polarity: Literal["positive", "negative"],
    ) -> list[PulseInfo]:
        """Pair each leading edge with the first trailing edge after it.

        Args:
            data: Sample values.
            level: Threshold the edges were detected against.
            sample_period: Time between samples, in seconds.
            leading: Sorted indices of the edges that open a pulse.
            trailing: Sorted indices of the edges that close a pulse.
            polarity: "positive" when leading edges are rising edges,
                "negative" when they are falling edges.

        Returns:
            One PulseInfo per leading edge that has a later trailing edge.
        """
        leading_is_rising = polarity == "positive"
        # The peak of a positive pulse is its maximum; of a negative pulse,
        # its minimum.
        extreme = np.max if leading_is_rising else np.min

        # Both edge arrays come from np.where and are therefore ascending, so
        # a single binary search per edge replaces re-filtering the whole
        # trailing array for every leading edge.
        positions = np.searchsorted(trailing, leading, side="right")

        found: list[PulseInfo] = []
        for lead_idx, pos in zip(leading, positions):
            if pos >= len(trailing):
                # No closing edge after this one, hence none after any later
                # leading edge either.
                break
            trail_idx = trailing[pos]

            start_time = interpolate_crossing(
                data, lead_idx, level, sample_period, leading_is_rising
            )
            end_time = interpolate_crossing(
                data, trail_idx, level, sample_period, not leading_is_rising
            )

            # trail_idx > lead_idx, so this slice always holds >= 2 samples.
            amplitude = float(extreme(data[lead_idx : trail_idx + 1]))

            found.append(
                PulseInfo(
                    start_time=start_time,
                    end_time=end_time,
                    width=end_time - start_time,
                    polarity=polarity,
                    start_index=int(lead_idx),
                    end_index=int(trail_idx),
                    amplitude=amplitude,
                )
            )
        return found
class GlitchTrigger(Trigger):
    """Trigger that reports narrow pulses (glitches).

    A glitch is any pulse whose width does not exceed ``max_width``.

    Attributes:
        level: Threshold level.
        max_width: Widest pulse that still counts as a glitch.
        polarity: Glitch polarity - "positive", "negative", or "either".
    """

    def __init__(
        self,
        level: float,
        max_width: float = 100e-9,
        polarity: Literal["positive", "negative", "either"] = "either",
    ) -> None:
        """Store the glitch detection settings.

        Args:
            level: Threshold level.
            max_width: Widest pulse to trigger on (in seconds).
            polarity: Glitch polarity to detect.
        """
        self.polarity = polarity
        self.max_width = max_width
        self.level = level

    def find_events(
        self,
        trace: WaveformTrace | DigitalTrace,
    ) -> list[TriggerEvent]:
        """Locate every glitch in the trace.

        Args:
            trace: Input trace.

        Returns:
            One trigger event per glitch, tagged ``TriggerType.GLITCH``.
        """
        # A glitch search is a pulse-width search with no lower bound.
        width_search = PulseWidthTrigger(
            level=self.level,
            polarity=self.polarity,
            min_width=None,
            max_width=self.max_width,
        )
        glitches = width_search.find_events(trace)

        # The pulse-width search labels its events PULSE_WIDTH; relabel them
        # in place.
        for glitch in glitches:
            glitch.event_type = TriggerType.GLITCH

        return glitches
class RuntTrigger(Trigger):
    """Runt pulse trigger for detecting incomplete transitions.

    Runt pulses cross one threshold but not the other, indicating
    incomplete signal transitions.

    Attributes:
        low_threshold: Lower threshold level.
        high_threshold: Upper threshold level.
        polarity: Runt polarity - "positive", "negative", or "either".
    """

    def __init__(
        self,
        low_threshold: float,
        high_threshold: float,
        polarity: Literal["positive", "negative", "either"] = "either",
    ) -> None:
        """Initialize runt trigger.

        Args:
            low_threshold: Lower threshold (e.g., logic low).
            high_threshold: Upper threshold (e.g., logic high).
            polarity: "positive" for rising runts, "negative" for falling.

        Raises:
            AnalysisError: If low_threshold is not less than high_threshold.
        """
        if low_threshold >= high_threshold:
            raise AnalysisError("low_threshold must be less than high_threshold")

        self.low_threshold = low_threshold
        self.high_threshold = high_threshold
        self.polarity = polarity

    def find_events(
        self,
        trace: WaveformTrace | DigitalTrace,
    ) -> list[TriggerEvent]:
        """Find all runt pulses in the trace.

        Args:
            trace: Input trace.

        Returns:
            List of trigger events for each runt pulse. Always empty for a
            DigitalTrace.
        """
        if isinstance(trace, DigitalTrace):
            # Digital traces don't have runts
            return []

        data = np.asarray(trace.data)
        sample_period = trace.metadata.time_base

        # Classify every sample in one vectorised pass:
        #   zone 0: below low_threshold
        #   zone 1: between the thresholds (inclusive; NaN also lands here
        #           because both comparisons below are False for NaN)
        #   zone 2: above high_threshold
        zones = np.ones(data.shape, dtype=np.int8)
        zones[data < self.low_threshold] = 0
        zones[data > self.high_threshold] = 2

        events: list[TriggerEvent] = []
        n = len(zones)
        i = 0
        while i < n - 1:
            home = int(zones[i])
            if home == 0:
                # Starting low: an excursion that falls back is a positive runt.
                runt_polarity = "positive"
            elif home == 2:
                # Starting high: an excursion that climbs back is a negative runt.
                runt_polarity = "negative"
            else:
                i += 1
                continue

            leaves_home = zones[i + 1] == 1
            if self.polarity not in (runt_polarity, "either") or not leaves_home:
                i += 1
                continue

            # Walk through the stretch spent between the thresholds.
            j = i + 1
            while j < n and zones[j] == 1:
                j += 1

            # Landing back in the zone we left, without ever reaching the
            # opposite one, is what makes this a runt. Running off the end of
            # the trace or completing the transition is not.
            if j < n and zones[j] == home:
                events.append(self._runt_event(data, i, j, sample_period, runt_polarity))

            # Resume scanning at the sample that ended the excursion.
            i = j

        return events

    def _runt_event(
        self,
        data: np.ndarray,
        start_idx: int,
        end_idx: int,
        sample_period: float,
        polarity: Literal["positive", "negative"],
    ) -> TriggerEvent:
        """Build the trigger event for a runt spanning ``start_idx..end_idx``.

        Args:
            data: Sample values.
            start_idx: Last sample in the home zone before the excursion.
            end_idx: First sample back in the home zone after the excursion.
            sample_period: Time between samples, in seconds.
            polarity: "positive" or "negative" runt.

        Returns:
            A RUNT event whose ``level`` is the extreme value reached.
        """
        window = data[start_idx : end_idx + 1]
        if polarity == "positive":
            extreme = float(np.max(window))
            details = {
                "polarity": "positive",
                "expected_high": self.high_threshold,
                "actual_peak": extreme,
            }
        else:
            extreme = float(np.min(window))
            details = {
                "polarity": "negative",
                "expected_low": self.low_threshold,
                "actual_trough": extreme,
            }

        return TriggerEvent(
            timestamp=start_idx * sample_period,
            sample_index=start_idx,
            event_type=TriggerType.RUNT,
            level=extreme,
            duration=(end_idx - start_idx) * sample_period,
            data=details,
        )
def find_pulses(
    trace: WaveformTrace,
    *,
    level: float | None = None,
    polarity: Literal["positive", "negative", "either"] = "positive",
    min_width: float | None = None,
    max_width: float | None = None,
) -> list[TriggerEvent]:
    """Locate pulses whose width falls inside the requested range.

    Args:
        trace: Input waveform trace.
        level: Threshold level. When None, the midpoint between the trace's
            minimum and maximum sample is used.
        polarity: Pulse polarity to find.
        min_width: Minimum pulse width in seconds.
        max_width: Maximum pulse width in seconds.

    Returns:
        List of trigger events for matching pulses.

    Example:
        >>> # Find all positive pulses between 1us and 10us
        >>> pulses = find_pulses(trace, min_width=1e-6, max_width=10e-6)
    """
    threshold = level
    if threshold is None:
        samples = trace.data
        threshold = (np.min(samples) + np.max(samples)) / 2

    return PulseWidthTrigger(
        level=threshold,
        polarity=polarity,
        min_width=min_width,
        max_width=max_width,
    ).find_events(trace)
def find_glitches(
    trace: WaveformTrace,
    max_width: float = 100e-9,
    *,
    level: float | None = None,
    polarity: Literal["positive", "negative", "either"] = "either",
) -> list[TriggerEvent]:
    """Locate glitches (narrow pulses) in a trace.

    Args:
        trace: Input waveform trace.
        max_width: Widest pulse still treated as a glitch (default 100ns).
        level: Threshold level. When None, the midpoint between the trace's
            minimum and maximum sample is used.
        polarity: Glitch polarity to find.

    Returns:
        List of trigger events for each glitch.

    Example:
        >>> # Find all glitches shorter than 50ns
        >>> glitches = find_glitches(trace, max_width=50e-9)
        >>> print(f"Found {len(glitches)} glitches")
    """
    threshold = level
    if threshold is None:
        samples = trace.data
        threshold = (np.min(samples) + np.max(samples)) / 2

    detector = GlitchTrigger(level=threshold, max_width=max_width, polarity=polarity)
    return detector.find_events(trace)
def find_runt_pulses(
    trace: WaveformTrace,
    low_threshold: float | None = None,
    high_threshold: float | None = None,
    *,
    polarity: Literal["positive", "negative", "either"] = "either",
) -> list[TriggerEvent]:
    """Locate runt pulses (incomplete transitions) in a trace.

    Args:
        trace: Input waveform trace.
        low_threshold: Lower threshold. When None, 20% of the way from the
            trace minimum to the trace maximum.
        high_threshold: Upper threshold. When None, 80% of the way from the
            trace minimum to the trace maximum.
        polarity: Runt polarity to find.

    Returns:
        List of trigger events for each runt pulse.

    Example:
        >>> # Find runts using standard 20%/80% thresholds
        >>> runts = find_runt_pulses(trace)
        >>> for runt in runts:
        ...     print(f"Runt at {runt.timestamp*1e6:.2f} us")
    """
    # Only inspect the samples when at least one threshold must be derived.
    if low_threshold is None or high_threshold is None:
        floor = np.min(trace.data)
        swing = np.max(trace.data) - floor
        if low_threshold is None:
            low_threshold = floor + 0.2 * swing
        if high_threshold is None:
            high_threshold = floor + 0.8 * swing

    detector = RuntTrigger(
        low_threshold=low_threshold,
        high_threshold=high_threshold,
        polarity=polarity,
    )
    return detector.find_events(trace)
def pulse_statistics(
    trace: WaveformTrace,
    *,
    level: float | None = None,
    polarity: Literal["positive", "negative"] = "positive",
) -> dict[str, float]:
    """Calculate pulse width statistics.

    Args:
        trace: Input waveform trace.
        level: Threshold level. When None, the midpoint between the trace's
            minimum and maximum sample is used.
        polarity: Pulse polarity to analyze.

    Returns:
        Dictionary with pulse statistics:
            - count: Number of pulses
            - min_width: Minimum pulse width
            - max_width: Maximum pulse width
            - mean_width: Mean pulse width
            - std_width: Standard deviation of pulse widths

        When no pulse widths are available, count is 0 and every width
        statistic is NaN.

    Example:
        >>> stats = pulse_statistics(trace)
        >>> print(f"Mean pulse width: {stats['mean_width']*1e6:.2f} us")
    """
    if level is None:
        level = (np.min(trace.data) + np.max(trace.data)) / 2

    events = PulseWidthTrigger(level=level, polarity=polarity).find_events(trace)

    widths = np.array(
        [e.duration for e in events if e.duration is not None],
        dtype=np.float64,
    )

    # Guard on the filtered widths rather than on the events: np.min/np.max
    # raise on an empty array, which would happen if every event lacked a
    # duration.
    if widths.size == 0:
        return {
            "count": 0,
            "min_width": np.nan,
            "max_width": np.nan,
            "mean_width": np.nan,
            "std_width": np.nan,
        }

    return {
        "count": len(widths),
        "min_width": float(np.min(widths)),
        "max_width": float(np.max(widths)),
        "mean_width": float(np.mean(widths)),
        "std_width": float(np.std(widths)),
    }
572__all__ = [
573 "GlitchTrigger",
574 "PulseInfo",
575 "PulseWidthTrigger",
576 "RuntTrigger",
577 "find_glitches",
578 "find_pulses",
579 "find_runt_pulses",
580 "pulse_statistics",
581]