Coverage for src/driada/utils/signals.py: 64.63%

82 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-25 15:40 +0300

1""" 

2Signal Processing Utilities 

3=========================== 

4 

5This module contains utility functions for signal generation, analysis, 

6and filtering. It consolidates functionality from the former signals module. 

7 

8Functions 

9--------- 

10brownian : Generate Brownian motion (Wiener process) 

11approximate_entropy : Calculate approximate entropy of a signal 

12filter_1d_timeseries : Filter a 1D time series using various methods 

13filter_signals : Filter multiple signals (2D array) 

14adaptive_filter_signals : Adaptively filter based on SNR 

15""" 

16 

17import numpy as np 

18from scipy.stats import norm 

19from scipy.signal import savgol_filter 

20from scipy.ndimage import gaussian_filter1d 

21import pywt 

22from typing import Optional, Union, List 

23 

24 

25def brownian( 

26 x0: Union[float, np.ndarray], 

27 n: int, 

28 dt: float = 1.0, 

29 delta: float = 1.0, 

30 out: Optional[np.ndarray] = None 

31) -> np.ndarray: 

32 """ 

33 Generate an instance of Brownian motion (i.e. the Wiener process). 

34 

35 The Brownian motion follows: 

36 X(t) = X(0) + N(0, delta**2 * t; 0, t) 

37 

38 where N(a,b; t0, t1) is a normally distributed random variable with mean a  

39 and variance b. The parameters t0 and t1 make explicit the statistical 

40 independence of N on different time intervals. 

41 

42 Written as an iteration scheme: 

43 X(t + dt) = X(t) + N(0, delta**2 * dt; t, t+dt) 

44 

45 Parameters 

46 ---------- 

47 x0 : float or np.ndarray 

48 The initial condition(s) (i.e. position(s)) of the Brownian motion. 

49 If array, each value is treated as an initial condition. 

50 n : int 

51 The number of steps to take. 

52 dt : float, optional 

53 The time step. Default is 1.0. 

54 delta : float, optional 

55 Determines the "speed" of the Brownian motion. The random variable 

56 of the position at time t, X(t), has a normal distribution whose mean  

57 is the position at time t=0 and whose variance is delta**2*t. 

58 Default is 1.0. 

59 out : np.ndarray, optional 

60 If provided, specifies the array in which to put the result. 

61 If None, a new numpy array is created and returned. 

62 

63 Returns 

64 ------- 

65 np.ndarray 

66 Array of floats with shape `x0.shape + (n,)`. 

67 Note that the initial value `x0` is not included in the returned array. 

68 

69 Examples 

70 -------- 

71 >>> # Generate single Brownian motion path 

72 >>> path = brownian(0.0, 1000, dt=0.01) 

73 >>> path.shape 

74 (1000,) 

75  

76 >>> # Generate multiple paths with different initial conditions 

77 >>> paths = brownian([0.0, 1.0, -1.0], 1000, dt=0.01) 

78 >>> paths.shape 

79 (3, 1000) 

80 """ 

81 x0 = np.asarray(x0) 

82 

83 # For each element of x0, generate a sample of n numbers from a 

84 # normal distribution. 

85 r = norm.rvs(size=x0.shape + (n,), scale=delta * np.sqrt(dt)) 

86 

87 # If `out` was not given, create an output array. 

88 if out is None: 

89 out = np.empty(r.shape) 

90 

91 # This computes the Brownian motion by forming the cumulative sum of 

92 # the random samples. 

93 np.cumsum(r, axis=-1, out=out) 

94 

95 # Add the initial condition. 

96 out += np.expand_dims(x0, axis=-1) 

97 

98 return out 

99 

100 

101def approximate_entropy(U: Union[List, np.ndarray], m: int, r: float) -> float: 

102 """ 

103 Calculate approximate entropy (ApEn) of a signal. 

104  

105 Approximate entropy is a regularity statistic that quantifies the  

106 unpredictability of fluctuations in a time series. A time series  

107 containing many repetitive patterns has a relatively small ApEn;  

108 a less predictable process has a higher ApEn. 

109  

110 Parameters 

111 ---------- 

112 U : array-like 

113 Input signal/time series. 

114 m : int 

115 Pattern length. Common values are 1 or 2. 

116 r : float 

117 Tolerance threshold for pattern matching. Typically 0.1-0.25  

118 times the standard deviation of the data. 

119  

120 Returns 

121 ------- 

122 float 

123 The approximate entropy value. Higher values indicate more  

124 randomness/complexity. 

125  

126 Notes 

127 ----- 

128 The algorithm: 

129 1. Fix m (pattern length) and r (tolerance) 

130 2. Look at patterns of length m and m+1 

131 3. Count pattern matches within tolerance r 

132 4. Calculate the logarithmic frequency of patterns 

133 5. Return the difference between m and m+1 pattern frequencies 

134  

135 References 

136 ---------- 

137 Pincus, S. M. (1991). Approximate entropy as a measure of system  

138 complexity. Proceedings of the National Academy of Sciences, 88(6),  

139 2297-2301. 

140  

141 Examples 

142 -------- 

143 >>> # Regular signal has low entropy 

144 >>> regular = [1, 2, 3, 1, 2, 3, 1, 2, 3] 

145 >>> approximate_entropy(regular, m=2, r=0.1) 

146 0.0 

147  

148 >>> # Random signal has high entropy 

149 >>> import numpy as np 

150 >>> random_signal = np.random.randn(100) 

151 >>> apen = approximate_entropy(random_signal, m=2, r=0.2 * np.std(random_signal)) 

152 >>> apen > 1.0 # Typically true for random signals 

153 True 

154 """ 

155 def _maxdist(x_i, x_j): 

156 """Calculate maximum distance between two patterns.""" 

157 return max([abs(ua - va) for ua, va in zip(x_i, x_j)]) 

158 

159 def _phi(m): 

160 """Calculate phi(m) - the logarithmic frequency of patterns.""" 

161 # Extract all patterns of length m 

162 patterns = [[U[j] for j in range(i, i + m)] for i in range(N - m + 1)] 

163 

164 # Count matches for each pattern 

165 C = [ 

166 len([1 for x_j in patterns if _maxdist(x_i, x_j) <= r]) / (N - m + 1.0) 

167 for x_i in patterns 

168 ] 

169 

170 # Return average log frequency 

171 return (N - m + 1.0) ** (-1) * sum(np.log(C)) 

172 

173 N = len(U) 

174 

175 # Approximate entropy is the difference between phi(m+1) and phi(m) 

176 return abs(_phi(m + 1) - _phi(m)) 

177 

178 

179def filter_1d_timeseries(data: np.ndarray, method: str = 'gaussian', **kwargs) -> np.ndarray: 

180 """ 

181 Filter a 1D time series using various denoising methods. 

182  

183 This is the core filtering function used by TimeSeries.filter() method. 

184 Supports Gaussian smoothing, Savitzky-Golay filtering, and wavelet denoising. 

185  

186 Parameters 

187 ---------- 

188 data : ndarray 

189 1D time series data 

190 method : str 

191 Filtering method: 'gaussian', 'savgol', 'wavelet', or 'none' 

192 **kwargs : dict 

193 Method-specific parameters: 

194 - gaussian: sigma (default: 1.0) - standard deviation for Gaussian kernel 

195 - savgol: window_length (default: 5), polyorder (default: 2) 

196 - wavelet: wavelet (default: 'db4'), level (default: auto),  

197 mode (default: 'smooth'), threshold_method (default: 'mad') 

198  

199 Returns 

200 ------- 

201 ndarray 

202 Filtered 1D time series 

203  

204 Examples 

205 -------- 

206 >>> # Gaussian smoothing for general noise reduction 

207 >>> filtered = filter_1d_timeseries(data, method='gaussian', sigma=1.5) 

208  

209 >>> # Savitzky-Golay for preserving peaks while smoothing 

210 >>> filtered = filter_1d_timeseries(data, method='savgol', window_length=5) 

211  

212 >>> # Wavelet denoising for multi-scale noise removal 

213 >>> filtered = filter_1d_timeseries(data, method='wavelet', wavelet='db4') 

214 """ 

215 if method == 'none': 

216 return data.copy() 

217 

218 # Ensure we have a 1D array 

219 data = np.asarray(data).ravel() 

220 

221 if method == 'gaussian': 

222 # Gaussian filter: good for general smoothing 

223 sigma = kwargs.get('sigma', 1.0) 

224 return gaussian_filter1d(data, sigma=sigma) 

225 

226 elif method == 'savgol': 

227 # Savitzky-Golay: preserves features like peaks better than Gaussian 

228 window_length = kwargs.get('window_length', 5) 

229 polyorder = kwargs.get('polyorder', 2) 

230 

231 # Ensure window_length is odd 

232 if window_length % 2 == 0: 

233 window_length += 1 

234 

235 # Check if signal is long enough 

236 if len(data) <= window_length: 

237 return data.copy() 

238 

239 return savgol_filter(data, window_length, polyorder) 

240 

241 elif method == 'wavelet': 

242 # Wavelet denoising: excellent for multi-scale noise removal 

243 wavelet = kwargs.get('wavelet', 'db4') # Daubechies 4 is a good default 

244 level = kwargs.get('level', None) 

245 mode = kwargs.get('mode', 'smooth') # boundary handling 

246 threshold_method = kwargs.get('threshold_method', 'mad') 

247 

248 n = len(data) 

249 

250 # Determine decomposition level if not specified 

251 max_level = pywt.dwt_max_level(n, wavelet) 

252 if level is None: 

253 # Automatic level selection: balance between noise removal and signal preservation 

254 level = min(max_level, max(1, int(np.log2(n)) - 4)) 

255 elif level > max_level: 

256 level = max_level 

257 

258 # Perform wavelet decomposition 

259 coeffs = pywt.wavedec(data, wavelet, mode=mode, level=level) 

260 

261 # Apply thresholding to detail coefficients (not the approximation) 

262 if threshold_method == 'mad': 

263 # MAD-based threshold: robust to outliers 

264 for j in range(1, len(coeffs)): 

265 detail_coeffs = coeffs[j] 

266 # Estimate noise level using Median Absolute Deviation 

267 sigma = np.median(np.abs(detail_coeffs)) / 0.6745 

268 # Universal threshold (Donoho & Johnstone) 

269 threshold = sigma * np.sqrt(2 * np.log(n)) 

270 # Soft thresholding: shrinks coefficients smoothly 

271 coeffs[j] = pywt.threshold(detail_coeffs, threshold, mode='soft') 

272 

273 elif threshold_method == 'sure': 

274 # SURE (Stein's Unbiased Risk Estimate): data-adaptive threshold 

275 for j in range(1, len(coeffs)): 

276 threshold = np.std(coeffs[j]) * np.sqrt(2 * np.log(len(coeffs[j]))) 

277 coeffs[j] = pywt.threshold(coeffs[j], threshold, mode='soft') 

278 

279 # Reconstruct the signal 

280 reconstructed = pywt.waverec(coeffs, wavelet, mode=mode) 

281 

282 # Handle potential length mismatch 

283 return reconstructed[:n] 

284 

285 else: 

286 raise ValueError(f"Unknown filtering method: {method}. " 

287 f"Choose from: 'gaussian', 'savgol', 'wavelet', 'none'") 

288 

289 

290def filter_signals(data: np.ndarray, method: str = 'gaussian', **kwargs) -> np.ndarray: 

291 """ 

292 Filter multiple signals (2D array compatibility wrapper). 

293  

294 Parameters 

295 ---------- 

296 data : ndarray 

297 Data with shape (n_neurons, n_timepoints) 

298 method : str 

299 Filtering method: 'gaussian', 'savgol', 'wavelet', or 'none' 

300 **kwargs : dict 

301 Method-specific parameters (see filter_1d_timeseries) 

302  

303 Returns 

304 ------- 

305 ndarray 

306 Filtered data with same shape as input 

307 """ 

308 if data.ndim == 1: 

309 return filter_1d_timeseries(data, method=method, **kwargs) 

310 

311 filtered_data = np.zeros_like(data) 

312 for i in range(data.shape[0]): 

313 filtered_data[i] = filter_1d_timeseries(data[i], method=method, **kwargs) 

314 

315 return filtered_data 

316 

317 

318def adaptive_filter_signals(data: np.ndarray, snr_threshold: float = 2.0) -> np.ndarray: 

319 """ 

320 Adaptively filter signals based on signal-to-noise ratio. 

321  

322 Parameters 

323 ---------- 

324 data : ndarray 

325 Data with shape (n_neurons, n_timepoints) 

326 snr_threshold : float 

327 SNR threshold for determining filter strength 

328  

329 Returns 

330 ------- 

331 ndarray 

332 Adaptively filtered data 

333 """ 

334 filtered_data = np.zeros_like(data) 

335 

336 for i in range(data.shape[0]): 

337 signal = data[i] 

338 

339 # Estimate SNR (simple approach) 

340 signal_power = np.var(signal) 

341 noise_power = np.var(np.diff(signal)) # High-freq noise estimate 

342 snr = signal_power / (noise_power + 1e-10) 

343 

344 # Adaptive filtering based on SNR 

345 if snr < snr_threshold: 

346 # High noise - stronger filtering 

347 sigma = 2.0 

348 else: 

349 # Low noise - lighter filtering 

350 sigma = 0.5 

351 

352 filtered_data[i] = gaussian_filter1d(signal, sigma=sigma) 

353 

354 return filtered_data