Coverage for src/driada/utils/signals.py: 64.63%
82 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-25 15:40 +0300
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-25 15:40 +0300
1"""
2Signal Processing Utilities
3===========================
5This module contains utility functions for signal generation, analysis,
6and filtering. It consolidates functionality from the former signals module.
8Functions
9---------
10brownian : Generate Brownian motion (Wiener process)
11approximate_entropy : Calculate approximate entropy of a signal
12filter_1d_timeseries : Filter a 1D time series using various methods
13filter_signals : Filter multiple signals (2D array)
14adaptive_filter_signals : Adaptively filter based on SNR
15"""
17import numpy as np
18from scipy.stats import norm
19from scipy.signal import savgol_filter
20from scipy.ndimage import gaussian_filter1d
21import pywt
22from typing import Optional, Union, List
25def brownian(
26 x0: Union[float, np.ndarray],
27 n: int,
28 dt: float = 1.0,
29 delta: float = 1.0,
30 out: Optional[np.ndarray] = None
31) -> np.ndarray:
32 """
33 Generate an instance of Brownian motion (i.e. the Wiener process).
35 The Brownian motion follows:
36 X(t) = X(0) + N(0, delta**2 * t; 0, t)
38 where N(a,b; t0, t1) is a normally distributed random variable with mean a
39 and variance b. The parameters t0 and t1 make explicit the statistical
40 independence of N on different time intervals.
42 Written as an iteration scheme:
43 X(t + dt) = X(t) + N(0, delta**2 * dt; t, t+dt)
45 Parameters
46 ----------
47 x0 : float or np.ndarray
48 The initial condition(s) (i.e. position(s)) of the Brownian motion.
49 If array, each value is treated as an initial condition.
50 n : int
51 The number of steps to take.
52 dt : float, optional
53 The time step. Default is 1.0.
54 delta : float, optional
55 Determines the "speed" of the Brownian motion. The random variable
56 of the position at time t, X(t), has a normal distribution whose mean
57 is the position at time t=0 and whose variance is delta**2*t.
58 Default is 1.0.
59 out : np.ndarray, optional
60 If provided, specifies the array in which to put the result.
61 If None, a new numpy array is created and returned.
63 Returns
64 -------
65 np.ndarray
66 Array of floats with shape `x0.shape + (n,)`.
67 Note that the initial value `x0` is not included in the returned array.
69 Examples
70 --------
71 >>> # Generate single Brownian motion path
72 >>> path = brownian(0.0, 1000, dt=0.01)
73 >>> path.shape
74 (1000,)
76 >>> # Generate multiple paths with different initial conditions
77 >>> paths = brownian([0.0, 1.0, -1.0], 1000, dt=0.01)
78 >>> paths.shape
79 (3, 1000)
80 """
81 x0 = np.asarray(x0)
83 # For each element of x0, generate a sample of n numbers from a
84 # normal distribution.
85 r = norm.rvs(size=x0.shape + (n,), scale=delta * np.sqrt(dt))
87 # If `out` was not given, create an output array.
88 if out is None:
89 out = np.empty(r.shape)
91 # This computes the Brownian motion by forming the cumulative sum of
92 # the random samples.
93 np.cumsum(r, axis=-1, out=out)
95 # Add the initial condition.
96 out += np.expand_dims(x0, axis=-1)
98 return out
101def approximate_entropy(U: Union[List, np.ndarray], m: int, r: float) -> float:
102 """
103 Calculate approximate entropy (ApEn) of a signal.
105 Approximate entropy is a regularity statistic that quantifies the
106 unpredictability of fluctuations in a time series. A time series
107 containing many repetitive patterns has a relatively small ApEn;
108 a less predictable process has a higher ApEn.
110 Parameters
111 ----------
112 U : array-like
113 Input signal/time series.
114 m : int
115 Pattern length. Common values are 1 or 2.
116 r : float
117 Tolerance threshold for pattern matching. Typically 0.1-0.25
118 times the standard deviation of the data.
120 Returns
121 -------
122 float
123 The approximate entropy value. Higher values indicate more
124 randomness/complexity.
126 Notes
127 -----
128 The algorithm:
129 1. Fix m (pattern length) and r (tolerance)
130 2. Look at patterns of length m and m+1
131 3. Count pattern matches within tolerance r
132 4. Calculate the logarithmic frequency of patterns
133 5. Return the difference between m and m+1 pattern frequencies
135 References
136 ----------
137 Pincus, S. M. (1991). Approximate entropy as a measure of system
138 complexity. Proceedings of the National Academy of Sciences, 88(6),
139 2297-2301.
141 Examples
142 --------
143 >>> # Regular signal has low entropy
144 >>> regular = [1, 2, 3, 1, 2, 3, 1, 2, 3]
145 >>> approximate_entropy(regular, m=2, r=0.1)
146 0.0
148 >>> # Random signal has high entropy
149 >>> import numpy as np
150 >>> random_signal = np.random.randn(100)
151 >>> apen = approximate_entropy(random_signal, m=2, r=0.2 * np.std(random_signal))
152 >>> apen > 1.0 # Typically true for random signals
153 True
154 """
155 def _maxdist(x_i, x_j):
156 """Calculate maximum distance between two patterns."""
157 return max([abs(ua - va) for ua, va in zip(x_i, x_j)])
159 def _phi(m):
160 """Calculate phi(m) - the logarithmic frequency of patterns."""
161 # Extract all patterns of length m
162 patterns = [[U[j] for j in range(i, i + m)] for i in range(N - m + 1)]
164 # Count matches for each pattern
165 C = [
166 len([1 for x_j in patterns if _maxdist(x_i, x_j) <= r]) / (N - m + 1.0)
167 for x_i in patterns
168 ]
170 # Return average log frequency
171 return (N - m + 1.0) ** (-1) * sum(np.log(C))
173 N = len(U)
175 # Approximate entropy is the difference between phi(m+1) and phi(m)
176 return abs(_phi(m + 1) - _phi(m))
179def filter_1d_timeseries(data: np.ndarray, method: str = 'gaussian', **kwargs) -> np.ndarray:
180 """
181 Filter a 1D time series using various denoising methods.
183 This is the core filtering function used by TimeSeries.filter() method.
184 Supports Gaussian smoothing, Savitzky-Golay filtering, and wavelet denoising.
186 Parameters
187 ----------
188 data : ndarray
189 1D time series data
190 method : str
191 Filtering method: 'gaussian', 'savgol', 'wavelet', or 'none'
192 **kwargs : dict
193 Method-specific parameters:
194 - gaussian: sigma (default: 1.0) - standard deviation for Gaussian kernel
195 - savgol: window_length (default: 5), polyorder (default: 2)
196 - wavelet: wavelet (default: 'db4'), level (default: auto),
197 mode (default: 'smooth'), threshold_method (default: 'mad')
199 Returns
200 -------
201 ndarray
202 Filtered 1D time series
204 Examples
205 --------
206 >>> # Gaussian smoothing for general noise reduction
207 >>> filtered = filter_1d_timeseries(data, method='gaussian', sigma=1.5)
209 >>> # Savitzky-Golay for preserving peaks while smoothing
210 >>> filtered = filter_1d_timeseries(data, method='savgol', window_length=5)
212 >>> # Wavelet denoising for multi-scale noise removal
213 >>> filtered = filter_1d_timeseries(data, method='wavelet', wavelet='db4')
214 """
215 if method == 'none':
216 return data.copy()
218 # Ensure we have a 1D array
219 data = np.asarray(data).ravel()
221 if method == 'gaussian':
222 # Gaussian filter: good for general smoothing
223 sigma = kwargs.get('sigma', 1.0)
224 return gaussian_filter1d(data, sigma=sigma)
226 elif method == 'savgol':
227 # Savitzky-Golay: preserves features like peaks better than Gaussian
228 window_length = kwargs.get('window_length', 5)
229 polyorder = kwargs.get('polyorder', 2)
231 # Ensure window_length is odd
232 if window_length % 2 == 0:
233 window_length += 1
235 # Check if signal is long enough
236 if len(data) <= window_length:
237 return data.copy()
239 return savgol_filter(data, window_length, polyorder)
241 elif method == 'wavelet':
242 # Wavelet denoising: excellent for multi-scale noise removal
243 wavelet = kwargs.get('wavelet', 'db4') # Daubechies 4 is a good default
244 level = kwargs.get('level', None)
245 mode = kwargs.get('mode', 'smooth') # boundary handling
246 threshold_method = kwargs.get('threshold_method', 'mad')
248 n = len(data)
250 # Determine decomposition level if not specified
251 max_level = pywt.dwt_max_level(n, wavelet)
252 if level is None:
253 # Automatic level selection: balance between noise removal and signal preservation
254 level = min(max_level, max(1, int(np.log2(n)) - 4))
255 elif level > max_level:
256 level = max_level
258 # Perform wavelet decomposition
259 coeffs = pywt.wavedec(data, wavelet, mode=mode, level=level)
261 # Apply thresholding to detail coefficients (not the approximation)
262 if threshold_method == 'mad':
263 # MAD-based threshold: robust to outliers
264 for j in range(1, len(coeffs)):
265 detail_coeffs = coeffs[j]
266 # Estimate noise level using Median Absolute Deviation
267 sigma = np.median(np.abs(detail_coeffs)) / 0.6745
268 # Universal threshold (Donoho & Johnstone)
269 threshold = sigma * np.sqrt(2 * np.log(n))
270 # Soft thresholding: shrinks coefficients smoothly
271 coeffs[j] = pywt.threshold(detail_coeffs, threshold, mode='soft')
273 elif threshold_method == 'sure':
274 # SURE (Stein's Unbiased Risk Estimate): data-adaptive threshold
275 for j in range(1, len(coeffs)):
276 threshold = np.std(coeffs[j]) * np.sqrt(2 * np.log(len(coeffs[j])))
277 coeffs[j] = pywt.threshold(coeffs[j], threshold, mode='soft')
279 # Reconstruct the signal
280 reconstructed = pywt.waverec(coeffs, wavelet, mode=mode)
282 # Handle potential length mismatch
283 return reconstructed[:n]
285 else:
286 raise ValueError(f"Unknown filtering method: {method}. "
287 f"Choose from: 'gaussian', 'savgol', 'wavelet', 'none'")
290def filter_signals(data: np.ndarray, method: str = 'gaussian', **kwargs) -> np.ndarray:
291 """
292 Filter multiple signals (2D array compatibility wrapper).
294 Parameters
295 ----------
296 data : ndarray
297 Data with shape (n_neurons, n_timepoints)
298 method : str
299 Filtering method: 'gaussian', 'savgol', 'wavelet', or 'none'
300 **kwargs : dict
301 Method-specific parameters (see filter_1d_timeseries)
303 Returns
304 -------
305 ndarray
306 Filtered data with same shape as input
307 """
308 if data.ndim == 1:
309 return filter_1d_timeseries(data, method=method, **kwargs)
311 filtered_data = np.zeros_like(data)
312 for i in range(data.shape[0]):
313 filtered_data[i] = filter_1d_timeseries(data[i], method=method, **kwargs)
315 return filtered_data
318def adaptive_filter_signals(data: np.ndarray, snr_threshold: float = 2.0) -> np.ndarray:
319 """
320 Adaptively filter signals based on signal-to-noise ratio.
322 Parameters
323 ----------
324 data : ndarray
325 Data with shape (n_neurons, n_timepoints)
326 snr_threshold : float
327 SNR threshold for determining filter strength
329 Returns
330 -------
331 ndarray
332 Adaptively filtered data
333 """
334 filtered_data = np.zeros_like(data)
336 for i in range(data.shape[0]):
337 signal = data[i]
339 # Estimate SNR (simple approach)
340 signal_power = np.var(signal)
341 noise_power = np.var(np.diff(signal)) # High-freq noise estimate
342 snr = signal_power / (noise_power + 1e-10)
344 # Adaptive filtering based on SNR
345 if snr < snr_threshold:
346 # High noise - stronger filtering
347 sigma = 2.0
348 else:
349 # Low noise - lighter filtering
350 sigma = 0.5
352 filtered_data[i] = gaussian_filter1d(signal, sigma=sigma)
354 return filtered_data