Coverage for src / tracekit / triggering / edge.py: 100%

90 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Edge triggering for TraceKit. 

2 

3Provides edge detection with configurable thresholds, hysteresis, 

4and edge polarity (rising, falling, or both). 

5 

6Example: 

7 >>> from tracekit.triggering.edge import EdgeTrigger, find_rising_edges 

8 >>> # Object-oriented approach 

9 >>> trigger = EdgeTrigger(level=1.5, edge="rising", hysteresis=0.1) 

10 >>> events = trigger.find_events(trace) 

11 >>> # Functional approach 

12 >>> timestamps = find_rising_edges(trace, level=1.5) 

13""" 

14 

15from __future__ import annotations 

16 

17from typing import TYPE_CHECKING, Any, Literal 

18 

19import numpy as np 

20 

21from tracekit.core.types import DigitalTrace, WaveformTrace 

22from tracekit.triggering.base import ( 

23 Trigger, 

24 TriggerEvent, 

25 TriggerType, 

26 interpolate_crossing, 

27) 

28 

29if TYPE_CHECKING: 

30 from numpy.typing import NDArray 

31 

32 

class EdgeTrigger(Trigger):
    """Edge trigger with threshold and optional hysteresis.

    Detects signal crossings of a threshold level with configurable
    hysteresis for noise immunity.

    Note:
        The two detection modes report ``sample_index`` differently. Plain
        threshold mode stores the index of the last sample *before* the
        crossing, while hysteresis mode stores the index of the first sample
        *at or beyond* the switching threshold. In both modes the event's
        ``level`` is the value of the first sample after the crossing.

    Attributes:
        level: Trigger threshold level.
        edge: Edge type - "rising", "falling", or "either".
        hysteresis: Hysteresis band (Schmitt trigger style).
    """

    def __init__(
        self,
        level: float,
        edge: Literal["rising", "falling", "either"] = "rising",
        hysteresis: float = 0.0,
    ) -> None:
        """Initialize edge trigger.

        Args:
            level: Trigger threshold level in signal units (e.g., volts).
            edge: Edge polarity to trigger on.
            hysteresis: Hysteresis band width. When positive, a rising edge
                fires at ``level + hysteresis / 2`` and a falling edge at
                ``level - hysteresis / 2``. Zero or negative values select
                plain threshold crossing at ``level``.
        """
        self.level = level
        self.edge = edge
        self.hysteresis = hysteresis

    def find_events(
        self,
        trace: WaveformTrace | DigitalTrace,
    ) -> list[TriggerEvent]:
        """Find all edge events in the trace.

        Args:
            trace: Input waveform or digital trace.

        Returns:
            List of trigger events for each detected edge. Empty when the
            trace holds no samples or no crossing matches ``self.edge``.
        """
        if isinstance(trace, DigitalTrace):
            # Digital samples are promoted to float64 so they go through the
            # same threshold comparison as analog data.
            data = trace.data.astype(np.float64)
        else:
            data = trace.data

        sample_period = trace.metadata.time_base

        if self.hysteresis > 0:
            # Schmitt trigger mode
            return self._find_edges_with_hysteresis(data, sample_period)

        # Simple threshold crossing
        return self._find_edges_simple(data, sample_period)

    def _find_edges_simple(
        self,
        data: NDArray[np.floating[Any]],
        sample_period: float,
    ) -> list[TriggerEvent]:
        """Find edges using simple threshold crossing.

        A sample counts as "above" when it is greater than or equal to
        ``self.level`` and "below" otherwise. Timestamps are delegated to
        ``interpolate_crossing``.

        Args:
            data: Sample values.
            sample_period: Time between consecutive samples.

        Returns:
            Detected events. When ``self.edge`` is "either" they are ordered
            by timestamp; otherwise they are in sample order.
        """
        events: list[TriggerEvent] = []

        below = data < self.level
        above = data >= self.level

        if self.edge in ("rising", "either"):
            # Rising: sample idx is below, sample idx + 1 is at/above.
            rising_idx = np.where(below[:-1] & above[1:])[0]
            for idx in rising_idx:
                timestamp = interpolate_crossing(
                    data, idx, self.level, sample_period, rising=True
                )
                events.append(
                    TriggerEvent(
                        timestamp=timestamp,
                        sample_index=int(idx),
                        event_type=TriggerType.RISING_EDGE,
                        level=float(data[idx + 1]),
                    )
                )

        if self.edge in ("falling", "either"):
            # Falling: sample idx is at/above, sample idx + 1 is below.
            falling_idx = np.where(above[:-1] & below[1:])[0]
            for idx in falling_idx:
                timestamp = interpolate_crossing(
                    data, idx, self.level, sample_period, rising=False
                )
                events.append(
                    TriggerEvent(
                        timestamp=timestamp,
                        sample_index=int(idx),
                        event_type=TriggerType.FALLING_EDGE,
                        level=float(data[idx + 1]),
                    )
                )

        # Rising and falling events were collected in two separate passes,
        # so they must be merged into chronological order.
        if self.edge == "either":
            events.sort(key=lambda e: e.timestamp)

        return events

    def _find_edges_with_hysteresis(
        self,
        data: NDArray[np.floating[Any]],
        sample_period: float,
    ) -> list[TriggerEvent]:
        """Find edges using Schmitt trigger with hysteresis.

        The state flips to "high" only when a sample reaches
        ``level + hysteresis / 2`` and back to "low" only when a sample
        drops to ``level - hysteresis / 2``. The state is tracked for both
        polarities even when only one of them is reported.

        Args:
            data: Sample values.
            sample_period: Time between consecutive samples.

        Returns:
            Detected events in sample order; empty for an empty ``data``.
        """
        events: list[TriggerEvent] = []

        # Without samples there is no initial state to seed the state
        # machine from (data[0] would raise IndexError) and no edges.
        if len(data) == 0:
            return events

        high_thresh = self.level + self.hysteresis / 2
        low_thresh = self.level - self.hysteresis / 2

        # Loop-invariant polarity checks.
        report_rising = self.edge in ("rising", "either")
        report_falling = self.edge in ("falling", "either")

        # State machine: track if we're currently "high" or "low". The
        # initial state is decided against the nominal level, not the band.
        state = "low" if data[0] < self.level else "high"

        for i in range(1, len(data)):
            if state == "low" and data[i] >= high_thresh:
                # Rising edge detected
                state = "high"
                if report_rising:
                    timestamp = interpolate_crossing(
                        data, i - 1, high_thresh, sample_period, rising=True
                    )
                    events.append(
                        TriggerEvent(
                            timestamp=timestamp,
                            sample_index=i,
                            event_type=TriggerType.RISING_EDGE,
                            level=float(data[i]),
                        )
                    )

            elif state == "high" and data[i] <= low_thresh:
                # Falling edge detected
                state = "low"
                if report_falling:
                    timestamp = interpolate_crossing(
                        data, i - 1, low_thresh, sample_period, rising=False
                    )
                    events.append(
                        TriggerEvent(
                            timestamp=timestamp,
                            sample_index=i,
                            event_type=TriggerType.FALLING_EDGE,
                            level=float(data[i]),
                        )
                    )

        return events

186 

187 

def find_rising_edges(
    trace: WaveformTrace,
    level: float | None = None,
    *,
    hysteresis: float = 0.0,
    return_indices: bool = False,
) -> NDArray[np.float64] | NDArray[np.int64]:
    """Find all rising edge timestamps or indices.

    Args:
        trace: Input waveform trace.
        level: Trigger threshold. If None, uses signal midpoint, i.e. the
            mean of the minimum and maximum sample values.
        hysteresis: Hysteresis band for noise immunity.
        return_indices: If True, return sample indices instead of timestamps.

    Returns:
        Array of timestamps (float64) or sample indices (int64). The array
        is empty when the trace holds no samples.

    Example:
        >>> edges = find_rising_edges(trace, level=1.5)
        >>> print(f"Found {len(edges)} rising edges")
    """
    # An empty trace has no midpoint (np.min would raise) and no edges.
    if np.size(trace.data) == 0:
        return np.array([], dtype=np.int64 if return_indices else np.float64)

    if level is None:
        # Convert to Python floats before adding so the sum cannot wrap
        # around when the samples are stored in a narrow integer dtype.
        level = (float(np.min(trace.data)) + float(np.max(trace.data))) / 2

    trigger = EdgeTrigger(level=level, edge="rising", hysteresis=hysteresis)
    events = trigger.find_events(trace)

    if return_indices:
        return np.array([e.sample_index for e in events], dtype=np.int64)
    return np.array([e.timestamp for e in events], dtype=np.float64)

219 

220 

def find_falling_edges(
    trace: WaveformTrace,
    level: float | None = None,
    *,
    hysteresis: float = 0.0,
    return_indices: bool = False,
) -> NDArray[np.float64] | NDArray[np.int64]:
    """Find all falling edge timestamps or indices.

    Args:
        trace: Input waveform trace.
        level: Trigger threshold. If None, uses signal midpoint, i.e. the
            mean of the minimum and maximum sample values.
        hysteresis: Hysteresis band for noise immunity.
        return_indices: If True, return sample indices instead of timestamps.

    Returns:
        Array of timestamps (float64) or sample indices (int64). The array
        is empty when the trace holds no samples.

    Example:
        >>> edges = find_falling_edges(trace, level=1.5)
    """
    # An empty trace has no midpoint (np.min would raise) and no edges.
    if np.size(trace.data) == 0:
        return np.array([], dtype=np.int64 if return_indices else np.float64)

    if level is None:
        # Convert to Python floats before adding so the sum cannot wrap
        # around when the samples are stored in a narrow integer dtype.
        level = (float(np.min(trace.data)) + float(np.max(trace.data))) / 2

    trigger = EdgeTrigger(level=level, edge="falling", hysteresis=hysteresis)
    events = trigger.find_events(trace)

    if return_indices:
        return np.array([e.sample_index for e in events], dtype=np.int64)
    return np.array([e.timestamp for e in events], dtype=np.float64)

251 

252 

def find_all_edges(
    trace: WaveformTrace,
    level: float | None = None,
    *,
    hysteresis: float = 0.0,
) -> tuple[NDArray[np.float64], NDArray[np.bool_]]:
    """Find all edges (rising and falling) with polarity.

    Args:
        trace: Input waveform trace.
        level: Trigger threshold. If None, uses signal midpoint, i.e. the
            mean of the minimum and maximum sample values.
        hysteresis: Hysteresis band for noise immunity.

    Returns:
        Tuple of (timestamps, is_rising) where is_rising is True for
        rising edges and False for falling edges. Both arrays have the same
        length and are empty when the trace holds no samples.

    Example:
        >>> timestamps, is_rising = find_all_edges(trace, level=1.5)
        >>> rising = timestamps[is_rising]
        >>> falling = timestamps[~is_rising]
    """
    # An empty trace has no midpoint (np.min would raise) and no edges.
    if np.size(trace.data) == 0:
        return np.array([], dtype=np.float64), np.array([], dtype=np.bool_)

    if level is None:
        # Convert to Python floats before adding so the sum cannot wrap
        # around when the samples are stored in a narrow integer dtype.
        level = (float(np.min(trace.data)) + float(np.max(trace.data))) / 2

    trigger = EdgeTrigger(level=level, edge="either", hysteresis=hysteresis)
    events = trigger.find_events(trace)

    timestamps = np.array([e.timestamp for e in events], dtype=np.float64)
    is_rising = np.array(
        [e.event_type == TriggerType.RISING_EDGE for e in events],
        dtype=np.bool_,
    )

    return timestamps, is_rising

285 

286 

def edge_count(
    trace: WaveformTrace,
    level: float | None = None,
    edge: Literal["rising", "falling", "either"] = "either",
    *,
    hysteresis: float = 0.0,
) -> int:
    """Count edges in a trace.

    Args:
        trace: Input waveform trace.
        level: Trigger threshold. If None, uses signal midpoint, i.e. the
            mean of the minimum and maximum sample values.
        edge: Edge type to count.
        hysteresis: Hysteresis band for noise immunity.

    Returns:
        Number of edges found; 0 when the trace holds no samples.

    Example:
        >>> n_rising = edge_count(trace, level=1.5, edge="rising")
    """
    # An empty trace has no midpoint (np.min would raise) and no edges.
    if np.size(trace.data) == 0:
        return 0

    if level is None:
        # Convert to Python floats before adding so the sum cannot wrap
        # around when the samples are stored in a narrow integer dtype.
        level = (float(np.min(trace.data)) + float(np.max(trace.data))) / 2

    trigger = EdgeTrigger(level=level, edge=edge, hysteresis=hysteresis)
    return trigger.count_events(trace)

313 

314 

def edge_rate(
    trace: WaveformTrace,
    level: float | None = None,
    edge: Literal["rising", "falling", "either"] = "either",
    *,
    hysteresis: float = 0.0,
) -> float:
    """Calculate edge rate (edges per second).

    The edge count comes from ``edge_count`` and is divided by
    ``trace.duration``.

    Args:
        trace: Input waveform trace.
        level: Trigger threshold, passed through to ``edge_count``
            (which falls back to the signal midpoint when None).
        edge: Edge type to count.
        hysteresis: Hysteresis band for noise immunity.

    Returns:
        Edge rate in Hz, or 0.0 when the trace duration is zero or negative.

    Example:
        >>> rate = edge_rate(trace, level=1.5, edge="rising")
        >>> print(f"Toggle rate: {rate} Hz")
    """
    n_edges = edge_count(trace, level, edge, hysteresis=hysteresis)
    span = trace.duration

    # A non-positive duration cannot yield a meaningful rate.
    return 0.0 if span <= 0 else n_edges / span

344 

345 

# Public API of tracekit.triggering.edge: the trigger class first, then the
# functional helpers.
__all__ = [
    "EdgeTrigger",
    "edge_count", "edge_rate",
    "find_all_edges", "find_falling_edges", "find_rising_edges",
]

353]