Coverage for src / tracekit / quality / warnings.py: 99%

105 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Signal quality warnings for TraceKit. 

2 

3This module provides automated detection and warning of signal quality issues 

4including clipping, noise, saturation, and undersampling. 

5 

6 

7Example: 

8 >>> from tracekit.quality.warnings import SignalQualityAnalyzer 

9 >>> analyzer = SignalQualityAnalyzer() 

10 >>> warnings = analyzer.analyze(trace) 

11 >>> for warning in warnings: 

12 ... print(warning) 

13 

14References: 

15 - IEEE 1057: Standard for Digitizing Waveform Recorders 

16 - Nyquist sampling theorem 

17""" 

18 

19from __future__ import annotations 

20 

21from dataclasses import dataclass 

22from typing import TYPE_CHECKING, Literal 

23 

24import numpy as np 

25 

26if TYPE_CHECKING: 

27 from numpy.typing import NDArray 

28 

29 from tracekit.core.types import WaveformTrace 

30 

31 

32@dataclass 

33class QualityWarning: 

34 """Signal quality warning. 

35 

36 Attributes: 

37 severity: Warning severity (error, warning, info) 

38 category: Warning category (clipping, noise, saturation, undersampling) 

39 message: Human-readable warning message 

40 value: Numeric value associated with warning 

41 threshold: Threshold that triggered warning 

42 suggestion: Suggested action to fix issue 

43 

44 Example: 

45 >>> warning = QualityWarning( 

46 ... severity="warning", 

47 ... category="clipping", 

48 ... message="Signal clipping detected", 

49 ... value=5.2, 

50 ... threshold=5.0, 

51 ... suggestion="Reduce input amplitude or increase ADC range" 

52 ... ) 

53 >>> print(warning) 

54 

55 References: 

56 EDGE-001: Signal Quality Warnings 

57 """ 

58 

59 severity: Literal["error", "warning", "info"] 

60 category: Literal["clipping", "noise", "saturation", "undersampling", "dc_offset"] 

61 message: str 

62 value: float 

63 threshold: float 

64 suggestion: str = "" 

65 

66 def __str__(self) -> str: 

67 """Format warning as string. 

68 

69 Returns: 

70 Formatted warning message 

71 """ 

72 prefix = {"error": "ERROR", "warning": "WARNING", "info": "INFO"}[self.severity] 

73 msg = f"[{prefix}] {self.message}" 

74 if self.value is not None: 74 ↛ 76line 74 didn't jump to line 76 because the condition on line 74 was always true

75 msg += f" (value: {self.value:.3f}, threshold: {self.threshold:.3f})" 

76 if self.suggestion: 

77 msg += f"\n Suggestion: {self.suggestion}" 

78 return msg 

79 

80 

81class SignalQualityAnalyzer: 

82 """Analyzer for signal quality issues. 

83 

84 : Detect clipping, undersampling, noise, and saturation. 

85 Performs comprehensive signal quality checks and generates warnings. 

86 

87 Args: 

88 clip_threshold: Clipping detection threshold (fraction of range, default: 0.99) 

89 noise_threshold_db: Noise floor threshold in dB (default: -40) 

90 saturation_threshold: ADC saturation threshold (default: 0.98) 

91 nyquist_factor: Factor for Nyquist frequency check (default: 2.0) 

92 

93 Example: 

94 >>> from tracekit.quality.warnings import SignalQualityAnalyzer 

95 >>> analyzer = SignalQualityAnalyzer(clip_threshold=0.95) 

96 >>> warnings = analyzer.analyze(trace) 

97 >>> if warnings: 

98 ... for w in warnings: 

99 ... print(w) 

100 

101 References: 

102 EDGE-001: Signal Quality Warnings 

103 """ 

104 

105 def __init__( 

106 self, 

107 *, 

108 clip_threshold: float = 0.99, 

109 noise_threshold_db: float = -40.0, 

110 saturation_threshold: float = 0.98, 

111 nyquist_factor: float = 2.0, 

112 ) -> None: 

113 """Initialize signal quality analyzer. 

114 

115 Args: 

116 clip_threshold: Clipping detection threshold 

117 noise_threshold_db: Noise floor threshold in dB 

118 saturation_threshold: ADC saturation threshold 

119 nyquist_factor: Nyquist frequency factor 

120 """ 

121 self.clip_threshold = clip_threshold 

122 self.noise_threshold_db = noise_threshold_db 

123 self.saturation_threshold = saturation_threshold 

124 self.nyquist_factor = nyquist_factor 

125 

126 def analyze( 

127 self, 

128 trace: WaveformTrace | NDArray[np.float64], 

129 *, 

130 sample_rate: float | None = None, 

131 adc_range: tuple[float, float] | None = None, 

132 ) -> list[QualityWarning]: 

133 """Analyze signal quality and generate warnings. 

134 

135 : Comprehensive signal quality detection. 

136 

137 Args: 

138 trace: Input trace or signal array 

139 sample_rate: Sample rate in Hz (required for undersampling check) 

140 adc_range: ADC range as (min, max) tuple 

141 

142 Returns: 

143 List of QualityWarning objects 

144 

145 Example: 

146 >>> warnings = analyzer.analyze(trace, sample_rate=1e9) 

147 >>> for warning in warnings: 

148 ... print(warning) 

149 

150 References: 

151 EDGE-001: Signal Quality Warnings 

152 """ 

153 # Extract data and sample rate 

154 if hasattr(trace, "data"): 

155 data = trace.data # type: ignore[ignore-without-code] 

156 if sample_rate is None and hasattr(trace, "metadata"): 

157 sample_rate = trace.metadata.sample_rate # type: ignore[ignore-without-code] 

158 else: 

159 data = trace # type: ignore[assignment] 

160 

161 # Ensure data is NDArray[np.float64] 

162 data_array: NDArray[np.float64] = np.asarray(data, dtype=np.float64) 

163 

164 warnings: list[QualityWarning] = [] 

165 

166 # Check clipping 

167 warnings.extend( 

168 check_clipping( 

169 data_array, 

170 threshold=self.clip_threshold, 

171 adc_range=adc_range, 

172 ) 

173 ) 

174 

175 # Check saturation 

176 warnings.extend( 

177 check_saturation( 

178 data_array, 

179 threshold=self.saturation_threshold, 

180 adc_range=adc_range, 

181 ) 

182 ) 

183 

184 # Check noise 

185 warnings.extend( 

186 check_noise( 

187 data_array, 

188 threshold_db=self.noise_threshold_db, 

189 ) 

190 ) 

191 

192 # Check undersampling (requires sample rate) 

193 if sample_rate is not None: 

194 warnings.extend( 

195 check_undersampling( 

196 data_array, 

197 sample_rate=sample_rate, 

198 nyquist_factor=self.nyquist_factor, 

199 ) 

200 ) 

201 

202 return warnings 

203 

204 

205def check_clipping( 

206 signal: NDArray[np.float64], 

207 *, 

208 threshold: float = 0.99, 

209 adc_range: tuple[float, float] | None = None, 

210) -> list[QualityWarning]: 

211 """Detect signal clipping. 

212 

213 : Detect clipping (signal hits rail). 

214 

215 Args: 

216 signal: Input signal array 

217 threshold: Fraction of range for clipping detection (default: 0.99) 

218 adc_range: ADC range as (min, max) tuple 

219 

220 Returns: 

221 List of clipping warnings 

222 

223 Example: 

224 >>> import numpy as np 

225 >>> signal = np.clip(np.random.randn(1000), -1, 1) 

226 >>> warnings = check_clipping(signal) 

227 >>> if warnings: 

228 ... print("Clipping detected!") 

229 

230 References: 

231 EDGE-001: Detect clipping 

232 """ 

233 warnings: list[QualityWarning] = [] 

234 

235 # Determine signal range 

236 if adc_range is not None: 

237 min_val, max_val = adc_range 

238 else: 

239 min_val = float(np.min(signal)) 

240 max_val = float(np.max(signal)) 

241 

242 signal_range = max_val - min_val 

243 if signal_range == 0: 

244 return warnings 

245 

246 # Count samples near limits 

247 upper_limit = min_val + signal_range * threshold 

248 lower_limit = min_val + signal_range * (1 - threshold) 

249 

250 n_upper: int = int(np.sum(signal >= upper_limit)) 

251 n_lower: int = int(np.sum(signal <= lower_limit)) 

252 n_clipped = n_upper + n_lower 

253 clip_percent = float(100.0 * n_clipped / len(signal)) 

254 

255 if clip_percent > 1.0: # More than 1% clipped 

256 severity: Literal["error", "warning"] = "error" if clip_percent > 5.0 else "warning" 

257 warnings.append( 

258 QualityWarning( 

259 severity=severity, 

260 category="clipping", 

261 message=f"Signal clipping detected at {clip_percent:.1f}% of samples", 

262 value=clip_percent, 

263 threshold=1.0, 

264 suggestion="Reduce input amplitude or increase ADC range", 

265 ) 

266 ) 

267 

268 return warnings 

269 

270 

271def check_saturation( 

272 signal: NDArray[np.float64], 

273 *, 

274 threshold: float = 0.98, 

275 adc_range: tuple[float, float] | None = None, 

276) -> list[QualityWarning]: 

277 """Detect ADC saturation. 

278 

279 : Detect saturation (ADC range utilization). 

280 

281 Args: 

282 signal: Input signal array 

283 threshold: Saturation threshold as fraction of range (default: 0.98) 

284 adc_range: ADC range as (min, max) tuple 

285 

286 Returns: 

287 List of saturation warnings 

288 

289 Example: 

290 >>> warnings = check_saturation(signal, threshold=0.95) 

291 

292 References: 

293 EDGE-001: Detect saturation 

294 """ 

295 warnings: list[QualityWarning] = [] 

296 

297 # Determine signal range 

298 if adc_range is not None: 

299 adc_min, adc_max = adc_range 

300 adc_span = adc_max - adc_min 

301 else: 

302 # Assume signal uses full observed range as ADC range 

303 adc_min = float(np.min(signal)) 

304 adc_max = float(np.max(signal)) 

305 adc_span = adc_max - adc_min 

306 

307 if adc_span == 0: 

308 return warnings 

309 

310 # Calculate range utilization 

311 signal_min = float(np.min(signal)) 

312 signal_max = float(np.max(signal)) 

313 signal_span = signal_max - signal_min 

314 utilization = signal_span / adc_span 

315 

316 if utilization > threshold: 

317 warnings.append( 

318 QualityWarning( 

319 severity="warning", 

320 category="saturation", 

321 message=f"High ADC range utilization: {utilization * 100:.1f}%", 

322 value=utilization * 100, 

323 threshold=threshold * 100, 

324 suggestion="Consider increasing ADC range or reducing signal amplitude", 

325 ) 

326 ) 

327 

328 return warnings 

329 

330 

331def check_noise( 

332 signal: NDArray[np.float64], 

333 *, 

334 threshold_db: float = -40.0, 

335) -> list[QualityWarning]: 

336 """Detect excessive noise. 

337 

338 : Detect noise (SNR below threshold warning). 

339 

340 Args: 

341 signal: Input signal array 

342 threshold_db: Noise threshold in dB (default: -40) 

343 

344 Returns: 

345 List of noise warnings 

346 

347 Example: 

348 >>> warnings = check_noise(signal, threshold_db=-50) 

349 

350 References: 

351 EDGE-001: Detect noise 

352 """ 

353 warnings: list[QualityWarning] = [] 

354 

355 # Estimate SNR 

356 signal_power = float(np.mean(signal**2)) 

357 if signal_power == 0: 

358 return warnings 

359 

360 # Estimate noise from high-frequency components 

361 # Simple approach: use standard deviation as noise estimate 

362 noise_power = float(np.var(signal)) 

363 if noise_power == 0: 

364 return warnings 

365 

366 snr_linear = signal_power / noise_power 

367 snr_db = 10 * np.log10(snr_linear) if snr_linear > 0 else -np.inf 

368 

369 if snr_db < threshold_db: 

370 warnings.append( 

371 QualityWarning( 

372 severity="warning", 

373 category="noise", 

374 message=f"High noise level detected: SNR = {snr_db:.1f} dB", 

375 value=snr_db, 

376 threshold=threshold_db, 

377 suggestion="Check signal source, grounding, and shielding", 

378 ) 

379 ) 

380 

381 return warnings 

382 

383 

384def check_undersampling( 

385 signal: NDArray[np.float64], 

386 *, 

387 sample_rate: float, 

388 nyquist_factor: float = 2.0, 

389) -> list[QualityWarning]: 

390 """Detect undersampling (Nyquist violation). 

391 

392 : Detect undersampling (Nyquist violation warning). 

393 

394 Args: 

395 signal: Input signal array 

396 sample_rate: Sample rate in Hz 

397 nyquist_factor: Required factor above Nyquist (default: 2.0) 

398 

399 Returns: 

400 List of undersampling warnings 

401 

402 Example: 

403 >>> warnings = check_undersampling(signal, sample_rate=1e9) 

404 

405 References: 

406 EDGE-001: Detect undersampling 

407 Nyquist-Shannon sampling theorem 

408 """ 

409 warnings: list[QualityWarning] = [] 

410 

411 # Estimate highest frequency component using FFT 

412 fft = np.fft.rfft(signal) 

413 freqs = np.fft.rfftfreq(len(signal), d=1 / sample_rate) 

414 power = np.abs(fft) ** 2 

415 

416 # Find frequency where power drops to 1% of peak 

417 peak_power: float = float(np.max(power)) 

418 threshold_power = peak_power * 0.01 

419 

420 # Find highest significant frequency 

421 significant_freqs = freqs[power > threshold_power] 

422 if len(significant_freqs) > 0: 

423 max_freq = float(np.max(significant_freqs)) 

424 nyquist_freq = sample_rate / 2.0 

425 required_nyquist = max_freq * nyquist_factor 

426 

427 if required_nyquist > nyquist_freq: 

428 warnings.append( 

429 QualityWarning( 

430 severity="error", 

431 category="undersampling", 

432 message=( 

433 f"Undersampling detected: signal contains " 

434 f"{max_freq / 1e6:.1f} MHz, but Nyquist frequency is " 

435 f"{nyquist_freq / 1e6:.1f} MHz" 

436 ), 

437 value=max_freq, 

438 threshold=nyquist_freq / nyquist_factor, 

439 suggestion=( 

440 f"Increase sample rate to at least " 

441 f"{required_nyquist * 2 / 1e6:.1f} MS/s or apply anti-aliasing filter" 

442 ), 

443 ) 

444 ) 

445 

446 return warnings 

447 

448 

449__all__ = [ 

450 "QualityWarning", 

451 "SignalQualityAnalyzer", 

452 "check_clipping", 

453 "check_noise", 

454 "check_saturation", 

455 "check_undersampling", 

456]