Coverage for src / tracekit / analyzers / spectral / chunked.py: 98%
66 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
"""Chunked spectrogram computation for memory-constrained processing.

This module implements memory-bounded spectrogram computation that processes
files in chunks with proper boundary handling for continuity.

Example:
    >>> from tracekit.analyzers.spectral.chunked import spectrogram_chunked
    >>> f, t, Sxx = spectrogram_chunked('large_file.bin', chunk_size=100e6, nperseg=4096)
    >>> print(f"Spectrogram shape: {Sxx.shape}")

References:
    scipy.signal.spectrogram for STFT-based spectral estimation
    IEEE 1057-2017 for spectral analysis
"""
17from __future__ import annotations
19from pathlib import Path
20from typing import TYPE_CHECKING, Any
22import numpy as np
23from scipy import signal
25from tracekit.core.memoize import memoize_analysis
27if TYPE_CHECKING:
28 from collections.abc import Iterator
30 from numpy.typing import NDArray
@memoize_analysis(maxsize=8)
def spectrogram_chunked(
    file_path: str | Path,
    chunk_size: int | float,
    nperseg: int = 256,
    noverlap: int | None = None,
    *,
    nfft: int | None = None,
    window: str | tuple | NDArray = "hann",  # type: ignore[type-arg]
    detrend: str | bool = False,
    return_onesided: bool = True,
    scaling: str = "density",
    mode: str = "psd",
    overlap_factor: float = 2.0,
    dtype: str = "float32",
    **kwargs: Any,
) -> tuple[NDArray[Any], NDArray[Any], NDArray[Any]]:
    """Compute spectrogram for large files using chunked processing.

    Processes the file in chunks with ``overlap_factor * nperseg`` samples
    of boundary overlap for continuity, computes scipy.signal.spectrogram
    per chunk, and stitches the results along the time axis.

    Args:
        file_path: Path to signal file (raw binary samples).
        chunk_size: Chunk size in samples.
        nperseg: Length of each segment for the STFT.
        noverlap: Number of points to overlap between segments
            (default: nperseg // 2).
        nfft: FFT length (default: nperseg).
        window: Window function name, parameter tuple, or array.
        detrend: Detrend type ('constant', 'linear', False).
        return_onesided: Return one-sided spectrum for real input.
        scaling: Scaling mode ('density' or 'spectrum').
        mode: Output mode ('psd', 'magnitude', 'angle', 'phase', 'complex').
        overlap_factor: Factor for chunk boundary overlap
            (default: 2.0 = 2 * nperseg samples).
        dtype: Data type of input file ('float32' or 'float64').
        **kwargs: Additional arguments passed to scipy.signal.spectrogram.
            ``sample_rate`` is accepted as an alias for scipy's ``fs``.

    Returns:
        Tuple of (frequencies, times, Sxx) where:
            - frequencies: Array of frequency bins (Hz when fs/sample_rate given).
            - times: Array of time segments (seconds).
            - Sxx: Spectrogram array (frequencies x time segments).

        NOTE: segments falling inside a boundary-overlap region are computed
        for both adjacent chunks, so ``times`` can repeat values across a
        chunk seam.

    Raises:
        ValueError: If chunk_size < nperseg.

    Example:
        >>> # Process 10 GB file in 100M sample chunks
        >>> f, t, Sxx = spectrogram_chunked(
        ...     'huge_trace.bin',
        ...     chunk_size=100e6,
        ...     nperseg=4096,
        ...     noverlap=2048,
        ...     sample_rate=1e9,  # 1 GSa/s
        ...     dtype='float32'
        ... )
        >>> print(f"Spectrogram shape: {Sxx.shape}")

    References:
        MEM-004: Chunked Spectrogram requirement
    """
    chunk_size = int(chunk_size)
    if noverlap is None:
        noverlap = nperseg // 2
    if nfft is None:
        nfft = nperseg

    if chunk_size < nperseg:
        raise ValueError(f"chunk_size ({chunk_size}) must be >= nperseg ({nperseg})")

    # Map the user-facing ``sample_rate`` alias onto scipy's ``fs`` keyword.
    # (Bug fix: previously ``sample_rate`` leaked through **kwargs into
    # scipy.signal.spectrogram, which rejects unknown keyword arguments.)
    sample_rate = kwargs.pop("sample_rate", None)
    if sample_rate is not None:
        kwargs.setdefault("fs", sample_rate)
    fs = float(kwargs.get("fs", 1.0))

    # Determine on-disk sample representation.
    np_dtype = np.float32 if dtype == "float32" else np.float64
    bytes_per_sample = 4 if dtype == "float32" else 8

    # Boundary overlap between chunks (default: 2*nperseg for continuity).
    boundary_overlap = int(overlap_factor * nperseg)

    # Open file and get total size.
    file_path = Path(file_path)
    file_size_bytes = file_path.stat().st_size
    total_samples = file_size_bytes // bytes_per_sample

    chunks = _generate_chunks(file_path, total_samples, chunk_size, boundary_overlap, np_dtype)

    # First chunk initializes the frequency axis and result lists.
    first_chunk = next(chunks)
    f, t_chunk, Sxx_chunk = signal.spectrogram(
        first_chunk,
        nperseg=nperseg,
        noverlap=noverlap,
        nfft=nfft,
        window=window,
        detrend=detrend,
        return_onesided=return_onesided,
        scaling=scaling,
        mode=mode,
        **kwargs,
    )

    Sxx_list = [Sxx_chunk]
    times_list = [t_chunk]

    # Each subsequent chunk starts ``boundary_overlap`` samples *before* the
    # nominal chunk boundary (see _generate_chunks), so its local t=0 maps to
    # previous_start + (len(previous_chunk) - boundary_overlap) samples.
    # (Bug fix: the first increment previously omitted the overlap, shifting
    # every later chunk's times forward by boundary_overlap / fs seconds.)
    time_offset = (len(first_chunk) - boundary_overlap) / fs

    for chunk_data in chunks:
        _, t_chunk, Sxx_chunk = signal.spectrogram(
            chunk_data,
            nperseg=nperseg,
            noverlap=noverlap,
            nfft=nfft,
            window=window,
            detrend=detrend,
            return_onesided=return_onesided,
            scaling=scaling,
            mode=mode,
            **kwargs,
        )

        Sxx_list.append(Sxx_chunk)
        times_list.append(t_chunk + time_offset)
        time_offset += (len(chunk_data) - boundary_overlap) / fs

    # Stitch chunk results along the time axis.
    Sxx = np.concatenate(Sxx_list, axis=1)
    times = np.concatenate(times_list)

    return f, times, Sxx
def _generate_chunks(
    file_path: Path,
    total_samples: int,
    chunk_size: int,
    boundary_overlap: int,
    dtype: type,
) -> Iterator[NDArray[Any]]:
    """Yield successive chunks of a raw binary file with boundary overlap.

    Every chunk after the first is extended backwards by ``boundary_overlap``
    samples so adjacent chunks share data across their seam.

    Args:
        file_path: Path to binary file.
        total_samples: Total number of samples in file.
        chunk_size: Samples per chunk (nominal stride between chunks).
        boundary_overlap: Overlap samples between chunks.
        dtype: NumPy dtype for data.

    Yields:
        Chunk arrays with boundary overlap.
    """
    item_size = dtype().itemsize

    with open(file_path, "rb") as stream:
        # Nominal chunk boundaries advance by chunk_size each iteration.
        for nominal_start in range(0, total_samples, chunk_size):
            # Extend backwards into the previous chunk (clamped at the
            # start of the file) and clamp the end at the file's tail.
            read_start = nominal_start - boundary_overlap if nominal_start > boundary_overlap else 0
            read_end = min(nominal_start + chunk_size, total_samples)

            stream.seek(read_start * item_size)
            samples = np.fromfile(stream, dtype=dtype, count=read_end - read_start)

            # Defensive stop if the file is shorter than its stat() implied.
            if samples.size == 0:
                break

            yield samples
def spectrogram_chunked_generator(
    file_path: str | Path,
    chunk_size: int | float,
    nperseg: int = 256,
    noverlap: int | None = None,
    **kwargs: Any,
) -> Iterator[tuple[NDArray[Any], NDArray[Any], NDArray[Any]]]:
    """Generator version that yields spectrogram chunks.

    Yields chunks of (frequencies, times, Sxx) for streaming processing.
    Useful when the full spectrogram doesn't fit in memory.

    Args:
        file_path: Path to signal file.
        chunk_size: Chunk size in samples.
        nperseg: Segment length for STFT.
        noverlap: Overlap samples (default: nperseg // 2).
        **kwargs: Additional arguments for scipy.signal.spectrogram, plus
            the module-level options ``dtype`` ('float32' or 'float64'),
            ``overlap_factor`` (chunk boundary overlap factor, default 2.0),
            and ``sample_rate`` (alias for scipy's ``fs``).

    Yields:
        Tuples of (frequencies, times, Sxx) for each chunk. Times are local
        to each chunk (each chunk restarts at t=0).

    Example:
        >>> for f, t, Sxx_chunk in spectrogram_chunked_generator('file.bin', chunk_size=50e6):
        ...     # Process or save each chunk separately
        ...     print(f"Processing chunk with {Sxx_chunk.shape[1]} time segments")
    """
    chunk_size = int(chunk_size)
    if noverlap is None:
        noverlap = nperseg // 2

    # Determine on-disk sample representation.
    dtype = kwargs.get("dtype", "float32")
    np_dtype = np.float32 if dtype == "float32" else np.float64
    bytes_per_sample = 4 if dtype == "float32" else 8

    boundary_overlap = int(kwargs.get("overlap_factor", 2.0) * nperseg)

    # Open file and get total size.
    file_path = Path(file_path)
    file_size_bytes = file_path.stat().st_size
    total_samples = file_size_bytes // bytes_per_sample

    # Keywords consumed by this module must not reach scipy.signal.spectrogram,
    # which raises TypeError on unknown keywords. (Bug fix: previously only
    # "dtype" was filtered, so passing "overlap_factor" or "sample_rate"
    # crashed inside scipy.) ``sample_rate`` is aliased to scipy's ``fs``.
    local_keys = ("dtype", "overlap_factor", "sample_rate")
    scipy_kwargs = {k: v for k, v in kwargs.items() if k not in local_keys}
    if "sample_rate" in kwargs:
        scipy_kwargs.setdefault("fs", kwargs["sample_rate"])

    # Generate and process chunks.
    chunks = _generate_chunks(file_path, total_samples, chunk_size, boundary_overlap, np_dtype)

    for chunk_data in chunks:
        f, t, Sxx_chunk = signal.spectrogram(
            chunk_data,
            nperseg=nperseg,
            noverlap=noverlap,
            **scipy_kwargs,
        )
        yield f, t, Sxx_chunk
# Public API of this module (MEM-004 chunked spectrogram helpers).
__all__ = [
    "spectrogram_chunked",
    "spectrogram_chunked_generator",
]