Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mth5 \ mth5 \ io \ phoenix \ readers \ base.py: 91%
217 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:01 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:01 -0800
1"""
2Module to read and parse native Phoenix Geophysics data formats of the
3MTU-5C Family.
5This module implements Streamed readers for segmented-decimated continuous-decimated
6and native sampling rate time series formats of the MTU-5C family.
8:author: Jorge Torres-Solis
10Revised 2022 by J. Peacock
11"""
13# =============================================================================
14# Imports
15# =============================================================================
16from __future__ import annotations
18from pathlib import Path
19from typing import Any
21from loguru import logger
22from mt_metadata.timeseries.filters import ChannelResponse, CoefficientFilter
24from .calibrations import PhoenixCalibration
25from .config import PhoenixConfig
26from .header import Header
27from .receiver_metadata import PhoenixReceiverMetadata
30# =============================================================================
class TSReaderBase(Header):
    """
    Generic reader that all other readers will inherit.

    This base class provides common functionality for reading Phoenix
    Geophysics time series data files, including header parsing, file
    sequence management, and metadata handling.

    File names are expected to have at least four ``_`` separated fields in
    their stem; the fields are read as instrument id, recording id, channel
    id and a hexadecimal sequence number (see ``instrument_id``, ``seq`` and
    ``__init__``).

    Parameters
    ----------
    path : str or Path
        Path to the time series file
    num_files : int, optional
        Number of files in the sequence, by default 1
    header_length : int, optional
        Length of file header in bytes, by default 128
    report_hw_sat : bool, optional
        Whether to report hardware saturation, by default False
    **kwargs
        Additional keyword arguments passed to parent Header class

    Attributes
    ----------
    stream : BinaryIO or None
        File stream for reading binary data
    base_path : Path
        Path to the current file
    last_seq : int
        Sequence number of the first file plus ``num_files``; ``open_next``
        stops once ``seq`` reaches this value
    rx_metadata : PhoenixReceiverMetadata or None
        Receiver metadata object
    """

    def __init__(
        self,
        path: str | Path,
        num_files: int = 1,
        header_length: int = 128,
        report_hw_sat: bool = False,
        **kwargs,
    ) -> None:
        # ``_seq`` is assigned before the parent initializer runs so the
        # ``seq`` property is usable from the very start.
        self._seq = None
        super().__init__(
            header_length=header_length, report_hw_sat=report_hw_sat, **kwargs
        )

        self.logger = logger
        self.base_path = path
        self.last_seq = self.seq + num_files
        self.stream = None
        # Open the file passed as the first file in the sequence to stream
        self._open_file(self.base_path)
        # Fall back to the file name when the header did not supply the ids.
        if self._recording_id is None:
            self.recording_id = self.base_path.stem.split("_")[1]
        if self._channel_id is None:
            self.channel_id = self.base_path.stem.split("_")[2]

        self.rx_metadata = None
        self.get_receiver_metadata_object()

        if self.recmeta_file_path is not None:
            self.update_channel_map_from_recmeta()

        self._channel_metadata = None

    @property
    def base_path(self) -> Path:
        """
        Full path of the file.

        Returns
        -------
        Path
            Full path to the file
        """
        return self._base_path

    @base_path.setter
    def base_path(self, value: str | Path) -> None:
        """
        Set the full path to the file.

        Parameters
        ----------
        value : str or Path
            Full path to file

        Raises
        ------
        TypeError
            If value cannot be converted to a Path object
        """
        try:
            self._base_path = Path(value)
        except TypeError as error:
            # Chain the original error so the traceback keeps the root cause.
            raise TypeError(
                f"Cannot set path from {value}, bad type {type(value)}"
            ) from error

    @property
    def base_dir(self) -> Path:
        """
        Parent directory of the file.

        Returns
        -------
        Path
            Parent directory of the file
        """
        return self.base_path.parent

    @property
    def file_name(self) -> str:
        """
        Name of the file.

        Returns
        -------
        str
            Final path component of ``base_path``
        """
        return self.base_path.name

    @property
    def file_extension(self) -> str:
        """
        File extension.

        Returns
        -------
        str
            File extension including the dot
        """
        return self.base_path.suffix

    @property
    def instrument_id(self) -> str:
        """
        Instrument ID extracted from filename.

        Returns
        -------
        str
            First ``_`` separated field of the file stem
        """
        return self.base_path.stem.split("_")[0]

    @property
    def seq(self) -> int:
        """
        Sequence number of the file.

        Returns
        -------
        int
            The explicitly set sequence number if there is one, otherwise the
            fourth ``_`` separated field of the file stem parsed as
            hexadecimal
        """
        if self._seq is None:
            return int(self.base_path.stem.split("_")[3], 16)
        return self._seq

    @seq.setter
    def seq(self, value: int) -> None:
        """
        Set the sequence number.

        Parameters
        ----------
        value : int
            Sequence number
        """
        self._seq = int(value)

    @property
    def file_size(self) -> int:
        """
        File size in bytes.

        Returns
        -------
        int
            Size of the file in bytes
        """
        return self.base_path.stat().st_size

    @property
    def max_samples(self) -> int:
        """
        Maximum number of samples in a file.

        Calculated as ``(file size - header length) / 4`` truncated to an
        integer.

        NOTE(review): the divisor 4 is presumably the byte width of one
        sample; confirm against the file format.

        Returns
        -------
        int
            Maximum number of samples in the file
        """
        return int((self.file_size - self.header_length) / 4)

    @property
    def sequence_list(self) -> list[Path]:
        """
        Get all the files in the sequence.

        The list holds every file in ``base_dir`` that has the same extension
        as ``base_path``, in sorted path order. ``open_file_seq`` indexes this
        list with ``seq - 1``.

        Returns
        -------
        list[Path]
            Sorted list of Path objects sharing this file's extension
        """
        return sorted(self.base_dir.glob(f"*{self.file_extension}"))

    @property
    def config_file_path(self) -> Path | None:
        """
        Path to the config.json file.

        The file is looked up two directory levels above ``base_path``. A
        warning is logged when it is not there.

        Returns
        -------
        Path or None
            Path to config file if it exists, None otherwise
        """
        if self.base_path is None:
            return None
        config_fn = self.base_path.parent.parent.joinpath("config.json")
        if config_fn.exists():
            return config_fn
        self.logger.warning("Could not find config file")
        return None

    @property
    def recmeta_file_path(self) -> Path | None:
        """
        Path to the recmeta.json file.

        The file is looked up two directory levels above ``base_path``. A
        warning is logged when it is not there.

        Returns
        -------
        Path or None
            Path to recmeta file if it exists, None otherwise
        """
        if self.base_path is None:
            return None
        recmeta_fn = self.base_path.parent.parent.joinpath("recmeta.json")
        if recmeta_fn.exists():
            return recmeta_fn
        self.logger.warning("Could not find recmeta file")
        return None

    def _open_file(self, filename: str | Path) -> bool:
        """
        Open a given file in 'rb' mode and unpack its header.

        Any stream that is already held is closed before the new one is
        opened, so repeated calls do not leak file handles. When the file
        does not exist the current stream is left untouched.

        Parameters
        ----------
        filename : str or Path
            Full path to file

        Returns
        -------
        bool
            True if the file is now open, False if it is not
        """
        filename = Path(filename)
        if not filename.exists():
            return False

        self.logger.debug(f"Opening {filename}")
        # Release the previous handle before replacing the reference to it.
        if self.stream is not None:
            self.stream.close()
        self.stream = open(filename, "rb")
        self.unpack_header(self.stream)
        return True

    def open_next(self) -> bool:
        """
        Open the next file in the sequence.

        The current stream is closed and ``seq`` is incremented. Nothing is
        opened once ``seq`` reaches ``last_seq``.

        Returns
        -------
        bool
            True if next file is now open, False if it is not
        """
        self.close()
        self.seq += 1
        # Check the bound before touching the file list so the end of the
        # sequence yields False and each file is opened only once.
        if self.seq >= self.last_seq:
            return False
        return self.open_file_seq(self.seq)

    def open_file_seq(self, file_seq_num: int | None = None) -> bool:
        """
        Open a file in the sequence given the sequence number.

        Parameters
        ----------
        file_seq_num : int, optional
            Sequence number to open, by default None which re-opens the file
            for the current ``seq``

        Returns
        -------
        bool
            True if file is now open, False if it is not
        """
        self.close()
        if file_seq_num is not None:
            self.seq = file_seq_num
        try:
            new_path = self.sequence_list[self.seq - 1]
        except IndexError:
            # A sequence number past the available files means "not open",
            # which is what the return value is documented to report.
            self.logger.warning(
                f"No file with sequence number {self.seq} in {self.base_dir}"
            )
            return False
        return self._open_file(new_path)

    def close(self) -> None:
        """
        Close the file stream.
        """
        if self.stream is not None:
            self.stream.close()

    def get_config_object(self) -> PhoenixConfig | None:
        """
        Read a config file into an object.

        Returns
        -------
        PhoenixConfig or None
            Configuration object if config file exists, None otherwise
        """
        if self.config_file_path is not None:
            return PhoenixConfig(self.config_file_path)
        return None

    def get_receiver_metadata_object(self) -> None:
        """
        Read recmeta.json into an object and store in rx_metadata attribute.

        Nothing happens when the file is missing or ``rx_metadata`` is
        already set.
        """
        if self.recmeta_file_path is not None and self.rx_metadata is None:
            self.rx_metadata = PhoenixReceiverMetadata(self.recmeta_file_path)

    def get_lowpass_filter_name(self) -> str | None:
        """
        Get the lowpass filter used by the receiver pre-decimation.

        The value is read from the first channel entry of the receiver
        metadata.

        Returns
        -------
        str or None
            Name of the lowpass filter if available, None otherwise
        """
        if self.recmeta_file_path is not None and self.rx_metadata is not None:
            return self.rx_metadata.obj.chconfig.chans[0].lp
        return None

    def update_channel_map_from_recmeta(self) -> None:
        """
        Update channel map from recmeta.json file.
        """
        if self.recmeta_file_path is not None and self.rx_metadata is not None:
            self.channel_map = self.rx_metadata.channel_map

    def _update_channel_metadata_from_recmeta(self) -> Any:
        """
        Get channel metadata from recmeta.json.

        Returns
        -------
        Any
            Channel metadata object updated with recmeta information
        """
        ch_metadata = self.get_channel_metadata()
        if self.recmeta_file_path is not None and self.rx_metadata is not None:
            rx_ch_metadata = self.rx_metadata.get_ch_metadata(self._channel_id)
            ch_metadata.update(rx_ch_metadata)
            # Re-apply the values held by this reader after the update.
            ch_metadata.sample_rate = self.sample_rate
            ch_metadata.time_period.start = self.recording_start_time
        return ch_metadata

    def _update_run_metadata_from_recmeta(self) -> Any:
        """
        Update run metadata from recmeta.json.

        Returns
        -------
        Any
            Run metadata object updated with recmeta information
        """
        run_metadata = self.get_run_metadata()
        if self.recmeta_file_path is not None and self.rx_metadata is not None:
            rx_run_metadata = self.rx_metadata.run_metadata
            run_metadata.update(rx_run_metadata)
            run_metadata.add_channel(self.channel_metadata)
            run_metadata.update_time_period()
        return run_metadata

    def _update_station_metadata_from_recmeta(self) -> Any:
        """
        Update station metadata from recmeta.json.

        Returns
        -------
        Any
            Station metadata object updated with recmeta information
        """
        station_metadata = self.get_station_metadata()
        if self.recmeta_file_path is not None and self.rx_metadata is not None:
            rx_station_metadata = self.rx_metadata.station_metadata
            station_metadata.update(rx_station_metadata)
            station_metadata.add_run(self.run_metadata)
            station_metadata.update_time_period()
        return station_metadata

    @property
    def channel_metadata(self) -> Any:
        """
        Channel metadata updated from recmeta.

        The metadata is rebuilt on each access unless ``_channel_metadata``
        has been set.

        Returns
        -------
        Any
            Channel metadata object
        """
        if self._channel_metadata is None:
            return self._update_channel_metadata_from_recmeta()
        return self._channel_metadata

    @property
    def run_metadata(self) -> Any:
        """
        Run metadata updated from recmeta.

        Returns
        -------
        Any
            Run metadata object
        """
        return self._update_run_metadata_from_recmeta()

    @property
    def station_metadata(self) -> Any:
        """
        Station metadata updated from recmeta.

        Returns
        -------
        Any
            Station metadata object
        """
        return self._update_station_metadata_from_recmeta()

    def get_receiver_lowpass_filter(self, rxcal_fn: str | Path) -> Any:
        """
        Get receiver lowpass filter from the rxcal.json file.

        Parameters
        ----------
        rxcal_fn : str or Path
            Path to the receiver calibration file

        Returns
        -------
        Any
            Filter object from calibration file, or None when the
            calibration file could not be read

        Raises
        ------
        ValueError
            If the lowpass filter name cannot be found
        """
        rx_cal_obj = PhoenixCalibration(rxcal_fn)
        if not rx_cal_obj._has_read():
            self.logger.error("Phoenix RX Calibration is None. Check file path")
            return None

        lp_name = self.get_lowpass_filter_name()
        component = self.channel_metadata.component
        if lp_name is None:
            msg = f"Could not find lowpass filter name for channel {component}"
            self.logger.error(msg)
            raise ValueError(msg)

        return rx_cal_obj.get_filter(component, lp_name)

    def get_dipole_filter(self) -> CoefficientFilter | None:
        """
        Get dipole filter for electric field channels.

        The gain is the channel's dipole length divided by 1000. The filter
        takes the name of the last metadata filter name containing "dipole",
        when there is one.

        Returns
        -------
        CoefficientFilter or None
            Dipole filter if channel has dipole length, None otherwise
        """
        ch_metadata = self.channel_metadata.copy()

        if not hasattr(ch_metadata, "dipole_length"):
            return None

        dp_filter = CoefficientFilter()
        dp_filter.gain = ch_metadata.dipole_length / 1000
        dp_filter.units_in = "milliVolt"
        dp_filter.units_out = "milliVolt per kilometer"

        # Support both newer mt_metadata API (filter_names) and older
        # (filter.name)
        if hasattr(ch_metadata, "filter_names"):
            filter_names = ch_metadata.filter_names or []
        elif hasattr(ch_metadata, "filter") and getattr(
            ch_metadata.filter, "name", None
        ):
            filter_names = [ch_metadata.filter.name]
        else:
            filter_names = []

        for f_name in filter_names:
            if "dipole" in f_name:
                dp_filter.name = f_name

        return dp_filter

    def get_sensor_filter(self, scal_fn: str | Path) -> Any:
        """
        Get sensor filter from calibration file.

        Parameters
        ----------
        scal_fn : str or Path
            Path to sensor calibration file

        Returns
        -------
        Any
            Sensor filter object; always None for now

        Notes
        -----
        This method is not implemented yet.
        """
        return None

    def get_v_to_mv_filter(self) -> CoefficientFilter:
        """
        Create a filter to convert units from volts to millivolts.

        Returns
        -------
        CoefficientFilter
            Filter that converts volts to millivolts with gain of 1000
        """
        conversion = CoefficientFilter()
        conversion.units_out = "mV"
        conversion.units_in = "V"
        conversion.name = "v_to_mv"
        conversion.gain = 1e3

        return conversion

    def get_channel_response(
        self, rxcal_fn: str | Path | None = None, scal_fn: str | Path | None = None
    ) -> ChannelResponse:
        """
        Get the channel response filter.

        The chain is built in this order: receiver lowpass filter (only when
        ``rxcal_fn`` is given and yields a filter), volts to millivolts
        conversion, sensor filter (magnetic channels with ``scal_fn``), dipole
        filter (electric channels).

        Parameters
        ----------
        rxcal_fn : str, Path or None, optional
            Path to receiver calibration file, by default None
        scal_fn : str, Path or None, optional
            Path to sensor calibration file, by default None

        Returns
        -------
        ChannelResponse
            Complete channel response filter chain
        """
        ch_metadata = self.channel_metadata.copy()

        filter_list = []

        # First lowpass filter already named in the metadata, if any.
        existing_lowpass = next(
            (f for f in ch_metadata.filters if "lowpass" in f.name), None
        )

        if rxcal_fn is not None:
            rx_filter = self.get_receiver_lowpass_filter(rxcal_fn)
            if rx_filter is not None:
                if existing_lowpass is not None:
                    # Update the filter name to match existing metadata
                    # filter name
                    rx_filter.name = existing_lowpass.name
                    self.logger.debug(
                        f"Using existing lowpass filter name: {existing_lowpass.name}"
                    )
                filter_list.append(rx_filter)

        filter_list.append(self.get_v_to_mv_filter())

        if ch_metadata.type in ["magnetic"] and scal_fn is not None:
            sensor_filter = self.get_sensor_filter(scal_fn)
            if sensor_filter is not None:
                filter_list.append(sensor_filter)
            else:
                self.logger.warning(
                    "Could not find Phoenix coil sensor calibration filter "
                    f"for channel {ch_metadata.component}"
                )

        if ch_metadata.type in ["electric"]:
            dipole_filter = self.get_dipole_filter()
            if dipole_filter is not None:
                filter_list.append(dipole_filter)

        return ChannelResponse(filters_list=filter_list)