Coverage for src / tracekit / search / anomaly.py: 96%
90 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Anomaly detection in signal traces.
4This module provides automated detection of glitches, timing violations,
5and protocol errors with context extraction for debugging.
6"""
8from typing import Any
10import numpy as np
11from numpy.typing import NDArray
14def find_anomalies(
15 trace: NDArray[np.float64],
16 anomaly_type: str = "glitch",
17 *,
18 threshold: float | None = None,
19 min_width: float | None = None,
20 max_width: float | None = None,
21 sample_rate: float | None = None,
22 context_samples: int = 100,
23 **kwargs: Any,
24) -> list[dict[str, Any]]:
25 """Find glitches, timing violations, or protocol errors in traces.
27 : Anomaly detection with context extraction.
28 Integrates with QUAL-005 glitch detection for signal quality analysis.
30 Args:
31 trace: Input signal trace
32 anomaly_type: Type of anomaly to detect:
33 - 'glitch': Short-duration voltage spikes/dips
34 - 'timing': Edge timing violations (requires sample_rate)
35 - 'protocol': Protocol-level errors (requires decoded data)
36 threshold: Detection threshold. Meaning depends on anomaly_type:
37 - glitch: Voltage deviation from expected level
38 - timing: Timing violation threshold in seconds
39 min_width: Minimum anomaly width in seconds (requires sample_rate)
40 max_width: Maximum anomaly width in seconds (requires sample_rate)
41 sample_rate: Sample rate in Hz (required for timing analysis)
42 context_samples: Number of samples to include before/after anomaly
43 for context extraction (default: 100)
44 **kwargs: Additional type-specific parameters
46 Returns:
47 List of anomaly dictionaries, each containing:
48 - index: Sample index where anomaly occurs
49 - type: Anomaly type
50 - severity: Severity score (0-1, higher is worse)
51 - duration: Duration in samples
52 - amplitude: Amplitude deviation (for glitches)
53 - context: ±context_samples around anomaly
54 - description: Human-readable description
56 Raises:
57 ValueError: If invalid anomaly_type or missing required parameters
59 Examples:
60 >>> # Detect voltage glitches
61 >>> trace = np.array([0, 0, 0, 0.8, 0, 0, 0]) # Spike at index 3
62 >>> anomalies = find_anomalies(
63 ... trace,
64 ... anomaly_type='glitch',
65 ... threshold=0.5,
66 ... sample_rate=1e6
67 ... )
68 >>> print(f"Found {len(anomalies)} glitches")
70 >>> # Detect timing violations
71 >>> anomalies = find_anomalies(
72 ... trace,
73 ... anomaly_type='timing',
74 ... min_width=10e-9, # 10 ns minimum
75 ... max_width=100e-9, # 100 ns maximum
76 ... sample_rate=1e9
77 ... )
79 Notes:
80 - Glitch detection uses derivative and threshold methods
81 - Timing detection requires sample_rate for width calculations
82 - Context extraction handles edge cases at trace boundaries
83 - Integrates with QUAL-005 for comprehensive signal quality analysis
85 References:
86 SRCH-002: Anomaly Search
87 QUAL-005: Glitch Detection
88 """
89 if trace.size == 0:
90 return []
92 valid_types = {"glitch", "timing", "protocol"}
93 if anomaly_type not in valid_types:
94 raise ValueError(f"Invalid anomaly_type '{anomaly_type}'. Must be one of: {valid_types}")
96 anomalies: list[dict[str, Any]] = []
98 if anomaly_type == "glitch":
99 anomalies = _detect_glitches(
100 trace,
101 threshold=threshold,
102 min_width=min_width,
103 max_width=max_width,
104 sample_rate=sample_rate,
105 context_samples=context_samples,
106 )
108 elif anomaly_type == "timing":
109 if sample_rate is None:
110 raise ValueError("sample_rate required for timing anomaly detection")
112 anomalies = _detect_timing_violations(
113 trace,
114 sample_rate=sample_rate,
115 min_width=min_width,
116 max_width=max_width,
117 context_samples=context_samples,
118 )
120 elif anomaly_type == "protocol": 120 ↛ 125line 120 didn't jump to line 125 because the condition on line 120 was always true
121 # Protocol error detection would integrate with protocol decoders
122 # For now, return empty list with note
123 anomalies = []
125 return anomalies
128def _detect_glitches(
129 trace: NDArray[np.float64],
130 threshold: float | None,
131 min_width: float | None,
132 max_width: float | None,
133 sample_rate: float | None,
134 context_samples: int,
135) -> list[dict[str, Any]]:
136 """Detect voltage glitches using derivative method."""
137 glitches: list[dict[str, Any]] = []
139 # Auto-threshold if not provided
140 threshold_value: float
141 if threshold is None:
142 # Use 3 sigma as default threshold
143 threshold_value = float(3 * np.std(trace))
144 else:
145 threshold_value = threshold
147 # Compute derivative to find rapid changes
148 derivative = np.diff(trace)
149 abs_derivative = np.abs(derivative)
151 # Find points where derivative exceeds threshold
152 glitch_candidates = np.where(abs_derivative > threshold_value)[0]
154 if len(glitch_candidates) == 0:
155 return glitches
157 # Group consecutive points into glitch events
158 glitch_groups = []
159 current_group = [glitch_candidates[0]]
161 for idx in glitch_candidates[1:]:
162 if idx == current_group[-1] + 1:
163 current_group.append(idx)
164 else:
165 glitch_groups.append(current_group)
166 current_group = [idx]
168 if current_group: 168 ↛ 172line 168 didn't jump to line 172 because the condition on line 168 was always true
169 glitch_groups.append(current_group)
171 # Filter by width if specified
172 for group in glitch_groups:
173 start_idx = group[0]
174 end_idx = group[-1] + 1
175 duration_samples = end_idx - start_idx
177 # Check width constraints
178 if sample_rate is not None:
179 duration_seconds = duration_samples / sample_rate
181 if min_width is not None and duration_seconds < min_width:
182 continue
183 if max_width is not None and duration_seconds > max_width: 183 ↛ 184line 183 didn't jump to line 184 because the condition on line 183 was never true
184 continue
186 # Extract context
187 ctx_start = max(0, start_idx - context_samples)
188 ctx_end = min(len(trace), end_idx + context_samples)
189 context = trace[ctx_start:ctx_end].copy()
191 # Compute amplitude deviation
192 baseline = np.median(trace)
193 amplitude = np.max(np.abs(trace[start_idx:end_idx] - baseline))
195 # Severity: normalized amplitude
196 severity = min(1.0, amplitude / (threshold_value * 3))
198 glitches.append(
199 {
200 "index": start_idx,
201 "type": "glitch",
202 "severity": float(severity),
203 "duration": duration_samples,
204 "amplitude": float(amplitude),
205 "context": context,
206 "description": f"Glitch at sample {start_idx}, amplitude {amplitude:.3g}",
207 }
208 )
210 return glitches
213def _detect_timing_violations(
214 trace: NDArray[np.float64],
215 sample_rate: float,
216 min_width: float | None,
217 max_width: float | None,
218 context_samples: int,
219) -> list[dict[str, Any]]:
220 """Detect timing violations (pulse width violations)."""
221 violations = []
223 # Simple threshold for digital signal
224 threshold = (np.max(trace) + np.min(trace)) / 2
225 digital = (trace >= threshold).astype(int)
227 # Find edges
228 edges = np.diff(digital)
229 rising_edges = np.where(edges == 1)[0]
230 falling_edges = np.where(edges == -1)[0]
232 # Measure pulse widths
233 for rise in rising_edges:
234 # Find next falling edge
235 next_fall = falling_edges[falling_edges > rise]
236 if len(next_fall) == 0:
237 continue
239 fall = next_fall[0]
240 pulse_width_samples = fall - rise
241 pulse_width_seconds = pulse_width_samples / sample_rate
243 # Check violations
244 violated = False
245 violation_type = ""
247 if min_width is not None and pulse_width_seconds < min_width:
248 violated = True
249 violation_type = "too_short"
251 if max_width is not None and pulse_width_seconds > max_width:
252 violated = True
253 violation_type = "too_long"
255 if violated:
256 # Extract context
257 ctx_start = max(0, rise - context_samples)
258 ctx_end = min(len(trace), fall + context_samples)
259 context = trace[ctx_start:ctx_end].copy()
261 # Severity based on deviation
262 if min_width is not None:
263 deviation = abs(pulse_width_seconds - min_width) / min_width
264 elif max_width is not None: 264 ↛ 267line 264 didn't jump to line 267 because the condition on line 264 was always true
265 deviation = abs(pulse_width_seconds - max_width) / max_width
266 else:
267 deviation = 0.0
269 severity = min(1.0, deviation)
271 violations.append(
272 {
273 "index": rise,
274 "type": f"timing_{violation_type}",
275 "severity": float(severity),
276 "duration": pulse_width_samples,
277 "amplitude": float(pulse_width_seconds),
278 "context": context,
279 "description": (
280 f"Timing violation at sample {rise}: "
281 f"pulse width {pulse_width_seconds * 1e9:.1f} ns ({violation_type})"
282 ),
283 }
284 )
286 return violations