Coverage for src / tracekit / core / edge_cases.py: 97%

129 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Edge case handling utilities for TraceKit. 

2 

3This module provides utilities for gracefully handling edge cases including 

4empty inputs, single-sample traces, and NaN/Inf values. 

5 

6 

7Example: 

8 >>> from tracekit.core.edge_cases import ( 

9 ... validate_signal, 

10 ... handle_empty_trace, 

11 ... sanitize_signal 

12 ... ) 

13 >>> validated = validate_signal(signal, min_samples=10) 

14 >>> clean_signal = sanitize_signal(noisy_signal) 

15 

16References: 

17 - IEEE 754 floating-point standard 

18 - NumPy NaN handling best practices 

19""" 

20 

21from __future__ import annotations 

22 

23import warnings 

24from typing import TYPE_CHECKING 

25 

26import numpy as np 

27 

28if TYPE_CHECKING: 

29 from numpy.typing import NDArray 

30 

31 

class EmptyTraceError(Exception):
    """Error type for operations that were handed a trace with no samples.

    A dedicated exception lets callers report an informative message for
    empty input and catch that condition separately from other failures.

    Example:
        >>> from tracekit.core.edge_cases import EmptyTraceError
        >>> str(EmptyTraceError())
        'Trace is empty (0 samples)'

    References:
        EDGE-002: Graceful Empty/Short Signal Handling
    """

    def __init__(self, message: str = "Trace is empty (0 samples)") -> None:
        """Build the exception, falling back to a generic description.

        Args:
            message: Text carried by the exception. When omitted the
                message is "Trace is empty (0 samples)".
        """
        super().__init__(message)

52 

53 

class InsufficientSamplesError(Exception):
    """Exception raised when a trace has fewer samples than required.

    The exception text is the caller's message followed by the required and
    available sample counts, so the shortfall is visible in logs.

    Attributes:
        message: Caller-supplied message, without the count suffix
        required: Minimum samples required
        available: Actual samples available

    Example:
        >>> from tracekit.core.edge_cases import InsufficientSamplesError
        >>> str(InsufficientSamplesError("Need at least 100 samples", 100, 10))
        'Need at least 100 samples (required: 100, available: 10)'

    References:
        EDGE-002: Graceful Empty/Short Signal Handling
    """

    def __init__(self, message: str, required: int, available: int) -> None:
        """Initialize InsufficientSamplesError.

        Args:
            message: Error message (the counts are appended automatically)
            required: Minimum samples required
            available: Actual samples available
        """
        # Keep the raw message so the exception can be rebuilt from its
        # original arguments (see __reduce__) without doubling the suffix.
        self.message = message
        self.required = required
        self.available = available
        full_message = f"{message} (required: {required}, available: {available})"
        super().__init__(full_message)

    def __reduce__(self):
        """Support ``copy`` and ``pickle`` despite the three-argument constructor.

        The default exception reduction re-creates the object from
        ``self.args``, which holds only the formatted message; calling the
        constructor with that single argument raises ``TypeError``.

        Returns:
            Tuple of (class, constructor arguments, instance state).
        """
        return (
            type(self),
            (self.message, self.required, self.available),
            dict(vars(self)),
        )

83 

84 

def validate_signal(
    signal: NDArray[np.float64],
    *,
    min_samples: int = 1,
    allow_empty: bool = False,
    name: str = "signal",
) -> NDArray[np.float64]:
    """Confirm that a signal is a 1-D NumPy array with enough samples.

    The checks run in a fixed order: type, dimensionality, emptiness, then
    the minimum-length requirement. The input is returned unchanged (same
    object) when every check passes.

    Args:
        signal: Input signal array
        min_samples: Smallest acceptable number of samples (default: 1)
        allow_empty: When True, a zero-length array is returned as-is and
            the ``min_samples`` check is skipped for it (default: False)
        name: Label used for the signal in error messages (default: "signal")

    Returns:
        The same array that was passed in

    Raises:
        EmptyTraceError: If signal is empty and allow_empty=False
        InsufficientSamplesError: If signal has fewer than min_samples
        ValueError: If signal is not a NumPy array or is not 1-dimensional

    Example:
        >>> import numpy as np
        >>> from tracekit.core.edge_cases import validate_signal
        >>> signal = np.array([1.0, 2.0, 3.0])
        >>> validate_signal(signal, min_samples=2) is signal
        True

    References:
        EDGE-002: Graceful Empty/Short Signal Handling
    """
    # Type first: the checks below rely on ndarray attributes.
    if not isinstance(signal, np.ndarray):
        raise ValueError(f"{name} must be a numpy array, got {type(signal)}")

    if signal.ndim != 1:
        raise ValueError(f"{name} must be 1-dimensional, got {signal.ndim}D")

    sample_count = len(signal)

    # Empty input is handled separately so it gets its own exception type.
    if sample_count == 0:
        if not allow_empty:
            raise EmptyTraceError(f"{name} is empty (0 samples)")
        return signal

    if sample_count < min_samples:
        raise InsufficientSamplesError(
            f"{name} has too few samples",
            required=min_samples,
            available=sample_count,
        )

    return signal

147 

148 

def handle_empty_trace(default_value: float = np.nan) -> NDArray[np.float64]:
    """Return a safe default for empty trace operations.

    Provides a one-element placeholder result for operations that were
    given an empty trace.

    Args:
        default_value: Default value to return (default: NaN)

    Returns:
        Single-element float64 array holding the default value

    Example:
        >>> from tracekit.core.edge_cases import handle_empty_trace
        >>> result = handle_empty_trace(0.0)
        >>> print(result)
        [0.]

    References:
        EDGE-002: Graceful Empty/Short Signal Handling
    """
    # Pin the dtype: without it an integer default such as 0 would produce an
    # integer array, contradicting the declared float64 return type.
    return np.array([default_value], dtype=np.float64)

171 

172 

def check_single_sample(
    signal: NDArray[np.float64],
    operation: str = "operation",
) -> bool:
    """Report whether a signal holds exactly one sample, warning if so.

    When the signal has length 1, a ``UserWarning`` is issued noting that
    the named operation may not give meaningful results. Other lengths,
    including zero, produce no warning.

    Args:
        signal: Input signal
        operation: Name of the operation, inserted into the warning text

    Returns:
        True if the signal has exactly 1 sample, otherwise False

    Example:
        >>> import numpy as np
        >>> from tracekit.core.edge_cases import check_single_sample
        >>> signal = np.array([42.0])
        >>> if check_single_sample(signal, "FFT"):
        ...     print("Cannot compute FFT on single sample")

    References:
        EDGE-002: Graceful Empty/Short Signal Handling
    """
    is_single = len(signal) == 1
    if is_single:
        # stacklevel=2 attributes the warning to the caller of this function.
        warnings.warn(
            f"Signal has only 1 sample. {operation} may not produce meaningful results.",
            UserWarning,
            stacklevel=2,
        )
    return is_single

207 

208 

209def sanitize_signal( 

210 signal: NDArray[np.float64], 

211 *, 

212 replace_nan: float | str = "interpolate", 

213 replace_inf: float | str = "clip", 

214 warn: bool = True, 

215) -> NDArray[np.float64]: 

216 """Remove or replace NaN and Inf values in signal. 

217 

218 : Handle NaN and Inf values gracefully. 

219 Cleans signal data for robust analysis. 

220 

221 Args: 

222 signal: Input signal array 

223 replace_nan: How to handle NaN: 

224 - "interpolate": Linear interpolation (default) 

225 - "zero": Replace with 0 

226 - "remove": Remove samples (changes length) 

227 - float: Replace with specific value 

228 replace_inf: How to handle Inf: 

229 - "clip": Clip to min/max of finite values (default) 

230 - "zero": Replace with 0 

231 - "remove": Remove samples (changes length) 

232 - float: Replace with specific value 

233 warn: Issue warning if NaN/Inf found (default: True) 

234 

235 Returns: 

236 Sanitized signal array 

237 

238 Raises: 

239 ValueError: If replace_nan or replace_inf option is invalid. 

240 

241 Example: 

242 >>> import numpy as np 

243 >>> from tracekit.core.edge_cases import sanitize_signal 

244 >>> signal = np.array([1.0, np.nan, 3.0, np.inf, 5.0]) 

245 >>> clean = sanitize_signal(signal) 

246 >>> print(clean) 

247 [1. 2. 3. 5. 5.] 

248 

249 References: 

250 EDGE-003: NaN/Inf Handling 

251 """ 

252 signal = signal.copy() # Don't modify input 

253 n_nan = np.sum(np.isnan(signal)) 

254 n_inf = np.sum(np.isinf(signal)) 

255 

256 # Warn if issues found 

257 if warn and (n_nan > 0 or n_inf > 0): 

258 warnings.warn( 

259 f"Signal contains {n_nan} NaN and {n_inf} Inf values. Applying sanitization.", 

260 UserWarning, 

261 stacklevel=2, 

262 ) 

263 

264 # Handle NaN 

265 if n_nan > 0: 

266 if replace_nan == "interpolate": 

267 signal = _interpolate_nan(signal) 

268 elif replace_nan == "zero": 

269 signal[np.isnan(signal)] = 0.0 

270 elif replace_nan == "remove": 

271 signal = signal[~np.isnan(signal)] 

272 elif isinstance(replace_nan, int | float): 

273 signal[np.isnan(signal)] = float(replace_nan) 

274 else: 

275 raise ValueError(f"Invalid replace_nan option: {replace_nan}") 

276 

277 # Handle Inf 

278 if n_inf > 0: 

279 if replace_inf == "clip": 

280 finite_mask = np.isfinite(signal) 

281 if np.any(finite_mask): 

282 min_val = np.min(signal[finite_mask]) 

283 max_val = np.max(signal[finite_mask]) 

284 signal[signal == np.inf] = max_val 

285 signal[signal == -np.inf] = min_val 

286 else: 

287 signal[np.isinf(signal)] = 0.0 

288 elif replace_inf == "zero": 

289 signal[np.isinf(signal)] = 0.0 

290 elif replace_inf == "remove": 

291 signal = signal[~np.isinf(signal)] 

292 elif isinstance(replace_inf, int | float): 

293 signal[np.isinf(signal)] = float(replace_inf) 

294 else: 

295 raise ValueError(f"Invalid replace_inf option: {replace_inf}") 

296 

297 return signal 

298 

299 

300def _interpolate_nan(signal: NDArray[np.float64]) -> NDArray[np.float64]: 

301 """Interpolate NaN values using linear interpolation. 

302 

303 Args: 

304 signal: Signal with NaN values 

305 

306 Returns: 

307 Signal with NaN values interpolated 

308 

309 References: 

310 EDGE-003: NaN/Inf Handling 

311 """ 

312 # Find NaN locations 

313 nan_mask = np.isnan(signal) 

314 

315 if not np.any(nan_mask): 315 ↛ 316line 315 didn't jump to line 316 because the condition on line 315 was never true

316 return signal 

317 

318 # Get valid indices and values 

319 valid_mask = ~nan_mask 

320 if not np.any(valid_mask): 

321 # All NaN - replace with zeros 

322 return np.zeros_like(signal) 

323 

324 valid_indices = np.where(valid_mask)[0] 

325 valid_values = signal[valid_mask] 

326 

327 # Interpolate 

328 nan_indices = np.where(nan_mask)[0] 

329 interpolated = np.interp(nan_indices, valid_indices, valid_values) 

330 

331 # Replace NaN with interpolated values 

332 result = signal.copy() 

333 result[nan_mask] = interpolated 

334 

335 return result 

336 

337 

def check_signal_quality(
    signal: NDArray[np.float64],
    *,
    clipping_threshold: float = 0.95,
    noise_floor_db: float = -60.0,
    dc_offset_max: float = 0.1,
) -> SignalQualityReport:
    """Check signal quality and detect common issues.

    Computes min, max, mean and standard deviation of the signal and
    derives three indicators from them:

    - Clipping: the percentage of samples lying in the top or bottom
      ``1 - clipping_threshold`` fraction of the observed range; flagged
      when that percentage exceeds 1%.
    - Noise: an SNR estimate ``20 * log10(|mean| / std)``; flagged when it
      is below ``noise_floor_db``. A zero-variance signal gets +inf, and a
      zero-mean signal with variance gets -inf.
    - DC offset: ``|mean|``; flagged when above ``dc_offset_max``.

    Args:
        signal: Input signal array (must contain at least one sample)
        clipping_threshold: Fraction of range for clipping detection (default: 0.95)
        noise_floor_db: SNR level in dB below which noise is flagged (default: -60)
        dc_offset_max: Maximum acceptable DC offset (default: 0.1)

    Returns:
        SignalQualityReport with detected issues; all flags are plain
        Python ``bool`` and all measurements plain ``float``.

    Raises:
        ValueError: If the signal contains no samples.

    Example:
        >>> import numpy as np
        >>> from tracekit.core.edge_cases import check_signal_quality
        >>> signal = np.random.randn(1000) + 0.5  # Signal with DC offset
        >>> quality = check_signal_quality(signal, dc_offset_max=0.1)
        >>> if quality.dc_offset_excessive:
        ...     print(f"DC offset: {quality.dc_offset:.3f}")

    References:
        EDGE-001: Signal Quality Warnings
    """
    # np.min/np.max fail on zero-size input with an opaque reduction error;
    # raise the same exception type with a message that names the cause.
    if np.size(signal) == 0:
        raise ValueError("Cannot check quality of an empty signal (0 samples)")

    # Calculate statistics
    min_val = float(np.min(signal))
    max_val = float(np.max(signal))
    mean_val = float(np.mean(signal))
    std_val = float(np.std(signal))

    # Check for clipping
    signal_range = max_val - min_val
    clipping_detected = False
    clipping_percent = 0.0

    if signal_range > 0:
        # Count samples near either end of the observed range
        upper_thresh = min_val + signal_range * clipping_threshold
        lower_thresh = min_val + signal_range * (1 - clipping_threshold)

        n_clipped = np.sum((signal >= upper_thresh) | (signal <= lower_thresh))
        clipping_percent = float(100.0 * n_clipped / len(signal))
        clipping_detected = clipping_percent > 1.0  # >1% clipping

    # Check noise floor (estimate SNR)
    if std_val > 0:
        snr_db = (
            float(20 * np.log10(abs(mean_val) / std_val))
            if abs(mean_val) > 0
            else float("-inf")
        )
    else:
        snr_db = float("inf")

    # bool(): comparisons involving NumPy scalars return numpy.bool_, which
    # would make the report's flag types inconsistent.
    high_noise = bool(snr_db < noise_floor_db)

    # Check DC offset
    dc_offset = abs(mean_val)
    dc_offset_excessive = bool(dc_offset > dc_offset_max)

    # NOTE(review): the report's noise_floor_db field is populated with the
    # SNR estimate, same as snr_db - confirm whether that is intended.
    return SignalQualityReport(
        clipping_detected=clipping_detected,
        clipping_percent=clipping_percent,
        adc_min=min_val,
        adc_max=max_val,
        high_noise=high_noise,
        noise_floor_db=snr_db,
        snr_db=snr_db,
        dc_offset_excessive=dc_offset_excessive,
        dc_offset=dc_offset,
    )

413 

414 

class SignalQualityReport:
    """Report of signal quality issues.

    Plain container for the flags and measurements of a quality check, with
    helpers to test whether anything was flagged and to render a text
    summary.

    Attributes:
        clipping_detected: Whether clipping was detected
        clipping_percent: Percentage of samples clipped
        adc_min: Minimum signal value
        adc_max: Maximum signal value
        high_noise: Whether noise floor is excessive
        noise_floor_db: Estimated noise floor in dB
        snr_db: Signal-to-noise ratio in dB
        dc_offset_excessive: Whether DC offset is excessive
        dc_offset: DC offset value

    Example:
        >>> from tracekit.core.edge_cases import check_signal_quality
        >>> quality = check_signal_quality(signal)
        >>> print(quality.summary())

    References:
        EDGE-001: Signal Quality Warnings
    """

    def __init__(
        self,
        *,
        clipping_detected: bool = False,
        clipping_percent: float = 0.0,
        adc_min: float = 0.0,
        adc_max: float = 0.0,
        high_noise: bool = False,
        noise_floor_db: float = 0.0,
        snr_db: float = 0.0,
        dc_offset_excessive: bool = False,
        dc_offset: float = 0.0,
    ) -> None:
        """Initialize SignalQualityReport.

        All arguments are keyword-only and are stored unchanged.

        Args:
            clipping_detected: Clipping detected flag
            clipping_percent: Percentage of clipped samples
            adc_min: Minimum signal value
            adc_max: Maximum signal value
            high_noise: High noise flag
            noise_floor_db: Noise floor in dB
            snr_db: Signal-to-noise ratio in dB
            dc_offset_excessive: Excessive DC offset flag
            dc_offset: DC offset value
        """
        self.clipping_detected = clipping_detected
        self.clipping_percent = clipping_percent
        self.adc_min = adc_min
        self.adc_max = adc_max
        self.high_noise = high_noise
        self.noise_floor_db = noise_floor_db
        self.snr_db = snr_db
        self.dc_offset_excessive = dc_offset_excessive
        self.dc_offset = dc_offset

    def __repr__(self) -> str:
        """Return a debugging representation listing every stored field."""
        fields = ", ".join(f"{key}={value!r}" for key, value in vars(self).items())
        return f"{type(self).__name__}({fields})"

    def has_issues(self) -> bool:
        """Check if any quality issues were detected.

        Returns:
            True if any of the three flags is set. The result is always a
            plain ``bool``, even when a flag was stored as another truthy
            type such as ``numpy.bool_``.

        Example:
            >>> if quality.has_issues():
            ...     print(quality.summary())

        References:
            EDGE-001: Signal Quality Warnings
        """
        # bool(): a bare `or` chain returns one of its operands, not a bool.
        return bool(self.clipping_detected or self.high_noise or self.dc_offset_excessive)

    def summary(self) -> str:
        """Get text summary of quality issues.

        Produces a header line followed by one entry each for clipping,
        noise and DC offset. Flagged clipping and noise entries add a
        second detail line.

        Returns:
            Newline-joined summary string

        Example:
            >>> print(quality.summary())
            Signal Quality Report:
             ✓ No clipping detected
             ✓ Noise floor acceptable (SNR: 12.3 dB)
             ✓ DC offset within limits

        References:
            EDGE-001: Signal Quality Warnings
        """
        lines = ["Signal Quality Report:"]

        # Clipping
        if self.clipping_detected:
            lines.append(f" ⚠ Clipping detected: {self.clipping_percent:.1f}% of samples")
            lines.append(f" ADC range: {self.adc_min:.3f} to {self.adc_max:.3f}")
        else:
            lines.append(" ✓ No clipping detected")

        # Noise
        if self.high_noise:
            lines.append(f" ⚠ High noise floor: {self.noise_floor_db:.1f} dB")
            lines.append(f" SNR: {self.snr_db:.1f} dB")
        else:
            lines.append(f" ✓ Noise floor acceptable (SNR: {self.snr_db:.1f} dB)")

        # DC offset
        if self.dc_offset_excessive:
            lines.append(f" ⚠ DC offset: {self.dc_offset:.3f}")
        else:
            lines.append(" ✓ DC offset within limits")

        return "\n".join(lines)

530 

531 

# Public names of this module; this list controls what a star import exposes.
__all__ = [
    "EmptyTraceError",
    "InsufficientSamplesError",
    "SignalQualityReport",
    "check_signal_quality",
    "check_single_sample",
    "handle_empty_trace",
    "sanitize_signal",
    "validate_signal",
]