Coverage for C:\Users\peaco\OneDrive\Documents\GitHub\mth5\mth5\io\phoenix\readers\receiver_metadata.py: 86%
167 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:01 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:01 -0800
# -*- coding: utf-8 -*-
"""
Phoenix Geophysics receiver metadata parser for recmeta.json files.

Created on Tue Jun 20 15:06:08 2023

@author: jpeacock
"""
10from __future__ import annotations
12# =============================================================================
13# Imports
14# =============================================================================
15from pathlib import Path
16from types import SimpleNamespace
17from typing import Any, TYPE_CHECKING
19from loguru import logger
20from mt_metadata.common import Comment
21from mt_metadata.timeseries import Electric, Magnetic, Run, Station, Survey
22from mt_metadata.timeseries.filtered import AppliedFilter
24from .helpers import read_json_to_object
27if TYPE_CHECKING:
28 pass
31# =============================================================================
class PhoenixReceiverMetadata:
    """
    Container for Phoenix Geophysics recmeta.json metadata files.

    This class reads and parses receiver metadata from JSON configuration files
    used to control Phoenix Geophysics MTU-5C data recording systems. It provides
    methods to extract channel configurations, instrument settings, and convert
    them to standardized metadata objects.

    Parameters
    ----------
    fn : str, Path, or None, optional
        Path to the recmeta.json file. If provided, the file is read
        automatically during initialization.
    **kwargs
        Additional keyword arguments (accepted but currently unused).

    Attributes
    ----------
    fn : Path or None
        Path to the metadata file.
    obj : SimpleNamespace or None
        Parsed JSON content, or None until a file has been read.
    logger : loguru.Logger
        Logger instance used for error reporting.

    Raises
    ------
    IOError
        If the specified file does not exist.

    Examples
    --------
    >>> metadata = PhoenixReceiverMetadata("recmeta.json")
    >>> channel_map = metadata.channel_map
    >>> e1_config = metadata.e1_metadata

    Notes
    -----
    Electric and magnetic channels are both supported. Phoenix-specific
    channel keys are translated to standardized metadata attribute names
    through the ``_e_map`` and ``_h_map`` dictionaries.
    """

    def __init__(self, fn: str | Path | None = None, **kwargs: Any) -> None:
        # The ``fn`` setter validates the path, so a missing file raises here
        # before any other attribute is created.
        self.fn = fn
        self.obj: SimpleNamespace | None = None

        # Phoenix channel key -> Electric metadata attribute name.
        self._e_map = {
            "tag": "component",
            "ty": "type",
            "ga": "gain",
            "sampleRate": "sample_rate",
            "pot_p": "contact_resistance.start",
            "pot_n": "contact_resistance.end",
        }

        # Phoenix channel key -> Magnetic metadata attribute name.
        self._h_map = {
            "tag": "component",
            "ty": "type",
            "ga": "gain",
            "sampleRate": "sample_rate",
            "type_name": "sensor.model",
            "type": "sensor.type",
            "serial": "sensor.id",
        }
        self.logger = logger

        if self.fn is not None:
            self.read()

    @property
    def fn(self) -> Path | None:
        """
        Path to the metadata file.

        Returns
        -------
        Path or None
            Path to the recmeta.json file, or None if not set.
        """
        return self._fn

    @fn.setter
    def fn(self, fn: str | Path | None) -> None:
        """
        Set the metadata file path.

        Parameters
        ----------
        fn : str, Path, or None
            Path to the metadata file. None clears the stored path.

        Raises
        ------
        IOError
            If the specified file does not exist.
        """
        if fn is None:
            self._fn = None
            return

        path = Path(fn)
        if not path.exists():
            raise IOError(f"Could not find {path}")
        self._fn = path

    @property
    def instrument_id(self) -> str | None:
        """
        Instrument identifier from metadata.

        Returns
        -------
        str or None
            The ``instid`` entry of the loaded metadata, or None when no
            metadata object is loaded.
        """
        if self.obj is None:
            return None
        return self.obj.instid

    def read(self, fn: str | Path | None = None) -> None:
        """
        Read a recmeta.json file in Phoenix format.

        Parameters
        ----------
        fn : str, Path, or None, optional
            Path to the JSON file. If None, the current ``fn`` property is used.

        Raises
        ------
        IOError
            If no file path is specified or the file doesn't exist.

        Notes
        -----
        Parsing is delegated to ``read_json_to_object``; any error it raises
        propagates unchanged (presumably a ValueError for malformed JSON --
        confirm against the helper).
        """
        if fn is not None:
            self.fn = fn

        if self.fn is None:
            raise IOError("No file path specified")

        self.obj = read_json_to_object(self.fn)

    def has_obj(self) -> bool:
        """
        Check if metadata object is loaded.

        Returns
        -------
        bool
            True if metadata object exists, False otherwise.
        """
        return self.obj is not None

    @property
    def channel_map(self) -> dict[int, str]:
        """
        Channel mapping from index to component tag.

        Returns
        -------
        dict[int, str]
            Dictionary mapping channel indices to lowercase component tags.
            Empty when no metadata object is loaded.

        Raises
        ------
        AttributeError
            If the loaded metadata object has no ``channel_map.mapping``.
        """
        if self.obj is None:
            return {}
        return {item.idx: item.tag.lower() for item in self.obj.channel_map.mapping}

    @property
    def lp_filter_base_name(self) -> str | None:
        """
        Base name for low-pass filter identifiers.

        Returns
        -------
        str or None
            Lowercase ``<commercial name>_<model>_<instrument id>`` string, or
            None when no metadata object is loaded.
        """
        if self.obj is None:
            return None
        return (
            f"{self.obj.receiver_commercial_name}_"
            f"{self.obj.receiver_model}_"
            f"{self.obj.instid}"
        ).lower()

    def get_ch_index(self, tag: str) -> int:
        """
        Get channel index from component tag.

        Parameters
        ----------
        tag : str
            Component tag (e.g., 'e1', 'h1', etc.). Matching is
            case-insensitive.

        Returns
        -------
        int
            Channel index corresponding to the tag.

        Raises
        ------
        ValueError
            If the tag is not found in the channel map.
        AttributeError
            If metadata object is not loaded.
        """
        if self.obj is None:
            raise AttributeError("No metadata object loaded")

        for item in self.obj.channel_map.mapping:
            if item.tag.lower() == tag.lower():
                return item.idx
        raise ValueError(f"Could not find {tag} in channel map.")

    def get_ch_tag(self, index: int) -> str:
        """
        Get component tag from channel index.

        Parameters
        ----------
        index : int
            Channel index.

        Returns
        -------
        str
            Component tag corresponding to the index, exactly as stored in
            the file (it is not lowercased here).

        Raises
        ------
        ValueError
            If the index is not found in the channel map.
        AttributeError
            If metadata object is not loaded.
        """
        if self.obj is None:
            raise AttributeError("No metadata object loaded")

        for item in self.obj.channel_map.mapping:
            if item.idx == index:
                return item.tag
        raise ValueError(f"Could not find {index} in channel map.")

    def _receiver_filters(self, ch: Any) -> list[AppliedFilter]:
        """
        Build the two filter stages shared by electric and magnetic channels.

        Parameters
        ----------
        ch : object
            Channel configuration entry; its ``tag`` and ``lp`` attributes
            are read.

        Returns
        -------
        list[AppliedFilter]
            Stage 1 receiver low-pass filter followed by the stage 2
            ``v_to_mv`` filter.
        """
        return [
            AppliedFilter(  # type: ignore[call-arg]
                name=f"{self.lp_filter_base_name}_{ch.tag}_{int(ch.lp)}hz_lowpass",
                applied=True,
                stage=1,
            ),
            AppliedFilter(  # type: ignore[call-arg]
                name="v_to_mv",
                applied=True,
                stage=2,
            ),
        ]

    def _to_electric_metadata(self, tag: str) -> Electric:
        """
        Convert Phoenix configuration to Electric channel metadata.

        Parameters
        ----------
        tag : str
            Channel tag (e.g., 'e1', 'e2').

        Returns
        -------
        Electric
            Configured electric channel metadata. When no metadata object is
            loaded, a default ``Electric`` instance is returned.

        Raises
        ------
        ValueError
            If channel tag is not found.
        AttributeError
            If the channel entry lacks ``length1``, ``length2``, ``lp`` or the
            metadata object lacks ``start``/``stop``.
        """
        c = Electric()  # type: ignore[call-arg]
        if self.obj is None:
            return c

        # Resolve the index once; it selects the channel entry and is also
        # stored as the channel number below.
        index = self.get_ch_index(tag)
        ch = self.obj.chconfig.chans[index]

        for p_key, m_value in self._e_map.items():
            # NOTE(review): for "ty" the *target attribute name* is replaced
            # by the literal "electric" (not the value) -- confirm intent.
            if p_key == "ty":
                m_value = "electric"
            try:
                value = getattr(ch, p_key)
                # Convert any numeric values to strings if mapping to string fields
                if isinstance(value, (int, float)) and "id" in m_value:
                    value = str(value)
                c.update_attribute(m_value, value)
            except AttributeError:
                # Best effort: a failed key is logged and skipped. The handler
                # also covers AttributeError raised by ``update_attribute``.
                self.logger.error(
                    f"recmeta.json does not contain attribute '{p_key}' for "
                    f"channel '{ch.tag}'."
                )

        c.channel_number = index
        c.dipole_length = ch.length1 + ch.length2
        c.units = "V"
        c.time_period.start = self.obj.start
        c.time_period.end = self.obj.stop
        c.filters = [
            *self._receiver_filters(ch),
            AppliedFilter(  # type: ignore[call-arg]
                name=f"dipole_{int(c.dipole_length)}m",
                applied=True,
                stage=3,
            ),
        ]
        return c

    def _to_magnetic_metadata(self, tag: str) -> Magnetic:
        """
        Convert Phoenix configuration to Magnetic channel metadata.

        Parameters
        ----------
        tag : str
            Channel tag (e.g., 'h1', 'h2', etc.).

        Returns
        -------
        Magnetic
            Configured magnetic channel metadata. When no metadata object is
            loaded, a default ``Magnetic`` instance is returned.

        Raises
        ------
        ValueError
            If channel tag is not found.
        AttributeError
            If the channel entry lacks ``lp`` or the metadata object lacks
            ``start``/``stop``.
        """
        c = Magnetic()  # type: ignore[call-arg]
        if self.obj is None:
            return c

        # Resolve the index once; it selects the channel entry and is also
        # stored as the channel number.
        index = self.get_ch_index(tag)
        ch = self.obj.chconfig.chans[index]

        c.channel_number = index
        c.units = "V"
        c.time_period.start = self.obj.start
        c.time_period.end = self.obj.stop
        # Set manufacturer before processing other sensor attributes
        c.sensor.manufacturer = "Phoenix Geophysics"

        for p_key, m_value in self._h_map.items():
            # NOTE(review): for "ty" the *target attribute name* is replaced
            # by the literal "magnetic" (not the value) -- confirm intent.
            if p_key == "ty":
                m_value = "magnetic"
            try:
                value = getattr(ch, p_key)
                # Convert sensor.id from int to str if needed
                if p_key == "serial" and isinstance(value, int):
                    value = str(value)
                c.update_attribute(m_value, value)
            except AttributeError:
                # Best effort: a failed key is logged and skipped. The handler
                # also covers AttributeError raised by ``update_attribute``.
                self.logger.error(
                    f"recmeta.json does not contain attribute '{p_key}' for "
                    f"channel '{ch.tag}'."
                )

        # low pass filter of the receiver
        c.filters = self._receiver_filters(ch)

        # Add coil response filter using the raw serial value
        if getattr(ch, "serial", None) is not None:
            c.filters.append(
                AppliedFilter(
                    name=f"coil_{ch.serial}_response",
                    applied=True,
                    stage=3,
                    comments=Comment(
                        author="", time_stamp="1980-01-01T00:00:00+00:00", value=""
                    ),
                )
            )

        return c

    # Channel metadata properties
    @property
    def e1_metadata(self) -> Electric:
        """Electric channel 1 metadata."""
        return self._to_electric_metadata("e1")

    @property
    def e2_metadata(self) -> Electric:
        """Electric channel 2 metadata."""
        return self._to_electric_metadata("e2")

    @property
    def h1_metadata(self) -> Magnetic:
        """Magnetic channel 1 metadata."""
        return self._to_magnetic_metadata("h1")

    @property
    def h2_metadata(self) -> Magnetic:
        """Magnetic channel 2 metadata."""
        return self._to_magnetic_metadata("h2")

    @property
    def h3_metadata(self) -> Magnetic:
        """Magnetic channel 3 metadata."""
        return self._to_magnetic_metadata("h3")

    @property
    def h4_metadata(self) -> Magnetic:
        """Magnetic channel 4 metadata."""
        return self._to_magnetic_metadata("h4")

    @property
    def h5_metadata(self) -> Magnetic:
        """Magnetic channel 5 metadata."""
        return self._to_magnetic_metadata("h5")

    @property
    def h6_metadata(self) -> Magnetic:
        """Magnetic channel 6 metadata."""
        return self._to_magnetic_metadata("h6")

    def get_ch_metadata(self, index: int) -> Electric | Magnetic:
        """
        Get channel metadata from index.

        The index is translated to its tag, and the matching
        ``<tag>_metadata`` property (tag lowercased) is returned.

        Parameters
        ----------
        index : int
            Channel index.

        Returns
        -------
        Electric or Magnetic
            Channel metadata object corresponding to the index.

        Raises
        ------
        ValueError
            If index is not found in channel map.
        AttributeError
            If no metadata object is loaded or the corresponding metadata
            property doesn't exist.
        """
        tag = self.get_ch_tag(index)
        return getattr(self, f"{tag.lower()}_metadata")

    # Metadata object properties
    @property
    def run_metadata(self) -> Run:
        """
        Run metadata from receiver configuration.

        Returns
        -------
        Run
            Run metadata with data logger type, model, firmware version and
            timing drift filled in; a default ``Run`` when nothing is loaded.
        """
        r = Run()  # type: ignore[call-arg]
        if self.obj is None:
            return r

        r.data_logger.type = self.obj.receiver_model
        r.data_logger.model = self.obj.receiver_commercial_name
        r.data_logger.firmware.version = self.obj.motherboard.mb_fw_ver
        r.data_logger.timing_system.drift = self.obj.timing.tm_drift
        return r

    @property
    def station_metadata(self) -> Station:
        """
        Station metadata from receiver configuration.

        Returns
        -------
        Station
            Station metadata with id, comments, operator and GPS location
            filled in; a default ``Station`` when nothing is loaded.
        """
        s = Station()  # type: ignore[call-arg]
        if self.obj is None:
            return s

        layout = self.obj.layout
        timing = self.obj.timing

        s.id = layout.Station_Name.replace(" ", "_")
        s.comments = layout.Notes
        try:
            s.acquired_by.organization = layout.Company_Name
        except AttributeError:
            # The company name is treated as optional; leave the default.
            pass

        s.acquired_by.name = layout.Operator  # type: ignore[attr-defined]
        s.location.latitude = timing.gps_lat
        s.location.longitude = timing.gps_lon
        s.location.elevation = timing.gps_alt
        return s

    @property
    def survey_metadata(self) -> Survey:
        """
        Survey metadata from receiver configuration.

        Returns
        -------
        Survey
            Survey metadata with its id taken from the layout's survey name;
            a default ``Survey`` when nothing is loaded.
        """
        s = Survey()  # type: ignore[call-arg]
        if self.obj is not None:
            s.id = self.obj.layout.Survey_Name
        return s