Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mt_metadata \ mt_metadata \ timeseries \ filters \ frequency_response_table_filter.py: 95%

73 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-10 00:11 -0800

1# ===================================================== 

2# Imports 

3# ===================================================== 

4from typing import Annotated 

5 

6import numpy as np 

7from loguru import logger 

8from pydantic import Field, field_validator, ValidationInfo 

9from scipy.interpolate import interp1d 

10 

11from mt_metadata.base.helpers import object_to_array, requires 

12from mt_metadata.timeseries.filters import FilterBase, get_base_obspy_mapping 

13 

14 

15try: 

16 from obspy.core.inventory.response import ( 

17 ResponseListElement, 

18 ResponseListResponseStage, 

19 ) 

20except ImportError: 

21 ResponseListResponseStage = ResponseListElement = None 

22 

23 

24# ===================================================== 

25 

26 

27class FrequencyResponseTableFilter(FilterBase): 

28 _filter_type: str = "fap" 

29 type: Annotated[ 

30 str, 

31 Field( 

32 default="fap", 

33 description="Type of filter. Must be 'fap' or 'frequency amplitude table'", 

34 alias=None, 

35 json_schema_extra={ 

36 "units": None, 

37 "required": True, 

38 "examples": ["fap"], 

39 }, 

40 ), 

41 ] 

42 frequencies: Annotated[ 

43 np.ndarray | list[float], 

44 Field( 

45 default_factory=lambda: np.empty(0, dtype=float), 

46 description="The frequencies at which a calibration of the filter were performed.", 

47 alias=None, 

48 json_schema_extra={ 

49 "units": None, 

50 "required": True, 

51 "examples": [ 

52 '"[-0.0001., 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.001, ... 1, 2, 5, 10]"' 

53 ], 

54 }, 

55 ), 

56 ] 

57 

58 amplitudes: Annotated[ 

59 np.ndarray | list[float], 

60 Field( 

61 default_factory=lambda: np.empty(0, dtype=float), 

62 description="The amplitudes for each calibration frequency.", 

63 alias=None, 

64 json_schema_extra={ 

65 "units": None, 

66 "required": True, 

67 "examples": [ 

68 '"[1e-5, 1e-4, 1e-3, 1e-2, 1e-1, 1.0, 1.0, ... 1.0, 1.0, 1.0, 1.0]"' 

69 ], 

70 }, 

71 ), 

72 ] 

73 

74 phases: Annotated[ 

75 np.ndarray | list[float], 

76 Field( 

77 default_factory=lambda: np.empty(0, dtype=float), 

78 description="The phases for each calibration frequency.", 

79 alias=None, 

80 json_schema_extra={ 

81 "units": "radians", 

82 "required": True, 

83 "examples": [ 

84 '"[-90, -90, -88, -80, -60, -30, 30, ... 50.0, 90.0, 90.0, 90.0]"' 

85 ], 

86 }, 

87 ), 

88 ] 

89 

90 instrument_type: Annotated[ 

91 str, 

92 Field( 

93 default="", 

94 description="The type of instrument the FAP table is associated with.", 

95 alias=None, 

96 json_schema_extra={ 

97 "units": None, 

98 "required": True, 

99 "examples": ["fluxgate magnetometer"], 

100 }, 

101 ), 

102 ] 

103 

104 def make_obspy_mapping(self): 

105 mapping = get_base_obspy_mapping() 

106 mapping["amplitudes"] = "amplitudes" 

107 mapping["frequencies"] = "frequencies" 

108 mapping["phases"] = "phases" 

109 return mapping 

110 

111 @field_validator("frequencies", "amplitudes", mode="before") 

112 @classmethod 

113 def validate_input_arrays(cls, value, info: ValidationInfo) -> np.ndarray: 

114 """ 

115 Validate that the input is a list, tuple, or np.ndarray and convert to np.ndarray. 

116 """ 

117 return object_to_array(value) 

118 

119 @field_validator("phases", mode="before") 

120 @classmethod 

121 def validate_phases(cls, value, info: ValidationInfo) -> np.ndarray: 

122 """ 

123 Validate that the input is a list, tuple, or np.ndarray and convert to np.ndarray. 

124 """ 

125 value = object_to_array(value) 

126 if value.size > 0: 

127 if value.max() > 1000 * np.pi / 2: 

128 logger.warning( 

129 "self.phases appear to be in milli radians attempting to convert to radians" 

130 ) 

131 return value / 1000 

132 

133 elif np.abs(value).max() > 6 * np.pi: 

134 logger.warning( 

135 "self.phases appear to be in degrees attempting to convert to radians" 

136 ) 

137 return np.deg2rad(value) 

138 

139 else: 

140 return value 

141 return value 

142 

143 @property 

144 def min_frequency(self): 

145 """ 

146 

147 :return: minimum frequency 

148 :rtype: float 

149 

150 """ 

151 if self.frequencies is None: 

152 return 0.0 

153 elif self.frequencies.size == 0: 

154 return 0.0 

155 return float(self.frequencies.min()) 

156 

157 @property 

158 def max_frequency(self): 

159 """ 

160 

161 :return: maximum frequency 

162 :rtype: float 

163 

164 """ 

165 if self.frequencies is None: 

166 return 0.0 

167 elif self.frequencies.size == 0: 

168 return 0.0 

169 return float(self.frequencies.max()) 

170 

171 @requires(obspy=(ResponseListResponseStage and ResponseListElement)) 

172 def to_obspy( 

173 self, 

174 stage_number=1, 

175 normalization_frequency=1, 

176 sample_rate=1, 

177 ): 

178 """ 

179 Convert to an obspy stage 

180 

181 :param stage_number: sequential stage number, defaults to 1 

182 :type stage_number: integer, optional 

183 :param normalization_frequency: Normalization frequency, defaults to 1 

184 :type normalization_frequency: float, optional 

185 :param sample_rate: sample rate, defaults to 1 

186 :type sample_rate: float, optional 

187 :return: Obspy stage filter 

188 :rtype: :class:`obspy.core.inventory.ResponseListResponseStage` 

189 

190 """ 

191 response_elements = [] 

192 for f, a, p in zip(self.frequencies, self.amplitudes, self.phases): 

193 element = ResponseListElement(f, a, p) 

194 response_elements.append(element) 

195 

196 rs = ResponseListResponseStage( 

197 stage_number, 

198 self.gain, 

199 normalization_frequency, 

200 self.units_in_object.symbol, 

201 self.units_out_object.symbol, 

202 name=self.name, 

203 description=self.get_filter_description(), 

204 input_units_description=self.units_in_object.name, 

205 output_units_description=self.units_out_object.name, 

206 response_list_elements=response_elements, 

207 ) 

208 

209 return rs 

210 

211 def complex_response(self, frequencies, interpolation_method="slinear"): 

212 """ 

213 Computes complex response for given frequency range 

214 :param frequencies: array of frequencies to estimate the response 

215 :type frequencies: np.ndarray 

216 :return: complex response 

217 :rtype: np.ndarray 

218 

219 """ 

220 if np.min(frequencies) < self.min_frequency: 

221 # if there is a dc component skip it. 

222 if np.min(frequencies) != 0: 

223 logger.warning( 

224 f"Extrapolating frequencies smaller ({np.min(frequencies)} Hz) " 

225 f"than table frequencies ({self.min_frequency} Hz)." 

226 ) 

227 if np.max(frequencies) > self.max_frequency: 

228 logger.warning( 

229 f"Extrapolating frequencies larger ({np.max(frequencies)} Hz) " 

230 f"than table frequencies ({self.max_frequency} Hz)." 

231 ) 

232 

233 phase_response = interp1d( 

234 self.frequencies, 

235 self.phases, 

236 kind=interpolation_method, 

237 fill_value="extrapolate", 

238 ) 

239 

240 amplitude_response = interp1d( 

241 self.frequencies, 

242 self.amplitudes, 

243 kind=interpolation_method, 

244 fill_value="extrapolate", 

245 ) 

246 total_response_function = lambda f: amplitude_response(f) * np.exp( 

247 1.0j * phase_response(f) 

248 ) 

249 

250 return self.gain * total_response_function(frequencies)