Coverage for src / tracekit / analyzers / statistics / advanced.py: 90%

308 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Advanced statistical analysis methods. 

2 

3This module provides advanced outlier detection and time series analysis 

4methods for signal analysis. 

5 

6 

7Example: 

8 >>> from tracekit.analyzers.statistics.advanced import ( 

9 ... isolation_forest_outliers, local_outlier_factor, 

10 ... seasonal_decompose, detect_change_points, 

11 ... phase_coherence, kernel_density 

12 ... ) 

13 >>> outliers = isolation_forest_outliers(trace) 

14 >>> decomp = seasonal_decompose(trace, period=100) 

15 

16References: 

17 Liu et al. (2008): Isolation Forest 

18 Breunig et al. (2000): Local Outlier Factor 

19 Cleveland et al. (1990): STL Decomposition 

20""" 

21 

22from __future__ import annotations 

23 

24from dataclasses import dataclass 

25from typing import TYPE_CHECKING, Any, Literal 

26 

27import numpy as np 

28from scipy import signal 

29from scipy import stats as sp_stats 

30 

31from tracekit.core.types import WaveformTrace 

32 

33if TYPE_CHECKING: 

34 from numpy.typing import NDArray 

35 

36 

37@dataclass 

38class IsolationForestResult: 

39 """Result of Isolation Forest outlier detection. 

40 

41 Attributes: 

42 indices: Array of outlier indices. 

43 scores: Anomaly scores for all samples (-1 = outlier, 1 = normal). 

44 decision_scores: Raw decision function scores. 

45 mask: Boolean mask (True = outlier). 

46 count: Number of outliers detected. 

47 contamination: Contamination fraction used. 

48 

49 References: 

50 STAT-011 

51 """ 

52 

53 indices: NDArray[np.intp] 

54 scores: NDArray[np.int8] 

55 decision_scores: NDArray[np.float64] 

56 mask: NDArray[np.bool_] 

57 count: int 

58 contamination: float 

59 

60 

61@dataclass 

62class LOFResult: 

63 """Result of Local Outlier Factor detection. 

64 

65 Attributes: 

66 indices: Array of outlier indices. 

67 scores: LOF scores for all samples (>1 = outlier). 

68 mask: Boolean mask (True = outlier). 

69 count: Number of outliers detected. 

70 threshold: Threshold used for outlier classification. 

71 n_neighbors: Number of neighbors used. 

72 

73 References: 

74 STAT-012 

75 """ 

76 

77 indices: NDArray[np.intp] 

78 scores: NDArray[np.float64] 

79 mask: NDArray[np.bool_] 

80 count: int 

81 threshold: float 

82 n_neighbors: int 

83 

84 

85@dataclass 

86class DecompositionResult: 

87 """Result of seasonal decomposition. 

88 

89 Attributes: 

90 trend: Trend component. 

91 seasonal: Seasonal component. 

92 residual: Residual (remainder) component. 

93 period: Detected or specified period. 

94 observed: Original signal. 

95 

96 References: 

97 STAT-013 

98 """ 

99 

100 trend: NDArray[np.float64] 

101 seasonal: NDArray[np.float64] 

102 residual: NDArray[np.float64] 

103 period: int 

104 observed: NDArray[np.float64] 

105 

106 

107@dataclass 

108class ChangePointResult: 

109 """Result of change point detection. 

110 

111 Attributes: 

112 indices: Array of change point indices. 

113 n_changes: Number of change points detected. 

114 segments: List of (start, end) segment boundaries. 

115 segment_means: Mean value for each segment. 

116 segment_stds: Standard deviation for each segment. 

117 cost: Total cost of the segmentation. 

118 

119 References: 

120 STAT-014 

121 """ 

122 

123 indices: NDArray[np.intp] 

124 n_changes: int 

125 segments: list[tuple[int, int]] 

126 segment_means: NDArray[np.float64] 

127 segment_stds: NDArray[np.float64] 

128 cost: float 

129 

130 

131@dataclass 

132class CoherenceResult: 

133 """Result of phase coherence analysis. 

134 

135 Attributes: 

136 coherence: Coherence spectrum (0 to 1). 

137 frequencies: Frequency axis in Hz. 

138 phase: Phase difference spectrum in radians. 

139 mean_coherence: Average coherence across frequencies. 

140 peak_frequency: Frequency of maximum coherence. 

141 peak_coherence: Maximum coherence value. 

142 

143 References: 

144 STAT-015 

145 """ 

146 

147 coherence: NDArray[np.float64] 

148 frequencies: NDArray[np.float64] 

149 phase: NDArray[np.float64] 

150 mean_coherence: float 

151 peak_frequency: float 

152 peak_coherence: float 

153 

154 

155@dataclass 

156class KDEResult: 

157 """Result of kernel density estimation. 

158 

159 Attributes: 

160 x: Evaluation points. 

161 density: Probability density at each point. 

162 bandwidth: Bandwidth used for estimation. 

163 peaks: Indices of density peaks (modes). 

164 peak_values: X-values at density peaks. 

165 

166 References: 

167 STAT-016 

168 """ 

169 

170 x: NDArray[np.float64] 

171 density: NDArray[np.float64] 

172 bandwidth: float 

173 peaks: NDArray[np.intp] 

174 peak_values: NDArray[np.float64] 

175 

176 

177def isolation_forest_outliers( 

178 trace: WaveformTrace | NDArray[np.floating[Any]], 

179 *, 

180 contamination: float = 0.05, 

181 n_estimators: int = 100, 

182 max_samples: int | str = "auto", 

183 random_state: int | None = None, 

184) -> IsolationForestResult: 

185 """Detect outliers using Isolation Forest algorithm. 

186 

187 Isolation Forest isolates anomalies by randomly selecting features 

188 and split values. Anomalies are isolated in fewer splits on average. 

189 

190 Args: 

191 trace: Input trace or numpy array. 

192 contamination: Expected proportion of outliers (0.0 to 0.5). 

193 n_estimators: Number of isolation trees. 

194 max_samples: Samples for each tree ("auto" = min(256, n_samples)). 

195 random_state: Random seed for reproducibility. 

196 

197 Returns: 

198 IsolationForestResult with outlier information. 

199 

200 Example: 

201 >>> result = isolation_forest_outliers(trace, contamination=0.01) 

202 >>> print(f"Found {result.count} outliers") 

203 >>> clean_data = trace[~result.mask] 

204 

205 References: 

206 Liu, Ting & Zhou (2008): Isolation Forest 

207 STAT-011 

208 """ 

209 data = trace.data if isinstance(trace, WaveformTrace) else np.asarray(trace) 

210 n_samples = len(data) 

211 

212 if n_samples < 10: 

213 return IsolationForestResult( 

214 indices=np.array([], dtype=np.intp), 

215 scores=np.ones(n_samples, dtype=np.int8), 

216 decision_scores=np.zeros(n_samples, dtype=np.float64), 

217 mask=np.zeros(n_samples, dtype=np.bool_), 

218 count=0, 

219 contamination=contamination, 

220 ) 

221 

222 # Set random state 

223 rng = np.random.default_rng(random_state) 

224 

225 # Determine max_samples 

226 max_samples_int: int 

227 if max_samples == "auto": 227 ↛ 229line 227 didn't jump to line 229 because the condition on line 227 was always true

228 max_samples_int = min(256, n_samples) 

229 elif isinstance(max_samples, float): 

230 max_samples_int = int(max_samples * n_samples) 

231 elif isinstance(max_samples, int): 

232 max_samples_int = max_samples 

233 else: 

234 # Fallback for any other string value 

235 max_samples_int = min(256, n_samples) 

236 max_samples_int = min(max_samples_int, n_samples) 

237 

238 # Build isolation forest 

239 decision_scores = np.zeros(n_samples, dtype=np.float64) 

240 

241 for _ in range(n_estimators): 

242 # Bootstrap sample 

243 sample_idx = rng.choice(n_samples, size=max_samples_int, replace=False) 

244 sample_data = data[sample_idx] 

245 

246 # Compute path lengths for all points 

247 path_lengths = _isolation_tree_path_lengths(data, sample_data, rng) 

248 decision_scores += path_lengths 

249 

250 # Average and normalize 

251 decision_scores /= n_estimators 

252 

253 # Compute anomaly scores: shorter paths = anomalies 

254 # Normalize using average path length formula 

255 avg_path = _average_path_length(max_samples_int) 

256 decision_scores = 2 ** (-decision_scores / avg_path) 

257 

258 # Threshold based on contamination 

259 threshold = np.percentile(decision_scores, 100 * (1 - contamination)) 

260 

261 # Classify 

262 mask = decision_scores >= threshold 

263 indices = np.where(mask)[0] 

264 scores = np.where(mask, -1, 1).astype(np.int8) 

265 

266 return IsolationForestResult( 

267 indices=indices.astype(np.intp), 

268 scores=scores, 

269 decision_scores=decision_scores, 

270 mask=mask, 

271 count=int(np.sum(mask)), 

272 contamination=contamination, 

273 ) 

274 

275 

276def _isolation_tree_path_lengths( 

277 data: NDArray[Any], sample: NDArray[Any], rng: np.random.Generator 

278) -> NDArray[np.float64]: 

279 """Compute isolation path lengths for data points.""" 

280 n = len(data) 

281 path_lengths = np.zeros(n, dtype=np.float64) 

282 

283 # Simple recursive isolation tree simulation 

284 # For each point, estimate how many splits to isolate it 

285 for i, point in enumerate(data): 

286 path_lengths[i] = _compute_path_length(point, sample, rng, 0) 

287 

288 return path_lengths 

289 

290 

291def _compute_path_length( 

292 point: float, 

293 sample: NDArray[Any], 

294 rng: np.random.Generator, 

295 depth: int, 

296 max_depth: int = 20, 

297) -> float: 

298 """Recursively compute path length to isolate a point.""" 

299 if len(sample) <= 1 or depth >= max_depth: 

300 return depth + _average_path_length(len(sample)) 

301 

302 # Random split point 

303 min_val, max_val = np.min(sample), np.max(sample) 

304 if max_val == min_val: 304 ↛ 305line 304 didn't jump to line 305 because the condition on line 304 was never true

305 return depth 

306 

307 split = rng.uniform(min_val, max_val) 

308 

309 if point < split: 

310 left_sample = sample[sample < split] 

311 return _compute_path_length(point, left_sample, rng, depth + 1, max_depth) 

312 else: 

313 right_sample = sample[sample >= split] 

314 return _compute_path_length(point, right_sample, rng, depth + 1, max_depth) 

315 

316 

317def _average_path_length(n: int) -> float: 

318 """Compute average path length for n samples (H(n-1) formula).""" 

319 if n <= 1: 

320 return 0 

321 if n == 2: 

322 return 1 

323 # Harmonic number approximation 

324 return 2 * (np.log(n - 1) + 0.5772156649) - 2 * (n - 1) / n # type: ignore[no-any-return] 

325 

326 

327def local_outlier_factor( 

328 trace: WaveformTrace | NDArray[np.floating[Any]], 

329 *, 

330 n_neighbors: int = 20, 

331 threshold: float = 1.5, 

332 metric: Literal["euclidean", "manhattan"] = "euclidean", 

333) -> LOFResult: 

334 """Detect outliers using Local Outlier Factor. 

335 

336 LOF measures local density deviation of a point with respect to 

337 its neighbors. Points with substantially lower density than 

338 their neighbors are considered outliers. 

339 

340 Args: 

341 trace: Input trace or numpy array. 

342 n_neighbors: Number of neighbors to use for density estimation. 

343 threshold: LOF threshold for outlier classification (>1 = outlier). 

344 metric: Distance metric ("euclidean" or "manhattan"). 

345 

346 Returns: 

347 LOFResult with outlier information. 

348 

349 Example: 

350 >>> result = local_outlier_factor(trace, n_neighbors=10) 

351 >>> print(f"Found {result.count} outliers") 

352 

353 References: 

354 Breunig, Kriegel, Ng & Sander (2000): LOF Algorithm 

355 STAT-012 

356 """ 

357 data = trace.data if isinstance(trace, WaveformTrace) else np.asarray(trace) 

358 n_samples = len(data) 

359 

360 if n_samples < n_neighbors + 1: 

361 return LOFResult( 

362 indices=np.array([], dtype=np.intp), 

363 scores=np.ones(n_samples, dtype=np.float64), 

364 mask=np.zeros(n_samples, dtype=np.bool_), 

365 count=0, 

366 threshold=threshold, 

367 n_neighbors=n_neighbors, 

368 ) 

369 

370 # For 1D data, use index-based neighbors 

371 # Reshape for compatibility 

372 X = data.reshape(-1, 1) 

373 

374 # Compute k-distances and neighbors 

375 k_distances = np.zeros(n_samples, dtype=np.float64) 

376 k_neighbors = np.zeros((n_samples, n_neighbors), dtype=np.intp) 

377 

378 for i in range(n_samples): 

379 # Compute distances to all other points 

380 if metric == "euclidean": 380 ↛ 383line 380 didn't jump to line 383 because the condition on line 380 was always true

381 distances = np.abs(X[:, 0] - X[i, 0]) 

382 else: # manhattan 

383 distances = np.abs(X[:, 0] - X[i, 0]) 

384 

385 # Get k nearest neighbors (excluding self) 

386 distances[i] = np.inf 

387 neighbor_idx = np.argsort(distances)[:n_neighbors] 

388 k_neighbors[i] = neighbor_idx 

389 k_distances[i] = distances[neighbor_idx[-1]] 

390 

391 # Compute Local Reachability Density (LRD) 

392 lrd = np.zeros(n_samples, dtype=np.float64) 

393 for i in range(n_samples): 

394 reach_dists = np.maximum( 

395 np.abs(X[k_neighbors[i], 0] - X[i, 0]), 

396 k_distances[k_neighbors[i]], 

397 ) 

398 mean_reach_dist = np.mean(reach_dists) 

399 lrd[i] = 1.0 / mean_reach_dist if mean_reach_dist > 0 else np.inf 

400 

401 # Compute LOF scores 

402 lof_scores = np.zeros(n_samples, dtype=np.float64) 

403 for i in range(n_samples): 

404 neighbor_lrd = lrd[k_neighbors[i]] 

405 lof_scores[i] = np.mean(neighbor_lrd) / lrd[i] if lrd[i] > 0 else 1.0 

406 

407 # Handle infinities 

408 lof_scores = np.nan_to_num(lof_scores, nan=1.0, posinf=threshold * 2) 

409 

410 # Classify outliers 

411 mask = lof_scores > threshold 

412 indices = np.where(mask)[0] 

413 

414 return LOFResult( 

415 indices=indices.astype(np.intp), 

416 scores=lof_scores, 

417 mask=mask, 

418 count=int(np.sum(mask)), 

419 threshold=threshold, 

420 n_neighbors=n_neighbors, 

421 ) 

422 

423 

424def seasonal_decompose( 

425 trace: WaveformTrace | NDArray[np.floating[Any]], 

426 *, 

427 period: int | None = None, 

428 model: Literal["additive", "multiplicative"] = "additive", 

429) -> DecompositionResult: 

430 """Decompose time series into trend, seasonal, and residual components. 

431 

432 Uses classical decomposition (moving average for trend extraction). 

433 

434 Args: 

435 trace: Input trace or numpy array. 

436 period: Period of seasonality. If None, auto-detected. 

437 model: Decomposition model: 

438 - "additive": y = trend + seasonal + residual 

439 - "multiplicative": y = trend * seasonal * residual 

440 

441 Returns: 

442 DecompositionResult with trend, seasonal, and residual components. 

443 

444 Example: 

445 >>> result = seasonal_decompose(trace, period=100) 

446 >>> plt.plot(result.trend, label="Trend") 

447 >>> plt.plot(result.seasonal, label="Seasonal") 

448 

449 References: 

450 Cleveland et al. (1990): STL Decomposition 

451 STAT-013 

452 """ 

453 data = trace.data if isinstance(trace, WaveformTrace) else np.asarray(trace) 

454 n = len(data) 

455 

456 # Auto-detect period if not provided 

457 if period is None: 

458 period = _detect_period(data) 

459 if period is None or period < 2: 459 ↛ 460line 459 didn't jump to line 460 because the condition on line 459 was never true

460 period = min(n // 4, 10) # Default fallback 

461 

462 period = max(2, min(period, n // 2)) 

463 

464 # Extract trend using centered moving average 

465 if period % 2 == 0: 465 ↛ 470line 465 didn't jump to line 470 because the condition on line 465 was always true

466 # For even period, use 2-stage moving average 

467 ma = np.convolve(data, np.ones(period) / period, mode="same") 

468 trend = np.convolve(ma, np.ones(2) / 2, mode="same") 

469 else: 

470 trend = np.convolve(data, np.ones(period) / period, mode="same") 

471 

472 # Handle edges 

473 half_period = period // 2 

474 trend[:half_period] = trend[half_period] 

475 trend[-half_period:] = trend[-half_period - 1] 

476 

477 # Detrend 

478 if model == "multiplicative": 

479 with np.errstate(divide="ignore", invalid="ignore"): 

480 detrended = data / trend 

481 detrended = np.nan_to_num(detrended, nan=1.0) 

482 else: 

483 detrended = data - trend 

484 

485 # Extract seasonal component (average for each phase) 

486 seasonal = np.zeros_like(data) 

487 for i in range(period): 

488 indices = np.arange(i, n, period) 

489 seasonal_mean = np.mean(detrended[indices]) 

490 seasonal[indices] = seasonal_mean 

491 

492 # Center seasonal component 

493 if model == "multiplicative": 

494 seasonal /= np.mean(seasonal) 

495 else: 

496 seasonal -= np.mean(seasonal) 

497 

498 # Compute residual 

499 if model == "multiplicative": 

500 with np.errstate(divide="ignore", invalid="ignore"): 

501 residual = data / (trend * seasonal) 

502 residual = np.nan_to_num(residual, nan=1.0) 

503 else: 

504 residual = data - trend - seasonal 

505 

506 return DecompositionResult( 

507 trend=trend.astype(np.float64), 

508 seasonal=seasonal.astype(np.float64), 

509 residual=residual.astype(np.float64), 

510 period=period, 

511 observed=data.astype(np.float64), 

512 ) 

513 

514 

515def _detect_period(data: NDArray[Any]) -> int | None: 

516 """Auto-detect dominant period using autocorrelation.""" 

517 n = len(data) 

518 if n < 20: 518 ↛ 519line 518 didn't jump to line 519 because the condition on line 518 was never true

519 return None 

520 

521 # Compute autocorrelation 

522 data_centered = data - np.mean(data) 

523 acf = np.correlate(data_centered, data_centered, mode="full") 

524 acf = acf[n - 1 :] # Keep positive lags only 

525 acf = acf / acf[0] # Normalize 

526 

527 # Find first significant peak after lag 0 

528 # Skip first few lags to avoid noise 

529 min_lag = max(2, n // 100) 

530 max_lag = n // 2 

531 

532 # Find peaks in autocorrelation 

533 peaks, _ = signal.find_peaks(acf[min_lag:max_lag], height=0.1, distance=min_lag) 

534 

535 if len(peaks) > 0: 535 ↛ 538line 535 didn't jump to line 538 because the condition on line 535 was always true

536 return peaks[0] + min_lag # type: ignore[no-any-return] 

537 

538 return None 

539 

540 

541def detect_change_points( 

542 trace: WaveformTrace | NDArray[np.floating[Any]], 

543 *, 

544 n_changes: int | None = None, 

545 min_size: int = 10, 

546 penalty: float | None = None, 

547 method: Literal["pelt", "binseg"] = "pelt", 

548) -> ChangePointResult: 

549 """Detect change points in time series. 

550 

551 Identifies points where the statistical properties of the signal 

552 change significantly. 

553 

554 Args: 

555 trace: Input trace or numpy array. 

556 n_changes: Number of change points to find. If None, auto-detected. 

557 min_size: Minimum segment length between change points. 

558 penalty: Penalty for adding change points (higher = fewer changes). 

559 method: Detection method: 

560 - "pelt": Pruned Exact Linear Time (fast, optimal) 

561 - "binseg": Binary Segmentation (fast, approximate) 

562 

563 Returns: 

564 ChangePointResult with change point locations and segment info. 

565 

566 Example: 

567 >>> result = detect_change_points(trace, n_changes=3) 

568 >>> for start, end in result.segments: 

569 ... print(f"Segment: {start} to {end}") 

570 

571 References: 

572 Killick et al. (2012): PELT Algorithm 

573 STAT-014 

574 """ 

575 data = trace.data if isinstance(trace, WaveformTrace) else np.asarray(trace) 

576 n = len(data) 

577 

578 if n < min_size * 2: 578 ↛ 579line 578 didn't jump to line 579 because the condition on line 578 was never true

579 return ChangePointResult( 

580 indices=np.array([], dtype=np.intp), 

581 n_changes=0, 

582 segments=[(0, n)], 

583 segment_means=np.array([np.mean(data)]), 

584 segment_stds=np.array([np.std(data)]), 

585 cost=0.0, 

586 ) 

587 

588 # Set default penalty based on BIC 

589 if penalty is None: 589 ↛ 592line 589 didn't jump to line 592 because the condition on line 589 was always true

590 penalty = np.log(n) * np.var(data) 

591 

592 if method == "pelt": 592 ↛ 595line 592 didn't jump to line 595 because the condition on line 592 was always true

593 change_points = _pelt_change_points(data, min_size, penalty, n_changes) 

594 else: 

595 change_points = _binseg_change_points(data, min_size, penalty, n_changes) 

596 

597 # Build segments 

598 all_points = [0, *list(change_points), n] 

599 segments = [(all_points[i], all_points[i + 1]) for i in range(len(all_points) - 1)] 

600 

601 # Compute segment statistics 

602 segment_means = np.array([np.mean(data[s:e]) for s, e in segments]) 

603 segment_stds = np.array([np.std(data[s:e]) for s, e in segments]) 

604 

605 # Compute total cost 

606 total_cost = sum(_segment_cost(data[s:e]) for s, e in segments) + penalty * len(change_points) 

607 

608 return ChangePointResult( 

609 indices=np.array(change_points, dtype=np.intp), 

610 n_changes=len(change_points), 

611 segments=segments, 

612 segment_means=segment_means, 

613 segment_stds=segment_stds, 

614 cost=float(total_cost), 

615 ) 

616 

617 

618def _segment_cost(segment: NDArray[Any]) -> float: 

619 """Compute cost of a segment (negative log-likelihood for normal).""" 

620 n = len(segment) 

621 if n < 2: 621 ↛ 622line 621 didn't jump to line 622 because the condition on line 621 was never true

622 return 0.0 

623 var = np.var(segment) 

624 if var <= 0: 624 ↛ 625line 624 didn't jump to line 625 because the condition on line 624 was never true

625 return 0.0 

626 return n * np.log(var) # type: ignore[no-any-return] 

627 

628 

629def _pelt_change_points( 

630 data: NDArray[Any], 

631 min_size: int, 

632 penalty: float, 

633 n_changes: int | None, 

634) -> list[int]: 

635 """PELT algorithm for change point detection.""" 

636 len(data) 

637 

638 # Simple implementation: use binary segmentation as approximation 

639 # Full PELT requires dynamic programming which is more complex 

640 return _binseg_change_points(data, min_size, penalty, n_changes) 

641 

642 

643def _binseg_change_points( 

644 data: NDArray[Any], 

645 min_size: int, 

646 penalty: float, 

647 n_changes: int | None, 

648) -> list[int]: 

649 """Binary segmentation for change point detection.""" 

650 n = len(data) 

651 change_points: list[int] = [] 

652 

653 def find_best_split(start: int, end: int) -> tuple[int, float]: 

654 """Find best split point in segment.""" 

655 if end - start < 2 * min_size: 

656 return -1, 0.0 

657 

658 best_idx = -1 

659 best_gain = 0.0 

660 

661 for i in range(start + min_size, end - min_size + 1): 

662 left = data[start:i] 

663 right = data[i:end] 

664 full = data[start:end] 

665 

666 cost_full = _segment_cost(full) 

667 cost_split = _segment_cost(left) + _segment_cost(right) 

668 gain = cost_full - cost_split - penalty 

669 

670 if gain > best_gain: 

671 best_gain = gain 

672 best_idx = i 

673 

674 return best_idx, best_gain 

675 

676 # Iteratively find change points 

677 segments = [(0, n)] 

678 max_iter = n_changes if n_changes is not None else n // min_size 

679 

680 for _ in range(max_iter): 

681 best_segment_idx = -1 

682 best_split_idx = -1 

683 best_gain = 0.0 

684 

685 for seg_idx, (start, end) in enumerate(segments): 

686 split_idx, gain = find_best_split(start, end) 

687 if gain > best_gain: 

688 best_gain = gain 

689 best_split_idx = split_idx 

690 best_segment_idx = seg_idx 

691 

692 if best_split_idx == -1: 

693 break 

694 

695 # Add change point 

696 change_points.append(best_split_idx) 

697 

698 # Update segments 

699 start, end = segments[best_segment_idx] 

700 segments[best_segment_idx] = (start, best_split_idx) 

701 segments.insert(best_segment_idx + 1, (best_split_idx, end)) 

702 

703 return sorted(change_points) 

704 

705 

706def phase_coherence( 

707 trace1: WaveformTrace | NDArray[np.floating[Any]], 

708 trace2: WaveformTrace | NDArray[np.floating[Any]], 

709 *, 

710 sample_rate: float | None = None, 

711 nperseg: int | None = None, 

712) -> CoherenceResult: 

713 """Compute phase coherence between two signals. 

714 

715 Coherence measures the linear correlation between two signals 

716 as a function of frequency. 

717 

718 Args: 

719 trace1: First input trace. 

720 trace2: Second input trace. 

721 sample_rate: Sample rate in Hz. Required if traces are arrays. 

722 nperseg: Segment length for Welch method. 

723 

724 Returns: 

725 CoherenceResult with coherence spectrum and phase. 

726 

727 Example: 

728 >>> result = phase_coherence(signal1, signal2, sample_rate=1e6) 

729 >>> print(f"Mean coherence: {result.mean_coherence:.3f}") 

730 

731 References: 

732 STAT-015 

733 """ 

734 data1 = trace1.data if isinstance(trace1, WaveformTrace) else np.asarray(trace1) 

735 data2 = trace2.data if isinstance(trace2, WaveformTrace) else np.asarray(trace2) 

736 

737 # Get sample rate 

738 if sample_rate is None: 

739 sample_rate = trace1.metadata.sample_rate if isinstance(trace1, WaveformTrace) else 1.0 

740 

741 # Ensure same length 

742 n = min(len(data1), len(data2)) 

743 data1 = data1[:n] 

744 data2 = data2[:n] 

745 

746 if nperseg is None: 746 ↛ 748line 746 didn't jump to line 748 because the condition on line 746 was always true

747 nperseg = min(256, n // 4) 

748 nperseg = max(16, min(nperseg, n)) 

749 

750 # Compute coherence 

751 frequencies, coherence = signal.coherence(data1, data2, fs=sample_rate, nperseg=nperseg) 

752 

753 # Compute cross-spectral phase 

754 _, Pxy = signal.csd(data1, data2, fs=sample_rate, nperseg=nperseg) 

755 phase = np.angle(Pxy) 

756 

757 # Statistics 

758 mean_coherence = float(np.mean(coherence)) 

759 peak_idx = np.argmax(coherence) 

760 peak_frequency = float(frequencies[peak_idx]) 

761 peak_coherence = float(coherence[peak_idx]) 

762 

763 return CoherenceResult( 

764 coherence=coherence.astype(np.float64), 

765 frequencies=frequencies.astype(np.float64), 

766 phase=phase.astype(np.float64), 

767 mean_coherence=mean_coherence, 

768 peak_frequency=peak_frequency, 

769 peak_coherence=peak_coherence, 

770 ) 

771 

772 

773def kernel_density( 

774 trace: WaveformTrace | NDArray[np.floating[Any]], 

775 *, 

776 n_points: int = 1000, 

777 bandwidth: float | str = "scott", 

778 kernel: Literal["gaussian", "tophat", "epanechnikov"] = "gaussian", 

779) -> KDEResult: 

780 """Estimate probability density using kernel density estimation. 

781 

782 Args: 

783 trace: Input trace or numpy array. 

784 n_points: Number of evaluation points. 

785 bandwidth: Bandwidth for kernel ("scott", "silverman", or float). 

786 kernel: Kernel function to use. 

787 

788 Returns: 

789 KDEResult with density estimate and mode information. 

790 

791 Raises: 

792 ValueError: If kernel is not one of the supported types. 

793 

794 Example: 

795 >>> result = kernel_density(trace) 

796 >>> plt.plot(result.x, result.density) 

797 >>> print(f"Modes at: {result.peak_values}") 

798 

799 References: 

800 Scott (1992): Multivariate Density Estimation 

801 STAT-016 

802 """ 

803 data = trace.data if isinstance(trace, WaveformTrace) else np.asarray(trace) 

804 n = len(data) 

805 

806 if n < 2: 806 ↛ 807line 806 didn't jump to line 807 because the condition on line 806 was never true

807 return KDEResult( 

808 x=np.array([np.mean(data)]), 

809 density=np.array([1.0]), 

810 bandwidth=0.0, 

811 peaks=np.array([0], dtype=np.intp), 

812 peak_values=np.array([np.mean(data)]), 

813 ) 

814 

815 # Compute bandwidth 

816 std = np.std(data) 

817 iqr = np.percentile(data, 75) - np.percentile(data, 25) 

818 

819 if isinstance(bandwidth, str): 

820 if bandwidth == "scott": 

821 bw = 1.06 * std * n ** (-1 / 5) 

822 elif bandwidth == "silverman": 822 ↛ 825line 822 didn't jump to line 825 because the condition on line 822 was always true

823 bw = 0.9 * min(std, iqr / 1.34) * n ** (-1 / 5) 

824 else: 

825 bw = 1.06 * std * n ** (-1 / 5) 

826 else: 

827 bw = bandwidth 

828 

829 bw = max(bw, 1e-10) # Prevent zero bandwidth 

830 

831 # Evaluation grid 

832 margin = 3 * bw 

833 x_min = np.min(data) - margin 

834 x_max = np.max(data) + margin 

835 x = np.linspace(x_min, x_max, n_points) 

836 

837 # Compute density 

838 if kernel == "gaussian": 

839 kde = sp_stats.gaussian_kde(data, bw_method=bw / std if std > 0 else 1.0) 

840 density = kde(x) 

841 elif kernel == "tophat": 

842 density = np.zeros(n_points) 

843 for xi in data: 

844 mask = np.abs(x - xi) <= bw 

845 density[mask] += 1.0 

846 density /= n * 2 * bw 

847 elif kernel == "epanechnikov": 847 ↛ 855line 847 didn't jump to line 855 because the condition on line 847 was always true

848 density = np.zeros(n_points) 

849 for xi in data: 

850 u = (x - xi) / bw 

851 mask = np.abs(u) <= 1 

852 density[mask] += 0.75 * (1 - u[mask] ** 2) 

853 density /= n * bw 

854 else: 

855 raise ValueError(f"Unknown kernel: {kernel}") 

856 

857 # Find peaks (modes) 

858 peaks_idx, _ = signal.find_peaks(density) 

859 if len(peaks_idx) == 0: 859 ↛ 860line 859 didn't jump to line 860 because the condition on line 859 was never true

860 peaks_idx = np.array([np.argmax(density)]) 

861 peak_values = x[peaks_idx] 

862 

863 return KDEResult( 

864 x=x.astype(np.float64), 

865 density=density.astype(np.float64), 

866 bandwidth=float(bw), 

867 peaks=peaks_idx.astype(np.intp), 

868 peak_values=peak_values.astype(np.float64), 

869 ) 

870 

871 

872__all__ = [ 

873 "ChangePointResult", 

874 "CoherenceResult", 

875 "DecompositionResult", 

876 "IsolationForestResult", 

877 "KDEResult", 

878 "LOFResult", 

879 "detect_change_points", 

880 "isolation_forest_outliers", 

881 "kernel_density", 

882 "local_outlier_factor", 

883 "phase_coherence", 

884 "seasonal_decompose", 

885]