Coverage for src / tracekit / triggering / base.py: 84%

61 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Base classes and utilities for TraceKit triggering module. 

2 

3Provides abstract base class for triggers and common trigger event 

4data structure. 

5""" 

6 

7from __future__ import annotations 

8 

9from abc import ABC, abstractmethod 

10from dataclasses import dataclass, field 

11from enum import Enum 

12from typing import TYPE_CHECKING, Any, Literal 

13 

14from tracekit.core.exceptions import AnalysisError 

15 

16if TYPE_CHECKING: 

17 import numpy as np 

18 from numpy.typing import NDArray 

19 

20 from tracekit.core.types import DigitalTrace, WaveformTrace 

21 

22 

23class TriggerType(Enum): 

24 """Types of trigger events.""" 

25 

26 RISING_EDGE = "rising_edge" 

27 FALLING_EDGE = "falling_edge" 

28 PATTERN_MATCH = "pattern_match" 

29 PULSE_WIDTH = "pulse_width" 

30 GLITCH = "glitch" 

31 RUNT = "runt" 

32 WINDOW_ENTRY = "window_entry" 

33 WINDOW_EXIT = "window_exit" 

34 ZONE_VIOLATION = "zone_violation" 

35 

36 

37@dataclass 

38class TriggerEvent: 

39 """Represents a detected trigger event. 

40 

41 Attributes: 

42 timestamp: Time of the trigger event in seconds. 

43 sample_index: Sample index where trigger occurred. 

44 event_type: Type of trigger event. 

45 level: Voltage/signal level at trigger point. 

46 data: Additional event-specific data. 

47 """ 

48 

49 timestamp: float 

50 sample_index: int 

51 event_type: TriggerType 

52 level: float | None = None 

53 duration: float | None = None 

54 data: dict[str, Any] = field(default_factory=dict) 

55 

56 def __repr__(self) -> str: 

57 return ( 

58 f"TriggerEvent({self.event_type.value} at t={self.timestamp:.6e}s, " 

59 f"sample={self.sample_index})" 

60 ) 

61 

62 

63class Trigger(ABC): 

64 """Abstract base class for all trigger types. 

65 

66 Defines the common interface for finding trigger events in traces. 

67 """ 

68 

69 @abstractmethod 

70 def find_events( 

71 self, 

72 trace: WaveformTrace | DigitalTrace, 

73 ) -> list[TriggerEvent]: 

74 """Find all trigger events in a trace. 

75 

76 Args: 

77 trace: Input trace to search. 

78 

79 Returns: 

80 List of trigger events found. 

81 """ 

82 ... 

83 

84 def find_first( 

85 self, 

86 trace: WaveformTrace | DigitalTrace, 

87 ) -> TriggerEvent | None: 

88 """Find the first trigger event. 

89 

90 Args: 

91 trace: Input trace to search. 

92 

93 Returns: 

94 First trigger event, or None if no triggers found. 

95 """ 

96 events = self.find_events(trace) 

97 return events[0] if events else None 

98 

99 def count_events( 

100 self, 

101 trace: WaveformTrace | DigitalTrace, 

102 ) -> int: 

103 """Count trigger events. 

104 

105 Args: 

106 trace: Input trace to search. 

107 

108 Returns: 

109 Number of trigger events found. 

110 """ 

111 return len(self.find_events(trace)) 

112 

113 

114def find_triggers( 

115 trace: WaveformTrace | DigitalTrace, 

116 trigger_type: Literal["edge", "pattern", "pulse_width", "glitch", "runt", "window"], 

117 **kwargs: Any, 

118) -> list[TriggerEvent]: 

119 """Unified function to find trigger events. 

120 

121 Args: 

122 trace: Input trace to search. 

123 trigger_type: Type of trigger to use. 

124 **kwargs: Trigger-specific parameters. 

125 

126 Returns: 

127 List of trigger events. 

128 

129 Raises: 

130 AnalysisError: If unknown trigger type. 

131 

132 Example: 

133 >>> events = find_triggers(trace, "edge", level=1.5, edge="rising") 

134 >>> events = find_triggers(trace, "pulse_width", min_width=1e-6, max_width=2e-6) 

135 >>> events = find_triggers(trace, "glitch", max_width=50e-9) 

136 """ 

137 from tracekit.triggering.edge import EdgeTrigger 

138 from tracekit.triggering.pulse import PulseWidthTrigger 

139 from tracekit.triggering.window import WindowTrigger 

140 

141 if trigger_type == "edge": 

142 trigger = EdgeTrigger( 

143 level=kwargs.get("level", 0.0), 

144 edge=kwargs.get("edge", "rising"), 

145 hysteresis=kwargs.get("hysteresis", 0.0), 

146 ) 

147 elif trigger_type == "pattern": 147 ↛ 148line 147 didn't jump to line 148 because the condition on line 147 was never true

148 from tracekit.triggering.pattern import PatternTrigger 

149 

150 trigger = PatternTrigger( # type: ignore[assignment] 

151 pattern=kwargs.get("pattern", []), 

152 levels=kwargs.get("levels"), 

153 ) 

154 elif trigger_type == "pulse_width": 

155 trigger = PulseWidthTrigger( # type: ignore[assignment] 

156 level=kwargs.get("level", 0.0), 

157 polarity=kwargs.get("polarity", "positive"), 

158 min_width=kwargs.get("min_width"), 

159 max_width=kwargs.get("max_width"), 

160 ) 

161 elif trigger_type == "glitch": 

162 from tracekit.triggering.pulse import GlitchTrigger 

163 

164 trigger = GlitchTrigger( # type: ignore[assignment] 

165 level=kwargs.get("level", 0.0), 

166 max_width=kwargs.get("max_width", 100e-9), 

167 polarity=kwargs.get("polarity", "either"), 

168 ) 

169 elif trigger_type == "runt": 169 ↛ 170line 169 didn't jump to line 170 because the condition on line 169 was never true

170 from tracekit.triggering.pulse import RuntTrigger 

171 

172 trigger = RuntTrigger( # type: ignore[assignment] 

173 low_threshold=kwargs.get("low_threshold", 0.0), 

174 high_threshold=kwargs.get("high_threshold", 1.0), 

175 polarity=kwargs.get("polarity", "either"), 

176 ) 

177 elif trigger_type == "window": 177 ↛ 184line 177 didn't jump to line 184 because the condition on line 177 was always true

178 trigger = WindowTrigger( # type: ignore[assignment] 

179 low_threshold=kwargs.get("low_threshold", 0.0), 

180 high_threshold=kwargs.get("high_threshold", 1.0), 

181 trigger_on=kwargs.get("trigger_on", "exit"), 

182 ) 

183 else: 

184 raise AnalysisError(f"Unknown trigger type: {trigger_type}") 

185 

186 return trigger.find_events(trace) 

187 

188 

189def interpolate_crossing( 

190 data: NDArray[np.floating[Any]], 

191 idx: int, 

192 threshold: float, 

193 sample_period: float, 

194 rising: bool = True, 

195) -> float: 

196 """Interpolate exact threshold crossing time. 

197 

198 Args: 

199 data: Waveform data array. 

200 idx: Sample index near crossing. 

201 threshold: Threshold level. 

202 sample_period: Time between samples. 

203 rising: True for rising edge, False for falling. 

204 

205 Returns: 

206 Interpolated crossing time in seconds. 

207 """ 

208 if idx < 0 or idx >= len(data) - 1: 208 ↛ 209line 208 didn't jump to line 209 because the condition on line 208 was never true

209 return idx * sample_period 

210 

211 v1, v2 = data[idx], data[idx + 1] 

212 

213 if abs(v2 - v1) < 1e-12: 213 ↛ 214line 213 didn't jump to line 214 because the condition on line 213 was never true

214 return (idx + 0.5) * sample_period 

215 

216 # Linear interpolation 

217 t_offset = (threshold - v1) / (v2 - v1) * sample_period 

218 t_offset = max(0, min(sample_period, t_offset)) 

219 

220 return idx * sample_period + t_offset # type: ignore[no-any-return] 

221 

222 

223__all__ = [ 

224 "Trigger", 

225 "TriggerEvent", 

226 "TriggerType", 

227 "find_triggers", 

228 "interpolate_crossing", 

229]