Coverage for src / tracekit / search / anomaly.py: 96%

90 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Anomaly detection in signal traces. 

2 

3 

4This module provides automated detection of glitches, timing violations, 

5and protocol errors with context extraction for debugging. 

6""" 

7 

8from typing import Any 

9 

10import numpy as np 

11from numpy.typing import NDArray 

12 

13 

14def find_anomalies( 

15 trace: NDArray[np.float64], 

16 anomaly_type: str = "glitch", 

17 *, 

18 threshold: float | None = None, 

19 min_width: float | None = None, 

20 max_width: float | None = None, 

21 sample_rate: float | None = None, 

22 context_samples: int = 100, 

23 **kwargs: Any, 

24) -> list[dict[str, Any]]: 

25 """Find glitches, timing violations, or protocol errors in traces. 

26 

27 : Anomaly detection with context extraction. 

28 Integrates with QUAL-005 glitch detection for signal quality analysis. 

29 

30 Args: 

31 trace: Input signal trace 

32 anomaly_type: Type of anomaly to detect: 

33 - 'glitch': Short-duration voltage spikes/dips 

34 - 'timing': Edge timing violations (requires sample_rate) 

35 - 'protocol': Protocol-level errors (requires decoded data) 

36 threshold: Detection threshold. Meaning depends on anomaly_type: 

37 - glitch: Voltage deviation from expected level 

38 - timing: Timing violation threshold in seconds 

39 min_width: Minimum anomaly width in seconds (requires sample_rate) 

40 max_width: Maximum anomaly width in seconds (requires sample_rate) 

41 sample_rate: Sample rate in Hz (required for timing analysis) 

42 context_samples: Number of samples to include before/after anomaly 

43 for context extraction (default: 100) 

44 **kwargs: Additional type-specific parameters 

45 

46 Returns: 

47 List of anomaly dictionaries, each containing: 

48 - index: Sample index where anomaly occurs 

49 - type: Anomaly type 

50 - severity: Severity score (0-1, higher is worse) 

51 - duration: Duration in samples 

52 - amplitude: Amplitude deviation (for glitches) 

53 - context: ±context_samples around anomaly 

54 - description: Human-readable description 

55 

56 Raises: 

57 ValueError: If invalid anomaly_type or missing required parameters 

58 

59 Examples: 

60 >>> # Detect voltage glitches 

61 >>> trace = np.array([0, 0, 0, 0.8, 0, 0, 0]) # Spike at index 3 

62 >>> anomalies = find_anomalies( 

63 ... trace, 

64 ... anomaly_type='glitch', 

65 ... threshold=0.5, 

66 ... sample_rate=1e6 

67 ... ) 

68 >>> print(f"Found {len(anomalies)} glitches") 

69 

70 >>> # Detect timing violations 

71 >>> anomalies = find_anomalies( 

72 ... trace, 

73 ... anomaly_type='timing', 

74 ... min_width=10e-9, # 10 ns minimum 

75 ... max_width=100e-9, # 100 ns maximum 

76 ... sample_rate=1e9 

77 ... ) 

78 

79 Notes: 

80 - Glitch detection uses derivative and threshold methods 

81 - Timing detection requires sample_rate for width calculations 

82 - Context extraction handles edge cases at trace boundaries 

83 - Integrates with QUAL-005 for comprehensive signal quality analysis 

84 

85 References: 

86 SRCH-002: Anomaly Search 

87 QUAL-005: Glitch Detection 

88 """ 

89 if trace.size == 0: 

90 return [] 

91 

92 valid_types = {"glitch", "timing", "protocol"} 

93 if anomaly_type not in valid_types: 

94 raise ValueError(f"Invalid anomaly_type '{anomaly_type}'. Must be one of: {valid_types}") 

95 

96 anomalies: list[dict[str, Any]] = [] 

97 

98 if anomaly_type == "glitch": 

99 anomalies = _detect_glitches( 

100 trace, 

101 threshold=threshold, 

102 min_width=min_width, 

103 max_width=max_width, 

104 sample_rate=sample_rate, 

105 context_samples=context_samples, 

106 ) 

107 

108 elif anomaly_type == "timing": 

109 if sample_rate is None: 

110 raise ValueError("sample_rate required for timing anomaly detection") 

111 

112 anomalies = _detect_timing_violations( 

113 trace, 

114 sample_rate=sample_rate, 

115 min_width=min_width, 

116 max_width=max_width, 

117 context_samples=context_samples, 

118 ) 

119 

120 elif anomaly_type == "protocol": 120 ↛ 125line 120 didn't jump to line 125 because the condition on line 120 was always true

121 # Protocol error detection would integrate with protocol decoders 

122 # For now, return empty list with note 

123 anomalies = [] 

124 

125 return anomalies 

126 

127 

128def _detect_glitches( 

129 trace: NDArray[np.float64], 

130 threshold: float | None, 

131 min_width: float | None, 

132 max_width: float | None, 

133 sample_rate: float | None, 

134 context_samples: int, 

135) -> list[dict[str, Any]]: 

136 """Detect voltage glitches using derivative method.""" 

137 glitches: list[dict[str, Any]] = [] 

138 

139 # Auto-threshold if not provided 

140 threshold_value: float 

141 if threshold is None: 

142 # Use 3 sigma as default threshold 

143 threshold_value = float(3 * np.std(trace)) 

144 else: 

145 threshold_value = threshold 

146 

147 # Compute derivative to find rapid changes 

148 derivative = np.diff(trace) 

149 abs_derivative = np.abs(derivative) 

150 

151 # Find points where derivative exceeds threshold 

152 glitch_candidates = np.where(abs_derivative > threshold_value)[0] 

153 

154 if len(glitch_candidates) == 0: 

155 return glitches 

156 

157 # Group consecutive points into glitch events 

158 glitch_groups = [] 

159 current_group = [glitch_candidates[0]] 

160 

161 for idx in glitch_candidates[1:]: 

162 if idx == current_group[-1] + 1: 

163 current_group.append(idx) 

164 else: 

165 glitch_groups.append(current_group) 

166 current_group = [idx] 

167 

168 if current_group: 168 ↛ 172line 168 didn't jump to line 172 because the condition on line 168 was always true

169 glitch_groups.append(current_group) 

170 

171 # Filter by width if specified 

172 for group in glitch_groups: 

173 start_idx = group[0] 

174 end_idx = group[-1] + 1 

175 duration_samples = end_idx - start_idx 

176 

177 # Check width constraints 

178 if sample_rate is not None: 

179 duration_seconds = duration_samples / sample_rate 

180 

181 if min_width is not None and duration_seconds < min_width: 

182 continue 

183 if max_width is not None and duration_seconds > max_width: 183 ↛ 184line 183 didn't jump to line 184 because the condition on line 183 was never true

184 continue 

185 

186 # Extract context 

187 ctx_start = max(0, start_idx - context_samples) 

188 ctx_end = min(len(trace), end_idx + context_samples) 

189 context = trace[ctx_start:ctx_end].copy() 

190 

191 # Compute amplitude deviation 

192 baseline = np.median(trace) 

193 amplitude = np.max(np.abs(trace[start_idx:end_idx] - baseline)) 

194 

195 # Severity: normalized amplitude 

196 severity = min(1.0, amplitude / (threshold_value * 3)) 

197 

198 glitches.append( 

199 { 

200 "index": start_idx, 

201 "type": "glitch", 

202 "severity": float(severity), 

203 "duration": duration_samples, 

204 "amplitude": float(amplitude), 

205 "context": context, 

206 "description": f"Glitch at sample {start_idx}, amplitude {amplitude:.3g}", 

207 } 

208 ) 

209 

210 return glitches 

211 

212 

213def _detect_timing_violations( 

214 trace: NDArray[np.float64], 

215 sample_rate: float, 

216 min_width: float | None, 

217 max_width: float | None, 

218 context_samples: int, 

219) -> list[dict[str, Any]]: 

220 """Detect timing violations (pulse width violations).""" 

221 violations = [] 

222 

223 # Simple threshold for digital signal 

224 threshold = (np.max(trace) + np.min(trace)) / 2 

225 digital = (trace >= threshold).astype(int) 

226 

227 # Find edges 

228 edges = np.diff(digital) 

229 rising_edges = np.where(edges == 1)[0] 

230 falling_edges = np.where(edges == -1)[0] 

231 

232 # Measure pulse widths 

233 for rise in rising_edges: 

234 # Find next falling edge 

235 next_fall = falling_edges[falling_edges > rise] 

236 if len(next_fall) == 0: 

237 continue 

238 

239 fall = next_fall[0] 

240 pulse_width_samples = fall - rise 

241 pulse_width_seconds = pulse_width_samples / sample_rate 

242 

243 # Check violations 

244 violated = False 

245 violation_type = "" 

246 

247 if min_width is not None and pulse_width_seconds < min_width: 

248 violated = True 

249 violation_type = "too_short" 

250 

251 if max_width is not None and pulse_width_seconds > max_width: 

252 violated = True 

253 violation_type = "too_long" 

254 

255 if violated: 

256 # Extract context 

257 ctx_start = max(0, rise - context_samples) 

258 ctx_end = min(len(trace), fall + context_samples) 

259 context = trace[ctx_start:ctx_end].copy() 

260 

261 # Severity based on deviation 

262 if min_width is not None: 

263 deviation = abs(pulse_width_seconds - min_width) / min_width 

264 elif max_width is not None: 264 ↛ 267line 264 didn't jump to line 267 because the condition on line 264 was always true

265 deviation = abs(pulse_width_seconds - max_width) / max_width 

266 else: 

267 deviation = 0.0 

268 

269 severity = min(1.0, deviation) 

270 

271 violations.append( 

272 { 

273 "index": rise, 

274 "type": f"timing_{violation_type}", 

275 "severity": float(severity), 

276 "duration": pulse_width_samples, 

277 "amplitude": float(pulse_width_seconds), 

278 "context": context, 

279 "description": ( 

280 f"Timing violation at sample {rise}: " 

281 f"pulse width {pulse_width_seconds * 1e9:.1f} ns ({violation_type})" 

282 ), 

283 } 

284 ) 

285 

286 return violations