1# =====================================================
2# Imports
3# =====================================================
4from typing import Annotated
5
6import numpy as np
7from loguru import logger
8from pydantic import Field, field_validator, ValidationInfo
9from scipy.interpolate import interp1d
10
11from mt_metadata.base.helpers import object_to_array, requires
12from mt_metadata.timeseries.filters import FilterBase, get_base_obspy_mapping
13
14
15try:
16 from obspy.core.inventory.response import (
17 ResponseListElement,
18 ResponseListResponseStage,
19 )
20except ImportError:
21 ResponseListResponseStage = ResponseListElement = None
22
23
24# =====================================================
25
26
27class FrequencyResponseTableFilter(FilterBase):
28 _filter_type: str = "fap"
29 type: Annotated[
30 str,
31 Field(
32 default="fap",
33 description="Type of filter. Must be 'fap' or 'frequency amplitude table'",
34 alias=None,
35 json_schema_extra={
36 "units": None,
37 "required": True,
38 "examples": ["fap"],
39 },
40 ),
41 ]
42 frequencies: Annotated[
43 np.ndarray | list[float],
44 Field(
45 default_factory=lambda: np.empty(0, dtype=float),
46 description="The frequencies at which a calibration of the filter were performed.",
47 alias=None,
48 json_schema_extra={
49 "units": None,
50 "required": True,
51 "examples": [
52 '"[-0.0001., 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.001, ... 1, 2, 5, 10]"'
53 ],
54 },
55 ),
56 ]
57
58 amplitudes: Annotated[
59 np.ndarray | list[float],
60 Field(
61 default_factory=lambda: np.empty(0, dtype=float),
62 description="The amplitudes for each calibration frequency.",
63 alias=None,
64 json_schema_extra={
65 "units": None,
66 "required": True,
67 "examples": [
68 '"[1e-5, 1e-4, 1e-3, 1e-2, 1e-1, 1.0, 1.0, ... 1.0, 1.0, 1.0, 1.0]"'
69 ],
70 },
71 ),
72 ]
73
74 phases: Annotated[
75 np.ndarray | list[float],
76 Field(
77 default_factory=lambda: np.empty(0, dtype=float),
78 description="The phases for each calibration frequency.",
79 alias=None,
80 json_schema_extra={
81 "units": "radians",
82 "required": True,
83 "examples": [
84 '"[-90, -90, -88, -80, -60, -30, 30, ... 50.0, 90.0, 90.0, 90.0]"'
85 ],
86 },
87 ),
88 ]
89
90 instrument_type: Annotated[
91 str,
92 Field(
93 default="",
94 description="The type of instrument the FAP table is associated with.",
95 alias=None,
96 json_schema_extra={
97 "units": None,
98 "required": True,
99 "examples": ["fluxgate magnetometer"],
100 },
101 ),
102 ]
103
104 def make_obspy_mapping(self):
105 mapping = get_base_obspy_mapping()
106 mapping["amplitudes"] = "amplitudes"
107 mapping["frequencies"] = "frequencies"
108 mapping["phases"] = "phases"
109 return mapping
110
111 @field_validator("frequencies", "amplitudes", mode="before")
112 @classmethod
113 def validate_input_arrays(cls, value, info: ValidationInfo) -> np.ndarray:
114 """
115 Validate that the input is a list, tuple, or np.ndarray and convert to np.ndarray.
116 """
117 return object_to_array(value)
118
119 @field_validator("phases", mode="before")
120 @classmethod
121 def validate_phases(cls, value, info: ValidationInfo) -> np.ndarray:
122 """
123 Validate that the input is a list, tuple, or np.ndarray and convert to np.ndarray.
124 """
125 value = object_to_array(value)
126 if value.size > 0:
127 if value.max() > 1000 * np.pi / 2:
128 logger.warning(
129 "self.phases appear to be in milli radians attempting to convert to radians"
130 )
131 return value / 1000
132
133 elif np.abs(value).max() > 6 * np.pi:
134 logger.warning(
135 "self.phases appear to be in degrees attempting to convert to radians"
136 )
137 return np.deg2rad(value)
138
139 else:
140 return value
141 return value
142
143 @property
144 def min_frequency(self):
145 """
146
147 :return: minimum frequency
148 :rtype: float
149
150 """
151 if self.frequencies is None:
152 return 0.0
153 elif self.frequencies.size == 0:
154 return 0.0
155 return float(self.frequencies.min())
156
157 @property
158 def max_frequency(self):
159 """
160
161 :return: maximum frequency
162 :rtype: float
163
164 """
165 if self.frequencies is None:
166 return 0.0
167 elif self.frequencies.size == 0:
168 return 0.0
169 return float(self.frequencies.max())
170
171 @requires(obspy=(ResponseListResponseStage and ResponseListElement))
172 def to_obspy(
173 self,
174 stage_number=1,
175 normalization_frequency=1,
176 sample_rate=1,
177 ):
178 """
179 Convert to an obspy stage
180
181 :param stage_number: sequential stage number, defaults to 1
182 :type stage_number: integer, optional
183 :param normalization_frequency: Normalization frequency, defaults to 1
184 :type normalization_frequency: float, optional
185 :param sample_rate: sample rate, defaults to 1
186 :type sample_rate: float, optional
187 :return: Obspy stage filter
188 :rtype: :class:`obspy.core.inventory.ResponseListResponseStage`
189
190 """
191 response_elements = []
192 for f, a, p in zip(self.frequencies, self.amplitudes, self.phases):
193 element = ResponseListElement(f, a, p)
194 response_elements.append(element)
195
196 rs = ResponseListResponseStage(
197 stage_number,
198 self.gain,
199 normalization_frequency,
200 self.units_in_object.symbol,
201 self.units_out_object.symbol,
202 name=self.name,
203 description=self.get_filter_description(),
204 input_units_description=self.units_in_object.name,
205 output_units_description=self.units_out_object.name,
206 response_list_elements=response_elements,
207 )
208
209 return rs
210
211 def complex_response(self, frequencies, interpolation_method="slinear"):
212 """
213 Computes complex response for given frequency range
214 :param frequencies: array of frequencies to estimate the response
215 :type frequencies: np.ndarray
216 :return: complex response
217 :rtype: np.ndarray
218
219 """
220 if np.min(frequencies) < self.min_frequency:
221 # if there is a dc component skip it.
222 if np.min(frequencies) != 0:
223 logger.warning(
224 f"Extrapolating frequencies smaller ({np.min(frequencies)} Hz) "
225 f"than table frequencies ({self.min_frequency} Hz)."
226 )
227 if np.max(frequencies) > self.max_frequency:
228 logger.warning(
229 f"Extrapolating frequencies larger ({np.max(frequencies)} Hz) "
230 f"than table frequencies ({self.max_frequency} Hz)."
231 )
232
233 phase_response = interp1d(
234 self.frequencies,
235 self.phases,
236 kind=interpolation_method,
237 fill_value="extrapolate",
238 )
239
240 amplitude_response = interp1d(
241 self.frequencies,
242 self.amplitudes,
243 kind=interpolation_method,
244 fill_value="extrapolate",
245 )
246 total_response_function = lambda f: amplitude_response(f) * np.exp(
247 1.0j * phase_response(f)
248 )
249
250 return self.gain * total_response_function(frequencies)