Coverage for src / tracekit / analyzers / waveform / spectral.py: 80%

434 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Spectral analysis functions for waveform data. 

2 

3This module provides FFT, PSD, and spectral quality metrics 

4per IEEE 1241-2010 for ADC characterization. 

5 

6 

7Example: 

8 >>> from tracekit.analyzers.waveform.spectral import fft, thd, snr 

9 >>> freq, magnitude = fft(trace) 

10 >>> thd_db = thd(trace) 

11 >>> snr_db = snr(trace) 

12 

13References: 

14 IEEE 1241-2010: Standard for Terminology and Test Methods for 

15 Analog-to-Digital Converters 

16""" 

17 

18from __future__ import annotations 

19 

20from typing import TYPE_CHECKING, Literal 

21 

22import numpy as np 

23from scipy import signal as sp_signal 

24 

25from tracekit.core.exceptions import AnalysisError, InsufficientDataError 

26from tracekit.utils.windowing import get_window 

27 

28if TYPE_CHECKING: 

29 from numpy.typing import NDArray 

30 

31 from tracekit.core.types import WaveformTrace 

32 

33 

def fft(
    trace: WaveformTrace,
    *,
    window: str = "hann",
    nfft: int | None = None,
    detrend: Literal["none", "mean", "linear"] = "mean",
    return_phase: bool = False,
) -> (
    tuple[NDArray[np.float64], NDArray[np.float64]]
    | tuple[NDArray[np.float64], NDArray[np.float64], NDArray[np.float64]]
):
    """Compute a single-sided, windowed FFT spectrum in dB.

    Applies the selected window, optionally zero-pads, and returns the
    magnitude spectrum (normalized by the window's coherent gain) with
    optional phase output.

    Args:
        trace: Input waveform trace.
        window: Window function name (default "hann").
        nfft: FFT length. If None, the next power of 2 is used; a value
            smaller than the record length is bumped up to the record length.
        detrend: Detrend method applied before windowing:
            - "none": No detrending
            - "mean": Remove DC offset (default)
            - "linear": Remove linear trend
        return_phase: If True, also return phase in radians.

    Returns:
        If return_phase=False:
            (frequencies, magnitude_db) - Frequency axis and magnitude in dB
        If return_phase=True:
            (frequencies, magnitude_db, phase_rad) - Plus phase in radians

    Raises:
        InsufficientDataError: If trace has fewer than 2 samples.

    Example:
        >>> freq, mag = fft(trace)
        >>> plt.semilogx(freq, mag)
        >>> plt.xlabel("Frequency (Hz)")
        >>> plt.ylabel("Magnitude (dB)")

    References:
        IEEE 1241-2010 Section 4.1.1
    """
    samples = trace.data
    n_samples = len(samples)

    if n_samples < 2:
        raise InsufficientDataError(
            "FFT requires at least 2 samples",
            required=2,
            available=n_samples,
            analysis_type="fft",
        )

    # Optional detrending prior to windowing.
    if detrend == "mean":
        samples = samples - np.mean(samples)
    elif detrend == "linear":
        samples = sp_signal.detrend(samples, type="linear")

    # Zero-pad to the next power of two by default; a caller-supplied nfft
    # is never allowed to truncate the record.
    if nfft is None:
        nfft = int(2 ** np.ceil(np.log2(n_samples)))
    else:
        nfft = max(nfft, n_samples)

    taper = get_window(window, n_samples)
    spectrum = np.fft.rfft(samples * taper, n=nfft)

    # Frequency axis for the single-sided spectrum.
    fs = trace.metadata.sample_rate
    freq = np.fft.rfftfreq(nfft, d=1.0 / fs)

    # Normalize by the window's coherent gain (mean of the taper), then
    # floor the magnitude so log10 never sees zero.
    coherent_gain = np.sum(taper) / n_samples
    mag = np.abs(spectrum) / (n_samples * coherent_gain)
    mag_db = 20 * np.log10(np.maximum(mag, 1e-20))

    if not return_phase:
        return freq, mag_db
    return freq, mag_db, np.angle(spectrum)

121 

122 

123def psd( 

124 trace: WaveformTrace, 

125 *, 

126 window: str = "hann", 

127 nperseg: int | None = None, 

128 noverlap: int | None = None, 

129 nfft: int | None = None, 

130 scaling: Literal["density", "spectrum"] = "density", 

131) -> tuple[NDArray[np.float64], NDArray[np.float64]]: 

132 """Compute Power Spectral Density using Welch's method. 

133 

134 Uses overlapped segment averaging for reduced variance PSD estimation. 

135 

136 Args: 

137 trace: Input waveform trace. 

138 window: Window function name (default "hann"). 

139 nperseg: Segment length. If None, uses n // 8 or 256, whichever larger. 

140 noverlap: Overlap between segments. If None, uses nperseg // 2. 

141 nfft: FFT length per segment. If None, uses nperseg. 

142 scaling: Output scaling: 

143 - "density": Power spectral density (V^2/Hz) 

144 - "spectrum": Power spectrum (V^2) 

145 

146 Returns: 

147 (frequencies, psd) - Frequency axis and PSD in dB/Hz. 

148 

149 Raises: 

150 InsufficientDataError: If trace has insufficient data. 

151 

152 Example: 

153 >>> freq, psd_db = psd(trace) 

154 >>> plt.semilogx(freq, psd_db) 

155 >>> plt.ylabel("PSD (dB/Hz)") 

156 

157 References: 

158 Welch, P. D. (1967). "The use of fast Fourier transform for the 

159 estimation of power spectra" 

160 """ 

161 data = trace.data 

162 n = len(data) 

163 

164 if n < 16: 

165 raise InsufficientDataError( 

166 "PSD requires at least 16 samples", 

167 required=16, 

168 available=n, 

169 analysis_type="psd", 

170 ) 

171 

172 sample_rate = trace.metadata.sample_rate 

173 

174 # Default segment length 

175 if nperseg is None: 

176 nperseg = max(256, n // 8) 

177 nperseg = min(nperseg, n) 

178 

179 # Default overlap (50% for Hann window) 

180 if noverlap is None: 

181 noverlap = nperseg // 2 

182 

183 freq, psd_linear = sp_signal.welch( 

184 data, 

185 fs=sample_rate, 

186 window=window, 

187 nperseg=nperseg, 

188 noverlap=noverlap, 

189 nfft=nfft, 

190 scaling=scaling, 

191 detrend="constant", 

192 ) 

193 

194 # Convert to dB 

195 psd_linear = np.maximum(psd_linear, 1e-20) 

196 psd_db = 10 * np.log10(psd_linear) 

197 

198 return freq, psd_db 

199 

200 

201def periodogram( 

202 trace: WaveformTrace, 

203 *, 

204 window: str = "hann", 

205 nfft: int | None = None, 

206 scaling: Literal["density", "spectrum"] = "density", 

207 detrend: Literal["constant", "linear", False] = "constant", 

208) -> tuple[NDArray[np.float64], NDArray[np.float64]]: 

209 """Compute classical periodogram PSD estimate. 

210 

211 Single-segment PSD estimation using scaled FFT magnitude squared. 

212 

213 Args: 

214 trace: Input waveform trace. 

215 window: Window function name (default "hann"). 

216 nfft: FFT length. If None, uses data length. 

217 scaling: Output scaling ("density" or "spectrum"). 

218 detrend: Detrending method. 

219 

220 Returns: 

221 (frequencies, psd) - Frequency axis and PSD. 

222 

223 Example: 

224 >>> freq, psd = periodogram(trace) 

225 

226 References: 

227 IEEE 1241-2010 Section 4.1.2 

228 """ 

229 sample_rate = trace.metadata.sample_rate 

230 

231 freq, psd_linear = sp_signal.periodogram( 

232 trace.data, 

233 fs=sample_rate, 

234 window=window, 

235 nfft=nfft, 

236 scaling=scaling, 

237 detrend=detrend, 

238 ) 

239 

240 # Convert to dB 

241 psd_linear = np.maximum(psd_linear, 1e-20) 

242 psd_db = 10 * np.log10(psd_linear) 

243 

244 return freq, psd_db 

245 

246 

def bartlett_psd(
    trace: WaveformTrace,
    *,
    n_segments: int = 8,
    window: str = "rectangular",
    nfft: int | None = None,
) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
    """Compute Bartlett's method PSD estimate.

    Averages periodograms of non-overlapping segments for
    reduced variance at cost of frequency resolution.

    Args:
        trace: Input waveform trace.
        n_segments: Number of non-overlapping segments (must be >= 1).
        window: Window function per segment.
        nfft: FFT length per segment. If None, uses the segment length.

    Returns:
        (frequencies, psd_db) - Frequency axis and PSD in dB.

    Raises:
        AnalysisError: If n_segments is less than 1.
        InsufficientDataError: If trace has fewer than 16*n_segments samples.

    Example:
        >>> freq, psd = bartlett_psd(trace, n_segments=8)
    """
    data = trace.data
    n = len(data)
    sample_rate = trace.metadata.sample_rate

    # Guard the segment count up front: 0 previously raised an opaque
    # ZeroDivisionError and negative values produced nonsense segmenting.
    if n_segments < 1:
        raise AnalysisError(f"n_segments must be >= 1, got {n_segments}")

    segment_length = n // n_segments

    if segment_length < 16:
        raise InsufficientDataError(
            f"Bartlett requires at least {16 * n_segments} samples for {n_segments} segments",
            required=16 * n_segments,
            available=n,
            analysis_type="bartlett_psd",
        )

    if nfft is None:
        nfft = segment_length

    w = get_window(window, segment_length)
    window_power = np.sum(w**2)

    # Accumulate per-segment periodograms into a zero-initialized sum;
    # n_segments >= 1 guarantees at least one segment contributes, so the
    # old "no segments processed" dead branch is gone.
    n_bins = nfft // 2 + 1
    psd_sum = np.zeros(n_bins)

    for i in range(n_segments):
        segment = data[i * segment_length : (i + 1) * segment_length]
        spectrum = np.fft.rfft(segment * w, n=nfft)
        # Per-segment power spectral density (V^2/Hz).
        # NOTE(review): no factor-of-2 one-sided correction is applied to
        # interior bins here, unlike scipy's welch used by psd() -- confirm
        # the intended scaling convention before comparing the two.
        psd_sum += (np.abs(spectrum) ** 2) / (sample_rate * window_power)

    psd_avg = psd_sum / n_segments

    # Frequency axis for the single-sided spectrum.
    freq = np.fft.rfftfreq(nfft, d=1.0 / sample_rate)

    # Floor before the log so empty bins stay finite in dB.
    psd_avg = np.maximum(psd_avg, 1e-20)
    psd_db = 10 * np.log10(psd_avg)

    return freq, psd_db

322 

323 

324def spectrogram( 

325 trace: WaveformTrace, 

326 *, 

327 window: str = "hann", 

328 nperseg: int | None = None, 

329 noverlap: int | None = None, 

330 nfft: int | None = None, 

331) -> tuple[NDArray[np.float64], NDArray[np.float64], NDArray[np.float64]]: 

332 """Compute Short-Time Fourier Transform spectrogram. 

333 

334 Time-frequency representation for analyzing non-stationary signals. 

335 

336 Args: 

337 trace: Input waveform trace. 

338 window: Window function name. 

339 nperseg: Segment length. If None, auto-selected. 

340 noverlap: Overlap between segments. If None, uses nperseg - 1. 

341 nfft: FFT length per segment. 

342 

343 Returns: 

344 (times, frequencies, magnitude_db) - Time axis, frequency axis, 

345 and magnitude in dB as 2D array. 

346 

347 Example: 

348 >>> t, f, Sxx = spectrogram(trace) 

349 >>> plt.pcolormesh(t, f, Sxx, shading='auto') 

350 >>> plt.ylabel('Frequency (Hz)') 

351 >>> plt.xlabel('Time (s)') 

352 """ 

353 data = trace.data 

354 n = len(data) 

355 sample_rate = trace.metadata.sample_rate 

356 

357 if nperseg is None: 

358 nperseg = min(256, n // 4) 

359 nperseg = max(nperseg, 16) 

360 

361 if noverlap is None: 

362 noverlap = nperseg - nperseg // 8 

363 

364 freq, times, Sxx = sp_signal.spectrogram( 

365 data, 

366 fs=sample_rate, 

367 window=window, 

368 nperseg=nperseg, 

369 noverlap=noverlap, 

370 nfft=nfft, 

371 scaling="spectrum", 

372 ) 

373 

374 # Convert to dB 

375 Sxx = np.maximum(Sxx, 1e-20) 

376 Sxx_db = 10 * np.log10(Sxx) 

377 

378 return times, freq, Sxx_db 

379 

380 

381def _find_fundamental( 

382 freq: NDArray[np.float64], 

383 magnitude: NDArray[np.float64], 

384 *, 

385 min_freq: float = 0.0, 

386) -> tuple[int, float, float]: 

387 """Find fundamental frequency in spectrum. 

388 

389 Args: 

390 freq: Frequency axis. 

391 magnitude: Magnitude spectrum (linear, not dB). 

392 min_freq: Minimum frequency to consider. 

393 

394 Returns: 

395 (index, frequency, magnitude) of fundamental. 

396 """ 

397 # Skip DC and frequencies below min_freq 

398 valid_mask = freq > max(min_freq, freq[1]) 

399 valid_indices = np.where(valid_mask)[0] 

400 

401 if len(valid_indices) == 0: 401 ↛ 402line 401 didn't jump to line 402 because the condition on line 401 was never true

402 return 0, 0.0, 0.0 

403 

404 # Find peak in valid region 

405 local_peak_idx = np.argmax(magnitude[valid_indices]) 

406 peak_idx = valid_indices[local_peak_idx] 

407 

408 return int(peak_idx), float(freq[peak_idx]), float(magnitude[peak_idx]) 

409 

410 

411def _find_harmonic_indices( 

412 freq: NDArray[np.float64], 

413 fundamental_freq: float, 

414 n_harmonics: int, 

415) -> list[int]: 

416 """Find indices of harmonic frequencies. 

417 

418 Args: 

419 freq: Frequency axis. 

420 fundamental_freq: Fundamental frequency. 

421 n_harmonics: Number of harmonics to find. 

422 

423 Returns: 

424 List of indices for harmonics 2, 3, ..., n_harmonics+1. 

425 """ 

426 indices = [] 

427 

428 for h in range(2, n_harmonics + 2): 

429 target_freq = h * fundamental_freq 

430 if target_freq > freq[-1]: 

431 break 

432 

433 # Find closest bin 

434 idx = np.argmin(np.abs(freq - target_freq)) 

435 indices.append(int(idx)) 

436 

437 return indices 

438 

439 

def thd(
    trace: WaveformTrace,
    *,
    n_harmonics: int = 10,
    window: str = "hann",
    nfft: int | None = None,
    return_db: bool = True,
) -> float:
    """Compute Total Harmonic Distortion.

    THD is the root-sum-square of harmonic amplitudes relative to the
    fundamental amplitude.

    Args:
        trace: Input waveform trace.
        n_harmonics: Number of harmonics to include (default 10).
        window: Window function for FFT.
        nfft: FFT length. If None, uses data length (no zero-padding) to
            preserve coherent sampling per IEEE 1241-2010.
        return_db: If True, return in dB. If False, return percentage.

    Returns:
        THD in dB or percentage; NaN when no fundamental is found.

    Example:
        >>> thd_db = thd(trace)
        >>> thd_pct = thd(trace, return_db=False)
        >>> print(f"THD: {thd_db:.1f} dB ({thd_pct:.2f}%)")

    References:
        IEEE 1241-2010 Section 4.1.4.2
    """
    # Default NFFT to the record length: zero-padding would smear a
    # coherently sampled tone across bins.
    effective_nfft = len(trace.data) if nfft is None else nfft

    freq, mag_db = fft(trace, window=window, nfft=effective_nfft, detrend="mean")[:2]

    # Back to linear magnitude for power arithmetic.
    magnitude = 10 ** (mag_db / 20)

    _, fund_freq, fund_mag = _find_fundamental(freq, magnitude)
    if fund_mag == 0 or fund_freq == 0:
        return np.nan  # no usable fundamental

    harmonic_bins = _find_harmonic_indices(freq, fund_freq, n_harmonics)
    if not harmonic_bins:
        return -np.inf if return_db else 0.0

    # Root-sum-square of harmonic amplitudes over the fundamental.
    rss_harmonics = np.sqrt(np.sum(magnitude[harmonic_bins] ** 2))
    ratio = rss_harmonics / fund_mag

    if not return_db:
        return float(ratio * 100)
    if ratio <= 0:
        return -np.inf
    return float(20 * np.log10(ratio))

505 

506 

def snr(
    trace: WaveformTrace,
    *,
    n_harmonics: int = 10,
    window: str = "hann",
    nfft: int | None = None,
) -> float:
    """Compute Signal-to-Noise Ratio.

    Ratio of fundamental signal power to noise power, where DC, the
    fundamental, and the harmonics (each with leakage guard bins) are
    excluded from the noise estimate.

    Args:
        trace: Input waveform trace.
        n_harmonics: Number of harmonics to exclude from noise.
        window: Window function for FFT.
        nfft: FFT length. If None, uses data length (no zero-padding) to
            preserve coherent sampling per IEEE 1241-2010.

    Returns:
        SNR in dB; NaN when no fundamental is found.

    Example:
        >>> snr_db = snr(trace)
        >>> print(f"SNR: {snr_db:.1f} dB")

    References:
        IEEE 1241-2010 Section 4.1.4.1
    """
    # Default NFFT to the record length to preserve coherent sampling.
    effective_nfft = len(trace.data) if nfft is None else nfft

    freq, mag_db = fft(trace, window=window, nfft=effective_nfft, detrend="mean")[:2]
    magnitude = 10 ** (mag_db / 20)
    n_bins = len(magnitude)

    fund_idx, fund_freq, fund_mag = _find_fundamental(freq, magnitude)
    if fund_mag == 0 or fund_freq == 0:
        return np.nan

    # Noise mask: start with every bin, then knock out DC plus a +/-3-bin
    # guard band around the fundamental and each harmonic (spectral leakage).
    noise_mask = np.ones(n_bins, dtype=bool)
    noise_mask[0] = False
    for center in [fund_idx, *_find_harmonic_indices(freq, fund_freq, n_harmonics)]:
        noise_mask[max(0, center - 3) : min(n_bins, center + 4)] = False

    # Signal power: 3-bin sum centered on the fundamental to capture
    # leakage into adjacent bins.
    signal_power = float(
        np.sum(magnitude[max(0, fund_idx - 1) : min(n_bins, fund_idx + 2)] ** 2)
    )

    noise_power = float(np.sum(magnitude[noise_mask] ** 2))
    if noise_power <= 0:
        return np.inf

    return float(10 * np.log10(signal_power / noise_power))

588 

589 

def sinad(
    trace: WaveformTrace,
    *,
    window: str = "hann",
    nfft: int | None = None,
) -> float:
    """Compute Signal-to-Noise and Distortion ratio.

    SINAD compares fundamental signal power against everything else in
    the spectrum (noise plus distortion, DC excluded).

    Args:
        trace: Input waveform trace.
        window: Window function for FFT.
        nfft: FFT length. If None, uses data length (no zero-padding) to
            preserve coherent sampling per IEEE 1241-2010.

    Returns:
        SINAD in dB; NaN when no fundamental is found.

    Example:
        >>> sinad_db = sinad(trace)
        >>> print(f"SINAD: {sinad_db:.1f} dB")

    References:
        IEEE 1241-2010 Section 4.1.4.3
    """
    # Default NFFT to the record length to preserve coherent sampling.
    effective_nfft = len(trace.data) if nfft is None else nfft

    freq, mag_db = fft(trace, window=window, nfft=effective_nfft, detrend="mean")[:2]
    magnitude = 10 ** (mag_db / 20)

    fund_idx, _, fund_mag = _find_fundamental(freq, magnitude)
    if fund_mag == 0:
        return np.nan

    # Signal power: 3-bin window around the fundamental to capture
    # spectral leakage into adjacent bins.
    lo = max(0, fund_idx - 1)
    hi = min(len(magnitude), fund_idx + 2)
    signal_power = float(np.sum(magnitude[lo:hi] ** 2))

    # Everything above DC that is not signal counts as noise + distortion.
    total_power = float(np.sum(magnitude[1:] ** 2))
    nad_power = total_power - signal_power
    if nad_power <= 0:
        return np.inf

    return float(10 * np.log10(signal_power / nad_power))

648 

649 

def enob(
    trace: WaveformTrace,
    *,
    window: str = "hann",
    nfft: int | None = None,
) -> float:
    """Compute Effective Number of Bits from SINAD.

    Uses the ideal-ADC relation ENOB = (SINAD - 1.76) / 6.02.

    Args:
        trace: Input waveform trace.
        window: Window function for FFT.
        nfft: FFT length.

    Returns:
        ENOB in bits, or np.nan if SINAD is invalid.

    Example:
        >>> bits = enob(trace)
        >>> print(f"ENOB: {bits:.2f} bits")

    References:
        IEEE 1241-2010 Section 4.1.4.4
    """
    sinad_db = sinad(trace, window=window, nfft=nfft)

    # A NaN or non-positive SINAD cannot be mapped to a bit count.
    if np.isnan(sinad_db) or sinad_db <= 0:
        return np.nan

    return float((sinad_db - 1.76) / 6.02)

681 

682 

def sfdr(
    trace: WaveformTrace,
    *,
    window: str = "hann",
    nfft: int | None = None,
) -> float:
    """Compute Spurious-Free Dynamic Range.

    SFDR is the amplitude ratio between the fundamental and the largest
    remaining (spurious) spectral component.

    Args:
        trace: Input waveform trace.
        window: Window function for FFT.
        nfft: FFT length. If None, uses data length (no zero-padding) to
            preserve coherent sampling per IEEE 1241-2010.

    Returns:
        SFDR in dBc (dB relative to carrier/fundamental); NaN when no
        fundamental is found.

    Example:
        >>> sfdr_db = sfdr(trace)
        >>> print(f"SFDR: {sfdr_db:.1f} dBc")

    References:
        IEEE 1241-2010 Section 4.1.4.5
    """
    # Default NFFT to the record length to preserve coherent sampling.
    effective_nfft = len(trace.data) if nfft is None else nfft

    freq, mag_db = fft(trace, window=window, nfft=effective_nfft, detrend="mean")[:2]
    magnitude = 10 ** (mag_db / 20)
    n_bins = len(magnitude)

    fund_idx, _, fund_mag = _find_fundamental(freq, magnitude)
    if fund_mag == 0:
        return np.nan

    # Spur candidates: everything except DC and a +/-5-bin guard band
    # around the fundamental (a Hann main lobe spans roughly 4 bins).
    spur_mask = np.ones(n_bins, dtype=bool)
    spur_mask[0] = False
    spur_mask[max(0, fund_idx - 5) : min(n_bins, fund_idx + 6)] = False

    spurs = magnitude[spur_mask]
    if spurs.size == 0:
        return np.inf

    worst_spur = float(np.max(spurs))
    if worst_spur <= 0:
        return np.inf

    return float(20 * np.log10(fund_mag / worst_spur))

749 

750 

751def hilbert_transform( 

752 trace: WaveformTrace, 

753) -> tuple[NDArray[np.float64], NDArray[np.float64], NDArray[np.float64]]: 

754 """Compute Hilbert transform for envelope and instantaneous frequency. 

755 

756 Computes the analytic signal to extract envelope (instantaneous 

757 amplitude), instantaneous phase, and instantaneous frequency. 

758 

759 Args: 

760 trace: Input waveform trace. 

761 

762 Returns: 

763 (envelope, phase, inst_freq) - Instantaneous amplitude, 

764 phase (radians), and frequency (Hz). 

765 

766 Example: 

767 >>> envelope, phase, inst_freq = hilbert_transform(trace) 

768 >>> plt.plot(trace.time_vector, envelope) 

769 

770 References: 

771 Oppenheim, A. V. & Schafer, R. W. (2009). Discrete-Time 

772 Signal Processing, 3rd ed. 

773 """ 

774 data = trace.data 

775 sample_rate = trace.metadata.sample_rate 

776 

777 # Compute analytic signal 

778 analytic = sp_signal.hilbert(data) 

779 

780 # Instantaneous amplitude (envelope) 

781 envelope = np.abs(analytic) 

782 

783 # Instantaneous phase 

784 phase = np.unwrap(np.angle(analytic)) 

785 

786 # Instantaneous frequency (derivative of phase / 2pi) 

787 inst_freq = np.zeros_like(phase) 

788 inst_freq[1:] = np.diff(phase) * sample_rate / (2 * np.pi) 

789 inst_freq[0] = inst_freq[1] # Extrapolate first sample 

790 

791 return envelope, phase, inst_freq 

792 

793 

def cwt(
    trace: WaveformTrace,
    *,
    wavelet: Literal["morlet", "mexh", "ricker"] = "morlet",
    scales: NDArray[np.float64] | None = None,
    n_scales: int = 64,
) -> tuple[NDArray[np.float64], NDArray[np.float64], NDArray[np.float64]]:
    """Compute Continuous Wavelet Transform for time-frequency analysis.

    Uses CWT to analyze non-stationary signals with multi-resolution
    time-frequency representation.

    Args:
        trace: Input waveform trace.
        wavelet: Wavelet type:
            - "morlet": Morlet wavelet (complex, good for frequency localization)
            - "mexh": Mexican hat wavelet (real, good for feature detection)
            - "ricker": Ricker wavelet (real, synonym for Mexican hat)
        scales: Array of scales to use. If None, auto-generated logarithmically.
        n_scales: Number of scales if auto-generated (default 64).

    Returns:
        (scales, frequencies, coefficients) where coefficients is 2D array
        of shape (n_scales, n_samples).

    Raises:
        InsufficientDataError: If trace has fewer than 8 samples.
        ValueError: If wavelet type is not recognized.

    Note:
        NOTE(review): this relies on ``scipy.signal.cwt``, ``morlet2`` and
        ``ricker``, which were deprecated in SciPy 1.12 and removed in
        SciPy 1.15 -- confirm the project's SciPy pin, or plan a migration
        (e.g. to PyWavelets' ``pywt.cwt``).

    Example:
        >>> scales, freqs, coef = cwt(trace, wavelet="morlet")
        >>> plt.pcolormesh(trace.time_vector, freqs, np.abs(coef))
        >>> plt.ylabel("Frequency (Hz)")

    References:
        Mallat, S. (2009). A Wavelet Tour of Signal Processing, 3rd ed.
    """
    data = trace.data
    sample_rate = trace.metadata.sample_rate

    # Too few samples for any meaningful multi-scale decomposition.
    if len(data) < 8:
        raise InsufficientDataError(
            "CWT requires at least 8 samples",
            required=8,
            available=len(data),
            analysis_type="cwt",
        )

    # Auto-generate scales if not provided
    if scales is None:
        # Logarithmically spaced scales from 1 to n/8
        scales = np.logspace(0, np.log10(len(data) / 8), n_scales)

    # Select wavelet function
    if wavelet == "morlet":
        # Morlet wavelet (complex): good for frequency analysis
        widths = scales
        coefficients = sp_signal.cwt(data, sp_signal.morlet2, widths)
    elif wavelet in ("mexh", "ricker"):
        # Mexican hat wavelet (Ricker): real, good for edge detection
        widths = scales
        coefficients = sp_signal.cwt(data, sp_signal.ricker, widths)
    else:
        raise ValueError(f"Unknown wavelet: {wavelet}. Choose from: morlet, mexh, ricker")

    # Convert scales to frequencies
    # For Morlet: f = fc / (scale * dt) where fc = center frequency
    # For simplicity, use approximate relation: f = 1 / (scale * dt)
    # (so the returned "frequencies" are approximate for Morlet).
    dt = 1.0 / sample_rate
    frequencies = 1.0 / (scales * dt)

    return scales, frequencies, coefficients

866 

867 

def dwt(
    trace: WaveformTrace,
    *,
    wavelet: str = "db4",
    level: int | None = None,
    mode: str = "symmetric",
) -> dict[str, NDArray[np.float64]]:
    """Multi-level Discrete Wavelet Transform decomposition.

    Splits the signal into one set of approximation (low-frequency)
    coefficients plus detail (high-frequency) coefficients per level.

    Args:
        trace: Input waveform trace.
        wavelet: Wavelet family:
            - "dbN": Daubechies wavelets (e.g., "db1", "db4", "db8")
            - "symN": Symlet wavelets (e.g., "sym2", "sym8")
            - "coifN": Coiflet wavelets (e.g., "coif1", "coif5")
        level: Decomposition level (auto-computed if None, capped at 8).
        mode: Signal extension mode ("symmetric", "periodic", "zero", etc.).

    Returns:
        Dictionary with keys:
        - "cA": Final approximation coefficients
        - "cD1", "cD2", ...: Detail coefficients at each level

    Raises:
        AnalysisError: If DWT decomposition fails.
        ImportError: If PyWavelets library is not installed.
        InsufficientDataError: If trace has fewer than 4 samples.

    Example:
        >>> coeffs = dwt(trace, wavelet="db4", level=3)
        >>> print(f"Approximation: {len(coeffs['cA'])} coefficients")
        >>> print(f"Detail levels: {[k for k in coeffs if k.startswith('cD')]}")

    Note:
        Requires pywt library. Install with: pip install PyWavelets

    References:
        Daubechies, I. (1992). Ten Lectures on Wavelets
    """
    try:
        import pywt
    except ImportError:
        raise ImportError(  # noqa: B904
            "DWT requires PyWavelets library. Install with: pip install PyWavelets"
        )

    data = trace.data

    if len(data) < 4:
        raise InsufficientDataError(
            "DWT requires at least 4 samples",
            required=4,
            available=len(data),
            analysis_type="dwt",
        )

    # Auto-select a decomposition depth, capped at 8 levels.
    if level is None:
        level = min(pywt.dwt_max_level(len(data), wavelet), 8)

    try:
        decomposition = pywt.wavedec(data, wavelet, mode=mode, level=level)
    except ValueError as e:
        raise AnalysisError(f"DWT decomposition failed: {e}", analysis_type="dwt")  # noqa: B904

    # First entry is the final approximation; the rest are detail levels.
    approximation, *details = decomposition
    out: dict[str, NDArray[np.float64]] = {"cA": approximation}
    out.update({f"cD{i}": d for i, d in enumerate(details, start=1)})
    return out

946 

947 

def idwt(
    coeffs: dict[str, NDArray[np.float64]],
    *,
    wavelet: str = "db4",
    mode: str = "symmetric",
) -> NDArray[np.float64]:
    """Reconstruct a signal from DWT coefficients.

    Inverse of dwt(): combines the approximation and detail coefficient
    arrays back into the time-domain signal.

    Args:
        coeffs: Dictionary of DWT coefficients from dwt().
        wavelet: Wavelet family (must match original decomposition).
        mode: Signal extension mode (must match original decomposition).

    Returns:
        Reconstructed signal array.

    Raises:
        AnalysisError: If IDWT reconstruction fails.
        ImportError: If PyWavelets library is not installed.

    Example:
        >>> coeffs = dwt(trace, wavelet="db4")
        >>> # Modify coefficients (e.g., denoise)
        >>> coeffs["cD1"] *= 0  # Remove finest detail
        >>> reconstructed = idwt(coeffs, wavelet="db4")
    """
    try:
        import pywt
    except ImportError:
        raise ImportError(  # noqa: B904
            "IDWT requires PyWavelets library. Install with: pip install PyWavelets"
        )

    # Rebuild the pywt coefficient list: approximation first, then the
    # detail levels cD1, cD2, ... in ascending numeric order.
    ordered_details = [
        coeffs[key]
        for key in sorted(
            (k for k in coeffs if k.startswith("cD")),
            key=lambda name: int(name[2:]),
        )
    ]
    coeff_list = [coeffs["cA"], *ordered_details]

    try:
        reconstructed = pywt.waverec(coeff_list, wavelet, mode=mode)
    except ValueError as e:
        raise AnalysisError(f"IDWT reconstruction failed: {e}", analysis_type="idwt")  # noqa: B904

    return np.asarray(reconstructed, dtype=np.float64)

1003 

1004 

def mfcc(
    trace: WaveformTrace,
    *,
    n_mfcc: int = 13,
    n_fft: int = 512,
    hop_length: int | None = None,
    n_mels: int = 40,
    fmin: float = 0.0,
    fmax: float | None = None,
) -> NDArray[np.float64]:
    """Compute Mel-Frequency Cepstral Coefficients for audio analysis.

    MFCCs are widely used in speech and audio processing for feature
    extraction and pattern recognition.

    Args:
        trace: Input waveform trace.
        n_mfcc: Number of MFCC coefficients to return (default 13).
        n_fft: FFT window size (default 512).
        hop_length: Number of samples between frames. If None, uses n_fft // 4.
        n_mels: Number of Mel filterbank channels (default 40).
        fmin: Minimum frequency for Mel filterbank (Hz).
        fmax: Maximum frequency for Mel filterbank (default: sample_rate/2).

    Returns:
        2D array of shape (n_mfcc, n_frames) with MFCC time series.

    Raises:
        InsufficientDataError: If trace has fewer than n_fft samples.

    Example:
        >>> mfcc_features = mfcc(audio_trace, n_mfcc=13)
        >>> print(f"MFCCs: {mfcc_features.shape[0]} coefficients, {mfcc_features.shape[1]} frames")

    Note:
        This is a custom implementation. For production use, consider librosa.

    References:
        Davis, S. & Mermelstein, P. (1980). "Comparison of parametric
        representations for monosyllabic word recognition"
    """
    from scipy.fft import dct

    samples = trace.data
    fs = trace.metadata.sample_rate

    if len(samples) < n_fft:
        raise InsufficientDataError(
            f"MFCC requires at least {n_fft} samples",
            required=n_fft,
            available=len(samples),
            analysis_type="mfcc",
        )

    # Resolve defaults: quarter-window hop, Nyquist upper band edge.
    hop = n_fft // 4 if hop_length is None else hop_length
    f_upper = fs / 2 if fmax is None else fmax

    # Short-time power spectrum via scipy's spectrogram (Hann-windowed).
    _freqs, _frames, Sxx = sp_signal.spectrogram(
        samples,
        fs=fs,
        window="hann",
        nperseg=n_fft,
        noverlap=n_fft - hop,
        scaling="spectrum",
    )
    power = np.abs(Sxx)

    # Project onto the Mel filterbank and take the log (dB), flooring
    # first to keep log10 well-defined.
    filters = _mel_filterbank(n_mels, n_fft, fs, fmin, f_upper)
    log_mel = 10 * np.log10(np.maximum(filters @ power, 1e-10))

    # DCT-II along the Mel axis yields the cepstral coefficients; keep
    # only the first n_mfcc rows.
    cepstra = dct(log_mel, axis=0, type=2, norm="ortho")[:n_mfcc, :]

    return np.asarray(cepstra, dtype=np.float64)

1094 

1095 

def _mel_filterbank(
    n_filters: int,
    n_fft: int,
    sample_rate: float,
    fmin: float,
    fmax: float,
) -> NDArray[np.float64]:
    """Create Mel-scale filterbank matrix.

    Builds triangular filters whose edges are equally spaced on the
    Mel scale between fmin and fmax.

    Args:
        n_filters: Number of Mel filters.
        n_fft: FFT size.
        sample_rate: Sampling rate in Hz.
        fmin: Minimum frequency (Hz).
        fmax: Maximum frequency (Hz).

    Returns:
        Filterbank matrix of shape (n_filters, n_fft // 2 + 1).
    """

    def hz_to_mel(hz: float) -> float:
        """Convert Hz to Mel scale."""
        return 2595 * np.log10(1 + hz / 700)  # type: ignore[no-any-return]

    def mel_to_hz(mel: float) -> float:
        """Convert Mel scale to Hz."""
        return 700 * (10 ** (mel / 2595) - 1)

    # n_filters + 2 edge points, uniform on the Mel scale, mapped back to Hz.
    mels = np.linspace(hz_to_mel(fmin), hz_to_mel(fmax), n_filters + 2)
    edges_hz = np.array([mel_to_hz(float(m)) for m in mels])

    # Map edge frequencies onto FFT bin indices.
    n_freqs = n_fft // 2 + 1
    bins = np.floor((n_fft + 1) * edges_hz / sample_rate).astype(int)

    bank = np.zeros((n_filters, n_freqs))

    # Each filter i is a triangle over bins (lo, mid, hi) =
    # (bins[i], bins[i+1], bins[i+2]).
    for i, (lo, mid, hi) in enumerate(zip(bins[:-2], bins[1:-1], bins[2:])):
        # Rising edge: 0 at lo up to 1 at mid (range empty when lo == mid,
        # so no division by zero can occur).
        for j in range(lo, mid):
            bank[i, j] = (j - lo) / (mid - lo)
        # Falling edge: 1 at mid down to 0 at hi (range empty when mid == hi).
        for j in range(mid, hi):
            bank[i, j] = (hi - j) / (hi - mid)

    return bank

1157 

1158 

1159# ========================================================================== 

1160# MEM-004, MEM-005, MEM-006: Chunked Processing for Large Signals 

1161# ========================================================================== 

1162 

1163 

def spectrogram_chunked(
    trace: WaveformTrace,
    *,
    chunk_size: int = 100_000_000,
    window: str = "hann",
    nperseg: int | None = None,
    noverlap: int | None = None,
    nfft: int | None = None,
    overlap_factor: float = 2.0,
) -> tuple[NDArray[np.float64], NDArray[np.float64], NDArray[np.float64]]:
    """Compute spectrogram for very large signals using chunked processing.

    Processes signal in chunks with overlap to handle files larger than RAM.
    Stitches STFT results from overlapping chunks to create continuous spectrogram.

    Args:
        trace: Input waveform trace.
        chunk_size: Maximum samples per chunk (default 100M).
        window: Window function name.
        nperseg: Segment length for STFT. If None, auto-selected.
        noverlap: Overlap between STFT segments. If None, uses nperseg - nperseg // 8.
        nfft: FFT length per segment.
        overlap_factor: Overlap factor between chunks (default 2.0 = 2*nperseg overlap).

    Returns:
        (times, frequencies, magnitude_db) - Time axis, frequency axis,
        and magnitude in dB as 2D array.

    Example:
        >>> # Process 10 GB file in 50M sample chunks
        >>> t, f, Sxx = spectrogram_chunked(trace, chunk_size=50_000_000, nperseg=4096)
        >>> print(f"Spectrogram shape: {Sxx.shape}")

    References:
        scipy.signal.stft documentation
    """
    data = trace.data
    n = len(data)
    sample_rate = trace.metadata.sample_rate

    # Set default parameters (floor of 16 keeps STFT segments usable for
    # very short traces).
    if nperseg is None:
        nperseg = min(256, n // 4)
        nperseg = max(nperseg, 16)

    if noverlap is None:
        noverlap = nperseg - nperseg // 8

    # Calculate chunk overlap (overlap_factor * nperseg on each boundary)
    # so STFT segments spanning chunk edges see real data, not a gap.
    chunk_overlap = int(overlap_factor * nperseg)

    # If data fits in one chunk, use standard spectrogram
    if n <= chunk_size:
        return spectrogram(trace, window=window, nperseg=nperseg, noverlap=noverlap, nfft=nfft)

    # Process chunks
    chunks_stft = []
    chunks_times = []
    chunk_start = 0

    while chunk_start < n:
        # Determine chunk end with overlap
        chunk_end = min(chunk_start + chunk_size, n)

        # Extract chunk with overlap on both sides (no left pad for the
        # first chunk, no right pad for the last).
        chunk_data_start = chunk_start - chunk_overlap if chunk_start > 0 else 0

        chunk_data_end = chunk_end + chunk_overlap if chunk_end < n else n

        chunk_data = data[chunk_data_start:chunk_data_end]

        # Compute STFT for chunk. `freq` is identical every iteration
        # (fixed fs/nperseg/nfft), so reusing the last value after the
        # loop is safe.
        freq, times_chunk, Sxx_chunk = sp_signal.spectrogram(
            chunk_data,
            fs=sample_rate,
            window=window,
            nperseg=nperseg,
            noverlap=noverlap,
            nfft=nfft,
            scaling="spectrum",
        )

        # Adjust time offset for chunk position (times are relative to the
        # padded chunk start, so offset by chunk_data_start).
        time_offset = chunk_data_start / sample_rate
        times_chunk_adjusted = times_chunk + time_offset

        # For overlapping chunks, trim overlap regions so each output
        # column comes from exactly one chunk (half-open [start, end)
        # intervals prevent duplicate columns at boundaries).
        if chunk_start > 0 and chunk_end < n:
            # Middle chunk: trim both sides
            valid_time_start = chunk_start / sample_rate
            valid_time_end = chunk_end / sample_rate
            valid_mask = (times_chunk_adjusted >= valid_time_start) & (
                times_chunk_adjusted < valid_time_end
            )
            Sxx_chunk = Sxx_chunk[:, valid_mask]
            times_chunk_adjusted = times_chunk_adjusted[valid_mask]
        elif chunk_start > 0:
            # Last chunk: trim left overlap
            valid_time_start = chunk_start / sample_rate
            valid_mask = times_chunk_adjusted >= valid_time_start
            Sxx_chunk = Sxx_chunk[:, valid_mask]
            times_chunk_adjusted = times_chunk_adjusted[valid_mask]
        elif chunk_end < n:
            # First chunk: trim right overlap
            valid_time_end = chunk_end / sample_rate
            valid_mask = times_chunk_adjusted < valid_time_end
            Sxx_chunk = Sxx_chunk[:, valid_mask]
            times_chunk_adjusted = times_chunk_adjusted[valid_mask]

        chunks_stft.append(Sxx_chunk)
        chunks_times.append(times_chunk_adjusted)

        # Move to next chunk
        chunk_start += chunk_size

    # Concatenate all chunks along the time axis.
    Sxx = np.concatenate(chunks_stft, axis=1)
    times = np.concatenate(chunks_times)

    # Convert to dB (floor avoids log10(0)).
    Sxx = np.maximum(Sxx, 1e-20)
    Sxx_db = 10 * np.log10(Sxx)

    return times, freq, Sxx_db

1289 

1290 

def psd_chunked(
    trace: WaveformTrace,
    *,
    chunk_size: int = 100_000_000,
    window: str = "hann",
    nperseg: int | None = None,
    noverlap: int | None = None,
    nfft: int | None = None,
    scaling: Literal["density", "spectrum"] = "density",
) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
    """Compute Welch PSD for very large signals using chunked processing.

    Processes signal in chunks with proper overlap handling to compute
    Power Spectral Density for files larger than available RAM. The result
    is equivalent to computing Welch PSD on the entire signal but with
    bounded memory usage.

    Args:
        trace: Input waveform trace.
        chunk_size: Maximum samples per chunk (default 100M).
        window: Window function name.
        nperseg: Segment length for Welch. If None, auto-selected.
        noverlap: Overlap between Welch segments. If None, uses nperseg // 2.
        nfft: FFT length per segment.
        scaling: Output scaling ("density" or "spectrum").

    Returns:
        (frequencies, psd_db) - Frequency axis and PSD in dB.

    Example:
        >>> # Process 10 GB file in 50M sample chunks
        >>> freq, psd = psd_chunked(trace, chunk_size=50_000_000, nperseg=4096)
        >>> print(f"Frequency resolution: {freq[1] - freq[0]:.3f} Hz")

    Note:
        Memory usage is bounded by chunk_size, not file size.
        The result may differ slightly from standard psd() due to
        chunk boundary handling, but variance is typically reduced
        due to increased averaging.

    References:
        Welch, P. D. (1967). "The use of fast Fourier transform for the
        estimation of power spectra"
    """
    data = trace.data
    n = len(data)
    sample_rate = trace.metadata.sample_rate

    # Set default parameters; nperseg never exceeds the data length.
    if nperseg is None:
        nperseg = max(256, min(n // 8, chunk_size // 8))
        nperseg = min(nperseg, n)

    if noverlap is None:
        noverlap = nperseg // 2

    if nfft is None:
        nfft = nperseg

    # If data fits in one chunk, use standard PSD
    if n <= chunk_size:
        return psd(
            trace,
            window=window,
            nperseg=nperseg,
            noverlap=noverlap,
            nfft=nfft,
            scaling=scaling,
        )

    # Calculate chunk overlap to ensure proper segment handling at boundaries
    # Overlap should be at least nperseg to ensure no gaps in segment coverage
    chunk_overlap = nperseg

    # Accumulate PSD estimates as a segment-count-weighted sum so the
    # final average approximates a global Welch average over all segments.
    psd_sum: NDArray[np.float64] | None = None
    total_segments = 0
    freq: NDArray[np.float64] | None = None

    chunk_start = 0
    while chunk_start < n:
        # Determine chunk boundaries with overlap
        chunk_data_start = max(0, chunk_start - chunk_overlap)
        chunk_end = min(chunk_start + chunk_size, n)
        chunk_data_end = min(chunk_end + chunk_overlap, n)

        # Extract chunk
        chunk_data = data[chunk_data_start:chunk_data_end]

        if len(chunk_data) < nperseg:
            # Last chunk too small, skip
            break

        # Compute Welch PSD for chunk
        f, psd_linear = sp_signal.welch(
            chunk_data,
            fs=sample_rate,
            window=window,
            nperseg=nperseg,
            noverlap=noverlap,
            nfft=nfft,
            scaling=scaling,
            detrend="constant",
        )

        # Count number of segments in this chunk (used as the weight for
        # this chunk's already-averaged Welch estimate). NOTE(review):
        # overlap regions between chunks contribute to two chunks'
        # estimates, which is the slight deviation documented in Note.
        hop = nperseg - noverlap
        num_segments = max(1, (len(chunk_data) - noverlap) // hop)

        if psd_sum is None:
            psd_sum = psd_linear * num_segments
            freq = f
        else:
            psd_sum += psd_linear * num_segments

        total_segments += num_segments

        # Move to next chunk
        chunk_start += chunk_size

    if psd_sum is None or total_segments == 0 or freq is None:
        # Fallback to standard PSD if something went wrong
        return psd(
            trace,
            window=window,
            nperseg=nperseg,
            noverlap=noverlap,
            nfft=nfft,
            scaling=scaling,
        )

    # Average across all segments
    psd_avg = psd_sum / total_segments

    # Convert to dB (floor avoids log10(0)).
    psd_avg = np.maximum(psd_avg, 1e-20)
    psd_db = 10 * np.log10(psd_avg)

    return freq, psd_db

1431 

1432 

def fft_chunked(
    trace: WaveformTrace,
    *,
    segment_size: int = 1_000_000,
    overlap_pct: float = 50.0,
    window: str = "hann",
    nfft: int | None = None,
) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
    """Compute FFT for very long signals using segmented processing.

    Divides signal into overlapping segments, computes FFT for each,
    and averages the magnitude spectra to reduce variance.

    Args:
        trace: Input waveform trace.
        segment_size: Size of each segment in samples.
        overlap_pct: Percentage overlap between segments (0-100).
        window: Window function name.
        nfft: FFT length. If None, uses next power of 2 above segment_size.

    Returns:
        (frequencies, magnitude_db) - Frequency axis and averaged magnitude in dB.

    Raises:
        AnalysisError: If no segments were processed (empty trace).

    Example:
        >>> # Process 1 GB signal in 1M sample segments with 50% overlap
        >>> freq, mag = fft_chunked(trace, segment_size=1_000_000, overlap_pct=50)
        >>> print(f"Frequency resolution: {freq[1] - freq[0]:.3f} Hz")

    References:
        Welch's method for spectral estimation
    """
    samples = trace.data
    total = len(samples)
    fs = trace.metadata.sample_rate

    # Small inputs: a single standard FFT suffices.
    if total < segment_size:
        out = fft(trace, window=window, nfft=nfft)
        return out[0], out[1]  # type: ignore[return-value]

    # Segment geometry from the requested overlap percentage.
    overlap = int(segment_size * overlap_pct / 100.0)
    hop = segment_size - overlap
    seg_count = max(1, (total - overlap) // hop)

    if nfft is None:
        nfft = int(2 ** np.ceil(np.log2(segment_size)))

    # Window and its coherent gain (for amplitude normalization).
    taper = get_window(window, segment_size)
    coherent_gain = np.sum(taper) / segment_size

    freqs: NDArray[np.float64] | None = None
    acc: NDArray[np.float64] | None = None

    for k in range(seg_count):
        lo = k * hop
        hi = min(lo + segment_size, total)

        if hi - lo < segment_size:
            # Short tail segment: zero-pad up to the full segment length.
            seg = np.zeros(segment_size)
            seg[: hi - lo] = samples[lo:hi]
        else:
            seg = samples[lo:hi]

        # Remove DC, window, and transform (single-sided spectrum).
        seg = seg - np.mean(seg)
        spec = np.fft.rfft(seg * taper, n=nfft)

        # Normalized magnitude for this segment.
        mag = np.abs(spec) / (segment_size * coherent_gain)

        if acc is None:
            acc = mag
            freqs = np.fft.rfftfreq(nfft, d=1.0 / fs)
        else:
            acc += mag

    if acc is None:
        raise AnalysisError("No segments were processed - input trace may be empty")
    if freqs is None:
        raise AnalysisError("Frequency array was not initialized - internal error")

    # Average segments, then convert to dB (floor avoids log10(0)).
    mean_mag = acc / seg_count
    mean_mag = np.maximum(mean_mag, 1e-20)
    return freqs, 20 * np.log10(mean_mag)

1534 

1535 

# Public API of this module (kept alphabetically sorted).
__all__ = [
    "bartlett_psd",
    "cwt",
    "dwt",
    "enob",
    "fft",
    "fft_chunked",
    "hilbert_transform",
    "idwt",
    "mfcc",
    "periodogram",
    "psd",
    "psd_chunked",
    "sfdr",
    "sinad",
    "snr",
    "spectrogram",
    "spectrogram_chunked",
    "thd",
]