Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mth5 \ mth5 \ io \ phoenix \ readers \ native \ native_reader.py: 90%

118 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-10 00:01 -0800

1# -*- coding: utf-8 -*- 

2""" 

3Module to read and parse native Phoenix Geophysics data formats of the MTU-5C 

4Family. 

5 

6This module implements Streamed readers for segmented-decimated time series 

7formats of the MTU-5C family. 

8 

9:author: Jorge Torres-Solis 

10 

11Revised 2022 by J. Peacock 

12 

13""" 

14 

15# ============================================================================= 

16# Imports 

17# ============================================================================= 

18from __future__ import annotations 

19 

20from pathlib import Path 

21from struct import unpack, unpack_from 

22 

23import numpy as np 

24from numpy.lib.stride_tricks import as_strided 

25 

26from mth5.io.phoenix.readers import TSReaderBase 

27from mth5.timeseries import ChannelTS 

28 

29 

30AD_IN_AD_UNITS = 0 

31AD_INPUT_VOLTS = 1 

32INSTRUMENT_INPUT_VOLTS = 2 

33 

34# ============================================================================= 

35 

36 

37class NativeReader(TSReaderBase): 

38 """ 

39 Native sampling rate 'Raw' time series reader class. 

40 

41 This class reads native binary (.bin) files from Phoenix Geophysics MTU-5C 

42 instruments. The files are formatted with a header of 128 bytes followed by 

43 frames of 64 bytes each. Each frame contains 20 x 3-byte (24-bit) data 

44 points plus a 4-byte footer. 

45 

46 Parameters 

47 ---------- 

48 path : str or Path 

49 Path to the time series file 

50 num_files : int, optional 

51 Number of files in the sequence, by default 1 

52 scale_to : int, optional 

53 Data scaling mode (AD_IN_AD_UNITS, AD_INPUT_VOLTS, or 

54 INSTRUMENT_INPUT_VOLTS), by default AD_INPUT_VOLTS 

55 header_length : int, optional 

56 Length of file header in bytes, by default 128 

57 last_frame : int, optional 

58 Last frame number seen by the streamer, by default 0 

59 ad_plus_minus_range : float, optional 

60 ADC plus/minus range in volts, by default 5.0 

61 channel_type : str, optional 

62 Channel type identifier, by default "E" 

63 report_hw_sat : bool, optional 

64 Whether to report hardware saturation, by default False 

65 **kwargs 

66 Additional keyword arguments passed to parent TSReaderBase class 

67 

68 Attributes 

69 ---------- 

70 last_frame : int 

71 Last frame number processed 

72 data_scaling : int 

73 Current data scaling mode 

74 ad_plus_minus_range : float 

75 ADC voltage range 

76 input_plusminus_range : float 

77 Input voltage range after gain correction 

78 scale_factor : float 

79 Calculated scaling factor for data conversion 

80 footer_idx_samp_mask : int 

81 Bit mask for frame index in footer 

82 footer_sat_mask : int 

83 Bit mask for saturation count in footer 

84 """ 

85 

86 def __init__( 

87 self, 

88 path: str | Path, 

89 num_files: int = 1, 

90 scale_to: int = AD_INPUT_VOLTS, 

91 header_length: int = 128, 

92 last_frame: int = 0, 

93 ad_plus_minus_range: float = 5.0, 

94 channel_type: str = "E", 

95 report_hw_sat: bool = False, 

96 **kwargs, 

97 ) -> None: 

98 # Init the base class 

99 super().__init__(path, num_files, header_length, report_hw_sat, **kwargs) 

100 

101 self._chunk_size = 4096 

102 

103 # Track the last frame seen by the streamer, to report missing frames 

104 self.last_frame = last_frame 

105 self.header_length = header_length 

106 self.data_scaling = scale_to 

107 self.ad_plus_minus_range = ad_plus_minus_range 

108 

109 if header_length == 128: 

110 self.unpack_header(self.stream) 

111 # Now that we have the channel circuit-based gain (either form init 

112 # or from the header) 

113 # We can calculate the voltage range at the input of the board. 

114 self.input_plusminus_range = self._calculate_input_plusminus_range() 

115 

116 self.scale_factor = self._calculate_data_scaling() 

117 

118 # optimization variables 

119 self.footer_idx_samp_mask = int("0x0fffffff", 16) 

120 self.footer_sat_mask = int("0x70000000", 16) 

121 

122 def _calculate_input_plusminus_range(self) -> float: 

123 """ 

124 Calculate the correct input plusminus range from metadata. 

125 

126 Returns 

127 ------- 

128 float 

129 Input voltage range after accounting for total circuit gain 

130 """ 

131 return self.ad_plus_minus_range / self.total_circuitry_gain 

132 

133 def _calculate_data_scaling(self) -> float: 

134 """ 

135 Calculate the correct data scaling factor for the AD converter. 

136 

137 Returns 

138 ------- 

139 float 

140 Scaling factor for converting raw ADC values to physical units 

141 

142 Raises 

143 ------ 

144 LookupError 

145 If an invalid scaling mode is requested 

146 """ 

147 if self.data_scaling == AD_IN_AD_UNITS: 

148 return 256 

149 elif self.data_scaling == AD_INPUT_VOLTS: 

150 return self.ad_plus_minus_range / (2**31) 

151 elif self.data_scaling == INSTRUMENT_INPUT_VOLTS: 

152 return self.input_plusminus_range / (2**31) 

153 else: 

154 raise LookupError("Invalid scaling requested") 

155 

156 def read_frames(self, num_frames: int) -> np.ndarray: 

157 """ 

158 Read the given number of frames from the data stream. 

159 

160 Note 

161 ---- 

162 The seek position is not reset, so iterating this method will read 

163 from the last position in the stream. 

164 

165 Parameters 

166 ---------- 

167 num_frames : int 

168 Number of frames to read 

169 

170 Returns 

171 ------- 

172 np.ndarray 

173 Scaled data from the given number of frames with dtype float64 

174 """ 

175 frames_in_buf = 0 

176 _idx_buf = 0 

177 _data_buf = np.empty([num_frames * 20]) # 20 samples packed in a frame 

178 

179 while frames_in_buf < num_frames: 

180 if self.stream is not None: 

181 dataFrame = self.stream.read(64) 

182 else: 

183 return np.empty([0]) 

184 

185 if not dataFrame: 

186 if not self.open_next(): 

187 return np.empty([0]) 

188 if self.stream is not None: 

189 dataFrame = self.stream.read(64) 

190 if not dataFrame: 

191 return np.empty([0]) 

192 dataFooter = unpack_from("I", dataFrame, 60) 

193 

194 # Check that there are no skipped frames 

195 frameCount = dataFooter[0] & self.footer_idx_samp_mask 

196 difCount = frameCount - self.last_frame 

197 if difCount != 1: 

198 self.logger.warning( 

199 f"Ch [{self.channel_id}] Missing frames at {frameCount} " 

200 f"[{difCount}]" 

201 ) 

202 self.last_frame = frameCount 

203 

204 for ptrSamp in range(0, 60, 3): 

205 # unpack expects 4 bytes, but the frames are only 3? 

206 value = unpack(">i", dataFrame[ptrSamp : ptrSamp + 3] + b"\x00")[0] 

207 _data_buf[_idx_buf] = value * self.scale_factor 

208 _idx_buf += 1 

209 frames_in_buf += 1 

210 

211 if self.report_hw_sat: 

212 satCount = (dataFooter[0] & self.footer_sat_mask) >> 24 

213 if satCount: 

214 self.logger.warning( 

215 f"Ch [{self.channel_id}] Frame {frameCount} has {satCount} " 

216 "saturations" 

217 ) 

218 return _data_buf 

219 

220 @property 

221 def npts_per_frame(self) -> int: 

222 """ 

223 Get the number of data points per frame. 

224 

225 Returns 

226 ------- 

227 int 

228 Number of data points per frame (frame size - 4 footer bytes) / 3 bytes per sample 

229 """ 

230 if self.frame_size_bytes is not None: 

231 return int((self.frame_size_bytes - 4) / 3) 

232 return 20 # Default to 20 if frame_size_bytes is not available 

233 

234 def read(self) -> tuple[np.ndarray, np.ndarray]: 

235 """ 

236 Read the full data file using memory mapping and stride tricks. 

237 

238 Note 

239 ---- 

240 This uses numpy.lib.stride_tricks.as_strided which can be unstable 

241 if the bytes are not the correct length. See notes by numpy. 

242 

243 The solution is adapted from: 

244 https://stackoverflow.com/questions/12080279/how-do-i-create-a-numpy-dtype-that-includes-24-bit-integers 

245 

246 Returns 

247 ------- 

248 tuple[np.ndarray, np.ndarray] 

249 Scaled time series data and footer data as (data, footer) 

250 """ 

251 # should do a memory map otherwise things can go badly with as_strided 

252 raw_data = np.memmap(self.base_path, ">i1", mode="r") 

253 raw_data = raw_data[self.header_length :] 

254 

255 # trim of any extra bytes 

256 extra_bytes = raw_data.size % 64 

257 if extra_bytes != 0: 

258 self.logger.warning(f"found {extra_bytes} extra bits in file.") 

259 useable_bytes = raw_data.size - extra_bytes 

260 raw_data = raw_data[:useable_bytes] 

261 # This should now be the exact number of frames after trimming off 

262 # extra bytes. 

263 if self.frame_size_bytes is not None: 

264 n_frames = int(raw_data.size / self.frame_size_bytes) 

265 else: 

266 n_frames = int(raw_data.size / 64) # Default frame size 

267 

268 # reshape to (nframes, 64) 

269 frame_size = self.frame_size_bytes if self.frame_size_bytes is not None else 64 

270 raw_data = raw_data.reshape((n_frames, frame_size)) 

271 

272 # split the data from the footer 

273 ts = raw_data[:, 0 : self.npts_per_frame * 3].flatten() 

274 footer = raw_data[:, self.npts_per_frame * 3 :].flatten() 

275 

276 # get the number of raw byte frames 

277 raw_frames = int(ts.size / 12) 

278 

279 # stride over bytes making new 4 bytes for a 32bit integer and scale 

280 ts_data = ( 

281 as_strided( 

282 ts.view(np.int32), 

283 strides=(12, 3), 

284 shape=(raw_frames, 4), 

285 ) 

286 .flatten() 

287 .byteswap() 

288 * self.scale_factor 

289 ) 

290 # somehow the number is off by just a bit ~1E-7 V 

291 

292 # view the footer as an int32 

293 footer = footer.view(np.int32) 

294 

295 return ts_data, footer 

296 

297 def read_sequence( 

298 self, start: int = 0, end: int | None = None 

299 ) -> tuple[np.ndarray, np.ndarray]: 

300 """ 

301 Read a sequence of files into a single array. 

302 

303 Parameters 

304 ---------- 

305 start : int, optional 

306 Sequence start index, by default 0 

307 end : int or None, optional 

308 Sequence end index, by default None 

309 

310 Returns 

311 ------- 

312 tuple[np.ndarray, np.ndarray] 

313 Scaled time series data and footer data as (data, footer) 

314 - data: np.ndarray with dtype float32 

315 - footer: np.ndarray with dtype int32 

316 """ 

317 data = np.array([]) 

318 footer = np.array([]) 

319 for fn in self.sequence_list[slice(start, end)]: 

320 self._open_file(fn) 

321 if self.stream is not None: 

322 self.unpack_header(self.stream) 

323 ts, foot = self.read() 

324 data = np.append(data, ts) 

325 footer = np.append(footer, foot) 

326 return data, footer 

327 

328 def skip_frames(self, num_frames: int) -> bool: 

329 """ 

330 Skip frames in the data stream. 

331 

332 Parameters 

333 ---------- 

334 num_frames : int 

335 Number of frames to skip 

336 

337 Returns 

338 ------- 

339 bool 

340 True if skip completed successfully, False if end of file reached 

341 """ 

342 bytes_to_skip = int(num_frames * 64) 

343 # Python is dumb for seek and tell, it cannot tell us if a seek goes 

344 # past EOF so instead we need to do inefficient reads to skip bytes 

345 while bytes_to_skip > 0: 

346 if self.stream is not None: 

347 foo = self.stream.read(bytes_to_skip) 

348 local_read_size = len(foo) 

349 else: 

350 return False 

351 

352 # If we ran out of data in this file before finishing the skip, 

353 # open the next file and return false if there is no next file 

354 # to indicate that the skip ran out of 

355 # data before completion 

356 if local_read_size == 0: 

357 more_data = self.open_next() 

358 if not more_data: 

359 return False 

360 else: 

361 bytes_to_skip -= local_read_size 

362 # If we reached here we managed to skip all the data requested 

363 # return true 

364 self.last_frame += num_frames 

365 return True 

366 

367 def to_channel_ts( 

368 self, rxcal_fn: str | Path | None = None, scal_fn: str | Path | None = None 

369 ) -> ChannelTS: 

370 """ 

371 Convert to a ChannelTS object. 

372 

373 Parameters 

374 ---------- 

375 rxcal_fn : str, Path or None, optional 

376 Path to receiver calibration file, by default None 

377 scal_fn : str, Path or None, optional 

378 Path to sensor calibration file, by default None 

379 

380 Returns 

381 ------- 

382 ChannelTS 

383 Channel time series object with data, metadata, and calibration 

384 """ 

385 data, footer = self.read_sequence() 

386 ch_metadata = self.channel_metadata 

387 return ChannelTS( 

388 channel_type=ch_metadata.type, 

389 data=data, 

390 channel_metadata=ch_metadata, 

391 run_metadata=self.run_metadata, 

392 station_metadata=self.station_metadata, 

393 channel_response=self.get_channel_response( 

394 rxcal_fn=rxcal_fn, scal_fn=scal_fn 

395 ), 

396 )