Coverage for src / tracekit / quality / ensemble.py: 88%
243 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Ensemble methods for combining multiple analysis algorithms.
3This module provides robust analysis by combining results from multiple algorithms
4using various aggregation strategies. Ensemble methods reduce individual algorithm
5bias, handle outliers, and provide confidence bounds for more reliable measurements.
8Example:
9 >>> from tracekit.quality.ensemble import EnsembleAggregator, AggregationMethod
10 >>> from tracekit.quality.ensemble import create_frequency_ensemble
11 >>> # Combine multiple frequency measurements
12 >>> result = create_frequency_ensemble(signal, sample_rate=1e9)
13 >>> print(f"Frequency: {result.value:.2f} Hz ± {result.confidence*100:.1f}%")
14 >>> print(f"Methods agree: {result.method_agreement*100:.1f}%")
15 >>> # Use custom ensemble
16 >>> aggregator = EnsembleAggregator(method=AggregationMethod.WEIGHTED_AVERAGE)
17 >>> results = [
18 ... {"value": 1000.0, "confidence": 0.9, "method": "fft"},
19 ... {"value": 1005.0, "confidence": 0.8, "method": "autocorr"},
20 ... {"value": 995.0, "confidence": 0.85, "method": "zero_crossing"},
21 ... ]
22 >>> ensemble_result = aggregator.aggregate(results)
24References:
25 - Kuncheva, L.I.: "Combining Pattern Classifiers" (2nd Ed), Wiley, 2014
26 - Polikar, R.: "Ensemble Learning", Scholarpedia, 2009
27 - Dietterich, T.G.: "Ensemble Methods in Machine Learning", 2000
28"""
30from __future__ import annotations
32import logging
33from collections import Counter
34from dataclasses import dataclass, field
35from enum import Enum
36from typing import TYPE_CHECKING, Any
38import numpy as np
39from scipy import stats
41from tracekit.quality.scoring import AnalysisQualityScore, combine_quality_scores
43if TYPE_CHECKING:
44 from numpy.typing import NDArray
46logger = logging.getLogger(__name__)
class AggregationMethod(Enum):
    """Enumerates the strategies available for fusing multiple results.

    Attributes:
        WEIGHTED_AVERAGE: Confidence-weighted mean; the usual choice for
            numeric measurements.
        VOTING: Majority vote; suited to categorical outcomes.
        MEDIAN: Median of the inputs; robust when outliers are expected.
        BAYESIAN: Precision-weighted Bayesian combination; useful when
            prior knowledge about method reliability is available.
    """

    WEIGHTED_AVERAGE = "weighted_average"
    VOTING = "voting"
    MEDIAN = "median"
    BAYESIAN = "bayesian"
@dataclass
class EnsembleResult:
    """Fused result produced by combining several analysis methods.

    Attributes:
        value: Aggregated value (numeric or categorical).
        confidence: Overall confidence in the combined result (0-1).
        lower_bound: Lower confidence bound (None for categorical results).
        upper_bound: Upper confidence bound (None for categorical results).
        method_agreement: Inter-method agreement (0-1, higher is better).
        individual_results: The per-method result dictionaries that were fused.
        aggregation_method: Strategy that produced this result.
        quality_score: Optional combined quality score for the ensemble.
        outlier_methods: Indices of methods whose results were outliers.

    Example:
        >>> if result.method_agreement > 0.8:
        ...     print(f"High agreement: {result.value}")
        >>> else:
        ...     print(f"Methods disagree, confidence: {result.confidence}")
    """

    value: Any
    confidence: float
    lower_bound: float | None = None
    upper_bound: float | None = None
    method_agreement: float = 1.0
    individual_results: list[dict[str, Any]] = field(default_factory=list)
    aggregation_method: AggregationMethod = AggregationMethod.WEIGHTED_AVERAGE
    quality_score: AnalysisQualityScore | None = None
    outlier_methods: list[int] = field(default_factory=list)

    def __post_init__(self) -> None:
        """Reject confidence/agreement values outside the unit interval."""
        checks = (
            ("Confidence", self.confidence),
            ("Method agreement", self.method_agreement),
        )
        for label, score in checks:
            if not 0 <= score <= 1:
                raise ValueError(f"{label} must be in [0, 1], got {score}")

    def to_dict(self) -> dict[str, Any]:
        """Serialize this result into a plain dictionary.

        Returns:
            Dictionary representation of the ensemble result.
        """
        quality = self.quality_score.to_dict() if self.quality_score else None
        return {
            "value": self.value,
            "confidence": self.confidence,
            "lower_bound": self.lower_bound,
            "upper_bound": self.upper_bound,
            "method_agreement": self.method_agreement,
            "aggregation_method": self.aggregation_method.value,
            "individual_results": self.individual_results,
            "outlier_methods": self.outlier_methods,
            "quality_score": quality,
        }
class EnsembleAggregator:
    """Combines multiple analysis results for robust estimation.

    Supports various aggregation strategies optimized for different data types
    and analysis scenarios. Automatically detects and handles outliers, computes
    confidence bounds, and measures inter-method agreement.

    QUAL-004: Ensemble Methods for Robust Analysis
    QUAL-005: Disagreement Detection and Handling
    QUAL-006: Confidence Bound Estimation

    Example:
        >>> aggregator = EnsembleAggregator(method=AggregationMethod.WEIGHTED_AVERAGE)
        >>> results = [
        ...     {"value": 100.0, "confidence": 0.9},
        ...     {"value": 102.0, "confidence": 0.85},
        ...     {"value": 98.0, "confidence": 0.8},
        ... ]
        >>> ensemble = aggregator.aggregate(results)
        >>> print(f"Result: {ensemble.value:.2f} ± {ensemble.confidence*100:.1f}%")
    """

    def __init__(
        self,
        method: AggregationMethod = AggregationMethod.WEIGHTED_AVERAGE,
        outlier_threshold: float = 3.0,
        min_agreement: float = 0.5,
    ):
        """Initialize ensemble aggregator.

        Args:
            method: Aggregation strategy to use.
            outlier_threshold: Modified Z-score threshold for outlier
                detection (default 3.0).
            min_agreement: Agreement level below which the combined
                confidence is penalized and a warning logged (default 0.5).
        """
        self.method = method
        self.outlier_threshold = outlier_threshold
        self.min_agreement = min_agreement

    def aggregate(self, results: list[dict[str, Any]]) -> EnsembleResult:
        """Combine multiple results into one robust estimate.

        Args:
            results: List of result dictionaries with keys:
                - value: Measured value (numeric or categorical)
                - confidence: Confidence score (0-1, defaults to 1.0)
                - method: Optional method name
                - quality_score: Optional AnalysisQualityScore

        Returns:
            EnsembleResult with combined value and metadata.

        Raises:
            ValueError: If results list is empty or invalid.

        Example:
            >>> results = [
            ...     {"value": 1000, "confidence": 0.9, "method": "fft"},
            ...     {"value": 1005, "confidence": 0.85, "method": "autocorr"},
            ... ]
            >>> ensemble = aggregator.aggregate(results)
        """
        if not results:
            raise ValueError("Cannot aggregate empty results list")

        values = [r["value"] for r in results]
        confidences = [r.get("confidence", 1.0) for r in results]

        # Numeric and categorical inputs take different aggregation paths.
        is_numeric = all(isinstance(v, int | float | np.number) for v in values)

        if is_numeric:
            return self.aggregate_numeric(
                [float(v) for v in values],
                confidences,
                original_results=results,
            )
        return self.aggregate_categorical(
            [str(v) for v in values],
            confidences,
            original_results=results,
        )

    def aggregate_numeric(
        self,
        values: list[float],
        confidences: list[float],
        original_results: list[dict[str, Any]] | None = None,
    ) -> EnsembleResult:
        """Combine numeric values with confidence weighting.

        Args:
            values: List of numeric values to combine.
            confidences: Confidence scores for each value (0-1).
            original_results: Optional original result dictionaries.

        Returns:
            EnsembleResult with aggregated numeric value.

        Raises:
            ValueError: If values list is empty.

        Example:
            >>> values = [100.0, 102.0, 98.0, 150.0]  # 150 is outlier
            >>> confidences = [0.9, 0.85, 0.8, 0.7]
            >>> result = aggregator.aggregate_numeric(values, confidences)
            >>> # Outlier detected and handled
        """
        if not values:
            raise ValueError("Cannot aggregate empty values list")

        if original_results is None:
            original_results = [
                {"value": v, "confidence": c} for v, c in zip(values, confidences, strict=False)
            ]

        values_arr = np.array(values, dtype=np.float64)
        confidences_arr = np.array(confidences, dtype=np.float64)

        # Detect outliers with a robust modified Z-score (MAD based)
        outlier_indices = self.detect_outlier_methods(original_results)

        # Aggregate only the non-outlier values
        valid_mask = np.ones(len(values), dtype=bool)
        valid_mask[outlier_indices] = False
        valid_values = values_arr[valid_mask]
        valid_confidences = confidences_arr[valid_mask]

        if len(valid_values) == 0:
            # Everything flagged: degrade gracefully and keep all values
            logger.warning("All methods detected as outliers, using all values")
            valid_values = values_arr
            valid_confidences = confidences_arr
            outlier_indices = []

        # Compute aggregated value based on the configured strategy
        if self.method == AggregationMethod.WEIGHTED_AVERAGE:
            total_weight = float(np.sum(valid_confidences))
            if total_weight > 0:
                weights = valid_confidences / total_weight
            else:
                # FIX: all-zero confidences previously produced NaN weights
                # (0/0); fall back to uniform weighting instead.
                weights = np.full(len(valid_values), 1.0 / len(valid_values))
            aggregated_value = float(np.sum(valid_values * weights))
            # Weighted variance around the weighted mean
            variance = float(np.sum(weights * (valid_values - aggregated_value) ** 2))
            std_dev = np.sqrt(variance)

        elif self.method == AggregationMethod.MEDIAN:
            aggregated_value = float(np.median(valid_values))
            # MAD (Median Absolute Deviation) gives a robust std estimate
            mad = float(np.median(np.abs(valid_values - aggregated_value)))
            std_dev = mad * 1.4826  # Scale factor for normal distribution

        elif self.method == AggregationMethod.BAYESIAN:
            # Bayesian combination with Gaussian likelihood; confidence is
            # treated as precision (higher confidence = lower variance).
            precisions = valid_confidences**2
            total_precision = float(np.sum(precisions))
            if total_precision > 0:
                aggregated_value = float(np.sum(valid_values * precisions) / total_precision)
                # Posterior variance of the precision-weighted estimate
                variance = 1.0 / total_precision
                std_dev = float(np.sqrt(variance))
            else:
                # FIX: zero total precision previously divided by zero;
                # fall back to an unweighted estimate.
                aggregated_value = float(np.mean(valid_values))
                std_dev = float(np.std(valid_values))

        else:
            # VOTING (or unknown) applied to numeric data: simple average
            aggregated_value = float(np.mean(valid_values))
            std_dev = float(np.std(valid_values))

        # 95% confidence interval via Student's t (small-sample safe)
        if len(valid_values) > 1:
            dof = len(valid_values) - 1
            t_value = stats.t.ppf(0.975, dof)  # 95% CI
            margin = t_value * std_dev / np.sqrt(len(valid_values))
            lower_bound = aggregated_value - margin
            upper_bound = aggregated_value + margin
        else:
            lower_bound = aggregated_value
            upper_bound = aggregated_value

        # Method agreement: inverse of the coefficient of variation
        if len(valid_values) > 1 and aggregated_value != 0:
            cv = std_dev / abs(aggregated_value)
            method_agreement = float(np.clip(1.0 - cv, 0.0, 1.0))
        else:
            method_agreement = 1.0

        # Overall confidence is the mean of the surviving confidences,
        # penalized when the methods disagree too much.
        overall_confidence = float(np.mean(valid_confidences))
        if method_agreement < self.min_agreement:
            overall_confidence *= method_agreement
            logger.warning(
                f"Low method agreement ({method_agreement:.2f}), "
                f"reduced confidence to {overall_confidence:.2f}"
            )

        return EnsembleResult(
            value=aggregated_value,
            confidence=overall_confidence,
            lower_bound=lower_bound,
            upper_bound=upper_bound,
            method_agreement=method_agreement,
            individual_results=original_results,
            aggregation_method=self.method,
            quality_score=self._combine_quality(original_results),
            outlier_methods=outlier_indices,
        )

    def aggregate_categorical(
        self,
        values: list[str],
        confidences: list[float],
        original_results: list[dict[str, Any]] | None = None,
    ) -> EnsembleResult:
        """Combine categorical values via weighted voting.

        Args:
            values: List of categorical values to combine.
            confidences: Confidence scores for each value (0-1).
            original_results: Optional original result dictionaries.

        Returns:
            EnsembleResult with majority vote value.

        Raises:
            ValueError: If values list is empty.

        Example:
            >>> values = ["rising", "rising", "falling", "rising"]
            >>> confidences = [0.9, 0.85, 0.6, 0.8]
            >>> result = aggregator.aggregate_categorical(values, confidences)
            >>> # "rising" wins by weighted vote
        """
        if not values:
            raise ValueError("Cannot aggregate empty values list")

        if original_results is None:
            original_results = [
                {"value": v, "confidence": c} for v, c in zip(values, confidences, strict=False)
            ]

        # Weighted voting: each result votes with its confidence
        vote_weights: dict[str, float] = {}
        for value, confidence in zip(values, confidences, strict=False):
            vote_weights[value] = vote_weights.get(value, 0.0) + confidence

        winner = max(vote_weights.items(), key=lambda x: x[1])
        aggregated_value = winner[0]
        total_weight = sum(vote_weights.values())

        # Confidence is the winner's share of the total vote weight
        overall_confidence = winner[1] / total_weight if total_weight > 0 else 0.0

        # Agreement is the (unweighted) vote concentration on the mode
        vote_counts = Counter(values)
        method_agreement = vote_counts.most_common(1)[0][1] / len(values)

        return EnsembleResult(
            value=aggregated_value,
            confidence=overall_confidence,
            lower_bound=None,
            upper_bound=None,
            method_agreement=method_agreement,
            individual_results=original_results,
            aggregation_method=self.method,
            quality_score=self._combine_quality(original_results),
            outlier_methods=[],  # No outlier detection for categorical data
        )

    def detect_outlier_methods(self, results: list[dict[str, Any]]) -> list[int]:
        """Identify methods producing outlier results.

        Uses the modified Z-score (based on MAD) for robust outlier
        detection; only applicable to numeric values and requires at
        least three results.

        Args:
            results: List of result dictionaries with "value" key.

        Returns:
            List of indices corresponding to outlier methods.

        Example:
            >>> results = [
            ...     {"value": 100}, {"value": 102}, {"value": 98}, {"value": 500}
            ... ]
            >>> outliers = aggregator.detect_outlier_methods(results)
            >>> # Returns [3] - the 500 value is an outlier
        """
        values = [r["value"] for r in results]

        # Only works for numeric values
        if not all(isinstance(v, int | float | np.number) for v in values):
            return []

        if len(values) < 3:
            # Need at least 3 values for a meaningful outlier test
            return []

        values_arr = np.array(values, dtype=np.float64)

        # Modified Z-score based on MAD (robust to the outliers themselves)
        median = np.median(values_arr)
        mad = np.median(np.abs(values_arr - median))

        if mad == 0:
            # All values are identical
            return []

        # 0.6745 scales MAD to be consistent with the normal std deviation
        modified_z_scores = 0.6745 * (values_arr - median) / mad

        outlier_mask = np.abs(modified_z_scores) > self.outlier_threshold
        outlier_indices: list[int] = np.where(outlier_mask)[0].tolist()

        if outlier_indices:
            logger.info(f"Detected {len(outlier_indices)} outlier method(s): {outlier_indices}")

        return outlier_indices

    def _combine_quality(
        self, original_results: list[dict[str, Any]]
    ) -> AnalysisQualityScore | None:
        """Combine the per-method quality scores carried by the results.

        Each quality score is weighted by the confidence of the result that
        produced it. FIX: the previous implementation paired the collected
        quality scores with the first N confidences positionally
        (``confidences[: len(quality_scores)]``), which mismatched score and
        weight whenever only a subset of the results carried a quality score.

        Args:
            original_results: Result dictionaries, possibly containing a
                "quality_score" entry.

        Returns:
            Combined AnalysisQualityScore, or None when no valid scores exist.
        """
        pairs = [
            (r["quality_score"], float(r.get("confidence", 1.0)))
            for r in original_results
            if isinstance(r.get("quality_score"), AnalysisQualityScore)
        ]
        if not pairs:
            return None
        scores = [score for score, _ in pairs]
        weights = [weight for _, weight in pairs]
        return combine_quality_scores(scores, weights=weights)
# Pre-configured ensembles for common analysis types.
# Each entry is a (method_name, weight) pair; the weight is the relative
# reliability assigned to that method when its result is fused, and each
# list's weights sum to 1.0.

FREQUENCY_ENSEMBLE: list[tuple[str, float]] = [
    ("fft_peak", 0.4),  # FFT peak is generally most reliable
    ("zero_crossing", 0.3),  # Zero crossing is robust but can be noisy
    ("autocorrelation", 0.3),  # Autocorrelation handles noise well
]

EDGE_DETECTION_ENSEMBLE: list[tuple[str, float]] = [
    ("threshold_crossing", 0.5),  # Most direct method
    ("derivative", 0.3),  # Good for clean signals
    ("schmitt_trigger", 0.2),  # Noise immunity but less precise
]

AMPLITUDE_ENSEMBLE: list[tuple[str, float]] = [
    ("peak_to_peak", 0.4),  # Direct measurement
    ("rms", 0.3),  # Robust to noise
    ("percentile_99", 0.3),  # Outlier resistant
]
def create_frequency_ensemble(
    signal: NDArray[np.float64],
    sample_rate: float,
    method_weights: list[tuple[str, float]] | None = None,
) -> EnsembleResult:
    """Run multiple frequency detection methods and combine results.

    Applies FFT peak detection, zero-crossing rate, and autocorrelation-based
    frequency estimation, then combines using weighted averaging. Methods that
    fail or produce a degenerate estimate are skipped.

    Args:
        signal: Input signal array.
        sample_rate: Sample rate in Hz.
        method_weights: Optional custom method weights. Defaults to
            FREQUENCY_ENSEMBLE; must list the (fft_peak, zero_crossing,
            autocorrelation) weights in that order.

    Returns:
        EnsembleResult with combined frequency estimate.

    Raises:
        ValueError: If all frequency detection methods fail.

    Example:
        >>> import numpy as np
        >>> t = np.linspace(0, 1, 1000)
        >>> signal = np.sin(2 * np.pi * 10 * t)  # 10 Hz sine
        >>> result = create_frequency_ensemble(signal, sample_rate=1000)
        >>> print(f"Frequency: {result.value:.2f} Hz")
        >>> print(f"Confidence: {result.confidence:.2%}")
    """
    if method_weights is None:
        method_weights = FREQUENCY_ENSEMBLE

    results = []

    # Method 1: FFT peak detection
    try:
        fft_result = np.fft.rfft(signal)
        freqs = np.fft.rfftfreq(len(signal), d=1.0 / sample_rate)
        peak_idx = np.argmax(np.abs(fft_result[1:])) + 1  # Skip DC bin
        freq_fft = float(freqs[peak_idx])
        # Confidence based on peak prominence over the mean spectrum level
        peak_magnitude = np.abs(fft_result[peak_idx])
        mean_magnitude = np.mean(np.abs(fft_result[1:]))
        # FIX: an all-zero spectrum previously produced a NaN confidence
        # (0/0) that failed EnsembleResult validation downstream; skip the
        # method instead when there is no spectral content.
        if mean_magnitude > 0:
            confidence_fft = min(1.0, peak_magnitude / (mean_magnitude * 10))
            results.append(
                {
                    "value": freq_fft,
                    "confidence": confidence_fft * method_weights[0][1],
                    "method": "fft_peak",
                }
            )
    except Exception as e:
        logger.debug(f"FFT peak detection failed: {e}")

    # Method 2: Zero crossing rate
    try:
        zero_crossings = np.where(np.diff(np.sign(signal)))[0]
        if len(zero_crossings) > 1:
            # Average spacing between zero crossings is half the period
            avg_half_period = np.mean(np.diff(zero_crossings)) / sample_rate
            freq_zc = 1.0 / (2.0 * avg_half_period)
            # Confidence based on regularity of the crossing spacing
            std_half_period = np.std(np.diff(zero_crossings)) / sample_rate
            confidence_zc = max(0.0, 1.0 - std_half_period / avg_half_period)
            results.append(
                {
                    "value": float(freq_zc),
                    "confidence": confidence_zc * method_weights[1][1],
                    "method": "zero_crossing",
                }
            )
    except Exception as e:
        logger.debug(f"Zero crossing detection failed: {e}")

    # Method 3: Autocorrelation
    try:
        # Keep the positive-lag half of the full autocorrelation
        autocorr = np.correlate(signal, signal, mode="full")
        autocorr = autocorr[len(autocorr) // 2 :]
        # First local maximum after zero lag corresponds to one period
        peaks = []
        for i in range(1, min(len(autocorr) - 1, len(signal) // 2)):
            if autocorr[i] > autocorr[i - 1] and autocorr[i] > autocorr[i + 1]:
                peaks.append(i)
        if peaks:
            first_peak = peaks[0]
            freq_ac = sample_rate / first_peak
            # Confidence based on peak strength relative to zero lag
            peak_strength = autocorr[first_peak] / autocorr[0]
            confidence_ac = float(np.clip(peak_strength, 0.0, 1.0))
            results.append(
                {
                    "value": float(freq_ac),
                    "confidence": confidence_ac * method_weights[2][1],
                    "method": "autocorrelation",
                }
            )
    except Exception as e:
        logger.debug(f"Autocorrelation detection failed: {e}")

    if not results:
        raise ValueError("All frequency detection methods failed")

    # Fuse the surviving estimates with confidence-weighted averaging
    aggregator = EnsembleAggregator(method=AggregationMethod.WEIGHTED_AVERAGE)
    return aggregator.aggregate(results)
def create_edge_ensemble(
    signal: NDArray[np.float64],
    sample_rate: float,
    threshold: float | None = None,
    method_weights: list[tuple[str, float]] | None = None,
) -> EnsembleResult:
    """Detect edges with several independent strategies and fuse the counts.

    Three detectors are applied — simple threshold crossing, derivative
    peak picking, and a Schmitt trigger with hysteresis — and their edge
    counts are combined with a median aggregator, which tolerates one
    disagreeing method.

    Args:
        signal: Input signal array.
        sample_rate: Sample rate in Hz.
        threshold: Detection threshold. If None, uses the signal midpoint.
        method_weights: Optional custom method weights. Defaults to
            EDGE_DETECTION_ENSEMBLE.

    Returns:
        EnsembleResult with combined edge detection results.

    Raises:
        ValueError: If all edge detection methods fail.

    Example:
        >>> signal = np.array([0, 0, 1, 1, 0, 0, 1, 1])
        >>> result = create_edge_ensemble(signal, sample_rate=1000)
        >>> print(f"Edge count: {result.value}")
        >>> print(f"Agreement: {result.method_agreement:.2%}")
    """
    weights = EDGE_DETECTION_ENSEMBLE if method_weights is None else method_weights

    if threshold is None:
        threshold = float((np.max(signal) + np.min(signal)) / 2.0)

    results = []

    # Strategy 1: sign changes of (signal - threshold)
    try:
        crossing_count = len(np.where(np.diff(np.sign(signal - threshold)))[0])
        # SNR proxy: full signal swing versus sample-to-sample jitter
        swing = np.ptp(signal)
        jitter = np.std(np.diff(signal))
        snr_conf = min(1.0, swing / (jitter * 10)) if jitter > 0 else 0.5
        results.append(
            {
                "value": crossing_count,
                "confidence": snr_conf * weights[0][1],
                "method": "threshold_crossing",
            }
        )
    except Exception as e:
        logger.debug(f"Threshold crossing detection failed: {e}")

    # Strategy 2: peaks in the absolute first difference
    try:
        slope = np.diff(signal)
        slope_gate = np.std(slope) * 2
        raw_hits = np.where(np.abs(slope) > slope_gate)[0]
        # Collapse clusters: keep a hit only when it lies more than
        # 2 samples past the previous raw hit.
        kept = []
        prev = None
        for idx in raw_hits:
            if prev is None or idx - prev > 2:
                kept.append(idx)
            prev = idx
        # Confidence grows with how far the strongest slope exceeds the
        # gate; three times the gate maps to full confidence.
        top_slope = np.max(np.abs(slope)) if len(slope) > 0 else 0.0
        gate_ratio = (top_slope / slope_gate) if slope_gate > 0 else 0.0
        deriv_conf = float(np.clip(gate_ratio / 3.0, 0.0, 1.0))
        results.append(
            {
                "value": len(kept),
                "confidence": deriv_conf * weights[1][1],
                "method": "derivative",
            }
        )
    except Exception as e:
        logger.debug(f"Derivative edge detection failed: {e}")

    # Strategy 3: Schmitt trigger with hysteresis band around the threshold
    try:
        band = float(np.std(signal) * 0.1)
        hi = threshold + band
        lo = threshold - band
        above = signal[0] > threshold
        transitions = 0
        for sample in signal:
            if above:
                if sample < lo:
                    transitions += 1
                    above = False
            elif sample > hi:
                transitions += 1
                above = True
        # Lower base confidence: hysteresis delays the detected transitions
        results.append(
            {
                "value": transitions,
                "confidence": 0.7 * weights[2][1],
                "method": "schmitt_trigger",
            }
        )
    except Exception as e:
        logger.debug(f"Schmitt trigger detection failed: {e}")

    if not results:
        raise ValueError("All edge detection methods failed")

    # Median fusion keeps the integer counts robust to one bad detector
    aggregator = EnsembleAggregator(method=AggregationMethod.MEDIAN)
    return aggregator.aggregate(results)
# Public API of this module: pre-configured ensembles, the aggregator and
# its result/strategy types, and the convenience ensemble constructors.
__all__ = [
    "AMPLITUDE_ENSEMBLE",
    "EDGE_DETECTION_ENSEMBLE",
    "FREQUENCY_ENSEMBLE",
    "AggregationMethod",
    "EnsembleAggregator",
    "EnsembleResult",
    "create_edge_ensemble",
    "create_frequency_ensemble",
]