Coverage for src / tracekit / quality / warnings.py: 99%
105 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Signal quality warnings for TraceKit.
3This module provides automated detection and warning of signal quality issues
4including clipping, noise, saturation, and undersampling.
7Example:
8 >>> from tracekit.quality.warnings import SignalQualityAnalyzer
9 >>> analyzer = SignalQualityAnalyzer()
10 >>> warnings = analyzer.analyze(trace)
11 >>> for warning in warnings:
12 ... print(warning)
14References:
15 - IEEE 1057: Standard for Digitizing Waveform Recorders
16 - Nyquist sampling theorem
17"""
19from __future__ import annotations
21from dataclasses import dataclass
22from typing import TYPE_CHECKING, Literal
24import numpy as np
26if TYPE_CHECKING:
27 from numpy.typing import NDArray
29 from tracekit.core.types import WaveformTrace
32@dataclass
33class QualityWarning:
34 """Signal quality warning.
36 Attributes:
37 severity: Warning severity (error, warning, info)
38 category: Warning category (clipping, noise, saturation, undersampling)
39 message: Human-readable warning message
40 value: Numeric value associated with warning
41 threshold: Threshold that triggered warning
42 suggestion: Suggested action to fix issue
44 Example:
45 >>> warning = QualityWarning(
46 ... severity="warning",
47 ... category="clipping",
48 ... message="Signal clipping detected",
49 ... value=5.2,
50 ... threshold=5.0,
51 ... suggestion="Reduce input amplitude or increase ADC range"
52 ... )
53 >>> print(warning)
55 References:
56 EDGE-001: Signal Quality Warnings
57 """
59 severity: Literal["error", "warning", "info"]
60 category: Literal["clipping", "noise", "saturation", "undersampling", "dc_offset"]
61 message: str
62 value: float
63 threshold: float
64 suggestion: str = ""
66 def __str__(self) -> str:
67 """Format warning as string.
69 Returns:
70 Formatted warning message
71 """
72 prefix = {"error": "ERROR", "warning": "WARNING", "info": "INFO"}[self.severity]
73 msg = f"[{prefix}] {self.message}"
74 if self.value is not None: 74 ↛ 76line 74 didn't jump to line 76 because the condition on line 74 was always true
75 msg += f" (value: {self.value:.3f}, threshold: {self.threshold:.3f})"
76 if self.suggestion:
77 msg += f"\n Suggestion: {self.suggestion}"
78 return msg
81class SignalQualityAnalyzer:
82 """Analyzer for signal quality issues.
84 : Detect clipping, undersampling, noise, and saturation.
85 Performs comprehensive signal quality checks and generates warnings.
87 Args:
88 clip_threshold: Clipping detection threshold (fraction of range, default: 0.99)
89 noise_threshold_db: Noise floor threshold in dB (default: -40)
90 saturation_threshold: ADC saturation threshold (default: 0.98)
91 nyquist_factor: Factor for Nyquist frequency check (default: 2.0)
93 Example:
94 >>> from tracekit.quality.warnings import SignalQualityAnalyzer
95 >>> analyzer = SignalQualityAnalyzer(clip_threshold=0.95)
96 >>> warnings = analyzer.analyze(trace)
97 >>> if warnings:
98 ... for w in warnings:
99 ... print(w)
101 References:
102 EDGE-001: Signal Quality Warnings
103 """
105 def __init__(
106 self,
107 *,
108 clip_threshold: float = 0.99,
109 noise_threshold_db: float = -40.0,
110 saturation_threshold: float = 0.98,
111 nyquist_factor: float = 2.0,
112 ) -> None:
113 """Initialize signal quality analyzer.
115 Args:
116 clip_threshold: Clipping detection threshold
117 noise_threshold_db: Noise floor threshold in dB
118 saturation_threshold: ADC saturation threshold
119 nyquist_factor: Nyquist frequency factor
120 """
121 self.clip_threshold = clip_threshold
122 self.noise_threshold_db = noise_threshold_db
123 self.saturation_threshold = saturation_threshold
124 self.nyquist_factor = nyquist_factor
126 def analyze(
127 self,
128 trace: WaveformTrace | NDArray[np.float64],
129 *,
130 sample_rate: float | None = None,
131 adc_range: tuple[float, float] | None = None,
132 ) -> list[QualityWarning]:
133 """Analyze signal quality and generate warnings.
135 : Comprehensive signal quality detection.
137 Args:
138 trace: Input trace or signal array
139 sample_rate: Sample rate in Hz (required for undersampling check)
140 adc_range: ADC range as (min, max) tuple
142 Returns:
143 List of QualityWarning objects
145 Example:
146 >>> warnings = analyzer.analyze(trace, sample_rate=1e9)
147 >>> for warning in warnings:
148 ... print(warning)
150 References:
151 EDGE-001: Signal Quality Warnings
152 """
153 # Extract data and sample rate
154 if hasattr(trace, "data"):
155 data = trace.data # type: ignore[ignore-without-code]
156 if sample_rate is None and hasattr(trace, "metadata"):
157 sample_rate = trace.metadata.sample_rate # type: ignore[ignore-without-code]
158 else:
159 data = trace # type: ignore[assignment]
161 # Ensure data is NDArray[np.float64]
162 data_array: NDArray[np.float64] = np.asarray(data, dtype=np.float64)
164 warnings: list[QualityWarning] = []
166 # Check clipping
167 warnings.extend(
168 check_clipping(
169 data_array,
170 threshold=self.clip_threshold,
171 adc_range=adc_range,
172 )
173 )
175 # Check saturation
176 warnings.extend(
177 check_saturation(
178 data_array,
179 threshold=self.saturation_threshold,
180 adc_range=adc_range,
181 )
182 )
184 # Check noise
185 warnings.extend(
186 check_noise(
187 data_array,
188 threshold_db=self.noise_threshold_db,
189 )
190 )
192 # Check undersampling (requires sample rate)
193 if sample_rate is not None:
194 warnings.extend(
195 check_undersampling(
196 data_array,
197 sample_rate=sample_rate,
198 nyquist_factor=self.nyquist_factor,
199 )
200 )
202 return warnings
205def check_clipping(
206 signal: NDArray[np.float64],
207 *,
208 threshold: float = 0.99,
209 adc_range: tuple[float, float] | None = None,
210) -> list[QualityWarning]:
211 """Detect signal clipping.
213 : Detect clipping (signal hits rail).
215 Args:
216 signal: Input signal array
217 threshold: Fraction of range for clipping detection (default: 0.99)
218 adc_range: ADC range as (min, max) tuple
220 Returns:
221 List of clipping warnings
223 Example:
224 >>> import numpy as np
225 >>> signal = np.clip(np.random.randn(1000), -1, 1)
226 >>> warnings = check_clipping(signal)
227 >>> if warnings:
228 ... print("Clipping detected!")
230 References:
231 EDGE-001: Detect clipping
232 """
233 warnings: list[QualityWarning] = []
235 # Determine signal range
236 if adc_range is not None:
237 min_val, max_val = adc_range
238 else:
239 min_val = float(np.min(signal))
240 max_val = float(np.max(signal))
242 signal_range = max_val - min_val
243 if signal_range == 0:
244 return warnings
246 # Count samples near limits
247 upper_limit = min_val + signal_range * threshold
248 lower_limit = min_val + signal_range * (1 - threshold)
250 n_upper: int = int(np.sum(signal >= upper_limit))
251 n_lower: int = int(np.sum(signal <= lower_limit))
252 n_clipped = n_upper + n_lower
253 clip_percent = float(100.0 * n_clipped / len(signal))
255 if clip_percent > 1.0: # More than 1% clipped
256 severity: Literal["error", "warning"] = "error" if clip_percent > 5.0 else "warning"
257 warnings.append(
258 QualityWarning(
259 severity=severity,
260 category="clipping",
261 message=f"Signal clipping detected at {clip_percent:.1f}% of samples",
262 value=clip_percent,
263 threshold=1.0,
264 suggestion="Reduce input amplitude or increase ADC range",
265 )
266 )
268 return warnings
271def check_saturation(
272 signal: NDArray[np.float64],
273 *,
274 threshold: float = 0.98,
275 adc_range: tuple[float, float] | None = None,
276) -> list[QualityWarning]:
277 """Detect ADC saturation.
279 : Detect saturation (ADC range utilization).
281 Args:
282 signal: Input signal array
283 threshold: Saturation threshold as fraction of range (default: 0.98)
284 adc_range: ADC range as (min, max) tuple
286 Returns:
287 List of saturation warnings
289 Example:
290 >>> warnings = check_saturation(signal, threshold=0.95)
292 References:
293 EDGE-001: Detect saturation
294 """
295 warnings: list[QualityWarning] = []
297 # Determine signal range
298 if adc_range is not None:
299 adc_min, adc_max = adc_range
300 adc_span = adc_max - adc_min
301 else:
302 # Assume signal uses full observed range as ADC range
303 adc_min = float(np.min(signal))
304 adc_max = float(np.max(signal))
305 adc_span = adc_max - adc_min
307 if adc_span == 0:
308 return warnings
310 # Calculate range utilization
311 signal_min = float(np.min(signal))
312 signal_max = float(np.max(signal))
313 signal_span = signal_max - signal_min
314 utilization = signal_span / adc_span
316 if utilization > threshold:
317 warnings.append(
318 QualityWarning(
319 severity="warning",
320 category="saturation",
321 message=f"High ADC range utilization: {utilization * 100:.1f}%",
322 value=utilization * 100,
323 threshold=threshold * 100,
324 suggestion="Consider increasing ADC range or reducing signal amplitude",
325 )
326 )
328 return warnings
331def check_noise(
332 signal: NDArray[np.float64],
333 *,
334 threshold_db: float = -40.0,
335) -> list[QualityWarning]:
336 """Detect excessive noise.
338 : Detect noise (SNR below threshold warning).
340 Args:
341 signal: Input signal array
342 threshold_db: Noise threshold in dB (default: -40)
344 Returns:
345 List of noise warnings
347 Example:
348 >>> warnings = check_noise(signal, threshold_db=-50)
350 References:
351 EDGE-001: Detect noise
352 """
353 warnings: list[QualityWarning] = []
355 # Estimate SNR
356 signal_power = float(np.mean(signal**2))
357 if signal_power == 0:
358 return warnings
360 # Estimate noise from high-frequency components
361 # Simple approach: use standard deviation as noise estimate
362 noise_power = float(np.var(signal))
363 if noise_power == 0:
364 return warnings
366 snr_linear = signal_power / noise_power
367 snr_db = 10 * np.log10(snr_linear) if snr_linear > 0 else -np.inf
369 if snr_db < threshold_db:
370 warnings.append(
371 QualityWarning(
372 severity="warning",
373 category="noise",
374 message=f"High noise level detected: SNR = {snr_db:.1f} dB",
375 value=snr_db,
376 threshold=threshold_db,
377 suggestion="Check signal source, grounding, and shielding",
378 )
379 )
381 return warnings
384def check_undersampling(
385 signal: NDArray[np.float64],
386 *,
387 sample_rate: float,
388 nyquist_factor: float = 2.0,
389) -> list[QualityWarning]:
390 """Detect undersampling (Nyquist violation).
392 : Detect undersampling (Nyquist violation warning).
394 Args:
395 signal: Input signal array
396 sample_rate: Sample rate in Hz
397 nyquist_factor: Required factor above Nyquist (default: 2.0)
399 Returns:
400 List of undersampling warnings
402 Example:
403 >>> warnings = check_undersampling(signal, sample_rate=1e9)
405 References:
406 EDGE-001: Detect undersampling
407 Nyquist-Shannon sampling theorem
408 """
409 warnings: list[QualityWarning] = []
411 # Estimate highest frequency component using FFT
412 fft = np.fft.rfft(signal)
413 freqs = np.fft.rfftfreq(len(signal), d=1 / sample_rate)
414 power = np.abs(fft) ** 2
416 # Find frequency where power drops to 1% of peak
417 peak_power: float = float(np.max(power))
418 threshold_power = peak_power * 0.01
420 # Find highest significant frequency
421 significant_freqs = freqs[power > threshold_power]
422 if len(significant_freqs) > 0:
423 max_freq = float(np.max(significant_freqs))
424 nyquist_freq = sample_rate / 2.0
425 required_nyquist = max_freq * nyquist_factor
427 if required_nyquist > nyquist_freq:
428 warnings.append(
429 QualityWarning(
430 severity="error",
431 category="undersampling",
432 message=(
433 f"Undersampling detected: signal contains "
434 f"{max_freq / 1e6:.1f} MHz, but Nyquist frequency is "
435 f"{nyquist_freq / 1e6:.1f} MHz"
436 ),
437 value=max_freq,
438 threshold=nyquist_freq / nyquist_factor,
439 suggestion=(
440 f"Increase sample rate to at least "
441 f"{required_nyquist * 2 / 1e6:.1f} MS/s or apply anti-aliasing filter"
442 ),
443 )
444 )
446 return warnings
449__all__ = [
450 "QualityWarning",
451 "SignalQualityAnalyzer",
452 "check_clipping",
453 "check_noise",
454 "check_saturation",
455 "check_undersampling",
456]