Coverage for src / tracekit / analyzers / statistics / outliers.py: 92%
136 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Outlier detection for signal analysis.
3This module provides multiple outlier detection methods suitable for
4different data distributions and contamination levels.
7Example:
8 >>> from tracekit.analyzers.statistics.outliers import (
9 ... zscore_outliers, modified_zscore_outliers, iqr_outliers
10 ... )
11 >>> outliers = zscore_outliers(trace, threshold=3.0)
12 >>> robust_outliers = modified_zscore_outliers(trace, threshold=3.5)
13 >>> iqr_result = iqr_outliers(trace, multiplier=1.5)
15References:
16 Iglewicz, B. & Hoaglin, D. (1993). How to Detect and Handle Outliers
17 NIST Engineering Statistics Handbook
18"""
20from __future__ import annotations
22from dataclasses import dataclass
23from typing import TYPE_CHECKING, Any
25import numpy as np
27from tracekit.core.types import WaveformTrace
29if TYPE_CHECKING:
30 from numpy.typing import NDArray
33@dataclass
34class OutlierResult:
35 """Result of outlier detection.
37 Attributes:
38 indices: Array of indices where outliers were detected.
39 values: Array of outlier values.
40 scores: Array of outlier scores (z-scores or similar).
41 mask: Boolean mask (True = outlier).
42 count: Number of outliers detected.
43 method: Detection method used.
44 threshold: Threshold used for detection.
45 """
47 indices: NDArray[np.intp]
48 values: NDArray[np.float64]
49 scores: NDArray[np.float64]
50 mask: NDArray[np.bool_]
51 count: int
52 method: str
53 threshold: float
56def zscore_outliers(
57 trace: WaveformTrace | NDArray[np.floating[Any]],
58 *,
59 threshold: float = 3.0,
60 return_scores: bool = False,
61) -> OutlierResult | tuple[OutlierResult, NDArray[np.float64]]:
62 """Detect outliers using Z-score method.
64 Identifies points where |z-score| exceeds the threshold.
65 Best for normally distributed data without heavy contamination.
67 Args:
68 trace: Input trace or numpy array.
69 threshold: Z-score threshold for outlier detection (default 3.0).
70 return_scores: If True, also return full z-score array.
72 Returns:
73 OutlierResult containing outlier information.
74 If return_scores=True, also returns full z-score array.
76 Example:
77 >>> result = zscore_outliers(trace, threshold=3.0)
78 >>> print(f"Found {result.count} outliers")
79 >>> print(f"Outlier indices: {result.indices}")
81 References:
82 NIST Engineering Statistics Handbook Section 1.3.5.17
83 """
84 data = trace.data if isinstance(trace, WaveformTrace) else trace
86 n = len(data)
87 if n < 3:
88 empty_result = OutlierResult(
89 indices=np.array([], dtype=np.intp),
90 values=np.array([], dtype=np.float64),
91 scores=np.array([], dtype=np.float64),
92 mask=np.zeros(n, dtype=np.bool_),
93 count=0,
94 method="zscore",
95 threshold=threshold,
96 )
97 if return_scores: 97 ↛ 98line 97 didn't jump to line 98 because the condition on line 97 was never true
98 return empty_result, np.zeros(n, dtype=np.float64)
99 return empty_result
101 # Compute z-scores
102 mean = np.mean(data)
103 std = np.std(data, ddof=1)
105 if std < 1e-12:
106 # No variation, no outliers
107 zscores = np.zeros(n, dtype=np.float64)
108 else:
109 zscores = (data - mean) / std
111 # Find outliers
112 mask = np.abs(zscores) > threshold
113 indices = np.where(mask)[0]
114 outlier_values = data[mask].astype(np.float64)
115 outlier_scores = zscores[mask]
117 result = OutlierResult(
118 indices=indices,
119 values=outlier_values,
120 scores=outlier_scores,
121 mask=mask,
122 count=int(np.sum(mask)),
123 method="zscore",
124 threshold=threshold,
125 )
127 if return_scores:
128 return result, zscores
129 return result
132def modified_zscore_outliers(
133 trace: WaveformTrace | NDArray[np.floating[Any]],
134 *,
135 threshold: float = 3.5,
136 return_scores: bool = False,
137) -> OutlierResult | tuple[OutlierResult, NDArray[np.float64]]:
138 """Detect outliers using Modified Z-score (MAD-based) method.
140 Uses Median Absolute Deviation (MAD) for robust outlier detection.
141 More resistant to contaminated data than standard z-score.
143 Args:
144 trace: Input trace or numpy array.
145 threshold: Modified z-score threshold (default 3.5, per Iglewicz & Hoaglin).
146 return_scores: If True, also return full modified z-score array.
148 Returns:
149 OutlierResult containing outlier information.
150 If return_scores=True, also returns full modified z-score array.
152 Example:
153 >>> result = modified_zscore_outliers(trace, threshold=3.5)
154 >>> print(f"Found {result.count} outliers")
155 >>> # Robust to up to ~50% contamination
157 References:
158 Iglewicz, B. & Hoaglin, D. (1993). How to Detect and Handle Outliers
159 """
160 data = trace.data if isinstance(trace, WaveformTrace) else trace
162 n = len(data)
163 if n < 3: 163 ↛ 164line 163 didn't jump to line 164 because the condition on line 163 was never true
164 empty_result = OutlierResult(
165 indices=np.array([], dtype=np.intp),
166 values=np.array([], dtype=np.float64),
167 scores=np.array([], dtype=np.float64),
168 mask=np.zeros(n, dtype=np.bool_),
169 count=0,
170 method="modified_zscore",
171 threshold=threshold,
172 )
173 if return_scores:
174 return empty_result, np.zeros(n, dtype=np.float64)
175 return empty_result
177 # Compute median and MAD
178 median = np.median(data)
179 mad = np.median(np.abs(data - median))
181 # Consistency constant for normal distribution
182 # MAD = 0.6745 * sigma for normal distribution
183 k = 0.6745
185 if mad < 1e-12:
186 # Very low spread - use fallback
187 # Check if there are any points far from median
188 deviations = np.abs(data - median)
189 max_dev = np.max(deviations)
190 if max_dev < 1e-12:
191 # All points identical, no outliers
192 modified_zscores = np.zeros(n, dtype=np.float64)
193 else:
194 # Scale deviations so that max_dev gets a high score
195 # This ensures outliers in nearly-constant data are detected
196 # Use a scale factor that makes max_dev map to a large z-score
197 scale = max_dev / (threshold * 2) # Conservative scaling
198 modified_zscores = deviations / scale
199 else:
200 modified_zscores = k * (data - median) / mad
202 # Find outliers
203 mask = np.abs(modified_zscores) > threshold
204 indices = np.where(mask)[0]
205 outlier_values = data[mask].astype(np.float64)
206 outlier_scores = modified_zscores[mask]
208 result = OutlierResult(
209 indices=indices,
210 values=outlier_values,
211 scores=outlier_scores,
212 mask=mask,
213 count=int(np.sum(mask)),
214 method="modified_zscore",
215 threshold=threshold,
216 )
218 if return_scores:
219 return result, modified_zscores.astype(np.float64)
220 return result
223def iqr_outliers(
224 trace: WaveformTrace | NDArray[np.floating[Any]],
225 *,
226 multiplier: float = 1.5,
227 return_fences: bool = False,
228) -> OutlierResult | tuple[OutlierResult, dict[str, float]]:
229 """Detect outliers using Interquartile Range (IQR) method.
231 Flags points outside the fences: [Q1 - k*IQR, Q3 + k*IQR].
232 Good for skewed distributions.
234 Args:
235 trace: Input trace or numpy array.
236 multiplier: IQR multiplier for fence calculation (default 1.5).
237 Use 3.0 for "extreme" outliers.
238 return_fences: If True, also return fence values.
240 Returns:
241 OutlierResult containing outlier information.
242 If return_fences=True, also returns dict with Q1, Q3, IQR, fences.
244 Example:
245 >>> result = iqr_outliers(trace, multiplier=1.5)
246 >>> print(f"Found {result.count} outliers")
248 >>> # Get fence values
249 >>> result, fences = iqr_outliers(trace, return_fences=True)
250 >>> print(f"Lower fence: {fences['lower_fence']}")
252 References:
253 Tukey, J. W. (1977). Exploratory Data Analysis
254 """
255 data = trace.data if isinstance(trace, WaveformTrace) else trace
257 n = len(data)
258 if n < 4:
259 empty_result = OutlierResult(
260 indices=np.array([], dtype=np.intp),
261 values=np.array([], dtype=np.float64),
262 scores=np.array([], dtype=np.float64),
263 mask=np.zeros(n, dtype=np.bool_),
264 count=0,
265 method="iqr",
266 threshold=multiplier,
267 )
268 if return_fences: 268 ↛ 269line 268 didn't jump to line 269 because the condition on line 268 was never true
269 return empty_result, {
270 "q1": np.nan,
271 "q3": np.nan,
272 "iqr": np.nan,
273 "lower_fence": np.nan,
274 "upper_fence": np.nan,
275 }
276 return empty_result
278 # Compute quartiles
279 q1 = float(np.percentile(data, 25))
280 q3 = float(np.percentile(data, 75))
281 iqr = q3 - q1
283 # Calculate fences
284 lower_fence = q1 - multiplier * iqr
285 upper_fence = q3 + multiplier * iqr
287 # Find outliers
288 mask = (data < lower_fence) | (data > upper_fence)
289 indices = np.where(mask)[0]
290 outlier_values = data[mask].astype(np.float64)
292 # Calculate "scores" as distance from nearest fence normalized by IQR
293 if iqr > 0:
294 scores = np.zeros(n, dtype=np.float64)
295 below = data < lower_fence
296 above = data > upper_fence
297 scores[below] = (lower_fence - data[below]) / iqr
298 scores[above] = (data[above] - upper_fence) / iqr
299 outlier_scores = scores[mask]
300 else:
301 outlier_scores = np.zeros(len(indices), dtype=np.float64)
303 result = OutlierResult(
304 indices=indices,
305 values=outlier_values,
306 scores=outlier_scores,
307 mask=mask,
308 count=int(np.sum(mask)),
309 method="iqr",
310 threshold=multiplier,
311 )
313 if return_fences:
314 fences = {
315 "q1": q1,
316 "q3": q3,
317 "iqr": iqr,
318 "lower_fence": lower_fence,
319 "upper_fence": upper_fence,
320 }
321 return result, fences
322 return result
325def detect_outliers(
326 trace: WaveformTrace | NDArray[np.floating[Any]], # type: ignore[name-defined]
327 *,
328 method: str = "modified_zscore",
329 **kwargs: Any, # type: ignore[name-defined]
330) -> OutlierResult:
331 """Detect outliers using specified method.
333 Convenience function that dispatches to the appropriate outlier
334 detection method based on the method parameter.
336 Args:
337 trace: Input trace or numpy array.
338 method: Detection method. One of:
339 - "zscore": Standard z-score method
340 - "modified_zscore": MAD-based robust method (default)
341 - "iqr": Interquartile range method
342 **kwargs: Additional arguments passed to the detection method.
344 Returns:
345 OutlierResult containing outlier information.
347 Raises:
348 ValueError: If method is not one of the supported types.
350 Example:
351 >>> result = detect_outliers(trace, method="iqr", multiplier=2.0)
352 >>> print(f"Method: {result.method}, Count: {result.count}")
353 """
354 methods = {
355 "zscore": zscore_outliers, # type: ignore[dict-item]
356 "modified_zscore": modified_zscore_outliers, # type: ignore[dict-item]
357 "iqr": iqr_outliers,
358 }
360 if method not in methods:
361 available = ", ".join(methods.keys())
362 raise ValueError(f"Unknown method: {method}. Available: {available}")
364 result = methods[method](trace, **kwargs)
366 # Handle tuple returns
367 if isinstance(result, tuple): 367 ↛ 368line 367 didn't jump to line 368 because the condition on line 367 was never true
368 return result[0]
369 return result
372def remove_outliers( # type: ignore[no-untyped-def]
373 trace: WaveformTrace | NDArray[np.floating[Any]],
374 *,
375 method: str = "modified_zscore",
376 replacement: str = "nan",
377 **kwargs,
378) -> NDArray[np.float64]:
379 """Remove or replace outliers in data.
381 Args:
382 trace: Input trace or numpy array.
383 method: Detection method (see detect_outliers).
384 replacement: How to handle outliers:
385 - "nan": Replace with NaN
386 - "clip": Clip to nearest fence/threshold
387 - "interpolate": Linear interpolation from neighbors
388 **kwargs: Additional arguments for detection method.
390 Returns:
391 Array with outliers handled according to replacement method.
393 Raises:
394 ValueError: If replacement method is not one of the supported types.
396 Example:
397 >>> cleaned = remove_outliers(trace, method="iqr", replacement="nan")
398 >>> # Use for analysis that can handle NaN values
399 """
400 if isinstance(trace, WaveformTrace):
401 data = trace.data.copy()
402 else:
403 data = np.array(trace, dtype=np.float64)
405 result = detect_outliers(trace, method=method, **kwargs)
407 if result.count == 0:
408 return data
410 if replacement == "nan":
411 data[result.mask] = np.nan
413 elif replacement == "clip":
414 if method == "iqr":
415 # Get fence values
416 _, fences = iqr_outliers(trace, return_fences=True, **kwargs) # type: ignore[misc]
417 data = np.clip(data, fences["lower_fence"], fences["upper_fence"])
418 else:
419 # For z-score methods, clip to mean +/- threshold * std
420 mean = np.mean(data[~result.mask]) if np.any(~result.mask) else np.mean(data)
421 std = (
422 np.std(data[~result.mask], ddof=1) if np.any(~result.mask) else np.std(data, ddof=1)
423 )
424 threshold = result.threshold
425 data = np.clip(data, mean - threshold * std, mean + threshold * std)
427 elif replacement == "interpolate":
428 # Linear interpolation from non-outlier neighbors
429 outlier_indices = result.indices
430 valid_indices = np.where(~result.mask)[0]
432 if len(valid_indices) > 0: 432 ↛ 453line 432 didn't jump to line 453 because the condition on line 432 was always true
433 for idx in outlier_indices:
434 # Find nearest valid neighbors
435 left_valid = valid_indices[valid_indices < idx]
436 right_valid = valid_indices[valid_indices > idx]
438 if len(left_valid) > 0 and len(right_valid) > 0:
439 # Interpolate between neighbors
440 left_idx = left_valid[-1]
441 right_idx = right_valid[0]
442 weight = (idx - left_idx) / (right_idx - left_idx)
443 data[idx] = data[left_idx] + weight * (data[right_idx] - data[left_idx])
444 elif len(left_valid) > 0:
445 data[idx] = data[left_valid[-1]]
446 elif len(right_valid) > 0: 446 ↛ 433line 446 didn't jump to line 433 because the condition on line 446 was always true
447 data[idx] = data[right_valid[0]]
448 # else: leave unchanged
450 else:
451 raise ValueError(f"Unknown replacement method: {replacement}")
453 return data
456__all__ = [
457 "OutlierResult",
458 "detect_outliers",
459 "iqr_outliers",
460 "modified_zscore_outliers",
461 "remove_outliers",
462 "zscore_outliers",
463]