Coverage for src / tracekit / discovery / anomaly_detector.py: 67%

223 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Automatic anomaly detection and highlighting. 

2 

3This module detects unusual signal features (glitches, dropouts, noise 

4spikes, timing violations) to guide user attention. 

5 

6 

7Example: 

8 >>> from tracekit.discovery import find_anomalies 

9 >>> anomalies = find_anomalies(trace) 

10 >>> for anom in anomalies: 

11 ... print(f"{anom.timestamp_us:.2f}us: {anom.type} - {anom.description}") 

12 

13References: 

14 IEEE 1057-2017: Digitizing Waveform Recorders 

15""" 

16 

17from __future__ import annotations 

18 

19from dataclasses import dataclass, field 

20from typing import TYPE_CHECKING, Any, Literal 

21 

22import numpy as np 

23 

24from tracekit.analyzers.statistics.basic import basic_stats 

25from tracekit.core.types import DigitalTrace, WaveformTrace 

26 

27if TYPE_CHECKING: 

28 from numpy.typing import NDArray 

29 

# Closed set of anomaly categories the detectors in this module can report.
AnomalyType = Literal[
    "glitch",
    "dropout",
    "noise_spike",
    "timing_violation",
    "ringing",
    "overshoot",
    "undershoot",
]

# Impact levels, ordered from most to least urgent.
Severity = Literal["CRITICAL", "WARNING", "INFO"]

41 

42 

@dataclass
class Anomaly:
    """Detected signal anomaly.

    Represents an unusual or interesting signal feature with timing,
    classification, and plain-language explanation.

    Attributes:
        timestamp_us: Anomaly start time in microseconds.
        type: Type of anomaly detected.
        severity: Impact level (CRITICAL, WARNING, INFO).
        description: Plain-language explanation.
        duration_ns: Duration in nanoseconds.
        confidence: Detection confidence (0.0-1.0).
        metadata: Additional type-specific information.

    Example:
        >>> anomaly = Anomaly(
        ...     timestamp_us=45.23,
        ...     type="glitch",
        ...     severity="WARNING",
        ...     description="Brief 35ns pulse, likely noise spike",
        ...     duration_ns=35.0,
        ...     confidence=0.92
        ... )
    """

    timestamp_us: float
    type: AnomalyType
    severity: Severity
    description: str
    # Zero-duration default suits point events (e.g. single-sample spikes).
    duration_ns: float = 0.0
    # Detectors below assign fixed per-type confidences in the 0.70-0.88 range.
    confidence: float = 1.0
    # default_factory avoids the shared-mutable-default pitfall.
    metadata: dict[str, float] = field(default_factory=dict)

77 

78 

def find_anomalies(
    trace: WaveformTrace | DigitalTrace,
    *,
    severity_filter: list[Severity] | None = None,
    min_confidence: float = 0.7,
    anomaly_types: list[AnomalyType] | None = None,
) -> list[Anomaly]:
    """Detect anomalies in a signal automatically.

    Identifies glitches, dropouts, noise spikes, timing violations, ringing,
    and overshoot/undershoot without requiring user configuration.

    Args:
        trace: Input waveform or digital trace.
        severity_filter: Only return specified severity levels (default: all).
        min_confidence: Minimum confidence threshold (0.0-1.0).
        anomaly_types: Specific anomaly types to detect (default: all).

    Returns:
        List of detected Anomaly objects, sorted by timestamp.

    Raises:
        ValueError: If trace is empty or invalid.

    Example:
        >>> anomalies = find_anomalies(trace, severity_filter=['CRITICAL', 'WARNING'])
        >>> print(f"Found {len(anomalies)} critical/warning anomalies")
        >>> for anom in anomalies[:5]:
        ...     print(f"  {anom.timestamp_us:.2f}us: {anom.type} - {anom.description}")

    References:
        DISC-002: Anomaly Highlighting
    """
    if len(trace) == 0:
        raise ValueError("Cannot detect anomalies in empty trace")

    # Extract the sample array; digital traces are promoted to float so the
    # statistical detectors below can operate uniformly.
    if isinstance(trace, WaveformTrace):
        samples = trace.data
    else:
        samples = trace.data.astype(np.float64)
    rate = trace.metadata.sample_rate

    # Reference statistics shared by every detector.
    stats = basic_stats(samples)
    swing = stats["max"] - stats["min"]

    # None means "run everything".
    selected: list[AnomalyType] = (
        anomaly_types
        if anomaly_types is not None
        else [
            "glitch",
            "dropout",
            "noise_spike",
            "timing_violation",
            "ringing",
            "overshoot",
            "undershoot",
        ]
    )

    # Dispatch table keeps detector order identical to the declaration order.
    detectors: list[tuple[AnomalyType, Any]] = [
        ("glitch", lambda: _detect_glitches(samples, rate, swing, stats)),
        ("dropout", lambda: _detect_dropouts(samples, rate, swing, stats)),
        ("noise_spike", lambda: _detect_noise_spikes(samples, rate, swing, stats)),
        ("timing_violation", lambda: _detect_timing_violations(samples, rate, stats)),
        ("ringing", lambda: _detect_ringing(samples, rate, swing, stats)),
        ("overshoot", lambda: _detect_overshoot(samples, rate, swing, stats)),
        ("undershoot", lambda: _detect_undershoot(samples, rate, swing, stats)),
    ]

    found: list[Anomaly] = []
    for kind, run in detectors:
        if kind in selected:
            found.extend(run())

    # Apply confidence and severity filters, then present chronologically.
    found = [a for a in found if a.confidence >= min_confidence]
    if severity_filter is not None:
        found = [a for a in found if a.severity in severity_filter]

    return sorted(found, key=lambda a: a.timestamp_us)

178 

179 

180def _detect_glitches( 

181 data: NDArray[np.floating[Any]], 

182 sample_rate: float, 

183 voltage_swing: float, 

184 stats: dict[str, float], 

185) -> list[Anomaly]: 

186 """Detect brief narrow pulses (glitches). 

187 

188 Args: 

189 data: Signal data array. 

190 sample_rate: Sample rate in Hz. 

191 voltage_swing: Peak-to-peak voltage. 

192 stats: Basic statistics. 

193 

194 Returns: 

195 List of detected glitch anomalies. 

196 """ 

197 anomalies: list[Anomaly] = [] 

198 

199 if voltage_swing == 0 or len(data) < 10: 199 ↛ 200line 199 didn't jump to line 200 because the condition on line 199 was never true

200 return anomalies 

201 

202 # Threshold for glitch detection 

203 threshold = stats["mean"] 

204 glitch_threshold = voltage_swing * 0.3 # 30% of swing 

205 

206 # Find samples far from mean 

207 deviations = np.abs(data - threshold) 

208 glitch_candidates = np.where(deviations > glitch_threshold)[0] 

209 

210 if len(glitch_candidates) == 0: 210 ↛ 211line 210 didn't jump to line 211 because the condition on line 210 was never true

211 return anomalies 

212 

213 # Group consecutive samples into glitches 

214 glitch_groups = [] 

215 current_group = [glitch_candidates[0]] 

216 

217 for idx in glitch_candidates[1:]: 

218 if idx == current_group[-1] + 1: 

219 current_group.append(idx) 

220 else: 

221 glitch_groups.append(current_group) 

222 current_group = [idx] 

223 

224 glitch_groups.append(current_group) 

225 

226 # Analyze each glitch 

227 for group in glitch_groups: 

228 duration_samples = len(group) 

229 duration_ns = (duration_samples / sample_rate) * 1e9 

230 

231 # Only report glitches < 50ns 

232 if duration_ns < 50: 

233 timestamp_us = (group[0] / sample_rate) * 1e6 

234 magnitude = np.max(np.abs(data[group] - threshold)) 

235 

236 # Determine severity based on magnitude 

237 if magnitude > voltage_swing * 0.5: 237 ↛ 240line 237 didn't jump to line 240 because the condition on line 237 was always true

238 severity: Severity = "WARNING" 

239 else: 

240 severity = "INFO" 

241 

242 description = f"Brief {duration_ns:.0f}ns pulse, likely noise spike" 

243 

244 anomalies.append( 

245 Anomaly( 

246 timestamp_us=timestamp_us, 

247 type="glitch", 

248 severity=severity, 

249 description=description, 

250 duration_ns=duration_ns, 

251 confidence=0.85, 

252 metadata={"magnitude": magnitude}, 

253 ) 

254 ) 

255 

256 return anomalies 

257 

258 

259def _detect_dropouts( 

260 data: NDArray[np.floating[Any]], 

261 sample_rate: float, 

262 voltage_swing: float, 

263 stats: dict[str, float], 

264) -> list[Anomaly]: 

265 """Detect missing transitions or prolonged holds. 

266 

267 Args: 

268 data: Signal data array. 

269 sample_rate: Sample rate in Hz. 

270 voltage_swing: Peak-to-peak voltage. 

271 stats: Basic statistics. 

272 

273 Returns: 

274 List of detected dropout anomalies. 

275 """ 

276 anomalies: list[Anomaly] = [] 

277 

278 if voltage_swing == 0 or len(data) < 100: 278 ↛ 279line 278 didn't jump to line 279 because the condition on line 278 was never true

279 return anomalies 

280 

281 # Estimate expected period from transitions 

282 threshold = (stats["max"] + stats["min"]) / 2 

283 digital = data > threshold 

284 transitions = np.where(np.diff(digital.astype(int)) != 0)[0] 

285 

286 if len(transitions) < 5: 286 ↛ 290line 286 didn't jump to line 290 because the condition on line 286 was always true

287 return anomalies 

288 

289 # Calculate typical transition interval 

290 intervals = np.diff(transitions) 

291 expected_period = np.median(intervals) 

292 

293 # Find unusually long intervals (>2x expected) 

294 for i, interval in enumerate(intervals): 

295 if interval > expected_period * 2.0: 

296 timestamp_us = (transitions[i] / sample_rate) * 1e6 

297 duration_ns = (interval / sample_rate) * 1e9 

298 multiplier = interval / expected_period 

299 

300 description = f"Missing transition, signal held for {multiplier:.1f}x expected duration" 

301 

302 # Severity based on how long the dropout is 

303 if multiplier > 5.0: 

304 severity: Severity = "CRITICAL" 

305 elif multiplier > 3.0: 

306 severity = "WARNING" 

307 else: 

308 severity = "INFO" 

309 

310 anomalies.append( 

311 Anomaly( 

312 timestamp_us=timestamp_us, 

313 type="dropout", 

314 severity=severity, 

315 description=description, 

316 duration_ns=duration_ns, 

317 confidence=0.88, 

318 metadata={"expected_period_ns": (expected_period / sample_rate) * 1e9}, 

319 ) 

320 ) 

321 

322 return anomalies 

323 

324 

325def _detect_noise_spikes( 

326 data: NDArray[np.floating[Any]], 

327 sample_rate: float, 

328 voltage_swing: float, 

329 stats: dict[str, float], 

330) -> list[Anomaly]: 

331 """Detect noise spikes (>20% of signal swing). 

332 

333 Args: 

334 data: Signal data array. 

335 sample_rate: Sample rate in Hz. 

336 voltage_swing: Peak-to-peak voltage. 

337 stats: Basic statistics. 

338 

339 Returns: 

340 List of detected noise spike anomalies. 

341 """ 

342 anomalies: list[Anomaly] = [] 

343 

344 if voltage_swing == 0 or len(data) < 10: 344 ↛ 345line 344 didn't jump to line 345 because the condition on line 344 was never true

345 return anomalies 

346 

347 # Use running window to detect local spikes 

348 window = 10 

349 spike_threshold = voltage_swing * 0.2 

350 

351 for i in range(window, len(data) - window): 

352 local_mean = np.mean(data[i - window : i + window]) 

353 deviation = abs(data[i] - local_mean) 

354 

355 if deviation > spike_threshold: 

356 timestamp_us = (i / sample_rate) * 1e6 

357 percent = (deviation / voltage_swing) * 100 

358 

359 description = f"Noise spike {percent:.0f}% of signal swing" 

360 

361 # Severity based on spike magnitude 

362 if percent > 50: 

363 severity: Severity = "WARNING" 

364 else: 

365 severity = "INFO" 

366 

367 anomalies.append( 

368 Anomaly( 

369 timestamp_us=timestamp_us, 

370 type="noise_spike", 

371 severity=severity, 

372 description=description, 

373 duration_ns=(1 / sample_rate) * 1e9, 

374 confidence=0.80, 

375 metadata={"deviation_v": deviation}, 

376 ) 

377 ) 

378 

379 # Skip ahead to avoid duplicate detections 

380 i += window 

381 

382 # Limit number of noise spikes reported 

383 return anomalies[:50] 

384 

385 

386def _detect_timing_violations( 

387 data: NDArray[np.floating[Any]], 

388 sample_rate: float, 

389 stats: dict[str, float], 

390) -> list[Anomaly]: 

391 """Detect timing violations (±5% of expected timing). 

392 

393 Args: 

394 data: Signal data array. 

395 sample_rate: Sample rate in Hz. 

396 stats: Basic statistics. 

397 

398 Returns: 

399 List of detected timing violation anomalies. 

400 """ 

401 anomalies: list[Anomaly] = [] 

402 

403 if len(data) < 100: 403 ↛ 404line 403 didn't jump to line 404 because the condition on line 403 was never true

404 return anomalies 

405 

406 # Find edges 

407 threshold = stats["mean"] 

408 digital = data > threshold 

409 transitions = np.where(np.diff(digital.astype(int)) != 0)[0] 

410 

411 if len(transitions) < 10: 411 ↛ 415line 411 didn't jump to line 415 because the condition on line 411 was always true

412 return anomalies 

413 

414 # Analyze timing consistency 

415 intervals = np.diff(transitions) 

416 expected_interval = np.median(intervals) 

417 tolerance = expected_interval * 0.05 # 5% tolerance 

418 

419 # Find violations 

420 for i, interval in enumerate(intervals): 

421 deviation = abs(interval - expected_interval) 

422 

423 if deviation > tolerance: 

424 timestamp_us = (transitions[i] / sample_rate) * 1e6 

425 percent_dev = (deviation / expected_interval) * 100 

426 

427 description = f"Timing deviation {percent_dev:.1f}% from expected" 

428 

429 # Severity based on deviation magnitude 

430 if percent_dev > 15: 

431 severity: Severity = "WARNING" 

432 else: 

433 severity = "INFO" 

434 

435 anomalies.append( 

436 Anomaly( 

437 timestamp_us=timestamp_us, 

438 type="timing_violation", 

439 severity=severity, 

440 description=description, 

441 duration_ns=(interval / sample_rate) * 1e9, 

442 confidence=0.75, 

443 metadata={"deviation_percent": percent_dev}, 

444 ) 

445 ) 

446 

447 # Limit violations reported 

448 return anomalies[:20] 

449 

450 

451def _detect_ringing( 

452 data: NDArray[np.floating[Any]], 

453 sample_rate: float, 

454 voltage_swing: float, 

455 stats: dict[str, float], 

456) -> list[Anomaly]: 

457 """Detect ringing (≥3 oscillations). 

458 

459 Args: 

460 data: Signal data array. 

461 sample_rate: Sample rate in Hz. 

462 voltage_swing: Peak-to-peak voltage. 

463 stats: Basic statistics. 

464 

465 Returns: 

466 List of detected ringing anomalies. 

467 """ 

468 anomalies: list[Anomaly] = [] 

469 

470 if voltage_swing == 0 or len(data) < 50: 470 ↛ 471line 470 didn't jump to line 471 because the condition on line 470 was never true

471 return anomalies 

472 

473 # Look for oscillations after transitions 

474 threshold = stats["mean"] 

475 digital = data > threshold 

476 transitions = np.where(np.diff(digital.astype(int)) != 0)[0] 

477 

478 for trans_idx in transitions: 

479 # Check window after transition 

480 window_size = min(50, len(data) - trans_idx - 1) 

481 if window_size < 10: 481 ↛ 482line 481 didn't jump to line 482 because the condition on line 481 was never true

482 continue 

483 

484 window = data[trans_idx + 1 : trans_idx + 1 + window_size] 

485 

486 # Count zero crossings (oscillations) 

487 window_mean = np.mean(window) 

488 crossings = np.sum(np.diff(np.sign(window - window_mean)) != 0) 

489 

490 # Ringing should have ≥3 oscillations 

491 if crossings >= 6: # 6 crossings = 3 full oscillations 491 ↛ 492line 491 didn't jump to line 492 because the condition on line 491 was never true

492 timestamp_us = (trans_idx / sample_rate) * 1e6 

493 duration_ns = (window_size / sample_rate) * 1e9 

494 num_oscillations = crossings // 2 

495 

496 description = f"Ringing with {num_oscillations} oscillations after edge" 

497 

498 severity: Severity = "INFO" 

499 

500 anomalies.append( 

501 Anomaly( 

502 timestamp_us=timestamp_us, 

503 type="ringing", 

504 severity=severity, 

505 description=description, 

506 duration_ns=duration_ns, 

507 confidence=0.70, 

508 metadata={"oscillations": num_oscillations}, 

509 ) 

510 ) 

511 

512 return anomalies[:10] 

513 

514 

515def _detect_overshoot( 

516 data: NDArray[np.floating[Any]], 

517 sample_rate: float, 

518 voltage_swing: float, 

519 stats: dict[str, float], 

520) -> list[Anomaly]: 

521 """Detect overshoot (>10% beyond high rail). 

522 

523 Args: 

524 data: Signal data array. 

525 sample_rate: Sample rate in Hz. 

526 voltage_swing: Peak-to-peak voltage. 

527 stats: Basic statistics. 

528 

529 Returns: 

530 List of detected overshoot anomalies. 

531 """ 

532 anomalies: list[Anomaly] = [] 

533 

534 if voltage_swing == 0: 534 ↛ 535line 534 didn't jump to line 535 because the condition on line 534 was never true

535 return anomalies 

536 

537 # Define expected high rail (based on histogram peaks) 

538 high_rail = stats["max"] * 0.95 # Expected rail at 95th percentile 

539 overshoot_threshold = high_rail * 1.1 # 10% above rail 

540 

541 # Find overshoot samples 

542 overshoots = np.where(data > overshoot_threshold)[0] 

543 

544 if len(overshoots) == 0: 544 ↛ 548line 544 didn't jump to line 548 because the condition on line 544 was always true

545 return anomalies 

546 

547 # Group consecutive samples 

548 groups = [] 

549 current = [overshoots[0]] 

550 

551 for idx in overshoots[1:]: 

552 if idx == current[-1] + 1: 

553 current.append(idx) 

554 else: 

555 groups.append(current) 

556 current = [idx] 

557 

558 groups.append(current) 

559 

560 # Report each overshoot event 

561 for group in groups: 

562 timestamp_us = (group[0] / sample_rate) * 1e6 

563 peak_value = np.max(data[group]) 

564 percent_over = ((peak_value - high_rail) / high_rail) * 100 

565 

566 description = ( 

567 f"Signal exceeded expected high level by {percent_over:.0f}% (peak: {peak_value:.2f}V)" 

568 ) 

569 

570 # Severity based on overshoot magnitude 

571 if percent_over > 20: 

572 severity: Severity = "WARNING" 

573 else: 

574 severity = "INFO" 

575 

576 anomalies.append( 

577 Anomaly( 

578 timestamp_us=timestamp_us, 

579 type="overshoot", 

580 severity=severity, 

581 description=description, 

582 duration_ns=(len(group) / sample_rate) * 1e9, 

583 confidence=0.82, 

584 metadata={"peak_voltage": peak_value}, 

585 ) 

586 ) 

587 

588 return anomalies[:10] 

589 

590 

591def _detect_undershoot( 

592 data: NDArray[np.floating[Any]], 

593 sample_rate: float, 

594 voltage_swing: float, 

595 stats: dict[str, float], 

596) -> list[Anomaly]: 

597 """Detect undershoot (>10% beyond low rail). 

598 

599 Args: 

600 data: Signal data array. 

601 sample_rate: Sample rate in Hz. 

602 voltage_swing: Peak-to-peak voltage. 

603 stats: Basic statistics. 

604 

605 Returns: 

606 List of detected undershoot anomalies. 

607 """ 

608 anomalies: list[Anomaly] = [] 

609 

610 if voltage_swing == 0: 610 ↛ 611line 610 didn't jump to line 611 because the condition on line 610 was never true

611 return anomalies 

612 

613 # Define expected low rail 

614 low_rail = stats["min"] * 1.05 # Expected rail at 5th percentile 

615 undershoot_threshold = low_rail * 0.9 # 10% below rail (more negative) 

616 

617 # Find undershoot samples 

618 undershoots = np.where(data < undershoot_threshold)[0] 

619 

620 if len(undershoots) == 0: 

621 return anomalies 

622 

623 # Group consecutive samples 

624 groups = [] 

625 current = [undershoots[0]] 

626 

627 for idx in undershoots[1:]: 

628 if idx == current[-1] + 1: 628 ↛ 631line 628 didn't jump to line 631 because the condition on line 628 was always true

629 current.append(idx) 

630 else: 

631 groups.append(current) 

632 current = [idx] 

633 

634 groups.append(current) 

635 

636 # Report each undershoot event 

637 for group in groups: 

638 timestamp_us = (group[0] / sample_rate) * 1e6 

639 min_value = np.min(data[group]) 

640 percent_under = ((low_rail - min_value) / abs(low_rail)) * 100 if low_rail != 0 else 0 

641 

642 description = ( 

643 f"Signal fell below expected low level by {percent_under:.0f}% (min: {min_value:.2f}V)" 

644 ) 

645 

646 # Severity based on undershoot magnitude 

647 if percent_under > 20: 647 ↛ 648line 647 didn't jump to line 648 because the condition on line 647 was never true

648 severity: Severity = "WARNING" 

649 else: 

650 severity = "INFO" 

651 

652 anomalies.append( 

653 Anomaly( 

654 timestamp_us=timestamp_us, 

655 type="undershoot", 

656 severity=severity, 

657 description=description, 

658 duration_ns=(len(group) / sample_rate) * 1e9, 

659 confidence=0.82, 

660 metadata={"min_voltage": min_value}, 

661 ) 

662 ) 

663 

664 return anomalies[:10] 

665 

666 

# Public API of this module; the _detect_* helpers are internal.
__all__ = [
    "Anomaly",
    "AnomalyType",
    "Severity",
    "find_anomalies",
]