Coverage for src / tracekit / inference / adaptive_tuning.py: 94%

176 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Adaptive parameter tuning for analysis functions. 

2 

3Auto-configures analysis parameters based on signal characteristics, 

4reducing the need for manual parameter specification. 

5 

6 

7Example: 

8 >>> import tracekit as tk 

9 >>> trace = tk.load('signal.wfm') 

10 >>> tuner = tk.AdaptiveParameterTuner(trace.data, trace.metadata.sample_rate) 

11 >>> params = tuner.get_spectral_params() 

12 >>> print(f"NFFT: {params.get('nfft')}") 

13 >>> print(f"Window: {params.get('window')}") 

14 >>> print(f"Reasoning: {params.reasoning}") 

15 

16References: 

17 Harris, F. J. (1978): On the use of windows for harmonic analysis with DFT 

18 Oppenheim, A. V. & Schafer, R. W. (2010): Discrete-Time Signal Processing 

19""" 

20 

21from __future__ import annotations 

22 

23import logging 

24from dataclasses import dataclass, field 

25from typing import TYPE_CHECKING, Any 

26 

27import numpy as np 

28 

29if TYPE_CHECKING: 

30 from numpy.typing import NDArray 

31 

32logger = logging.getLogger(__name__) 

33 

34 

35@dataclass 

36class TunedParameters: 

37 """Container for auto-tuned parameters. 

38 

39 Attributes: 

40 parameters: Dictionary of parameter names to values. 

41 confidence: Confidence in parameter tuning (0.0-1.0). 

42 reasoning: Dictionary mapping parameter names to reasoning strings. 

43 """ 

44 

45 parameters: dict[str, Any] = field(default_factory=dict) 

46 confidence: float = 0.5 

47 reasoning: dict[str, str] = field(default_factory=dict) 

48 

49 def get(self, key: str, default: Any = None) -> Any: 

50 """Get parameter value with default. 

51 

52 Args: 

53 key: Parameter name. 

54 default: Default value if parameter not found. 

55 

56 Returns: 

57 Parameter value or default. 

58 """ 

59 return self.parameters.get(key, default) 

60 

61 

62class AdaptiveParameterTuner: 

63 """Auto-configure analysis parameters based on signal characteristics. 

64 

65 This class analyzes signal characteristics and provides intelligent 

66 parameter suggestions for various analysis domains (spectral, digital, 

67 timing, jitter, pattern recognition). 

68 

69 Attributes: 

70 data: Input signal data array. 

71 sample_rate: Sample rate in Hz. 

72 signal_type: Optional signal type hint (digital, analog, etc.). 

73 

74 Example: 

75 >>> tuner = AdaptiveParameterTuner(signal_data, sample_rate=1e6) 

76 >>> spectral_params = tuner.get_spectral_params() 

77 >>> print(spectral_params.parameters) 

78 {'nfft': 8192, 'window': 'hann', 'overlap': 0.5} 

79 """ 

80 

81 def __init__( 

82 self, 

83 data: NDArray[np.floating[Any]], 

84 sample_rate: float = 1.0, 

85 signal_type: str | None = None, 

86 ): 

87 """Initialize tuner with signal data. 

88 

89 Args: 

90 data: Input signal data. 

91 sample_rate: Sample rate in Hz. 

92 signal_type: Optional signal type hint (digital, analog, etc.). 

93 """ 

94 self.data = data 

95 self.sample_rate = sample_rate 

96 self.signal_type = signal_type 

97 

98 # Pre-compute signal characteristics 

99 self._characteristics = self._analyze_signal() 

100 

101 def _analyze_signal(self) -> dict[str, Any]: 

102 """Analyze signal characteristics for parameter tuning. 

103 

104 Returns: 

105 Dictionary of signal characteristics including statistics, 

106 noise estimates, frequency content, and signal type indicators. 

107 """ 

108 chars: dict[str, Any] = {} 

109 

110 try: 

111 # Basic statistics 

112 chars["mean"] = float(np.mean(self.data)) 

113 chars["std"] = float(np.std(self.data)) 

114 chars["min"] = float(np.min(self.data)) 

115 chars["max"] = float(np.max(self.data)) 

116 chars["range"] = chars["max"] - chars["min"] 

117 chars["n_samples"] = len(self.data) 

118 chars["duration"] = len(self.data) / self.sample_rate 

119 

120 # Detect if digital 

121 unique_values = len(np.unique(np.round(self.data, decimals=2))) 

122 chars["likely_digital"] = unique_values < 10 

123 

124 # Estimate dominant frequency 

125 chars["dominant_freq"] = self._estimate_dominant_frequency() 

126 

127 # Estimate noise floor 

128 median = np.median(self.data) 

129 mad = np.median(np.abs(self.data - median)) * 1.4826 

130 chars["noise_floor"] = float(mad) 

131 

132 # SNR estimate 

133 signal_power = np.var(self.data) 

134 noise_power = mad**2 

135 if noise_power > 0: 

136 chars["snr_db"] = float(10 * np.log10(signal_power / noise_power)) 

137 else: 

138 chars["snr_db"] = 40.0 

139 

140 except Exception as e: 

141 logger.debug(f"Error analyzing signal: {e}") 

142 

143 return chars 

144 

145 def _estimate_dominant_frequency(self) -> float | None: 

146 """Estimate dominant frequency using FFT. 

147 

148 Returns: 

149 Dominant frequency in Hz, or None if not detectable. 

150 """ 

151 try: 

152 data_ac = self.data - np.mean(self.data) 

153 fft_result = np.fft.rfft(data_ac) 

154 freqs = np.fft.rfftfreq(len(data_ac), d=1.0 / self.sample_rate) 

155 magnitude = np.abs(fft_result[1:]) # Skip DC 

156 

157 if len(magnitude) > 0: 

158 peak_idx = np.argmax(magnitude) 

159 return float(freqs[1:][peak_idx]) 

160 except Exception: 

161 pass 

162 return None 

163 

164 def get_spectral_params(self) -> TunedParameters: 

165 """Get tuned parameters for spectral analysis. 

166 

167 Selects FFT size, window function, and overlap based on signal 

168 characteristics and quality requirements. 

169 

170 Returns: 

171 TunedParameters with spectral analysis configuration. 

172 

173 Example: 

174 >>> params = tuner.get_spectral_params() 

175 >>> print(f"NFFT: {params.get('nfft')}") 

176 >>> print(f"Reasoning: {params.reasoning['nfft']}") 

177 """ 

178 params = {} 

179 reasoning = {} 

180 confidence = 0.8 

181 

182 n_samples = self._characteristics.get("n_samples", 1000) 

183 

184 # NFFT - power of 2, balancing resolution and computation 

185 ideal_nfft = min(8192, max(256, 2 ** int(np.ceil(np.log2(n_samples / 4))))) 

186 params["nfft"] = ideal_nfft 

187 reasoning["nfft"] = f"Power of 2 for efficiency, ~{n_samples / ideal_nfft:.0f} averages" 

188 

189 # Window selection based on signal characteristics 

190 snr = self._characteristics.get("snr_db", 20) 

191 if snr < 15: 

192 params["window"] = "blackman" 

193 reasoning["window"] = "Low SNR - using Blackman for better noise rejection" 

194 elif snr < 25: 

195 params["window"] = "hann" 

196 reasoning["window"] = "Moderate SNR - using Hann for balance" 

197 else: 

198 params["window"] = "hamming" 

199 reasoning["window"] = "Good SNR - using Hamming for resolution" 

200 

201 # Overlap 

202 params["overlap"] = 0.5 

203 reasoning["overlap"] = "Standard 50% overlap for smooth averaging" 

204 

205 # Frequency range based on dominant frequency 

206 dom_freq = self._characteristics.get("dominant_freq") 

207 if dom_freq and dom_freq > 0: 

208 params["freq_min"] = max(0, dom_freq / 10) 

209 params["freq_max"] = min(self.sample_rate / 2, dom_freq * 5) 

210 reasoning["freq_range"] = f"Based on dominant frequency {dom_freq:.1f} Hz" 

211 

212 return TunedParameters(parameters=params, confidence=confidence, reasoning=reasoning) 

213 

214 def get_digital_params(self) -> TunedParameters: 

215 """Get tuned parameters for digital signal analysis. 

216 

217 Determines threshold levels, edge detection sensitivity, and 

218 baud rate hints based on signal characteristics. 

219 

220 Returns: 

221 TunedParameters with digital analysis configuration. 

222 

223 Example: 

224 >>> params = tuner.get_digital_params() 

225 >>> print(f"Threshold: {params.get('threshold')}") 

226 >>> print(f"Baud rate hint: {params.get('baud_rate_hint')}") 

227 """ 

228 params = {} 

229 reasoning = {} 

230 confidence = 0.7 

231 

232 chars = self._characteristics 

233 

234 # Threshold based on signal levels 

235 if chars.get("likely_digital"): 

236 mid = (chars["min"] + chars["max"]) / 2 

237 params["threshold"] = mid 

238 params["threshold_low"] = chars["min"] + 0.3 * chars["range"] 

239 params["threshold_high"] = chars["max"] - 0.3 * chars["range"] 

240 reasoning["threshold"] = ( 

241 f"Midpoint of signal range ({chars['min']:.2f} to {chars['max']:.2f})" 

242 ) 

243 confidence = 0.85 

244 else: 

245 params["threshold"] = chars.get("mean", 0) 

246 reasoning["threshold"] = "Using mean (signal may not be digital)" 

247 confidence = 0.5 

248 

249 # Edge detection sensitivity based on noise 

250 noise = chars.get("noise_floor", 0.1) 

251 params["min_edge_separation"] = max(2, int(noise * 10)) 

252 reasoning["min_edge_separation"] = f"Based on noise floor {noise:.3f}" 

253 

254 # Baud rate hint from dominant frequency 

255 dom_freq = chars.get("dominant_freq") 

256 if dom_freq and dom_freq > 0: 256 ↛ 263line 256 didn't jump to line 263 because the condition on line 256 was always true

257 # Common baud rates 

258 common_bauds = [300, 1200, 2400, 4800, 9600, 19200, 38400, 57600, 115200] 

259 closest_baud = min(common_bauds, key=lambda b: abs(b - dom_freq * 2)) 

260 params["baud_rate_hint"] = closest_baud 

261 reasoning["baud_rate"] = f"Estimated from frequency {dom_freq:.0f} Hz" 

262 

263 return TunedParameters(parameters=params, confidence=confidence, reasoning=reasoning) 

264 

265 def get_timing_params(self) -> TunedParameters: 

266 """Get tuned parameters for timing analysis. 

267 

268 Configures time resolution, expected period, and edge timing 

269 thresholds based on sample rate and signal characteristics. 

270 

271 Returns: 

272 TunedParameters with timing analysis configuration. 

273 

274 Example: 

275 >>> params = tuner.get_timing_params() 

276 >>> print(f"Expected period: {params.get('expected_period')}") 

277 >>> print(f"Tolerance: {params.get('period_tolerance')}") 

278 """ 

279 params = {} 

280 reasoning = {} 

281 confidence = 0.75 

282 

283 chars = self._characteristics 

284 

285 # Time resolution based on sample rate 

286 params["time_resolution"] = 1.0 / self.sample_rate 

287 reasoning["time_resolution"] = f"Based on sample rate {self.sample_rate:.0f} Hz" 

288 

289 # Expected period from dominant frequency 

290 dom_freq = chars.get("dominant_freq") 

291 if dom_freq and dom_freq > 0: 291 ↛ 298line 291 didn't jump to line 298 because the condition on line 291 was always true

292 params["expected_period"] = 1.0 / dom_freq 

293 params["period_tolerance"] = 0.2 / dom_freq # 20% tolerance 

294 reasoning["period"] = f"From dominant frequency {dom_freq:.1f} Hz" 

295 confidence = 0.85 

296 

297 # Edge timing thresholds 

298 noise = chars.get("noise_floor", 0.1) 

299 params["edge_threshold"] = noise * 3 # 3-sigma 

300 reasoning["edge_threshold"] = f"3x noise floor ({noise:.3f})" 

301 

302 return TunedParameters(parameters=params, confidence=confidence, reasoning=reasoning) 

303 

304 def get_jitter_params(self) -> TunedParameters: 

305 """Get tuned parameters for jitter analysis. 

306 

307 Determines unit interval, histogram binning, and tolerance 

308 parameters for jitter measurements. 

309 

310 Returns: 

311 TunedParameters with jitter analysis configuration. 

312 

313 Example: 

314 >>> params = tuner.get_jitter_params() 

315 >>> print(f"Unit interval: {params.get('unit_interval')}") 

316 >>> print(f"Histogram bins: {params.get('histogram_bins')}") 

317 """ 

318 params = {} 

319 reasoning = {} 

320 confidence = 0.7 

321 

322 chars = self._characteristics 

323 

324 # Unit interval from dominant frequency 

325 dom_freq = chars.get("dominant_freq") 

326 if dom_freq and dom_freq > 0: 326 ↛ 334line 326 didn't jump to line 334 because the condition on line 326 was always true

327 ui = 1.0 / dom_freq 

328 params["unit_interval"] = ui 

329 params["ui_tolerance"] = ui * 0.1 

330 reasoning["unit_interval"] = f"From dominant frequency {dom_freq:.1f} Hz" 

331 confidence = 0.85 

332 

333 # Histogram bins based on data range and noise 

334 snr = chars.get("snr_db", 20) 

335 if snr > 30: 335 ↛ 336line 335 didn't jump to line 336 because the condition on line 335 was never true

336 params["histogram_bins"] = 256 

337 elif snr > 20: 337 ↛ 338line 337 didn't jump to line 338 because the condition on line 337 was never true

338 params["histogram_bins"] = 128 

339 else: 

340 params["histogram_bins"] = 64 

341 reasoning["histogram_bins"] = f"Based on SNR {snr:.0f} dB" 

342 

343 return TunedParameters(parameters=params, confidence=confidence, reasoning=reasoning) 

344 

345 def get_pattern_params(self) -> TunedParameters: 

346 """Get tuned parameters for pattern analysis. 

347 

348 Configures minimum pattern length and maximum distance for 

349 fuzzy matching based on signal characteristics. 

350 

351 Returns: 

352 TunedParameters with pattern analysis configuration. 

353 

354 Example: 

355 >>> params = tuner.get_pattern_params() 

356 >>> print(f"Min pattern length: {params.get('min_length')}") 

357 >>> print(f"Max fuzzy distance: {params.get('max_distance')}") 

358 """ 

359 params = {} 

360 reasoning = {} 

361 confidence = 0.7 

362 

363 chars = self._characteristics 

364 n_samples = chars.get("n_samples", 1000) 

365 

366 # Min pattern length based on signal characteristics 

367 dom_freq = chars.get("dominant_freq") 

368 if dom_freq and dom_freq > 0: 368 ↛ 375line 368 didn't jump to line 375 because the condition on line 368 was always true

369 samples_per_period = self.sample_rate / dom_freq 

370 params["min_length"] = max(3, int(samples_per_period / 4)) 

371 reasoning["min_length"] = ( 

372 f"Quarter of estimated period ({samples_per_period:.0f} samples)" 

373 ) 

374 else: 

375 params["min_length"] = max(3, n_samples // 100) 

376 reasoning["min_length"] = "1% of signal length" 

377 

378 # Max distance for fuzzy matching based on noise 

379 noise_ratio = chars.get("noise_floor", 0.1) / max(chars.get("range", 1), 0.001) 

380 params["max_distance"] = max(1, int(noise_ratio * 10)) 

381 reasoning["max_distance"] = f"Based on noise ratio {noise_ratio:.2%}" 

382 

383 return TunedParameters(parameters=params, confidence=confidence, reasoning=reasoning) 

384 

385 def get_params_for_domain(self, domain: str) -> TunedParameters: 

386 """Get tuned parameters for a specific analysis domain. 

387 

388 Args: 

389 domain: Analysis domain name (spectral, digital, timing, jitter, pattern). 

390 

391 Returns: 

392 TunedParameters for the specified domain. 

393 

394 Example: 

395 >>> params = tuner.get_params_for_domain("spectral") 

396 >>> print(params.parameters) 

397 {'nfft': 8192, 'window': 'hann', 'overlap': 0.5} 

398 """ 

399 domain_lower = domain.lower() 

400 

401 if "spectral" in domain_lower or "fft" in domain_lower: 

402 return self.get_spectral_params() 

403 elif "digital" in domain_lower: 

404 return self.get_digital_params() 

405 elif "timing" in domain_lower: 

406 return self.get_timing_params() 

407 elif "jitter" in domain_lower: 

408 return self.get_jitter_params() 

409 elif "pattern" in domain_lower: 

410 return self.get_pattern_params() 

411 else: 

412 # Return basic params for unknown domains 

413 return TunedParameters( 

414 parameters={}, 

415 confidence=0.5, 

416 reasoning={"note": "No domain-specific tuning available"}, 

417 ) 

418 

419 

420def get_adaptive_parameters( 

421 data: NDArray[np.floating[Any]], 

422 sample_rate: float, 

423 domain: str, 

424 signal_type: str | None = None, 

425) -> TunedParameters: 

426 """Convenience function to get adaptive parameters. 

427 

428 This is a shortcut for creating an AdaptiveParameterTuner and 

429 getting parameters for a specific domain. 

430 

431 Args: 

432 data: Input signal data. 

433 sample_rate: Sample rate in Hz. 

434 domain: Analysis domain (spectral, digital, timing, jitter, pattern). 

435 signal_type: Optional signal type hint. 

436 

437 Returns: 

438 TunedParameters for the specified domain. 

439 

440 Example: 

441 >>> params = get_adaptive_parameters(signal, 1e6, "spectral") 

442 >>> print(f"Window: {params.get('window')}") 

443 >>> print(f"Confidence: {params.confidence}") 

444 """ 

445 tuner = AdaptiveParameterTuner(data, sample_rate, signal_type) 

446 return tuner.get_params_for_domain(domain) 

447 

448 

449__all__ = [ 

450 "AdaptiveParameterTuner", 

451 "TunedParameters", 

452 "get_adaptive_parameters", 

453]