Coverage for src / tracekit / triggering / pulse.py: 80%

160 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Pulse width and glitch triggering for TraceKit. 

2 

3Provides pulse width triggering, glitch detection, and runt pulse 

4detection for signal integrity analysis. 

5 

6Example: 

7 >>> from tracekit.triggering.pulse import PulseWidthTrigger, find_glitches 

8 >>> # Find pulses between 100ns and 200ns 

9 >>> trigger = PulseWidthTrigger(level=1.5, min_width=100e-9, max_width=200e-9) 

10 >>> events = trigger.find_events(trace) 

11 >>> # Find glitches shorter than 50ns 

12 >>> glitches = find_glitches(trace, max_width=50e-9) 

13""" 

14 

15from __future__ import annotations 

16 

17from dataclasses import dataclass 

18from typing import Literal 

19 

20import numpy as np 

21 

22from tracekit.core.exceptions import AnalysisError 

23from tracekit.core.types import DigitalTrace, WaveformTrace 

24from tracekit.triggering.base import ( 

25 Trigger, 

26 TriggerEvent, 

27 TriggerType, 

28 interpolate_crossing, 

29) 

30 

31 

32@dataclass 

33class PulseInfo: 

34 """Information about a detected pulse. 

35 

36 Attributes: 

37 start_time: Start time of pulse in seconds. 

38 end_time: End time of pulse in seconds. 

39 width: Pulse width in seconds. 

40 polarity: "positive" or "negative". 

41 start_index: Sample index at pulse start. 

42 end_index: Sample index at pulse end. 

43 amplitude: Peak amplitude during pulse. 

44 """ 

45 

46 start_time: float 

47 end_time: float 

48 width: float 

49 polarity: Literal["positive", "negative"] 

50 start_index: int 

51 end_index: int 

52 amplitude: float 

53 

54 

55class PulseWidthTrigger(Trigger): 

56 """Pulse width trigger for detecting pulses in a width range. 

57 

58 Triggers on pulses that fall within the specified width range. 

59 

60 Attributes: 

61 level: Threshold level for pulse detection. 

62 polarity: Pulse polarity - "positive", "negative", or "either". 

63 min_width: Minimum pulse width (None for no minimum). 

64 max_width: Maximum pulse width (None for no maximum). 

65 """ 

66 

67 def __init__( 

68 self, 

69 level: float, 

70 polarity: Literal["positive", "negative", "either"] = "positive", 

71 min_width: float | None = None, 

72 max_width: float | None = None, 

73 ) -> None: 

74 """Initialize pulse width trigger. 

75 

76 Args: 

77 level: Threshold level for pulse detection. 

78 polarity: Pulse polarity to detect. 

79 min_width: Minimum pulse width in seconds. 

80 max_width: Maximum pulse width in seconds. 

81 

82 Raises: 

83 AnalysisError: If min_width is greater than max_width. 

84 """ 

85 self.level = level 

86 self.polarity = polarity 

87 self.min_width = min_width 

88 self.max_width = max_width 

89 

90 if min_width is not None and max_width is not None and min_width > max_width: 90 ↛ 91line 90 didn't jump to line 91 because the condition on line 90 was never true

91 raise AnalysisError("min_width cannot be greater than max_width") 

92 

93 def find_events( 

94 self, 

95 trace: WaveformTrace | DigitalTrace, 

96 ) -> list[TriggerEvent]: 

97 """Find pulses matching the width criteria. 

98 

99 Args: 

100 trace: Input trace. 

101 

102 Returns: 

103 List of trigger events for matching pulses. 

104 """ 

105 pulses = self._find_all_pulses(trace) 

106 

107 # Filter by width 

108 events: list[TriggerEvent] = [] 

109 for pulse in pulses: 

110 if self.min_width is not None and pulse.width < self.min_width: 110 ↛ 111line 110 didn't jump to line 111 because the condition on line 110 was never true

111 continue 

112 if self.max_width is not None and pulse.width > self.max_width: 

113 continue 

114 

115 events.append( 

116 TriggerEvent( 

117 timestamp=pulse.start_time, 

118 sample_index=pulse.start_index, 

119 event_type=TriggerType.PULSE_WIDTH, 

120 level=pulse.amplitude, 

121 duration=pulse.width, 

122 data={ 

123 "polarity": pulse.polarity, 

124 "end_time": pulse.end_time, 

125 "end_index": pulse.end_index, 

126 }, 

127 ) 

128 ) 

129 

130 return events 

131 

132 def _find_all_pulses( 

133 self, 

134 trace: WaveformTrace | DigitalTrace, 

135 ) -> list[PulseInfo]: 

136 """Find all pulses in the trace.""" 

137 if isinstance(trace, DigitalTrace): 137 ↛ 138line 137 didn't jump to line 138 because the condition on line 137 was never true

138 data = trace.data.astype(np.float64) 

139 level = 0.5 

140 else: 

141 data = trace.data 

142 level = self.level 

143 

144 sample_period = trace.metadata.time_base 

145 pulses: list[PulseInfo] = [] 

146 

147 # Find all threshold crossings 

148 above = data >= level 

149 below = data < level 

150 

151 # Rising edges: transition from below to above 

152 rising = np.where(below[:-1] & above[1:])[0] 

153 # Falling edges: transition from above to below 

154 falling = np.where(above[:-1] & below[1:])[0] 

155 

156 if self.polarity in ("positive", "either"): 156 ↛ 185line 156 didn't jump to line 185 because the condition on line 156 was always true

157 # Positive pulses: rising -> falling 

158 for r_idx in rising: 

159 # Find next falling edge 

160 next_falling = falling[falling > r_idx] 

161 if len(next_falling) == 0: 161 ↛ 162line 161 didn't jump to line 162 because the condition on line 161 was never true

162 continue 

163 f_idx = next_falling[0] 

164 

165 start_time = interpolate_crossing(data, r_idx, level, sample_period, True) 

166 end_time = interpolate_crossing(data, f_idx, level, sample_period, False) 

167 width = end_time - start_time 

168 

169 # Get peak amplitude 

170 pulse_data = data[r_idx : f_idx + 1] 

171 amplitude = float(np.max(pulse_data)) if len(pulse_data) > 0 else level 

172 

173 pulses.append( 

174 PulseInfo( 

175 start_time=start_time, 

176 end_time=end_time, 

177 width=width, 

178 polarity="positive", 

179 start_index=int(r_idx), 

180 end_index=int(f_idx), 

181 amplitude=amplitude, 

182 ) 

183 ) 

184 

185 if self.polarity in ("negative", "either"): 

186 # Negative pulses: falling -> rising 

187 for f_idx in falling: 

188 # Find next rising edge 

189 next_rising = rising[rising > f_idx] 

190 if len(next_rising) == 0: 

191 continue 

192 r_idx = next_rising[0] 

193 

194 start_time = interpolate_crossing(data, f_idx, level, sample_period, False) 

195 end_time = interpolate_crossing(data, r_idx, level, sample_period, True) 

196 width = end_time - start_time 

197 

198 # Get peak (minimum) amplitude 

199 pulse_data = data[f_idx : r_idx + 1] 

200 amplitude = float(np.min(pulse_data)) if len(pulse_data) > 0 else level 

201 

202 pulses.append( 

203 PulseInfo( 

204 start_time=start_time, 

205 end_time=end_time, 

206 width=width, 

207 polarity="negative", 

208 start_index=int(f_idx), 

209 end_index=int(r_idx), 

210 amplitude=amplitude, 

211 ) 

212 ) 

213 

214 # Sort by start time 

215 pulses.sort(key=lambda p: p.start_time) 

216 return pulses 

217 

218 

219class GlitchTrigger(Trigger): 

220 """Glitch trigger for detecting narrow pulses. 

221 

222 Glitches are pulses shorter than a maximum width threshold. 

223 

224 Attributes: 

225 level: Threshold level. 

226 max_width: Maximum pulse width to be considered a glitch. 

227 polarity: Glitch polarity - "positive", "negative", or "either". 

228 """ 

229 

230 def __init__( 

231 self, 

232 level: float, 

233 max_width: float = 100e-9, 

234 polarity: Literal["positive", "negative", "either"] = "either", 

235 ) -> None: 

236 """Initialize glitch trigger. 

237 

238 Args: 

239 level: Threshold level. 

240 max_width: Maximum pulse width to trigger (in seconds). 

241 polarity: Glitch polarity to detect. 

242 """ 

243 self.level = level 

244 self.max_width = max_width 

245 self.polarity = polarity 

246 

247 def find_events( 

248 self, 

249 trace: WaveformTrace | DigitalTrace, 

250 ) -> list[TriggerEvent]: 

251 """Find all glitches in the trace. 

252 

253 Args: 

254 trace: Input trace. 

255 

256 Returns: 

257 List of trigger events for each glitch. 

258 """ 

259 pulse_trigger = PulseWidthTrigger( 

260 level=self.level, 

261 polarity=self.polarity, 

262 min_width=None, 

263 max_width=self.max_width, 

264 ) 

265 

266 events = pulse_trigger.find_events(trace) 

267 

268 # Reclassify as glitch events 

269 for event in events: 

270 event.event_type = TriggerType.GLITCH 

271 

272 return events 

273 

274 

275class RuntTrigger(Trigger): 

276 """Runt pulse trigger for detecting incomplete transitions. 

277 

278 Runt pulses cross one threshold but not the other, indicating 

279 incomplete signal transitions. 

280 

281 Attributes: 

282 low_threshold: Lower threshold level. 

283 high_threshold: Upper threshold level. 

284 polarity: Runt polarity - "positive", "negative", or "either". 

285 """ 

286 

287 def __init__( 

288 self, 

289 low_threshold: float, 

290 high_threshold: float, 

291 polarity: Literal["positive", "negative", "either"] = "either", 

292 ) -> None: 

293 """Initialize runt trigger. 

294 

295 Args: 

296 low_threshold: Lower threshold (e.g., logic low). 

297 high_threshold: Upper threshold (e.g., logic high). 

298 polarity: "positive" for rising runts, "negative" for falling. 

299 

300 Raises: 

301 AnalysisError: If low_threshold is not less than high_threshold. 

302 """ 

303 if low_threshold >= high_threshold: 303 ↛ 304line 303 didn't jump to line 304 because the condition on line 303 was never true

304 raise AnalysisError("low_threshold must be less than high_threshold") 

305 

306 self.low_threshold = low_threshold 

307 self.high_threshold = high_threshold 

308 self.polarity = polarity 

309 

310 def find_events( 

311 self, 

312 trace: WaveformTrace | DigitalTrace, 

313 ) -> list[TriggerEvent]: 

314 """Find all runt pulses in the trace. 

315 

316 Args: 

317 trace: Input trace. 

318 

319 Returns: 

320 List of trigger events for each runt pulse. 

321 """ 

322 if isinstance(trace, DigitalTrace): 322 ↛ 324line 322 didn't jump to line 324 because the condition on line 322 was never true

323 # Digital traces don't have runts 

324 return [] 

325 

326 data = trace.data 

327 sample_period = trace.metadata.time_base 

328 events: list[TriggerEvent] = [] 

329 

330 # Track signal zones 

331 # Zone 0: below low_threshold 

332 # Zone 1: between thresholds 

333 # Zone 2: above high_threshold 

334 def get_zone(value: float) -> int: 

335 if value < self.low_threshold: 

336 return 0 

337 elif value > self.high_threshold: 

338 return 2 

339 else: 

340 return 1 

341 

342 zones = np.array([get_zone(v) for v in data]) 

343 

344 # Find runt pulses: transitions that enter zone 1 but don't reach the other side 

345 i = 0 

346 while i < len(zones) - 1: 

347 curr_zone = zones[i] 

348 

349 if curr_zone == 0: 

350 # Starting low - look for positive runt 

351 if self.polarity in ("positive", "either"): 351 ↛ 406line 351 didn't jump to line 406 because the condition on line 351 was always true

352 # Find transition to zone 1 

353 if zones[i + 1] == 1: 

354 start_idx = i 

355 # Track through zone 1 

356 j = i + 1 

357 while j < len(zones) and zones[j] == 1: 

358 j += 1 

359 if j < len(zones) and zones[j] == 0: 359 ↛ 376line 359 didn't jump to line 376 because the condition on line 359 was always true

360 # Returned to low without reaching high - RUNT 

361 peak = float(np.max(data[start_idx : j + 1])) 

362 events.append( 

363 TriggerEvent( 

364 timestamp=start_idx * sample_period, 

365 sample_index=start_idx, 

366 event_type=TriggerType.RUNT, 

367 level=peak, 

368 duration=(j - start_idx) * sample_period, 

369 data={ 

370 "polarity": "positive", 

371 "expected_high": self.high_threshold, 

372 "actual_peak": peak, 

373 }, 

374 ) 

375 ) 

376 i = j 

377 continue 

378 

379 elif curr_zone == 2: 379 ↛ 406line 379 didn't jump to line 406 because the condition on line 379 was always true

380 # Starting high - look for negative runt 

381 if self.polarity in ("negative", "either") and zones[i + 1] == 1: 381 ↛ 382line 381 didn't jump to line 382 because the condition on line 381 was never true

382 start_idx = i 

383 j = i + 1 

384 while j < len(zones) and zones[j] == 1: 

385 j += 1 

386 if j < len(zones) and zones[j] == 2: 

387 # Returned to high without reaching low - RUNT 

388 trough = float(np.min(data[start_idx : j + 1])) 

389 events.append( 

390 TriggerEvent( 

391 timestamp=start_idx * sample_period, 

392 sample_index=start_idx, 

393 event_type=TriggerType.RUNT, 

394 level=trough, 

395 duration=(j - start_idx) * sample_period, 

396 data={ 

397 "polarity": "negative", 

398 "expected_low": self.low_threshold, 

399 "actual_trough": trough, 

400 }, 

401 ) 

402 ) 

403 i = j 

404 continue 

405 

406 i += 1 

407 

408 return events 

409 

410 

411def find_pulses( 

412 trace: WaveformTrace, 

413 *, 

414 level: float | None = None, 

415 polarity: Literal["positive", "negative", "either"] = "positive", 

416 min_width: float | None = None, 

417 max_width: float | None = None, 

418) -> list[TriggerEvent]: 

419 """Find pulses matching width criteria. 

420 

421 Args: 

422 trace: Input waveform trace. 

423 level: Threshold level. If None, uses 50% of amplitude. 

424 polarity: Pulse polarity to find. 

425 min_width: Minimum pulse width in seconds. 

426 max_width: Maximum pulse width in seconds. 

427 

428 Returns: 

429 List of trigger events for matching pulses. 

430 

431 Example: 

432 >>> # Find all positive pulses between 1us and 10us 

433 >>> pulses = find_pulses(trace, min_width=1e-6, max_width=10e-6) 

434 """ 

435 if level is None: 435 ↛ 436line 435 didn't jump to line 436 because the condition on line 435 was never true

436 level = (np.min(trace.data) + np.max(trace.data)) / 2 

437 

438 trigger = PulseWidthTrigger( 

439 level=level, 

440 polarity=polarity, 

441 min_width=min_width, 

442 max_width=max_width, 

443 ) 

444 return trigger.find_events(trace) 

445 

446 

447def find_glitches( 

448 trace: WaveformTrace, 

449 max_width: float = 100e-9, 

450 *, 

451 level: float | None = None, 

452 polarity: Literal["positive", "negative", "either"] = "either", 

453) -> list[TriggerEvent]: 

454 """Find glitches (narrow pulses) in a trace. 

455 

456 Args: 

457 trace: Input waveform trace. 

458 max_width: Maximum width to be considered a glitch (default 100ns). 

459 level: Threshold level. If None, uses 50% of amplitude. 

460 polarity: Glitch polarity to find. 

461 

462 Returns: 

463 List of trigger events for each glitch. 

464 

465 Example: 

466 >>> # Find all glitches shorter than 50ns 

467 >>> glitches = find_glitches(trace, max_width=50e-9) 

468 >>> print(f"Found {len(glitches)} glitches") 

469 """ 

470 if level is None: 470 ↛ 471line 470 didn't jump to line 471 because the condition on line 470 was never true

471 level = (np.min(trace.data) + np.max(trace.data)) / 2 

472 

473 trigger = GlitchTrigger( 

474 level=level, 

475 max_width=max_width, 

476 polarity=polarity, 

477 ) 

478 return trigger.find_events(trace) 

479 

480 

481def find_runt_pulses( 

482 trace: WaveformTrace, 

483 low_threshold: float | None = None, 

484 high_threshold: float | None = None, 

485 *, 

486 polarity: Literal["positive", "negative", "either"] = "either", 

487) -> list[TriggerEvent]: 

488 """Find runt pulses (incomplete transitions) in a trace. 

489 

490 Args: 

491 trace: Input waveform trace. 

492 low_threshold: Lower threshold. If None, uses 20% of amplitude. 

493 high_threshold: Upper threshold. If None, uses 80% of amplitude. 

494 polarity: Runt polarity to find. 

495 

496 Returns: 

497 List of trigger events for each runt pulse. 

498 

499 Example: 

500 >>> # Find runts using standard 20%/80% thresholds 

501 >>> runts = find_runt_pulses(trace) 

502 >>> for runt in runts: 

503 ... print(f"Runt at {runt.timestamp*1e6:.2f} us") 

504 """ 

505 if low_threshold is None: 505 ↛ 506line 505 didn't jump to line 506 because the condition on line 505 was never true

506 amplitude = np.max(trace.data) - np.min(trace.data) 

507 low_threshold = np.min(trace.data) + 0.2 * amplitude 

508 

509 if high_threshold is None: 509 ↛ 510line 509 didn't jump to line 510 because the condition on line 509 was never true

510 amplitude = np.max(trace.data) - np.min(trace.data) 

511 high_threshold = np.min(trace.data) + 0.8 * amplitude 

512 

513 trigger = RuntTrigger( 

514 low_threshold=low_threshold, 

515 high_threshold=high_threshold, 

516 polarity=polarity, 

517 ) 

518 return trigger.find_events(trace) 

519 

520 

521def pulse_statistics( 

522 trace: WaveformTrace, 

523 *, 

524 level: float | None = None, 

525 polarity: Literal["positive", "negative"] = "positive", 

526) -> dict[str, float]: 

527 """Calculate pulse width statistics. 

528 

529 Args: 

530 trace: Input waveform trace. 

531 level: Threshold level. 

532 polarity: Pulse polarity to analyze. 

533 

534 Returns: 

535 Dictionary with pulse statistics: 

536 - count: Number of pulses 

537 - min_width: Minimum pulse width 

538 - max_width: Maximum pulse width 

539 - mean_width: Mean pulse width 

540 - std_width: Standard deviation of pulse widths 

541 

542 Example: 

543 >>> stats = pulse_statistics(trace) 

544 >>> print(f"Mean pulse width: {stats['mean_width']*1e6:.2f} us") 

545 """ 

546 if level is None: 546 ↛ 547line 546 didn't jump to line 547 because the condition on line 546 was never true

547 level = (np.min(trace.data) + np.max(trace.data)) / 2 

548 

549 trigger = PulseWidthTrigger(level=level, polarity=polarity) 

550 events = trigger.find_events(trace) 

551 

552 if len(events) == 0: 552 ↛ 553line 552 didn't jump to line 553 because the condition on line 552 was never true

553 return { 

554 "count": 0, 

555 "min_width": np.nan, 

556 "max_width": np.nan, 

557 "mean_width": np.nan, 

558 "std_width": np.nan, 

559 } 

560 

561 widths = np.array([e.duration for e in events if e.duration is not None]) 

562 

563 return { 

564 "count": len(widths), 

565 "min_width": float(np.min(widths)), 

566 "max_width": float(np.max(widths)), 

567 "mean_width": float(np.mean(widths)), 

568 "std_width": float(np.std(widths)), 

569 } 

570 

571 

572__all__ = [ 

573 "GlitchTrigger", 

574 "PulseInfo", 

575 "PulseWidthTrigger", 

576 "RuntTrigger", 

577 "find_glitches", 

578 "find_pulses", 

579 "find_runt_pulses", 

580 "pulse_statistics", 

581]