Coverage for src/tracekit/discovery/anomaly_detector.py: 67% (223 statements)
1"""Automatic anomaly detection and highlighting.
3This module detects unusual signal features (glitches, dropouts, noise
4spikes, timing violations) to guide user attention.
7Example:
8 >>> from tracekit.discovery import find_anomalies
9 >>> anomalies = find_anomalies(trace)
10 >>> for anom in anomalies:
11 ... print(f"{anom.timestamp_us:.2f}us: {anom.type} - {anom.description}")
13References:
14 IEEE 1057-2017: Digitizing Waveform Recorders
15"""
from __future__ import annotations

from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, Literal

import numpy as np

from tracekit.analyzers.statistics.basic import basic_stats
from tracekit.core.types import DigitalTrace, WaveformTrace

if TYPE_CHECKING:
    from numpy.typing import NDArray
AnomalyType = Literal[
    "glitch",
    "dropout",
    "noise_spike",
    "timing_violation",
    "ringing",
    "overshoot",
    "undershoot",
]

Severity = Literal["CRITICAL", "WARNING", "INFO"]
@dataclass
class Anomaly:
    """Detected signal anomaly.

    Represents an unusual or interesting signal feature with timing,
    classification, and plain-language explanation.

    Attributes:
        timestamp_us: Anomaly start time in microseconds.
        type: Type of anomaly detected.
        severity: Impact level (CRITICAL, WARNING, INFO).
        description: Plain-language explanation.
        duration_ns: Duration in nanoseconds.
        confidence: Detection confidence (0.0-1.0).
        metadata: Additional type-specific information.

    Example:
        >>> anomaly = Anomaly(
        ...     timestamp_us=45.23,
        ...     type="glitch",
        ...     severity="WARNING",
        ...     description="Brief 35ns pulse, likely noise spike",
        ...     duration_ns=35.0,
        ...     confidence=0.92,
        ... )
    """

    timestamp_us: float
    type: AnomalyType
    severity: Severity
    description: str
    duration_ns: float = 0.0
    confidence: float = 1.0
    metadata: dict[str, float] = field(default_factory=dict)
def find_anomalies(
    trace: WaveformTrace | DigitalTrace,
    *,
    severity_filter: list[Severity] | None = None,
    min_confidence: float = 0.7,
    anomaly_types: list[AnomalyType] | None = None,
) -> list[Anomaly]:
    """Detect anomalies in a signal automatically.

    Identifies glitches, dropouts, noise spikes, timing violations, ringing,
    and overshoot/undershoot without requiring user configuration.

    Args:
        trace: Input waveform or digital trace.
        severity_filter: Only return the specified severity levels (default: all).
        min_confidence: Minimum confidence threshold (0.0-1.0).
        anomaly_types: Specific anomaly types to detect (default: all).

    Returns:
        List of detected Anomaly objects, sorted by timestamp.

    Raises:
        ValueError: If the trace is empty or invalid.

    Example:
        >>> anomalies = find_anomalies(trace, severity_filter=['CRITICAL', 'WARNING'])
        >>> print(f"Found {len(anomalies)} critical/warning anomalies")
        >>> for anom in anomalies[:5]:
        ...     print(f"  {anom.timestamp_us:.2f}us: {anom.type} - {anom.description}")

    References:
        DISC-002: Anomaly Highlighting
    """
    # Validate input
    if len(trace) == 0:
        raise ValueError("Cannot detect anomalies in empty trace")

    # Get signal data
    if isinstance(trace, WaveformTrace):
        data = trace.data
        sample_rate = trace.metadata.sample_rate
    else:
        data = trace.data.astype(np.float64)
        sample_rate = trace.metadata.sample_rate

    # Compute basic statistics for reference
    stats = basic_stats(data)
    voltage_swing = stats["max"] - stats["min"]

    # Collect all anomalies
    all_anomalies: list[Anomaly] = []

    # Define which anomaly types to check
    if anomaly_types is None:
        check_types: list[AnomalyType] = [
            "glitch",
            "dropout",
            "noise_spike",
            "timing_violation",
            "ringing",
            "overshoot",
            "undershoot",
        ]
    else:
        check_types = anomaly_types

    # Detect each type
    if "glitch" in check_types:
        all_anomalies.extend(_detect_glitches(data, sample_rate, voltage_swing, stats))

    if "dropout" in check_types:
        all_anomalies.extend(_detect_dropouts(data, sample_rate, voltage_swing, stats))

    if "noise_spike" in check_types:
        all_anomalies.extend(_detect_noise_spikes(data, sample_rate, voltage_swing, stats))

    if "timing_violation" in check_types:
        all_anomalies.extend(_detect_timing_violations(data, sample_rate, stats))

    if "ringing" in check_types:
        all_anomalies.extend(_detect_ringing(data, sample_rate, voltage_swing, stats))

    if "overshoot" in check_types:
        all_anomalies.extend(_detect_overshoot(data, sample_rate, voltage_swing, stats))

    if "undershoot" in check_types:
        all_anomalies.extend(_detect_undershoot(data, sample_rate, voltage_swing, stats))

    # Filter by confidence
    all_anomalies = [a for a in all_anomalies if a.confidence >= min_confidence]

    # Filter by severity if requested
    if severity_filter is not None:
        all_anomalies = [a for a in all_anomalies if a.severity in severity_filter]

    # Sort by timestamp
    all_anomalies.sort(key=lambda a: a.timestamp_us)

    return all_anomalies
def _detect_glitches(
    data: NDArray[np.floating[Any]],
    sample_rate: float,
    voltage_swing: float,
    stats: dict[str, float],
) -> list[Anomaly]:
    """Detect brief narrow pulses (glitches).

    Args:
        data: Signal data array.
        sample_rate: Sample rate in Hz.
        voltage_swing: Peak-to-peak voltage.
        stats: Basic statistics.

    Returns:
        List of detected glitch anomalies.
    """
    anomalies: list[Anomaly] = []

    if voltage_swing == 0 or len(data) < 10:
        return anomalies

    # Threshold for glitch detection
    threshold = stats["mean"]
    glitch_threshold = voltage_swing * 0.3  # 30% of swing

    # Find samples far from the mean
    deviations = np.abs(data - threshold)
    glitch_candidates = np.where(deviations > glitch_threshold)[0]

    if len(glitch_candidates) == 0:
        return anomalies

    # Group consecutive samples into glitches
    glitch_groups = []
    current_group = [glitch_candidates[0]]

    for idx in glitch_candidates[1:]:
        if idx == current_group[-1] + 1:
            current_group.append(idx)
        else:
            glitch_groups.append(current_group)
            current_group = [idx]

    glitch_groups.append(current_group)

    # Analyze each glitch
    for group in glitch_groups:
        duration_samples = len(group)
        duration_ns = (duration_samples / sample_rate) * 1e9

        # Only report glitches shorter than 50 ns
        if duration_ns < 50:
            timestamp_us = (group[0] / sample_rate) * 1e6
            magnitude = np.max(np.abs(data[group] - threshold))

            # Determine severity based on magnitude
            if magnitude > voltage_swing * 0.5:
                severity: Severity = "WARNING"
            else:
                severity = "INFO"

            description = f"Brief {duration_ns:.0f}ns pulse, likely noise spike"

            anomalies.append(
                Anomaly(
                    timestamp_us=timestamp_us,
                    type="glitch",
                    severity=severity,
                    description=description,
                    duration_ns=duration_ns,
                    confidence=0.85,
                    metadata={"magnitude": magnitude},
                )
            )

    return anomalies
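# The consecutive-index grouping loop above also appears in the overshoot
# and undershoot detectors. A vectorized equivalent is sketched below for
# illustration (an assumption about a possible refactor; nothing in this
# module calls it): split the sorted candidate array at every gap larger
# than one sample.


def _group_consecutive(indices: NDArray[np.intp]) -> list[NDArray[np.intp]]:
    """Split a sorted index array into runs of consecutive values (sketch)."""
    if indices.size == 0:
        return []
    # np.diff != 1 marks the positions where a run ends; +1 converts them
    # into split points for np.split
    return np.split(indices, np.where(np.diff(indices) != 1)[0] + 1)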
def _detect_dropouts(
    data: NDArray[np.floating[Any]],
    sample_rate: float,
    voltage_swing: float,
    stats: dict[str, float],
) -> list[Anomaly]:
    """Detect missing transitions or prolonged holds.

    Args:
        data: Signal data array.
        sample_rate: Sample rate in Hz.
        voltage_swing: Peak-to-peak voltage.
        stats: Basic statistics.

    Returns:
        List of detected dropout anomalies.
    """
    anomalies: list[Anomaly] = []

    if voltage_swing == 0 or len(data) < 100:
        return anomalies

    # Estimate the expected period from transitions
    threshold = (stats["max"] + stats["min"]) / 2
    digital = data > threshold
    transitions = np.where(np.diff(digital.astype(int)) != 0)[0]

    if len(transitions) < 5:
        return anomalies

    # Calculate the typical transition interval
    intervals = np.diff(transitions)
    expected_period = np.median(intervals)

    # Find unusually long intervals (>2x expected)
    for i, interval in enumerate(intervals):
        if interval > expected_period * 2.0:
            timestamp_us = (transitions[i] / sample_rate) * 1e6
            duration_ns = (interval / sample_rate) * 1e9
            multiplier = interval / expected_period

            description = f"Missing transition, signal held for {multiplier:.1f}x expected duration"

            # Severity based on how long the dropout lasts
            if multiplier > 5.0:
                severity: Severity = "CRITICAL"
            elif multiplier > 3.0:
                severity = "WARNING"
            else:
                severity = "INFO"

            anomalies.append(
                Anomaly(
                    timestamp_us=timestamp_us,
                    type="dropout",
                    severity=severity,
                    description=description,
                    duration_ns=duration_ns,
                    confidence=0.88,
                    metadata={"expected_period_ns": (expected_period / sample_rate) * 1e9},
                )
            )

    return anomalies
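# Worked example of the dropout thresholds (illustrative numbers only):
# with a median transition interval of 1000 samples at 100 MS/s (10 us),
# an interval of 3500 samples is a 3.5x multiplier, so it is reported as
# a WARNING dropout with duration_ns = 35000.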
def _detect_noise_spikes(
    data: NDArray[np.floating[Any]],
    sample_rate: float,
    voltage_swing: float,
    stats: dict[str, float],
) -> list[Anomaly]:
    """Detect noise spikes (>20% of signal swing).

    Args:
        data: Signal data array.
        sample_rate: Sample rate in Hz.
        voltage_swing: Peak-to-peak voltage.
        stats: Basic statistics.

    Returns:
        List of detected noise spike anomalies.
    """
    anomalies: list[Anomaly] = []

    if voltage_swing == 0 or len(data) < 10:
        return anomalies

    # Use a running window to detect local spikes
    window = 10
    spike_threshold = voltage_swing * 0.2

    # A while loop is required here: reassigning the loop variable of a
    # `for` loop does not skip iterations in Python, so the original
    # `i += window` inside a `for` loop was a no-op.
    i = window
    while i < len(data) - window:
        local_mean = np.mean(data[i - window : i + window])
        deviation = abs(data[i] - local_mean)

        if deviation > spike_threshold:
            timestamp_us = (i / sample_rate) * 1e6
            percent = (deviation / voltage_swing) * 100

            description = f"Noise spike {percent:.0f}% of signal swing"

            # Severity based on spike magnitude
            if percent > 50:
                severity: Severity = "WARNING"
            else:
                severity = "INFO"

            anomalies.append(
                Anomaly(
                    timestamp_us=timestamp_us,
                    type="noise_spike",
                    severity=severity,
                    description=description,
                    duration_ns=(1 / sample_rate) * 1e9,
                    confidence=0.80,
                    metadata={"deviation_v": deviation},
                )
            )

            # Skip ahead to avoid duplicate detections of the same spike
            i += window
        else:
            i += 1

    # Limit the number of noise spikes reported
    return anomalies[:50]
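# The scan above is O(n * window) in pure Python. For long captures, a
# vectorized variant (a sketch, assuming a uniform moving-average kernel
# rather than the exact per-sample loop semantics) could precompute the
# local means in one pass:
#
#     kernel = np.ones(2 * window) / (2 * window)
#     local_means = np.convolve(data, kernel, mode="same")
#     spikes = np.where(np.abs(data - local_means) > spike_threshold)[0]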
def _detect_timing_violations(
    data: NDArray[np.floating[Any]],
    sample_rate: float,
    stats: dict[str, float],
) -> list[Anomaly]:
    """Detect timing violations (beyond ±5% of expected timing).

    Args:
        data: Signal data array.
        sample_rate: Sample rate in Hz.
        stats: Basic statistics.

    Returns:
        List of detected timing violation anomalies.
    """
    anomalies: list[Anomaly] = []

    if len(data) < 100:
        return anomalies

    # Find edges
    threshold = stats["mean"]
    digital = data > threshold
    transitions = np.where(np.diff(digital.astype(int)) != 0)[0]

    if len(transitions) < 10:
        return anomalies

    # Analyze timing consistency
    intervals = np.diff(transitions)
    expected_interval = np.median(intervals)
    tolerance = expected_interval * 0.05  # 5% tolerance

    # Find violations
    for i, interval in enumerate(intervals):
        deviation = abs(interval - expected_interval)

        if deviation > tolerance:
            timestamp_us = (transitions[i] / sample_rate) * 1e6
            percent_dev = (deviation / expected_interval) * 100

            description = f"Timing deviation {percent_dev:.1f}% from expected"

            # Severity based on deviation magnitude
            if percent_dev > 15:
                severity: Severity = "WARNING"
            else:
                severity = "INFO"

            anomalies.append(
                Anomaly(
                    timestamp_us=timestamp_us,
                    type="timing_violation",
                    severity=severity,
                    description=description,
                    duration_ns=(interval / sample_rate) * 1e9,
                    confidence=0.75,
                    metadata={"deviation_percent": percent_dev},
                )
            )

    # Limit the number of violations reported
    return anomalies[:20]
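# Worked example of the timing tolerance (illustrative numbers only):
# with a median interval of 100 samples at 1 GS/s (100 ns), the 5%
# tolerance is 5 samples (5 ns); a 120-sample interval deviates by 20%,
# which exceeds the 15% cutoff and is reported as a WARNING.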
def _detect_ringing(
    data: NDArray[np.floating[Any]],
    sample_rate: float,
    voltage_swing: float,
    stats: dict[str, float],
) -> list[Anomaly]:
    """Detect ringing (≥3 oscillations).

    Args:
        data: Signal data array.
        sample_rate: Sample rate in Hz.
        voltage_swing: Peak-to-peak voltage.
        stats: Basic statistics.

    Returns:
        List of detected ringing anomalies.
    """
    anomalies: list[Anomaly] = []

    if voltage_swing == 0 or len(data) < 50:
        return anomalies

    # Look for oscillations after transitions
    threshold = stats["mean"]
    digital = data > threshold
    transitions = np.where(np.diff(digital.astype(int)) != 0)[0]

    for trans_idx in transitions:
        # Check the window after the transition
        window_size = min(50, len(data) - trans_idx - 1)
        if window_size < 10:
            continue

        window = data[trans_idx + 1 : trans_idx + 1 + window_size]

        # Count zero crossings (oscillations)
        window_mean = np.mean(window)
        crossings = np.sum(np.diff(np.sign(window - window_mean)) != 0)

        # Ringing should have ≥3 oscillations
        if crossings >= 6:  # 6 crossings = 3 full oscillations
            timestamp_us = (trans_idx / sample_rate) * 1e6
            duration_ns = (window_size / sample_rate) * 1e9
            num_oscillations = crossings // 2

            description = f"Ringing with {num_oscillations} oscillations after edge"

            severity: Severity = "INFO"

            anomalies.append(
                Anomaly(
                    timestamp_us=timestamp_us,
                    type="ringing",
                    severity=severity,
                    description=description,
                    duration_ns=duration_ns,
                    confidence=0.70,
                    metadata={"oscillations": num_oscillations},
                )
            )

    return anomalies[:10]
def _detect_overshoot(
    data: NDArray[np.floating[Any]],
    sample_rate: float,
    voltage_swing: float,
    stats: dict[str, float],
) -> list[Anomaly]:
    """Detect overshoot (>10% beyond the high rail).

    Args:
        data: Signal data array.
        sample_rate: Sample rate in Hz.
        voltage_swing: Peak-to-peak voltage.
        stats: Basic statistics.

    Returns:
        List of detected overshoot anomalies.
    """
    anomalies: list[Anomaly] = []

    if voltage_swing == 0:
        return anomalies

    # Estimate the expected high rail as 95% of the observed maximum
    # (a crude stand-in for a histogram-based rail estimate)
    high_rail = stats["max"] * 0.95
    overshoot_threshold = high_rail * 1.1  # flag samples above 110% of the estimated rail

    # Find overshoot samples
    overshoots = np.where(data > overshoot_threshold)[0]

    if len(overshoots) == 0:
        return anomalies

    # Group consecutive samples
    groups = []
    current = [overshoots[0]]

    for idx in overshoots[1:]:
        if idx == current[-1] + 1:
            current.append(idx)
        else:
            groups.append(current)
            current = [idx]

    groups.append(current)

    # Report each overshoot event
    for group in groups:
        timestamp_us = (group[0] / sample_rate) * 1e6
        peak_value = np.max(data[group])
        percent_over = ((peak_value - high_rail) / high_rail) * 100

        description = (
            f"Signal exceeded expected high level by {percent_over:.0f}% (peak: {peak_value:.2f}V)"
        )

        # Severity based on overshoot magnitude
        if percent_over > 20:
            severity: Severity = "WARNING"
        else:
            severity = "INFO"

        anomalies.append(
            Anomaly(
                timestamp_us=timestamp_us,
                type="overshoot",
                severity=severity,
                description=description,
                duration_ns=(len(group) / sample_rate) * 1e9,
                confidence=0.82,
                metadata={"peak_voltage": peak_value},
            )
        )

    return anomalies[:10]
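# A more robust rail estimate (a sketch, not wired in) would use a
# percentile instead of scaling the absolute extreme, since the extreme
# is itself inflated by the very overshoot being measured:
#
#     high_rail = float(np.percentile(data, 95))
#
# The same idea applies to the low rail in _detect_undershoot below.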
def _detect_undershoot(
    data: NDArray[np.floating[Any]],
    sample_rate: float,
    voltage_swing: float,
    stats: dict[str, float],
) -> list[Anomaly]:
    """Detect undershoot (>10% beyond the low rail).

    Args:
        data: Signal data array.
        sample_rate: Sample rate in Hz.
        voltage_swing: Peak-to-peak voltage.
        stats: Basic statistics.

    Returns:
        List of detected undershoot anomalies.
    """
    anomalies: list[Anomaly] = []

    if voltage_swing == 0:
        return anomalies

    # Estimate the expected low rail by scaling the observed minimum
    # (like the overshoot detector, a crude stand-in for a histogram-based
    # rail estimate; both scalings assume a negative low rail)
    low_rail = stats["min"] * 1.05
    undershoot_threshold = low_rail * 0.9  # flag samples deeper than 90% of the estimated rail

    # Find undershoot samples
    undershoots = np.where(data < undershoot_threshold)[0]

    if len(undershoots) == 0:
        return anomalies

    # Group consecutive samples
    groups = []
    current = [undershoots[0]]

    for idx in undershoots[1:]:
        if idx == current[-1] + 1:
            current.append(idx)
        else:
            groups.append(current)
            current = [idx]

    groups.append(current)

    # Report each undershoot event
    for group in groups:
        timestamp_us = (group[0] / sample_rate) * 1e6
        min_value = np.min(data[group])
        percent_under = ((low_rail - min_value) / abs(low_rail)) * 100 if low_rail != 0 else 0

        description = (
            f"Signal fell below expected low level by {percent_under:.0f}% (min: {min_value:.2f}V)"
        )

        # Severity based on undershoot magnitude
        if percent_under > 20:
            severity: Severity = "WARNING"
        else:
            severity = "INFO"

        anomalies.append(
            Anomaly(
                timestamp_us=timestamp_us,
                type="undershoot",
                severity=severity,
                description=description,
                duration_ns=(len(group) / sample_rate) * 1e9,
                confidence=0.82,
                metadata={"min_voltage": min_value},
            )
        )

    return anomalies[:10]
__all__ = [
    "Anomaly",
    "AnomalyType",
    "Severity",
    "find_anomalies",
]
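if __name__ == "__main__":
    # Minimal smoke test on synthetic data (illustrative only). It calls a
    # private detector directly with a hand-built stats dict, so it makes
    # no assumption about the WaveformTrace constructor.
    fs = 1e9  # 1 GS/s
    sig = np.zeros(1000)
    sig[500:503] = 1.0  # 3-sample (~3 ns) pulse, should register as a glitch
    demo_stats = {
        "mean": float(sig.mean()),
        "min": float(sig.min()),
        "max": float(sig.max()),
    }
    swing = demo_stats["max"] - demo_stats["min"]
    for a in _detect_glitches(sig, fs, swing, demo_stats):
        print(f"{a.timestamp_us:.3f}us: {a.type} ({a.severity}) - {a.description}")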