Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mth5 \ mth5 \ io \ phoenix \ readers \ header.py: 82%
544 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:01 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:01 -0800
1# -*- coding: utf-8 -*-
2"""
3Adopted from TimeSeries reader, making all attributes properties for easier
4reading and testing.
6Module to read and parse native Phoenix Geophysics data formats of the MTU-5C Family
8This module implements Streamed readers for segmented-decimated, continuous-decimated
9and native sampling rate time series formats of the MTU-5C family.
11:author: Jorge Torres-Solis
13Revised 2022 by J. Peacock
14"""
16# =============================================================================
17# Imports
18# =============================================================================
19from __future__ import annotations
21import string
22from struct import unpack_from
23from typing import Any, BinaryIO, TYPE_CHECKING
25from loguru import logger
26from mt_metadata.common.mttime import MTime
27from mt_metadata.timeseries import Electric, Magnetic, Run, Station
30if TYPE_CHECKING:
31 pass
33 from loguru import Logger
36# =============================================================================
class Header:
    """
    Phoenix Geophysics MTU-5C binary header reader and parser.

    This class reads and parses the 128-byte binary header from Phoenix
    Geophysics MTU-5C data files. The header contains instrument configuration,
    GPS location, timing information, and recording parameters essential for
    proper data interpretation.

    The header format is fixed at 128 bytes and contains information about:
    - Instrument type and serial number
    - Recording parameters (sample rate, channel configuration)
    - GPS location and timing information
    - Hardware configuration and gain settings
    - Data quality metrics (saturated/missing frames)

    Every header field is exposed as a read-only property that decodes the
    value on demand from the raw header bytes; properties return ``None``
    (or a documented default) until :meth:`unpack_header` has been called.

    Parameters
    ----------
    **kwargs : Any
        Additional keyword arguments to set as instance attributes.

    Attributes
    ----------
    logger : loguru.Logger
        Logger instance for debugging and error reporting.
    report_hw_sat : bool, default False
        Flag to control hardware saturation reporting.
    header_length : int, default 128
        Length of the binary header in bytes.
    ad_plus_minus_range : float, default 5.0
        Differential voltage range of the A/D converter (board dependent).
    channel_map : dict[int, str]
        Mapping from channel IDs to channel names.
    channel_azimuths : dict[str, int]
        Mapping from channel names to azimuth angles in degrees.

    Examples
    --------
    >>> with open("phoenix_data.bin", "rb") as f:
    ...     header = Header()
    ...     header.unpack_header(f)
    ...     print(f"Sample rate: {header.sample_rate}")
    ...     print(f"GPS location: {header.gps_lat}, {header.gps_long}")
    """

    # Byte layout of the binary header: ``dtype`` is a ``struct`` format
    # string and ``index`` is the byte offset passed to ``struct.unpack_from``.
    # NOTE(review): the formats carry no byte-order prefix, so ``struct``
    # decodes them in the host's native byte order -- presumably the files are
    # little-endian; confirm before relying on this on a big-endian host.
    _HEADER_LAYOUT = {
        "file_type": {"dtype": "B", "index": 0},
        "file_version": {"dtype": "B", "index": 1},
        "header_length": {"dtype": "H", "index": 2},
        "instrument_type": {"dtype": "8s", "index": 4},
        "instrument_serial_number": {"dtype": "cccccccc", "index": 12},
        "recording_id": {"dtype": "I", "index": 20},
        "channel_id": {"dtype": "B", "index": 24},
        "file_sequence": {"dtype": "I", "index": 25},
        "frag_period": {"dtype": "H", "index": 29},
        "ch_board_model": {"dtype": "8s", "index": 31},
        "ch_board_serial": {"dtype": "8s", "index": 39},
        "ch_firmware": {"dtype": "I", "index": 47},
        "hardware_configuration": {"dtype": "BBBBBBBB", "index": 51},
        "sample_rate_base": {"dtype": "H", "index": 59},
        "sample_rate_exp": {"dtype": "b", "index": 61},
        "bytes_per_sample": {"dtype": "B", "index": 62},
        "frame_size": {"dtype": "I", "index": 63},
        "decimation_node_id": {"dtype": "H", "index": 67},
        "frame_rollover_count": {"dtype": "H", "index": 69},
        "gps_long": {"dtype": "f", "index": 71},
        "gps_lat": {"dtype": "f", "index": 75},
        "gps_elevation": {"dtype": "f", "index": 79},
        "gps_horizontal_accuracy": {"dtype": "I", "index": 83},
        "gps_vertical_accuracy": {"dtype": "I", "index": 87},
        "timing_status": {"dtype": "BBH", "index": 91},
        "future1": {"dtype": "b", "index": 95},
        "future2": {"dtype": "i", "index": 97},
        "saturated_frames": {"dtype": "H", "index": 101},
        "missing_frames": {"dtype": "H", "index": 103},
        "battery_voltage_mv": {"dtype": "H", "index": 105},
        "min_signal": {"dtype": "f", "index": 107},
        "max_signal": {"dtype": "f", "index": 111},
    }

    def __init__(self, **kwargs: Any) -> None:
        self.logger: Logger = logger
        self.report_hw_sat: bool = False
        # Routed through the ``header_length`` property setter.
        self.header_length = 128
        # differential voltage range that the A/D can measure (Board model dependent)
        self.ad_plus_minus_range: float = 5.0
        self._header: bytes | None = None
        self._recording_id: int | None = None
        self._channel_id: int | None = None

        # NOTE(review): channel IDs 5-7 map to h1-h3 again, while
        # ``channel_azimuths`` carries entries for h4-h6 -- confirm intended.
        self.channel_map: dict[int, str] = {
            0: "h1",
            1: "h2",
            2: "h3",
            3: "e1",
            4: "e2",
            5: "h1",
            6: "h2",
            7: "h3",
        }

        self.channel_azimuths: dict[str, int] = {
            "h1": 0,
            "h2": 90,
            "h3": 0,
            "h4": 0,
            "h5": 90,
            "h6": 0,
            "e1": 0,
            "e2": 90,
        }

        for key, value in kwargs.items():
            setattr(self, key, value)

        # Per-instance copy so mutating one reader's layout never leaks into
        # another instance (or into the class-level template).
        self._unpack_dict: dict[str, dict[str, Any]] = {
            name: dict(spec) for name, spec in self._HEADER_LAYOUT.items()
        }

    def __str__(self) -> str:
        """String representation of the Header with key information."""
        lines = [f"channel_id: {self.channel_id}   channel_type: {self.channel_type}"]
        lines += ["-" * 40]
        for key in [
            "instrument_type",
            "instrument_serial_number",
            "gps_lat",
            "gps_long",
            "gps_elevation",
            "recording_start_time",
            "sample_rate",
            "saturated_frames",
            "missing_frames",
            "max_signal",
            "min_signal",
        ]:
            lines.append(f"\t{key:<25}: {getattr(self, key)}")
        return "\n".join(lines)

    def __repr__(self) -> str:
        """Detailed string representation of the Header (same as ``str``)."""
        return self.__str__()

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _has_header(self) -> bool:
        """
        Check if header data has been loaded.

        Returns
        -------
        bool
            True if header data is loaded, False otherwise.
        """
        return self._header is not None

    def _unpack_value(self, key: str) -> tuple[Any, ...] | None:
        """
        Unpack a value from the binary header using the unpack dictionary.

        Parameters
        ----------
        key : str
            The key in the unpack dictionary corresponding to the field to extract.

        Returns
        -------
        tuple of Any or None
            Unpacked values from the binary header, or None if no header loaded.
        """
        if self._header is None:
            return None
        spec = self._unpack_dict[key]
        return unpack_from(spec["dtype"], self._header, spec["index"])

    def _unpack_first(self, key: str) -> Any | None:
        """Return the first unpacked element of field ``key``, or None without a header."""
        unpacked = self._unpack_value(key)
        return None if unpacked is None else unpacked[0]

    def _require_channel_type(self, gain_name: str) -> str:
        """Return the channel type, raising if it is unknown (needed for ``gain_name``)."""
        channel_type = self.channel_type
        if channel_type is None or channel_type == "?":
            raise Exception(
                f"Channel type must be set before attempting to calculate {gain_name}"
            )
        return channel_type

    def _has_legacy_gain_banks(self) -> bool:
        """True for BCM01/BCM03 boards and the BCM05-A prototype (original gain banks)."""
        if self.board_model_main in ("BCM01", "BCM03"):
            # Original style 24 KSps boards and original 96 KSps boards
            return True
        # Experimental prototype BCM05-A also had the original gain banks
        ch_board_model = self.ch_board_model
        return ch_board_model is not None and ch_board_model[0:7] == "BCM05-A"

    # ------------------------------------------------------------------
    # File / instrument identification
    # ------------------------------------------------------------------
    @property
    def file_type(self) -> int | None:
        """
        File type indicator from binary header.

        Returns
        -------
        int or None
            File type identifier, or None if no header is loaded.
        """
        return self._unpack_first("file_type")

    @property
    def file_version(self) -> int | None:
        """
        File version from binary header.

        Returns
        -------
        int or None
            File version identifier, or None if no header is loaded.
        """
        return self._unpack_first("file_version")

    @property
    def header_length(self) -> int:
        """
        Length of the header in bytes.

        Once a header is loaded the value stored in the header replaces (and
        is cached over) the configured default.

        Returns
        -------
        int
            Header length in bytes.
        """
        stored = self._unpack_first("header_length")
        if stored is not None:
            self._header_length = stored
        return self._header_length

    @header_length.setter
    def header_length(self, value: int) -> None:
        """Set header length."""
        self._header_length = value

    @property
    def instrument_type(self) -> str | None:
        """
        Instrument type string from binary header.

        Returns
        -------
        str or None
            Cleaned instrument type string, or None if no header is loaded.
        """
        raw = self._unpack_first("instrument_type")
        if raw is None:
            return None
        return raw.decode("utf-8").strip(" ").strip("\x00")

    @property
    def instrument_serial_number(self) -> str | None:
        """
        Instrument serial number from binary header.

        Returns
        -------
        str or None
            Decoded instrument serial number, or None if no header is loaded.
        """
        unpacked = self._unpack_value("instrument_serial_number")
        if unpacked is None:
            return None
        # The field is unpacked as eight single-byte values; rejoin them.
        return b"".join(unpacked).strip(b"\x00").decode("utf-8")

    @property
    def recording_id(self) -> int | None:
        """
        Recording identifier from binary header or cached value.

        A value assigned through the setter takes precedence over the header.

        Returns
        -------
        int or None
            Recording ID as integer, or None if not available.
        """
        if self._recording_id is not None:
            return self._recording_id
        return self._unpack_first("recording_id")

    @recording_id.setter
    def recording_id(self, value: str | int) -> None:
        """
        Set recording ID.

        Parameters
        ----------
        value : str or int
            Recording ID as hex string or integer.
        """
        if isinstance(value, str):
            value = int(value, 16)
        self._recording_id = value

    @property
    def recording_start_time(self) -> MTime | None:
        """
        Recording start time from GPS timestamp.

        The actual data recording starts 1 second after the set start time.
        This is caused by the data logger starting up and initializing filter.
        This is taken care of in the segment start time.

        See https://github.com/kujaku11/PhoenixGeoPy/tree/main/Docs for more
        information.

        The time recorded is GPS time.

        Returns
        -------
        MTime or None
            GPS start time, or None if recording ID is not available.
        """
        recording_id = self.recording_id
        if recording_id is None:
            return None
        return MTime(time_stamp=recording_id, gps_time=True)

    @property
    def channel_id(self) -> int | None:
        """
        Channel identifier from binary header or cached value.

        A value assigned through the setter takes precedence over the header.

        Returns
        -------
        int or None
            Channel ID, or None if not available.
        """
        if self._channel_id is not None:
            return self._channel_id
        raw = self._unpack_first("channel_id")
        return None if raw is None else int(raw)

    @channel_id.setter
    def channel_id(self, value: int | str) -> None:
        """
        Set channel ID.

        Parameters
        ----------
        value : int or str
            Channel identifier.
        """
        self._channel_id = int(value)

    @property
    def file_sequence(self) -> int | None:
        """
        File sequence number from binary header.

        Returns
        -------
        int or None
            File sequence number, or None if no header is loaded.
        """
        return self._unpack_first("file_sequence")

    @property
    def frag_period(self) -> int | None:
        """
        Fragment period from binary header.

        Returns
        -------
        int or None
            Fragment period, or None if no header is loaded.
        """
        return self._unpack_first("frag_period")

    # ------------------------------------------------------------------
    # Channel board
    # ------------------------------------------------------------------
    @property
    def ch_board_model(self) -> str | None:
        """
        Channel board model string from binary header.

        Returns
        -------
        str or None
            Board model string, or None if no header is loaded.
        """
        raw = self._unpack_first("ch_board_model")
        if raw is None:
            return None
        return raw.decode("utf-8").strip(" ")

    @property
    def board_model_main(self) -> str | None:
        """
        Main board model identifier.

        Returns
        -------
        str or None
            Main board model (first 5 characters), or None if not available.
        """
        ch_board_model = self.ch_board_model
        if ch_board_model is None:
            return None
        return ch_board_model[0:5]

    @property
    def board_model_revision(self) -> str | None:
        """
        Board model revision identifier.

        Returns
        -------
        str or None
            Board revision (the single character at index 6), or None if not
            available.
        """
        ch_board_model = self.ch_board_model
        if ch_board_model is None:
            return None
        return ch_board_model[6:7]

    @property
    def ch_board_serial(self) -> int:
        """
        Channel board serial number from binary header.

        Returns
        -------
        int
            Board serial number as integer, or 0 if not available or invalid
            (empty field, non-hexadecimal text, or undecodable bytes).
        """
        raw = self._unpack_first("ch_board_serial")
        if raw is None:
            return 0
        # Undecodable bytes become U+FFFD, which fails the hex check below.
        value = raw.decode("utf-8", errors="replace").strip("\x00")
        # handle the case of backend < v0.14, which puts '--------' in ch_ser;
        # an all-NUL field strips to "" and must not reach int("", 16).
        if value and all(char in string.hexdigits for char in value):
            return int(value, 16)
        return 0

    @property
    def ch_firmware(self) -> int | None:
        """
        Channel firmware version from binary header.

        Returns
        -------
        int or None
            Firmware version, or None if no header is loaded.
        """
        return self._unpack_first("ch_firmware")

    @property
    def hardware_configuration(self) -> tuple[Any, ...] | None:
        """
        Hardware configuration bytes from binary header.

        Returns
        -------
        tuple of Any or None
            Hardware configuration data, or None if no header is loaded.
        """
        return self._unpack_value("hardware_configuration")

    @property
    def channel_type(self) -> str | None:
        """
        Channel type determined from hardware configuration.

        Returns
        -------
        str or None
            'E' for electric, 'H' for magnetic, or None if no header.
        """
        hw_config = self.hardware_configuration
        if hw_config is None:
            return None
        return "E" if hw_config[1] & 0x08 == 0x08 else "H"

    @property
    def detected_channel_type(self) -> str | None:
        """
        Channel type detected by electronics.

        This normally matches channel_type, but used in electronics design and testing.

        Returns
        -------
        str or None
            'E' for electric, 'H' for magnetic, or None if no header.
        """
        hw_config = self.hardware_configuration
        if hw_config is None:
            return None
        return "E" if hw_config[1] & 0x20 == 0x20 else "H"

    @property
    def lp_frequency(self) -> int | None:
        """
        Low-pass filter frequency based on hardware configuration.

        Returns
        -------
        int or None
            Filter frequency in Hz, or None if no header is loaded or the
            filter selector bits match no known setting.
        """
        hw_config = self.hardware_configuration
        if hw_config is None:
            return None
        wide_band = self.board_model_main in ("BCM03", "BCM06")

        # LPF off
        if hw_config[0] & 0x80 != 0x80:
            return 17800 if wide_band else 10000

        # LPF on: the two low bits select the corner frequency
        selector = hw_config[0] & 0x03
        if selector == 0x03:
            return 10
        if selector == 0x02:
            return 1000 if wide_band else 100
        if selector == 0x01:
            return 10000 if wide_band else 1000
        return None

    # ------------------------------------------------------------------
    # Gains
    # ------------------------------------------------------------------
    @property
    def preamp_gain(self) -> float:
        """
        Pre-amplifier gain factor.

        Returns
        -------
        float
            Gain factor, default 1.0.

        Raises
        ------
        Exception
            If channel type is not determined before calculating gain.
        """
        preamp_gain = 1.0
        if not self._has_header():
            return preamp_gain

        channel_type = self._require_channel_type("preamp gain")
        hw_config = self.hardware_configuration
        if hw_config is None:
            return preamp_gain

        preamp_on = bool(hw_config[0] & 0x10)
        if channel_type == "E" and preamp_on:
            if self.board_model_main in ("BCM01", "BCM03"):
                preamp_gain = 4.0
                if self.board_model_revision == "L":
                    # Account for BCM01-L experimental prototype
                    preamp_gain = 8.0
            else:
                preamp_gain = 8.0
                # Account for experimental prototype BCM05-A
                ch_board_model = self.ch_board_model
                if ch_board_model is not None and ch_board_model[0:7] == "BCM05-A":
                    preamp_gain = 4.0
        return preamp_gain

    @property
    def channel_main_gain(self) -> float:
        """
        Main gain of the board.

        BCM05-B and BCM06 introduced different selectable gains; any newer
        board is assumed to use the new gain banks.

        Returns
        -------
        float
            Main gain factor, default 1.0.
        """
        main_gain = 1.0
        hw_config = self.hardware_configuration
        if hw_config is None:
            return main_gain

        legacy = self._has_legacy_gain_banks()
        selector = hw_config[0] & 0x0C
        if selector == 0x04:
            main_gain = 4.0
        elif selector == 0x08:
            main_gain = 16.0 if legacy else 6.0
        elif selector == 0x0C:
            main_gain = 32.0 if legacy else 8.0
        return main_gain

    @property
    def intrinsic_circuitry_gain(self) -> float:
        """
        Intrinsic circuitry gain based on sensor range configuration.

        This function adjusts the intrinsic circuitry gain based on the
        sensor range configuration in the configuration fingerprint.

        For the Electric channel, calibration path, or H-legacy
        sensors all go through a 1/4 gain stage, and then they get a virtual x2 gain from
        Single-ended-diff before the A/D. In the case of newer sensors (differential)
        instead of a 1/4 gain stage, there is only a 1/2 gain stage.

        Therefore, in the E, cal and legacy sensor case the circuitry gain is 1/2, while for
        newer sensors it is 1.

        Returns
        -------
        float
            Intrinsic gain factor.

        Raises
        ------
        Exception
            If channel type is not determined before calculating gain.

        Notes
        -----
        Circuitry Gain not directly configurable by the user.
        """
        intrinsic_circuitry_gain = 0.5
        if not self._has_header():
            return intrinsic_circuitry_gain

        channel_type = self._require_channel_type("intrinsic circuitry gain")
        if channel_type == "H":
            hw_config = self.hardware_configuration
            if hw_config is not None and hw_config[1] & 0x01 == 0x01:
                intrinsic_circuitry_gain = 1.0
        return intrinsic_circuitry_gain

    @property
    def attenuator_gain(self) -> float:
        """
        Attenuator gain factor.

        Returns
        -------
        float
            Attenuator gain factor, default 1.0.

        Raises
        ------
        Exception
            If channel type is not determined before calculating gain.
        """
        attenuator_gain = 1.0
        if not self._has_header():
            return attenuator_gain

        channel_type = self._require_channel_type("attenuator gain")
        hw_config = self.hardware_configuration
        if hw_config is None:
            return attenuator_gain

        attenuator_on = bool(hw_config[4] & 0x01)
        if attenuator_on and channel_type == "E":
            # By default assume dealing with newer board types
            if self._has_legacy_gain_banks():
                attenuator_gain = 0.1
            else:
                attenuator_gain = 523.0 / 5223.0
        return attenuator_gain

    @property
    def total_selectable_gain(self) -> float:
        """
        Total gain that is selectable by the user.

        Combines attenuator, preamp, and main channel gains.

        Returns
        -------
        float
            Total selectable gain factor.
        """
        if self._has_header():
            return self.channel_main_gain * self.preamp_gain * self.attenuator_gain
        return 1.0

    @property
    def total_circuitry_gain(self) -> float:
        """
        Total board gain including both intrinsic and user-selectable gains.

        Returns
        -------
        float
            Total circuitry gain factor.
        """
        if self._has_header():
            return self.total_selectable_gain * self.intrinsic_circuitry_gain
        return 0.5

    # ------------------------------------------------------------------
    # Sampling / framing
    # ------------------------------------------------------------------
    @property
    def sample_rate_base(self) -> int | None:
        """
        Base sample rate from binary header.

        Returns
        -------
        int or None
            Base sample rate, or None if no header.
        """
        return self._unpack_first("sample_rate_base")

    @property
    def sample_rate_exp(self) -> int | None:
        """
        Sample rate exponent from binary header.

        Returns
        -------
        int or None
            Sample rate exponent, or None if no header.
        """
        return self._unpack_first("sample_rate_exp")

    @property
    def sample_rate(self) -> float | None:
        """
        Calculated sample rate, ``sample_rate_base * 10 ** sample_rate_exp``.

        Returns
        -------
        float or None
            Sample rate in Hz (always a float), or None if no header.
        """
        rate_base = self.sample_rate_base
        rate_exp = self.sample_rate_exp
        if rate_base is None or rate_exp is None:
            return None
        # float() keeps the return type stable for positive exponents too,
        # where int * int would otherwise yield an int.
        return float(rate_base * pow(10, rate_exp))

    @property
    def bytes_per_sample(self) -> int | None:
        """
        Number of bytes per sample.

        Returns
        -------
        int or None
            Bytes per sample, or None if no header.
        """
        return self._unpack_first("bytes_per_sample")

    @property
    def frame_size(self) -> int | None:
        """
        Frame size from binary header.

        Returns
        -------
        int or None
            Frame size value, or None if no header.
        """
        return self._unpack_first("frame_size")

    @property
    def data_footer(self) -> int | None:
        """
        Data footer extracted from frame size (bits above the low 24).

        Returns
        -------
        int or None
            Data footer value, or None if no frame size.
        """
        frame_size = self.frame_size
        if frame_size is None:
            return None
        return frame_size >> 24

    @property
    def frame_size_bytes(self) -> int | None:
        """
        Frame size in bytes (low 24 bits of the frame size field).

        Returns
        -------
        int or None
            Frame size in bytes, or None if no frame size.
        """
        frame_size = self.frame_size
        if frame_size is None:
            return None
        return frame_size & 0x0FFFFFF

    @property
    def decimation_node_id(self) -> int | None:
        """
        Decimation node identifier.

        Returns
        -------
        int or None
            Decimation node ID, or None if no header.
        """
        return self._unpack_first("decimation_node_id")

    @property
    def frame_rollover_count(self) -> int | None:
        """
        Frame rollover count.

        Returns
        -------
        int or None
            Rollover count, or None if no header.
        """
        return self._unpack_first("frame_rollover_count")

    # ------------------------------------------------------------------
    # GPS / timing
    # ------------------------------------------------------------------
    @property
    def gps_long(self) -> float | None:
        """
        GPS longitude.

        Returns
        -------
        float or None
            Longitude in degrees, or None if no header.
        """
        return self._unpack_first("gps_long")

    @property
    def gps_lat(self) -> float | None:
        """
        GPS latitude.

        Returns
        -------
        float or None
            Latitude in degrees, or None if no header.
        """
        return self._unpack_first("gps_lat")

    @property
    def gps_elevation(self) -> float | None:
        """
        GPS elevation.

        Returns
        -------
        float or None
            Elevation in meters, or None if no header.
        """
        return self._unpack_first("gps_elevation")

    @property
    def gps_horizontal_accuracy(self) -> float | None:
        """
        GPS horizontal accuracy.

        Returns
        -------
        float or None
            Horizontal accuracy in meters (converted from millimeters), or None if no header.
        """
        raw = self._unpack_first("gps_horizontal_accuracy")
        return None if raw is None else raw / 1000

    @property
    def gps_vertical_accuracy(self) -> float | None:
        """
        GPS vertical accuracy.

        Returns
        -------
        float or None
            Vertical accuracy in meters (converted from millimeters), or None if no header.
        """
        raw = self._unpack_first("gps_vertical_accuracy")
        return None if raw is None else raw / 1000

    @property
    def timing_status(self) -> tuple[Any, ...] | None:
        """
        Timing status information.

        Returns
        -------
        tuple of Any or None
            Timing status data (flags, satellite count, stability), or None
            if no header.
        """
        return self._unpack_value("timing_status")

    @property
    def timing_flags(self) -> Any | None:
        """
        Timing flags from timing status.

        Returns
        -------
        Any or None
            Timing flags, or None if no timing status.
        """
        timing_status = self.timing_status
        return None if timing_status is None else timing_status[0]

    @property
    def timing_sat_count(self) -> Any | None:
        """
        Satellite count from timing status.

        Returns
        -------
        Any or None
            Satellite count, or None if no timing status.
        """
        timing_status = self.timing_status
        return None if timing_status is None else timing_status[1]

    @property
    def timing_stability(self) -> Any | None:
        """
        Timing stability from timing status.

        Returns
        -------
        Any or None
            Timing stability value, or None if no timing status.
        """
        timing_status = self.timing_status
        return None if timing_status is None else timing_status[2]

    @property
    def future1(self) -> Any | None:
        """
        Future field 1 (reserved).

        Returns
        -------
        Any or None
            Future field value, or None if no header.
        """
        return self._unpack_first("future1")

    @property
    def future2(self) -> Any | None:
        """
        Future field 2 (reserved).

        Returns
        -------
        Any or None
            Future field value, or None if no header.
        """
        return self._unpack_first("future2")

    # ------------------------------------------------------------------
    # Data quality
    # ------------------------------------------------------------------
    @property
    def saturated_frames(self) -> int | None:
        """
        Number of saturated frames.

        When bit 0x80 of the stored value is set, the low seven bits are
        multiplied by 16 (shifted left by four).

        Returns
        -------
        int or None
            Saturated frame count, or None if no header.
        """
        value = self._unpack_first("saturated_frames")
        if value is None:
            return None
        if value & 0x80 == 0x80:
            value &= 0x7F
            value <<= 4
        return value

    @property
    def missing_frames(self) -> int | None:
        """
        Number of missing frames.

        Returns
        -------
        int or None
            Missing frame count, or None if no header.
        """
        return self._unpack_first("missing_frames")

    @property
    def battery_voltage_v(self) -> float | None:
        """
        Battery voltage in volts.

        Returns
        -------
        float or None
            Battery voltage in volts (converted from millivolts), or None if no header.
        """
        raw = self._unpack_first("battery_voltage_mv")
        return None if raw is None else raw / 1000

    @property
    def min_signal(self) -> Any | None:
        """
        Minimum signal value.

        Returns
        -------
        Any or None
            Minimum signal value, or None if no header.
        """
        return self._unpack_first("min_signal")

    @property
    def max_signal(self) -> Any | None:
        """
        Maximum signal value.

        Returns
        -------
        Any or None
            Maximum signal value, or None if no header.
        """
        return self._unpack_first("max_signal")

    # ------------------------------------------------------------------
    # Reading and metadata translation
    # ------------------------------------------------------------------
    def unpack_header(self, stream: BinaryIO) -> None:
        """
        Read and unpack binary header from stream.

        The stream is rewound to its start and ``header_length`` bytes are
        stored as the raw header. Nothing is read if ``header_length`` is not
        positive.

        Parameters
        ----------
        stream : BinaryIO
            Binary stream to read header from.
        """
        if self.header_length > 0:
            # be sure to read from the beginning of the file
            stream.seek(0)
            self._header = stream.read(self.header_length)

    def get_channel_metadata(self) -> Magnetic | Electric:
        """
        Translate metadata to channel metadata.

        Returns
        -------
        Magnetic or Electric
            Channel metadata object populated with header data.

        Raises
        ------
        KeyError
            If channel ID is not found in channel map.
        ValueError
            If required fields are None or invalid.
        """
        channel_type = self.channel_type
        if channel_type is None:
            raise ValueError("Channel type not available")

        if channel_type.lower() in ["h"]:
            ch = Magnetic()  # type: ignore[call-arg]
            # Only magnetic channels receive the GPS location here.
            gps_lat = self.gps_lat
            gps_long = self.gps_long
            gps_elevation = self.gps_elevation
            if gps_lat is not None:
                ch.location.latitude = gps_lat
            if gps_long is not None:
                ch.location.longitude = gps_long
            if gps_elevation is not None:
                ch.location.elevation = gps_elevation
        elif channel_type.lower() in ["e"]:
            ch = Electric()  # type: ignore[call-arg]
        else:
            raise ValueError(f"Unknown channel type: {channel_type}")

        channel_id = self.channel_id
        if channel_id is not None:
            try:
                ch.component = self.channel_map[channel_id]
            except KeyError:
                self.logger.error(f"Could not find {channel_id} in channel_map")
                raise
            ch.channel_number = channel_id

        recording_start = self.recording_start_time
        if recording_start is not None:
            ch.time_period.start = recording_start

        sample_rate = self.sample_rate
        if sample_rate is not None:
            ch.sample_rate = sample_rate

        if hasattr(ch, "component") and ch.component:
            ch.measurement_azimuth = self.channel_azimuths[ch.component]

        return ch

    def get_run_metadata(self) -> Run:
        """
        Translate to run metadata.

        Returns
        -------
        Run
            Run metadata object populated with header data.

        Raises
        ------
        ValueError
            If required fields are None.
        """
        r = Run()  # type: ignore[call-arg]

        instrument_type = self.instrument_type
        if instrument_type is not None:
            r.data_logger.type = instrument_type

        instrument_serial = self.instrument_serial_number
        if instrument_serial is not None:
            r.data_logger.id = instrument_serial

        r.data_logger.manufacturer = "Phoenix Geophysics"

        timing_stability = self.timing_stability
        if timing_stability is not None:
            r.data_logger.timing_system.uncertainty = timing_stability

        sample_rate = self.sample_rate
        if sample_rate is not None:
            r.sample_rate = sample_rate
            r.id = f"sr{int(sample_rate)}_0001"

        battery_voltage = self.battery_voltage_v
        if battery_voltage is not None:
            r.data_logger.power_source.voltage.start = battery_voltage

        channel_metadata = self.get_channel_metadata()
        r.channels.append(channel_metadata)  # type: ignore[attr-defined]
        r.update_time_period()

        return r

    def get_station_metadata(self) -> Station:
        """
        Translate to station metadata.

        Returns
        -------
        Station
            Station metadata object populated with header data.
        """
        s = Station()  # type: ignore[call-arg]

        gps_lat = self.gps_lat
        if gps_lat is not None:
            s.location.latitude = gps_lat

        gps_long = self.gps_long
        if gps_long is not None:
            s.location.longitude = gps_long

        gps_elevation = self.gps_elevation
        if gps_elevation is not None:
            s.location.elevation = gps_elevation

        run_metadata = self.get_run_metadata()
        s.runs.append(run_metadata)  # type: ignore[attr-defined]
        s.update_time_period()

        return s