Coverage for src / tracekit / inference / spectral.py: 89%

71 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Spectral method auto-selection. 

2 

3This module automatically selects appropriate spectral analysis methods 

4based on signal characteristics. 

5 

6 

7Example: 

8 >>> import tracekit as tk 

9 >>> trace = tk.load('signal.wfm') 

10 >>> config = tk.auto_spectral_config(trace) 

11 >>> print(f"Method: {config['method']}") 

12 >>> print(f"Window: {config['window']}") 

13 

14References: 

15 Welch's method: IEEE Signal Processing Magazine (1986) 

16 Stationarity tests: Augmented Dickey-Fuller test 

17""" 

18 

19from __future__ import annotations 

20 

21from typing import TYPE_CHECKING, Any 

22 

23import numpy as np 

24 

25if TYPE_CHECKING: 

26 from numpy.typing import NDArray 

27 

28 from tracekit.core.types import WaveformTrace 

29 

30 

31def auto_spectral_config( 

32 trace: WaveformTrace, 

33 *, 

34 target_resolution: float | None = None, 

35 dynamic_range_db: float = 60.0, 

36 log_rationale: bool = True, 

37) -> dict[str, Any]: 

38 """Auto-select spectral analysis method and parameters. 

39 

40 Analyzes signal stationarity and characteristics to select: 

41 - Spectral method (Welch, Bartlett, Periodogram) 

42 - Window function (Hann, Hamming, Blackman, Kaiser) 

43 - Window size and overlap 

44 - FFT size 

45 

46 Args: 

47 trace: Signal to analyze. 

48 target_resolution: Desired frequency resolution in Hz (optional). 

49 dynamic_range_db: Required dynamic range in dB (default 60 dB). 

50 log_rationale: If True, include selection rationale in result. 

51 

52 Returns: 

53 Dictionary containing: 

54 - method: Recommended method ('welch', 'bartlett', 'periodogram') 

55 - window: Window function name ('hann', 'hamming', 'blackman', etc.) 

56 - nperseg: Segment length for Welch/Bartlett 

57 - noverlap: Overlap samples 

58 - nfft: FFT size 

59 - stationarity_score: Stationarity score (0-1, 1=stationary) 

60 - rationale: Explanation of selection (if log_rationale=True) 

61 

62 Example: 

63 >>> trace = tk.load('noisy_signal.wfm') 

64 >>> config = tk.auto_spectral_config(trace, dynamic_range_db=80) 

65 >>> print(f"Method: {config['method']}") 

66 >>> print(f"Window: {config['window']}") 

67 >>> print(f"Rationale: {config['rationale']}") 

68 >>> # Use configuration 

69 >>> freq, psd = tk.psd(trace, **config) 

70 

71 References: 

72 Welch, P. D. (1967): Use of FFT for estimation of power spectra 

73 Stoica & Moses (2005): Spectral Analysis of Signals 

74 """ 

75 # Analyze signal stationarity 

76 stationarity_score = _assess_stationarity(trace.data) 

77 

78 # Determine method based on stationarity 

79 if stationarity_score > 0.8: 79 ↛ 81line 79 didn't jump to line 81 because the condition on line 79 was never true

80 # Stationary - can use simpler methods 

81 method = "bartlett" 

82 rationale = f"Signal is stationary (score={stationarity_score:.2f}). Bartlett method provides good variance reduction." 

83 elif stationarity_score > 0.5: 

84 # Moderately stationary - Welch is safest 

85 method = "welch" 

86 rationale = f"Signal is moderately stationary (score={stationarity_score:.2f}). Welch method with overlap for variance reduction." 

87 else: 

88 # Non-stationary - need time-frequency analysis or careful windowing 

89 method = "welch" 

90 rationale = f"Signal is non-stationary (score={stationarity_score:.2f}). Welch method with short segments to track changes." 

91 

92 # Select window function based on dynamic range requirements 

93 if dynamic_range_db > 80: 

94 window = "blackman-harris" 

95 window_rationale = "Blackman-Harris window for high dynamic range (>80 dB)" 

96 elif dynamic_range_db > 60: 

97 window = "blackman" 

98 window_rationale = "Blackman window for good dynamic range (60-80 dB)" 

99 elif dynamic_range_db > 40: 

100 window = "hamming" 

101 window_rationale = "Hamming window for moderate dynamic range (40-60 dB)" 

102 else: 

103 window = "hann" 

104 window_rationale = "Hann window for general purpose (<40 dB)" 

105 

106 if log_rationale: 

107 rationale += " " + window_rationale 

108 

109 # Determine segment size 

110 n_samples = len(trace.data) 

111 sample_rate = trace.metadata.sample_rate 

112 

113 if target_resolution is not None: 

114 # User specified resolution 

115 nperseg = int(sample_rate / target_resolution) 

116 # Round to next power of 2 for efficiency 

117 nperseg = 2 ** int(np.ceil(np.log2(nperseg))) 

118 # Auto-select based on signal length and stationarity 

119 elif stationarity_score > 0.8: 119 ↛ 121line 119 didn't jump to line 121 because the condition on line 119 was never true

120 # Stationary - can use longer segments 

121 nperseg = min(n_samples, 2**14) # Up to 16k samples 

122 else: 

123 # Non-stationary - shorter segments 

124 nperseg = min(n_samples // 8, 2**12) # Up to 4k samples 

125 

126 # Ensure nperseg is reasonable 

127 nperseg = max(256, min(nperseg, n_samples // 2)) 

128 

129 # Determine overlap 

130 if method == "welch": 130 ↛ 133line 130 didn't jump to line 133 because the condition on line 130 was always true

131 # Welch typically uses 50% overlap 

132 noverlap = nperseg // 2 

133 elif method == "bartlett": 

134 # Bartlett uses no overlap 

135 noverlap = 0 

136 else: 

137 # Periodogram doesn't use segments 

138 noverlap = 0 

139 

140 # FFT size (usually same as segment size or next power of 2) 

141 nfft = 2 ** int(np.ceil(np.log2(nperseg))) 

142 

143 config = { 

144 "method": method, 

145 "window": window, 

146 "nperseg": nperseg, 

147 "noverlap": noverlap, 

148 "nfft": nfft, 

149 "stationarity_score": stationarity_score, 

150 } 

151 

152 if log_rationale: 

153 config["rationale"] = rationale 

154 

155 return config 

156 

157 

158def _assess_stationarity(data: NDArray[np.floating[Any]]) -> float: 

159 """Assess signal stationarity using windowed variance method. 

160 

161 A simpler alternative to ADF test that works well for spectral analysis. 

162 

163 Args: 

164 data: Signal data array. 

165 

166 Returns: 

167 Stationarity score from 0 (non-stationary) to 1 (stationary). 

168 """ 

169 # Divide signal into windows 

170 n_windows = 8 

171 window_size = len(data) // n_windows 

172 

173 if window_size < 100: 

174 # Signal too short for meaningful analysis 

175 return 0.7 # Assume moderately stationary 

176 

177 # Calculate statistics in each window 

178 means = [] 

179 variances = [] 

180 

181 for i in range(n_windows): 

182 start = i * window_size 

183 end = start + window_size 

184 window_data = data[start:end] 

185 

186 means.append(np.mean(window_data)) 

187 variances.append(np.var(window_data)) 

188 

189 # Stationarity check: mean and variance should be consistent 

190 # Use coefficient of variation, but handle near-zero means specially 

191 mean_abs = abs(np.mean(means)) 

192 if mean_abs > 1e-6: 

193 mean_variation = np.std(means) / mean_abs 

194 else: 

195 # For zero-mean signals, use absolute variation 

196 mean_variation = np.std(means) 

197 

198 var_mean = np.mean(variances) 

199 if var_mean > 1e-12: 

200 var_variation = np.std(variances) / var_mean 

201 else: 

202 # Near-zero variance signal (constant) 

203 var_variation = 0.0 

204 

205 # Low variation = high stationarity 

206 mean_score = max(0, 1.0 - mean_variation * 5) 

207 var_score = max(0, 1.0 - var_variation * 5) 

208 

209 # Combined score 

210 stationarity_score = (mean_score + var_score) / 2 

211 

212 return float(np.clip(stationarity_score, 0, 1)) 

213 

214 

215__all__ = ["auto_spectral_config"]