Coverage for src / tracekit / analyzers / waveform / spectral.py: 80%
434 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
"""Spectral analysis functions for waveform data.

This module provides FFT, PSD, and spectral quality metrics
per IEEE 1241-2010 for ADC characterization.

Example:
    >>> from tracekit.analyzers.waveform.spectral import fft, thd, snr
    >>> freq, magnitude = fft(trace)
    >>> thd_db = thd(trace)
    >>> snr_db = snr(trace)

References:
    IEEE 1241-2010: Standard for Terminology and Test Methods for
    Analog-to-Digital Converters
"""
from __future__ import annotations

from typing import TYPE_CHECKING, Literal

import numpy as np
from scipy import signal as sp_signal

from tracekit.core.exceptions import AnalysisError, InsufficientDataError
from tracekit.utils.windowing import get_window

if TYPE_CHECKING:
    from numpy.typing import NDArray

    from tracekit.core.types import WaveformTrace
def fft(
    trace: WaveformTrace,
    *,
    window: str = "hann",
    nfft: int | None = None,
    detrend: Literal["none", "mean", "linear"] = "mean",
    return_phase: bool = False,
) -> (
    tuple[NDArray[np.float64], NDArray[np.float64]]
    | tuple[NDArray[np.float64], NDArray[np.float64], NDArray[np.float64]]
):
    """Compute a windowed FFT with optional zero-padding.

    Produces the single-sided magnitude spectrum in dB, optionally
    accompanied by the phase spectrum.

    Args:
        trace: Input waveform trace.
        window: Window function name (default "hann").
        nfft: FFT length. If None, the next power of 2 above the data
            length is used; explicit values shorter than the data are
            raised to the data length.
        detrend: Detrend method applied before windowing:
            - "none": No detrending
            - "mean": Remove DC offset (default)
            - "linear": Remove linear trend
        return_phase: If True, also return phase in radians.

    Returns:
        (frequencies, magnitude_db) when return_phase is False, otherwise
        (frequencies, magnitude_db, phase_rad).

    Raises:
        InsufficientDataError: If trace has fewer than 2 samples.

    Example:
        >>> freq, mag = fft(trace)
        >>> plt.semilogx(freq, mag)
        >>> plt.xlabel("Frequency (Hz)")
        >>> plt.ylabel("Magnitude (dB)")

    References:
        IEEE 1241-2010 Section 4.1.1
    """
    samples = trace.data
    n_samples = len(samples)

    if n_samples < 2:
        raise InsufficientDataError(
            "FFT requires at least 2 samples",
            required=2,
            available=n_samples,
            analysis_type="fft",
        )

    # Optional detrending before the window is applied.
    if detrend == "mean":
        samples = samples - np.mean(samples)
    elif detrend == "linear":
        samples = sp_signal.detrend(samples, type="linear")

    # Next power of two by default; never shorter than the record.
    if nfft is None:
        nfft = int(2 ** np.ceil(np.log2(n_samples)))
    else:
        nfft = max(nfft, n_samples)

    win = get_window(window, n_samples)
    spectrum = np.fft.rfft(samples * win, n=nfft)

    fs = trace.metadata.sample_rate
    freq = np.fft.rfftfreq(nfft, d=1.0 / fs)

    # Normalize by the window's coherent gain (sum(win)/n) so tone
    # amplitudes read consistently regardless of window choice.
    coherent_gain = np.sum(win) / n_samples
    amplitude = np.abs(spectrum) / (n_samples * coherent_gain)
    amplitude = np.maximum(amplitude, 1e-20)  # floor to avoid log(0)
    magnitude_db = 20 * np.log10(amplitude)

    if return_phase:
        return freq, magnitude_db, np.angle(spectrum)
    return freq, magnitude_db
123def psd(
124 trace: WaveformTrace,
125 *,
126 window: str = "hann",
127 nperseg: int | None = None,
128 noverlap: int | None = None,
129 nfft: int | None = None,
130 scaling: Literal["density", "spectrum"] = "density",
131) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
132 """Compute Power Spectral Density using Welch's method.
134 Uses overlapped segment averaging for reduced variance PSD estimation.
136 Args:
137 trace: Input waveform trace.
138 window: Window function name (default "hann").
139 nperseg: Segment length. If None, uses n // 8 or 256, whichever larger.
140 noverlap: Overlap between segments. If None, uses nperseg // 2.
141 nfft: FFT length per segment. If None, uses nperseg.
142 scaling: Output scaling:
143 - "density": Power spectral density (V^2/Hz)
144 - "spectrum": Power spectrum (V^2)
146 Returns:
147 (frequencies, psd) - Frequency axis and PSD in dB/Hz.
149 Raises:
150 InsufficientDataError: If trace has insufficient data.
152 Example:
153 >>> freq, psd_db = psd(trace)
154 >>> plt.semilogx(freq, psd_db)
155 >>> plt.ylabel("PSD (dB/Hz)")
157 References:
158 Welch, P. D. (1967). "The use of fast Fourier transform for the
159 estimation of power spectra"
160 """
161 data = trace.data
162 n = len(data)
164 if n < 16:
165 raise InsufficientDataError(
166 "PSD requires at least 16 samples",
167 required=16,
168 available=n,
169 analysis_type="psd",
170 )
172 sample_rate = trace.metadata.sample_rate
174 # Default segment length
175 if nperseg is None:
176 nperseg = max(256, n // 8)
177 nperseg = min(nperseg, n)
179 # Default overlap (50% for Hann window)
180 if noverlap is None:
181 noverlap = nperseg // 2
183 freq, psd_linear = sp_signal.welch(
184 data,
185 fs=sample_rate,
186 window=window,
187 nperseg=nperseg,
188 noverlap=noverlap,
189 nfft=nfft,
190 scaling=scaling,
191 detrend="constant",
192 )
194 # Convert to dB
195 psd_linear = np.maximum(psd_linear, 1e-20)
196 psd_db = 10 * np.log10(psd_linear)
198 return freq, psd_db
201def periodogram(
202 trace: WaveformTrace,
203 *,
204 window: str = "hann",
205 nfft: int | None = None,
206 scaling: Literal["density", "spectrum"] = "density",
207 detrend: Literal["constant", "linear", False] = "constant",
208) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
209 """Compute classical periodogram PSD estimate.
211 Single-segment PSD estimation using scaled FFT magnitude squared.
213 Args:
214 trace: Input waveform trace.
215 window: Window function name (default "hann").
216 nfft: FFT length. If None, uses data length.
217 scaling: Output scaling ("density" or "spectrum").
218 detrend: Detrending method.
220 Returns:
221 (frequencies, psd) - Frequency axis and PSD.
223 Example:
224 >>> freq, psd = periodogram(trace)
226 References:
227 IEEE 1241-2010 Section 4.1.2
228 """
229 sample_rate = trace.metadata.sample_rate
231 freq, psd_linear = sp_signal.periodogram(
232 trace.data,
233 fs=sample_rate,
234 window=window,
235 nfft=nfft,
236 scaling=scaling,
237 detrend=detrend,
238 )
240 # Convert to dB
241 psd_linear = np.maximum(psd_linear, 1e-20)
242 psd_db = 10 * np.log10(psd_linear)
244 return freq, psd_db
def bartlett_psd(
    trace: WaveformTrace,
    *,
    n_segments: int = 8,
    window: str = "rectangular",
    nfft: int | None = None,
) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
    """Estimate PSD with Bartlett's method (averaged periodograms).

    Averages the periodograms of non-overlapping segments, trading
    frequency resolution for reduced estimator variance.

    Args:
        trace: Input waveform trace.
        n_segments: Number of non-overlapping segments to average.
        window: Window function applied to each segment.
        nfft: FFT length per segment; defaults to the segment length.

    Returns:
        (frequencies, psd_db): Frequency axis and PSD in dB.

    Raises:
        AnalysisError: If no segments were processed (empty trace).
        InsufficientDataError: If trace has fewer than 16*n_segments samples.

    Example:
        >>> freq, psd = bartlett_psd(trace, n_segments=8)
    """
    samples = trace.data
    n_samples = len(samples)
    fs = trace.metadata.sample_rate

    seg_len = n_samples // n_segments

    if seg_len < 16:
        raise InsufficientDataError(
            f"Bartlett requires at least {16 * n_segments} samples for {n_segments} segments",
            required=16 * n_segments,
            available=n_samples,
            analysis_type="bartlett_psd",
        )

    if nfft is None:
        nfft = seg_len

    win = get_window(window, seg_len)
    win_power = np.sum(win**2)

    # Accumulate per-segment power spectra, then average.
    accumulated: NDArray[np.float64] | None = None
    for seg_idx in range(n_segments):
        start = seg_idx * seg_len
        chunk = samples[start : start + seg_len] * win

        spectrum = np.fft.rfft(chunk, n=nfft)
        # Per-segment power spectral density (V^2/Hz).
        seg_psd = (np.abs(spectrum) ** 2) / (fs * win_power)

        accumulated = seg_psd if accumulated is None else accumulated + seg_psd

    if accumulated is None:
        raise AnalysisError("No segments were processed - input trace may be empty")
    mean_psd = accumulated / n_segments

    freq = np.fft.rfftfreq(nfft, d=1.0 / fs)

    # Floor before the log to avoid -inf bins, then convert to dB.
    mean_psd = np.maximum(mean_psd, 1e-20)
    return freq, 10 * np.log10(mean_psd)
def spectrogram(
    trace: WaveformTrace,
    *,
    window: str = "hann",
    nperseg: int | None = None,
    noverlap: int | None = None,
    nfft: int | None = None,
) -> tuple[NDArray[np.float64], NDArray[np.float64], NDArray[np.float64]]:
    """Compute Short-Time Fourier Transform spectrogram.

    Time-frequency representation for analyzing non-stationary signals.

    Args:
        trace: Input waveform trace.
        window: Window function name.
        nperseg: Segment length. If None, uses min(256, n // 4), clamped
            to at least 16 samples.
        noverlap: Overlap between segments. If None, uses
            nperseg - nperseg // 8 (87.5% overlap).
        nfft: FFT length per segment.

    Returns:
        (times, frequencies, magnitude_db) - Time axis, frequency axis,
        and magnitude in dB as a 2D array of shape (n_freqs, n_times).

    Example:
        >>> t, f, Sxx = spectrogram(trace)
        >>> plt.pcolormesh(t, f, Sxx, shading='auto')
        >>> plt.ylabel('Frequency (Hz)')
        >>> plt.xlabel('Time (s)')
    """
    data = trace.data
    n = len(data)
    sample_rate = trace.metadata.sample_rate

    # Auto-select segment length: 1/4 of the record up to 256, at least 16.
    if nperseg is None:
        nperseg = min(256, n // 4)
        nperseg = max(nperseg, 16)

    # Dense default overlap (87.5%) for smooth time resolution.
    if noverlap is None:
        noverlap = nperseg - nperseg // 8

    freq, times, Sxx = sp_signal.spectrogram(
        data,
        fs=sample_rate,
        window=window,
        nperseg=nperseg,
        noverlap=noverlap,
        nfft=nfft,
        scaling="spectrum",
    )

    # Floor before the log to avoid -inf in silent bins.
    Sxx = np.maximum(Sxx, 1e-20)
    Sxx_db = 10 * np.log10(Sxx)

    return times, freq, Sxx_db
381def _find_fundamental(
382 freq: NDArray[np.float64],
383 magnitude: NDArray[np.float64],
384 *,
385 min_freq: float = 0.0,
386) -> tuple[int, float, float]:
387 """Find fundamental frequency in spectrum.
389 Args:
390 freq: Frequency axis.
391 magnitude: Magnitude spectrum (linear, not dB).
392 min_freq: Minimum frequency to consider.
394 Returns:
395 (index, frequency, magnitude) of fundamental.
396 """
397 # Skip DC and frequencies below min_freq
398 valid_mask = freq > max(min_freq, freq[1])
399 valid_indices = np.where(valid_mask)[0]
401 if len(valid_indices) == 0: 401 ↛ 402line 401 didn't jump to line 402 because the condition on line 401 was never true
402 return 0, 0.0, 0.0
404 # Find peak in valid region
405 local_peak_idx = np.argmax(magnitude[valid_indices])
406 peak_idx = valid_indices[local_peak_idx]
408 return int(peak_idx), float(freq[peak_idx]), float(magnitude[peak_idx])
411def _find_harmonic_indices(
412 freq: NDArray[np.float64],
413 fundamental_freq: float,
414 n_harmonics: int,
415) -> list[int]:
416 """Find indices of harmonic frequencies.
418 Args:
419 freq: Frequency axis.
420 fundamental_freq: Fundamental frequency.
421 n_harmonics: Number of harmonics to find.
423 Returns:
424 List of indices for harmonics 2, 3, ..., n_harmonics+1.
425 """
426 indices = []
428 for h in range(2, n_harmonics + 2):
429 target_freq = h * fundamental_freq
430 if target_freq > freq[-1]:
431 break
433 # Find closest bin
434 idx = np.argmin(np.abs(freq - target_freq))
435 indices.append(int(idx))
437 return indices
def thd(
    trace: WaveformTrace,
    *,
    n_harmonics: int = 10,
    window: str = "hann",
    nfft: int | None = None,
    return_db: bool = True,
) -> float:
    """Compute Total Harmonic Distortion.

    THD is the ratio of the RMS sum of harmonic components to the
    fundamental amplitude.

    Args:
        trace: Input waveform trace.
        n_harmonics: Number of harmonics to include (default 10).
        window: Window function for FFT.
        nfft: FFT length. If None, uses data length (no zero-padding) to
            preserve coherent sampling per IEEE 1241-2010.
        return_db: If True, return in dB. If False, return percentage.

    Returns:
        THD in dB or percentage; NaN if no fundamental is found.

    Example:
        >>> thd_db = thd(trace)
        >>> thd_pct = thd(trace, return_db=False)
        >>> print(f"THD: {thd_db:.1f} dB ({thd_pct:.2f}%)")

    References:
        IEEE 1241-2010 Section 4.1.4.2
    """
    # Default to the record length: zero-padding would smear coherently
    # sampled tones across bins.
    effective_nfft = len(trace.data) if nfft is None else nfft

    freq, mag_db = fft(trace, window=window, nfft=effective_nfft, detrend="mean")[:2]
    linear_mag = 10 ** (mag_db / 20)

    _fund_bin, fund_freq, fund_amp = _find_fundamental(freq, linear_mag)
    if fund_amp == 0 or fund_freq == 0:
        return np.nan  # type: ignore[no-any-return]

    harmonic_bins = _find_harmonic_indices(freq, fund_freq, n_harmonics)
    if not harmonic_bins:
        # No harmonics below Nyquist: zero measurable distortion.
        return -np.inf if return_db else 0.0

    # RMS of the harmonic amplitudes relative to the fundamental.
    harmonic_power = sum(linear_mag[b] ** 2 for b in harmonic_bins)
    ratio = np.sqrt(harmonic_power) / fund_amp

    if not return_db:
        return float(ratio * 100)
    if ratio <= 0:
        return -np.inf  # type: ignore[no-any-return]
    return float(20 * np.log10(ratio))
def snr(
    trace: WaveformTrace,
    *,
    n_harmonics: int = 10,
    window: str = "hann",
    nfft: int | None = None,
) -> float:
    """Compute Signal-to-Noise Ratio.

    Ratio of fundamental signal power to noise power, where the noise
    excludes DC, the fundamental, all harmonics, and their
    leakage-adjacent bins.

    Args:
        trace: Input waveform trace.
        n_harmonics: Number of harmonics to exclude from the noise.
        window: Window function for FFT.
        nfft: FFT length. If None, uses data length (no zero-padding) to
            preserve coherent sampling per IEEE 1241-2010.

    Returns:
        SNR in dB (NaN if no fundamental, +inf if no measurable noise).

    Example:
        >>> snr_db = snr(trace)
        >>> print(f"SNR: {snr_db:.1f} dB")

    References:
        IEEE 1241-2010 Section 4.1.4.1
    """
    # Default to the record length to preserve coherent sampling.
    effective_nfft = len(trace.data) if nfft is None else nfft

    freq, mag_db = fft(trace, window=window, nfft=effective_nfft, detrend="mean")[:2]
    linear_mag = 10 ** (mag_db / 20)
    n_bins = len(linear_mag)

    fund_bin, fund_freq, fund_amp = _find_fundamental(freq, linear_mag)
    if fund_amp == 0 or fund_freq == 0:
        return np.nan  # type: ignore[no-any-return]

    harmonic_bins = _find_harmonic_indices(freq, fund_freq, n_harmonics)

    # Bins excluded from the noise estimate: DC, fundamental +/- 3 bins,
    # and each harmonic +/- 3 bins (to absorb spectral leakage).
    excluded = np.zeros(n_bins, dtype=bool)
    excluded[0] = True
    excluded[max(0, fund_bin - 3) : min(n_bins, fund_bin + 4)] = True
    for h_bin in harmonic_bins:
        excluded[max(0, h_bin - 3) : min(n_bins, h_bin + 4)] = True

    # Signal power from a 3-bin window around the fundamental for a
    # better estimate under leakage.
    signal_slice = linear_mag[max(0, fund_bin - 1) : min(n_bins, fund_bin + 2)]
    signal_power = float(np.sum(signal_slice**2))

    # Everything not excluded counts as noise.
    noise_power = float(np.sum(linear_mag[~excluded] ** 2))

    if noise_power <= 0:
        return np.inf  # type: ignore[no-any-return]

    return float(10 * np.log10(signal_power / noise_power))
def sinad(
    trace: WaveformTrace,
    *,
    window: str = "hann",
    nfft: int | None = None,
) -> float:
    """Compute Signal-to-Noise and Distortion ratio.

    SINAD is the ratio of signal power to the combined power of all
    noise and distortion components.

    Args:
        trace: Input waveform trace.
        window: Window function for FFT.
        nfft: FFT length. If None, uses data length (no zero-padding) to
            preserve coherent sampling per IEEE 1241-2010.

    Returns:
        SINAD in dB (NaN if no fundamental is found).

    Example:
        >>> sinad_db = sinad(trace)
        >>> print(f"SINAD: {sinad_db:.1f} dB")

    References:
        IEEE 1241-2010 Section 4.1.4.3
    """
    # Default to the record length to preserve coherent sampling.
    effective_nfft = len(trace.data) if nfft is None else nfft

    freq, mag_db = fft(trace, window=window, nfft=effective_nfft, detrend="mean")[:2]
    linear_mag = 10 ** (mag_db / 20)
    n_bins = len(linear_mag)

    fund_bin, _fund_freq, fund_amp = _find_fundamental(freq, linear_mag)
    if fund_amp == 0:
        return np.nan  # type: ignore[no-any-return]

    # Signal power: 3-bin window around the fundamental captures leakage.
    signal_slice = linear_mag[max(0, fund_bin - 1) : min(n_bins, fund_bin + 2)]
    signal_power = float(np.sum(signal_slice**2))

    # Total power excluding DC; noise + distortion is the remainder.
    total_power = float(np.sum(linear_mag[1:] ** 2))
    nad_power = total_power - signal_power

    if nad_power <= 0:
        return np.inf  # type: ignore[no-any-return]

    return float(10 * np.log10(signal_power / nad_power))
def enob(
    trace: WaveformTrace,
    *,
    window: str = "hann",
    nfft: int | None = None,
) -> float:
    """Compute Effective Number of Bits from SINAD.

    Inverts the ideal-ADC relation SINAD = 6.02 * N + 1.76 dB:
    ENOB = (SINAD - 1.76) / 6.02.

    Args:
        trace: Input waveform trace.
        window: Window function for FFT.
        nfft: FFT length.

    Returns:
        ENOB in bits, or np.nan if SINAD is invalid.

    Example:
        >>> bits = enob(trace)
        >>> print(f"ENOB: {bits:.2f} bits")

    References:
        IEEE 1241-2010 Section 4.1.4.4
    """
    sinad_db = sinad(trace, window=window, nfft=nfft)

    # NaN comparisons are False, so this single check rejects both NaN
    # and non-positive SINAD values.
    if not sinad_db > 0:
        return np.nan  # type: ignore[no-any-return]

    return float((sinad_db - 1.76) / 6.02)
def sfdr(
    trace: WaveformTrace,
    *,
    window: str = "hann",
    nfft: int | None = None,
) -> float:
    """Compute Spurious-Free Dynamic Range.

    SFDR is the ratio of the fundamental amplitude to the largest
    spurious spectral component.

    Args:
        trace: Input waveform trace.
        window: Window function for FFT.
        nfft: FFT length. If None, uses data length (no zero-padding) to
            preserve coherent sampling per IEEE 1241-2010.

    Returns:
        SFDR in dBc (dB relative to carrier/fundamental).

    Example:
        >>> sfdr_db = sfdr(trace)
        >>> print(f"SFDR: {sfdr_db:.1f} dBc")

    References:
        IEEE 1241-2010 Section 4.1.4.5
    """
    # Default to the record length to preserve coherent sampling.
    effective_nfft = len(trace.data) if nfft is None else nfft

    freq, mag_db = fft(trace, window=window, nfft=effective_nfft, detrend="mean")[:2]
    linear_mag = 10 ** (mag_db / 20)
    n_bins = len(linear_mag)

    fund_bin, _fund_freq, fund_amp = _find_fundamental(freq, linear_mag)
    if fund_amp == 0:
        return np.nan  # type: ignore[no-any-return]

    # Spur candidates: everything except DC and the fundamental's main
    # lobe (+/- 5 bins, generous for a Hann window's ~4-bin lobe width).
    spur_mask = np.ones(n_bins, dtype=bool)
    spur_mask[0] = False
    spur_mask[max(0, fund_bin - 5) : min(n_bins, fund_bin + 6)] = False

    spurs = linear_mag[spur_mask]
    if spurs.size == 0:
        return np.inf  # type: ignore[no-any-return]

    largest_spur = float(np.max(spurs))
    if largest_spur <= 0:
        return np.inf  # type: ignore[no-any-return]

    return float(20 * np.log10(fund_amp / largest_spur))
def hilbert_transform(
    trace: WaveformTrace,
) -> tuple[NDArray[np.float64], NDArray[np.float64], NDArray[np.float64]]:
    """Extract envelope, phase, and instantaneous frequency via Hilbert.

    Builds the analytic signal and derives the instantaneous amplitude
    (envelope), unwrapped phase, and instantaneous frequency.

    Args:
        trace: Input waveform trace.

    Returns:
        (envelope, phase, inst_freq) - Instantaneous amplitude,
        phase (radians), and frequency (Hz).

    Example:
        >>> envelope, phase, inst_freq = hilbert_transform(trace)
        >>> plt.plot(trace.time_vector, envelope)

    References:
        Oppenheim, A. V. & Schafer, R. W. (2009). Discrete-Time
        Signal Processing, 3rd ed.
    """
    samples = trace.data
    fs = trace.metadata.sample_rate

    # Analytic signal: real part is the input, imaginary its Hilbert pair.
    analytic = sp_signal.hilbert(samples)

    envelope = np.abs(analytic)
    phase = np.unwrap(np.angle(analytic))

    # f_inst = d(phase)/dt / (2*pi), via backward differences; the first
    # sample has no predecessor so it copies its neighbor.
    inst_freq = np.empty_like(phase)
    inst_freq[1:] = np.diff(phase) * (fs / (2 * np.pi))
    inst_freq[0] = inst_freq[1]

    return envelope, phase, inst_freq
794def cwt(
795 trace: WaveformTrace,
796 *,
797 wavelet: Literal["morlet", "mexh", "ricker"] = "morlet",
798 scales: NDArray[np.float64] | None = None,
799 n_scales: int = 64,
800) -> tuple[NDArray[np.float64], NDArray[np.float64], NDArray[np.float64]]:
801 """Compute Continuous Wavelet Transform for time-frequency analysis.
803 Uses CWT to analyze non-stationary signals with multi-resolution
804 time-frequency representation.
806 Args:
807 trace: Input waveform trace.
808 wavelet: Wavelet type:
809 - "morlet": Morlet wavelet (complex, good for frequency localization)
810 - "mexh": Mexican hat wavelet (real, good for feature detection)
811 - "ricker": Ricker wavelet (real, synonym for Mexican hat)
812 scales: Array of scales to use. If None, auto-generated logarithmically.
813 n_scales: Number of scales if auto-generated (default 64).
815 Returns:
816 (scales, frequencies, coefficients) where coefficients is 2D array
817 of shape (n_scales, n_samples).
819 Raises:
820 InsufficientDataError: If trace has fewer than 8 samples.
821 ValueError: If wavelet type is not recognized.
823 Example:
824 >>> scales, freqs, coef = cwt(trace, wavelet="morlet")
825 >>> plt.pcolormesh(trace.time_vector, freqs, np.abs(coef))
826 >>> plt.ylabel("Frequency (Hz)")
828 References:
829 Mallat, S. (2009). A Wavelet Tour of Signal Processing, 3rd ed.
830 """
831 data = trace.data
832 sample_rate = trace.metadata.sample_rate
834 if len(data) < 8:
835 raise InsufficientDataError(
836 "CWT requires at least 8 samples",
837 required=8,
838 available=len(data),
839 analysis_type="cwt",
840 )
842 # Auto-generate scales if not provided
843 if scales is None:
844 # Logarithmically spaced scales from 1 to n/8
845 scales = np.logspace(0, np.log10(len(data) / 8), n_scales)
847 # Select wavelet function
848 if wavelet == "morlet":
849 # Morlet wavelet (complex): good for frequency analysis
850 widths = scales
851 coefficients = sp_signal.cwt(data, sp_signal.morlet2, widths)
852 elif wavelet in ("mexh", "ricker"):
853 # Mexican hat wavelet (Ricker): real, good for edge detection
854 widths = scales
855 coefficients = sp_signal.cwt(data, sp_signal.ricker, widths)
856 else:
857 raise ValueError(f"Unknown wavelet: {wavelet}. Choose from: morlet, mexh, ricker")
859 # Convert scales to frequencies
860 # For Morlet: f = fc / (scale * dt) where fc = center frequency
861 # For simplicity, use approximate relation: f = 1 / (scale * dt)
862 dt = 1.0 / sample_rate
863 frequencies = 1.0 / (scales * dt)
865 return scales, frequencies, coefficients
def dwt(
    trace: WaveformTrace,
    *,
    wavelet: str = "db4",
    level: int | None = None,
    mode: str = "symmetric",
) -> dict[str, NDArray[np.float64]]:
    """Compute a multi-level Discrete Wavelet Transform decomposition.

    Splits the signal into an approximation (low-frequency) band plus
    detail (high-frequency) bands at each decomposition level.

    Args:
        trace: Input waveform trace.
        wavelet: Wavelet family:
            - "dbN": Daubechies wavelets (e.g., "db1", "db4", "db8")
            - "symN": Symlet wavelets (e.g., "sym2", "sym8")
            - "coifN": Coiflet wavelets (e.g., "coif1", "coif5")
        level: Decomposition level (auto-computed if None).
        mode: Signal extension mode ("symmetric", "periodic", "zero", etc.).

    Returns:
        Dictionary with keys:
        - "cA": Final approximation coefficients
        - "cD1", "cD2", ...: Detail coefficients at each level

    Raises:
        AnalysisError: If DWT decomposition fails.
        ImportError: If PyWavelets library is not installed.
        InsufficientDataError: If trace has fewer than 4 samples.

    Example:
        >>> coeffs = dwt(trace, wavelet="db4", level=3)
        >>> print(f"Approximation: {len(coeffs['cA'])} coefficients")

    Note:
        Requires pywt library. Install with: pip install PyWavelets

    References:
        Daubechies, I. (1992). Ten Lectures on Wavelets
    """
    try:
        import pywt
    except ImportError:
        raise ImportError(  # noqa: B904
            "DWT requires PyWavelets library. Install with: pip install PyWavelets"
        )

    samples = trace.data

    if len(samples) < 4:
        raise InsufficientDataError(
            "DWT requires at least 4 samples",
            required=4,
            available=len(samples),
            analysis_type="dwt",
        )

    # Auto-select decomposition depth, capped at a practical maximum.
    if level is None:
        level = min(pywt.dwt_max_level(len(samples), wavelet), 8)

    try:
        coeff_list = pywt.wavedec(samples, wavelet, mode=mode, level=level)
    except ValueError as e:
        raise AnalysisError(f"DWT decomposition failed: {e}", analysis_type="dwt")  # noqa: B904

    # Package: approximation first, then numbered detail bands.
    decomposition = {"cA": coeff_list[0]}
    decomposition.update(
        {f"cD{lvl}": band for lvl, band in enumerate(coeff_list[1:], start=1)}
    )
    return decomposition
def idwt(
    coeffs: dict[str, NDArray[np.float64]],
    *,
    wavelet: str = "db4",
    mode: str = "symmetric",
) -> NDArray[np.float64]:
    """Reconstruct a signal from dwt() coefficients via inverse DWT.

    Args:
        coeffs: Dictionary of DWT coefficients from dwt().
        wavelet: Wavelet family (must match original decomposition).
        mode: Signal extension mode (must match original decomposition).

    Returns:
        Reconstructed signal as a float64 array.

    Raises:
        AnalysisError: If IDWT reconstruction fails.
        ImportError: If PyWavelets library is not installed.

    Example:
        >>> coeffs = dwt(trace, wavelet="db4")
        >>> # Modify coefficients (e.g., denoise)
        >>> coeffs["cD1"] *= 0  # Remove finest detail
        >>> reconstructed = idwt(coeffs, wavelet="db4")
    """
    try:
        import pywt
    except ImportError:
        raise ImportError(  # noqa: B904
            "IDWT requires PyWavelets library. Install with: pip install PyWavelets"
        )

    # Rebuild the coefficient list: approximation first, then the detail
    # bands in the same order dwt() produced them (ascending suffix).
    detail_names = sorted(
        (name for name in coeffs if name.startswith("cD")),
        key=lambda name: int(name[2:]),
    )
    ordered_coeffs = [coeffs["cA"], *(coeffs[name] for name in detail_names)]

    try:
        reconstructed = pywt.waverec(ordered_coeffs, wavelet, mode=mode)
    except ValueError as e:
        raise AnalysisError(f"IDWT reconstruction failed: {e}", analysis_type="idwt")  # noqa: B904

    return np.asarray(reconstructed, dtype=np.float64)
def mfcc(
    trace: WaveformTrace,
    *,
    n_mfcc: int = 13,
    n_fft: int = 512,
    hop_length: int | None = None,
    n_mels: int = 40,
    fmin: float = 0.0,
    fmax: float | None = None,
) -> NDArray[np.float64]:
    """Compute Mel-Frequency Cepstral Coefficients for audio analysis.

    Pipeline: STFT power spectrum -> Mel filterbank -> log -> DCT-II.
    MFCCs are widely used in speech and audio feature extraction.

    Args:
        trace: Input waveform trace.
        n_mfcc: Number of MFCC coefficients to return (default 13).
        n_fft: FFT window size (default 512).
        hop_length: Samples between frames; defaults to n_fft // 4.
        n_mels: Number of Mel filterbank channels (default 40).
        fmin: Minimum frequency for Mel filterbank (Hz).
        fmax: Maximum frequency for Mel filterbank (default: sample_rate/2).

    Returns:
        2D array of shape (n_mfcc, n_frames) with MFCC time series.

    Raises:
        InsufficientDataError: If trace has fewer than n_fft samples.

    Example:
        >>> mfcc_features = mfcc(audio_trace, n_mfcc=13)

    Note:
        This is a custom implementation. For production use, consider librosa.

    References:
        Davis, S. & Mermelstein, P. (1980). "Comparison of parametric
        representations for monosyllabic word recognition"
    """
    samples = trace.data
    fs = trace.metadata.sample_rate

    if len(samples) < n_fft:
        raise InsufficientDataError(
            f"MFCC requires at least {n_fft} samples",
            required=n_fft,
            available=len(samples),
            analysis_type="mfcc",
        )

    if hop_length is None:
        hop_length = n_fft // 4
    if fmax is None:
        fmax = fs / 2

    # Framewise power spectrum via scipy's spectrogram (Hann-windowed STFT).
    _freqs, _times, stft_power = sp_signal.spectrogram(
        samples,
        fs=fs,
        window="hann",
        nperseg=n_fft,
        noverlap=n_fft - hop_length,
        scaling="spectrum",
    )
    stft_power = np.abs(stft_power)

    # Collapse linear frequency bins onto the perceptual Mel scale.
    filterbank = _mel_filterbank(n_mels, n_fft, fs, fmin, fmax)
    mel_energy = filterbank @ stft_power

    # Log-compress (with a floor to avoid log(0)) before decorrelating.
    log_mel = 10 * np.log10(np.maximum(mel_energy, 1e-10))

    # Orthonormal DCT-II yields the cepstral coefficients.
    from scipy.fft import dct

    cepstra = dct(log_mel, axis=0, type=2, norm="ortho")[:n_mfcc, :]
    return np.asarray(cepstra, dtype=np.float64)
def _mel_filterbank(
    n_filters: int,
    n_fft: int,
    sample_rate: float,
    fmin: float,
    fmax: float,
) -> NDArray[np.float64]:
    """Create a Mel-scale triangular filterbank matrix.

    Builds ``n_filters`` triangular filters whose edges and peaks are equally
    spaced on the Mel scale (HTK-style formula) between ``fmin`` and ``fmax``.

    Args:
        n_filters: Number of Mel filters.
        n_fft: FFT size.
        sample_rate: Sampling rate in Hz.
        fmin: Minimum frequency (Hz).
        fmax: Maximum frequency (Hz). Triangles extending past the available
            spectrum (fmax above Nyquist) are truncated instead of raising.

    Returns:
        Filterbank matrix of shape (n_filters, n_fft // 2 + 1). Row i rises
        linearly from bin edge i to its peak at center i+1, then falls to
        bin edge i+2.
    """

    def hz_to_mel(hz: float) -> float:
        """Convert Hz to Mel scale (HTK formula)."""
        return 2595 * np.log10(1 + hz / 700)  # type: ignore[no-any-return]

    # n_filters + 2 equally spaced Mel points: filter i spans points i..i+2
    # and peaks at point i+1.
    mel_points = np.linspace(hz_to_mel(fmin), hz_to_mel(fmax), n_filters + 2)
    hz_points = 700 * (10 ** (mel_points / 2595) - 1)  # inverse Mel mapping

    # Map Hz to FFT bin indices (floor, HTK convention).
    n_freqs = n_fft // 2 + 1
    freq_bins = np.floor((n_fft + 1) * hz_points / sample_rate).astype(int)

    filterbank = np.zeros((n_filters, n_freqs))
    bins = np.arange(n_freqs)
    for i in range(n_filters):
        left, center, right = freq_bins[i], freq_bins[i + 1], freq_bins[i + 2]
        if center > left:
            # Rising slope on [left, center): 0 at the left edge, -> 1 at center.
            rise = (bins >= left) & (bins < center)
            filterbank[i, rise] = (bins[rise] - left) / (center - left)
        if right > center:
            # Falling slope on [center, right): 1 at center, -> 0 at the right edge.
            fall = (bins >= center) & (bins < right)
            filterbank[i, fall] = (right - bins[fall]) / (right - center)

    return filterbank
1159# ==========================================================================
1160# MEM-004, MEM-005, MEM-006: Chunked Processing for Large Signals
1161# ==========================================================================
def spectrogram_chunked(
    trace: WaveformTrace,
    *,
    chunk_size: int = 100_000_000,
    window: str = "hann",
    nperseg: int | None = None,
    noverlap: int | None = None,
    nfft: int | None = None,
    overlap_factor: float = 2.0,
) -> tuple[NDArray[np.float64], NDArray[np.float64], NDArray[np.float64]]:
    """Compute spectrogram for very large signals using chunked processing.

    Processes the signal in chunks with overlap to handle files larger than
    RAM. Each chunk is padded with ``overlap_factor * nperseg`` extra samples
    on both sides so STFT segments near a chunk boundary see real data; the
    STFT columns whose times fall inside the padding are then trimmed away,
    so adjacent chunks stitch into one continuous spectrogram without
    double-counting.

    Args:
        trace: Input waveform trace.
        chunk_size: Maximum samples per chunk (default 100M).
        window: Window function name.
        nperseg: Segment length for STFT. If None, auto-selected.
        noverlap: Overlap between STFT segments. If None, uses
            nperseg - nperseg // 8.
        nfft: FFT length per segment.
        overlap_factor: Chunk-boundary padding in units of nperseg
            (default 2.0 = 2*nperseg of padding on each side).

    Returns:
        (times, frequencies, magnitude_db) - Time axis, frequency axis,
        and magnitude in dB as 2D array.

    Example:
        >>> # Process 10 GB file in 50M sample chunks
        >>> t, f, Sxx = spectrogram_chunked(trace, chunk_size=50_000_000, nperseg=4096)
        >>> print(f"Spectrogram shape: {Sxx.shape}")

    References:
        scipy.signal.stft documentation
    """
    data = trace.data
    n = len(data)
    sample_rate = trace.metadata.sample_rate

    # Set default STFT parameters.
    if nperseg is None:
        nperseg = min(256, n // 4)
        nperseg = max(nperseg, 16)  # never shorter than 16 samples

    if noverlap is None:
        noverlap = nperseg - nperseg // 8  # dense overlap for smooth stitching

    # Padding added on each side of a chunk (trimmed again below).
    chunk_overlap = int(overlap_factor * nperseg)

    # Small signal: no chunking needed, delegate to the one-shot path.
    # NOTE(review): assumes spectrogram() returns (times, freq, magnitude_db)
    # in the same order as this function's own return -- confirm against its
    # definition.
    if n <= chunk_size:
        return spectrogram(trace, window=window, nperseg=nperseg, noverlap=noverlap, nfft=nfft)

    # Process chunks.
    chunks_stft = []   # trimmed Sxx pieces, concatenated along time at the end
    chunks_times = []  # matching absolute time axes
    chunk_start = 0

    while chunk_start < n:
        # Nominal (unpadded) chunk end.
        chunk_end = min(chunk_start + chunk_size, n)

        # Extend by the padding where a neighbour exists (not at signal edges).
        chunk_data_start = chunk_start - chunk_overlap if chunk_start > 0 else 0

        chunk_data_end = chunk_end + chunk_overlap if chunk_end < n else n

        chunk_data = data[chunk_data_start:chunk_data_end]

        # STFT of the padded chunk; times_chunk is relative to the chunk start.
        freq, times_chunk, Sxx_chunk = sp_signal.spectrogram(
            chunk_data,
            fs=sample_rate,
            window=window,
            nperseg=nperseg,
            noverlap=noverlap,
            nfft=nfft,
            scaling="spectrum",
        )

        # Shift chunk-relative times to absolute signal time.
        time_offset = chunk_data_start / sample_rate
        times_chunk_adjusted = times_chunk + time_offset

        # Keep only columns inside the nominal [chunk_start, chunk_end) span;
        # columns in the padding belong to the neighbouring chunks.
        if chunk_start > 0 and chunk_end < n:
            # Middle chunk: trim both sides.
            valid_time_start = chunk_start / sample_rate
            valid_time_end = chunk_end / sample_rate
            valid_mask = (times_chunk_adjusted >= valid_time_start) & (
                times_chunk_adjusted < valid_time_end
            )
            Sxx_chunk = Sxx_chunk[:, valid_mask]
            times_chunk_adjusted = times_chunk_adjusted[valid_mask]
        elif chunk_start > 0:
            # Last chunk: trim left overlap only.
            valid_time_start = chunk_start / sample_rate
            valid_mask = times_chunk_adjusted >= valid_time_start
            Sxx_chunk = Sxx_chunk[:, valid_mask]
            times_chunk_adjusted = times_chunk_adjusted[valid_mask]
        elif chunk_end < n:
            # First chunk: trim right overlap only.
            valid_time_end = chunk_end / sample_rate
            valid_mask = times_chunk_adjusted < valid_time_end
            Sxx_chunk = Sxx_chunk[:, valid_mask]
            times_chunk_adjusted = times_chunk_adjusted[valid_mask]

        chunks_stft.append(Sxx_chunk)
        chunks_times.append(times_chunk_adjusted)

        # Advance by the nominal chunk size (padding is re-read, not advanced).
        chunk_start += chunk_size

    # All chunks share the same frequency axis (same nperseg/nfft), so the
    # `freq` from the last loop iteration is valid for the whole result.
    Sxx = np.concatenate(chunks_stft, axis=1)
    times = np.concatenate(chunks_times)

    # Floor before log10 to avoid -inf for zero-power cells.
    Sxx = np.maximum(Sxx, 1e-20)
    Sxx_db = 10 * np.log10(Sxx)

    return times, freq, Sxx_db
def psd_chunked(
    trace: WaveformTrace,
    *,
    chunk_size: int = 100_000_000,
    window: str = "hann",
    nperseg: int | None = None,
    noverlap: int | None = None,
    nfft: int | None = None,
    scaling: Literal["density", "spectrum"] = "density",
) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
    """Welch PSD for signals larger than RAM, via chunked accumulation.

    Splits the signal into chunks of at most ``chunk_size`` samples (plus
    ``nperseg`` samples of boundary padding on each side so segment coverage
    has no gaps), runs Welch's method on every chunk, and combines the
    per-chunk estimates as a segment-count-weighted average. Memory use is
    bounded by ``chunk_size`` rather than the signal length.

    Args:
        trace: Input waveform trace.
        chunk_size: Maximum samples per chunk (default 100M).
        window: Window function name.
        nperseg: Segment length for Welch. If None, auto-selected.
        noverlap: Overlap between Welch segments. If None, uses nperseg // 2.
        nfft: FFT length per segment. If None, uses nperseg.
        scaling: Output scaling ("density" or "spectrum").

    Returns:
        (frequencies, psd_db) - Frequency axis and PSD in dB.

    Example:
        >>> # Process 10 GB file in 50M sample chunks
        >>> freq, psd_db = psd_chunked(trace, chunk_size=50_000_000, nperseg=4096)
        >>> print(f"Frequency resolution: {freq[1] - freq[0]:.3f} Hz")

    Note:
        The result may differ slightly from a single-pass psd() because
        segments in the chunk-boundary padding contribute to both
        neighbouring chunks, but variance is typically reduced by the
        extra averaging.

    References:
        Welch, P. D. (1967). "The use of fast Fourier transform for the
        estimation of power spectra"
    """
    samples = trace.data
    total = len(samples)
    fs = trace.metadata.sample_rate

    # Resolve default parameters.
    if nperseg is None:
        nperseg = max(256, min(total // 8, chunk_size // 8))
        nperseg = min(nperseg, total)

    if noverlap is None:
        noverlap = nperseg // 2

    if nfft is None:
        nfft = nperseg

    # Small signal: a single pass suffices.
    if total <= chunk_size:
        return psd(
            trace,
            window=window,
            nperseg=nperseg,
            noverlap=noverlap,
            nfft=nfft,
            scaling=scaling,
        )

    # Boundary padding of one segment length so no Welch segment straddles a gap.
    pad = nperseg

    # Weighted accumulation of per-chunk Welch estimates.
    acc = np.zeros(nfft // 2 + 1)
    seg_total = 0
    freq: NDArray[np.float64] | None = None

    for start in range(0, total, chunk_size):
        lo = max(0, start - pad)
        hi = min(start + chunk_size + pad, total)
        piece = samples[lo:hi]

        if len(piece) < nperseg:
            break  # trailing remainder too short for even one segment

        freq, chunk_psd = sp_signal.welch(
            piece,
            fs=fs,
            window=window,
            nperseg=nperseg,
            noverlap=noverlap,
            nfft=nfft,
            scaling=scaling,
            detrend="constant",
        )

        # Weight each chunk's estimate by the number of Welch segments it held.
        hop = nperseg - noverlap
        seg_count = max(1, (len(piece) - noverlap) // hop)
        acc += chunk_psd * seg_count
        seg_total += seg_count

    if seg_total == 0 or freq is None:
        # Nothing accumulated (defensive) -- fall back to the one-shot path.
        return psd(
            trace,
            window=window,
            nperseg=nperseg,
            noverlap=noverlap,
            nfft=nfft,
            scaling=scaling,
        )

    # Average across all segments, floor to avoid log10(0), convert to dB.
    mean_psd = acc / seg_total
    mean_psd = np.maximum(mean_psd, 1e-20)
    return freq, 10 * np.log10(mean_psd)
def fft_chunked(
    trace: WaveformTrace,
    *,
    segment_size: int = 1_000_000,
    overlap_pct: float = 50.0,
    window: str = "hann",
    nfft: int | None = None,
) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
    """Averaged-magnitude FFT for very long signals (Welch-style).

    Splits the signal into overlapping segments, computes a windowed FFT of
    each, and averages the magnitude spectra to reduce variance.

    Args:
        trace: Input waveform trace.
        segment_size: Size of each segment in samples.
        overlap_pct: Percentage overlap between segments (0-100).
        window: Window function name.
        nfft: FFT length. If None, next power of 2 >= segment_size.

    Returns:
        (frequencies, magnitude_db) - Frequency axis and averaged magnitude in dB.

    Example:
        >>> # Process 1 GB signal in 1M sample segments with 50% overlap
        >>> freq, mag = fft_chunked(trace, segment_size=1_000_000, overlap_pct=50)
        >>> print(f"Frequency resolution: {freq[1] - freq[0]:.3f} Hz")

    References:
        Welch's method for spectral estimation
    """
    samples = trace.data
    total = len(samples)
    fs = trace.metadata.sample_rate

    # Short signal: a single FFT suffices.
    if total < segment_size:
        out = fft(trace, window=window, nfft=nfft)
        return out[0], out[1]  # type: ignore[return-value]

    # Segment layout from the requested overlap percentage.
    overlap_samples = int(segment_size * overlap_pct / 100.0)
    hop = segment_size - overlap_samples
    seg_count = max(1, (total - overlap_samples) // hop)  # always >= 1 here

    if nfft is None:
        nfft = int(2 ** np.ceil(np.log2(segment_size)))

    taper = get_window(window, segment_size)
    gain = np.sum(taper) / segment_size  # coherent window gain

    freq = np.fft.rfftfreq(nfft, d=1.0 / fs)
    acc = np.zeros(nfft // 2 + 1)

    for k in range(seg_count):
        begin = k * hop
        stop = min(begin + segment_size, total)

        if stop - begin < segment_size:
            # Short trailing segment: zero-pad to the full segment length.
            seg = np.zeros(segment_size)
            seg[: stop - begin] = samples[begin:stop]
        else:
            seg = samples[begin:stop]

        # Remove per-segment DC before windowing.
        seg = seg - np.mean(seg)

        spectrum = np.fft.rfft(seg * taper, n=nfft)

        # Normalize by the window's coherent gain so amplitudes are comparable.
        acc += np.abs(spectrum) / (segment_size * gain)

    # Average, floor to avoid log10(0), convert to dB.
    mean_mag = acc / seg_count
    mean_mag = np.maximum(mean_mag, 1e-20)
    return freq, 20 * np.log10(mean_mag)
# Public API of this module (kept alphabetical).
__all__ = [
    "bartlett_psd",
    "cwt",
    "dwt",
    "enob",
    "fft",
    "fft_chunked",
    "hilbert_transform",
    "idwt",
    "mfcc",
    "periodogram",
    "psd",
    "psd_chunked",
    "sfdr",
    "sinad",
    "snr",
    "spectrogram",
    "spectrogram_chunked",
    "thd",
]