Coverage for src / tracekit / analyzers / spectral / chunked.py: 98%

66 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Chunked spectrogram computation for memory-constrained processing. 

2 

3This module implements memory-bounded spectrogram computation that processes 

4files in chunks with proper boundary handling for continuity. 

5 

6 

7Example: 

8 >>> from tracekit.analyzers.spectral.chunked import spectrogram_chunked 

9 >>> spec = spectrogram_chunked('large_file.bin', chunk_size=100e6, nperseg=4096) 

10 >>> print(f"Spectrogram shape: {spec.shape}") 

11 

12References: 

13 scipy.signal.stft for STFT computation 

14 IEEE 1057-2017 for spectral analysis 

15""" 

16 

17from __future__ import annotations 

18 

19from pathlib import Path 

20from typing import TYPE_CHECKING, Any 

21 

22import numpy as np 

23from scipy import signal 

24 

25from tracekit.core.memoize import memoize_analysis 

26 

27if TYPE_CHECKING: 

28 from collections.abc import Iterator 

29 

30 from numpy.typing import NDArray 

31 

32 

33@memoize_analysis(maxsize=8) 

34def spectrogram_chunked( 

35 file_path: str | Path, 

36 chunk_size: int | float, 

37 nperseg: int = 256, 

38 noverlap: int | None = None, 

39 *, 

40 nfft: int | None = None, 

41 window: str | tuple | NDArray = "hann", # type: ignore[type-arg] 

42 detrend: str | bool = False, 

43 return_onesided: bool = True, 

44 scaling: str = "density", 

45 mode: str = "psd", 

46 overlap_factor: float = 2.0, 

47 dtype: str = "float32", 

48 **kwargs: Any, 

49) -> tuple[NDArray[Any], NDArray[Any], NDArray[Any]]: 

50 """Compute spectrogram for large files using chunked processing. 

51 

52 

53 Processes file in chunks with `overlap_factor * nperseg` overlap on 

54 boundaries to ensure continuity. Computes scipy.signal.stft per chunk 

55 and stitches results. 

56 

57 Args: 

58 file_path: Path to signal file (binary format). 

59 chunk_size: Chunk size in samples. 

60 nperseg: Length of each segment for STFT. 

61 noverlap: Number of points to overlap (default: nperseg // 2). 

62 nfft: FFT length (default: nperseg). 

63 window: Window function name or array. 

64 detrend: Detrend type ('constant', 'linear', False). 

65 return_onesided: Return one-sided spectrum for real input. 

66 scaling: Scaling mode ('density' or 'spectrum'). 

67 mode: Output mode ('psd', 'magnitude', 'angle', 'phase', 'complex'). 

68 overlap_factor: Factor for chunk boundary overlap (default: 2.0 = 2*nperseg). 

69 dtype: Data type of input file ('float32' or 'float64'). 

70 **kwargs: Additional arguments passed to scipy.signal.stft. 

71 

72 Returns: 

73 Tuple of (frequencies, times, Sxx) where: 

74 - frequencies: Array of frequency bins (Hz, requires sample_rate in kwargs). 

75 - times: Array of time segments (seconds). 

76 - Sxx: Spectrogram array (frequencies x time segments). 

77 

78 Raises: 

79 ValueError: If chunk_size < nperseg or file cannot be read. 

80 

81 Example: 

82 >>> # Process 10 GB file in 100M sample chunks 

83 >>> f, t, Sxx = spectrogram_chunked( 

84 ... 'huge_trace.bin', 

85 ... chunk_size=100e6, 

86 ... nperseg=4096, 

87 ... noverlap=2048, 

88 ... sample_rate=1e9, # 1 GSa/s 

89 ... dtype='float32' 

90 ... ) 

91 >>> print(f"Spectrogram shape: {Sxx.shape}") 

92 

93 References: 

94 MEM-004: Chunked Spectrogram requirement 

95 """ 

96 chunk_size = int(chunk_size) 

97 if noverlap is None: 

98 noverlap = nperseg // 2 

99 if nfft is None: 

100 nfft = nperseg 

101 

102 if chunk_size < nperseg: 

103 raise ValueError(f"chunk_size ({chunk_size}) must be >= nperseg ({nperseg})") 

104 

105 # Determine dtype 

106 np_dtype = np.float32 if dtype == "float32" else np.float64 

107 bytes_per_sample = 4 if dtype == "float32" else 8 

108 

109 # Calculate boundary overlap (default: 2*nperseg for continuity) 

110 boundary_overlap = int(overlap_factor * nperseg) 

111 

112 # Open file and get total size 

113 file_path = Path(file_path) 

114 file_size_bytes = file_path.stat().st_size 

115 total_samples = file_size_bytes // bytes_per_sample 

116 

117 # Prepare chunk generator 

118 chunks = _generate_chunks(file_path, total_samples, chunk_size, boundary_overlap, np_dtype) 

119 

120 # Process first chunk to initialize arrays 

121 first_chunk = next(chunks) 

122 f, t_chunk, Sxx_chunk = signal.spectrogram( 

123 first_chunk, 

124 nperseg=nperseg, 

125 noverlap=noverlap, 

126 nfft=nfft, 

127 window=window, 

128 detrend=detrend, 

129 return_onesided=return_onesided, 

130 scaling=scaling, 

131 mode=mode, 

132 **kwargs, 

133 ) 

134 

135 # Initialize result arrays 

136 Sxx_list = [Sxx_chunk] 

137 time_offset = 0.0 

138 times_list = [t_chunk + time_offset] 

139 

140 # Get sample rate for time calculation 

141 fs = kwargs.get("sample_rate", kwargs.get("fs", 1.0)) 

142 time_offset += len(first_chunk) / fs 

143 

144 # Process remaining chunks 

145 for chunk_data in chunks: 

146 _, t_chunk, Sxx_chunk = signal.spectrogram( 

147 chunk_data, 

148 nperseg=nperseg, 

149 noverlap=noverlap, 

150 nfft=nfft, 

151 window=window, 

152 detrend=detrend, 

153 return_onesided=return_onesided, 

154 scaling=scaling, 

155 mode=mode, 

156 **kwargs, 

157 ) 

158 

159 Sxx_list.append(Sxx_chunk) 

160 times_list.append(t_chunk + time_offset) 

161 time_offset += (len(chunk_data) - boundary_overlap) / fs 

162 

163 # Concatenate all chunks 

164 Sxx = np.concatenate(Sxx_list, axis=1) 

165 times = np.concatenate(times_list) 

166 

167 return f, times, Sxx 

168 

169 

170def _generate_chunks( 

171 file_path: Path, 

172 total_samples: int, 

173 chunk_size: int, 

174 boundary_overlap: int, 

175 dtype: type, 

176) -> Iterator[NDArray[Any]]: 

177 """Generate overlapping chunks from file. 

178 

179 Args: 

180 file_path: Path to binary file. 

181 total_samples: Total number of samples in file. 

182 chunk_size: Samples per chunk. 

183 boundary_overlap: Overlap samples between chunks. 

184 dtype: NumPy dtype for data. 

185 

186 Yields: 

187 Chunk arrays with boundary overlap. 

188 """ 

189 offset = 0 

190 

191 with open(file_path, "rb") as f: 

192 while offset < total_samples: 

193 # Calculate chunk boundaries 

194 chunk_start = max(0, offset - boundary_overlap) 

195 chunk_end = min(total_samples, offset + chunk_size) 

196 chunk_len = chunk_end - chunk_start 

197 

198 # Seek to start position 

199 f.seek(chunk_start * dtype().itemsize) 

200 

201 # Read chunk 

202 chunk_data: NDArray[np.float64] = np.fromfile(f, dtype=dtype, count=chunk_len) 

203 

204 if len(chunk_data) == 0: 204 ↛ 205line 204 didn't jump to line 205 because the condition on line 204 was never true

205 break 

206 

207 yield chunk_data 

208 

209 # Advance offset (accounting for overlap) 

210 offset += chunk_size 

211 

212 

213def spectrogram_chunked_generator( 

214 file_path: str | Path, 

215 chunk_size: int | float, 

216 nperseg: int = 256, 

217 noverlap: int | None = None, 

218 **kwargs: Any, 

219) -> Iterator[tuple[NDArray[Any], NDArray[Any], NDArray[Any]]]: 

220 """Generator version that yields spectrogram chunks. 

221 

222 

223 Yields chunks of (frequencies, times, Sxx) for streaming processing. 

224 Useful when full spectrogram doesn't fit in memory. 

225 

226 Args: 

227 file_path: Path to signal file. 

228 chunk_size: Chunk size in samples. 

229 nperseg: Segment length for STFT. 

230 noverlap: Overlap samples (default: nperseg // 2). 

231 **kwargs: Additional arguments for spectrogram. 

232 

233 Yields: 

234 Tuples of (frequencies, times, Sxx) for each chunk. 

235 

236 Example: 

237 >>> for f, t, Sxx_chunk in spectrogram_chunked_generator('file.bin', chunk_size=50e6): 

238 ... # Process or save each chunk separately 

239 ... print(f"Processing chunk with {Sxx_chunk.shape[1]} time segments") 

240 """ 

241 chunk_size = int(chunk_size) 

242 if noverlap is None: 

243 noverlap = nperseg // 2 

244 

245 # Determine dtype 

246 dtype = kwargs.get("dtype", "float32") 

247 np_dtype = np.float32 if dtype == "float32" else np.float64 

248 bytes_per_sample = 4 if dtype == "float32" else 8 

249 

250 boundary_overlap = int(kwargs.get("overlap_factor", 2.0) * nperseg) 

251 

252 # Open file and get total size 

253 file_path = Path(file_path) 

254 file_size_bytes = file_path.stat().st_size 

255 total_samples = file_size_bytes // bytes_per_sample 

256 

257 # Generate and process chunks 

258 chunks = _generate_chunks(file_path, total_samples, chunk_size, boundary_overlap, np_dtype) 

259 

260 for chunk_data in chunks: 

261 f, t, Sxx_chunk = signal.spectrogram( 

262 chunk_data, 

263 nperseg=nperseg, 

264 noverlap=noverlap, 

265 **{k: v for k, v in kwargs.items() if k != "dtype"}, 

266 ) 

267 yield f, t, Sxx_chunk 

268 

269 

270__all__ = [ 

271 "spectrogram_chunked", 

272 "spectrogram_chunked_generator", 

273]