Coverage for src / tracekit / analyzers / waveform / wavelets.py: 90%

74 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Wavelet transform analysis for waveform data. 

2 

3This module provides continuous and discrete wavelet transforms, 

4including chunked implementations for memory-efficient processing. 

5 

6 

7Example: 

8 >>> from tracekit.analyzers.waveform.wavelets import cwt_chunked, dwt_chunked 

9 >>> # Process large file with chunked CWT 

10 >>> coeffs, scales = cwt_chunked('large_file.bin', scales=np.arange(1, 128)) 

11 

12References: 

13 Mallat, S. (2008). A Wavelet Tour of Signal Processing 

14 Torrence, C., & Compo, G. P. (1998). A practical guide to wavelet analysis 

15""" 

16 

17from __future__ import annotations 

18 

19from pathlib import Path 

20from typing import TYPE_CHECKING, Any 

21 

22import numpy as np 

23 

24if TYPE_CHECKING: 

25 from collections.abc import Generator 

26 from os import PathLike 

27 

28 from numpy.typing import NDArray 

29 

30try: 

31 import pywt 

32 

33 PYWT_AVAILABLE = True 

34except ImportError: 

35 PYWT_AVAILABLE = False 

36 

37 

38def cwt_chunked( 

39 file_path: str | PathLike[str], 

40 scales: NDArray[np.floating[Any]], 

41 wavelet: str = "morl", 

42 *, 

43 chunk_size: int = 10_000_000, 

44 overlap_factor: float = 2.0, 

45 dtype: type = np.float64, 

46) -> Generator[tuple[NDArray[np.float64], NDArray[np.float64]], None, None]: 

47 """Compute continuous wavelet transform on large files using chunked processing. 

48 

49 Processes file in overlapping chunks to handle files larger than memory. 

50 Uses overlap to ensure continuity at chunk boundaries. 

51 

52 Args: 

53 file_path: Path to binary file containing signal data. 

54 scales: Array of scales for CWT computation. 

55 wavelet: Wavelet name (default 'morl' - Morlet). 

56 chunk_size: Number of samples per chunk (default 10M). 

57 overlap_factor: Overlap as multiple of max scale (default 2.0). 

58 dtype: Data type for file reading (default float64). 

59 

60 Yields: 

61 Tuple of (coefficients, valid_scales) for each chunk: 

62 - coefficients: CWT coefficients [scales x time] 

63 - valid_scales: Scales used (same as input) 

64 

65 Raises: 

66 ImportError: If PyWavelets is not available. 

67 FileNotFoundError: If file does not exist. 

68 

69 Example: 

70 >>> import numpy as np 

71 >>> scales = np.arange(1, 128) 

72 >>> for coeffs, scales_out in cwt_chunked('signal.bin', scales): 

73 ... # Process each chunk 

74 ... print(f"Chunk shape: {coeffs.shape}") 

75 

76 References: 

77 MEM-007: Chunked Wavelet Transform 

78 Torrence & Compo (1998): Practical guide to wavelet analysis 

79 """ 

80 if not PYWT_AVAILABLE: 80 ↛ 81line 80 didn't jump to line 81 because the condition on line 80 was never true

81 raise ImportError("PyWavelets not available. Install with: pip install PyWavelets") 

82 

83 path = Path(file_path) 

84 if not path.exists(): 

85 raise FileNotFoundError(f"File not found: {file_path}") 

86 

87 # Get file size and calculate total samples 

88 file_size = path.stat().st_size 

89 bytes_per_sample = np.dtype(dtype).itemsize 

90 total_samples = file_size // bytes_per_sample 

91 

92 # Calculate overlap based on maximum scale 

93 max_scale = int(np.max(scales)) 

94 overlap = int(max_scale * overlap_factor) 

95 

96 # Process file in chunks 

97 offset = 0 

98 with open(path, "rb") as f: 

99 while offset < total_samples: 

100 # Read chunk with overlap 

101 f.seek(offset * bytes_per_sample) 

102 chunk_samples = min(chunk_size, total_samples - offset) 

103 read_samples = min(chunk_samples + overlap, total_samples - offset) 

104 

105 chunk_data: NDArray[np.float64] = np.fromfile(f, dtype=dtype, count=read_samples) 

106 

107 if len(chunk_data) == 0: 107 ↛ 108line 107 didn't jump to line 108 because the condition on line 107 was never true

108 break 

109 

110 # Compute CWT on chunk 

111 coefficients, _frequencies = pywt.cwt(chunk_data, scales, wavelet, sampling_period=1.0) 

112 

113 # Extract valid portion (exclude overlap region for continuity) 

114 if offset + chunk_samples < total_samples: 

115 # Not the last chunk - exclude overlap from end 

116 valid_coeffs = coefficients[:, :chunk_samples] 

117 else: 

118 # Last chunk - include everything 

119 valid_coeffs = coefficients 

120 

121 yield valid_coeffs.astype(np.float64), scales.astype(np.float64) 

122 

123 # Move to next chunk 

124 offset += chunk_samples 

125 

126 

127def dwt_chunked( 

128 file_path: str | PathLike[str], 

129 wavelet: str = "db4", 

130 level: int | None = None, 

131 *, 

132 chunk_size: int = 10_000_000, 

133 mode: str = "symmetric", 

134 dtype: type = np.float64, 

135) -> Generator[dict[str, NDArray[np.float64]], None, None]: 

136 """Compute discrete wavelet transform on large files using chunked processing. 

137 

138 Processes file in chunks with boundary handling for DWT reconstruction. 

139 

140 Args: 

141 file_path: Path to binary file containing signal data. 

142 wavelet: Wavelet name (default 'db4' - Daubechies 4). 

143 level: Decomposition level. If None, uses maximum possible. 

144 chunk_size: Number of samples per chunk (default 10M). 

145 mode: Signal extension mode (default 'symmetric'). 

146 dtype: Data type for file reading (default float64). 

147 

148 Yields: 

149 Dictionary with DWT coefficients for each chunk: 

150 - 'cA{level}': Approximation coefficients at level 

151 - 'cD{level}', 'cD{level-1}', ..., 'cD1': Detail coefficients 

152 

153 Raises: 

154 ImportError: If PyWavelets is not available. 

155 FileNotFoundError: If file does not exist. 

156 

157 Example: 

158 >>> for coeffs in dwt_chunked('signal.bin', wavelet='db4', level=3): 

159 ... # Process each chunk's wavelet decomposition 

160 ... print(f"Approximation shape: {coeffs['cA3'].shape}") 

161 ... print(f"Details: {[k for k in coeffs if k.startswith('cD')]}") 

162 

163 References: 

164 MEM-007: Chunked Wavelet Transform 

165 Mallat (2008): Wavelet Tour of Signal Processing 

166 """ 

167 if not PYWT_AVAILABLE: 167 ↛ 168line 167 didn't jump to line 168 because the condition on line 167 was never true

168 raise ImportError("PyWavelets not available. Install with: pip install PyWavelets") 

169 

170 path = Path(file_path) 

171 if not path.exists(): 

172 raise FileNotFoundError(f"File not found: {file_path}") 

173 

174 # Get file size 

175 file_size = path.stat().st_size 

176 bytes_per_sample = np.dtype(dtype).itemsize 

177 total_samples = file_size // bytes_per_sample 

178 

179 # Calculate filter length for boundary handling 

180 wavelet_obj = pywt.Wavelet(wavelet) 

181 filter_len = wavelet_obj.dec_len 

182 

183 # Overlap needed for boundary continuity 

184 if level is None: 

185 level = pywt.dwt_max_level(chunk_size, filter_len) 

186 

187 overlap = filter_len * (2**level) 

188 

189 # Process file in chunks 

190 offset = 0 

191 with open(path, "rb") as f: 

192 while offset < total_samples: 

193 # Read chunk with overlap 

194 f.seek(offset * bytes_per_sample) 

195 chunk_samples = min(chunk_size, total_samples - offset) 

196 read_samples = min(chunk_samples + overlap, total_samples - offset) 

197 

198 chunk_data: NDArray[np.float64] = np.fromfile(f, dtype=dtype, count=read_samples) 

199 

200 if len(chunk_data) == 0: 200 ↛ 201line 200 didn't jump to line 201 because the condition on line 200 was never true

201 break 

202 

203 # Compute DWT on chunk 

204 coeffs = pywt.wavedec(chunk_data, wavelet, mode=mode, level=level) 

205 

206 # Build result dictionary 

207 result = {} 

208 result[f"cA{level}"] = coeffs[0].astype(np.float64) 

209 for i, detail in enumerate(coeffs[1:], 1): 

210 result[f"cD{level - i + 1}"] = detail.astype(np.float64) 

211 

212 yield result 

213 

214 # Move to next chunk 

215 offset += chunk_samples 

216 

217 

218def cwt( 

219 signal: NDArray[np.floating[Any]], 

220 scales: NDArray[np.floating[Any]], 

221 wavelet: str = "morl", 

222 *, 

223 sampling_period: float = 1.0, 

224) -> tuple[NDArray[np.complex128], NDArray[np.float64]]: 

225 """Compute continuous wavelet transform. 

226 

227 Wrapper around PyWavelets CWT for in-memory processing. 

228 

229 Args: 

230 signal: Input signal array. 

231 scales: Array of scales for CWT computation. 

232 wavelet: Wavelet name (default 'morl' - Morlet). 

233 sampling_period: Sampling period (default 1.0). 

234 

235 Returns: 

236 Tuple of (coefficients, frequencies): 

237 - coefficients: Complex CWT coefficients [scales x time] 

238 - frequencies: Corresponding frequencies for each scale 

239 

240 Raises: 

241 ImportError: If PyWavelets is not available. 

242 

243 Example: 

244 >>> import numpy as np 

245 >>> signal = np.sin(2 * np.pi * 10 * np.linspace(0, 1, 1000)) 

246 >>> scales = np.arange(1, 128) 

247 >>> coeffs, freqs = cwt(signal, scales) 

248 >>> print(f"CWT shape: {coeffs.shape}") 

249 """ 

250 if not PYWT_AVAILABLE: 

251 raise ImportError("PyWavelets not available. Install with: pip install PyWavelets") 

252 

253 coefficients, frequencies = pywt.cwt(signal, scales, wavelet, sampling_period=sampling_period) 

254 

255 return coefficients.astype(np.complex128), frequencies.astype(np.float64) 

256 

257 

258def dwt( 

259 signal: NDArray[np.floating[Any]], 

260 wavelet: str = "db4", 

261 level: int | None = None, 

262 *, 

263 mode: str = "symmetric", 

264) -> list[NDArray[np.float64]]: 

265 """Compute discrete wavelet transform. 

266 

267 Wrapper around PyWavelets DWT for in-memory processing. 

268 

269 Args: 

270 signal: Input signal array. 

271 wavelet: Wavelet name (default 'db4' - Daubechies 4). 

272 level: Decomposition level. If None, uses maximum possible. 

273 mode: Signal extension mode (default 'symmetric'). 

274 

275 Returns: 

276 List of wavelet coefficients [cA_n, cD_n, cD_n-1, ..., cD1]: 

277 - cA_n: Approximation coefficients at level n 

278 - cD_i: Detail coefficients at level i 

279 

280 Raises: 

281 ImportError: If PyWavelets is not available. 

282 

283 Example: 

284 >>> import numpy as np 

285 >>> signal = np.random.randn(1024) 

286 >>> coeffs = dwt(signal, wavelet='db4', level=3) 

287 >>> print(f"Approximation: {coeffs[0].shape}") 

288 >>> print(f"Details: {[c.shape for c in coeffs[1:]]}") 

289 """ 

290 if not PYWT_AVAILABLE: 290 ↛ 291line 290 didn't jump to line 291 because the condition on line 290 was never true

291 raise ImportError("PyWavelets not available. Install with: pip install PyWavelets") 

292 

293 coeffs = pywt.wavedec(signal, wavelet, mode=mode, level=level) 

294 

295 return [c.astype(np.float64) for c in coeffs] 

296 

297 

298__all__ = ["cwt", "cwt_chunked", "dwt", "dwt_chunked"]