Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mth5 \ mth5 \ io \ phoenix \ readers \ contiguous \ decimated_continuous_reader.py: 96%
57 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:01 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:01 -0800
1# -*- coding: utf-8 -*-
2"""
3Module to read and parse native Phoenix Geophysics data formats of the
4MTU-5C Family.
6This module implements Streamed readers for decimated continuos time series
7formats of the MTU-5C family.
9:author: Jorge Torres-Solis
11Revised 2022 by J. Peacock
12"""
14# =============================================================================
15# Imports
16# =============================================================================
17from __future__ import annotations
19from pathlib import Path
20from typing import Any
22import numpy as np
23from mt_metadata.common.mttime import MTime
25from mth5.io.phoenix.readers import TSReaderBase
26from mth5.timeseries import ChannelTS
29# =============================================================================
32class DecimatedContinuousReader(TSReaderBase):
33 """
34 Class to create a streamer for continuous decimated time series.
36 This reader handles continuous decimated time series files such as 'td_150',
37 'td_30'. These files have no sub header information.
39 Parameters
40 ----------
41 path : str or Path
42 Path to the time series file
43 num_files : int, optional
44 Number of files in the sequence, by default 1
45 report_hw_sat : bool, optional
46 Whether to report hardware saturation, by default False
47 **kwargs
48 Additional keyword arguments passed to parent TSReaderBase class
50 Attributes
51 ----------
52 subheader : dict
53 Empty dictionary as these files have no sub header information
54 data_size : int or None
55 Size of the data sequence when read
56 """
58 def __init__(
59 self,
60 path: str | Path,
61 num_files: int = 1,
62 report_hw_sat: bool = False,
63 **kwargs,
64 ) -> None:
65 # Init the base class
66 super().__init__(
67 path,
68 num_files=num_files,
69 header_length=128,
70 report_hw_sat=report_hw_sat,
71 **kwargs,
72 )
74 self._channel_metadata = self._update_channel_metadata_from_recmeta()
75 self.subheader = {}
76 self._sequence_start = self.segment_start_time
77 self.data_size: int | None = None
79 @property
80 def segment_start_time(self) -> MTime:
81 """
82 Estimate the segment start time based on sequence number.
84 The first sequence starts 1 second later than the set start time due
85 to initiation within the data logger.
87 Returns
88 -------
89 MTime
90 Start time of the recording segment
91 """
92 if self.file_sequence == 1:
93 return self.recording_start_time + 1
94 else:
95 return self.recording_start_time + (
96 self.frag_period * (self.file_sequence - 1)
97 )
99 @property
100 def segment_end_time(self) -> MTime:
101 """
102 Estimate end time of the segment.
104 The first sequence starts 1 second later than the set start time due
105 to initiation within the data logger.
107 Returns
108 -------
109 MTime
110 Estimated end time from number of samples
111 """
112 return self.segment_start_time + (self.max_samples / self.sample_rate)
114 @property
115 def sequence_start(self) -> MTime:
116 """
117 Get the sequence start time.
119 Returns
120 -------
121 MTime
122 Start time of the sequence
123 """
124 return self._sequence_start
126 @sequence_start.setter
127 def sequence_start(self, value: Any) -> None:
128 """
129 Set the sequence start time.
131 Parameters
132 ----------
133 value : Any
134 Time stamp value that can be converted to MTime
135 """
136 self._sequence_start = MTime(time_stamp=value)
138 @property
139 def sequence_end(self) -> MTime:
140 """
141 Get the sequence end time.
143 Returns
144 -------
145 MTime
146 End time of the sequence based on data size or max samples
147 """
148 if self.data_size is not None:
149 return self.sequence_start + self.data_size / self.sample_rate
150 else:
151 return self.sequence_start + self.max_samples / self.sample_rate
153 # need a read and read sequence
154 def read(self) -> np.ndarray:
155 """
156 Read in the full data from the current file.
158 Returns
159 -------
160 np.ndarray
161 Single channel data array with dtype float32
162 """
163 if self.stream is not None:
164 self.stream.seek(self.header_length)
165 return np.fromfile(self.stream, dtype=np.float32)
166 return np.array([], dtype=np.float32)
168 def read_sequence(self, start: int = 0, end: int | None = None) -> np.ndarray:
169 """
170 Read a sequence of files.
172 Parameters
173 ----------
174 start : int, optional
175 Starting index in the sequence, by default 0
176 end : int or None, optional
177 Ending index in the sequence to read, by default None
179 Returns
180 -------
181 np.ndarray
182 Data within the given sequence range as float32 array
183 """
184 data = np.array([], dtype=np.float32)
185 for ii, fn in enumerate(self.sequence_list[slice(start, end)], start):
186 self._open_file(fn)
187 if self.stream is not None:
188 self.unpack_header(self.stream)
189 if ii == start:
190 self.sequence_start = self.segment_start_time
191 ts = self.read()
192 data = np.append(data, ts)
193 self.seq = ii
194 self.logger.debug(f"Read {self.seq + 1} sequences")
195 self.data_size = data.size
196 return data
198 def to_channel_ts(
199 self, rxcal_fn: str | Path | None = None, scal_fn: str | Path | None = None
200 ) -> ChannelTS:
201 """
202 Convert to a ChannelTS object.
204 Parameters
205 ----------
206 rxcal_fn : str, Path or None, optional
207 Path to receiver calibration file, by default None
208 scal_fn : str, Path or None, optional
209 Path to sensor calibration file, by default None
211 Returns
212 -------
213 ChannelTS
214 Channel time series object with data, metadata, and calibration
215 """
216 data = self.read_sequence()
218 # need to update the start and end times here
219 self._channel_metadata.time_period.start = self.sequence_start
220 self._channel_metadata.time_period.end = self.sequence_end
222 return ChannelTS(
223 channel_type=self.channel_metadata.type,
224 data=data,
225 channel_metadata=self.channel_metadata,
226 run_metadata=self.run_metadata,
227 station_metadata=self.station_metadata,
228 channel_response=self.get_channel_response(
229 rxcal_fn=rxcal_fn, scal_fn=scal_fn
230 ),
231 )