Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mth5 \ mth5 \ io \ phoenix \ readers \ segmented \ decimated_segmented_reader.py: 74%
157 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:01 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:01 -0800
1# -*- coding: utf-8 -*-
2"""
3Module to read and parse native Phoenix Geophysics data formats of the
4MTU-5C Family
6This module implements Streamed readers for segmented-decimated time series
7 formats of the MTU-5C family.
9:author: Jorge Torres-Solis
11Revised 2022 by J. Peacock
12"""
15from __future__ import annotations
17from pathlib import Path
18from struct import unpack_from
19from typing import Any, BinaryIO
21# =============================================================================
22# Imports
23# =============================================================================
24import numpy as np
25from mt_metadata.common.mttime import MTime
27from mth5.io.phoenix.readers import TSReaderBase
28from mth5.timeseries import ChannelTS
31# =============================================================================
32class SubHeader:
33 """
34 Class for subheader of segmented files.
36 This class handles the parsing and access to subheader information in
37 Phoenix Geophysics segmented time series files. The subheader contains
38 metadata about each segment including timing, sample counts, and statistics.
40 Parameters
41 ----------
42 **kwargs
43 Arbitrary keyword arguments that are set as attributes
45 Attributes
46 ----------
47 header_length : int
48 Length of the subheader in bytes (32 bytes)
49 _header : bytes or None
50 Raw header bytes from the file
51 _unpack_dict : dict
52 Dictionary defining how to unpack different header fields
53 """
55 def __init__(self, **kwargs) -> None:
56 self.header_length = 32
57 self._header = None
59 for key, value in kwargs.items():
60 setattr(self, key, value)
61 self._unpack_dict = {
62 "gps_time_stamp": {"dtype": "I", "index": 0},
63 "n_samples": {"dtype": "I", "index": 4},
64 "saturation_count": {"dtype": "H", "index": 8},
65 "missing_count": {"dtype": "H", "index": 10},
66 "value_min": {"dtype": "f", "index": 12},
67 "value_max": {"dtype": "f", "index": 16},
68 "value_mean": {"dtype": "f", "index": 20},
69 }
71 def __str__(self) -> str:
72 """String representation of the subheader information."""
73 lines = ["subheader information:", "-" * 30]
74 for key in [
75 "gps_time_stamp",
76 "n_samples",
77 "saturation_count",
78 "missing_count",
79 "value_min",
80 "value_max",
81 "value_mean",
82 ]:
83 lines.append(f"\t{key:<25}: {getattr(self, key)}")
84 return "\n".join(lines)
86 def __repr__(self) -> str:
87 """String representation of the subheader."""
88 return self.__str__()
90 def _has_header(self) -> bool:
91 """
92 Check if header data has been loaded.
94 Returns
95 -------
96 bool
97 True if header is loaded, False otherwise
98 """
99 if self._header is not None:
100 return True
101 return False
103 def _unpack_value(self, key: str) -> tuple[Any, ...] | None:
104 """
105 Unpack a value from the header bytes.
107 Parameters
108 ----------
109 key : str
110 Key name for the value to unpack
112 Returns
113 -------
114 tuple or None
115 Unpacked value tuple, or None if header not available
117 Raises
118 ------
119 IOError
120 If unpacking fails
121 """
122 if self._has_header() and self._header is not None:
123 try:
124 return unpack_from(
125 self._unpack_dict[key]["dtype"],
126 self._header,
127 self._unpack_dict[key]["index"],
128 )
129 except Exception as error:
130 raise IOError(error)
131 return None
133 @property
134 def gps_time_stamp(self) -> MTime | None:
135 """
136 GPS time stamp in UTC.
138 Returns
139 -------
140 MTime or None
141 GPS timestamp if header is available, None otherwise
142 """
143 if self._has_header():
144 value = self._unpack_value("gps_time_stamp")
145 if value is not None:
146 return MTime(time_stamp=value[0], gps_time=True)
147 return None
149 @property
150 def n_samples(self) -> int | None:
151 """
152 Number of samples in the segment.
154 Returns
155 -------
156 int or None
157 Number of samples if header is available, None otherwise
158 """
159 if self._has_header():
160 value = self._unpack_value("n_samples")
161 if value is not None:
162 return value[0]
163 return None
165 @property
166 def saturation_count(self) -> int | None:
167 """
168 Number of saturated samples.
170 Returns
171 -------
172 int or None
173 Saturation count if header is available, None otherwise
174 """
175 if self._has_header():
176 value = self._unpack_value("saturation_count")
177 if value is not None:
178 return value[0]
179 return None
181 @property
182 def missing_count(self) -> int | None:
183 """
184 Number of missing samples.
186 Returns
187 -------
188 int or None
189 Missing sample count if header is available, None otherwise
190 """
191 if self._has_header():
192 value = self._unpack_value("missing_count")
193 if value is not None:
194 return value[0]
195 return None
197 @property
198 def value_min(self) -> float | None:
199 """
200 Minimum value in the segment.
202 Returns
203 -------
204 float or None
205 Minimum value if header is available, None otherwise
206 """
207 if self._has_header():
208 value = self._unpack_value("value_min")
209 if value is not None:
210 return value[0]
211 return None
213 @property
214 def value_max(self) -> float | None:
215 """
216 Maximum value in the segment.
218 Returns
219 -------
220 float or None
221 Maximum value if header is available, None otherwise
222 """
223 if self._has_header():
224 value = self._unpack_value("value_max")
225 if value is not None:
226 return value[0]
227 return None
229 @property
230 def value_mean(self) -> float | None:
231 """
232 Mean value in the segment.
234 Returns
235 -------
236 float or None
237 Mean value if header is available, None otherwise
238 """
239 if self._has_header():
240 value = self._unpack_value("value_mean")
241 if value is not None:
242 return value[0]
243 return None
245 def unpack_header(self, stream: BinaryIO) -> None:
246 """
247 Unpack the header from a binary stream.
249 Parameters
250 ----------
251 stream : BinaryIO
252 Binary stream to read header from
253 """
254 if self.header_length > 0:
255 # be sure to read from the beginning of the file
256 self._header = stream.read(self.header_length)
257 else:
258 return
261class Segment(SubHeader):
262 """
263 A segment class to hold a single segment.
265 This class represents a single time series segment with its associated
266 metadata and data. It inherits from SubHeader to provide access to
267 segment-specific header information.
269 Parameters
270 ----------
271 stream : BinaryIO
272 Binary file stream to read from
273 **kwargs
274 Additional keyword arguments passed to SubHeader
276 Attributes
277 ----------
278 stream : BinaryIO
279 The file stream for reading data
280 data : np.ndarray or None
281 Time series data for this segment
282 """
284 def __init__(self, stream: BinaryIO, **kwargs) -> None:
285 super().__init__(**kwargs)
286 self.stream = stream
287 self.data: np.ndarray | None = None
289 def read_segment(self, metadata_only: bool = False) -> None:
290 """
291 Read the segment data from the file stream.
293 Parameters
294 ----------
295 metadata_only : bool, optional
296 If True, only read metadata without loading data, by default False
297 """
298 self.unpack_header(self.stream)
299 if not metadata_only and self.n_samples is not None:
300 self.data = np.fromfile(self.stream, dtype=np.float32, count=self.n_samples)
302 @property
303 def segment_start_time(self) -> MTime | None:
304 """
305 Get the segment start time.
307 Returns
308 -------
309 MTime or None
310 GPS timestamp of segment start, or None if not available
311 """
312 return self.gps_time_stamp
314 @property
315 def segment_end_time(self) -> MTime | None:
316 """
317 Calculate the segment end time.
319 Returns
320 -------
321 MTime or None
322 Estimated end time based on start time, sample count and sample rate,
323 or None if required information is not available
324 """
325 start_time = self.segment_start_time
326 if (
327 start_time is not None
328 and self.n_samples is not None
329 and hasattr(self, "sample_rate")
330 ):
331 return start_time + (self.n_samples / self.sample_rate)
332 return None
335class DecimatedSegmentedReader(TSReaderBase):
336 """
337 Class to create a streamer for segmented decimated time series.
339 This reader handles segmented decimated time series files such as 'td_24k'.
340 These files have sub headers containing metadata for each segment.
342 Parameters
343 ----------
344 path : str or Path
345 Path to the time series file
346 num_files : int, optional
347 Number of files in the sequence, by default 1
348 report_hw_sat : bool, optional
349 Whether to report hardware saturation, by default False
350 **kwargs
351 Additional keyword arguments passed to parent TSReaderBase class
353 Attributes
354 ----------
355 sub_header : SubHeader
356 SubHeader instance for parsing segment headers
357 subheader : dict
358 Dictionary for additional subheader information
359 """
361 def __init__(
362 self,
363 path: str | Path,
364 num_files: int = 1,
365 report_hw_sat: bool = False,
366 **kwargs,
367 ) -> None:
368 # Init the base class
369 super().__init__(
370 path,
371 num_files=num_files,
372 header_length=128,
373 report_hw_sat=report_hw_sat,
374 **kwargs,
375 )
377 self._channel_metadata = self._update_channel_metadata_from_recmeta()
378 self.sub_header = SubHeader()
379 self.subheader = {}
381 def read_segment(self, metadata_only: bool = False) -> Segment:
382 """
383 Read a single segment from the file.
385 Parameters
386 ----------
387 metadata_only : bool, optional
388 If True, only read metadata without loading data, by default False
390 Returns
391 -------
392 Segment
393 Segment object containing data and metadata
395 Raises
396 ------
397 ValueError
398 If stream is not available
399 """
400 kwargs = {
401 "instrument_type": self.instrument_type,
402 "instrument_serial_number": self.instrument_serial_number,
403 "latitude": self.gps_lat,
404 "longitude": self.gps_long,
405 "elevation": self.gps_elevation,
406 "sample_rate": self.sample_rate,
407 "channel_id": self.channel_id,
408 "channel_type": self.channel_type,
409 "segment": 0,
410 }
412 if self.stream is None:
413 raise ValueError("Stream is not available")
415 segment = Segment(self.stream, **kwargs)
416 segment.read_segment(metadata_only=metadata_only)
418 return segment
420 def to_channel_ts(
421 self, rxcal_fn: str | Path | None = None, scal_fn: str | Path | None = None
422 ) -> ChannelTS:
423 """
424 Convert to a ChannelTS object.
426 Parameters
427 ----------
428 rxcal_fn : str, Path or None, optional
429 Path to receiver calibration file, by default None
430 scal_fn : str, Path or None, optional
431 Path to sensor calibration file, by default None
433 Returns
434 -------
435 ChannelTS
436 Channel time series object with data, metadata, and calibration
437 """
438 segment = self.read_segment()
439 ch_metadata = self.channel_metadata
441 # Set timing information if available
442 if segment.segment_start_time is not None:
443 ch_metadata.time_period.start = segment.segment_start_time.isoformat()
444 if segment.segment_end_time is not None:
445 ch_metadata.time_period.end = segment.segment_end_time.isoformat()
447 return ChannelTS(
448 channel_type=ch_metadata.type,
449 data=segment.data,
450 channel_metadata=ch_metadata,
451 run_metadata=self.run_metadata,
452 station_metadata=self.station_metadata,
453 channel_response=self.get_channel_response(
454 rxcal_fn=rxcal_fn, scal_fn=scal_fn
455 ),
456 )
459class DecimatedSegmentCollection(TSReaderBase):
460 """
461 Class to read multiple segments from a segmented decimated time series file.
463 This reader handles files containing multiple segments of decimated time series
464 data such as 'td_24k'. Each segment has its own sub header with metadata.
466 Parameters
467 ----------
468 path : str or Path
469 Path to the time series file
470 num_files : int, optional
471 Number of files in the sequence, by default 1
472 report_hw_sat : bool, optional
473 Whether to report hardware saturation, by default False
474 **kwargs
475 Additional keyword arguments passed to parent TSReaderBase class
477 Attributes
478 ----------
479 sub_header : SubHeader
480 SubHeader instance for parsing segment headers
481 subheader : dict
482 Dictionary for additional subheader information
483 """
485 def __init__(
486 self,
487 path: str | Path,
488 num_files: int = 1,
489 report_hw_sat: bool = False,
490 **kwargs,
491 ) -> None:
492 # Init the base class
493 super().__init__(
494 path,
495 num_files=num_files,
496 header_length=128,
497 report_hw_sat=report_hw_sat,
498 **kwargs,
499 )
501 if self.stream is not None:
502 self.unpack_header(self.stream)
503 self.sub_header = SubHeader()
504 self.subheader = {}
506 def read_segments(self, metadata_only: bool = False) -> list[Segment]:
507 """
508 Read all segments from the file.
510 Parameters
511 ----------
512 metadata_only : bool, optional
513 If True, only read metadata without loading data, by default False
515 Returns
516 -------
517 list[Segment]
518 List of Segment objects containing data and metadata
520 Raises
521 ------
522 ValueError
523 If stream is not available
524 """
525 kwargs = {
526 "instrument_type": self.instrument_type,
527 "instrument_serial_number": self.instrument_serial_number,
528 "latitude": self.gps_lat,
529 "longitude": self.gps_long,
530 "elevation": self.gps_elevation,
531 "sample_rate": self.sample_rate,
532 "channel_id": self.channel_id,
533 "channel_type": self.channel_type,
534 "segment": 0,
535 }
537 if self.stream is None:
538 raise ValueError("Stream is not available")
540 segments = []
541 count = 1
542 while True:
543 try:
544 kwargs["segment"] = count
545 segment = Segment(self.stream, **kwargs)
546 segment.read_segment(metadata_only=metadata_only)
547 segments.append(segment)
548 count += 1
549 except Exception:
550 break
551 self.logger.info(f"Read {count - 1} segments")
553 return segments
555 def to_channel_ts(
556 self, rxcal_fn: str | Path | None = None, scal_fn: str | Path | None = None
557 ) -> list[ChannelTS]:
558 """
559 Convert all segments to ChannelTS objects.
561 Parameters
562 ----------
563 rxcal_fn : str, Path or None, optional
564 Path to receiver calibration file, by default None
565 scal_fn : str, Path or None, optional
566 Path to sensor calibration file, by default None
568 Returns
569 -------
570 list[ChannelTS]
571 List of ChannelTS objects, one for each segment
572 """
573 seq_list = []
574 for seq in self.read_segments():
575 ch_metadata = self.channel_metadata
576 if seq.gps_time_stamp is not None:
577 ch_metadata.time_period.start = seq.gps_time_stamp.isoformat()
579 seq_list.append(
580 ChannelTS(
581 channel_type=ch_metadata.type,
582 data=seq.data,
583 channel_metadata=ch_metadata,
584 run_metadata=self.run_metadata,
585 station_metadata=self.station_metadata,
586 channel_response=self.get_channel_response(
587 rxcal_fn=rxcal_fn, scal_fn=scal_fn
588 ),
589 )
590 )
591 return seq_list