Coverage for src / tracekit / inference / bayesian.py: 11%
266 statements
« prev ^ index » next — coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Bayesian inference for signal analysis and protocol characterization.
3This module provides probabilistic reasoning about signal characteristics and
4protocol properties using Bayesian updating. It enables inference with full
5uncertainty quantification and supports sequential updates as more data arrives.
8Key Features:
9- Prior distributions for common signal properties (baud rate, frequency, etc.)
10- Likelihood functions for observed measurements
11- Posterior calculation with credible intervals
12- Integration with quality scoring system (0-1 confidence mapping)
13- Sequential Bayesian updating for streaming analysis
14- Support for multiple distribution families (normal, uniform, beta, etc.)
16Example:
17 >>> from tracekit.inference.bayesian import BayesianInference, infer_with_uncertainty
18 >>> import numpy as np
19 >>>
20 >>> # Infer baud rate from edge timing observations
21 >>> inference = BayesianInference()
22 >>> edge_times = np.array([0.0, 0.00001, 0.00002, 0.00003]) # 100 kHz
23 >>> posterior = inference.infer_baud_rate(edge_times)
24 >>> print(f"Baud rate: {posterior.mean:.0f} ± {posterior.std:.0f}")
25 >>> print(f"95% CI: [{posterior.ci_lower:.0f}, {posterior.ci_upper:.0f}]")
26 >>> print(f"Confidence: {posterior.confidence:.2%}")
27 >>>
28 >>> # Infer number of symbols from amplitude histogram
29 >>> amplitudes = np.random.choice([0.0, 0.33, 0.67, 1.0], size=1000)
30 >>> histogram, _ = np.histogram(amplitudes, bins=50)
31 >>> symbol_posterior = inference.infer_symbol_count(histogram)
32 >>> print(f"Estimated symbols: {int(symbol_posterior.mean)}")
33 >>>
34 >>> # Sequential updating for streaming data
35 >>> from tracekit.inference.bayesian import SequentialBayesian, Prior
36 >>> prior = Prior("normal", {"mean": 115200, "std": 10000})
37 >>> sequential = SequentialBayesian("baud_rate", prior)
38 >>> for _observation in streaming_data:
39 ... posterior = sequential.update(likelihood_fn)
40 ... if sequential.get_confidence() > 0.95:
41 ... break # High confidence reached
43References:
44 - Gelman et al., "Bayesian Data Analysis" (3rd ed.)
45 - Murphy, "Machine Learning: A Probabilistic Perspective"
46 - scipy.stats documentation for distribution families
47"""
49from __future__ import annotations
51from collections.abc import Callable
52from dataclasses import dataclass
53from typing import TYPE_CHECKING, Any
55import numpy as np
56from scipy import stats
57from scipy.signal import find_peaks
59from tracekit.core.exceptions import AnalysisError, InsufficientDataError
61if TYPE_CHECKING:
62 from numpy.typing import NDArray
65@dataclass
66class Prior:
67 """Prior distribution for a parameter.
69 Represents prior belief about a parameter before observing data.
70 Supports common distribution families used in signal analysis.
72 Attributes:
73 distribution: Distribution family name (e.g., "normal", "uniform", "beta").
74 params: Distribution parameters as dict (keys depend on distribution).
76 Supported distributions:
77 - "normal": params = {"mean": float, "std": float}
78 - "uniform": params = {"low": float, "high": float}
79 - "log_uniform": params = {"low": float, "high": float} (for scale-invariant priors)
80 - "beta": params = {"a": float, "b": float} (for probabilities)
81 - "gamma": params = {"shape": float, "scale": float} (for positive values)
82 - "half_normal": params = {"scale": float} (for positive values like noise std)
83 - "geometric": params = {"p": float} (for discrete counts)
85 Example:
86 >>> # Weakly informative prior for baud rate (log-uniform over range)
87 >>> prior = Prior("log_uniform", {"low": 100, "high": 10_000_000})
88 >>> samples = prior.sample(1000)
89 >>> density = prior.pdf(115200)
90 >>>
91 >>> # Prior for duty cycle (beta distribution centered at 0.5)
92 >>> duty_prior = Prior("beta", {"a": 2, "b": 2})
93 """
95 distribution: str
96 params: dict[str, float]
98 def __post_init__(self) -> None:
99 """Validate distribution parameters after initialization."""
100 valid_distributions = {
101 "normal",
102 "uniform",
103 "log_uniform",
104 "beta",
105 "gamma",
106 "half_normal",
107 "geometric",
108 }
110 if self.distribution not in valid_distributions:
111 raise ValueError(
112 f"Unknown distribution: {self.distribution}. "
113 f"Supported: {sorted(valid_distributions)}"
114 )
116 # Validate required parameters for each distribution
117 required_params = {
118 "normal": {"mean", "std"},
119 "uniform": {"low", "high"},
120 "log_uniform": {"low", "high"},
121 "beta": {"a", "b"},
122 "gamma": {"shape", "scale"},
123 "half_normal": {"scale"},
124 "geometric": {"p"},
125 }
127 required = required_params[self.distribution]
128 missing = required - set(self.params.keys())
129 if missing:
130 raise ValueError(f"Missing parameters for {self.distribution} distribution: {missing}")
132 def pdf(self, x: float | NDArray[np.floating[Any]]) -> float | NDArray[np.floating[Any]]:
133 """Compute probability density at x.
135 Args:
136 x: Value(s) at which to evaluate density.
138 Returns:
139 Probability density value(s).
141 Raises:
142 ValueError: If distribution is not recognized.
143 """
144 if self.distribution == "normal":
145 return float(stats.norm.pdf(x, loc=self.params["mean"], scale=self.params["std"])) # type: ignore[no-any-return]
146 elif self.distribution == "uniform":
147 return float( # type: ignore[no-any-return]
148 stats.uniform.pdf(
149 x, loc=self.params["low"], scale=self.params["high"] - self.params["low"]
150 )
151 )
152 elif self.distribution == "log_uniform":
153 # Log-uniform: uniform on log scale
154 log_low = np.log(self.params["low"])
155 log_high = np.log(self.params["high"])
156 log_x = np.log(np.maximum(x, 1e-100)) # Avoid log(0)
157 density = stats.uniform.pdf(log_x, loc=log_low, scale=log_high - log_low)
158 # Jacobian correction: d(log x)/dx = 1/x
159 result = density / np.maximum(x, 1e-100)
160 return result # type: ignore[return-value, no-any-return]
161 elif self.distribution == "beta":
162 return float(stats.beta.pdf(x, a=self.params["a"], b=self.params["b"])) # type: ignore[no-any-return]
163 elif self.distribution == "gamma":
164 return float(stats.gamma.pdf(x, a=self.params["shape"], scale=self.params["scale"])) # type: ignore[no-any-return]
165 elif self.distribution == "half_normal":
166 return float(stats.halfnorm.pdf(x, scale=self.params["scale"])) # type: ignore[no-any-return]
167 elif self.distribution == "geometric":
168 return float(stats.geom.pmf(x, p=self.params["p"])) # type: ignore[no-any-return]
169 else:
170 raise ValueError(f"PDF not implemented for {self.distribution}")
172 def sample(self, n: int = 1) -> NDArray[np.floating[Any]]:
173 """Draw samples from prior distribution.
175 Args:
176 n: Number of samples to draw.
178 Returns:
179 Array of samples from the prior.
181 Raises:
182 ValueError: If distribution is not recognized.
183 """
184 if self.distribution == "normal":
185 return stats.norm.rvs(loc=self.params["mean"], scale=self.params["std"], size=n) # type: ignore[no-any-return]
186 elif self.distribution == "uniform":
187 return stats.uniform.rvs( # type: ignore[no-any-return]
188 loc=self.params["low"], scale=self.params["high"] - self.params["low"], size=n
189 )
190 elif self.distribution == "log_uniform":
191 # Sample uniformly on log scale, then exponentiate
192 log_low = np.log(self.params["low"])
193 log_high = np.log(self.params["high"])
194 log_samples = stats.uniform.rvs(loc=log_low, scale=log_high - log_low, size=n) # type: ignore[no-any-return]
195 return np.exp(log_samples) # type: ignore[no-any-return]
196 elif self.distribution == "beta":
197 return stats.beta.rvs(a=self.params["a"], b=self.params["b"], size=n) # type: ignore[no-any-return]
198 elif self.distribution == "gamma":
199 return stats.gamma.rvs(a=self.params["shape"], scale=self.params["scale"], size=n) # type: ignore[no-any-return]
200 elif self.distribution == "half_normal":
201 return stats.halfnorm.rvs(scale=self.params["scale"], size=n) # type: ignore[no-any-return]
202 elif self.distribution == "geometric":
203 return stats.geom.rvs(p=self.params["p"], size=n) # type: ignore[no-any-return]
204 else:
205 raise ValueError(f"Sampling not implemented for {self.distribution}")
@dataclass
class Posterior:
    """Posterior distribution after updating with evidence.

    Represents updated belief about a parameter after observing data.
    Provides point estimates, uncertainty quantification, and confidence scores.

    Attributes:
        mean: Posterior mean (point estimate).
        std: Posterior standard deviation (uncertainty).
        ci_lower: Lower bound of 95% credible interval.
        ci_upper: Upper bound of 95% credible interval.
        samples: Optional array of posterior samples (for non-parametric posteriors).

    Example:
        >>> posterior = Posterior(mean=115200, std=5000, ci_lower=105600, ci_upper=124800)
        >>> print(f"Estimate: {posterior.mean:.0f} ± {posterior.std:.0f}")
        >>> print(f"95% CI: [{posterior.ci_lower:.0f}, {posterior.ci_upper:.0f}]")
        >>> print(f"Confidence: {posterior.confidence:.2%}")
    """

    mean: float
    std: float
    ci_lower: float
    ci_upper: float
    samples: NDArray[np.floating[Any]] | None = None

    @property
    def confidence(self) -> float:
        """Convert posterior certainty to 0-1 confidence score.

        Maps posterior standard deviation to confidence. Lower std (more
        certain) -> higher confidence. The mapping is linear in the
        coefficient of variation (CV = std / |mean|):

            confidence = 1 - min(1, CV / 0.5)

        so:
        - CV = 0.05 (5%): confidence 0.90
        - CV = 0.10 (10%): confidence 0.80
        - CV = 0.20 (20%): confidence 0.60
        - CV >= 0.50 (50%): confidence 0.00

        (The docstring previously described a "sigmoid-like" mapping with
        different anchor values; the numbers above match the implementation.)

        Returns:
            Confidence score between 0 and 1.

        Example:
            >>> # Low uncertainty -> high confidence
            >>> p1 = Posterior(mean=100, std=5, ci_lower=90, ci_upper=110)
            >>> p1.confidence  # 0.90
            >>>
            >>> # High uncertainty -> low confidence
            >>> p2 = Posterior(mean=100, std=30, ci_lower=40, ci_upper=160)
            >>> p2.confidence  # 0.40
        """
        # Avoid division by zero for means at (or numerically near) zero.
        if abs(self.mean) < 1e-10:
            cv = self.std / 1e-10
        else:
            cv = abs(self.std / self.mean)  # Coefficient of variation

        # Linear ramp: scale_factor sets the CV at which confidence hits 0.
        scale_factor = 0.5  # 50% CV -> 0% confidence
        confidence = 1.0 - min(1.0, cv / scale_factor)

        # Clamp into the valid range [0, 1].
        return max(0.0, min(1.0, confidence))
class BayesianInference:
    """Bayesian inference for signal analysis.

    Provides methods for inferring signal properties (baud rate, frequency,
    symbol count, etc.) with full uncertainty quantification using Bayesian
    methods.

    Attributes:
        priors: Dictionary of default prior distributions for common parameters.

    Example:
        >>> inference = BayesianInference()
        >>>
        >>> # Infer baud rate from edge timings
        >>> edge_times = np.array([0.0, 0.00001, 0.00002, 0.00003])
        >>> posterior = inference.infer_baud_rate(edge_times)
        >>>
        >>> # Infer protocol type probabilities
        >>> observations = {"idle_level": "high", "regularity": 0.3, "duty_cycle": 0.9}
        >>> protocol_probs = inference.infer_protocol_type(observations)
    """

    def __init__(self) -> None:
        """Initialize Bayesian inference engine with default priors."""
        self.priors = self._default_priors()

    def _default_priors(self) -> dict[str, Prior]:
        """Create default priors for common signal properties.

        Returns:
            Dictionary mapping parameter names to Prior objects.

        Priors are designed to be weakly informative:
        - Broad enough to cover typical use cases
        - Narrow enough to provide regularization
        - Match physical constraints (e.g., positive values)
        """
        return {
            # Log-uniform for scale-invariant parameters (wide range)
            "baud_rate": Prior("log_uniform", {"low": 100, "high": 10_000_000}),
            "frequency": Prior("log_uniform", {"low": 1, "high": 1e9}),
            # Beta distribution for probabilities/proportions
            "duty_cycle": Prior("beta", {"a": 2, "b": 2}),  # Centered at 0.5
            # Half-normal for positive values (noise, std, etc.)
            "noise_std": Prior("half_normal", {"scale": 0.1}),
            # Geometric for discrete counts (favor smaller values)
            "num_symbols": Prior("geometric", {"p": 0.3}),
            # Normal for typical signal characteristics
            "amplitude": Prior("normal", {"mean": 0.0, "std": 1.0}),
            "offset": Prior("normal", {"mean": 0.0, "std": 0.1}),
        }

    def update(
        self,
        param: str,
        likelihood_fn: Callable[[float], float],
        *,
        prior: Prior | None = None,
        num_samples: int = 10000,
    ) -> Posterior:
        """Update belief about parameter given observation.

        General-purpose Bayesian updating using importance sampling: draw
        from the prior, weight each sample by its likelihood, and summarize
        the weighted sample as the posterior.

        Args:
            param: Parameter name (used to get default prior if not provided).
            likelihood_fn: Function that computes likelihood p(observation | param_value).
            prior: Prior distribution (uses default if None).
            num_samples: Number of samples for posterior approximation.

        Returns:
            Posterior distribution with mean, std, and credible intervals.

        Raises:
            ValueError: If parameter is unknown and no prior is provided.
            AnalysisError: If likelihood function fails or all likelihoods are zero.
        """
        # Resolve the prior: explicit argument wins, else the registered default.
        if prior is None:
            if param not in self.priors:
                raise ValueError(
                    f"Unknown parameter '{param}' and no prior provided. "
                    f"Known parameters: {list(self.priors.keys())}"
                )
            prior = self.priors[param]

        # Sample from prior.
        try:
            samples = prior.sample(num_samples)
        except Exception as e:
            raise AnalysisError(
                f"Failed to sample from prior for '{param}'",
                details=str(e),
            ) from e

        # Evaluate the likelihood at every prior sample.
        try:
            likelihoods = np.array([likelihood_fn(s) for s in samples])
        except Exception as e:
            raise AnalysisError(
                f"Likelihood function failed for '{param}'",
                details=str(e),
                fix_hint="Check that likelihood_fn is compatible with prior samples",
            ) from e

        # All-zero likelihoods mean no sample explains the observation.
        if np.all(likelihoods == 0):
            raise AnalysisError(
                f"All likelihood values are zero for '{param}'",
                details="Observation may be incompatible with prior range",
                fix_hint="Adjust prior range or check likelihood function",
            )

        # Importance weights: since samples come from the prior, the posterior
        # weight of each sample is proportional to its likelihood alone.
        weights = likelihoods / np.sum(likelihoods)

        # Weighted posterior moments.
        mean = float(np.sum(samples * weights))
        variance = float(np.sum(weights * (samples - mean) ** 2))
        std = float(np.sqrt(variance))

        # 95% credible interval via the weighted empirical CDF.
        sorted_indices = np.argsort(samples)
        sorted_samples = samples[sorted_indices]
        sorted_weights = weights[sorted_indices]
        cumsum = np.cumsum(sorted_weights)

        # Find 2.5th and 97.5th percentiles. Clamp the index: float round-off
        # can leave cumsum[-1] marginally below the quantile, and searchsorted
        # would then return len(samples) — one past the end.
        last = len(sorted_samples) - 1
        ci_lower = float(sorted_samples[min(int(np.searchsorted(cumsum, 0.025)), last)])
        ci_upper = float(sorted_samples[min(int(np.searchsorted(cumsum, 0.975)), last)])

        return Posterior(
            mean=mean,
            std=std,
            ci_lower=ci_lower,
            ci_upper=ci_upper,
            samples=samples,
        )

    def infer_baud_rate(
        self, edge_times: NDArray[np.floating[Any]], *, prior: Prior | None = None
    ) -> Posterior:
        """Infer baud rate from edge timing observations.

        Uses the distribution of inter-edge intervals to infer the underlying
        baud rate. Assumes edges occur at bit boundaries.

        Args:
            edge_times: Array of edge timestamps in seconds.
            prior: Optional prior for baud rate (uses default if None).

        Returns:
            Posterior distribution for baud rate in bits per second.

        Raises:
            InsufficientDataError: If fewer than 2 edges provided.

        Example:
            >>> # 115200 baud UART (bit period = 8.68 μs)
            >>> edge_times = np.array([0, 8.68e-6, 17.36e-6, 26.04e-6])
            >>> posterior = inference.infer_baud_rate(edge_times)
            >>> print(f"Baud rate: {posterior.mean:.0f} bps")
        """
        if len(edge_times) < 2:
            raise InsufficientDataError(
                "Need at least 2 edges to infer baud rate",
                required=2,
                available=len(edge_times),
            )

        # Compute inter-edge intervals.
        intervals = np.diff(edge_times)

        # Filter out zero/negative intervals (should not happen but be safe).
        intervals = intervals[intervals > 0]

        if len(intervals) == 0:
            raise InsufficientDataError("No valid inter-edge intervals found")

        # PERF: the smallest interval (proxy for a single bit period) is a
        # loop invariant — hoist it instead of recomputing it for every one
        # of the num_samples likelihood evaluations inside update().
        min_interval = float(np.min(intervals))

        # Likelihood: the smallest observed interval should match one bit
        # period (1 / baud_rate), with a Gaussian tolerance of 10%.
        def likelihood(baud_rate: float) -> float:
            if baud_rate <= 0:
                return 0.0
            bit_period = 1.0 / baud_rate
            sigma = min_interval * 0.1  # 10% uncertainty
            log_likelihood = -0.5 * ((min_interval - bit_period) / sigma) ** 2
            return float(np.exp(log_likelihood))

        return self.update("baud_rate", likelihood, prior=prior)

    def infer_protocol_type(self, observations: dict[str, Any]) -> dict[str, float]:
        """Infer probability of each protocol type given observations.

        Uses a simple Bayesian classifier to compute posterior probabilities
        for different protocol types (UART, SPI, I2C, CAN) based on observed
        signal characteristics.

        Args:
            observations: Dictionary of observed signal characteristics:
                - "idle_level": "high" or "low"
                - "regularity": 0-1 (edge regularity)
                - "duty_cycle": 0-1 (fraction time high)
                - "symbol_rate": Hz (optional)
                - "transition_density": edges/sec (optional)

        Returns:
            Dictionary mapping protocol names to posterior probabilities.
            Probabilities sum to 1.0.

        Example:
            >>> observations = {
            ...     "idle_level": "high",
            ...     "regularity": 0.25,
            ...     "duty_cycle": 0.85,
            ...     "symbol_rate": 115200,
            ... }
            >>> probs = inference.infer_protocol_type(observations)
        """
        # Prior probabilities (uniform over protocols).
        protocols = ["UART", "SPI", "I2C", "CAN"]
        prior_prob = 1.0 / len(protocols)

        # Compute likelihoods for each protocol.
        likelihoods = {
            "UART": self._likelihood_uart(observations),
            "SPI": self._likelihood_spi(observations),
            "I2C": self._likelihood_i2c(observations),
            "CAN": self._likelihood_can(observations),
        }

        # Compute posterior probabilities (Bayes' theorem).
        posteriors = {proto: prior_prob * likelihoods[proto] for proto in protocols}

        # Normalize to sum to 1.
        total = sum(posteriors.values())
        if total > 0:
            posteriors = {proto: prob / total for proto, prob in posteriors.items()}
        else:
            # No evidence - return uniform.
            posteriors = {proto: 1.0 / len(protocols) for proto in protocols}

        return posteriors

    def _likelihood_uart(self, obs: dict[str, Any]) -> float:
        """Compute likelihood of observations given UART protocol."""
        likelihood = 1.0

        # UART characteristics: idle high, low regularity, high duty cycle.
        if obs.get("idle_level") == "high":
            likelihood *= 0.9
        else:
            likelihood *= 0.2

        regularity = obs.get("regularity", 0.5)
        if regularity < 0.3:
            likelihood *= 0.8
        elif regularity > 0.7:
            likelihood *= 0.2

        duty_cycle = obs.get("duty_cycle", 0.5)
        if duty_cycle > 0.7:
            likelihood *= 0.8
        elif duty_cycle < 0.3:
            likelihood *= 0.3

        return likelihood

    def _likelihood_spi(self, obs: dict[str, Any]) -> float:
        """Compute likelihood of observations given SPI protocol."""
        likelihood = 1.0

        # SPI characteristics: regular clock, ~50% duty, high transition density.
        regularity = obs.get("regularity", 0.5)
        if regularity > 0.7:
            likelihood *= 0.9
        elif regularity < 0.4:
            likelihood *= 0.2

        duty_cycle = obs.get("duty_cycle", 0.5)
        duty_error = abs(duty_cycle - 0.5)
        if duty_error < 0.1:
            likelihood *= 0.8
        elif duty_error > 0.3:
            likelihood *= 0.3

        return likelihood

    def _likelihood_i2c(self, obs: dict[str, Any]) -> float:
        """Compute likelihood of observations given I2C protocol."""
        likelihood = 1.0

        # I2C characteristics: idle high, moderate regularity.
        if obs.get("idle_level") == "high":
            likelihood *= 0.85
        else:
            likelihood *= 0.3

        regularity = obs.get("regularity", 0.5)
        if 0.4 < regularity < 0.8:
            likelihood *= 0.8
        else:
            likelihood *= 0.4

        return likelihood

    def _likelihood_can(self, obs: dict[str, Any]) -> float:
        """Compute likelihood of observations given CAN protocol."""
        likelihood = 1.0

        # CAN characteristics: idle high, moderate irregularity (bit stuffing).
        if obs.get("idle_level") == "high":
            likelihood *= 0.85
        else:
            likelihood *= 0.2

        regularity = obs.get("regularity", 0.5)
        if 0.3 < regularity < 0.7:
            likelihood *= 0.8
        else:
            likelihood *= 0.4

        # Boost when the symbol rate sits within 10% of a standard CAN rate.
        symbol_rate = obs.get("symbol_rate", 0)
        if symbol_rate > 0:
            standard_rates = [125000, 250000, 500000, 1000000]
            for rate in standard_rates:
                if abs(symbol_rate - rate) / rate < 0.1:
                    likelihood *= 1.5
                    break

        return likelihood

    def infer_symbol_count(
        self,
        amplitude_histogram: NDArray[np.floating[Any]],
        *,
        prior: Prior | None = None,
        max_symbols: int = 16,
    ) -> Posterior:
        """Infer number of discrete symbols from amplitude distribution.

        Analyzes the amplitude histogram to determine how many discrete
        signal levels (symbols) are present. Useful for multi-level signaling
        (PAM-4, etc.).

        Args:
            amplitude_histogram: Histogram of signal amplitudes (bin counts).
            prior: Optional prior for symbol count (uses default if None).
            max_symbols: Maximum number of symbols to consider.

        Returns:
            Posterior distribution for number of symbols.

        Raises:
            InsufficientDataError: If histogram is empty or all zeros.

        Example:
            >>> # PAM-4 signal (4 levels)
            >>> amplitudes = np.random.choice([0.0, 0.33, 0.67, 1.0], size=1000)
            >>> hist, _ = np.histogram(amplitudes, bins=50)
            >>> posterior = inference.infer_symbol_count(hist)
        """
        if len(amplitude_histogram) == 0 or np.sum(amplitude_histogram) == 0:
            raise InsufficientDataError("Amplitude histogram is empty or all zeros")

        # Pre-compute peaks in histogram (optimization: only compute once).
        prominence_threshold = np.max(amplitude_histogram) * 0.1
        detected_peaks, _ = find_peaks(amplitude_histogram, prominence=prominence_threshold)
        peak_indices = list(detected_peaks)

        # Check edge bins - they can be peaks but find_peaks won't detect them.
        if len(amplitude_histogram) > 1:
            # First bin is a peak if it's significant and higher than second bin.
            if (
                amplitude_histogram[0] > prominence_threshold
                and amplitude_histogram[0] > amplitude_histogram[1]
            ):
                peak_indices.insert(0, 0)
            # Last bin is a peak if significant and higher than second-to-last.
            if (
                amplitude_histogram[-1] > prominence_threshold
                and amplitude_histogram[-1] > amplitude_histogram[-2]
            ):
                peak_indices.append(len(amplitude_histogram) - 1)

        num_peaks = len(peak_indices)

        # Likelihood: number of peaks in histogram should match symbol count,
        # with ±1 tolerance for noise.
        def likelihood(num_symbols: float) -> float:
            k = int(round(num_symbols))
            if k < 1 or k > max_symbols:
                return 0.0

            if abs(num_peaks - k) <= 1:
                return 0.8
            elif abs(num_peaks - k) == 2:
                return 0.3
            else:
                return 0.1

        return self.update(
            "num_symbols",
            likelihood,
            prior=prior,
            num_samples=max_symbols * 100,
        )
class SequentialBayesian:
    """Incremental Bayesian updating for streaming observations.

    Keeps a running posterior that is refined each time a new observation
    arrives; the posterior from one step acts as the prior for the next.
    Intended for online/streaming signal analysis where data trickles in
    over time.

    Attributes:
        param: Name of the parameter being estimated.
        current_posterior: Latest belief — the initial Prior before any
            update, a Posterior afterwards.

    Example:
        >>> from tracekit.inference.bayesian import SequentialBayesian, Prior
        >>> prior = Prior("normal", {"mean": 115200, "std": 10000})
        >>> sequential = SequentialBayesian("baud_rate", prior)
        >>> for _observation in streaming_data:
        ...     posterior = sequential.update(likelihood_fn)
        ...     if sequential.get_confidence() > 0.95:
        ...         break  # High confidence reached
    """

    def __init__(self, param: str, prior: Prior) -> None:
        """Set up the updater with an initial prior.

        Args:
            param: Name of the parameter being estimated.
            prior: Initial belief before any data is seen.
        """
        self.param = param
        self.current_posterior: Prior | Posterior = prior
        # Particle/weight history, kept for potential later resampling.
        self._samples: list[NDArray[np.floating[Any]]] = []
        self._weights: list[NDArray[np.floating[Any]]] = []

    def _draw_particles(self, num_samples: int) -> NDArray[np.floating[Any]]:
        """Draw parameter samples from the current belief (prior or posterior)."""
        belief = self.current_posterior
        if isinstance(belief, Prior):
            # First update: sample straight from the prior.
            return belief.sample(num_samples)
        if belief.samples is not None:
            # Resample stored posterior particles with replacement.
            picks = np.random.choice(len(belief.samples), size=num_samples, replace=True)
            return belief.samples[picks]
        # No particles kept — fall back to a Gaussian approximation.
        return np.random.normal(belief.mean, belief.std, size=num_samples)

    def update(
        self,
        likelihood_fn: Callable[[float], float],
        *,
        num_samples: int = 5000,
    ) -> Posterior:
        """Fold one new observation into the posterior.

        Performs one step of sequential Bayesian updating; the current
        posterior becomes the prior for the next call.

        Args:
            likelihood_fn: Likelihood function p(observation | param).
            num_samples: Particle count for the importance-sampling step.

        Returns:
            The refreshed posterior distribution.

        Raises:
            AnalysisError: If the likelihood function raises.
        """
        particles = self._draw_particles(num_samples)

        try:
            evidence = np.array([likelihood_fn(value) for value in particles])
        except Exception as exc:
            raise AnalysisError(
                f"Likelihood function failed for '{self.param}'",
                details=str(exc),
            ) from exc

        if np.all(evidence == 0):
            # Observation carries no usable information: keep current belief.
            if isinstance(self.current_posterior, Posterior):
                return self.current_posterior
            return Posterior(mean=0.0, std=1.0, ci_lower=-2.0, ci_upper=2.0)

        # Importance weights: posterior mass proportional to the likelihood.
        weights = evidence / np.sum(evidence)

        # Record for potential resampling.
        self._samples.append(particles)
        self._weights.append(weights)

        # Weighted posterior moments.
        center = float(np.sum(particles * weights))
        spread = float(np.sqrt(np.sum(weights * (particles - center) ** 2)))

        # 95% credible interval from the weighted empirical CDF.
        order = np.argsort(particles)
        ranked = particles[order]
        cdf = np.cumsum(weights[order])
        lower = float(ranked[np.searchsorted(cdf, 0.025)])
        upper = float(ranked[np.searchsorted(cdf, 0.975)])

        refreshed = Posterior(
            mean=center,
            std=spread,
            ci_lower=lower,
            ci_upper=upper,
            samples=particles,
        )
        self.current_posterior = refreshed
        return refreshed

    def get_confidence(self) -> float:
        """Return confidence in the current estimate (0.0 before any update).

        Returns:
            Confidence score between 0 and 1.
        """
        belief = self.current_posterior
        return belief.confidence if isinstance(belief, Posterior) else 0.0
def infer_with_uncertainty(
    measurements: list[float] | NDArray[np.floating[Any]],
    prior: Prior | None = None,
) -> Posterior:
    """Infer parameter value with full uncertainty quantification.

    Convenience function for simple parameter inference from repeated measurements.
    Assumes measurements are normally distributed around the true value.

    Args:
        measurements: List or array of independent measurements.
        prior: Optional prior distribution (uses uninformative normal if None).

    Returns:
        Posterior distribution combining prior and measurement likelihood.

    Raises:
        InsufficientDataError: If measurements list is empty.

    Example:
        >>> # Repeated measurements of signal frequency
        >>> measurements = [99.8, 100.2, 99.9, 100.1, 100.0]  # Hz
        >>> posterior = infer_with_uncertainty(measurements)
        >>> print(f"Frequency: {posterior.mean:.2f} ± {posterior.std:.2f} Hz")
        >>> print(f"95% CI: [{posterior.ci_lower:.2f}, {posterior.ci_upper:.2f}] Hz")
    """
    measurements_array = np.asarray(measurements)

    if len(measurements_array) == 0:
        raise InsufficientDataError(
            "Cannot infer from empty measurements",
            required=1,
            available=0,
        )

    sample_mean = float(np.mean(measurements_array))

    # Use sample statistics if no prior.
    if prior is None:
        # Uninformative prior centered at sample mean, 10x wider than the data.
        sample_std = (
            float(np.std(measurements_array, ddof=1)) if len(measurements_array) > 1 else 1.0
        )
        prior = Prior("normal", {"mean": sample_mean, "std": sample_std * 10})

    # Likelihood: measurements are Gaussian around the true value.
    measurement_std = (
        float(np.std(measurements_array, ddof=1)) if len(measurements_array) > 1 else 1.0
    )

    # BUGFIX: exponentiating the raw joint log-likelihood underflows to 0.0
    # for large sample counts or tight spreads, making update() raise a
    # spurious "All likelihood values are zero" error. Subtract the maximum
    # log-likelihood (attained at the sample mean) before exponentiating —
    # a constant factor cancels in the importance weights, so the posterior
    # is unchanged.
    peak_log = -0.5 * float(np.sum(((measurements_array - sample_mean) / measurement_std) ** 2))

    def likelihood(param_value: float) -> float:
        # Product of Gaussian likelihoods, rescaled by the constant peak.
        log_likelihood = -0.5 * np.sum(((measurements_array - param_value) / measurement_std) ** 2)
        return float(np.exp(log_likelihood - peak_log))

    inference = BayesianInference()
    return inference.update(
        "parameter",
        likelihood,
        prior=prior,
    )
# Public API surface for ``from tracekit.inference.bayesian import *``.
__all__ = [
    "BayesianInference",
    "Posterior",
    "Prior",
    "SequentialBayesian",
    "infer_with_uncertainty",
]