Coverage for src/tracekit/analyzers/statistics/advanced.py: 90%
308 statements
coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Advanced statistical analysis methods.
3This module provides advanced outlier detection and time series analysis
4methods for signal analysis.
7Example:
8 >>> from tracekit.analyzers.statistics.advanced import (
9 ... isolation_forest_outliers, local_outlier_factor,
10 ... seasonal_decompose, detect_change_points,
11 ... phase_coherence, kernel_density
12 ... )
13 >>> outliers = isolation_forest_outliers(trace)
14 >>> decomp = seasonal_decompose(trace, period=100)
16References:
17 Liu et al. (2008): Isolation Forest
18 Breunig et al. (2000): Local Outlier Factor
19 Cleveland et al. (1990): STL Decomposition
20"""
22from __future__ import annotations
24from dataclasses import dataclass
25from typing import TYPE_CHECKING, Any, Literal
27import numpy as np
28from scipy import signal
29from scipy import stats as sp_stats
31from tracekit.core.types import WaveformTrace
33if TYPE_CHECKING:
34 from numpy.typing import NDArray


@dataclass
class IsolationForestResult:
    """Result of Isolation Forest outlier detection.

    Attributes:
        indices: Array of outlier indices.
        scores: Anomaly scores for all samples (-1 = outlier, 1 = normal).
        decision_scores: Raw decision function scores.
        mask: Boolean mask (True = outlier).
        count: Number of outliers detected.
        contamination: Contamination fraction used.

    References:
        STAT-011
    """

    indices: NDArray[np.intp]
    scores: NDArray[np.int8]
    decision_scores: NDArray[np.float64]
    mask: NDArray[np.bool_]
    count: int
    contamination: float


@dataclass
class LOFResult:
    """Result of Local Outlier Factor detection.

    Attributes:
        indices: Array of outlier indices.
        scores: LOF scores for all samples (>1 = outlier).
        mask: Boolean mask (True = outlier).
        count: Number of outliers detected.
        threshold: Threshold used for outlier classification.
        n_neighbors: Number of neighbors used.

    References:
        STAT-012
    """

    indices: NDArray[np.intp]
    scores: NDArray[np.float64]
    mask: NDArray[np.bool_]
    count: int
    threshold: float
    n_neighbors: int


@dataclass
class DecompositionResult:
    """Result of seasonal decomposition.

    Attributes:
        trend: Trend component.
        seasonal: Seasonal component.
        residual: Residual (remainder) component.
        period: Detected or specified period.
        observed: Original signal.

    References:
        STAT-013
    """

    trend: NDArray[np.float64]
    seasonal: NDArray[np.float64]
    residual: NDArray[np.float64]
    period: int
    observed: NDArray[np.float64]


@dataclass
class ChangePointResult:
    """Result of change point detection.

    Attributes:
        indices: Array of change point indices.
        n_changes: Number of change points detected.
        segments: List of (start, end) segment boundaries.
        segment_means: Mean value for each segment.
        segment_stds: Standard deviation for each segment.
        cost: Total cost of the segmentation.

    References:
        STAT-014
    """

    indices: NDArray[np.intp]
    n_changes: int
    segments: list[tuple[int, int]]
    segment_means: NDArray[np.float64]
    segment_stds: NDArray[np.float64]
    cost: float


@dataclass
class CoherenceResult:
    """Result of phase coherence analysis.

    Attributes:
        coherence: Coherence spectrum (0 to 1).
        frequencies: Frequency axis in Hz.
        phase: Phase difference spectrum in radians.
        mean_coherence: Average coherence across frequencies.
        peak_frequency: Frequency of maximum coherence.
        peak_coherence: Maximum coherence value.

    References:
        STAT-015
    """

    coherence: NDArray[np.float64]
    frequencies: NDArray[np.float64]
    phase: NDArray[np.float64]
    mean_coherence: float
    peak_frequency: float
    peak_coherence: float


@dataclass
class KDEResult:
    """Result of kernel density estimation.

    Attributes:
        x: Evaluation points.
        density: Probability density at each point.
        bandwidth: Bandwidth used for estimation.
        peaks: Indices of density peaks (modes).
        peak_values: X-values at density peaks.

    References:
        STAT-016
    """

    x: NDArray[np.float64]
    density: NDArray[np.float64]
    bandwidth: float
    peaks: NDArray[np.intp]
    peak_values: NDArray[np.float64]


def isolation_forest_outliers(
    trace: WaveformTrace | NDArray[np.floating[Any]],
    *,
    contamination: float = 0.05,
    n_estimators: int = 100,
    max_samples: int | float | str = "auto",
    random_state: int | None = None,
) -> IsolationForestResult:
    """Detect outliers using the Isolation Forest algorithm.

    Isolation Forest isolates anomalies by randomly selecting features
    and split values. Anomalies are isolated in fewer splits on average.

    Args:
        trace: Input trace or numpy array.
        contamination: Expected proportion of outliers (0.0 to 0.5).
        n_estimators: Number of isolation trees.
        max_samples: Samples for each tree ("auto" = min(256, n_samples);
            a float is interpreted as a fraction of n_samples).
        random_state: Random seed for reproducibility.

    Returns:
        IsolationForestResult with outlier information.

    Example:
        >>> result = isolation_forest_outliers(trace, contamination=0.01)
        >>> print(f"Found {result.count} outliers")
        >>> clean_data = trace[~result.mask]

    References:
        Liu, Ting & Zhou (2008): Isolation Forest
        STAT-011
    """
    data = trace.data if isinstance(trace, WaveformTrace) else np.asarray(trace)
    n_samples = len(data)

    if n_samples < 10:
        return IsolationForestResult(
            indices=np.array([], dtype=np.intp),
            scores=np.ones(n_samples, dtype=np.int8),
            decision_scores=np.zeros(n_samples, dtype=np.float64),
            mask=np.zeros(n_samples, dtype=np.bool_),
            count=0,
            contamination=contamination,
        )

    # Set random state
    rng = np.random.default_rng(random_state)

    # Determine max_samples
    max_samples_int: int
    if max_samples == "auto":
        max_samples_int = min(256, n_samples)
    elif isinstance(max_samples, float):
        max_samples_int = int(max_samples * n_samples)
    elif isinstance(max_samples, int):
        max_samples_int = max_samples
    else:
        # Fallback for any other string value
        max_samples_int = min(256, n_samples)
    max_samples_int = min(max_samples_int, n_samples)

    # Build isolation forest
    decision_scores = np.zeros(n_samples, dtype=np.float64)

    for _ in range(n_estimators):
        # Subsample without replacement for this tree
        sample_idx = rng.choice(n_samples, size=max_samples_int, replace=False)
        sample_data = data[sample_idx]

        # Compute path lengths for all points
        path_lengths = _isolation_tree_path_lengths(data, sample_data, rng)
        decision_scores += path_lengths

    # Average and normalize
    decision_scores /= n_estimators

    # Compute anomaly scores: shorter paths = anomalies.
    # Normalize using the average path length formula.
    avg_path = _average_path_length(max_samples_int)
    decision_scores = 2 ** (-decision_scores / avg_path)

    # Threshold based on contamination
    threshold = np.percentile(decision_scores, 100 * (1 - contamination))

    # Classify
    mask = decision_scores >= threshold
    indices = np.where(mask)[0]
    scores = np.where(mask, -1, 1).astype(np.int8)

    return IsolationForestResult(
        indices=indices.astype(np.intp),
        scores=scores,
        decision_scores=decision_scores,
        mask=mask,
        count=int(np.sum(mask)),
        contamination=contamination,
    )
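

# A minimal usage sketch (illustrative, not part of the module API): a sine
# carrier with three injected spikes; the spikes should land among the points
# with the highest anomaly scores. The names `demo_x` and `res` are
# hypothetical.
if __name__ == "__main__":
    demo_x = np.sin(np.linspace(0.0, 8.0 * np.pi, 1000))
    demo_x[[120, 480, 860]] += 6.0  # inject three large spikes
    res = isolation_forest_outliers(demo_x, contamination=0.01, random_state=0)
    print(f"flagged {res.count} outliers at indices {res.indices}")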


def _isolation_tree_path_lengths(
    data: NDArray[Any], sample: NDArray[Any], rng: np.random.Generator
) -> NDArray[np.float64]:
    """Compute isolation path lengths for data points."""
    n = len(data)
    path_lengths = np.zeros(n, dtype=np.float64)

    # Simple recursive isolation tree simulation:
    # for each point, estimate how many splits are needed to isolate it.
    for i, point in enumerate(data):
        path_lengths[i] = _compute_path_length(point, sample, rng, 0)

    return path_lengths


def _compute_path_length(
    point: float,
    sample: NDArray[Any],
    rng: np.random.Generator,
    depth: int,
    max_depth: int = 20,
) -> float:
    """Recursively compute the path length needed to isolate a point."""
    if len(sample) <= 1 or depth >= max_depth:
        return depth + _average_path_length(len(sample))

    # Random split point
    min_val, max_val = np.min(sample), np.max(sample)
    if max_val == min_val:
        return depth

    split = rng.uniform(min_val, max_val)

    if point < split:
        left_sample = sample[sample < split]
        return _compute_path_length(point, left_sample, rng, depth + 1, max_depth)
    else:
        right_sample = sample[sample >= split]
        return _compute_path_length(point, right_sample, rng, depth + 1, max_depth)


def _average_path_length(n: int) -> float:
    """Average path length c(n) = 2*H(n-1) - 2*(n-1)/n for n samples."""
    if n <= 1:
        return 0.0
    if n == 2:
        return 1.0
    # Harmonic number approximation: H(m) ~= ln(m) + Euler-Mascheroni constant
    return 2 * (np.log(n - 1) + 0.5772156649) - 2 * (n - 1) / n  # type: ignore[no-any-return]
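

# A hedged sanity check (assumption: c(n) = 2*H(n-1) - 2*(n-1)/n with H the
# harmonic number, per Liu et al. 2008): the log-based approximation above
# should track the exact harmonic sum for moderate n. `demo_n` is hypothetical.
if __name__ == "__main__":
    for demo_n in (10, 100, 1000):
        exact = 2.0 * sum(1.0 / k for k in range(1, demo_n)) - 2.0 * (demo_n - 1) / demo_n
        print(f"n={demo_n}: approx={_average_path_length(demo_n):.4f}, exact={exact:.4f}")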


def local_outlier_factor(
    trace: WaveformTrace | NDArray[np.floating[Any]],
    *,
    n_neighbors: int = 20,
    threshold: float = 1.5,
    metric: Literal["euclidean", "manhattan"] = "euclidean",
) -> LOFResult:
    """Detect outliers using the Local Outlier Factor.

    LOF measures the local density deviation of a point with respect to
    its neighbors. Points with substantially lower density than their
    neighbors are considered outliers.

    Args:
        trace: Input trace or numpy array.
        n_neighbors: Number of neighbors to use for density estimation.
        threshold: LOF threshold for outlier classification (>1 = outlier).
        metric: Distance metric ("euclidean" or "manhattan"). For 1-D data
            the two metrics coincide.

    Returns:
        LOFResult with outlier information.

    Example:
        >>> result = local_outlier_factor(trace, n_neighbors=10)
        >>> print(f"Found {result.count} outliers")

    References:
        Breunig, Kriegel, Ng & Sander (2000): LOF Algorithm
        STAT-012
    """
    data = trace.data if isinstance(trace, WaveformTrace) else np.asarray(trace)
    n_samples = len(data)

    if n_samples < n_neighbors + 1:
        return LOFResult(
            indices=np.array([], dtype=np.intp),
            scores=np.ones(n_samples, dtype=np.float64),
            mask=np.zeros(n_samples, dtype=np.bool_),
            count=0,
            threshold=threshold,
            n_neighbors=n_neighbors,
        )

    # Reshape 1-D data to a column for neighbor computations
    X = data.reshape(-1, 1)

    # Compute k-distances and neighbors
    k_distances = np.zeros(n_samples, dtype=np.float64)
    k_neighbors = np.zeros((n_samples, n_neighbors), dtype=np.intp)

    for i in range(n_samples):
        # In 1-D, Euclidean and Manhattan distances both reduce to the
        # absolute difference, so either branch yields |x_j - x_i|.
        if metric == "euclidean":
            distances = np.abs(X[:, 0] - X[i, 0])
        else:  # manhattan (identical in 1-D)
            distances = np.abs(X[:, 0] - X[i, 0])

        # Get k nearest neighbors (excluding self)
        distances[i] = np.inf
        neighbor_idx = np.argsort(distances)[:n_neighbors]
        k_neighbors[i] = neighbor_idx
        k_distances[i] = distances[neighbor_idx[-1]]

    # Compute Local Reachability Density (LRD)
    lrd = np.zeros(n_samples, dtype=np.float64)
    for i in range(n_samples):
        reach_dists = np.maximum(
            np.abs(X[k_neighbors[i], 0] - X[i, 0]),
            k_distances[k_neighbors[i]],
        )
        mean_reach_dist = np.mean(reach_dists)
        lrd[i] = 1.0 / mean_reach_dist if mean_reach_dist > 0 else np.inf

    # Compute LOF scores
    lof_scores = np.zeros(n_samples, dtype=np.float64)
    for i in range(n_samples):
        neighbor_lrd = lrd[k_neighbors[i]]
        lof_scores[i] = np.mean(neighbor_lrd) / lrd[i] if lrd[i] > 0 else 1.0

    # Handle infinities
    lof_scores = np.nan_to_num(lof_scores, nan=1.0, posinf=threshold * 2)

    # Classify outliers
    mask = lof_scores > threshold
    indices = np.where(mask)[0]

    return LOFResult(
        indices=indices.astype(np.intp),
        scores=lof_scores,
        mask=mask,
        count=int(np.sum(mask)),
        threshold=threshold,
        n_neighbors=n_neighbors,
    )
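

# A minimal usage sketch (hypothetical names): two points far outside a tight
# cluster should receive LOF scores well above the default threshold.
if __name__ == "__main__":
    demo_rng = np.random.default_rng(1)
    demo_pts = np.concatenate([demo_rng.normal(0.0, 0.1, 200), np.array([3.0, -3.0])])
    lof_res = local_outlier_factor(demo_pts, n_neighbors=10, threshold=1.5)
    print(f"LOF flagged {lof_res.count} points at indices {lof_res.indices}")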


def seasonal_decompose(
    trace: WaveformTrace | NDArray[np.floating[Any]],
    *,
    period: int | None = None,
    model: Literal["additive", "multiplicative"] = "additive",
) -> DecompositionResult:
    """Decompose a time series into trend, seasonal, and residual components.

    Uses classical decomposition (moving average for trend extraction).

    Args:
        trace: Input trace or numpy array.
        period: Period of seasonality. If None, auto-detected.
        model: Decomposition model:
            - "additive": y = trend + seasonal + residual
            - "multiplicative": y = trend * seasonal * residual

    Returns:
        DecompositionResult with trend, seasonal, and residual components.

    Example:
        >>> result = seasonal_decompose(trace, period=100)
        >>> plt.plot(result.trend, label="Trend")
        >>> plt.plot(result.seasonal, label="Seasonal")

    References:
        Cleveland et al. (1990): STL Decomposition
        STAT-013
    """
    data = trace.data if isinstance(trace, WaveformTrace) else np.asarray(trace)
    n = len(data)

    # Auto-detect period if not provided
    if period is None:
        period = _detect_period(data)
        if period is None or period < 2:
            period = min(n // 4, 10)  # Default fallback
    period = max(2, min(period, n // 2))

    # Extract trend using a centered moving average
    if period % 2 == 0:
        # For an even period, use a 2-stage moving average
        ma = np.convolve(data, np.ones(period) / period, mode="same")
        trend = np.convolve(ma, np.ones(2) / 2, mode="same")
    else:
        trend = np.convolve(data, np.ones(period) / period, mode="same")

    # Handle edges
    half_period = period // 2
    trend[:half_period] = trend[half_period]
    trend[-half_period:] = trend[-half_period - 1]

    # Detrend
    if model == "multiplicative":
        with np.errstate(divide="ignore", invalid="ignore"):
            detrended = data / trend
        detrended = np.nan_to_num(detrended, nan=1.0)
    else:
        detrended = data - trend

    # Extract seasonal component (average for each phase)
    seasonal = np.zeros_like(data)
    for i in range(period):
        indices = np.arange(i, n, period)
        seasonal_mean = np.mean(detrended[indices])
        seasonal[indices] = seasonal_mean

    # Center seasonal component
    if model == "multiplicative":
        seasonal /= np.mean(seasonal)
    else:
        seasonal -= np.mean(seasonal)

    # Compute residual
    if model == "multiplicative":
        with np.errstate(divide="ignore", invalid="ignore"):
            residual = data / (trend * seasonal)
        residual = np.nan_to_num(residual, nan=1.0)
    else:
        residual = data - trend - seasonal

    return DecompositionResult(
        trend=trend.astype(np.float64),
        seasonal=seasonal.astype(np.float64),
        residual=residual.astype(np.float64),
        period=period,
        observed=data.astype(np.float64),
    )
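

# A minimal usage sketch (hypothetical names): a linear trend plus a period-60
# sine should split into roughly the pieces it was built from, with a small
# residual away from the edges.
if __name__ == "__main__":
    demo_t = np.arange(1200, dtype=np.float64)
    demo_y = 0.01 * demo_t + np.sin(2.0 * np.pi * demo_t / 60.0)
    dec = seasonal_decompose(demo_y, period=60)
    print(f"period={dec.period}, residual std={np.std(dec.residual):.4f}")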


def _detect_period(data: NDArray[Any]) -> int | None:
    """Auto-detect the dominant period using autocorrelation."""
    n = len(data)
    if n < 20:
        return None

    # Compute autocorrelation
    data_centered = data - np.mean(data)
    acf = np.correlate(data_centered, data_centered, mode="full")
    acf = acf[n - 1 :]  # Keep positive lags only
    acf = acf / acf[0]  # Normalize

    # Find the first significant peak after lag 0,
    # skipping the first few lags to avoid noise
    min_lag = max(2, n // 100)
    max_lag = n // 2

    # Find peaks in the autocorrelation
    peaks, _ = signal.find_peaks(acf[min_lag:max_lag], height=0.1, distance=min_lag)

    if len(peaks) > 0:
        return peaks[0] + min_lag  # type: ignore[no-any-return]

    return None


def detect_change_points(
    trace: WaveformTrace | NDArray[np.floating[Any]],
    *,
    n_changes: int | None = None,
    min_size: int = 10,
    penalty: float | None = None,
    method: Literal["pelt", "binseg"] = "pelt",
) -> ChangePointResult:
    """Detect change points in a time series.

    Identifies points where the statistical properties of the signal
    change significantly.

    Args:
        trace: Input trace or numpy array.
        n_changes: Number of change points to find. If None, auto-detected.
        min_size: Minimum segment length between change points.
        penalty: Penalty for adding change points (higher = fewer changes).
        method: Detection method:
            - "pelt": Pruned Exact Linear Time (currently approximated by
              binary segmentation; see _pelt_change_points)
            - "binseg": Binary Segmentation (fast, approximate)

    Returns:
        ChangePointResult with change point locations and segment info.

    Example:
        >>> result = detect_change_points(trace, n_changes=3)
        >>> for start, end in result.segments:
        ...     print(f"Segment: {start} to {end}")

    References:
        Killick et al. (2012): PELT Algorithm
        STAT-014
    """
    data = trace.data if isinstance(trace, WaveformTrace) else np.asarray(trace)
    n = len(data)

    if n < min_size * 2:
        return ChangePointResult(
            indices=np.array([], dtype=np.intp),
            n_changes=0,
            segments=[(0, n)],
            segment_means=np.array([np.mean(data)]),
            segment_stds=np.array([np.std(data)]),
            cost=0.0,
        )

    # Set default penalty based on BIC
    if penalty is None:
        penalty = np.log(n) * np.var(data)

    if method == "pelt":
        change_points = _pelt_change_points(data, min_size, penalty, n_changes)
    else:
        change_points = _binseg_change_points(data, min_size, penalty, n_changes)

    # Build segments
    all_points = [0, *list(change_points), n]
    segments = [(all_points[i], all_points[i + 1]) for i in range(len(all_points) - 1)]

    # Compute segment statistics
    segment_means = np.array([np.mean(data[s:e]) for s, e in segments])
    segment_stds = np.array([np.std(data[s:e]) for s, e in segments])

    # Compute total cost
    total_cost = sum(_segment_cost(data[s:e]) for s, e in segments) + penalty * len(change_points)

    return ChangePointResult(
        indices=np.array(change_points, dtype=np.intp),
        n_changes=len(change_points),
        segments=segments,
        segment_means=segment_means,
        segment_stds=segment_stds,
        cost=float(total_cost),
    )
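

# A minimal usage sketch (hypothetical names): a noisy three-level step signal
# should yield two change points near the true boundaries at 200 and 400.
if __name__ == "__main__":
    demo_step = np.concatenate([np.zeros(200), np.full(200, 3.0), np.full(200, -2.0)])
    demo_step += np.random.default_rng(2).normal(0.0, 0.2, demo_step.size)
    cp = detect_change_points(demo_step, n_changes=2)
    print(f"change points: {cp.indices}, segment means: {cp.segment_means}")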


def _segment_cost(segment: NDArray[Any]) -> float:
    """Compute the cost of a segment (negative log-likelihood for a normal model)."""
    n = len(segment)
    if n < 2:
        return 0.0
    var = np.var(segment)
    if var <= 0:
        return 0.0
    return n * np.log(var)  # type: ignore[no-any-return]


def _pelt_change_points(
    data: NDArray[Any],
    min_size: int,
    penalty: float,
    n_changes: int | None,
) -> list[int]:
    """PELT algorithm for change point detection.

    Full PELT requires dynamic programming with pruning; this simple
    implementation delegates to binary segmentation as an approximation.
    """
    return _binseg_change_points(data, min_size, penalty, n_changes)


def _binseg_change_points(
    data: NDArray[Any],
    min_size: int,
    penalty: float,
    n_changes: int | None,
) -> list[int]:
    """Binary segmentation for change point detection."""
    n = len(data)
    change_points: list[int] = []

    def find_best_split(start: int, end: int) -> tuple[int, float]:
        """Find the best split point in a segment."""
        if end - start < 2 * min_size:
            return -1, 0.0

        best_idx = -1
        best_gain = 0.0

        for i in range(start + min_size, end - min_size + 1):
            left = data[start:i]
            right = data[i:end]
            full = data[start:end]

            cost_full = _segment_cost(full)
            cost_split = _segment_cost(left) + _segment_cost(right)
            gain = cost_full - cost_split - penalty

            if gain > best_gain:
                best_gain = gain
                best_idx = i

        return best_idx, best_gain

    # Iteratively find change points
    segments = [(0, n)]
    max_iter = n_changes if n_changes is not None else n // min_size

    for _ in range(max_iter):
        best_segment_idx = -1
        best_split_idx = -1
        best_gain = 0.0

        for seg_idx, (start, end) in enumerate(segments):
            split_idx, gain = find_best_split(start, end)
            if gain > best_gain:
                best_gain = gain
                best_split_idx = split_idx
                best_segment_idx = seg_idx

        if best_split_idx == -1:
            break

        # Add the change point
        change_points.append(best_split_idx)

        # Update segments
        start, end = segments[best_segment_idx]
        segments[best_segment_idx] = (start, best_split_idx)
        segments.insert(best_segment_idx + 1, (best_split_idx, end))

    return sorted(change_points)


def phase_coherence(
    trace1: WaveformTrace | NDArray[np.floating[Any]],
    trace2: WaveformTrace | NDArray[np.floating[Any]],
    *,
    sample_rate: float | None = None,
    nperseg: int | None = None,
) -> CoherenceResult:
    """Compute phase coherence between two signals.

    Coherence measures the linear correlation between two signals
    as a function of frequency.

    Args:
        trace1: First input trace.
        trace2: Second input trace.
        sample_rate: Sample rate in Hz. Required if traces are arrays.
        nperseg: Segment length for the Welch method.

    Returns:
        CoherenceResult with coherence spectrum and phase.

    Example:
        >>> result = phase_coherence(signal1, signal2, sample_rate=1e6)
        >>> print(f"Mean coherence: {result.mean_coherence:.3f}")

    References:
        STAT-015
    """
    data1 = trace1.data if isinstance(trace1, WaveformTrace) else np.asarray(trace1)
    data2 = trace2.data if isinstance(trace2, WaveformTrace) else np.asarray(trace2)

    # Get the sample rate
    if sample_rate is None:
        sample_rate = trace1.metadata.sample_rate if isinstance(trace1, WaveformTrace) else 1.0

    # Ensure same length
    n = min(len(data1), len(data2))
    data1 = data1[:n]
    data2 = data2[:n]

    if nperseg is None:
        nperseg = min(256, n // 4)
    nperseg = max(16, min(nperseg, n))

    # Compute coherence
    frequencies, coherence = signal.coherence(data1, data2, fs=sample_rate, nperseg=nperseg)

    # Compute cross-spectral phase
    _, Pxy = signal.csd(data1, data2, fs=sample_rate, nperseg=nperseg)
    phase = np.angle(Pxy)

    # Statistics
    mean_coherence = float(np.mean(coherence))
    peak_idx = np.argmax(coherence)
    peak_frequency = float(frequencies[peak_idx])
    peak_coherence = float(coherence[peak_idx])

    return CoherenceResult(
        coherence=coherence.astype(np.float64),
        frequencies=frequencies.astype(np.float64),
        phase=phase.astype(np.float64),
        mean_coherence=mean_coherence,
        peak_frequency=peak_frequency,
        peak_coherence=peak_coherence,
    )
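

# A minimal usage sketch (hypothetical names): two independently noisy copies
# of the same 50 Hz tone should show a coherence peak at or near 50 Hz
# (within the Welch frequency resolution of sample_rate / nperseg).
if __name__ == "__main__":
    demo_fs = 1000.0
    demo_gen = np.random.default_rng(3)
    tone = np.sin(2.0 * np.pi * 50.0 * np.arange(4096) / demo_fs)
    coh = phase_coherence(
        tone + 0.5 * demo_gen.normal(size=tone.size),
        tone + 0.5 * demo_gen.normal(size=tone.size),
        sample_rate=demo_fs,
    )
    print(f"peak coherence {coh.peak_coherence:.2f} at {coh.peak_frequency:.1f} Hz")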


def kernel_density(
    trace: WaveformTrace | NDArray[np.floating[Any]],
    *,
    n_points: int = 1000,
    bandwidth: float | str = "scott",
    kernel: Literal["gaussian", "tophat", "epanechnikov"] = "gaussian",
) -> KDEResult:
    """Estimate probability density using kernel density estimation.

    Args:
        trace: Input trace or numpy array.
        n_points: Number of evaluation points.
        bandwidth: Bandwidth for the kernel ("scott", "silverman", or a float).
        kernel: Kernel function to use.

    Returns:
        KDEResult with density estimate and mode information.

    Raises:
        ValueError: If kernel is not one of the supported types.

    Example:
        >>> result = kernel_density(trace)
        >>> plt.plot(result.x, result.density)
        >>> print(f"Modes at: {result.peak_values}")

    References:
        Scott (1992): Multivariate Density Estimation
        STAT-016
    """
    data = trace.data if isinstance(trace, WaveformTrace) else np.asarray(trace)
    n = len(data)

    if n < 2:
        return KDEResult(
            x=np.array([np.mean(data)]),
            density=np.array([1.0]),
            bandwidth=0.0,
            peaks=np.array([0], dtype=np.intp),
            peak_values=np.array([np.mean(data)]),
        )

    # Compute bandwidth
    std = np.std(data)
    iqr = np.percentile(data, 75) - np.percentile(data, 25)

    if isinstance(bandwidth, str):
        if bandwidth == "scott":
            bw = 1.06 * std * n ** (-1 / 5)
        elif bandwidth == "silverman":
            bw = 0.9 * min(std, iqr / 1.34) * n ** (-1 / 5)
        else:
            bw = 1.06 * std * n ** (-1 / 5)
    else:
        bw = bandwidth

    bw = max(bw, 1e-10)  # Prevent zero bandwidth

    # Evaluation grid
    margin = 3 * bw
    x_min = np.min(data) - margin
    x_max = np.max(data) + margin
    x = np.linspace(x_min, x_max, n_points)

    # Compute density
    if kernel == "gaussian":
        # gaussian_kde treats a scalar bw_method as a factor multiplied by the
        # data standard deviation, so divide out std to get bandwidth bw.
        kde = sp_stats.gaussian_kde(data, bw_method=bw / std if std > 0 else 1.0)
        density = kde(x)
    elif kernel == "tophat":
        density = np.zeros(n_points)
        for xi in data:
            mask = np.abs(x - xi) <= bw
            density[mask] += 1.0
        density /= n * 2 * bw
    elif kernel == "epanechnikov":
        density = np.zeros(n_points)
        for xi in data:
            u = (x - xi) / bw
            mask = np.abs(u) <= 1
            density[mask] += 0.75 * (1 - u[mask] ** 2)
        density /= n * bw
    else:
        raise ValueError(f"Unknown kernel: {kernel}")

    # Find peaks (modes)
    peaks_idx, _ = signal.find_peaks(density)
    if len(peaks_idx) == 0:
        peaks_idx = np.array([np.argmax(density)])
    peak_values = x[peaks_idx]

    return KDEResult(
        x=x.astype(np.float64),
        density=density.astype(np.float64),
        bandwidth=float(bw),
        peaks=peaks_idx.astype(np.intp),
        peak_values=peak_values.astype(np.float64),
    )
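

# A minimal usage sketch (hypothetical names): a bimodal sample should produce
# density modes near the two component means (about -2 and +2).
if __name__ == "__main__":
    demo_kde_rng = np.random.default_rng(4)
    bimodal = np.concatenate(
        [demo_kde_rng.normal(-2.0, 0.3, 500), demo_kde_rng.normal(2.0, 0.3, 500)]
    )
    kde_res = kernel_density(bimodal, kernel="gaussian")
    print(f"bandwidth={kde_res.bandwidth:.3f}, modes near {kde_res.peak_values}")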


__all__ = [
    "ChangePointResult",
    "CoherenceResult",
    "DecompositionResult",
    "IsolationForestResult",
    "KDEResult",
    "LOFResult",
    "detect_change_points",
    "isolation_forest_outliers",
    "kernel_density",
    "local_outlier_factor",
    "phase_coherence",
    "seasonal_decompose",
]