Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mth5 \ mth5 \ io \ phoenix \ readers \ native \ native_reader.py: 90%
118 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:01 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:01 -0800
1# -*- coding: utf-8 -*-
2"""
3Module to read and parse native Phoenix Geophysics data formats of the MTU-5C
4Family.
6This module implements Streamed readers for segmented-decimated time series
7formats of the MTU-5C family.
9:author: Jorge Torres-Solis
11Revised 2022 by J. Peacock
13"""
15# =============================================================================
16# Imports
17# =============================================================================
18from __future__ import annotations
20from pathlib import Path
21from struct import unpack, unpack_from
23import numpy as np
24from numpy.lib.stride_tricks import as_strided
26from mth5.io.phoenix.readers import TSReaderBase
27from mth5.timeseries import ChannelTS
30AD_IN_AD_UNITS = 0
31AD_INPUT_VOLTS = 1
32INSTRUMENT_INPUT_VOLTS = 2
34# =============================================================================
37class NativeReader(TSReaderBase):
38 """
39 Native sampling rate 'Raw' time series reader class.
41 This class reads native binary (.bin) files from Phoenix Geophysics MTU-5C
42 instruments. The files are formatted with a header of 128 bytes followed by
43 frames of 64 bytes each. Each frame contains 20 x 3-byte (24-bit) data
44 points plus a 4-byte footer.
46 Parameters
47 ----------
48 path : str or Path
49 Path to the time series file
50 num_files : int, optional
51 Number of files in the sequence, by default 1
52 scale_to : int, optional
53 Data scaling mode (AD_IN_AD_UNITS, AD_INPUT_VOLTS, or
54 INSTRUMENT_INPUT_VOLTS), by default AD_INPUT_VOLTS
55 header_length : int, optional
56 Length of file header in bytes, by default 128
57 last_frame : int, optional
58 Last frame number seen by the streamer, by default 0
59 ad_plus_minus_range : float, optional
60 ADC plus/minus range in volts, by default 5.0
61 channel_type : str, optional
62 Channel type identifier, by default "E"
63 report_hw_sat : bool, optional
64 Whether to report hardware saturation, by default False
65 **kwargs
66 Additional keyword arguments passed to parent TSReaderBase class
68 Attributes
69 ----------
70 last_frame : int
71 Last frame number processed
72 data_scaling : int
73 Current data scaling mode
74 ad_plus_minus_range : float
75 ADC voltage range
76 input_plusminus_range : float
77 Input voltage range after gain correction
78 scale_factor : float
79 Calculated scaling factor for data conversion
80 footer_idx_samp_mask : int
81 Bit mask for frame index in footer
82 footer_sat_mask : int
83 Bit mask for saturation count in footer
84 """
86 def __init__(
87 self,
88 path: str | Path,
89 num_files: int = 1,
90 scale_to: int = AD_INPUT_VOLTS,
91 header_length: int = 128,
92 last_frame: int = 0,
93 ad_plus_minus_range: float = 5.0,
94 channel_type: str = "E",
95 report_hw_sat: bool = False,
96 **kwargs,
97 ) -> None:
98 # Init the base class
99 super().__init__(path, num_files, header_length, report_hw_sat, **kwargs)
101 self._chunk_size = 4096
103 # Track the last frame seen by the streamer, to report missing frames
104 self.last_frame = last_frame
105 self.header_length = header_length
106 self.data_scaling = scale_to
107 self.ad_plus_minus_range = ad_plus_minus_range
109 if header_length == 128:
110 self.unpack_header(self.stream)
111 # Now that we have the channel circuit-based gain (either form init
112 # or from the header)
113 # We can calculate the voltage range at the input of the board.
114 self.input_plusminus_range = self._calculate_input_plusminus_range()
116 self.scale_factor = self._calculate_data_scaling()
118 # optimization variables
119 self.footer_idx_samp_mask = int("0x0fffffff", 16)
120 self.footer_sat_mask = int("0x70000000", 16)
122 def _calculate_input_plusminus_range(self) -> float:
123 """
124 Calculate the correct input plusminus range from metadata.
126 Returns
127 -------
128 float
129 Input voltage range after accounting for total circuit gain
130 """
131 return self.ad_plus_minus_range / self.total_circuitry_gain
133 def _calculate_data_scaling(self) -> float:
134 """
135 Calculate the correct data scaling factor for the AD converter.
137 Returns
138 -------
139 float
140 Scaling factor for converting raw ADC values to physical units
142 Raises
143 ------
144 LookupError
145 If an invalid scaling mode is requested
146 """
147 if self.data_scaling == AD_IN_AD_UNITS:
148 return 256
149 elif self.data_scaling == AD_INPUT_VOLTS:
150 return self.ad_plus_minus_range / (2**31)
151 elif self.data_scaling == INSTRUMENT_INPUT_VOLTS:
152 return self.input_plusminus_range / (2**31)
153 else:
154 raise LookupError("Invalid scaling requested")
156 def read_frames(self, num_frames: int) -> np.ndarray:
157 """
158 Read the given number of frames from the data stream.
160 Note
161 ----
162 The seek position is not reset, so iterating this method will read
163 from the last position in the stream.
165 Parameters
166 ----------
167 num_frames : int
168 Number of frames to read
170 Returns
171 -------
172 np.ndarray
173 Scaled data from the given number of frames with dtype float64
174 """
175 frames_in_buf = 0
176 _idx_buf = 0
177 _data_buf = np.empty([num_frames * 20]) # 20 samples packed in a frame
179 while frames_in_buf < num_frames:
180 if self.stream is not None:
181 dataFrame = self.stream.read(64)
182 else:
183 return np.empty([0])
185 if not dataFrame:
186 if not self.open_next():
187 return np.empty([0])
188 if self.stream is not None:
189 dataFrame = self.stream.read(64)
190 if not dataFrame:
191 return np.empty([0])
192 dataFooter = unpack_from("I", dataFrame, 60)
194 # Check that there are no skipped frames
195 frameCount = dataFooter[0] & self.footer_idx_samp_mask
196 difCount = frameCount - self.last_frame
197 if difCount != 1:
198 self.logger.warning(
199 f"Ch [{self.channel_id}] Missing frames at {frameCount} "
200 f"[{difCount}]"
201 )
202 self.last_frame = frameCount
204 for ptrSamp in range(0, 60, 3):
205 # unpack expects 4 bytes, but the frames are only 3?
206 value = unpack(">i", dataFrame[ptrSamp : ptrSamp + 3] + b"\x00")[0]
207 _data_buf[_idx_buf] = value * self.scale_factor
208 _idx_buf += 1
209 frames_in_buf += 1
211 if self.report_hw_sat:
212 satCount = (dataFooter[0] & self.footer_sat_mask) >> 24
213 if satCount:
214 self.logger.warning(
215 f"Ch [{self.channel_id}] Frame {frameCount} has {satCount} "
216 "saturations"
217 )
218 return _data_buf
220 @property
221 def npts_per_frame(self) -> int:
222 """
223 Get the number of data points per frame.
225 Returns
226 -------
227 int
228 Number of data points per frame (frame size - 4 footer bytes) / 3 bytes per sample
229 """
230 if self.frame_size_bytes is not None:
231 return int((self.frame_size_bytes - 4) / 3)
232 return 20 # Default to 20 if frame_size_bytes is not available
234 def read(self) -> tuple[np.ndarray, np.ndarray]:
235 """
236 Read the full data file using memory mapping and stride tricks.
238 Note
239 ----
240 This uses numpy.lib.stride_tricks.as_strided which can be unstable
241 if the bytes are not the correct length. See notes by numpy.
243 The solution is adapted from:
244 https://stackoverflow.com/questions/12080279/how-do-i-create-a-numpy-dtype-that-includes-24-bit-integers
246 Returns
247 -------
248 tuple[np.ndarray, np.ndarray]
249 Scaled time series data and footer data as (data, footer)
250 """
251 # should do a memory map otherwise things can go badly with as_strided
252 raw_data = np.memmap(self.base_path, ">i1", mode="r")
253 raw_data = raw_data[self.header_length :]
255 # trim of any extra bytes
256 extra_bytes = raw_data.size % 64
257 if extra_bytes != 0:
258 self.logger.warning(f"found {extra_bytes} extra bits in file.")
259 useable_bytes = raw_data.size - extra_bytes
260 raw_data = raw_data[:useable_bytes]
261 # This should now be the exact number of frames after trimming off
262 # extra bytes.
263 if self.frame_size_bytes is not None:
264 n_frames = int(raw_data.size / self.frame_size_bytes)
265 else:
266 n_frames = int(raw_data.size / 64) # Default frame size
268 # reshape to (nframes, 64)
269 frame_size = self.frame_size_bytes if self.frame_size_bytes is not None else 64
270 raw_data = raw_data.reshape((n_frames, frame_size))
272 # split the data from the footer
273 ts = raw_data[:, 0 : self.npts_per_frame * 3].flatten()
274 footer = raw_data[:, self.npts_per_frame * 3 :].flatten()
276 # get the number of raw byte frames
277 raw_frames = int(ts.size / 12)
279 # stride over bytes making new 4 bytes for a 32bit integer and scale
280 ts_data = (
281 as_strided(
282 ts.view(np.int32),
283 strides=(12, 3),
284 shape=(raw_frames, 4),
285 )
286 .flatten()
287 .byteswap()
288 * self.scale_factor
289 )
290 # somehow the number is off by just a bit ~1E-7 V
292 # view the footer as an int32
293 footer = footer.view(np.int32)
295 return ts_data, footer
297 def read_sequence(
298 self, start: int = 0, end: int | None = None
299 ) -> tuple[np.ndarray, np.ndarray]:
300 """
301 Read a sequence of files into a single array.
303 Parameters
304 ----------
305 start : int, optional
306 Sequence start index, by default 0
307 end : int or None, optional
308 Sequence end index, by default None
310 Returns
311 -------
312 tuple[np.ndarray, np.ndarray]
313 Scaled time series data and footer data as (data, footer)
314 - data: np.ndarray with dtype float32
315 - footer: np.ndarray with dtype int32
316 """
317 data = np.array([])
318 footer = np.array([])
319 for fn in self.sequence_list[slice(start, end)]:
320 self._open_file(fn)
321 if self.stream is not None:
322 self.unpack_header(self.stream)
323 ts, foot = self.read()
324 data = np.append(data, ts)
325 footer = np.append(footer, foot)
326 return data, footer
328 def skip_frames(self, num_frames: int) -> bool:
329 """
330 Skip frames in the data stream.
332 Parameters
333 ----------
334 num_frames : int
335 Number of frames to skip
337 Returns
338 -------
339 bool
340 True if skip completed successfully, False if end of file reached
341 """
342 bytes_to_skip = int(num_frames * 64)
343 # Python is dumb for seek and tell, it cannot tell us if a seek goes
344 # past EOF so instead we need to do inefficient reads to skip bytes
345 while bytes_to_skip > 0:
346 if self.stream is not None:
347 foo = self.stream.read(bytes_to_skip)
348 local_read_size = len(foo)
349 else:
350 return False
352 # If we ran out of data in this file before finishing the skip,
353 # open the next file and return false if there is no next file
354 # to indicate that the skip ran out of
355 # data before completion
356 if local_read_size == 0:
357 more_data = self.open_next()
358 if not more_data:
359 return False
360 else:
361 bytes_to_skip -= local_read_size
362 # If we reached here we managed to skip all the data requested
363 # return true
364 self.last_frame += num_frames
365 return True
367 def to_channel_ts(
368 self, rxcal_fn: str | Path | None = None, scal_fn: str | Path | None = None
369 ) -> ChannelTS:
370 """
371 Convert to a ChannelTS object.
373 Parameters
374 ----------
375 rxcal_fn : str, Path or None, optional
376 Path to receiver calibration file, by default None
377 scal_fn : str, Path or None, optional
378 Path to sensor calibration file, by default None
380 Returns
381 -------
382 ChannelTS
383 Channel time series object with data, metadata, and calibration
384 """
385 data, footer = self.read_sequence()
386 ch_metadata = self.channel_metadata
387 return ChannelTS(
388 channel_type=ch_metadata.type,
389 data=data,
390 channel_metadata=ch_metadata,
391 run_metadata=self.run_metadata,
392 station_metadata=self.station_metadata,
393 channel_response=self.get_channel_response(
394 rxcal_fn=rxcal_fn, scal_fn=scal_fn
395 ),
396 )