Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mth5 \ mth5 \ io \ phoenix \ readers \ base.py: 91%

217 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-10 00:01 -0800

1""" 

2Module to read and parse native Phoenix Geophysics data formats of the 

3MTU-5C Family. 

4 

5This module implements Streamed readers for segmented-decimated continuus-decimated 

6and native sampling rate time series formats of the MTU-5C family. 

7 

8:author: Jorge Torres-Solis 

9 

10Revised 2022 by J. Peacock 

11""" 

12 

13# ============================================================================= 

14# Imports 

15# ============================================================================= 

16from __future__ import annotations 

17 

18from pathlib import Path 

19from typing import Any 

20 

21from loguru import logger 

22from mt_metadata.timeseries.filters import ChannelResponse, CoefficientFilter 

23 

24from .calibrations import PhoenixCalibration 

25from .config import PhoenixConfig 

26from .header import Header 

27from .receiver_metadata import PhoenixReceiverMetadata 

28 

29 

30# ============================================================================= 

31 

32 

33class TSReaderBase(Header): 

34 """ 

35 Generic reader that all other readers will inherit. 

36 

37 This base class provides common functionality for reading Phoenix Geophysics 

38 time series data files, including header parsing, file sequence management, 

39 and metadata handling. 

40 

41 Parameters 

42 ---------- 

43 path : str or Path 

44 Path to the time series file 

45 num_files : int, optional 

46 Number of files in the sequence, by default 1 

47 header_length : int, optional 

48 Length of file header in bytes, by default 128 

49 report_hw_sat : bool, optional 

50 Whether to report hardware saturation, by default False 

51 **kwargs 

52 Additional keyword arguments passed to parent Header class 

53 

54 Attributes 

55 ---------- 

56 stream : BinaryIO or None 

57 File stream for reading binary data 

58 base_path : Path 

59 Path to the current file 

60 last_seq : int 

61 Last sequence number in the file sequence 

62 rx_metadata : PhoenixReceiverMetadata or None 

63 Receiver metadata object 

64 """ 

65 

66 def __init__( 

67 self, 

68 path: str | Path, 

69 num_files: int = 1, 

70 header_length: int = 128, 

71 report_hw_sat: bool = False, 

72 **kwargs, 

73 ) -> None: 

74 self._seq = None 

75 super().__init__( 

76 header_length=header_length, report_hw_sat=report_hw_sat, **kwargs 

77 ) 

78 

79 self.logger = logger 

80 self.base_path = path 

81 self.last_seq = self.seq + num_files 

82 self.stream = None 

83 # Open the file passed as the first file in the sequence to stream 

84 self._open_file(self.base_path) 

85 if self._recording_id is None: 

86 self.recording_id = self.base_path.stem.split("_")[1] 

87 if self._channel_id is None: 

88 self.channel_id = self.base_path.stem.split("_")[2] 

89 

90 self.rx_metadata = None 

91 self.get_receiver_metadata_object() 

92 

93 if self.recmeta_file_path is not None: 

94 self.update_channel_map_from_recmeta() 

95 

96 self._channel_metadata = None 

97 

98 @property 

99 def base_path(self) -> Path: 

100 """ 

101 Full path of the file. 

102 

103 Returns 

104 ------- 

105 Path 

106 Full path to the file 

107 """ 

108 return self._base_path 

109 

110 @base_path.setter 

111 def base_path(self, value: str | Path) -> None: 

112 """ 

113 Set the full path to the file. 

114 

115 Parameters 

116 ---------- 

117 value : str or Path 

118 Full path to file 

119 

120 Raises 

121 ------ 

122 TypeError 

123 If value cannot be converted to a Path object 

124 """ 

125 try: 

126 self._base_path = Path(value) 

127 except TypeError: 

128 raise TypeError(f"Cannot set path from {value}, bad type {type(value)}") 

129 

130 @property 

131 def base_dir(self) -> Path: 

132 """ 

133 Parent directory of the file. 

134 

135 Returns 

136 ------- 

137 Path 

138 Parent directory of the file 

139 """ 

140 return self.base_path.parent 

141 

142 @property 

143 def file_name(self) -> str: 

144 """ 

145 Name of the file. 

146 

147 Returns 

148 ------- 

149 str 

150 Name of the file 

151 """ 

152 return self.base_path.name 

153 

154 @property 

155 def file_extension(self) -> str: 

156 """ 

157 File extension. 

158 

159 Returns 

160 ------- 

161 str 

162 File extension including the dot 

163 """ 

164 return self.base_path.suffix 

165 

166 @property 

167 def instrument_id(self) -> str: 

168 """ 

169 Instrument ID extracted from filename. 

170 

171 Returns 

172 ------- 

173 str 

174 Instrument identifier 

175 """ 

176 return self.base_path.stem.split("_")[0] 

177 

178 @property 

179 def seq(self) -> int: 

180 """ 

181 Sequence number of the file. 

182 

183 Returns 

184 ------- 

185 int 

186 Sequence number extracted from filename or set value 

187 """ 

188 if self._seq is None: 

189 return int(self.base_path.stem.split("_")[3], 16) 

190 return self._seq 

191 

192 @seq.setter 

193 def seq(self, value: int) -> None: 

194 """ 

195 Set the sequence number. 

196 

197 Parameters 

198 ---------- 

199 value : int 

200 Sequence number 

201 """ 

202 self._seq = int(value) 

203 

204 @property 

205 def file_size(self) -> int: 

206 """ 

207 File size in bytes. 

208 

209 Returns 

210 ------- 

211 int 

212 Size of the file in bytes 

213 """ 

214 return self.base_path.stat().st_size 

215 

216 @property 

217 def max_samples(self) -> int: 

218 """ 

219 Maximum number of samples in a file. 

220 

221 Calculated as: (total number of bytes - header length) / frame size * n samples per frame 

222 

223 Returns 

224 ------- 

225 int 

226 Maximum number of samples in the file 

227 """ 

228 return int((self.file_size - self.header_length) / 4) 

229 

230 @property 

231 def sequence_list(self) -> list[Path]: 

232 """ 

233 Get all the files in the sequence sorted by sequence number. 

234 

235 Returns 

236 ------- 

237 list[Path] 

238 List of Path objects for all files in the sequence 

239 """ 

240 return sorted(list(self.base_dir.glob(f"*{self.file_extension}"))) 

241 

242 @property 

243 def config_file_path(self) -> Path | None: 

244 """ 

245 Path to the config.json file. 

246 

247 Returns 

248 ------- 

249 Path or None 

250 Path to config file if it exists, None otherwise 

251 """ 

252 if self.base_path is not None: 

253 config_fn = self.base_path.parent.parent.joinpath("config.json") 

254 if config_fn.exists(): 

255 return config_fn 

256 else: 

257 self.logger.warning("Could not find config file") 

258 return None 

259 

260 @property 

261 def recmeta_file_path(self) -> Path | None: 

262 """ 

263 Path to the recmeta.json file. 

264 

265 Returns 

266 ------- 

267 Path or None 

268 Path to recmeta file if it exists, None otherwise 

269 """ 

270 if self.base_path is not None: 

271 recmeta_fn = self.base_path.parent.parent.joinpath("recmeta.json") 

272 if recmeta_fn.exists(): 

273 return recmeta_fn 

274 else: 

275 self.logger.warning("Could not find recmeta file") 

276 return None 

277 

278 def _open_file(self, filename: str | Path) -> bool: 

279 """ 

280 Open a given file in 'rb' mode. 

281 

282 Parameters 

283 ---------- 

284 filename : str or Path 

285 Full path to file 

286 

287 Returns 

288 ------- 

289 bool 

290 True if the file is now open, False if it is not 

291 """ 

292 filename = Path(filename) 

293 

294 if filename.exists(): 

295 self.logger.debug(f"Opening {filename}") 

296 self.stream = open(filename, "rb") 

297 self.unpack_header(self.stream) 

298 return True 

299 return False 

300 

301 def open_next(self) -> bool: 

302 """ 

303 Open the next file in the sequence. 

304 

305 Returns 

306 ------- 

307 bool 

308 True if next file is now open, False if it is not 

309 """ 

310 if self.stream is not None: 

311 self.stream.close() 

312 self.seq += 1 

313 self.open_file_seq(self.seq) 

314 if self.seq < self.last_seq: 

315 new_path = self.sequence_list[self.seq - 1] 

316 return self._open_file(new_path) 

317 return False 

318 

319 def open_file_seq(self, file_seq_num: int | None = None) -> bool: 

320 """ 

321 Open a file in the sequence given the sequence number. 

322 

323 Parameters 

324 ---------- 

325 file_seq_num : int, optional 

326 Sequence number to open, by default None 

327 

328 Returns 

329 ------- 

330 bool 

331 True if file is now open, False if it is not 

332 """ 

333 if self.stream is not None: 

334 self.stream.close() 

335 if file_seq_num is not None: 

336 self.seq = file_seq_num 

337 new_path = self.sequence_list[self.seq - 1] 

338 return self._open_file(new_path) 

339 

340 def close(self) -> None: 

341 """ 

342 Close the file stream. 

343 """ 

344 if self.stream is not None: 

345 self.stream.close() 

346 

347 def get_config_object(self) -> PhoenixConfig | None: 

348 """ 

349 Read a config file into an object. 

350 

351 Returns 

352 ------- 

353 PhoenixConfig or None 

354 Configuration object if config file exists, None otherwise 

355 """ 

356 if self.config_file_path is not None: 

357 return PhoenixConfig(self.config_file_path) 

358 return None 

359 

360 def get_receiver_metadata_object(self) -> None: 

361 """ 

362 Read recmeta.json into an object and store in rx_metadata attribute. 

363 """ 

364 if self.recmeta_file_path is not None and self.rx_metadata is None: 

365 self.rx_metadata = PhoenixReceiverMetadata(self.recmeta_file_path) 

366 

367 def get_lowpass_filter_name(self) -> str | None: 

368 """ 

369 Get the lowpass filter used by the receiver pre-decimation. 

370 

371 Returns 

372 ------- 

373 str or None 

374 Name of the lowpass filter if available, None otherwise 

375 """ 

376 if self.recmeta_file_path is not None and self.rx_metadata is not None: 

377 return self.rx_metadata.obj.chconfig.chans[0].lp 

378 return None 

379 

380 def update_channel_map_from_recmeta(self) -> None: 

381 """ 

382 Update channel map from recmeta.json file. 

383 """ 

384 if self.recmeta_file_path is not None and self.rx_metadata is not None: 

385 self.channel_map = self.rx_metadata.channel_map 

386 

387 def _update_channel_metadata_from_recmeta(self) -> Any: 

388 """ 

389 Get channel metadata from recmeta.json. 

390 

391 Returns 

392 ------- 

393 Any 

394 Channel metadata object updated with recmeta information 

395 """ 

396 ch_metadata = self.get_channel_metadata() 

397 if self.recmeta_file_path is not None and self.rx_metadata is not None: 

398 rx_ch_metadata = self.rx_metadata.get_ch_metadata(self._channel_id) 

399 ch_metadata.update(rx_ch_metadata) 

400 ch_metadata.sample_rate = self.sample_rate 

401 ch_metadata.time_period.start = self.recording_start_time 

402 return ch_metadata 

403 

404 def _update_run_metadata_from_recmeta(self) -> Any: 

405 """ 

406 Update run metadata from recmeta.json. 

407 

408 Returns 

409 ------- 

410 Any 

411 Run metadata object updated with recmeta information 

412 """ 

413 run_metadata = self.get_run_metadata() 

414 if self.recmeta_file_path is not None and self.rx_metadata is not None: 

415 rx_run_metadata = self.rx_metadata.run_metadata 

416 run_metadata.update(rx_run_metadata) 

417 run_metadata.add_channel(self.channel_metadata) 

418 run_metadata.update_time_period() 

419 return run_metadata 

420 

421 def _update_station_metadata_from_recmeta(self) -> Any: 

422 """ 

423 Update station metadata from recmeta.json. 

424 

425 Returns 

426 ------- 

427 Any 

428 Station metadata object updated with recmeta information 

429 """ 

430 station_metadata = self.get_station_metadata() 

431 if self.recmeta_file_path is not None and self.rx_metadata is not None: 

432 rx_station_metadata = self.rx_metadata.station_metadata 

433 station_metadata.update(rx_station_metadata) 

434 station_metadata.add_run(self.run_metadata) 

435 station_metadata.update_time_period() 

436 return station_metadata 

437 

438 @property 

439 def channel_metadata(self) -> Any: 

440 """ 

441 Channel metadata updated from recmeta. 

442 

443 Returns 

444 ------- 

445 Any 

446 Channel metadata object 

447 """ 

448 if self._channel_metadata is None: 

449 return self._update_channel_metadata_from_recmeta() 

450 return self._channel_metadata 

451 

452 @property 

453 def run_metadata(self) -> Any: 

454 """ 

455 Run metadata updated from recmeta. 

456 

457 Returns 

458 ------- 

459 Any 

460 Run metadata object 

461 """ 

462 return self._update_run_metadata_from_recmeta() 

463 

464 @property 

465 def station_metadata(self) -> Any: 

466 """ 

467 Station metadata updated from recmeta. 

468 

469 Returns 

470 ------- 

471 Any 

472 Station metadata object 

473 """ 

474 return self._update_station_metadata_from_recmeta() 

475 

476 def get_receiver_lowpass_filter(self, rxcal_fn: str | Path) -> Any: 

477 """ 

478 Get receiver lowpass filter from the rxcal.json file. 

479 

480 Parameters 

481 ---------- 

482 rxcal_fn : str or Path 

483 Path to the receiver calibration file 

484 

485 Returns 

486 ------- 

487 Any 

488 Filter object from calibration file 

489 

490 Raises 

491 ------ 

492 ValueError 

493 If the lowpass filter name cannot be found 

494 """ 

495 rx_cal_obj = PhoenixCalibration(rxcal_fn) 

496 if rx_cal_obj._has_read(): 

497 lp_name = self.get_lowpass_filter_name() 

498 if lp_name is None: 

499 msg = ( 

500 f"Could not find {lp_name} for channel " 

501 f"{self.channel_metadata.comp}" 

502 ) 

503 self.logger.error(msg) 

504 raise ValueError(msg) 

505 

506 return rx_cal_obj.get_filter(self.channel_metadata.component, lp_name) 

507 else: 

508 self.logger.error("Phoenix RX Calibration is None. Check file path") 

509 return None 

510 

511 def get_dipole_filter(self) -> CoefficientFilter | None: 

512 """ 

513 Get dipole filter for electric field channels. 

514 

515 Returns 

516 ------- 

517 CoefficientFilter or None 

518 Dipole filter if channel has dipole length, None otherwise 

519 """ 

520 ch_metadata = self.channel_metadata.copy() 

521 

522 if hasattr(ch_metadata, "dipole_length"): 

523 dp_filter = CoefficientFilter() 

524 dp_filter.gain = ch_metadata.dipole_length / 1000 

525 dp_filter.units_in = "milliVolt" 

526 dp_filter.units_out = "milliVolt per kilometer" 

527 

528 # Support both newer mt_metadata API (filter_names) and older (filter.name) 

529 if hasattr(ch_metadata, "filter_names"): 

530 filter_names = ch_metadata.filter_names or [] 

531 elif hasattr(ch_metadata, "filter") and getattr(ch_metadata.filter, "name", None): 

532 filter_names = [ch_metadata.filter.name] 

533 else: 

534 filter_names = [] 

535 

536 for f_name in filter_names: 

537 if "dipole" in f_name: 

538 dp_filter.name = f_name 

539 

540 return dp_filter 

541 return None 

542 

543 def get_sensor_filter(self, scal_fn: str | Path) -> Any: 

544 """ 

545 Get sensor filter from calibration file. 

546 

547 Parameters 

548 ---------- 

549 scal_fn : str or Path 

550 Path to sensor calibration file 

551 

552 Returns 

553 ------- 

554 Any 

555 Sensor filter object 

556 

557 Notes 

558 ----- 

559 This method is not implemented yet. 

560 """ 

561 return None 

562 

563 def get_v_to_mv_filter(self) -> CoefficientFilter: 

564 """ 

565 Create a filter to convert units from volts to millivolts. 

566 

567 Returns 

568 ------- 

569 CoefficientFilter 

570 Filter that converts volts to millivolts with gain of 1000 

571 """ 

572 conversion = CoefficientFilter() 

573 conversion.units_out = "mV" 

574 conversion.units_in = "V" 

575 conversion.name = "v_to_mv" 

576 conversion.gain = 1e3 

577 

578 return conversion 

579 

580 def get_channel_response( 

581 self, rxcal_fn: str | Path | None = None, scal_fn: str | Path | None = None 

582 ) -> ChannelResponse: 

583 """ 

584 Get the channel response filter. 

585 

586 Parameters 

587 ---------- 

588 rxcal_fn : str, Path or None, optional 

589 Path to receiver calibration file, by default None 

590 scal_fn : str, Path or None, optional 

591 Path to sensor calibration file, by default None 

592 

593 Returns 

594 ------- 

595 ChannelResponse 

596 Complete channel response filter chain 

597 """ 

598 ch_metadata = self.channel_metadata.copy() 

599 

600 filter_list = [] 

601 

602 # Check if a lowpass filter already exists in metadata 

603 has_lowpass = any("lowpass" in f.name for f in ch_metadata.filters) 

604 

605 if rxcal_fn is not None: 

606 rx_filter = self.get_receiver_lowpass_filter(rxcal_fn) 

607 if rx_filter is not None: 

608 if has_lowpass: 

609 # Update the filter name to match existing metadata filter name 

610 existing_lowpass = next( 

611 f for f in ch_metadata.filters if "lowpass" in f.name 

612 ) 

613 rx_filter.name = existing_lowpass.name 

614 self.logger.debug( 

615 f"Using existing lowpass filter name: {existing_lowpass.name}" 

616 ) 

617 filter_list.append(rx_filter) 

618 

619 filter_list.append(self.get_v_to_mv_filter()) 

620 

621 if ch_metadata.type in ["magnetic"] and scal_fn is not None: 

622 sensor_filter = self.get_sensor_filter(scal_fn) 

623 if sensor_filter is not None: 

624 filter_list.append(sensor_filter) 

625 else: 

626 self.logger.warning( 

627 "Could not find Phoenix coil sensor calibration filter " 

628 f"for channel {ch_metadata.comp}" 

629 ) 

630 

631 if ch_metadata.type in ["electric"]: 

632 dipole_filter = self.get_dipole_filter() 

633 if dipole_filter is not None: 

634 filter_list.append(dipole_filter) 

635 

636 return ChannelResponse(filters_list=filter_list)