Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mth5 \ mth5 \ io \ phoenix \ readers \ calibrations.py: 98%

84 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-10 00:01 -0800

1# -*- coding: utf-8 -*- 

2""" 

3Created on Thu Jun 15 15:21:35 2023 

4 

5@author: jpeacock 

6 

7Calibrations can come in JSON files. The JSON file includes filters 

8for all lowpass filters, so you need to match the lowpass filter used in the 

9setup with the lowpass filter. Then you need to add the dipole length and 

10sensor calibrations. 

11""" 

12 

13# ============================================================================= 

14# Imports 

15# ============================================================================= 

16from __future__ import annotations 

17 

18from pathlib import Path 

19from typing import Any, TYPE_CHECKING 

20 

21import numpy as np 

22from mt_metadata.common.mttime import MTime 

23from mt_metadata.timeseries.filters import FrequencyResponseTableFilter 

24 

25from .helpers import read_json_to_object 

26 

27 

28if TYPE_CHECKING: 

29 from numpy.typing import NDArray 

30 

31 

32# ============================================================================= 

33 

34 

class PhoenixCalibration:
    """
    Phoenix Geophysics calibration data reader and filter manager.

    This class reads Phoenix calibration files in JSON format and provides
    access to frequency response filters for different channels and lowpass
    filter settings. It supports both receiver and sensor calibration files.

    Parameters
    ----------
    cal_fn : str or pathlib.Path, optional
        Path to the calibration file to read. If provided, the file will be
        loaded automatically during initialization.
    **kwargs : Any
        Additional keyword arguments that will be set as instance attributes.

    Attributes
    ----------
    obj : Any or None
        The parsed calibration object containing all calibration data, or
        None until a file has been read.
    """

    def __init__(self, cal_fn: str | Path | None = None, **kwargs: Any) -> None:
        self.obj: Any = None
        # Always define the backing attribute so ``cal_fn`` and ``read`` work
        # (and fail with a clear error) when no file was given.
        self._cal_fn: Path | None = None

        for key, value in kwargs.items():
            setattr(self, key, value)

        # Goes through the property setter, which reads the file if given.
        self.cal_fn = cal_fn

    def __str__(self) -> str:
        """String representation of PhoenixCalibration."""
        lines = ["Phoenix Response Filters"]
        return "\n".join(lines)

    def __repr__(self) -> str:
        """Detailed string representation of PhoenixCalibration."""
        return self.__str__()

    @property
    def cal_fn(self) -> Path | None:
        """
        Path to the calibration file.

        Returns
        -------
        pathlib.Path or None
            The path to the calibration file, or None if no file has been set.
        """
        return self._cal_fn

    @cal_fn.setter
    def cal_fn(self, cal_fn: str | Path | None) -> None:
        """
        Set the calibration file path and automatically read the file.

        Parameters
        ----------
        cal_fn : str, pathlib.Path, or None
            Path to the calibration file. If None, no action is taken.
            If the file exists, it will be read automatically.

        Raises
        ------
        IOError
            If the specified file does not exist.
        """
        if cal_fn is None:
            return

        self._cal_fn = Path(cal_fn)
        if not self._cal_fn.exists():
            raise IOError(f"Could not find file {cal_fn}")
        self.read()

    @property
    def calibration_date(self) -> MTime | None:
        """
        Get the calibration date from the loaded calibration data.

        Returns
        -------
        MTime or None
            The calibration date as an MTime object, or None if no data is loaded.
        """
        if self._has_read():
            return MTime(time_stamp=self.obj.timestamp_utc)
        return None

    def _has_read(self) -> bool:
        """
        Check if calibration data has been loaded.

        Returns
        -------
        bool
            True if calibration data is loaded, False otherwise.
        """
        return self.obj is not None

    def get_max_freq(
        self, freq: NDArray[np.floating] | list[float] | np.ndarray
    ) -> int:
        """
        Calculate the maximum frequency for filter naming.

        Determines the power-of-10 frequency limit based on the maximum
        frequency in the input array. Used to name filters as
        {channel}_{max_freq}hz_lowpass.

        Parameters
        ----------
        freq : numpy.ndarray or list of float
            Frequency values in Hz. Must not be empty.

        Returns
        -------
        int
            The largest power of 10 that does not exceed the maximum
            frequency (e.g., 1000 for a maximum anywhere in [1000, 10000) Hz).

        Examples
        --------
        >>> cal = PhoenixCalibration()
        >>> freq = np.array([1.0, 10.0, 100.0, 1500.0])
        >>> cal.get_max_freq(freq)
        1000
        """
        highest = np.asarray(freq).max()
        return int(10 ** np.floor(np.log10(highest)))

    @property
    def base_filter_name(self) -> str | None:
        """
        Generate the base filter name from instrument information.

        Creates a standardized filter name prefix based on the instrument
        type, model, and serial number from the calibration data.

        Returns
        -------
        str or None
            Base filter name in format "{instrument_type}_{instrument_model}_{serial}"
            converted to lowercase, or None if no data is loaded.

        Examples
        --------
        >>> cal = PhoenixCalibration("calibration.json")
        >>> cal.base_filter_name
        'mtu-5c_rmt03-j_666'
        """
        if self._has_read():
            return (
                f"{self.obj.instrument_type}_"
                f"{self.obj.instrument_model}_"
                f"{self.obj.inst_serial}"
            ).lower()
        return None

    def get_filter_lp_name(self, channel: str, max_freq: int) -> str:
        """
        Generate a lowpass filter name for a specific channel and frequency.

        Creates a standardized filter name for receiver calibration filters
        in the format: {base_filter_name}_{channel}_{max_freq}hz_lowpass

        Parameters
        ----------
        channel : str
            Channel identifier (e.g., 'e1', 'h2').
        max_freq : int
            Maximum frequency in Hz for the lowpass filter.

        Returns
        -------
        str
            Complete lowpass filter name in lowercase. If no calibration data
            is loaded the base name is None, so the result starts with "none".

        Examples
        --------
        >>> cal = PhoenixCalibration("calibration.json")
        >>> cal.get_filter_lp_name("e1", 1000)
        'mtu-5c_rmt03-j_666_e1_1000hz_lowpass'
        """
        return f"{self.base_filter_name}_{channel}_{max_freq}hz_lowpass".lower()

    def get_filter_sensor_name(self, sensor: str) -> str:
        """
        Generate a sensor filter name for a specific sensor.

        Creates a standardized filter name for sensor calibration filters
        in the format: {base_filter_name}_{sensor}

        Parameters
        ----------
        sensor : str
            Sensor identifier or serial number.

        Returns
        -------
        str
            Complete sensor filter name in lowercase. If no calibration data
            is loaded the base name is None, so the result starts with "none".

        Examples
        --------
        >>> cal = PhoenixCalibration("calibration.json")
        >>> cal.get_filter_sensor_name("sensor123")
        'mtu-5c_rmt03-j_666_sensor123'
        """
        return f"{self.base_filter_name}_{sensor}".lower()

    def read(self, cal_fn: str | Path | None = None) -> None:
        """
        Read and parse a Phoenix calibration file.

        Loads calibration data from a JSON file and creates frequency response
        filters for each channel and frequency band. The method creates channel
        attributes (e.g., self.e1, self.h2) containing either:
        - Dictionary of filters keyed by max frequency (receiver calibration)
        - Single filter object (sensor calibration)

        Parameters
        ----------
        cal_fn : str, pathlib.Path, or None, optional
            Path to the calibration file to read. If None, uses the previously
            set calibration file path.

        Raises
        ------
        IOError
            If no calibration file has been set, or the file cannot be found.

        Notes
        -----
        The calibration type is determined from ``file_type``:
        - "receiver calibration": filters are named per channel and frequency
        - any ``file_type`` containing "sensor": one filter per channel, with
          units milliVolt -> nanoTesla; the last entry of ``chan_data`` wins.

        A sensor channel that carries no ``chan_data`` entries gets no
        attribute at all, rather than inheriting another channel's filter.
        """
        if cal_fn is not None:
            self._cal_fn = Path(cal_fn)

        if self._cal_fn is None:
            raise IOError(
                "No calibration file has been set; pass cal_fn or set the "
                "cal_fn attribute."
            )

        if not self._cal_fn.exists():
            raise IOError(f"Could not find {self.cal_fn}")

        # obj must be assigned before the loop: the filter-name helpers read it.
        self.obj = read_json_to_object(self._cal_fn)

        is_receiver = self.obj.file_type in ["receiver calibration"]
        is_sensor = "sensor" in self.obj.file_type

        for channel in self.obj.cal_data:
            comp = channel.tag.lower()
            ch_cal_dict = {}
            # Reset per channel so a channel without data can never pick up
            # the filter built for the previous channel.
            ch_fap = None
            for cal in channel.chan_data:
                ch_fap = FrequencyResponseTableFilter()  # type: ignore
                ch_fap.frequencies = cal.freq_Hz
                ch_fap.amplitudes = cal.magnitude
                ch_fap.phases = np.deg2rad(cal.phs_deg)

                max_freq = self.get_max_freq(ch_fap.frequencies)
                if is_receiver:
                    ch_fap.name = self.get_filter_lp_name(comp, max_freq)
                else:
                    ch_fap.name = self.get_filter_sensor_name(
                        self.obj.sensor_serial
                    )
                ch_fap.calibration_date = self.obj.timestamp_utc
                ch_cal_dict[max_freq] = ch_fap
                ch_fap.units_in = "Volt"
                ch_fap.units_out = "Volt"

            if is_sensor:
                if ch_fap is None:
                    # No calibration entries for this sensor channel.
                    continue
                ch_fap.units_in = "milliVolt"
                ch_fap.units_out = "nanoTesla"
                setattr(self, comp, ch_fap)
            else:
                setattr(self, comp, ch_cal_dict)

    def get_filter(
        self, channel: str, filter_name: str | int
    ) -> FrequencyResponseTableFilter:
        """
        Get the frequency response filter for a specific channel and filter.

        Retrieves the lowpass filter for the given channel and filter specification.
        The method automatically handles both string and integer filter names:
        anything ``int()`` accepts is converted to an integer key, otherwise the
        value is used for the lookup unchanged.

        Parameters
        ----------
        channel : str
            Channel identifier (e.g., 'e1', 'h2', 'h3').
        filter_name : str or int
            Filter specification, typically the lowpass frequency in Hz
            (e.g., 1000, '100', 10000).

        Returns
        -------
        FrequencyResponseTableFilter
            The frequency response filter object containing the calibration data
            for the specified channel and filter.

        Raises
        ------
        AttributeError
            If the specified channel is not found in the calibration data.
        KeyError
            If the specified filter is not found for the given channel.

        Examples
        --------
        >>> cal = PhoenixCalibration("calibration.json")
        >>> filt = cal.get_filter("e1", 1000)
        >>> print(f"Filter name: {filt.name}")
        >>> print(f"Frequency points: {len(filt.frequencies)}")
        """
        try:
            filter_name = int(filter_name)
        except (TypeError, ValueError):
            # Not integer-like (e.g. a descriptive string or None): look it up
            # as given and let the lookup below report a missing filter.
            pass

        try:
            channel_filters = getattr(self, channel)
        except AttributeError as error:
            raise AttributeError(f"Could not find {channel}") from error

        try:
            return channel_filters[filter_name]
        except KeyError as error:
            raise KeyError(f"Could not find lowpass filter {filter_name}") from error