Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mth5 \ mth5 \ io \ phoenix \ readers \ calibrations.py: 98%
84 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:01 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:01 -0800
1# -*- coding: utf-8 -*-
2"""
3Created on Thu Jun 15 15:21:35 2023
5@author: jpeacock
7Calibrations can come in json files. the JSON file includes filters
8for all lowpass filters, so you need to match the lowpass filter used in the
9setup with the lowpass filter. Then you need to add the dipole length and
10sensor calibrations.
11"""
13# =============================================================================
14# Imports
15# =============================================================================
16from __future__ import annotations
18from pathlib import Path
19from typing import Any, TYPE_CHECKING
21import numpy as np
22from mt_metadata.common.mttime import MTime
23from mt_metadata.timeseries.filters import FrequencyResponseTableFilter
25from .helpers import read_json_to_object
28if TYPE_CHECKING:
29 from numpy.typing import NDArray
32# =============================================================================
35class PhoenixCalibration:
36 """
37 Phoenix Geophysics calibration data reader and filter manager.
39 This class reads Phoenix calibration files in JSON format and provides
40 access to frequency response filters for different channels and lowpass
41 filter settings. It supports both receiver and sensor calibration files.
43 Parameters
44 ----------
45 cal_fn : str or pathlib.Path, optional
46 Path to the calibration file to read. If provided, the file will be
47 loaded automatically during initialization.
48 **kwargs : Any
49 Additional keyword arguments that will be set as instance attributes.
51 Attributes
52 ----------
53 obj : Any or None
54 The parsed calibration object containing all calibration data.
55 """
57 def __init__(self, cal_fn: str | Path | None = None, **kwargs: Any) -> None:
58 self.obj: Any = None
60 for key, value in kwargs.items():
61 setattr(self, key, value)
63 self.cal_fn = cal_fn
65 def __str__(self) -> str:
66 """String representation of PhoenixCalibration."""
67 lines = ["Phoenix Response Filters"]
68 return "\n".join(lines)
70 def __repr__(self) -> str:
71 """Detailed string representation of PhoenixCalibration."""
72 return self.__str__()
74 @property
75 def cal_fn(self) -> Path:
76 """
77 Path to the calibration file.
79 Returns
80 -------
81 pathlib.Path
82 The path to the calibration file.
83 """
84 return self._cal_fn
86 @cal_fn.setter
87 def cal_fn(self, cal_fn: str | Path | None) -> None:
88 """
89 Set the calibration file path and automatically read the file.
91 Parameters
92 ----------
93 cal_fn : str, pathlib.Path, or None
94 Path to the calibration file. If None, no action is taken.
95 If the file exists, it will be read automatically.
97 Raises
98 ------
99 IOError
100 If the specified file does not exist.
101 """
102 if cal_fn is not None:
103 self._cal_fn = Path(cal_fn)
104 if self._cal_fn.exists():
105 self.read()
106 else:
107 raise IOError(f"Could not find file {cal_fn}")
109 @property
110 def calibration_date(self) -> MTime | None:
111 """
112 Get the calibration date from the loaded calibration data.
114 Returns
115 -------
116 MTime or None
117 The calibration date as an MTime object, or None if no data is loaded.
118 """
119 if self._has_read():
120 return MTime(time_stamp=self.obj.timestamp_utc)
121 return None
123 def _has_read(self) -> bool:
124 """
125 Check if calibration data has been loaded.
127 Returns
128 -------
129 bool
130 True if calibration data is loaded, False otherwise.
131 """
132 return self.obj is not None
134 def get_max_freq(
135 self, freq: NDArray[np.floating] | list[float] | np.ndarray
136 ) -> int:
137 """
138 Calculate the maximum frequency for filter naming.
140 Determines the power-of-10 frequency limit based on the maximum
141 frequency in the input array. Used to name filters as
142 {channel}_{max_freq}hz_lowpass.
144 Parameters
145 ----------
146 freq : numpy.ndarray
147 Array of frequency values in Hz.
149 Returns
150 -------
151 int
152 The power-of-10 frequency limit (e.g., 1000 for frequencies up to 9999 Hz).
154 Examples
155 --------
156 >>> cal = PhoenixCalibration()
157 >>> freq = np.array([1.0, 10.0, 100.0, 1500.0])
158 >>> cal.get_max_freq(freq)
159 1000
160 """
161 return int(10 ** np.floor(np.log10(np.array(freq).max())))
163 @property
164 def base_filter_name(self) -> str | None:
165 """
166 Generate the base filter name from instrument information.
168 Creates a standardized filter name prefix based on the instrument
169 type, model, and serial number from the calibration data.
171 Returns
172 -------
173 str or None
174 Base filter name in format "{instrument_type}_{instrument_model}_{serial}"
175 converted to lowercase, or None if no data is loaded.
177 Examples
178 --------
179 >>> cal = PhoenixCalibration("calibration.json")
180 >>> cal.base_filter_name
181 'mtu-5c_rmt03-j_666'
182 """
183 if self._has_read():
184 return (
185 f"{self.obj.instrument_type}_"
186 f"{self.obj.instrument_model}_"
187 f"{self.obj.inst_serial}"
188 ).lower()
189 return None
191 def get_filter_lp_name(self, channel: str, max_freq: int) -> str:
192 """
193 Generate a lowpass filter name for a specific channel and frequency.
195 Creates a standardized filter name for receiver calibration filters
196 in the format: {base_filter_name}_{channel}_{max_freq}hz_lowpass
198 Parameters
199 ----------
200 channel : str
201 Channel identifier (e.g., 'e1', 'h2').
202 max_freq : int
203 Maximum frequency in Hz for the lowpass filter.
205 Returns
206 -------
207 str
208 Complete lowpass filter name in lowercase.
210 Examples
211 --------
212 >>> cal = PhoenixCalibration("calibration.json")
213 >>> cal.get_filter_lp_name("e1", 1000)
214 'mtu-5c_rmt03-j_666_e1_1000hz_lowpass'
215 """
216 return f"{self.base_filter_name}_{channel}_{max_freq}hz_lowpass".lower()
218 def get_filter_sensor_name(self, sensor: str) -> str:
219 """
220 Generate a sensor filter name for a specific sensor.
222 Creates a standardized filter name for sensor calibration filters
223 in the format: {base_filter_name}_{sensor}
225 Parameters
226 ----------
227 sensor : str
228 Sensor identifier or serial number.
230 Returns
231 -------
232 str
233 Complete sensor filter name in lowercase.
235 Examples
236 --------
237 >>> cal = PhoenixCalibration("calibration.json")
238 >>> cal.get_filter_sensor_name("sensor123")
239 'mtu-5c_rmt03-j_666_sensor123'
240 """
241 return f"{self.base_filter_name}_{sensor}".lower()
243 def read(self, cal_fn: str | Path | None = None) -> None:
244 """
245 Read and parse a Phoenix calibration file.
247 Loads calibration data from a JSON file and creates frequency response
248 filters for each channel and frequency band. The method creates channel
249 attributes (e.g., self.e1, self.h2) containing either:
250 - Dictionary of filters by frequency (receiver calibration)
251 - Single filter object (sensor calibration)
253 Parameters
254 ----------
255 cal_fn : str, pathlib.Path, or None, optional
256 Path to the calibration file to read. If None, uses the previously
257 set calibration file path.
259 Raises
260 ------
261 IOError
262 If the calibration file cannot be found or read.
264 Notes
265 -----
266 The method automatically determines calibration type based on file_type:
267 - "receiver calibration": Creates multiple filters per channel by frequency
268 - "sensor calibration": Creates single filter per channel
269 """
270 if cal_fn is not None:
271 self._cal_fn = Path(cal_fn)
273 if not self.cal_fn.exists():
274 raise IOError(f"Could not find {self.cal_fn}")
276 self.obj = read_json_to_object(self.cal_fn)
278 for channel in self.obj.cal_data:
279 comp = channel.tag.lower()
280 ch_cal_dict = {}
281 for cal in channel.chan_data:
282 ch_fap = FrequencyResponseTableFilter() # type: ignore
283 ch_fap.frequencies = cal.freq_Hz
284 ch_fap.amplitudes = cal.magnitude
285 ch_fap.phases = np.deg2rad(cal.phs_deg)
287 max_freq = self.get_max_freq(ch_fap.frequencies)
288 if self.obj.file_type in ["receiver calibration"]:
289 ch_fap.name = self.get_filter_lp_name(comp, max_freq)
290 else:
291 ch_fap.name = self.get_filter_sensor_name(self.obj.sensor_serial)
292 ch_fap.calibration_date = self.obj.timestamp_utc
293 ch_cal_dict[max_freq] = ch_fap
294 ch_fap.units_in = "Volt"
295 ch_fap.units_out = "Volt"
297 if "sensor" in self.obj.file_type:
298 ch_fap.units_in = "milliVolt"
299 ch_fap.units_out = "nanoTesla"
300 setattr(self, comp, ch_fap)
302 else:
303 setattr(self, comp, ch_cal_dict)
305 def get_filter(
306 self, channel: str, filter_name: str | int
307 ) -> FrequencyResponseTableFilter:
308 """
309 Get the frequency response filter for a specific channel and filter.
311 Retrieves the lowpass filter for the given channel and filter specification.
312 The method automatically handles both string and integer filter names.
314 Parameters
315 ----------
316 channel : str
317 Channel identifier (e.g., 'e1', 'h2', 'h3').
318 filter_name : str or int
319 Filter specification, typically the lowpass frequency in Hz
320 (e.g., 1000, '100', 10000).
322 Returns
323 -------
324 FrequencyResponseTableFilter
325 The frequency response filter object containing the calibration data
326 for the specified channel and filter.
328 Raises
329 ------
330 AttributeError
331 If the specified channel is not found in the calibration data.
332 KeyError
333 If the specified filter is not found for the given channel.
335 Examples
336 --------
337 >>> cal = PhoenixCalibration("calibration.json")
338 >>> filt = cal.get_filter("e1", 1000)
339 >>> print(f"Filter name: {filt.name}")
340 >>> print(f"Frequency points: {len(filt.frequencies)}")
341 """
342 try:
343 filter_name = int(filter_name)
344 except ValueError:
345 pass
347 try:
348 return getattr(self, channel)[filter_name]
349 except AttributeError:
350 raise AttributeError(f"Could not find {channel}")
351 except KeyError:
352 raise KeyError(f"Could not find lowpass filter {filter_name}")