Coverage for src / tracekit / analyzers / statistics / outliers.py: 92%

136 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Outlier detection for signal analysis. 

2 

3This module provides multiple outlier detection methods suitable for 

4different data distributions and contamination levels. 

5 

6 

7Example: 

8 >>> from tracekit.analyzers.statistics.outliers import ( 

9 ... zscore_outliers, modified_zscore_outliers, iqr_outliers 

10 ... ) 

11 >>> outliers = zscore_outliers(trace, threshold=3.0) 

12 >>> robust_outliers = modified_zscore_outliers(trace, threshold=3.5) 

13 >>> iqr_result = iqr_outliers(trace, multiplier=1.5) 

14 

15References: 

16 Iglewicz, B. & Hoaglin, D. (1993). How to Detect and Handle Outliers 

17 NIST Engineering Statistics Handbook 

18""" 

19 

20from __future__ import annotations 

21 

22from dataclasses import dataclass 

23from typing import TYPE_CHECKING, Any 

24 

25import numpy as np 

26 

27from tracekit.core.types import WaveformTrace 

28 

29if TYPE_CHECKING: 

30 from numpy.typing import NDArray 

31 

32 

33@dataclass 

34class OutlierResult: 

35 """Result of outlier detection. 

36 

37 Attributes: 

38 indices: Array of indices where outliers were detected. 

39 values: Array of outlier values. 

40 scores: Array of outlier scores (z-scores or similar). 

41 mask: Boolean mask (True = outlier). 

42 count: Number of outliers detected. 

43 method: Detection method used. 

44 threshold: Threshold used for detection. 

45 """ 

46 

47 indices: NDArray[np.intp] 

48 values: NDArray[np.float64] 

49 scores: NDArray[np.float64] 

50 mask: NDArray[np.bool_] 

51 count: int 

52 method: str 

53 threshold: float 

54 

55 

56def zscore_outliers( 

57 trace: WaveformTrace | NDArray[np.floating[Any]], 

58 *, 

59 threshold: float = 3.0, 

60 return_scores: bool = False, 

61) -> OutlierResult | tuple[OutlierResult, NDArray[np.float64]]: 

62 """Detect outliers using Z-score method. 

63 

64 Identifies points where |z-score| exceeds the threshold. 

65 Best for normally distributed data without heavy contamination. 

66 

67 Args: 

68 trace: Input trace or numpy array. 

69 threshold: Z-score threshold for outlier detection (default 3.0). 

70 return_scores: If True, also return full z-score array. 

71 

72 Returns: 

73 OutlierResult containing outlier information. 

74 If return_scores=True, also returns full z-score array. 

75 

76 Example: 

77 >>> result = zscore_outliers(trace, threshold=3.0) 

78 >>> print(f"Found {result.count} outliers") 

79 >>> print(f"Outlier indices: {result.indices}") 

80 

81 References: 

82 NIST Engineering Statistics Handbook Section 1.3.5.17 

83 """ 

84 data = trace.data if isinstance(trace, WaveformTrace) else trace 

85 

86 n = len(data) 

87 if n < 3: 

88 empty_result = OutlierResult( 

89 indices=np.array([], dtype=np.intp), 

90 values=np.array([], dtype=np.float64), 

91 scores=np.array([], dtype=np.float64), 

92 mask=np.zeros(n, dtype=np.bool_), 

93 count=0, 

94 method="zscore", 

95 threshold=threshold, 

96 ) 

97 if return_scores: 97 ↛ 98line 97 didn't jump to line 98 because the condition on line 97 was never true

98 return empty_result, np.zeros(n, dtype=np.float64) 

99 return empty_result 

100 

101 # Compute z-scores 

102 mean = np.mean(data) 

103 std = np.std(data, ddof=1) 

104 

105 if std < 1e-12: 

106 # No variation, no outliers 

107 zscores = np.zeros(n, dtype=np.float64) 

108 else: 

109 zscores = (data - mean) / std 

110 

111 # Find outliers 

112 mask = np.abs(zscores) > threshold 

113 indices = np.where(mask)[0] 

114 outlier_values = data[mask].astype(np.float64) 

115 outlier_scores = zscores[mask] 

116 

117 result = OutlierResult( 

118 indices=indices, 

119 values=outlier_values, 

120 scores=outlier_scores, 

121 mask=mask, 

122 count=int(np.sum(mask)), 

123 method="zscore", 

124 threshold=threshold, 

125 ) 

126 

127 if return_scores: 

128 return result, zscores 

129 return result 

130 

131 

132def modified_zscore_outliers( 

133 trace: WaveformTrace | NDArray[np.floating[Any]], 

134 *, 

135 threshold: float = 3.5, 

136 return_scores: bool = False, 

137) -> OutlierResult | tuple[OutlierResult, NDArray[np.float64]]: 

138 """Detect outliers using Modified Z-score (MAD-based) method. 

139 

140 Uses Median Absolute Deviation (MAD) for robust outlier detection. 

141 More resistant to contaminated data than standard z-score. 

142 

143 Args: 

144 trace: Input trace or numpy array. 

145 threshold: Modified z-score threshold (default 3.5, per Iglewicz & Hoaglin). 

146 return_scores: If True, also return full modified z-score array. 

147 

148 Returns: 

149 OutlierResult containing outlier information. 

150 If return_scores=True, also returns full modified z-score array. 

151 

152 Example: 

153 >>> result = modified_zscore_outliers(trace, threshold=3.5) 

154 >>> print(f"Found {result.count} outliers") 

155 >>> # Robust to up to ~50% contamination 

156 

157 References: 

158 Iglewicz, B. & Hoaglin, D. (1993). How to Detect and Handle Outliers 

159 """ 

160 data = trace.data if isinstance(trace, WaveformTrace) else trace 

161 

162 n = len(data) 

163 if n < 3: 163 ↛ 164line 163 didn't jump to line 164 because the condition on line 163 was never true

164 empty_result = OutlierResult( 

165 indices=np.array([], dtype=np.intp), 

166 values=np.array([], dtype=np.float64), 

167 scores=np.array([], dtype=np.float64), 

168 mask=np.zeros(n, dtype=np.bool_), 

169 count=0, 

170 method="modified_zscore", 

171 threshold=threshold, 

172 ) 

173 if return_scores: 

174 return empty_result, np.zeros(n, dtype=np.float64) 

175 return empty_result 

176 

177 # Compute median and MAD 

178 median = np.median(data) 

179 mad = np.median(np.abs(data - median)) 

180 

181 # Consistency constant for normal distribution 

182 # MAD = 0.6745 * sigma for normal distribution 

183 k = 0.6745 

184 

185 if mad < 1e-12: 

186 # Very low spread - use fallback 

187 # Check if there are any points far from median 

188 deviations = np.abs(data - median) 

189 max_dev = np.max(deviations) 

190 if max_dev < 1e-12: 

191 # All points identical, no outliers 

192 modified_zscores = np.zeros(n, dtype=np.float64) 

193 else: 

194 # Scale deviations so that max_dev gets a high score 

195 # This ensures outliers in nearly-constant data are detected 

196 # Use a scale factor that makes max_dev map to a large z-score 

197 scale = max_dev / (threshold * 2) # Conservative scaling 

198 modified_zscores = deviations / scale 

199 else: 

200 modified_zscores = k * (data - median) / mad 

201 

202 # Find outliers 

203 mask = np.abs(modified_zscores) > threshold 

204 indices = np.where(mask)[0] 

205 outlier_values = data[mask].astype(np.float64) 

206 outlier_scores = modified_zscores[mask] 

207 

208 result = OutlierResult( 

209 indices=indices, 

210 values=outlier_values, 

211 scores=outlier_scores, 

212 mask=mask, 

213 count=int(np.sum(mask)), 

214 method="modified_zscore", 

215 threshold=threshold, 

216 ) 

217 

218 if return_scores: 

219 return result, modified_zscores.astype(np.float64) 

220 return result 

221 

222 

223def iqr_outliers( 

224 trace: WaveformTrace | NDArray[np.floating[Any]], 

225 *, 

226 multiplier: float = 1.5, 

227 return_fences: bool = False, 

228) -> OutlierResult | tuple[OutlierResult, dict[str, float]]: 

229 """Detect outliers using Interquartile Range (IQR) method. 

230 

231 Flags points outside the fences: [Q1 - k*IQR, Q3 + k*IQR]. 

232 Good for skewed distributions. 

233 

234 Args: 

235 trace: Input trace or numpy array. 

236 multiplier: IQR multiplier for fence calculation (default 1.5). 

237 Use 3.0 for "extreme" outliers. 

238 return_fences: If True, also return fence values. 

239 

240 Returns: 

241 OutlierResult containing outlier information. 

242 If return_fences=True, also returns dict with Q1, Q3, IQR, fences. 

243 

244 Example: 

245 >>> result = iqr_outliers(trace, multiplier=1.5) 

246 >>> print(f"Found {result.count} outliers") 

247 

248 >>> # Get fence values 

249 >>> result, fences = iqr_outliers(trace, return_fences=True) 

250 >>> print(f"Lower fence: {fences['lower_fence']}") 

251 

252 References: 

253 Tukey, J. W. (1977). Exploratory Data Analysis 

254 """ 

255 data = trace.data if isinstance(trace, WaveformTrace) else trace 

256 

257 n = len(data) 

258 if n < 4: 

259 empty_result = OutlierResult( 

260 indices=np.array([], dtype=np.intp), 

261 values=np.array([], dtype=np.float64), 

262 scores=np.array([], dtype=np.float64), 

263 mask=np.zeros(n, dtype=np.bool_), 

264 count=0, 

265 method="iqr", 

266 threshold=multiplier, 

267 ) 

268 if return_fences: 268 ↛ 269line 268 didn't jump to line 269 because the condition on line 268 was never true

269 return empty_result, { 

270 "q1": np.nan, 

271 "q3": np.nan, 

272 "iqr": np.nan, 

273 "lower_fence": np.nan, 

274 "upper_fence": np.nan, 

275 } 

276 return empty_result 

277 

278 # Compute quartiles 

279 q1 = float(np.percentile(data, 25)) 

280 q3 = float(np.percentile(data, 75)) 

281 iqr = q3 - q1 

282 

283 # Calculate fences 

284 lower_fence = q1 - multiplier * iqr 

285 upper_fence = q3 + multiplier * iqr 

286 

287 # Find outliers 

288 mask = (data < lower_fence) | (data > upper_fence) 

289 indices = np.where(mask)[0] 

290 outlier_values = data[mask].astype(np.float64) 

291 

292 # Calculate "scores" as distance from nearest fence normalized by IQR 

293 if iqr > 0: 

294 scores = np.zeros(n, dtype=np.float64) 

295 below = data < lower_fence 

296 above = data > upper_fence 

297 scores[below] = (lower_fence - data[below]) / iqr 

298 scores[above] = (data[above] - upper_fence) / iqr 

299 outlier_scores = scores[mask] 

300 else: 

301 outlier_scores = np.zeros(len(indices), dtype=np.float64) 

302 

303 result = OutlierResult( 

304 indices=indices, 

305 values=outlier_values, 

306 scores=outlier_scores, 

307 mask=mask, 

308 count=int(np.sum(mask)), 

309 method="iqr", 

310 threshold=multiplier, 

311 ) 

312 

313 if return_fences: 

314 fences = { 

315 "q1": q1, 

316 "q3": q3, 

317 "iqr": iqr, 

318 "lower_fence": lower_fence, 

319 "upper_fence": upper_fence, 

320 } 

321 return result, fences 

322 return result 

323 

324 

325def detect_outliers( 

326 trace: WaveformTrace | NDArray[np.floating[Any]], # type: ignore[name-defined] 

327 *, 

328 method: str = "modified_zscore", 

329 **kwargs: Any, # type: ignore[name-defined] 

330) -> OutlierResult: 

331 """Detect outliers using specified method. 

332 

333 Convenience function that dispatches to the appropriate outlier 

334 detection method based on the method parameter. 

335 

336 Args: 

337 trace: Input trace or numpy array. 

338 method: Detection method. One of: 

339 - "zscore": Standard z-score method 

340 - "modified_zscore": MAD-based robust method (default) 

341 - "iqr": Interquartile range method 

342 **kwargs: Additional arguments passed to the detection method. 

343 

344 Returns: 

345 OutlierResult containing outlier information. 

346 

347 Raises: 

348 ValueError: If method is not one of the supported types. 

349 

350 Example: 

351 >>> result = detect_outliers(trace, method="iqr", multiplier=2.0) 

352 >>> print(f"Method: {result.method}, Count: {result.count}") 

353 """ 

354 methods = { 

355 "zscore": zscore_outliers, # type: ignore[dict-item] 

356 "modified_zscore": modified_zscore_outliers, # type: ignore[dict-item] 

357 "iqr": iqr_outliers, 

358 } 

359 

360 if method not in methods: 

361 available = ", ".join(methods.keys()) 

362 raise ValueError(f"Unknown method: {method}. Available: {available}") 

363 

364 result = methods[method](trace, **kwargs) 

365 

366 # Handle tuple returns 

367 if isinstance(result, tuple): 367 ↛ 368line 367 didn't jump to line 368 because the condition on line 367 was never true

368 return result[0] 

369 return result 

370 

371 

372def remove_outliers( # type: ignore[no-untyped-def] 

373 trace: WaveformTrace | NDArray[np.floating[Any]], 

374 *, 

375 method: str = "modified_zscore", 

376 replacement: str = "nan", 

377 **kwargs, 

378) -> NDArray[np.float64]: 

379 """Remove or replace outliers in data. 

380 

381 Args: 

382 trace: Input trace or numpy array. 

383 method: Detection method (see detect_outliers). 

384 replacement: How to handle outliers: 

385 - "nan": Replace with NaN 

386 - "clip": Clip to nearest fence/threshold 

387 - "interpolate": Linear interpolation from neighbors 

388 **kwargs: Additional arguments for detection method. 

389 

390 Returns: 

391 Array with outliers handled according to replacement method. 

392 

393 Raises: 

394 ValueError: If replacement method is not one of the supported types. 

395 

396 Example: 

397 >>> cleaned = remove_outliers(trace, method="iqr", replacement="nan") 

398 >>> # Use for analysis that can handle NaN values 

399 """ 

400 if isinstance(trace, WaveformTrace): 

401 data = trace.data.copy() 

402 else: 

403 data = np.array(trace, dtype=np.float64) 

404 

405 result = detect_outliers(trace, method=method, **kwargs) 

406 

407 if result.count == 0: 

408 return data 

409 

410 if replacement == "nan": 

411 data[result.mask] = np.nan 

412 

413 elif replacement == "clip": 

414 if method == "iqr": 

415 # Get fence values 

416 _, fences = iqr_outliers(trace, return_fences=True, **kwargs) # type: ignore[misc] 

417 data = np.clip(data, fences["lower_fence"], fences["upper_fence"]) 

418 else: 

419 # For z-score methods, clip to mean +/- threshold * std 

420 mean = np.mean(data[~result.mask]) if np.any(~result.mask) else np.mean(data) 

421 std = ( 

422 np.std(data[~result.mask], ddof=1) if np.any(~result.mask) else np.std(data, ddof=1) 

423 ) 

424 threshold = result.threshold 

425 data = np.clip(data, mean - threshold * std, mean + threshold * std) 

426 

427 elif replacement == "interpolate": 

428 # Linear interpolation from non-outlier neighbors 

429 outlier_indices = result.indices 

430 valid_indices = np.where(~result.mask)[0] 

431 

432 if len(valid_indices) > 0: 432 ↛ 453line 432 didn't jump to line 453 because the condition on line 432 was always true

433 for idx in outlier_indices: 

434 # Find nearest valid neighbors 

435 left_valid = valid_indices[valid_indices < idx] 

436 right_valid = valid_indices[valid_indices > idx] 

437 

438 if len(left_valid) > 0 and len(right_valid) > 0: 

439 # Interpolate between neighbors 

440 left_idx = left_valid[-1] 

441 right_idx = right_valid[0] 

442 weight = (idx - left_idx) / (right_idx - left_idx) 

443 data[idx] = data[left_idx] + weight * (data[right_idx] - data[left_idx]) 

444 elif len(left_valid) > 0: 

445 data[idx] = data[left_valid[-1]] 

446 elif len(right_valid) > 0: 446 ↛ 433line 446 didn't jump to line 433 because the condition on line 446 was always true

447 data[idx] = data[right_valid[0]] 

448 # else: leave unchanged 

449 

450 else: 

451 raise ValueError(f"Unknown replacement method: {replacement}") 

452 

453 return data 

454 

455 

456__all__ = [ 

457 "OutlierResult", 

458 "detect_outliers", 

459 "iqr_outliers", 

460 "modified_zscore_outliers", 

461 "remove_outliers", 

462 "zscore_outliers", 

463]