Coverage for src / tracekit / quality / ensemble.py: 88%

243 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Ensemble methods for combining multiple analysis algorithms. 

2 

3This module provides robust analysis by combining results from multiple algorithms 

4using various aggregation strategies. Ensemble methods reduce individual algorithm 

5bias, handle outliers, and provide confidence bounds for more reliable measurements. 

6 

7 

8Example: 

9 >>> from tracekit.quality.ensemble import EnsembleAggregator, AggregationMethod 

10 >>> from tracekit.quality.ensemble import create_frequency_ensemble 

11 >>> # Combine multiple frequency measurements 

12 >>> result = create_frequency_ensemble(signal, sample_rate=1e9) 

13 >>> print(f"Frequency: {result.value:.2f} Hz ± {result.confidence*100:.1f}%") 

14 >>> print(f"Methods agree: {result.method_agreement*100:.1f}%") 

15 >>> # Use custom ensemble 

16 >>> aggregator = EnsembleAggregator(method=AggregationMethod.WEIGHTED_AVERAGE) 

17 >>> results = [ 

18 ... {"value": 1000.0, "confidence": 0.9, "method": "fft"}, 

19 ... {"value": 1005.0, "confidence": 0.8, "method": "autocorr"}, 

20 ... {"value": 995.0, "confidence": 0.85, "method": "zero_crossing"}, 

21 ... ] 

22 >>> ensemble_result = aggregator.aggregate(results) 

23 

24References: 

25 - Kuncheva, L.I.: "Combining Pattern Classifiers" (2nd Ed), Wiley, 2014 

26 - Polikar, R.: "Ensemble Learning", Scholarpedia, 2009 

27 - Dietterich, T.G.: "Ensemble Methods in Machine Learning", 2000 

28""" 

29 

30from __future__ import annotations 

31 

32import logging 

33from collections import Counter 

34from dataclasses import dataclass, field 

35from enum import Enum 

36from typing import TYPE_CHECKING, Any 

37 

38import numpy as np 

39from scipy import stats 

40 

41from tracekit.quality.scoring import AnalysisQualityScore, combine_quality_scores 

42 

43if TYPE_CHECKING: 

44 from numpy.typing import NDArray 

45 

46logger = logging.getLogger(__name__) 

47 

48 

class AggregationMethod(Enum):
    """Available strategies for fusing results from several analyzers.

    Attributes:
        WEIGHTED_AVERAGE: Confidence-weighted mean (suited to numeric values).
        VOTING: Majority vote (suited to categorical results).
        MEDIAN: Outlier-robust central value (suited when outliers are expected).
        BAYESIAN: Precision-weighted Bayesian fusion (suited when prior
            knowledge about method reliability is available).
    """

    WEIGHTED_AVERAGE = "weighted_average"
    VOTING = "voting"
    MEDIAN = "median"
    BAYESIAN = "bayesian"

63 

64 

@dataclass
class EnsembleResult:
    """Combined result produced by fusing several analysis methods.

    Attributes:
        value: Aggregated value (numeric or categorical).
        confidence: Overall confidence in the combined result (0-1).
        lower_bound: Lower confidence bound (None for categorical results).
        upper_bound: Upper confidence bound (None for categorical results).
        method_agreement: Inter-method agreement (0-1, higher is better).
        individual_results: The per-method result dictionaries that were fused.
        aggregation_method: Strategy used to combine the results.
        quality_score: Optional quality score for the ensemble result.
        outlier_methods: Indices of methods whose results were flagged as outliers.

    Example:
        >>> if result.method_agreement > 0.8:
        ...     print(f"High agreement: {result.value}")
        >>> else:
        ...     print(f"Methods disagree, confidence: {result.confidence}")
    """

    value: Any
    confidence: float
    lower_bound: float | None = None
    upper_bound: float | None = None
    method_agreement: float = 1.0
    individual_results: list[dict[str, Any]] = field(default_factory=list)
    aggregation_method: AggregationMethod = AggregationMethod.WEIGHTED_AVERAGE
    quality_score: AnalysisQualityScore | None = None
    outlier_methods: list[int] = field(default_factory=list)

    def __post_init__(self) -> None:
        """Reject out-of-range confidence or agreement values."""
        if not 0 <= self.confidence <= 1:
            raise ValueError(f"Confidence must be in [0, 1], got {self.confidence}")
        if not 0 <= self.method_agreement <= 1:
            raise ValueError(f"Method agreement must be in [0, 1], got {self.method_agreement}")

    def to_dict(self) -> dict[str, Any]:
        """Serialize this result to a plain dictionary.

        Returns:
            Dictionary representation of the ensemble result; nested quality
            score is serialized via its own ``to_dict`` when present.
        """
        quality = self.quality_score.to_dict() if self.quality_score else None
        return {
            "value": self.value,
            "confidence": self.confidence,
            "lower_bound": self.lower_bound,
            "upper_bound": self.upper_bound,
            "method_agreement": self.method_agreement,
            "aggregation_method": self.aggregation_method.value,
            "individual_results": self.individual_results,
            "outlier_methods": self.outlier_methods,
            "quality_score": quality,
        }

121 

122 

class EnsembleAggregator:
    """Combines multiple analysis results for robust estimation.

    Supports various aggregation strategies optimized for different data types
    and analysis scenarios. Automatically detects and handles outliers, computes
    confidence bounds, and measures inter-method agreement.

    QUAL-004: Ensemble Methods for Robust Analysis
    QUAL-005: Disagreement Detection and Handling
    QUAL-006: Confidence Bound Estimation

    Example:
        >>> aggregator = EnsembleAggregator(method=AggregationMethod.WEIGHTED_AVERAGE)
        >>> results = [
        ...     {"value": 100.0, "confidence": 0.9},
        ...     {"value": 102.0, "confidence": 0.85},
        ...     {"value": 98.0, "confidence": 0.8},
        ... ]
        >>> ensemble = aggregator.aggregate(results)
        >>> print(f"Result: {ensemble.value:.2f} ± {ensemble.confidence*100:.1f}%")
    """

    def __init__(
        self,
        method: AggregationMethod = AggregationMethod.WEIGHTED_AVERAGE,
        outlier_threshold: float = 3.0,
        min_agreement: float = 0.5,
    ):
        """Initialize ensemble aggregator.

        Args:
            method: Aggregation strategy to use.
            outlier_threshold: Modified Z-score threshold for outlier detection
                (default 3.0).
            min_agreement: Minimum agreement below which confidence is penalized
                and a warning is logged (default 0.5).
        """
        self.method = method
        self.outlier_threshold = outlier_threshold
        self.min_agreement = min_agreement

    def aggregate(self, results: list[dict[str, Any]]) -> EnsembleResult:
        """Combine multiple results into one robust estimate.

        Dispatches to numeric or categorical aggregation depending on whether
        every value is a number.

        Args:
            results: List of result dictionaries with keys:
                - value: Measured value (numeric or categorical)
                - confidence: Confidence score (0-1), defaults to 1.0
                - method: Optional method name
                - quality_score: Optional AnalysisQualityScore

        Returns:
            EnsembleResult with combined value and metadata.

        Raises:
            ValueError: If results list is empty.

        Example:
            >>> results = [
            ...     {"value": 1000, "confidence": 0.9, "method": "fft"},
            ...     {"value": 1005, "confidence": 0.85, "method": "autocorr"},
            ... ]
            >>> ensemble = aggregator.aggregate(results)
        """
        if not results:
            raise ValueError("Cannot aggregate empty results list")

        values = [r["value"] for r in results]
        confidences = [r.get("confidence", 1.0) for r in results]

        # Numeric aggregation only applies when every value is a number;
        # anything else falls back to weighted voting over string labels.
        is_numeric = all(isinstance(v, int | float | np.number) for v in values)

        if is_numeric:
            return self.aggregate_numeric(
                [float(v) for v in values],
                confidences,
                original_results=results,
            )
        else:
            return self.aggregate_categorical(
                [str(v) for v in values],
                confidences,
                original_results=results,
            )

    def aggregate_numeric(
        self,
        values: list[float],
        confidences: list[float],
        original_results: list[dict[str, Any]] | None = None,
    ) -> EnsembleResult:
        """Combine numeric values with confidence weighting.

        Outliers (per :meth:`detect_outlier_methods`) are excluded from the
        aggregate unless every value is an outlier, in which case all values
        are used and a warning is logged.

        Args:
            values: List of numeric values to combine.
            confidences: Confidence scores for each value (0-1).
            original_results: Optional original result dictionaries.

        Returns:
            EnsembleResult with aggregated numeric value and 95% confidence
            bounds.

        Raises:
            ValueError: If values list is empty.

        Example:
            >>> values = [100.0, 102.0, 98.0, 150.0]  # 150 is outlier
            >>> confidences = [0.9, 0.85, 0.8, 0.7]
            >>> result = aggregator.aggregate_numeric(values, confidences)
            >>> # Outlier detected and handled
        """
        if not values:
            raise ValueError("Cannot aggregate empty values list")

        if original_results is None:
            original_results = [
                {"value": v, "confidence": c} for v, c in zip(values, confidences, strict=False)
            ]

        values_arr = np.array(values, dtype=np.float64)
        confidences_arr = np.array(confidences, dtype=np.float64)

        # Detect outliers and mask them out of the aggregation.
        outlier_indices = self.detect_outlier_methods(original_results)
        valid_mask = np.ones(len(values), dtype=bool)
        valid_mask[outlier_indices] = False

        valid_values = values_arr[valid_mask]
        valid_confidences = confidences_arr[valid_mask]

        if len(valid_values) == 0:
            # All values are outliers, use all with warning.
            logger.warning("All methods detected as outliers, using all values")
            valid_values = values_arr
            valid_confidences = confidences_arr
            outlier_indices = []

        # Compute aggregated value based on method.
        if self.method == AggregationMethod.WEIGHTED_AVERAGE:
            total_weight = float(np.sum(valid_confidences))
            if total_weight > 0:
                weights = valid_confidences / total_weight
            else:
                # Degenerate case: all confidences are zero — fall back to
                # equal weights instead of producing NaN.
                weights = np.full(len(valid_values), 1.0 / len(valid_values))
            aggregated_value = float(np.sum(valid_values * weights))
            # Weighted variance around the weighted mean.
            variance = float(np.sum(weights * (valid_values - aggregated_value) ** 2))
            std_dev = np.sqrt(variance)

        elif self.method == AggregationMethod.MEDIAN:
            aggregated_value = float(np.median(valid_values))
            # Use MAD (Median Absolute Deviation) for robust std estimate.
            mad = float(np.median(np.abs(valid_values - aggregated_value)))
            std_dev = mad * 1.4826  # Scale factor for normal distribution

        elif self.method == AggregationMethod.BAYESIAN:
            # Bayesian combination with Gaussian likelihood:
            # uniform prior, confidence-squared as precision.
            precisions = valid_confidences**2  # Higher confidence = lower variance
            total_precision = float(np.sum(precisions))
            if total_precision <= 0:
                # All confidences zero — treat every method equally to avoid
                # a division by zero.
                precisions = np.ones_like(valid_values)
                total_precision = float(len(valid_values))
            aggregated_value = float(np.sum(valid_values * precisions) / total_precision)
            # Posterior variance of the precision-weighted mean.
            variance = 1.0 / total_precision
            std_dev = float(np.sqrt(variance))

        else:
            # Fallback (e.g. VOTING on numeric data): simple average.
            aggregated_value = float(np.mean(valid_values))
            std_dev = float(np.std(valid_values))

        # Compute confidence bounds (95% confidence interval).
        if len(valid_values) > 1:
            # Use t-distribution for small samples.
            dof = len(valid_values) - 1
            t_value = stats.t.ppf(0.975, dof)  # 95% CI
            margin = t_value * std_dev / np.sqrt(len(valid_values))
            lower_bound = aggregated_value - margin
            upper_bound = aggregated_value + margin
        else:
            lower_bound = aggregated_value
            upper_bound = aggregated_value

        # Method agreement: inverse of the coefficient of variation, clipped
        # to [0, 1]. A single value (or zero mean) counts as full agreement.
        if len(valid_values) > 1 and aggregated_value != 0:
            cv = std_dev / abs(aggregated_value)
            method_agreement = float(np.clip(1.0 - cv, 0.0, 1.0))
        else:
            method_agreement = 1.0

        # Overall confidence: mean of the surviving individual confidences.
        overall_confidence = float(np.mean(valid_confidences))

        # Penalize confidence if agreement is low.
        if method_agreement < self.min_agreement:
            overall_confidence *= method_agreement
            logger.warning(
                f"Low method agreement ({method_agreement:.2f}), "
                f"reduced confidence to {overall_confidence:.2f}"
            )

        ensemble_quality = self._combine_quality(original_results)

        return EnsembleResult(
            value=aggregated_value,
            confidence=overall_confidence,
            lower_bound=lower_bound,
            upper_bound=upper_bound,
            method_agreement=method_agreement,
            individual_results=original_results,
            aggregation_method=self.method,
            quality_score=ensemble_quality,
            outlier_methods=outlier_indices,
        )

    def aggregate_categorical(
        self,
        values: list[str],
        confidences: list[float],
        original_results: list[dict[str, Any]] | None = None,
    ) -> EnsembleResult:
        """Combine categorical values via weighted voting.

        Args:
            values: List of categorical values to combine.
            confidences: Confidence scores for each value (0-1).
            original_results: Optional original result dictionaries.

        Returns:
            EnsembleResult with the majority-vote value; confidence bounds are
            None for categorical results.

        Raises:
            ValueError: If values list is empty.

        Example:
            >>> values = ["rising", "rising", "falling", "rising"]
            >>> confidences = [0.9, 0.85, 0.6, 0.8]
            >>> result = aggregator.aggregate_categorical(values, confidences)
            >>> # "rising" wins by weighted vote
        """
        if not values:
            raise ValueError("Cannot aggregate empty values list")

        if original_results is None:
            original_results = [
                {"value": v, "confidence": c} for v, c in zip(values, confidences, strict=False)
            ]

        # Weighted voting: each value's votes accumulate its confidence.
        vote_weights: dict[str, float] = {}
        for value, confidence in zip(values, confidences, strict=False):
            vote_weights[value] = vote_weights.get(value, 0.0) + confidence

        # Winner is the label with the largest accumulated weight.
        winner = max(vote_weights.items(), key=lambda x: x[1])
        aggregated_value = winner[0]
        total_weight = sum(vote_weights.values())

        # Confidence is the fraction of (weighted) votes for the winner.
        overall_confidence = winner[1] / total_weight if total_weight > 0 else 0.0

        # Agreement is measured by unweighted vote concentration: the share of
        # methods that picked the most common label.
        vote_counts = Counter(values)
        total_votes = len(values)
        max_count = vote_counts.most_common(1)[0][1]
        method_agreement = max_count / total_votes

        ensemble_quality = self._combine_quality(original_results)

        return EnsembleResult(
            value=aggregated_value,
            confidence=overall_confidence,
            lower_bound=None,
            upper_bound=None,
            method_agreement=method_agreement,
            individual_results=original_results,
            aggregation_method=self.method,
            quality_score=ensemble_quality,
            outlier_methods=[],  # No outlier detection for categorical
        )

    def _combine_quality(
        self, original_results: list[dict[str, Any]]
    ) -> AnalysisQualityScore | None:
        """Combine per-method quality scores into one ensemble quality score.

        Each quality score is weighted by the confidence of the result that
        carries it (previously the first-k confidences were used, which
        misaligned weights when only some results had quality scores).

        Args:
            original_results: Result dictionaries, possibly carrying a
                "quality_score" key.

        Returns:
            Combined AnalysisQualityScore, or None when no result carries a
            quality score or any carried value is not an AnalysisQualityScore.
        """
        pairs = [
            (r["quality_score"], r.get("confidence", 1.0))
            for r in original_results
            if "quality_score" in r
        ]
        if not pairs or not all(isinstance(q, AnalysisQualityScore) for q, _ in pairs):
            return None
        scores: list[AnalysisQualityScore] = [q for q, _ in pairs]
        weights = [c for _, c in pairs]
        return combine_quality_scores(scores, weights=weights)

    def detect_outlier_methods(self, results: list[dict[str, Any]]) -> list[int]:
        """Identify methods producing outlier results.

        Uses the modified Z-score (based on MAD) for robust outlier detection;
        a value is an outlier when its |modified Z| exceeds
        ``self.outlier_threshold``.

        Args:
            results: List of result dictionaries with "value" key.

        Returns:
            List of indices corresponding to outlier methods. Empty when values
            are non-numeric, fewer than 3, or all identical (MAD == 0).

        Example:
            >>> results = [
            ...     {"value": 100}, {"value": 102}, {"value": 98}, {"value": 500}
            ... ]
            >>> outliers = aggregator.detect_outlier_methods(results)
            >>> # Returns [3] - the 500 value is an outlier
        """
        values = [r["value"] for r in results]

        # Only works for numeric values.
        if not all(isinstance(v, int | float | np.number) for v in values):
            return []

        if len(values) < 3:
            # Need at least 3 values for meaningful outlier detection.
            return []

        values_arr = np.array(values, dtype=np.float64)

        # Modified Z-score based on MAD (robust to the outliers themselves).
        median = np.median(values_arr)
        mad = np.median(np.abs(values_arr - median))

        if mad == 0:
            # All values are identical (or MAD degenerate) — no outliers.
            return []

        # 0.6745 rescales MAD to be comparable with the standard deviation
        # of a normal distribution (Iglewicz & Hoaglin).
        modified_z_scores = 0.6745 * (values_arr - median) / mad

        outlier_mask = np.abs(modified_z_scores) > self.outlier_threshold
        outlier_indices: list[int] = np.where(outlier_mask)[0].tolist()

        if outlier_indices:
            logger.info(f"Detected {len(outlier_indices)} outlier method(s): {outlier_indices}")

        return outlier_indices

477 

478 

# Pre-configured ensembles for common analysis types.
# Each entry is a (method_name, weight) pair; the weight expresses the relative
# reliability of that method and is multiplied into the method's confidence by
# the create_*_ensemble helpers below.

FREQUENCY_ENSEMBLE: list[tuple[str, float]] = [
    ("fft_peak", 0.4),  # FFT peak is generally most reliable
    ("zero_crossing", 0.3),  # Zero crossing is robust but can be noisy
    ("autocorrelation", 0.3),  # Autocorrelation handles noise well
]

EDGE_DETECTION_ENSEMBLE: list[tuple[str, float]] = [
    ("threshold_crossing", 0.5),  # Most direct method
    ("derivative", 0.3),  # Good for clean signals
    ("schmitt_trigger", 0.2),  # Noise immunity but less precise
]

AMPLITUDE_ENSEMBLE: list[tuple[str, float]] = [
    ("peak_to_peak", 0.4),  # Direct measurement
    ("rms", 0.3),  # Robust to noise
    ("percentile_99", 0.3),  # Outlier resistant
]

499 

500 

def create_frequency_ensemble(
    signal: NDArray[np.float64],
    sample_rate: float,
    method_weights: list[tuple[str, float]] | None = None,
) -> EnsembleResult:
    """Run multiple frequency detection methods and combine results.

    Applies FFT peak detection, zero-crossing rate, and autocorrelation-based
    frequency estimation, then combines using weighted averaging. Weights are
    looked up by method name ("fft_peak", "zero_crossing", "autocorrelation"),
    so custom weight lists may be given in any order; missing names fall back
    to the FREQUENCY_ENSEMBLE defaults.

    Args:
        signal: Input signal array.
        sample_rate: Sample rate in Hz.
        method_weights: Optional custom (method_name, weight) pairs. Defaults
            to FREQUENCY_ENSEMBLE.

    Returns:
        EnsembleResult with combined frequency estimate.

    Raises:
        ValueError: If all frequency detection methods fail.

    Example:
        >>> import numpy as np
        >>> t = np.linspace(0, 1, 1000)
        >>> signal = np.sin(2 * np.pi * 10 * t)  # 10 Hz sine
        >>> result = create_frequency_ensemble(signal, sample_rate=1000)
        >>> print(f"Frequency: {result.value:.2f} Hz")
        >>> print(f"Confidence: {result.confidence:.2%}")
    """
    if method_weights is None:
        method_weights = FREQUENCY_ENSEMBLE

    # Resolve weights by name (robust to reordered custom lists); the module
    # defaults cover any method name the caller omitted.
    weight_map = {**dict(FREQUENCY_ENSEMBLE), **dict(method_weights)}

    results = []

    # Method 1: FFT peak detection
    try:
        fft_result = np.fft.rfft(signal)
        freqs = np.fft.rfftfreq(len(signal), d=1.0 / sample_rate)
        peak_idx = np.argmax(np.abs(fft_result[1:])) + 1  # Skip DC
        freq_fft = float(freqs[peak_idx])
        # Confidence based on peak prominence over the mean spectrum level.
        peak_magnitude = np.abs(fft_result[peak_idx])
        mean_magnitude = np.mean(np.abs(fft_result[1:]))
        if mean_magnitude > 0:
            confidence_fft = min(1.0, peak_magnitude / (mean_magnitude * 10))
        else:
            # All-zero spectrum (e.g. all-zero signal): no usable peak, so
            # report zero confidence instead of a NaN from 0/0.
            confidence_fft = 0.0
        results.append(
            {
                "value": freq_fft,
                "confidence": confidence_fft * weight_map["fft_peak"],
                "method": "fft_peak",
            }
        )
    except Exception as e:
        logger.debug(f"FFT peak detection failed: {e}")

    # Method 2: Zero crossing rate
    try:
        zero_crossings = np.where(np.diff(np.sign(signal)))[0]
        if len(zero_crossings) > 1:
            # Average time between zero crossings (half period).
            avg_half_period = np.mean(np.diff(zero_crossings)) / sample_rate
            freq_zc = 1.0 / (2.0 * avg_half_period)
            # Confidence based on regularity of crossings.
            std_half_period = np.std(np.diff(zero_crossings)) / sample_rate
            confidence_zc = max(0.0, 1.0 - std_half_period / avg_half_period)
            results.append(
                {
                    "value": float(freq_zc),
                    "confidence": confidence_zc * weight_map["zero_crossing"],
                    "method": "zero_crossing",
                }
            )
    except Exception as e:
        logger.debug(f"Zero crossing detection failed: {e}")

    # Method 3: Autocorrelation
    try:
        # Compute autocorrelation and keep the non-negative-lag half.
        autocorr = np.correlate(signal, signal, mode="full")
        autocorr = autocorr[len(autocorr) // 2 :]
        # Find first local maximum after zero lag (skip DC).
        peaks = []
        for i in range(1, min(len(autocorr) - 1, len(signal) // 2)):
            if autocorr[i] > autocorr[i - 1] and autocorr[i] > autocorr[i + 1]:
                peaks.append(i)
        if peaks:
            first_peak = peaks[0]
            period_samples = first_peak
            freq_ac = sample_rate / period_samples
            # Confidence based on peak strength relative to zero-lag energy.
            peak_strength = autocorr[first_peak] / autocorr[0]
            confidence_ac = float(np.clip(peak_strength, 0.0, 1.0))
            results.append(
                {
                    "value": float(freq_ac),
                    "confidence": confidence_ac * weight_map["autocorrelation"],
                    "method": "autocorrelation",
                }
            )
    except Exception as e:
        logger.debug(f"Autocorrelation detection failed: {e}")

    if not results:
        raise ValueError("All frequency detection methods failed")

    # Aggregate results
    aggregator = EnsembleAggregator(method=AggregationMethod.WEIGHTED_AVERAGE)
    return aggregator.aggregate(results)

608 

609 

def create_edge_ensemble(
    signal: NDArray[np.float64],
    sample_rate: float,
    threshold: float | None = None,
    method_weights: list[tuple[str, float]] | None = None,
) -> EnsembleResult:
    """Run multiple edge detection methods and combine results.

    Counts edges with three independent detectors — threshold crossing,
    derivative peaks, and a Schmitt trigger with hysteresis — then fuses the
    counts with a median aggregator.

    Args:
        signal: Input signal array.
        sample_rate: Sample rate in Hz.
        threshold: Detection threshold. If None, uses signal midpoint.
        method_weights: Optional custom method weights. Defaults to EDGE_DETECTION_ENSEMBLE.

    Returns:
        EnsembleResult with combined edge detection results.

    Raises:
        ValueError: If all edge detection methods fail.

    Example:
        >>> signal = np.array([0, 0, 1, 1, 0, 0, 1, 1])
        >>> result = create_edge_ensemble(signal, sample_rate=1000)
        >>> print(f"Edge count: {result.value}")
        >>> print(f"Agreement: {result.method_agreement:.2%}")
    """
    if method_weights is None:
        method_weights = EDGE_DETECTION_ENSEMBLE

    if threshold is None:
        threshold = float((np.max(signal) + np.min(signal)) / 2.0)

    results = []

    # Method 1: Threshold crossing — count sign changes of (signal - threshold).
    try:
        sign_changes = np.where(np.diff(np.sign(signal - threshold)))[0]
        tc_count = len(sign_changes)
        # Confidence: rough SNR proxy from full range vs sample-to-sample noise.
        signal_range = np.ptp(signal)
        noise_estimate = np.std(np.diff(signal))
        tc_confidence = (
            min(1.0, signal_range / (noise_estimate * 10)) if noise_estimate > 0 else 0.5
        )
        results.append(
            {
                "value": tc_count,
                "confidence": tc_confidence * method_weights[0][1],
                "method": "threshold_crossing",
            }
        )
    except Exception as e:
        logger.debug(f"Threshold crossing detection failed: {e}")

    # Method 2: Derivative-based — large |dV| spikes mark edges.
    try:
        derivative = np.diff(signal)
        deriv_std = np.std(derivative)
        deriv_threshold = deriv_std * 2
        candidate_indices = np.where(np.abs(derivative) > deriv_threshold)[0]
        # Collapse detections within 2 samples of the previous candidate.
        deduped = []
        previous = None
        for idx in candidate_indices:
            if previous is None or idx - previous > 2:
                deduped.append(idx)
            previous = idx
        deriv_count = len(deduped)
        # Confidence: how far the strongest derivative exceeds the threshold;
        # 3x threshold maps to full confidence.
        max_deriv = np.max(np.abs(derivative)) if len(derivative) > 0 else 0.0
        prominence_ratio = (max_deriv / deriv_threshold) if deriv_threshold > 0 else 0.0
        deriv_confidence = float(np.clip(prominence_ratio / 3.0, 0.0, 1.0))
        results.append(
            {
                "value": deriv_count,
                "confidence": deriv_confidence * method_weights[1][1],
                "method": "derivative",
            }
        )
    except Exception as e:
        logger.debug(f"Derivative edge detection failed: {e}")

    # Method 3: Schmitt trigger — hysteresis band around the threshold.
    try:
        hysteresis = float(np.std(signal) * 0.1)
        upper = threshold + hysteresis
        lower = threshold - hysteresis
        is_high = signal[0] > threshold
        schmitt_count = 0
        for sample in signal:
            if not is_high and sample > upper:
                schmitt_count += 1
                is_high = True
            elif is_high and sample < lower:
                schmitt_count += 1
                is_high = False
        # Fixed base confidence: hysteresis delays detections slightly.
        schmitt_confidence = 0.7
        results.append(
            {
                "value": schmitt_count,
                "confidence": schmitt_confidence * method_weights[2][1],
                "method": "schmitt_trigger",
            }
        )
    except Exception as e:
        logger.debug(f"Schmitt trigger detection failed: {e}")

    if not results:
        raise ValueError("All edge detection methods failed")

    # Median aggregation suits the integer edge counts.
    aggregator = EnsembleAggregator(method=AggregationMethod.MEDIAN)
    return aggregator.aggregate(results)

729 

730 

# Public API of this module: preset weight tables first, then classes and
# factory helpers (alphabetical within each group).
__all__ = [
    "AMPLITUDE_ENSEMBLE",
    "EDGE_DETECTION_ENSEMBLE",
    "FREQUENCY_ENSEMBLE",
    "AggregationMethod",
    "EnsembleAggregator",
    "EnsembleResult",
    "create_edge_ensemble",
    "create_frequency_ensemble",
]