Coverage for src / tracekit / analyzers / waveform / wavelets.py: 90%
74 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Wavelet transform analysis for waveform data.
3This module provides continuous and discrete wavelet transforms,
4including chunked implementations for memory-efficient processing.
7Example:
8 >>> from tracekit.analyzers.waveform.wavelets import cwt_chunked, dwt_chunked
9 >>> # Process large file with chunked CWT
10 >>> coeffs, scales = cwt_chunked('large_file.bin', scales=np.arange(1, 128))
12References:
13 Mallat, S. (2008). A Wavelet Tour of Signal Processing
14 Torrence, C., & Compo, G. P. (1998). A practical guide to wavelet analysis
15"""
17from __future__ import annotations
19from pathlib import Path
20from typing import TYPE_CHECKING, Any
22import numpy as np
24if TYPE_CHECKING:
25 from collections.abc import Generator
26 from os import PathLike
28 from numpy.typing import NDArray
30try:
31 import pywt
33 PYWT_AVAILABLE = True
34except ImportError:
35 PYWT_AVAILABLE = False
38def cwt_chunked(
39 file_path: str | PathLike[str],
40 scales: NDArray[np.floating[Any]],
41 wavelet: str = "morl",
42 *,
43 chunk_size: int = 10_000_000,
44 overlap_factor: float = 2.0,
45 dtype: type = np.float64,
46) -> Generator[tuple[NDArray[np.float64], NDArray[np.float64]], None, None]:
47 """Compute continuous wavelet transform on large files using chunked processing.
49 Processes file in overlapping chunks to handle files larger than memory.
50 Uses overlap to ensure continuity at chunk boundaries.
52 Args:
53 file_path: Path to binary file containing signal data.
54 scales: Array of scales for CWT computation.
55 wavelet: Wavelet name (default 'morl' - Morlet).
56 chunk_size: Number of samples per chunk (default 10M).
57 overlap_factor: Overlap as multiple of max scale (default 2.0).
58 dtype: Data type for file reading (default float64).
60 Yields:
61 Tuple of (coefficients, valid_scales) for each chunk:
62 - coefficients: CWT coefficients [scales x time]
63 - valid_scales: Scales used (same as input)
65 Raises:
66 ImportError: If PyWavelets is not available.
67 FileNotFoundError: If file does not exist.
69 Example:
70 >>> import numpy as np
71 >>> scales = np.arange(1, 128)
72 >>> for coeffs, scales_out in cwt_chunked('signal.bin', scales):
73 ... # Process each chunk
74 ... print(f"Chunk shape: {coeffs.shape}")
76 References:
77 MEM-007: Chunked Wavelet Transform
78 Torrence & Compo (1998): Practical guide to wavelet analysis
79 """
80 if not PYWT_AVAILABLE: 80 ↛ 81line 80 didn't jump to line 81 because the condition on line 80 was never true
81 raise ImportError("PyWavelets not available. Install with: pip install PyWavelets")
83 path = Path(file_path)
84 if not path.exists():
85 raise FileNotFoundError(f"File not found: {file_path}")
87 # Get file size and calculate total samples
88 file_size = path.stat().st_size
89 bytes_per_sample = np.dtype(dtype).itemsize
90 total_samples = file_size // bytes_per_sample
92 # Calculate overlap based on maximum scale
93 max_scale = int(np.max(scales))
94 overlap = int(max_scale * overlap_factor)
96 # Process file in chunks
97 offset = 0
98 with open(path, "rb") as f:
99 while offset < total_samples:
100 # Read chunk with overlap
101 f.seek(offset * bytes_per_sample)
102 chunk_samples = min(chunk_size, total_samples - offset)
103 read_samples = min(chunk_samples + overlap, total_samples - offset)
105 chunk_data: NDArray[np.float64] = np.fromfile(f, dtype=dtype, count=read_samples)
107 if len(chunk_data) == 0: 107 ↛ 108line 107 didn't jump to line 108 because the condition on line 107 was never true
108 break
110 # Compute CWT on chunk
111 coefficients, _frequencies = pywt.cwt(chunk_data, scales, wavelet, sampling_period=1.0)
113 # Extract valid portion (exclude overlap region for continuity)
114 if offset + chunk_samples < total_samples:
115 # Not the last chunk - exclude overlap from end
116 valid_coeffs = coefficients[:, :chunk_samples]
117 else:
118 # Last chunk - include everything
119 valid_coeffs = coefficients
121 yield valid_coeffs.astype(np.float64), scales.astype(np.float64)
123 # Move to next chunk
124 offset += chunk_samples
127def dwt_chunked(
128 file_path: str | PathLike[str],
129 wavelet: str = "db4",
130 level: int | None = None,
131 *,
132 chunk_size: int = 10_000_000,
133 mode: str = "symmetric",
134 dtype: type = np.float64,
135) -> Generator[dict[str, NDArray[np.float64]], None, None]:
136 """Compute discrete wavelet transform on large files using chunked processing.
138 Processes file in chunks with boundary handling for DWT reconstruction.
140 Args:
141 file_path: Path to binary file containing signal data.
142 wavelet: Wavelet name (default 'db4' - Daubechies 4).
143 level: Decomposition level. If None, uses maximum possible.
144 chunk_size: Number of samples per chunk (default 10M).
145 mode: Signal extension mode (default 'symmetric').
146 dtype: Data type for file reading (default float64).
148 Yields:
149 Dictionary with DWT coefficients for each chunk:
150 - 'cA{level}': Approximation coefficients at level
151 - 'cD{level}', 'cD{level-1}', ..., 'cD1': Detail coefficients
153 Raises:
154 ImportError: If PyWavelets is not available.
155 FileNotFoundError: If file does not exist.
157 Example:
158 >>> for coeffs in dwt_chunked('signal.bin', wavelet='db4', level=3):
159 ... # Process each chunk's wavelet decomposition
160 ... print(f"Approximation shape: {coeffs['cA3'].shape}")
161 ... print(f"Details: {[k for k in coeffs if k.startswith('cD')]}")
163 References:
164 MEM-007: Chunked Wavelet Transform
165 Mallat (2008): Wavelet Tour of Signal Processing
166 """
167 if not PYWT_AVAILABLE: 167 ↛ 168line 167 didn't jump to line 168 because the condition on line 167 was never true
168 raise ImportError("PyWavelets not available. Install with: pip install PyWavelets")
170 path = Path(file_path)
171 if not path.exists():
172 raise FileNotFoundError(f"File not found: {file_path}")
174 # Get file size
175 file_size = path.stat().st_size
176 bytes_per_sample = np.dtype(dtype).itemsize
177 total_samples = file_size // bytes_per_sample
179 # Calculate filter length for boundary handling
180 wavelet_obj = pywt.Wavelet(wavelet)
181 filter_len = wavelet_obj.dec_len
183 # Overlap needed for boundary continuity
184 if level is None:
185 level = pywt.dwt_max_level(chunk_size, filter_len)
187 overlap = filter_len * (2**level)
189 # Process file in chunks
190 offset = 0
191 with open(path, "rb") as f:
192 while offset < total_samples:
193 # Read chunk with overlap
194 f.seek(offset * bytes_per_sample)
195 chunk_samples = min(chunk_size, total_samples - offset)
196 read_samples = min(chunk_samples + overlap, total_samples - offset)
198 chunk_data: NDArray[np.float64] = np.fromfile(f, dtype=dtype, count=read_samples)
200 if len(chunk_data) == 0: 200 ↛ 201line 200 didn't jump to line 201 because the condition on line 200 was never true
201 break
203 # Compute DWT on chunk
204 coeffs = pywt.wavedec(chunk_data, wavelet, mode=mode, level=level)
206 # Build result dictionary
207 result = {}
208 result[f"cA{level}"] = coeffs[0].astype(np.float64)
209 for i, detail in enumerate(coeffs[1:], 1):
210 result[f"cD{level - i + 1}"] = detail.astype(np.float64)
212 yield result
214 # Move to next chunk
215 offset += chunk_samples
218def cwt(
219 signal: NDArray[np.floating[Any]],
220 scales: NDArray[np.floating[Any]],
221 wavelet: str = "morl",
222 *,
223 sampling_period: float = 1.0,
224) -> tuple[NDArray[np.complex128], NDArray[np.float64]]:
225 """Compute continuous wavelet transform.
227 Wrapper around PyWavelets CWT for in-memory processing.
229 Args:
230 signal: Input signal array.
231 scales: Array of scales for CWT computation.
232 wavelet: Wavelet name (default 'morl' - Morlet).
233 sampling_period: Sampling period (default 1.0).
235 Returns:
236 Tuple of (coefficients, frequencies):
237 - coefficients: Complex CWT coefficients [scales x time]
238 - frequencies: Corresponding frequencies for each scale
240 Raises:
241 ImportError: If PyWavelets is not available.
243 Example:
244 >>> import numpy as np
245 >>> signal = np.sin(2 * np.pi * 10 * np.linspace(0, 1, 1000))
246 >>> scales = np.arange(1, 128)
247 >>> coeffs, freqs = cwt(signal, scales)
248 >>> print(f"CWT shape: {coeffs.shape}")
249 """
250 if not PYWT_AVAILABLE:
251 raise ImportError("PyWavelets not available. Install with: pip install PyWavelets")
253 coefficients, frequencies = pywt.cwt(signal, scales, wavelet, sampling_period=sampling_period)
255 return coefficients.astype(np.complex128), frequencies.astype(np.float64)
258def dwt(
259 signal: NDArray[np.floating[Any]],
260 wavelet: str = "db4",
261 level: int | None = None,
262 *,
263 mode: str = "symmetric",
264) -> list[NDArray[np.float64]]:
265 """Compute discrete wavelet transform.
267 Wrapper around PyWavelets DWT for in-memory processing.
269 Args:
270 signal: Input signal array.
271 wavelet: Wavelet name (default 'db4' - Daubechies 4).
272 level: Decomposition level. If None, uses maximum possible.
273 mode: Signal extension mode (default 'symmetric').
275 Returns:
276 List of wavelet coefficients [cA_n, cD_n, cD_n-1, ..., cD1]:
277 - cA_n: Approximation coefficients at level n
278 - cD_i: Detail coefficients at level i
280 Raises:
281 ImportError: If PyWavelets is not available.
283 Example:
284 >>> import numpy as np
285 >>> signal = np.random.randn(1024)
286 >>> coeffs = dwt(signal, wavelet='db4', level=3)
287 >>> print(f"Approximation: {coeffs[0].shape}")
288 >>> print(f"Details: {[c.shape for c in coeffs[1:]]}")
289 """
290 if not PYWT_AVAILABLE: 290 ↛ 291line 290 didn't jump to line 291 because the condition on line 290 was never true
291 raise ImportError("PyWavelets not available. Install with: pip install PyWavelets")
293 coeffs = pywt.wavedec(signal, wavelet, mode=mode, level=level)
295 return [c.astype(np.float64) for c in coeffs]
298__all__ = ["cwt", "cwt_chunked", "dwt", "dwt_chunked"]