Coverage for src / tracekit / inference / bayesian.py: 11%

266 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

"""Bayesian inference for signal analysis and protocol characterization.

This module provides probabilistic reasoning about signal characteristics and
protocol properties using Bayesian updating. It enables inference with full
uncertainty quantification and supports sequential updates as more data arrives.


Key Features:
- Prior distributions for common signal properties (baud rate, frequency, etc.)
- Likelihood functions for observed measurements
- Posterior calculation with credible intervals
- Integration with quality scoring system (0-1 confidence mapping)
- Sequential Bayesian updating for streaming analysis
- Support for multiple distribution families (normal, uniform, beta, etc.)

Example:
    >>> from tracekit.inference.bayesian import BayesianInference, infer_with_uncertainty
    >>> import numpy as np
    >>>
    >>> # Infer baud rate from edge timing observations
    >>> inference = BayesianInference()
    >>> edge_times = np.array([0.0, 0.00001, 0.00002, 0.00003])  # 100 kHz
    >>> posterior = inference.infer_baud_rate(edge_times)
    >>> print(f"Baud rate: {posterior.mean:.0f} ± {posterior.std:.0f}")
    >>> print(f"95% CI: [{posterior.ci_lower:.0f}, {posterior.ci_upper:.0f}]")
    >>> print(f"Confidence: {posterior.confidence:.2%}")
    >>>
    >>> # Infer number of symbols from amplitude histogram
    >>> amplitudes = np.random.choice([0.0, 0.33, 0.67, 1.0], size=1000)
    >>> histogram, _ = np.histogram(amplitudes, bins=50)
    >>> symbol_posterior = inference.infer_symbol_count(histogram)
    >>> print(f"Estimated symbols: {int(symbol_posterior.mean)}")
    >>>
    >>> # Sequential updating for streaming data
    >>> from tracekit.inference.bayesian import SequentialBayesian, Prior
    >>> prior = Prior("normal", {"mean": 115200, "std": 10000})
    >>> sequential = SequentialBayesian("baud_rate", prior)
    >>> for _observation in streaming_data:
    ...     posterior = sequential.update(likelihood_fn)
    ...     if sequential.get_confidence() > 0.95:
    ...         break  # High confidence reached

References:
    - Gelman et al., "Bayesian Data Analysis" (3rd ed.)
    - Murphy, "Machine Learning: A Probabilistic Perspective"
    - scipy.stats documentation for distribution families
"""

48 

49from __future__ import annotations 

50 

51from collections.abc import Callable 

52from dataclasses import dataclass 

53from typing import TYPE_CHECKING, Any 

54 

55import numpy as np 

56from scipy import stats 

57from scipy.signal import find_peaks 

58 

59from tracekit.core.exceptions import AnalysisError, InsufficientDataError 

60 

61if TYPE_CHECKING: 

62 from numpy.typing import NDArray 

63 

64 

@dataclass
class Prior:
    """Prior distribution for a parameter.

    Represents prior belief about a parameter before observing data.
    Supports common distribution families used in signal analysis.

    Attributes:
        distribution: Distribution family name (e.g., "normal", "uniform", "beta").
        params: Distribution parameters as dict (keys depend on distribution).

    Supported distributions:
        - "normal": params = {"mean": float, "std": float}
        - "uniform": params = {"low": float, "high": float}
        - "log_uniform": params = {"low": float, "high": float} (for scale-invariant priors)
        - "beta": params = {"a": float, "b": float} (for probabilities)
        - "gamma": params = {"shape": float, "scale": float} (for positive values)
        - "half_normal": params = {"scale": float} (for positive values like noise std)
        - "geometric": params = {"p": float} (for discrete counts)

    Example:
        >>> # Weakly informative prior for baud rate (log-uniform over range)
        >>> prior = Prior("log_uniform", {"low": 100, "high": 10_000_000})
        >>> samples = prior.sample(1000)
        >>> density = prior.pdf(115200)
        >>>
        >>> # Prior for duty cycle (beta distribution centered at 0.5)
        >>> duty_prior = Prior("beta", {"a": 2, "b": 2})
    """

    distribution: str
    params: dict[str, float]

    def __post_init__(self) -> None:
        """Validate distribution name and parameters after initialization.

        Raises:
            ValueError: If the distribution family is unknown, or if any
                required parameter for that family is missing.
        """
        # Required parameter names per supported family; the key set doubles
        # as the registry of valid distribution names.
        required_params = {
            "normal": {"mean", "std"},
            "uniform": {"low", "high"},
            "log_uniform": {"low", "high"},
            "beta": {"a", "b"},
            "gamma": {"shape", "scale"},
            "half_normal": {"scale"},
            "geometric": {"p"},
        }

        if self.distribution not in required_params:
            raise ValueError(
                f"Unknown distribution: {self.distribution}. "
                f"Supported: {sorted(required_params)}"
            )

        missing = required_params[self.distribution] - set(self.params.keys())
        if missing:
            raise ValueError(f"Missing parameters for {self.distribution} distribution: {missing}")

    def pdf(self, x: float | NDArray[np.floating[Any]]) -> float | NDArray[np.floating[Any]]:
        """Compute probability density (mass, for "geometric") at x.

        Args:
            x: Value(s) at which to evaluate density. Scalars yield a float;
                arrays yield an array of the same shape.

        Returns:
            Probability density value(s).

        Raises:
            ValueError: If distribution is not recognized.

        Note:
            Bug fix: the previous implementation unconditionally cast the
            result through ``float(...)``, which raised ``TypeError`` for
            array inputs on every distribution except "log_uniform", despite
            the array-accepting signature. The cast now applies only to
            scalar (0-d) results.
        """
        if self.distribution == "normal":
            density = stats.norm.pdf(x, loc=self.params["mean"], scale=self.params["std"])
        elif self.distribution == "uniform":
            density = stats.uniform.pdf(
                x, loc=self.params["low"], scale=self.params["high"] - self.params["low"]
            )
        elif self.distribution == "log_uniform":
            # Log-uniform: uniform on log scale.
            log_low = np.log(self.params["low"])
            log_high = np.log(self.params["high"])
            log_x = np.log(np.maximum(x, 1e-100))  # Avoid log(0)
            density = stats.uniform.pdf(log_x, loc=log_low, scale=log_high - log_low)
            # Jacobian correction: d(log x)/dx = 1/x
            density = density / np.maximum(x, 1e-100)
        elif self.distribution == "beta":
            density = stats.beta.pdf(x, a=self.params["a"], b=self.params["b"])
        elif self.distribution == "gamma":
            density = stats.gamma.pdf(x, a=self.params["shape"], scale=self.params["scale"])
        elif self.distribution == "half_normal":
            density = stats.halfnorm.pdf(x, scale=self.params["scale"])
        elif self.distribution == "geometric":
            # Discrete family: probability mass function, not a density.
            density = stats.geom.pmf(x, p=self.params["p"])
        else:
            raise ValueError(f"PDF not implemented for {self.distribution}")

        # Preserve the scalar-in/float-out contract; pass arrays through.
        return float(density) if np.ndim(density) == 0 else density

    def sample(self, n: int = 1) -> NDArray[np.floating[Any]]:
        """Draw samples from prior distribution.

        Args:
            n: Number of samples to draw.

        Returns:
            Array of n samples from the prior.

        Raises:
            ValueError: If distribution is not recognized.
        """
        if self.distribution == "normal":
            return stats.norm.rvs(loc=self.params["mean"], scale=self.params["std"], size=n)
        elif self.distribution == "uniform":
            return stats.uniform.rvs(
                loc=self.params["low"], scale=self.params["high"] - self.params["low"], size=n
            )
        elif self.distribution == "log_uniform":
            # Sample uniformly on log scale, then exponentiate.
            log_low = np.log(self.params["low"])
            log_high = np.log(self.params["high"])
            log_samples = stats.uniform.rvs(loc=log_low, scale=log_high - log_low, size=n)
            return np.exp(log_samples)
        elif self.distribution == "beta":
            return stats.beta.rvs(a=self.params["a"], b=self.params["b"], size=n)
        elif self.distribution == "gamma":
            return stats.gamma.rvs(a=self.params["shape"], scale=self.params["scale"], size=n)
        elif self.distribution == "half_normal":
            return stats.halfnorm.rvs(scale=self.params["scale"], size=n)
        elif self.distribution == "geometric":
            # Note: yields integer-valued samples (counts).
            return stats.geom.rvs(p=self.params["p"], size=n)
        else:
            raise ValueError(f"Sampling not implemented for {self.distribution}")

206 

207 

@dataclass
class Posterior:
    """Posterior distribution after updating with evidence.

    Represents updated belief about a parameter after observing data.
    Provides point estimates, uncertainty quantification, and confidence scores.

    Attributes:
        mean: Posterior mean (point estimate).
        std: Posterior standard deviation (uncertainty).
        ci_lower: Lower bound of 95% credible interval.
        ci_upper: Upper bound of 95% credible interval.
        samples: Optional array of posterior samples (for non-parametric posteriors).

    Example:
        >>> posterior = Posterior(mean=115200, std=5000, ci_lower=105600, ci_upper=124800)
        >>> print(f"Estimate: {posterior.mean:.0f} ± {posterior.std:.0f}")
        >>> print(f"95% CI: [{posterior.ci_lower:.0f}, {posterior.ci_upper:.0f}]")
        >>> print(f"Confidence: {posterior.confidence:.2%}")
    """

    mean: float
    std: float
    ci_lower: float
    ci_upper: float
    samples: NDArray[np.floating[Any]] | None = None

    @property
    def confidence(self) -> float:
        """Convert posterior certainty to 0-1 confidence score.

        Maps the coefficient of variation (CV = |std / mean|) linearly onto
        [0, 1] via ``confidence = 1 - min(1, CV / 0.5)``. Lower std (more
        certain) -> higher confidence.

        Reference points of the implemented mapping (doc fix: earlier values
        overstated what this formula actually produces):
        - CV = 0.00: confidence 1.0
        - CV = 0.05 (5%): confidence 0.9
        - CV = 0.10 (10%): confidence 0.8
        - CV = 0.25 (25%): confidence 0.5
        - CV >= 0.50 (50%): confidence 0.0

        Returns:
            Confidence score between 0 and 1.

        Example:
            >>> # Low uncertainty -> high confidence
            >>> p1 = Posterior(mean=100, std=5, ci_lower=90, ci_upper=110)
            >>> p1.confidence  # 0.9
            >>>
            >>> # High uncertainty -> low confidence
            >>> p2 = Posterior(mean=100, std=30, ci_lower=40, ci_upper=160)
            >>> p2.confidence  # 0.4
        """
        # Avoid division by zero for a (numerically) zero mean; any nonzero
        # std then produces a huge CV, which maps to zero confidence below.
        if abs(self.mean) < 1e-10:
            cv = self.std / 1e-10
        else:
            cv = abs(self.std / self.mean)  # Coefficient of variation

        # Linear map from CV to confidence; scale_factor determines how
        # quickly confidence drops with uncertainty (50% CV -> 0 confidence).
        scale_factor = 0.5
        confidence = 1.0 - min(1.0, cv / scale_factor)

        # Ensure result stays in [0, 1].
        return max(0.0, min(1.0, confidence))

274 

275 

class BayesianInference:
    """Bayesian inference for signal analysis.

    Provides methods for inferring signal properties (baud rate, frequency,
    symbol count, etc.) with full uncertainty quantification using Bayesian
    methods.

    Attributes:
        priors: Dictionary of default prior distributions for common parameters.

    Example:
        >>> inference = BayesianInference()
        >>>
        >>> # Infer baud rate from edge timings
        >>> edge_times = np.array([0.0, 0.00001, 0.00002, 0.00003])
        >>> posterior = inference.infer_baud_rate(edge_times)
        >>>
        >>> # Infer protocol type probabilities
        >>> observations = {"idle_level": "high", "regularity": 0.3, "duty_cycle": 0.9}
        >>> protocol_probs = inference.infer_protocol_type(observations)
        >>> print(protocol_probs)  # {"UART": 0.85, "I2C": 0.10, "SPI": 0.05}
    """

    def __init__(self) -> None:
        """Initialize Bayesian inference engine with default priors."""
        self.priors = self._default_priors()

    def _default_priors(self) -> dict[str, Prior]:
        """Create default priors for common signal properties.

        Returns:
            Dictionary mapping parameter names to Prior objects.

        Priors are designed to be weakly informative:
        - Broad enough to cover typical use cases
        - Narrow enough to provide regularization
        - Match physical constraints (e.g., positive values)
        """
        return {
            # Log-uniform for scale-invariant parameters (wide range)
            "baud_rate": Prior("log_uniform", {"low": 100, "high": 10_000_000}),
            "frequency": Prior("log_uniform", {"low": 1, "high": 1e9}),
            # Beta distribution for probabilities/proportions
            "duty_cycle": Prior("beta", {"a": 2, "b": 2}),  # Centered at 0.5
            # Half-normal for positive values (noise, std, etc.)
            "noise_std": Prior("half_normal", {"scale": 0.1}),
            # Geometric for discrete counts (favor smaller values)
            "num_symbols": Prior("geometric", {"p": 0.3}),
            # Normal for typical signal characteristics
            "amplitude": Prior("normal", {"mean": 0.0, "std": 1.0}),
            "offset": Prior("normal", {"mean": 0.0, "std": 0.1}),
        }

    def update(
        self,
        param: str,
        likelihood_fn: Callable[[float], float],
        *,
        prior: Prior | None = None,
        num_samples: int = 10000,
    ) -> Posterior:
        """Update belief about parameter given observation.

        General-purpose Bayesian updating using sampling-based inference.
        Uses the prior distribution and likelihood function to compute
        the posterior via importance sampling.

        Args:
            param: Parameter name (used to get default prior if not provided).
            likelihood_fn: Function that computes likelihood p(observation | param_value).
            prior: Prior distribution (uses default if None).
            num_samples: Number of samples for posterior approximation.

        Returns:
            Posterior distribution with mean, std, and credible intervals.

        Raises:
            ValueError: If parameter is unknown and no prior is provided.
            AnalysisError: If likelihood function fails, if sampling from the
                prior fails, or if every likelihood evaluates to zero.

        Example:
            >>> def likelihood(rate: float) -> float:
            ...     # Example: Poisson likelihood for event rate
            ...     observed_count = 42
            ...     time_window = 1.0
            ...     expected = rate * time_window
            ...     return stats.poisson.pmf(observed_count, mu=expected)
            >>>
            >>> inference = BayesianInference()
            >>> posterior = inference.update("frequency", likelihood)
        """
        # Resolve the prior: explicit argument wins, else the default table.
        if prior is None:
            if param not in self.priors:
                raise ValueError(
                    f"Unknown parameter '{param}' and no prior provided. "
                    f"Known parameters: {list(self.priors.keys())}"
                )
            prior = self.priors[param]

        # Sample from prior
        try:
            samples = prior.sample(num_samples)
        except Exception as e:
            raise AnalysisError(
                f"Failed to sample from prior for '{param}'",
                details=str(e),
            ) from e

        # Compute likelihood for each prior sample
        try:
            likelihoods = np.array([likelihood_fn(s) for s in samples])
        except Exception as e:
            raise AnalysisError(
                f"Likelihood function failed for '{param}'",
                details=str(e),
                fix_hint="Check that likelihood_fn is compatible with prior samples",
            ) from e

        # Check for valid likelihoods
        if np.all(likelihoods == 0):
            raise AnalysisError(
                f"All likelihood values are zero for '{param}'",
                details="Observation may be incompatible with prior range",
                fix_hint="Adjust prior range or check likelihood function",
            )

        # Importance weights: samples are drawn from the prior, so each
        # sample's posterior weight is proportional to its likelihood alone.
        weights = likelihoods / np.sum(likelihoods)

        # Weighted posterior moments
        mean = float(np.sum(samples * weights))
        variance = float(np.sum(weights * (samples - mean) ** 2))
        std = float(np.sqrt(variance))

        # 95% credible interval via weighted percentiles
        sorted_indices = np.argsort(samples)
        sorted_samples = samples[sorted_indices]
        cumsum = np.cumsum(weights[sorted_indices])

        # Find the 2.5th and 97.5th percentiles. Clamp the indices in case
        # floating-point rounding leaves cumsum[-1] fractionally below the
        # requested quantile (which would otherwise index out of range).
        last = len(sorted_samples) - 1
        ci_lower = float(sorted_samples[min(int(np.searchsorted(cumsum, 0.025)), last)])
        ci_upper = float(sorted_samples[min(int(np.searchsorted(cumsum, 0.975)), last)])

        return Posterior(
            mean=mean,
            std=std,
            ci_lower=ci_lower,
            ci_upper=ci_upper,
            samples=samples,
        )

    def infer_baud_rate(
        self, edge_times: NDArray[np.floating[Any]], *, prior: Prior | None = None
    ) -> Posterior:
        """Infer baud rate from edge timing observations.

        Uses the distribution of inter-edge intervals to infer the underlying
        baud rate. Assumes edges occur at bit boundaries.

        Args:
            edge_times: Array of edge timestamps in seconds.
            prior: Optional prior for baud rate (uses default if None).

        Returns:
            Posterior distribution for baud rate in bits per second.

        Raises:
            InsufficientDataError: If fewer than 2 edges provided, or if no
                strictly positive inter-edge interval exists.

        Example:
            >>> # 115200 baud UART (bit period = 8.68 μs)
            >>> edge_times = np.array([0, 8.68e-6, 17.36e-6, 26.04e-6])
            >>> posterior = inference.infer_baud_rate(edge_times)
            >>> print(f"Baud rate: {posterior.mean:.0f} bps")
        """
        if len(edge_times) < 2:
            raise InsufficientDataError(
                "Need at least 2 edges to infer baud rate",
                required=2,
                available=len(edge_times),
            )

        # Compute inter-edge intervals
        intervals = np.diff(edge_times)

        # Filter out zero/negative intervals (should not happen but be safe)
        intervals = intervals[intervals > 0]

        if len(intervals) == 0:
            raise InsufficientDataError("No valid inter-edge intervals found")

        # Use the smallest observed interval as a proxy for a single bit
        # period (longer intervals are multiples due to encoding, e.g.
        # start/stop bits). Hoisted out of the likelihood closure (perf fix):
        # it is loop-invariant but was previously recomputed once per
        # posterior sample, i.e. num_samples times.
        min_interval = float(np.min(intervals))
        sigma = min_interval * 0.1  # 10% timing uncertainty

        # Likelihood: Gaussian in the mismatch between the candidate bit
        # period (1 / baud_rate) and the observed minimum interval.
        def likelihood(baud_rate: float) -> float:
            if baud_rate <= 0:
                return 0.0
            bit_period = 1.0 / baud_rate
            log_likelihood = -0.5 * ((min_interval - bit_period) / sigma) ** 2
            return float(np.exp(log_likelihood))

        return self.update("baud_rate", likelihood, prior=prior)

    def infer_protocol_type(self, observations: dict[str, Any]) -> dict[str, float]:
        """Infer probability of each protocol type given observations.

        Uses a simple Bayesian classifier to compute posterior probabilities
        for different protocol types (UART, SPI, I2C, CAN) based on observed
        signal characteristics.

        Args:
            observations: Dictionary of observed signal characteristics:
                - "idle_level": "high" or "low"
                - "regularity": 0-1 (edge regularity)
                - "duty_cycle": 0-1 (fraction time high)
                - "symbol_rate": Hz (optional)
                - "transition_density": edges/sec (optional)

        Returns:
            Dictionary mapping protocol names to posterior probabilities.
            Probabilities sum to 1.0.

        Example:
            >>> observations = {
            ...     "idle_level": "high",
            ...     "regularity": 0.25,
            ...     "duty_cycle": 0.85,
            ...     "symbol_rate": 115200,
            ... }
            >>> probs = inference.infer_protocol_type(observations)
            >>> print(probs)  # {"UART": 0.85, "I2C": 0.10, "SPI": 0.03, "CAN": 0.02}
        """
        # Prior probabilities (uniform over protocols)
        protocols = ["UART", "SPI", "I2C", "CAN"]
        prior_prob = 1.0 / len(protocols)

        # Compute likelihoods for each protocol
        likelihoods = {
            "UART": self._likelihood_uart(observations),
            "SPI": self._likelihood_spi(observations),
            "I2C": self._likelihood_i2c(observations),
            "CAN": self._likelihood_can(observations),
        }

        # Compute posterior probabilities (Bayes' theorem)
        posteriors = {proto: prior_prob * likelihoods[proto] for proto in protocols}

        # Normalize to sum to 1
        total = sum(posteriors.values())
        if total > 0:
            posteriors = {proto: prob / total for proto, prob in posteriors.items()}
        else:
            # No evidence - return uniform
            posteriors = {proto: 1.0 / len(protocols) for proto in protocols}

        return posteriors

    def _likelihood_uart(self, obs: dict[str, Any]) -> float:
        """Compute likelihood of observations given UART protocol."""
        likelihood = 1.0

        # UART characteristics: idle high, low regularity, high duty cycle
        if obs.get("idle_level") == "high":
            likelihood *= 0.9
        else:
            likelihood *= 0.2

        regularity = obs.get("regularity", 0.5)
        if regularity < 0.3:
            likelihood *= 0.8
        elif regularity > 0.7:
            likelihood *= 0.2

        duty_cycle = obs.get("duty_cycle", 0.5)
        if duty_cycle > 0.7:
            likelihood *= 0.8
        elif duty_cycle < 0.3:
            likelihood *= 0.3

        return likelihood

    def _likelihood_spi(self, obs: dict[str, Any]) -> float:
        """Compute likelihood of observations given SPI protocol."""
        likelihood = 1.0

        # SPI characteristics: regular clock, ~50% duty, high transition density
        regularity = obs.get("regularity", 0.5)
        if regularity > 0.7:
            likelihood *= 0.9
        elif regularity < 0.4:
            likelihood *= 0.2

        duty_cycle = obs.get("duty_cycle", 0.5)
        duty_error = abs(duty_cycle - 0.5)
        if duty_error < 0.1:
            likelihood *= 0.8
        elif duty_error > 0.3:
            likelihood *= 0.3

        return likelihood

    def _likelihood_i2c(self, obs: dict[str, Any]) -> float:
        """Compute likelihood of observations given I2C protocol."""
        likelihood = 1.0

        # I2C characteristics: idle high, moderate regularity
        if obs.get("idle_level") == "high":
            likelihood *= 0.85
        else:
            likelihood *= 0.3

        regularity = obs.get("regularity", 0.5)
        if 0.4 < regularity < 0.8:
            likelihood *= 0.8
        else:
            likelihood *= 0.4

        return likelihood

    def _likelihood_can(self, obs: dict[str, Any]) -> float:
        """Compute likelihood of observations given CAN protocol."""
        likelihood = 1.0

        # CAN characteristics: idle high, moderate irregularity (bit stuffing)
        if obs.get("idle_level") == "high":
            likelihood *= 0.85
        else:
            likelihood *= 0.2

        regularity = obs.get("regularity", 0.5)
        if 0.3 < regularity < 0.7:
            likelihood *= 0.8
        else:
            likelihood *= 0.4

        # Boost likelihood when the observed rate is within 10% of a
        # standard CAN baud rate.
        symbol_rate = obs.get("symbol_rate", 0)
        if symbol_rate > 0:
            standard_rates = [125000, 250000, 500000, 1000000]
            for rate in standard_rates:
                if abs(symbol_rate - rate) / rate < 0.1:
                    likelihood *= 1.5
                    break

        return likelihood

    def infer_symbol_count(
        self,
        amplitude_histogram: NDArray[np.floating[Any]],
        *,
        prior: Prior | None = None,
        max_symbols: int = 16,
    ) -> Posterior:
        """Infer number of discrete symbols from amplitude distribution.

        Analyzes the amplitude histogram to determine how many discrete
        signal levels (symbols) are present. Useful for multi-level signaling
        (PAM-4, etc.).

        Args:
            amplitude_histogram: Histogram of signal amplitudes (bin counts).
            prior: Optional prior for symbol count (uses default if None).
            max_symbols: Maximum number of symbols to consider.

        Returns:
            Posterior distribution for number of symbols.

        Raises:
            InsufficientDataError: If histogram is empty or all zeros.

        Example:
            >>> # PAM-4 signal (4 levels)
            >>> amplitudes = np.random.choice([0.0, 0.33, 0.67, 1.0], size=1000)
            >>> hist, _ = np.histogram(amplitudes, bins=50)
            >>> posterior = inference.infer_symbol_count(hist)
            >>> print(f"Symbols: {int(posterior.mean)}")  # Should be close to 4
        """
        if len(amplitude_histogram) == 0 or np.sum(amplitude_histogram) == 0:
            raise InsufficientDataError("Amplitude histogram is empty or all zeros")

        # Pre-compute peaks in histogram (optimization: only compute once)
        # Use robust peak detection that also considers edge bins
        prominence_threshold = np.max(amplitude_histogram) * 0.1
        detected_peaks, _ = find_peaks(amplitude_histogram, prominence=prominence_threshold)
        peak_indices = list(detected_peaks)

        # Check edge bins - they can be peaks but find_peaks won't detect them.
        # First bin is a peak if it's significant and higher than second bin.
        if len(amplitude_histogram) > 1:
            if (
                amplitude_histogram[0] > prominence_threshold
                and amplitude_histogram[0] > amplitude_histogram[1]
            ):
                peak_indices.insert(0, 0)
            # Last bin is a peak if significant and higher than second-to-last
            if (
                amplitude_histogram[-1] > prominence_threshold
                and amplitude_histogram[-1] > amplitude_histogram[-2]
            ):
                peak_indices.append(len(amplitude_histogram) - 1)

        num_peaks = len(peak_indices)

        # Likelihood: number of peaks in histogram should match symbol count
        def likelihood(num_symbols: float) -> float:
            k = int(round(num_symbols))
            if k < 1 or k > max_symbols:
                return 0.0

            # Peaks should match symbols; allow ±1 tolerance for noise.
            if abs(num_peaks - k) <= 1:
                return 0.8
            elif abs(num_peaks - k) == 2:
                return 0.3
            else:
                return 0.1

        return self.update(
            "num_symbols",
            likelihood,
            prior=prior,
            num_samples=max_symbols * 100,
        )

708 

709 

class SequentialBayesian:
    """Sequential Bayesian updating for streaming analysis.

    Maintains a running belief about a single parameter that is refined as
    new observations arrive. Intended for online/streaming signal analysis
    where data arrives incrementally: the posterior from one update serves
    as the prior for the next.

    Attributes:
        param: Parameter name being inferred.
        current_posterior: Current belief — the initial Prior before any
            update, then the latest Posterior afterwards.

    Example:
        >>> from tracekit.inference.bayesian import SequentialBayesian, Prior
        >>> prior = Prior("normal", {"mean": 115200, "std": 10000})
        >>> sequential = SequentialBayesian("baud_rate", prior)
        >>>
        >>> # Update with streaming observations
        >>> for _observation in streaming_data:
        ...     posterior = sequential.update(likelihood_fn)
        ...     print(f"Current estimate: {posterior.mean:.0f} (confidence: {sequential.get_confidence():.2%})")
        ...     if sequential.get_confidence() > 0.95:
        ...         break  # High confidence reached
    """

    def __init__(self, param: str, prior: Prior) -> None:
        """Initialize sequential Bayesian updater.

        Args:
            param: Parameter name being inferred.
            prior: Initial prior distribution.
        """
        self.param = param
        self.current_posterior: Prior | Posterior = prior
        # Per-update particle sets and weights, retained for potential
        # resampling/diagnostics. NOTE(review): this history grows without
        # bound over a long stream — confirm callers are short-lived.
        self._samples: list[NDArray[np.floating[Any]]] = []
        self._weights: list[NDArray[np.floating[Any]]] = []

    def _draw(self, num_samples: int) -> NDArray[np.floating[Any]]:
        """Draw a particle set from the current belief.

        Samples the initial Prior directly; afterwards, bootstrap-resamples
        the stored posterior particles (or falls back to a normal
        approximation when no particles were kept).
        """
        belief = self.current_posterior
        if isinstance(belief, Prior):
            return belief.sample(num_samples)
        if belief.samples is None:
            # No stored particles: approximate the posterior as a normal.
            return np.random.normal(belief.mean, belief.std, size=num_samples)
        # Resample stored particles with replacement.
        chosen = np.random.choice(len(belief.samples), size=num_samples, replace=True)
        return belief.samples[chosen]

    @staticmethod
    def _summarize(
        draws: NDArray[np.floating[Any]], w: NDArray[np.floating[Any]]
    ) -> Posterior:
        """Build a Posterior (mean, std, 95% CI) from weighted particles."""
        center = float(np.sum(draws * w))
        spread = float(np.sqrt(np.sum(w * (draws - center) ** 2)))

        # Weighted percentiles for the 95% credible interval.
        order = np.argsort(draws)
        ordered = draws[order]
        cdf = np.cumsum(w[order])
        lo = float(ordered[np.searchsorted(cdf, 0.025)])
        hi = float(ordered[np.searchsorted(cdf, 0.975)])

        return Posterior(mean=center, std=spread, ci_lower=lo, ci_upper=hi, samples=draws)

    def update(
        self,
        likelihood_fn: Callable[[float], float],
        *,
        num_samples: int = 5000,
    ) -> Posterior:
        """Update posterior with new observation.

        Performs one step of sequential Bayesian updating. The current
        posterior becomes the prior for the next update.

        Args:
            likelihood_fn: Likelihood function p(observation | param).
            num_samples: Number of samples for approximation.

        Returns:
            Updated posterior distribution.

        Raises:
            AnalysisError: If likelihood function fails.
        """
        draws = self._draw(num_samples)

        try:
            lik = np.array([likelihood_fn(value) for value in draws])
        except Exception as e:
            raise AnalysisError(
                f"Likelihood function failed for '{self.param}'",
                details=str(e),
            ) from e

        if np.all(lik == 0):
            # Observation carries no usable information — keep current belief.
            if isinstance(self.current_posterior, Posterior):
                return self.current_posterior
            # Still on the prior: return a placeholder standard-normal summary.
            return Posterior(mean=0.0, std=1.0, ci_lower=-2.0, ci_upper=2.0)

        # Normalize likelihoods into importance weights.
        w = lik / np.sum(lik)

        # Record this step's particles/weights for potential resampling.
        self._samples.append(draws)
        self._weights.append(w)

        posterior = self._summarize(draws, w)
        self.current_posterior = posterior
        return posterior

    def get_confidence(self) -> float:
        """Get current confidence in estimate.

        Returns:
            Confidence score between 0 and 1; 0.0 while still on the prior
            (no evidence incorporated yet).
        """
        belief = self.current_posterior
        return belief.confidence if isinstance(belief, Posterior) else 0.0

849 

850 

def infer_with_uncertainty(
    measurements: list[float] | NDArray[np.floating[Any]],
    prior: Prior | None = None,
) -> Posterior:
    """Infer parameter value with full uncertainty quantification.

    Convenience function for simple parameter inference from repeated measurements.
    Assumes measurements are normally distributed around the true value.

    Args:
        measurements: List or array of independent measurements.
        prior: Optional prior distribution (uses uninformative normal if None).

    Returns:
        Posterior distribution combining prior and measurement likelihood.

    Raises:
        InsufficientDataError: If measurements list is empty.

    Example:
        >>> # Repeated measurements of signal frequency
        >>> measurements = [99.8, 100.2, 99.9, 100.1, 100.0]  # Hz
        >>> posterior = infer_with_uncertainty(measurements)
        >>> print(f"Frequency: {posterior.mean:.2f} ± {posterior.std:.2f} Hz")
        >>> print(f"95% CI: [{posterior.ci_lower:.2f}, {posterior.ci_upper:.2f}] Hz")
    """
    measurements_array = np.asarray(measurements)

    if len(measurements_array) == 0:
        raise InsufficientDataError(
            "Cannot infer from empty measurements",
            required=1,
            available=0,
        )

    # Sample statistics drive both the default prior and the likelihood width.
    sample_mean = float(np.mean(measurements_array))
    sample_std = (
        float(np.std(measurements_array, ddof=1)) if len(measurements_array) > 1 else 1.0
    )
    # Bug fix: identical repeated measurements give sample_std == 0, which
    # previously caused a division by zero in the likelihood (all weights
    # NaN/zero -> AnalysisError) and a degenerate zero-scale prior. Clamp to
    # a small positive spread relative to the mean instead.
    if sample_std == 0.0:
        sample_std = max(abs(sample_mean) * 1e-6, 1e-12)

    if prior is None:
        # Uninformative prior centered at the sample mean, 10x wider than
        # the observed spread.
        prior = Prior("normal", {"mean": sample_mean, "std": sample_std * 10})

    measurement_std = sample_std

    def likelihood(param_value: float) -> float:
        # Product of independent Gaussian likelihoods, computed in log space.
        log_likelihood = -0.5 * np.sum(((measurements_array - param_value) / measurement_std) ** 2)
        return float(np.exp(log_likelihood))

    inference = BayesianInference()
    return inference.update(
        "parameter",
        likelihood,
        prior=prior,
    )

911 

912 

# Public API of this module (kept alphabetically sorted, classes first).
__all__ = [
    "BayesianInference",
    "Posterior",
    "Prior",
    "SequentialBayesian",
    "infer_with_uncertainty",
]