Coverage for src/tracekit/inference/adaptive_tuning.py: 94%
176 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Adaptive parameter tuning for analysis functions.
3Auto-configures analysis parameters based on signal characteristics,
4reducing the need for manual parameter specification.
7Example:
8 >>> import tracekit as tk
9 >>> trace = tk.load('signal.wfm')
10 >>> tuner = tk.AdaptiveParameterTuner(trace.data, trace.metadata.sample_rate)
11 >>> params = tuner.get_spectral_params()
12 >>> print(f"NFFT: {params.get('nfft')}")
13 >>> print(f"Window: {params.get('window')}")
14 >>> print(f"Reasoning: {params.reasoning}")
16References:
17 Harris, F. J. (1978): On the use of windows for harmonic analysis with DFT
18 Oppenheim, A. V. & Schafer, R. W. (2010): Discrete-Time Signal Processing
19"""
21from __future__ import annotations
23import logging
24from dataclasses import dataclass, field
25from typing import TYPE_CHECKING, Any
27import numpy as np
29if TYPE_CHECKING:
30 from numpy.typing import NDArray
32logger = logging.getLogger(__name__)
@dataclass
class TunedParameters:
    """Result bundle produced by automatic parameter tuning.

    Attributes:
        parameters: Mapping from parameter name to its tuned value.
        confidence: How much the tuning can be trusted (0.0-1.0).
        reasoning: Mapping from parameter name to a short explanation of
            why that value was chosen.
    """

    parameters: dict[str, Any] = field(default_factory=dict)
    confidence: float = 0.5
    reasoning: dict[str, str] = field(default_factory=dict)

    def get(self, key: str, default: Any = None) -> Any:
        """Look up a tuned parameter, falling back to ``default``.

        Args:
            key: Parameter name.
            default: Value to hand back when ``key`` was not tuned.

        Returns:
            The stored parameter value, or ``default`` if it is absent.
        """
        if key in self.parameters:
            return self.parameters[key]
        return default
class AdaptiveParameterTuner:
    """Auto-configure analysis parameters based on signal characteristics.

    This class analyzes signal characteristics once, at construction time,
    and provides parameter suggestions for various analysis domains
    (spectral, digital, timing, jitter, pattern recognition).

    Attributes:
        data: Input signal data array.
        sample_rate: Sample rate in Hz.
        signal_type: Optional signal type hint (digital, analog, etc.).

    Example:
        >>> tuner = AdaptiveParameterTuner(signal_data, sample_rate=1e6)
        >>> spectral_params = tuner.get_spectral_params()
        >>> print(spectral_params.parameters)
        {'nfft': 8192, 'window': 'hann', 'overlap': 0.5}
    """

    def __init__(
        self,
        data: NDArray[np.floating[Any]],
        sample_rate: float = 1.0,
        signal_type: str | None = None,
    ):
        """Initialize tuner with signal data.

        Args:
            data: Input signal data.
            sample_rate: Sample rate in Hz.
            signal_type: Optional signal type hint (digital, analog, etc.).
        """
        self.data = data
        self.sample_rate = sample_rate
        self.signal_type = signal_type

        # Pre-compute signal characteristics
        self._characteristics = self._analyze_signal()

    def _analyze_signal(self) -> dict[str, Any]:
        """Analyze signal characteristics for parameter tuning.

        The analysis is best-effort: if any step raises, the error is logged
        at debug level and the characteristics gathered so far are returned.
        Getters fall back to their own defaults for anything missing.

        Returns:
            Dictionary of signal characteristics including statistics,
            noise estimates, frequency content, and signal type indicators.
            Empty when the input has no samples.
        """
        chars: dict[str, Any] = {}

        # Statistics of an empty array are NaN (with NumPy RuntimeWarnings);
        # report nothing so that every getter uses its defaults instead of
        # propagating NaN into thresholds.
        if np.size(self.data) == 0:
            return chars

        try:
            # Basic statistics
            chars["mean"] = float(np.mean(self.data))
            chars["std"] = float(np.std(self.data))
            chars["min"] = float(np.min(self.data))
            chars["max"] = float(np.max(self.data))
            chars["range"] = chars["max"] - chars["min"]
            chars["n_samples"] = len(self.data)
            chars["duration"] = len(self.data) / self.sample_rate

            # Detect if digital: few distinct levels after rounding.
            # NOTE: the rounding is to two absolute decimals, so this
            # heuristic depends on the amplitude units of the data.
            unique_values = len(np.unique(np.round(self.data, decimals=2)))
            chars["likely_digital"] = unique_values < 10

            # Estimate dominant frequency
            chars["dominant_freq"] = self._estimate_dominant_frequency()

            # Estimate noise floor via the median absolute deviation; the
            # 1.4826 factor scales MAD to a Gaussian standard deviation.
            median = np.median(self.data)
            mad = np.median(np.abs(self.data - median)) * 1.4826
            chars["noise_floor"] = float(mad)

            # SNR estimate (total variance over MAD-based noise power)
            signal_power = np.var(self.data)
            noise_power = mad**2
            if noise_power > 0:
                chars["snr_db"] = float(10 * np.log10(signal_power / noise_power))
            else:
                chars["snr_db"] = 40.0

        except Exception as e:
            logger.debug("Error analyzing signal: %s", e)

        return chars

    def _estimate_dominant_frequency(self) -> float | None:
        """Estimate dominant frequency using FFT.

        The mean is removed, the DC bin is skipped, and the bin with the
        largest magnitude is reported. A spectrum whose peak is non-finite or
        no larger than floating-point rounding noise (e.g. a constant signal)
        has no detectable dominant frequency.

        Returns:
            Dominant frequency in Hz, or None if not detectable.
        """
        try:
            data_ac = self.data - np.mean(self.data)
            freqs = np.fft.rfftfreq(len(data_ac), d=1.0 / self.sample_rate)[1:]
            magnitude = np.abs(np.fft.rfft(data_ac))[1:]  # Skip DC

            if magnitude.size == 0:
                return None

            peak_idx = int(np.argmax(magnitude))
            peak = float(magnitude[peak_idx])

            # argmax of an all-zero (or NaN) spectrum is index 0, which would
            # otherwise be reported as a real frequency. Reject peaks at or
            # below the rounding floor of the input.
            rounding_floor = np.finfo(float).eps * len(data_ac) * float(np.max(np.abs(self.data)))
            if not np.isfinite(peak) or peak <= rounding_floor:
                return None

            return float(freqs[peak_idx])
        except Exception as e:
            logger.debug("Could not estimate dominant frequency: %s", e)
        return None

    def _finite_characteristic(self, key: str, default: float) -> float:
        """Return a numeric characteristic, or ``default`` if unusable.

        Args:
            key: Name of the characteristic to read.
            default: Value used when the characteristic is missing, not a
                number, or not finite (NaN/inf).

        Returns:
            The characteristic as a float, or ``default``.
        """
        value = self._characteristics.get(key)
        if isinstance(value, (int, float)) and np.isfinite(value):
            return float(value)
        return default

    def get_spectral_params(self) -> TunedParameters:
        """Get tuned parameters for spectral analysis.

        Selects FFT size, window function, and overlap based on signal
        characteristics and quality requirements.

        Returns:
            TunedParameters with spectral analysis configuration.

        Example:
            >>> params = tuner.get_spectral_params()
            >>> print(f"NFFT: {params.get('nfft')}")
            >>> print(f"Reasoning: {params.reasoning['nfft']}")
        """
        params: dict[str, Any] = {}
        reasoning: dict[str, str] = {}
        confidence = 0.8

        n_samples = self._characteristics.get("n_samples", 1000)

        # NFFT - power of 2 near a quarter of the record, clamped to
        # [256, 8192]. max(..., 1) keeps log2 away from zero.
        exponent = int(np.ceil(np.log2(max(n_samples, 1) / 4)))
        ideal_nfft = int(min(8192, max(256, 2**exponent)))
        params["nfft"] = ideal_nfft
        reasoning["nfft"] = f"Power of 2 for efficiency, ~{n_samples / ideal_nfft:.0f} averages"

        # Window selection based on signal characteristics
        snr = self._characteristics.get("snr_db", 20)
        if snr < 15:
            params["window"] = "blackman"
            reasoning["window"] = "Low SNR - using Blackman for better noise rejection"
        elif snr < 25:
            params["window"] = "hann"
            reasoning["window"] = "Moderate SNR - using Hann for balance"
        else:
            params["window"] = "hamming"
            reasoning["window"] = "Good SNR - using Hamming for resolution"

        # Overlap
        params["overlap"] = 0.5
        reasoning["overlap"] = "Standard 50% overlap for smooth averaging"

        # Frequency range based on dominant frequency, capped at Nyquist
        dom_freq = self._characteristics.get("dominant_freq")
        if dom_freq is not None and dom_freq > 0:
            params["freq_min"] = max(0, dom_freq / 10)
            params["freq_max"] = min(self.sample_rate / 2, dom_freq * 5)
            reasoning["freq_range"] = f"Based on dominant frequency {dom_freq:.1f} Hz"

        return TunedParameters(parameters=params, confidence=confidence, reasoning=reasoning)

    def get_digital_params(self) -> TunedParameters:
        """Get tuned parameters for digital signal analysis.

        Determines threshold levels, edge detection sensitivity, and
        baud rate hints based on signal characteristics.

        Returns:
            TunedParameters with digital analysis configuration.

        Example:
            >>> params = tuner.get_digital_params()
            >>> print(f"Threshold: {params.get('threshold')}")
            >>> print(f"Baud rate hint: {params.get('baud_rate_hint')}")
        """
        params: dict[str, Any] = {}
        reasoning: dict[str, str] = {}

        chars = self._characteristics

        # Threshold based on signal levels
        if chars.get("likely_digital"):
            mid = (chars["min"] + chars["max"]) / 2
            params["threshold"] = mid
            # Hysteresis levels at 30% / 70% of the observed range
            params["threshold_low"] = chars["min"] + 0.3 * chars["range"]
            params["threshold_high"] = chars["max"] - 0.3 * chars["range"]
            reasoning["threshold"] = (
                f"Midpoint of signal range ({chars['min']:.2f} to {chars['max']:.2f})"
            )
            confidence = 0.85
        else:
            params["threshold"] = chars.get("mean", 0)
            reasoning["threshold"] = "Using mean (signal may not be digital)"
            confidence = 0.5

        # Edge detection sensitivity based on noise. A NaN/inf noise floor
        # (data containing NaN/inf) would make int() raise, so fall back to
        # the default in that case.
        noise = self._finite_characteristic("noise_floor", 0.1)
        params["min_edge_separation"] = max(2, int(noise * 10))
        reasoning["min_edge_separation"] = f"Based on noise floor {noise:.3f}"

        # Baud rate hint: nearest common rate to twice the dominant frequency
        dom_freq = chars.get("dominant_freq")
        if dom_freq is not None and dom_freq > 0:
            common_bauds = [300, 1200, 2400, 4800, 9600, 19200, 38400, 57600, 115200]
            closest_baud = min(common_bauds, key=lambda b: abs(b - dom_freq * 2))
            params["baud_rate_hint"] = closest_baud
            reasoning["baud_rate"] = f"Estimated from frequency {dom_freq:.0f} Hz"

        return TunedParameters(parameters=params, confidence=confidence, reasoning=reasoning)

    def get_timing_params(self) -> TunedParameters:
        """Get tuned parameters for timing analysis.

        Configures time resolution, expected period, and edge timing
        thresholds based on sample rate and signal characteristics.

        Returns:
            TunedParameters with timing analysis configuration.

        Example:
            >>> params = tuner.get_timing_params()
            >>> print(f"Expected period: {params.get('expected_period')}")
            >>> print(f"Tolerance: {params.get('period_tolerance')}")
        """
        params: dict[str, Any] = {}
        reasoning: dict[str, str] = {}
        confidence = 0.75

        chars = self._characteristics

        # Time resolution based on sample rate
        params["time_resolution"] = 1.0 / self.sample_rate
        reasoning["time_resolution"] = f"Based on sample rate {self.sample_rate:.0f} Hz"

        # Expected period from dominant frequency
        dom_freq = chars.get("dominant_freq")
        if dom_freq is not None and dom_freq > 0:
            params["expected_period"] = 1.0 / dom_freq
            params["period_tolerance"] = 0.2 / dom_freq  # 20% tolerance
            reasoning["period"] = f"From dominant frequency {dom_freq:.1f} Hz"
            confidence = 0.85

        # Edge timing thresholds
        noise = self._finite_characteristic("noise_floor", 0.1)
        params["edge_threshold"] = noise * 3  # 3-sigma
        reasoning["edge_threshold"] = f"3x noise floor ({noise:.3f})"

        return TunedParameters(parameters=params, confidence=confidence, reasoning=reasoning)

    def get_jitter_params(self) -> TunedParameters:
        """Get tuned parameters for jitter analysis.

        Determines unit interval, histogram binning, and tolerance
        parameters for jitter measurements.

        Returns:
            TunedParameters with jitter analysis configuration.

        Example:
            >>> params = tuner.get_jitter_params()
            >>> print(f"Unit interval: {params.get('unit_interval')}")
            >>> print(f"Histogram bins: {params.get('histogram_bins')}")
        """
        params: dict[str, Any] = {}
        reasoning: dict[str, str] = {}
        confidence = 0.7

        chars = self._characteristics

        # Unit interval from dominant frequency
        dom_freq = chars.get("dominant_freq")
        if dom_freq is not None and dom_freq > 0:
            ui = 1.0 / dom_freq
            params["unit_interval"] = ui
            params["ui_tolerance"] = ui * 0.1
            reasoning["unit_interval"] = f"From dominant frequency {dom_freq:.1f} Hz"
            confidence = 0.85

        # Histogram bins: finer binning for cleaner signals
        snr = chars.get("snr_db", 20)
        if snr > 30:
            params["histogram_bins"] = 256
        elif snr > 20:
            params["histogram_bins"] = 128
        else:
            params["histogram_bins"] = 64
        reasoning["histogram_bins"] = f"Based on SNR {snr:.0f} dB"

        return TunedParameters(parameters=params, confidence=confidence, reasoning=reasoning)

    def get_pattern_params(self) -> TunedParameters:
        """Get tuned parameters for pattern analysis.

        Configures minimum pattern length and maximum distance for
        fuzzy matching based on signal characteristics.

        Returns:
            TunedParameters with pattern analysis configuration.

        Example:
            >>> params = tuner.get_pattern_params()
            >>> print(f"Min pattern length: {params.get('min_length')}")
            >>> print(f"Max fuzzy distance: {params.get('max_distance')}")
        """
        params: dict[str, Any] = {}
        reasoning: dict[str, str] = {}
        confidence = 0.7

        chars = self._characteristics
        n_samples = chars.get("n_samples", 1000)

        # Min pattern length based on signal characteristics
        dom_freq = chars.get("dominant_freq")
        if dom_freq is not None and dom_freq > 0:
            samples_per_period = self.sample_rate / dom_freq
            params["min_length"] = max(3, int(samples_per_period / 4))
            reasoning["min_length"] = (
                f"Quarter of estimated period ({samples_per_period:.0f} samples)"
            )
        else:
            params["min_length"] = max(3, n_samples // 100)
            reasoning["min_length"] = "1% of signal length"

        # Max distance for fuzzy matching based on noise relative to range.
        # Non-finite inputs fall back to defaults so int() cannot raise; the
        # 0.001 floor avoids dividing by a zero range.
        noise = self._finite_characteristic("noise_floor", 0.1)
        span = self._finite_characteristic("range", 1.0)
        noise_ratio = noise / max(span, 0.001)
        params["max_distance"] = max(1, int(noise_ratio * 10))
        reasoning["max_distance"] = f"Based on noise ratio {noise_ratio:.2%}"

        return TunedParameters(parameters=params, confidence=confidence, reasoning=reasoning)

    def get_params_for_domain(self, domain: str) -> TunedParameters:
        """Get tuned parameters for a specific analysis domain.

        Matching is case-insensitive and by substring, checked in the order
        spectral/fft, digital, timing, jitter, pattern.

        Args:
            domain: Analysis domain name (spectral, digital, timing, jitter, pattern).

        Returns:
            TunedParameters for the specified domain. Unknown domains get an
            empty parameter set with confidence 0.5.

        Example:
            >>> params = tuner.get_params_for_domain("spectral")
            >>> print(params.parameters)
            {'nfft': 8192, 'window': 'hann', 'overlap': 0.5}
        """
        domain_lower = domain.lower()

        # Ordered so that the first matching keyword wins.
        routes = (
            (("spectral", "fft"), self.get_spectral_params),
            (("digital",), self.get_digital_params),
            (("timing",), self.get_timing_params),
            (("jitter",), self.get_jitter_params),
            (("pattern",), self.get_pattern_params),
        )
        for keywords, getter in routes:
            if any(keyword in domain_lower for keyword in keywords):
                return getter()

        # Return basic params for unknown domains
        return TunedParameters(
            parameters={},
            confidence=0.5,
            reasoning={"note": "No domain-specific tuning available"},
        )
def get_adaptive_parameters(
    data: NDArray[np.floating[Any]],
    sample_rate: float,
    domain: str,
    signal_type: str | None = None,
) -> TunedParameters:
    """One-call helper that tunes parameters for a single domain.

    Builds an AdaptiveParameterTuner from the given signal and immediately
    asks it for the parameters of ``domain``.

    Args:
        data: Input signal data.
        sample_rate: Sample rate in Hz.
        domain: Analysis domain (spectral, digital, timing, jitter, pattern).
        signal_type: Optional signal type hint.

    Returns:
        TunedParameters for the specified domain.

    Example:
        >>> params = get_adaptive_parameters(signal, 1e6, "spectral")
        >>> print(f"Window: {params.get('window')}")
        >>> print(f"Confidence: {params.confidence}")
    """
    return AdaptiveParameterTuner(
        data,
        sample_rate=sample_rate,
        signal_type=signal_type,
    ).get_params_for_domain(domain)
# Names exported by ``from <module> import *``.
__all__ = ["AdaptiveParameterTuner", "TunedParameters", "get_adaptive_parameters"]