Coverage for src / tracekit / triggering / window.py: 100%

106 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Window and zone triggering for TraceKit. 

2 

3Provides window triggering (signal inside/outside voltage window) and 

4zone triggering (signal enters/exits defined zones) for limit testing. 

5 

6Example: 

7 >>> from tracekit.triggering.window import WindowTrigger, find_window_violations 

8 >>> # Trigger when signal exits 0-3.3V window 

9 >>> trigger = WindowTrigger(low_threshold=0, high_threshold=3.3, trigger_on="exit") 

10 >>> violations = trigger.find_events(trace) 

11""" 

12 

13from __future__ import annotations 

14 

15from dataclasses import dataclass 

16from typing import TYPE_CHECKING, Literal 

17 

18import numpy as np 

19 

20from tracekit.core.exceptions import AnalysisError 

21from tracekit.triggering.base import ( 

22 Trigger, 

23 TriggerEvent, 

24 TriggerType, 

25) 

26 

27if TYPE_CHECKING: 

28 from tracekit.core.types import WaveformTrace 

29 

30 

31@dataclass 

32class Zone: 

33 """Defines a voltage/time zone for triggering. 

34 

35 Attributes: 

36 low: Lower voltage boundary. 

37 high: Upper voltage boundary. 

38 start_time: Start time boundary (None for no limit). 

39 end_time: End time boundary (None for no limit). 

40 name: Optional zone name for identification. 

41 """ 

42 

43 low: float 

44 high: float 

45 start_time: float | None = None 

46 end_time: float | None = None 

47 name: str = "" 

48 

49 

50class WindowTrigger(Trigger): 

51 """Window trigger for detecting voltage limit violations. 

52 

53 Triggers when the signal enters or exits a voltage window defined 

54 by low and high threshold levels. 

55 

56 Attributes: 

57 low_threshold: Lower window boundary. 

58 high_threshold: Upper window boundary. 

59 trigger_on: When to trigger - "entry", "exit", or "both". 

60 """ 

61 

62 def __init__( 

63 self, 

64 low_threshold: float, 

65 high_threshold: float, 

66 trigger_on: Literal["entry", "exit", "both"] = "exit", 

67 ) -> None: 

68 """Initialize window trigger. 

69 

70 Args: 

71 low_threshold: Lower window boundary. 

72 high_threshold: Upper window boundary. 

73 trigger_on: "entry" triggers when entering window, 

74 "exit" triggers when leaving window, 

75 "both" triggers on either event. 

76 

77 Raises: 

78 AnalysisError: If low_threshold is not less than high_threshold. 

79 """ 

80 if low_threshold >= high_threshold: 

81 raise AnalysisError("low_threshold must be less than high_threshold") 

82 

83 self.low_threshold = low_threshold 

84 self.high_threshold = high_threshold 

85 self.trigger_on = trigger_on 

86 

87 def find_events( 

88 self, 

89 trace: WaveformTrace, # type: ignore[override] 

90 ) -> list[TriggerEvent]: 

91 """Find all window entry/exit events. 

92 

93 Args: 

94 trace: Input waveform trace. 

95 

96 Returns: 

97 List of trigger events for window crossings. 

98 """ 

99 data = trace.data 

100 sample_period = trace.metadata.time_base 

101 events: list[TriggerEvent] = [] 

102 

103 # Determine if each sample is inside the window 

104 inside = (data >= self.low_threshold) & (data <= self.high_threshold) 

105 

106 # Find transitions 

107 for i in range(1, len(inside)): 

108 if inside[i] and not inside[i - 1]: 

109 # Entry event 

110 if self.trigger_on in ("entry", "both"): 

111 events.append( 

112 TriggerEvent( 

113 timestamp=i * sample_period, 

114 sample_index=i, 

115 event_type=TriggerType.WINDOW_ENTRY, 

116 level=float(data[i]), 

117 data={ 

118 "window": (self.low_threshold, self.high_threshold), 

119 "direction": "entering", 

120 }, 

121 ) 

122 ) 

123 

124 elif not inside[i] and inside[i - 1]: 

125 # Exit event 

126 if self.trigger_on in ("exit", "both"): 

127 # Determine which boundary was crossed 

128 boundary = "high" if data[i] > self.high_threshold else "low" 

129 events.append( 

130 TriggerEvent( 

131 timestamp=i * sample_period, 

132 sample_index=i, 

133 event_type=TriggerType.WINDOW_EXIT, 

134 level=float(data[i]), 

135 data={ 

136 "window": (self.low_threshold, self.high_threshold), 

137 "direction": "exiting", 

138 "boundary": boundary, 

139 }, 

140 ) 

141 ) 

142 

143 return events 

144 

145 

146class ZoneTrigger(Trigger): 

147 """Zone trigger for multiple defined voltage/time zones. 

148 

149 Triggers when signal enters any of the defined zones. Useful for 

150 mask testing and compliance checking. 

151 

152 Attributes: 

153 zones: List of Zone definitions. 

154 trigger_on: When to trigger - "entry", "exit", or "violation". 

155 """ 

156 

157 def __init__( 

158 self, 

159 zones: list[Zone], 

160 trigger_on: Literal["entry", "exit", "violation"] = "violation", 

161 ) -> None: 

162 """Initialize zone trigger. 

163 

164 Args: 

165 zones: List of zones to monitor. 

166 trigger_on: "entry" for entering zones, "exit" for leaving, 

167 "violation" is alias for "entry" (common use case). 

168 """ 

169 self.zones = zones 

170 self.trigger_on = trigger_on 

171 

172 def find_events( 

173 self, 

174 trace: WaveformTrace, # type: ignore[override] 

175 ) -> list[TriggerEvent]: 

176 """Find all zone-related events. 

177 

178 Args: 

179 trace: Input waveform trace. 

180 

181 Returns: 

182 List of trigger events. 

183 """ 

184 data = trace.data 

185 sample_period = trace.metadata.time_base 

186 time_vector = np.arange(len(data)) * sample_period 

187 events: list[TriggerEvent] = [] 

188 

189 for zone in self.zones: 

190 # Check time limits 

191 if zone.start_time is not None: 

192 time_mask = time_vector >= zone.start_time 

193 else: 

194 time_mask = np.ones(len(data), dtype=bool) 

195 

196 if zone.end_time is not None: 

197 time_mask &= time_vector <= zone.end_time 

198 

199 # Check voltage limits 

200 in_zone = (data >= zone.low) & (data <= zone.high) & time_mask 

201 

202 # Find transitions 

203 for i in range(1, len(in_zone)): 

204 if in_zone[i] and not in_zone[i - 1]: 

205 # Entry event 

206 if self.trigger_on in ("entry", "violation"): 

207 events.append( 

208 TriggerEvent( 

209 timestamp=i * sample_period, 

210 sample_index=i, 

211 event_type=TriggerType.ZONE_VIOLATION, 

212 level=float(data[i]), 

213 data={ 

214 "zone_name": zone.name, 

215 "zone_bounds": (zone.low, zone.high), 

216 "direction": "entering", 

217 }, 

218 ) 

219 ) 

220 

221 elif not in_zone[i] and in_zone[i - 1]: 

222 # Exit event 

223 if self.trigger_on == "exit": 

224 events.append( 

225 TriggerEvent( 

226 timestamp=i * sample_period, 

227 sample_index=i, 

228 event_type=TriggerType.ZONE_VIOLATION, 

229 level=float(data[i]), 

230 data={ 

231 "zone_name": zone.name, 

232 "zone_bounds": (zone.low, zone.high), 

233 "direction": "exiting", 

234 }, 

235 ) 

236 ) 

237 

238 # Sort by timestamp 

239 events.sort(key=lambda e: e.timestamp) 

240 return events 

241 

242 

243def find_window_violations( 

244 trace: WaveformTrace, 

245 low: float, 

246 high: float, 

247) -> list[TriggerEvent]: 

248 """Find all window violations (signal outside limits). 

249 

250 Args: 

251 trace: Input waveform trace. 

252 low: Lower limit. 

253 high: Upper limit. 

254 

255 Returns: 

256 List of trigger events for each exit from the window. 

257 

258 Example: 

259 >>> # Check if signal stays within 0-3.3V 

260 >>> violations = find_window_violations(trace, low=0, high=3.3) 

261 >>> if violations: 

262 ... print(f"Signal violated limits {len(violations)} times") 

263 """ 

264 trigger = WindowTrigger( 

265 low_threshold=low, 

266 high_threshold=high, 

267 trigger_on="exit", 

268 ) 

269 return trigger.find_events(trace) 

270 

271 

272def find_zone_events( 

273 trace: WaveformTrace, 

274 zones: list[tuple[float, float] | Zone], 

275) -> list[TriggerEvent]: 

276 """Find events where signal enters defined zones. 

277 

278 Args: 

279 trace: Input waveform trace. 

280 zones: List of zones as (low, high) tuples or Zone objects. 

281 

282 Returns: 

283 List of trigger events. 

284 

285 Example: 

286 >>> # Define forbidden zones 

287 >>> zones = [ 

288 ... (0.8, 1.2), # Metastable region around 1V 

289 ... (3.5, 5.0), # Overvoltage region 

290 ... ] 

291 >>> events = find_zone_events(trace, zones) 

292 """ 

293 zone_objs: list[Zone] = [] 

294 for i, z in enumerate(zones): 

295 if isinstance(z, Zone): 

296 zone_objs.append(z) 

297 else: 

298 zone_objs.append(Zone(low=z[0], high=z[1], name=f"zone_{i}")) 

299 

300 trigger = ZoneTrigger(zones=zone_objs, trigger_on="violation") 

301 return trigger.find_events(trace) 

302 

303 

304def check_limits( 

305 trace: WaveformTrace, 

306 low: float, 

307 high: float, 

308) -> dict: # type: ignore[type-arg] 

309 """Check if trace stays within voltage limits. 

310 

311 Args: 

312 trace: Input waveform trace. 

313 low: Lower limit. 

314 high: Upper limit. 

315 

316 Returns: 

317 Dictionary with: 

318 - passed: True if no violations 

319 - violations: List of violation events 

320 - min_value: Minimum value in trace 

321 - max_value: Maximum value in trace 

322 - time_in_spec: Percentage of time within limits 

323 - time_out_of_spec: Percentage of time outside limits 

324 

325 Example: 

326 >>> result = check_limits(trace, low=0, high=3.3) 

327 >>> if result['passed']: 

328 ... print("Signal within limits") 

329 >>> else: 

330 ... print(f"{result['time_out_of_spec']:.1f}% of time out of spec") 

331 """ 

332 violations = find_window_violations(trace, low, high) 

333 

334 data = trace.data 

335 min_val = float(np.min(data)) 

336 max_val = float(np.max(data)) 

337 

338 # Calculate time in/out of spec 

339 in_spec = (data >= low) & (data <= high) 

340 pct_in_spec = np.sum(in_spec) / len(data) * 100 

341 pct_out_spec = 100 - pct_in_spec 

342 

343 return { 

344 "passed": len(violations) == 0 and min_val >= low and max_val <= high, 

345 "violations": violations, 

346 "min_value": min_val, 

347 "max_value": max_val, 

348 "time_in_spec": pct_in_spec, 

349 "time_out_of_spec": pct_out_spec, 

350 } 

351 

352 

353class MaskTrigger(Trigger): 

354 """Mask trigger for eye diagram and waveform mask testing. 

355 

356 Tests waveform against a defined mask (polygonal region). 

357 Triggers on any mask violation. 

358 """ 

359 

360 def __init__( 

361 self, 

362 mask_points: list[tuple[float, float]], 

363 mode: Literal["inside", "outside"] = "inside", 

364 ) -> None: 

365 """Initialize mask trigger. 

366 

367 Args: 

368 mask_points: List of (time, voltage) points defining mask polygon. 

369 mode: "inside" triggers when signal is inside mask, 

370 "outside" triggers when signal is outside mask. 

371 

372 Raises: 

373 AnalysisError: If mask has fewer than 3 points. 

374 """ 

375 if len(mask_points) < 3: 

376 raise AnalysisError("Mask must have at least 3 points") 

377 

378 self.mask_points = mask_points 

379 self.mode = mode 

380 

381 def find_events( 

382 self, 

383 trace: WaveformTrace, # type: ignore[override] 

384 ) -> list[TriggerEvent]: 

385 """Find mask violations. 

386 

387 Args: 

388 trace: Input waveform trace. 

389 

390 Returns: 

391 List of trigger events for mask violations. 

392 """ 

393 from matplotlib.path import Path 

394 

395 # Create polygon path 

396 mask_path = Path(self.mask_points) 

397 

398 data = trace.data 

399 sample_period = trace.metadata.time_base 

400 time_vector = np.arange(len(data)) * sample_period 

401 

402 # Create points array for containment test 

403 points = np.column_stack([time_vector, data]) 

404 

405 # Check which points are inside the mask 

406 inside = mask_path.contains_points(points) 

407 

408 events: list[TriggerEvent] = [] 

409 

410 # Find violations based on mode 

411 if self.mode == "inside": 

412 # Trigger when inside mask (mask defines forbidden region) 

413 violation_indices = np.where(inside)[0] 

414 else: 

415 # Trigger when outside mask (mask defines required region) 

416 violation_indices = np.where(~inside)[0] 

417 

418 # Group consecutive violations into events 

419 if len(violation_indices) > 0: 

420 # Find starts of violation regions 

421 starts = [violation_indices[0]] 

422 for i in range(1, len(violation_indices)): 

423 if violation_indices[i] != violation_indices[i - 1] + 1: 

424 starts.append(violation_indices[i]) 

425 

426 for start_idx in starts: 

427 events.append( 

428 TriggerEvent( 

429 timestamp=start_idx * sample_period, 

430 sample_index=int(start_idx), 

431 event_type=TriggerType.ZONE_VIOLATION, 

432 level=float(data[start_idx]), 

433 data={ 

434 "mask_mode": self.mode, 

435 "violation_type": "inside_forbidden" 

436 if self.mode == "inside" 

437 else "outside_required", 

438 }, 

439 ) 

440 ) 

441 

442 return events 

443 

444 

445__all__ = [ 

446 "MaskTrigger", 

447 "WindowTrigger", 

448 "Zone", 

449 "ZoneTrigger", 

450 "check_limits", 

451 "find_window_violations", 

452 "find_zone_events", 

453]