Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mth5 \ mth5 \ io \ phoenix \ readers \ segmented \ decimated_segmented_reader.py: 74%

157 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-10 00:01 -0800

1# -*- coding: utf-8 -*- 

2""" 

3Module to read and parse native Phoenix Geophysics data formats of the 

4MTU-5C Family 

5 

6This module implements Streamed readers for segmented-decimated time series 

7 formats of the MTU-5C family. 

8 

9:author: Jorge Torres-Solis 

10 

11Revised 2022 by J. Peacock 

12""" 

13 

14 

15from __future__ import annotations 

16 

17from pathlib import Path 

18from struct import unpack_from 

19from typing import Any, BinaryIO 

20 

21# ============================================================================= 

22# Imports 

23# ============================================================================= 

24import numpy as np 

25from mt_metadata.common.mttime import MTime 

26 

27from mth5.io.phoenix.readers import TSReaderBase 

28from mth5.timeseries import ChannelTS 

29 

30 

31# ============================================================================= 

32class SubHeader: 

33 """ 

34 Class for subheader of segmented files. 

35 

36 This class handles the parsing and access to subheader information in 

37 Phoenix Geophysics segmented time series files. The subheader contains 

38 metadata about each segment including timing, sample counts, and statistics. 

39 

40 Parameters 

41 ---------- 

42 **kwargs 

43 Arbitrary keyword arguments that are set as attributes 

44 

45 Attributes 

46 ---------- 

47 header_length : int 

48 Length of the subheader in bytes (32 bytes) 

49 _header : bytes or None 

50 Raw header bytes from the file 

51 _unpack_dict : dict 

52 Dictionary defining how to unpack different header fields 

53 """ 

54 

55 def __init__(self, **kwargs) -> None: 

56 self.header_length = 32 

57 self._header = None 

58 

59 for key, value in kwargs.items(): 

60 setattr(self, key, value) 

61 self._unpack_dict = { 

62 "gps_time_stamp": {"dtype": "I", "index": 0}, 

63 "n_samples": {"dtype": "I", "index": 4}, 

64 "saturation_count": {"dtype": "H", "index": 8}, 

65 "missing_count": {"dtype": "H", "index": 10}, 

66 "value_min": {"dtype": "f", "index": 12}, 

67 "value_max": {"dtype": "f", "index": 16}, 

68 "value_mean": {"dtype": "f", "index": 20}, 

69 } 

70 

71 def __str__(self) -> str: 

72 """String representation of the subheader information.""" 

73 lines = ["subheader information:", "-" * 30] 

74 for key in [ 

75 "gps_time_stamp", 

76 "n_samples", 

77 "saturation_count", 

78 "missing_count", 

79 "value_min", 

80 "value_max", 

81 "value_mean", 

82 ]: 

83 lines.append(f"\t{key:<25}: {getattr(self, key)}") 

84 return "\n".join(lines) 

85 

86 def __repr__(self) -> str: 

87 """String representation of the subheader.""" 

88 return self.__str__() 

89 

90 def _has_header(self) -> bool: 

91 """ 

92 Check if header data has been loaded. 

93 

94 Returns 

95 ------- 

96 bool 

97 True if header is loaded, False otherwise 

98 """ 

99 if self._header is not None: 

100 return True 

101 return False 

102 

103 def _unpack_value(self, key: str) -> tuple[Any, ...] | None: 

104 """ 

105 Unpack a value from the header bytes. 

106 

107 Parameters 

108 ---------- 

109 key : str 

110 Key name for the value to unpack 

111 

112 Returns 

113 ------- 

114 tuple or None 

115 Unpacked value tuple, or None if header not available 

116 

117 Raises 

118 ------ 

119 IOError 

120 If unpacking fails 

121 """ 

122 if self._has_header() and self._header is not None: 

123 try: 

124 return unpack_from( 

125 self._unpack_dict[key]["dtype"], 

126 self._header, 

127 self._unpack_dict[key]["index"], 

128 ) 

129 except Exception as error: 

130 raise IOError(error) 

131 return None 

132 

133 @property 

134 def gps_time_stamp(self) -> MTime | None: 

135 """ 

136 GPS time stamp in UTC. 

137 

138 Returns 

139 ------- 

140 MTime or None 

141 GPS timestamp if header is available, None otherwise 

142 """ 

143 if self._has_header(): 

144 value = self._unpack_value("gps_time_stamp") 

145 if value is not None: 

146 return MTime(time_stamp=value[0], gps_time=True) 

147 return None 

148 

149 @property 

150 def n_samples(self) -> int | None: 

151 """ 

152 Number of samples in the segment. 

153 

154 Returns 

155 ------- 

156 int or None 

157 Number of samples if header is available, None otherwise 

158 """ 

159 if self._has_header(): 

160 value = self._unpack_value("n_samples") 

161 if value is not None: 

162 return value[0] 

163 return None 

164 

165 @property 

166 def saturation_count(self) -> int | None: 

167 """ 

168 Number of saturated samples. 

169 

170 Returns 

171 ------- 

172 int or None 

173 Saturation count if header is available, None otherwise 

174 """ 

175 if self._has_header(): 

176 value = self._unpack_value("saturation_count") 

177 if value is not None: 

178 return value[0] 

179 return None 

180 

181 @property 

182 def missing_count(self) -> int | None: 

183 """ 

184 Number of missing samples. 

185 

186 Returns 

187 ------- 

188 int or None 

189 Missing sample count if header is available, None otherwise 

190 """ 

191 if self._has_header(): 

192 value = self._unpack_value("missing_count") 

193 if value is not None: 

194 return value[0] 

195 return None 

196 

197 @property 

198 def value_min(self) -> float | None: 

199 """ 

200 Minimum value in the segment. 

201 

202 Returns 

203 ------- 

204 float or None 

205 Minimum value if header is available, None otherwise 

206 """ 

207 if self._has_header(): 

208 value = self._unpack_value("value_min") 

209 if value is not None: 

210 return value[0] 

211 return None 

212 

213 @property 

214 def value_max(self) -> float | None: 

215 """ 

216 Maximum value in the segment. 

217 

218 Returns 

219 ------- 

220 float or None 

221 Maximum value if header is available, None otherwise 

222 """ 

223 if self._has_header(): 

224 value = self._unpack_value("value_max") 

225 if value is not None: 

226 return value[0] 

227 return None 

228 

229 @property 

230 def value_mean(self) -> float | None: 

231 """ 

232 Mean value in the segment. 

233 

234 Returns 

235 ------- 

236 float or None 

237 Mean value if header is available, None otherwise 

238 """ 

239 if self._has_header(): 

240 value = self._unpack_value("value_mean") 

241 if value is not None: 

242 return value[0] 

243 return None 

244 

245 def unpack_header(self, stream: BinaryIO) -> None: 

246 """ 

247 Unpack the header from a binary stream. 

248 

249 Parameters 

250 ---------- 

251 stream : BinaryIO 

252 Binary stream to read header from 

253 """ 

254 if self.header_length > 0: 

255 # be sure to read from the beginning of the file 

256 self._header = stream.read(self.header_length) 

257 else: 

258 return 

259 

260 

261class Segment(SubHeader): 

262 """ 

263 A segment class to hold a single segment. 

264 

265 This class represents a single time series segment with its associated 

266 metadata and data. It inherits from SubHeader to provide access to 

267 segment-specific header information. 

268 

269 Parameters 

270 ---------- 

271 stream : BinaryIO 

272 Binary file stream to read from 

273 **kwargs 

274 Additional keyword arguments passed to SubHeader 

275 

276 Attributes 

277 ---------- 

278 stream : BinaryIO 

279 The file stream for reading data 

280 data : np.ndarray or None 

281 Time series data for this segment 

282 """ 

283 

284 def __init__(self, stream: BinaryIO, **kwargs) -> None: 

285 super().__init__(**kwargs) 

286 self.stream = stream 

287 self.data: np.ndarray | None = None 

288 

289 def read_segment(self, metadata_only: bool = False) -> None: 

290 """ 

291 Read the segment data from the file stream. 

292 

293 Parameters 

294 ---------- 

295 metadata_only : bool, optional 

296 If True, only read metadata without loading data, by default False 

297 """ 

298 self.unpack_header(self.stream) 

299 if not metadata_only and self.n_samples is not None: 

300 self.data = np.fromfile(self.stream, dtype=np.float32, count=self.n_samples) 

301 

302 @property 

303 def segment_start_time(self) -> MTime | None: 

304 """ 

305 Get the segment start time. 

306 

307 Returns 

308 ------- 

309 MTime or None 

310 GPS timestamp of segment start, or None if not available 

311 """ 

312 return self.gps_time_stamp 

313 

314 @property 

315 def segment_end_time(self) -> MTime | None: 

316 """ 

317 Calculate the segment end time. 

318 

319 Returns 

320 ------- 

321 MTime or None 

322 Estimated end time based on start time, sample count and sample rate, 

323 or None if required information is not available 

324 """ 

325 start_time = self.segment_start_time 

326 if ( 

327 start_time is not None 

328 and self.n_samples is not None 

329 and hasattr(self, "sample_rate") 

330 ): 

331 return start_time + (self.n_samples / self.sample_rate) 

332 return None 

333 

334 

335class DecimatedSegmentedReader(TSReaderBase): 

336 """ 

337 Class to create a streamer for segmented decimated time series. 

338 

339 This reader handles segmented decimated time series files such as 'td_24k'. 

340 These files have sub headers containing metadata for each segment. 

341 

342 Parameters 

343 ---------- 

344 path : str or Path 

345 Path to the time series file 

346 num_files : int, optional 

347 Number of files in the sequence, by default 1 

348 report_hw_sat : bool, optional 

349 Whether to report hardware saturation, by default False 

350 **kwargs 

351 Additional keyword arguments passed to parent TSReaderBase class 

352 

353 Attributes 

354 ---------- 

355 sub_header : SubHeader 

356 SubHeader instance for parsing segment headers 

357 subheader : dict 

358 Dictionary for additional subheader information 

359 """ 

360 

361 def __init__( 

362 self, 

363 path: str | Path, 

364 num_files: int = 1, 

365 report_hw_sat: bool = False, 

366 **kwargs, 

367 ) -> None: 

368 # Init the base class 

369 super().__init__( 

370 path, 

371 num_files=num_files, 

372 header_length=128, 

373 report_hw_sat=report_hw_sat, 

374 **kwargs, 

375 ) 

376 

377 self._channel_metadata = self._update_channel_metadata_from_recmeta() 

378 self.sub_header = SubHeader() 

379 self.subheader = {} 

380 

381 def read_segment(self, metadata_only: bool = False) -> Segment: 

382 """ 

383 Read a single segment from the file. 

384 

385 Parameters 

386 ---------- 

387 metadata_only : bool, optional 

388 If True, only read metadata without loading data, by default False 

389 

390 Returns 

391 ------- 

392 Segment 

393 Segment object containing data and metadata 

394 

395 Raises 

396 ------ 

397 ValueError 

398 If stream is not available 

399 """ 

400 kwargs = { 

401 "instrument_type": self.instrument_type, 

402 "instrument_serial_number": self.instrument_serial_number, 

403 "latitude": self.gps_lat, 

404 "longitude": self.gps_long, 

405 "elevation": self.gps_elevation, 

406 "sample_rate": self.sample_rate, 

407 "channel_id": self.channel_id, 

408 "channel_type": self.channel_type, 

409 "segment": 0, 

410 } 

411 

412 if self.stream is None: 

413 raise ValueError("Stream is not available") 

414 

415 segment = Segment(self.stream, **kwargs) 

416 segment.read_segment(metadata_only=metadata_only) 

417 

418 return segment 

419 

420 def to_channel_ts( 

421 self, rxcal_fn: str | Path | None = None, scal_fn: str | Path | None = None 

422 ) -> ChannelTS: 

423 """ 

424 Convert to a ChannelTS object. 

425 

426 Parameters 

427 ---------- 

428 rxcal_fn : str, Path or None, optional 

429 Path to receiver calibration file, by default None 

430 scal_fn : str, Path or None, optional 

431 Path to sensor calibration file, by default None 

432 

433 Returns 

434 ------- 

435 ChannelTS 

436 Channel time series object with data, metadata, and calibration 

437 """ 

438 segment = self.read_segment() 

439 ch_metadata = self.channel_metadata 

440 

441 # Set timing information if available 

442 if segment.segment_start_time is not None: 

443 ch_metadata.time_period.start = segment.segment_start_time.isoformat() 

444 if segment.segment_end_time is not None: 

445 ch_metadata.time_period.end = segment.segment_end_time.isoformat() 

446 

447 return ChannelTS( 

448 channel_type=ch_metadata.type, 

449 data=segment.data, 

450 channel_metadata=ch_metadata, 

451 run_metadata=self.run_metadata, 

452 station_metadata=self.station_metadata, 

453 channel_response=self.get_channel_response( 

454 rxcal_fn=rxcal_fn, scal_fn=scal_fn 

455 ), 

456 ) 

457 

458 

459class DecimatedSegmentCollection(TSReaderBase): 

460 """ 

461 Class to read multiple segments from a segmented decimated time series file. 

462 

463 This reader handles files containing multiple segments of decimated time series 

464 data such as 'td_24k'. Each segment has its own sub header with metadata. 

465 

466 Parameters 

467 ---------- 

468 path : str or Path 

469 Path to the time series file 

470 num_files : int, optional 

471 Number of files in the sequence, by default 1 

472 report_hw_sat : bool, optional 

473 Whether to report hardware saturation, by default False 

474 **kwargs 

475 Additional keyword arguments passed to parent TSReaderBase class 

476 

477 Attributes 

478 ---------- 

479 sub_header : SubHeader 

480 SubHeader instance for parsing segment headers 

481 subheader : dict 

482 Dictionary for additional subheader information 

483 """ 

484 

485 def __init__( 

486 self, 

487 path: str | Path, 

488 num_files: int = 1, 

489 report_hw_sat: bool = False, 

490 **kwargs, 

491 ) -> None: 

492 # Init the base class 

493 super().__init__( 

494 path, 

495 num_files=num_files, 

496 header_length=128, 

497 report_hw_sat=report_hw_sat, 

498 **kwargs, 

499 ) 

500 

501 if self.stream is not None: 

502 self.unpack_header(self.stream) 

503 self.sub_header = SubHeader() 

504 self.subheader = {} 

505 

506 def read_segments(self, metadata_only: bool = False) -> list[Segment]: 

507 """ 

508 Read all segments from the file. 

509 

510 Parameters 

511 ---------- 

512 metadata_only : bool, optional 

513 If True, only read metadata without loading data, by default False 

514 

515 Returns 

516 ------- 

517 list[Segment] 

518 List of Segment objects containing data and metadata 

519 

520 Raises 

521 ------ 

522 ValueError 

523 If stream is not available 

524 """ 

525 kwargs = { 

526 "instrument_type": self.instrument_type, 

527 "instrument_serial_number": self.instrument_serial_number, 

528 "latitude": self.gps_lat, 

529 "longitude": self.gps_long, 

530 "elevation": self.gps_elevation, 

531 "sample_rate": self.sample_rate, 

532 "channel_id": self.channel_id, 

533 "channel_type": self.channel_type, 

534 "segment": 0, 

535 } 

536 

537 if self.stream is None: 

538 raise ValueError("Stream is not available") 

539 

540 segments = [] 

541 count = 1 

542 while True: 

543 try: 

544 kwargs["segment"] = count 

545 segment = Segment(self.stream, **kwargs) 

546 segment.read_segment(metadata_only=metadata_only) 

547 segments.append(segment) 

548 count += 1 

549 except Exception: 

550 break 

551 self.logger.info(f"Read {count - 1} segments") 

552 

553 return segments 

554 

555 def to_channel_ts( 

556 self, rxcal_fn: str | Path | None = None, scal_fn: str | Path | None = None 

557 ) -> list[ChannelTS]: 

558 """ 

559 Convert all segments to ChannelTS objects. 

560 

561 Parameters 

562 ---------- 

563 rxcal_fn : str, Path or None, optional 

564 Path to receiver calibration file, by default None 

565 scal_fn : str, Path or None, optional 

566 Path to sensor calibration file, by default None 

567 

568 Returns 

569 ------- 

570 list[ChannelTS] 

571 List of ChannelTS objects, one for each segment 

572 """ 

573 seq_list = [] 

574 for seq in self.read_segments(): 

575 ch_metadata = self.channel_metadata 

576 if seq.gps_time_stamp is not None: 

577 ch_metadata.time_period.start = seq.gps_time_stamp.isoformat() 

578 

579 seq_list.append( 

580 ChannelTS( 

581 channel_type=ch_metadata.type, 

582 data=seq.data, 

583 channel_metadata=ch_metadata, 

584 run_metadata=self.run_metadata, 

585 station_metadata=self.station_metadata, 

586 channel_response=self.get_channel_response( 

587 rxcal_fn=rxcal_fn, scal_fn=scal_fn 

588 ), 

589 ) 

590 ) 

591 return seq_list