Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mth5 \ mth5 \ io \ phoenix \ readers \ header.py: 82%

544 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-10 00:01 -0800

1# -*- coding: utf-8 -*- 

2""" 

3Adopted from TimeSeries reader, making all attributes properties for easier 

4reading and testing. 

5 

6Module to read and parse native Phoenix Geophysics data formats of the MTU-5C Family 

7 

8This module implements Streamed readers for segmented-decimated continuous-decimated 

9and native sampling rate time series formats of the MTU-5C family. 

10 

11:author: Jorge Torres-Solis 

12 

13Revised 2022 by J. Peacock 

14""" 

15 

16# ============================================================================= 

17# Imports 

18# ============================================================================= 

19from __future__ import annotations 

20 

21import string 

22from struct import unpack_from 

23from typing import Any, BinaryIO, TYPE_CHECKING 

24 

25from loguru import logger 

26from mt_metadata.common.mttime import MTime 

27from mt_metadata.timeseries import Electric, Magnetic, Run, Station 

28 

29 

30if TYPE_CHECKING: 

31 pass 

32 

33 from loguru import Logger 

34 

35 

36# ============================================================================= 

37class Header: 

38 """ 

39 Phoenix Geophysics MTU-5C binary header reader and parser. 

40 

41 This class reads and parses the 128-byte binary header from Phoenix 

42 Geophysics MTU-5C data files. The header contains instrument configuration, 

43 GPS location, timing information, and recording parameters essential for 

44 proper data interpretation. 

45 

46 The header format is fixed at 128 bytes and contains information about: 

47 - Instrument type and serial number 

48 - Recording parameters (sample rate, channel configuration) 

49 - GPS location and timing information 

50 - Hardware configuration and gain settings 

51 - Data quality metrics (saturated/missing frames) 

52 

53 Parameters 

54 ---------- 

55 **kwargs : Any 

56 Additional keyword arguments to set as instance attributes. 

57 

58 Attributes 

59 ---------- 

60 logger : loguru.Logger 

61 Logger instance for debugging and error reporting. 

62 report_hw_sat : bool, default False 

63 Flag to control hardware saturation reporting. 

64 header_length : int, default 128 

65 Length of the binary header in bytes. 

66 ad_plus_minus_range : float, default 5.0 

67 Differential voltage range of the A/D converter (board dependent). 

68 channel_map : dict[int, str] 

69 Mapping from channel IDs to channel names. 

70 channel_azimuths : dict[str, int] 

71 Mapping from channel names to azimuth angles in degrees. 

72 

73 Examples 

74 -------- 

75 >>> with open("phoenix_data.bin", "rb") as f: 

76 ... header = Header() 

77 ... header.unpack_header(f) 

78 ... print(f"Sample rate: {header.sample_rate}") 

79 ... print(f"GPS location: {header.gps_lat}, {header.gps_long}") 

80 """ 

81 

def __init__(self, **kwargs: Any) -> None:
    """
    Initialise defaults, channel lookup tables and the header field layout.

    Parameters
    ----------
    **kwargs : Any
        Extra attributes to set on the instance.  They are applied after
        the defaults and lookup tables, but before the field layout table
        (``_unpack_dict``) is assigned.
    """
    self.logger: Logger = logger
    self.report_hw_sat: bool = False
    self.header_length: int = 128
    # differential voltage range that the A/D can measure (board model dependent)
    self.ad_plus_minus_range: float = 5.0
    self._header: bytes | None = None
    self._recording_id: int | None = None
    self._channel_id: int | None = None

    # channel id (position in the tuple) -> channel name
    channel_names = ("h1", "h2", "h3", "e1", "e2", "h1", "h2", "h3")
    self.channel_map: dict[int, str] = dict(enumerate(channel_names))

    # channel name -> azimuth in degrees
    self.channel_azimuths: dict[str, int] = dict(
        h1=0, h2=90, h3=0, h4=0, h5=90, h6=0, e1=0, e2=90
    )

    for attr_name, attr_value in kwargs.items():
        setattr(self, attr_name, attr_value)

    # (field name, struct format, byte offset into the header)
    field_layout = (
        ("file_type", "B", 0),
        ("file_version", "B", 1),
        ("header_length", "H", 2),
        ("instrument_type", "8s", 4),
        ("instrument_serial_number", "cccccccc", 12),
        ("recording_id", "I", 20),
        ("channel_id", "B", 24),
        ("file_sequence", "I", 25),
        ("frag_period", "H", 29),
        ("ch_board_model", "8s", 31),
        ("ch_board_serial", "8s", 39),
        ("ch_firmware", "I", 47),
        ("hardware_configuration", "BBBBBBBB", 51),
        ("sample_rate_base", "H", 59),
        ("sample_rate_exp", "b", 61),
        ("bytes_per_sample", "B", 62),
        ("frame_size", "I", 63),
        ("decimation_node_id", "H", 67),
        ("frame_rollover_count", "H", 69),
        ("gps_long", "f", 71),
        ("gps_lat", "f", 75),
        ("gps_elevation", "f", 79),
        ("gps_horizontal_accuracy", "I", 83),
        ("gps_vertical_accuracy", "I", 87),
        ("timing_status", "BBH", 91),
        ("future1", "b", 95),
        ("future2", "i", 97),
        ("saturated_frames", "H", 101),
        ("missing_frames", "H", 103),
        ("battery_voltage_mv", "H", 105),
        ("min_signal", "f", 107),
        ("max_signal", "f", 111),
    )
    self._unpack_dict: dict[str, dict[str, Any]] = {
        field: {"dtype": fmt, "index": offset}
        for field, fmt, offset in field_layout
    }

150 

151 def __str__(self) -> str: 

152 """String representation of the Header with key information.""" 

153 lines = [f"channel_id: {self.channel_id} channel_type: {self.channel_type}"] 

154 lines += ["-" * 40] 

155 for key in [ 

156 "instrument_type", 

157 "instrument_serial_number", 

158 "gps_lat", 

159 "gps_long", 

160 "gps_elevation", 

161 "recording_start_time", 

162 "sample_rate", 

163 "saturated_frames", 

164 "missing_frames", 

165 "max_signal", 

166 "min_signal", 

167 ]: 

168 lines.append(f"\t{key:<25}: {getattr(self, key)}") 

169 return "\n".join(lines) 

170 

171 def __repr__(self) -> str: 

172 """Detailed string representation of the Header.""" 

173 return self.__str__() 

174 

175 def _has_header(self) -> bool: 

176 """ 

177 Check if header data has been loaded. 

178 

179 Returns 

180 ------- 

181 bool 

182 True if header data is loaded, False otherwise. 

183 """ 

184 return self._header is not None 

185 

186 def _unpack_value(self, key: str) -> tuple[Any, ...] | None: 

187 """ 

188 Unpack a value from the binary header using the unpack dictionary. 

189 

190 Parameters 

191 ---------- 

192 key : str 

193 The key in the unpack dictionary corresponding to the field to extract. 

194 

195 Returns 

196 ------- 

197 tuple of Any or None 

198 Unpacked values from the binary header, or None if no header loaded. 

199 """ 

200 if self._has_header() and self._header is not None: 

201 return unpack_from( 

202 self._unpack_dict[key]["dtype"], 

203 self._header, 

204 self._unpack_dict[key]["index"], 

205 ) 

206 return None 

207 

@property
def file_type(self) -> int | None:
    """
    File type indicator read from the binary header.

    Returns
    -------
    int or None
        First unpacked value of the ``file_type`` field, or None when no
        header is loaded.
    """
    if not self._has_header():
        return None
    fields = self._unpack_value("file_type")
    return None if fields is None else fields[0]

223 

@property
def file_version(self) -> int | None:
    """
    File version read from the binary header.

    Returns
    -------
    int or None
        First unpacked value of the ``file_version`` field, or None when
        no header is loaded.
    """
    if not self._has_header():
        return None
    fields = self._unpack_value("file_version")
    return None if fields is None else fields[0]

239 

@property
def header_length(self) -> int:
    """
    Length of the header in bytes.

    When a header is loaded, the value stored in it replaces the cached
    length before the cached length is returned.

    Returns
    -------
    int
        Header length in bytes.
    """
    fields = self._unpack_value("header_length") if self._has_header() else None
    if fields is not None:
        # refresh the cached value from the loaded header
        self._header_length = fields[0]
    return self._header_length

@header_length.setter
def header_length(self, value: int) -> None:
    """Store *value* as the cached header length."""
    self._header_length = value

260 

@property
def instrument_type(self) -> str | None:
    """
    Instrument type string read from the binary header.

    Returns
    -------
    str or None
        UTF-8 decoded field with surrounding spaces and then surrounding
        NUL characters stripped, or None when no header is loaded.
    """
    if not self._has_header():
        return None
    fields = self._unpack_value("instrument_type")
    if fields is None:
        return None
    decoded = fields[0].decode("utf-8")
    without_spaces = decoded.strip(" ")
    return without_spaces.strip("\x00")

276 

@property
def instrument_serial_number(self) -> str | None:
    """
    Instrument serial number read from the binary header.

    Returns
    -------
    str or None
        The single-byte values joined, stripped of surrounding NUL bytes
        and decoded as UTF-8, or None when no header is loaded.
    """
    if not self._has_header():
        return None
    byte_chars = self._unpack_value("instrument_serial_number")
    if byte_chars is None:
        return None
    joined = b"".join(byte_chars)
    return joined.strip(b"\x00").decode("utf-8")

292 

@property
def recording_id(self) -> int | None:
    """
    Recording identifier: the cached value if one was set, else the header value.

    Returns
    -------
    int or None
        Recording ID as an integer, or None when neither a cached value
        nor a header is available.
    """
    cached = self._recording_id
    if cached is not None:
        return cached
    if not self._has_header():
        return None
    fields = self._unpack_value("recording_id")
    return fields[0] if fields is not None else None

@recording_id.setter
def recording_id(self, value: str | int) -> None:
    """
    Cache a recording ID.

    Parameters
    ----------
    value : str or int
        A string is parsed as base-16; any other value is stored as given.
    """
    self._recording_id = int(value, 16) if isinstance(value, str) else value

325 

@property
def recording_start_time(self) -> MTime | None:
    """
    Recording start time built from the recording ID timestamp.

    The actual data recording starts 1 second after the set start time,
    because the data logger starts up and initialises its filter.  That
    offset is handled in the segment start time, not here.

    See https://github.com/kujaku11/PhoenixGeoPy/tree/main/Docs for more
    information.

    The time recorded is GPS time.

    Returns
    -------
    MTime or None
        ``MTime`` built with ``gps_time=True`` from the recording ID, or
        None when the recording ID is not available.
    """
    stamp = self.recording_id
    if stamp is None:
        return None
    return MTime(time_stamp=stamp, gps_time=True)

349 

@property
def channel_id(self) -> int | None:
    """
    Channel identifier: the cached value if one was set, else the header value.

    Returns
    -------
    int or None
        Channel ID, or None when neither a cached value nor a header is
        available.
    """
    cached = self._channel_id
    if cached is not None:
        return cached
    if not self._has_header():
        return None
    fields = self._unpack_value("channel_id")
    return int(fields[0]) if fields is not None else None

@channel_id.setter
def channel_id(self, value: int | str) -> None:
    """
    Cache a channel ID.

    Parameters
    ----------
    value : int or str
        Channel identifier; converted with ``int``.
    """
    converted = int(value)
    self._channel_id = converted

380 

@property
def file_sequence(self) -> int | None:
    """
    File sequence number read from the binary header.

    Returns
    -------
    int or None
        First unpacked value of the ``file_sequence`` field, or None when
        no header is loaded.
    """
    if not self._has_header():
        return None
    fields = self._unpack_value("file_sequence")
    return None if fields is None else fields[0]

396 

@property
def frag_period(self) -> int | None:
    """
    Fragment period read from the binary header.

    Returns
    -------
    int or None
        First unpacked value of the ``frag_period`` field, or None when
        no header is loaded.
    """
    if not self._has_header():
        return None
    fields = self._unpack_value("frag_period")
    return None if fields is None else fields[0]

412 

@property
def ch_board_model(self) -> str | None:
    """
    Channel board model string read from the binary header.

    Returns
    -------
    str or None
        UTF-8 decoded field with surrounding spaces stripped, or None
        when no header is loaded.
    """
    if not self._has_header():
        return None
    fields = self._unpack_value("ch_board_model")
    if fields is None:
        return None
    model_text = fields[0].decode("utf-8")
    return model_text.strip(" ")

428 

@property
def board_model_main(self) -> str | None:
    """
    Main board model identifier.

    Returns
    -------
    str or None
        First five characters of ``ch_board_model``, or None when the
        model string is not available.
    """
    model = self.ch_board_model
    return model[:5] if model is not None else None

443 

@property
def board_model_revision(self) -> str | None:
    """
    Board model revision identifier.

    Returns
    -------
    str or None
        The single character at index 6 of ``ch_board_model`` (an empty
        string if the model is shorter), or None when the model string is
        not available.
    """
    model = self.ch_board_model
    if model is None:
        return None
    # slice rather than index so a short model yields "" instead of raising
    return model[6:7]

458 

@property
def ch_board_serial(self) -> int:
    """
    Channel board serial number read from the binary header.

    The field is decoded as UTF-8, stripped of surrounding NUL characters
    and parsed as a hexadecimal number.

    Returns
    -------
    int
        Board serial number, or 0 when no header is loaded, the field is
        blank, or it contains any non-hexadecimal character.
    """
    if not self._has_header():
        return 0
    fields = self._unpack_value("ch_board_serial")
    if fields is None:
        return 0
    serial_text = fields[0].decode("utf-8").strip("\x00")
    # Backend < v0.14 writes '--------' here, which is not hex.  A blank
    # (all-NUL) field must also be rejected explicitly: all() over an empty
    # string is True and int("", 16) would raise ValueError.
    if serial_text and all(char in string.hexdigits for char in serial_text):
        return int(serial_text, 16)
    return 0

477 

@property
def ch_firmware(self) -> int | None:
    """
    Channel firmware version read from the binary header.

    Returns
    -------
    int or None
        First unpacked value of the ``ch_firmware`` field, or None when
        no header is loaded.
    """
    if not self._has_header():
        return None
    fields = self._unpack_value("ch_firmware")
    return None if fields is None else fields[0]

493 

@property
def hardware_configuration(self) -> tuple[Any, ...] | None:
    """
    Hardware configuration bytes read from the binary header.

    Returns
    -------
    tuple of Any or None
        The full unpacked tuple of the ``hardware_configuration`` field,
        or None when no header is loaded.
    """
    return self._unpack_value("hardware_configuration") if self._has_header() else None

507 

@property
def channel_type(self) -> str | None:
    """
    Channel type derived from the hardware configuration.

    Returns
    -------
    str or None
        'E' (electric) when bit 0x08 of the second configuration byte is
        set, 'H' (magnetic) otherwise, or None when no header is loaded.
    """
    if not self._has_header():
        return None
    config = self.hardware_configuration
    if config is None:
        return None
    return "E" if config[1] & 0x08 else "H"

526 

@property
def detected_channel_type(self) -> str | None:
    """
    Channel type as detected by the electronics.

    This normally matches ``channel_type``; it is used in electronics
    design and testing.

    Returns
    -------
    str or None
        'E' (electric) when bit 0x20 of the second configuration byte is
        set, 'H' (magnetic) otherwise, or None when no header is loaded.
    """
    if not self._has_header():
        return None
    config = self.hardware_configuration
    if config is None:
        return None
    return "E" if config[1] & 0x20 else "H"

547 

@property
def lp_frequency(self) -> int | None:
    """
    Low-pass filter frequency derived from the hardware configuration.

    Bit 0x80 of the first configuration byte marks the filter as on; the
    two lowest bits then select the corner.  BCM03 and BCM06 boards use a
    different set of values from all other boards.

    Returns
    -------
    int or None
        Filter frequency in Hz.  None when no header is loaded, or when
        the filter is on and the selector bits are both zero.
    """
    if not self._has_header():
        return None
    config = self.hardware_configuration
    main_model = self.board_model_main
    if config is None:
        return None

    is_bcm03_or_06 = main_model in ("BCM03", "BCM06")

    # LPF off
    if not config[0] & 0x80:
        return 17800 if is_bcm03_or_06 else 10000

    # LPF on
    selector = config[0] & 0x03
    if selector == 0x03:
        return 10
    if selector == 0x02:
        return 1000 if is_bcm03_or_06 else 100
    if selector == 0x01:
        return 10000 if is_bcm03_or_06 else 1000
    return None

583 

@property
def preamp_gain(self) -> float:
    """
    Pre-amplifier gain factor.

    The gain differs from 1.0 only for electric channels with the preamp
    bit (0x10 of the first configuration byte) set.

    Returns
    -------
    float
        Gain factor, default 1.0.

    Raises
    ------
    Exception
        If a header is loaded but the channel type is None or "?".
    """
    if not self._has_header():
        return 1.0

    kind = self.channel_type
    if kind is None or kind == "?":
        raise Exception(
            "Channel type must be set before attemting to calculate preamp gain"
        )

    config = self.hardware_configuration
    if config is None:
        return 1.0

    preamp_enabled = bool(config[0] & 0x10)
    if kind != "E" or not preamp_enabled:
        return 1.0

    main_model = self.board_model_main
    revision = self.board_model_revision
    if main_model in ("BCM01", "BCM03"):
        # Revision "L" accounts for the BCM01-L experimental prototype
        return 8.0 if revision == "L" else 4.0

    # Account for experimental prototype BCM05-A
    full_model = self.ch_board_model
    if full_model is not None and full_model[0:7] == "BCM05-A":
        return 4.0
    return 8.0

628 

@property
def channel_main_gain(self) -> float:
    """
    Main gain of the board.

    Bits 0x0C of the first configuration byte select the gain.  BCM01,
    BCM03 and the experimental BCM05-A prototype use the original gain
    bank; any other board is assumed to use the newer bank introduced
    with BCM05-B and BCM06.

    Returns
    -------
    float
        Main gain factor, 1.0 when no header is loaded.
    """
    if not self._has_header():
        return 1.0

    main_model = self.board_model_main
    full_model = self.ch_board_model
    # Original style 24 KSps / 96 KSps boards, plus the BCM05-A prototype
    original_bank = main_model in ("BCM01", "BCM03") or (
        full_model is not None and full_model[0:7] == "BCM05-A"
    )

    config = self.hardware_configuration
    if config is None:
        return 1.0

    # selector bits -> (newer bank gain, original bank gain)
    gain_table = {
        0x00: (1.0, 1.0),
        0x04: (4.0, 4.0),
        0x08: (6.0, 16.0),
        0x0C: (8.0, 32.0),
    }
    newer_gain, original_gain = gain_table[config[0] & 0x0C]
    return original_gain if original_bank else newer_gain

667 

@property
def intrinsic_circuitry_gain(self) -> float:
    """
    Intrinsic circuitry gain based on the sensor range configuration.

    The electric channel, the calibration path and H-legacy sensors all go
    through a 1/4 gain stage and then get a virtual x2 gain from
    single-ended-to-differential conversion before the A/D, for a net 1/2.
    Newer (differential) sensors only have a 1/2 gain stage, for a net 1.

    Returns
    -------
    float
        1.0 for a magnetic channel with bit 0x01 of the second
        configuration byte set, 0.5 otherwise.

    Raises
    ------
    Exception
        If a header is loaded but the channel type is None or "?".

    Notes
    -----
    Circuitry gain is not directly configurable by the user.
    """
    if not self._has_header():
        return 0.5

    kind = self.channel_type
    if kind is None or kind == "?":
        raise Exception(
            "Channel type must be set before attemting to calculate preamp gain"
        )

    if kind != "H":
        return 0.5

    config = self.hardware_configuration
    if config is not None and config[1] & 0x01 == 0x01:
        return 1.0
    return 0.5

711 

@property
def attenuator_gain(self) -> float:
    """
    Attenuator gain factor.

    The gain differs from 1.0 only for electric channels with the
    attenuator bit (0x01 of the fifth configuration byte) set.

    Returns
    -------
    float
        Attenuator gain factor, default 1.0.

    Raises
    ------
    Exception
        If a header is loaded but the channel type is None or "?".
    """
    if not self._has_header():
        return 1.0

    kind = self.channel_type
    if kind is None or kind == "?":
        raise Exception(
            "Channel type must be set before attemting to calculate preamp gain"
        )

    config = self.hardware_configuration
    if config is None:
        return 1.0

    attenuator_enabled = bool(config[4] & 0x01)
    if not (attenuator_enabled and kind == "E"):
        return 1.0

    main_model = self.board_model_main
    full_model = self.ch_board_model
    # Original style 24 KSps / 96 KSps boards, plus the BCM05-A prototype;
    # every other board is assumed to carry the newer attenuator.
    original_attenuator = main_model in ("BCM01", "BCM03") or (
        full_model is not None and full_model[0:7] == "BCM05-A"
    )
    return 0.1 if original_attenuator else 523.0 / 5223.0

754 

@property
def total_selectable_gain(self) -> float:
    """
    Total gain that is selectable by the user.

    Returns
    -------
    float
        Product of the main channel gain, preamp gain and attenuator
        gain, or 1.0 when no header is loaded.
    """
    if not self._has_header():
        return 1.0
    main_and_preamp = self.channel_main_gain * self.preamp_gain
    return main_and_preamp * self.attenuator_gain

770 

@property
def total_circuitry_gain(self) -> float:
    """
    Total board gain: user-selectable gain times intrinsic circuitry gain.

    Returns
    -------
    float
        Total circuitry gain factor, or 0.5 when no header is loaded.
    """
    if not self._has_header():
        return 0.5
    selectable = self.total_selectable_gain
    return selectable * self.intrinsic_circuitry_gain

784 

@property
def sample_rate_base(self) -> int | None:
    """
    Base (mantissa) of the sample rate read from the binary header.

    Returns
    -------
    int or None
        First unpacked value of the ``sample_rate_base`` field, or None
        when no header is loaded.
    """
    if not self._has_header():
        return None
    fields = self._unpack_value("sample_rate_base")
    return None if fields is None else fields[0]

800 

@property
def sample_rate_exp(self) -> int | None:
    """
    Base-10 exponent of the sample rate read from the binary header.

    Returns
    -------
    int or None
        First unpacked value of the ``sample_rate_exp`` field, or None
        when no header is loaded.
    """
    if not self._has_header():
        return None
    fields = self._unpack_value("sample_rate_exp")
    return None if fields is None else fields[0]

816 

@property
def sample_rate(self) -> float | None:
    """
    Sample rate computed as ``sample_rate_base * 10 ** sample_rate_exp``.

    Returns
    -------
    float or None
        Sample rate in Hz, or None when no header is loaded or either
        component is unavailable.  With a zero exponent the base is
        returned converted to float; otherwise the product is returned
        as computed.
    """
    if not self._has_header():
        return None
    base = self.sample_rate_base
    exponent = self.sample_rate_exp
    if base is None or exponent is None:
        return None
    if exponent == 0:
        return float(base)
    return base * pow(10, exponent)

835 

@property
def bytes_per_sample(self) -> int | None:
    """
    Number of bytes per sample read from the binary header.

    Returns
    -------
    int or None
        First unpacked value of the ``bytes_per_sample`` field, or None
        when no header is loaded.
    """
    if not self._has_header():
        return None
    fields = self._unpack_value("bytes_per_sample")
    return None if fields is None else fields[0]

851 

@property
def frame_size(self) -> int | None:
    """
    Raw frame size word read from the binary header.

    Returns
    -------
    int or None
        First unpacked value of the ``frame_size`` field, or None when no
        header is loaded.
    """
    if not self._has_header():
        return None
    fields = self._unpack_value("frame_size")
    return None if fields is None else fields[0]

867 

@property
def data_footer(self) -> int | None:
    """
    Data footer extracted from the frame size word.

    Returns
    -------
    int or None
        ``frame_size`` shifted right by 24 bits, or None when the frame
        size is not available.
    """
    raw_word = self.frame_size
    if raw_word is None:
        return None
    return raw_word >> 24

882 

@property
def frame_size_bytes(self) -> int | None:
    """
    Frame size in bytes.

    Returns
    -------
    int or None
        The low 24 bits of ``frame_size``, or None when the frame size is
        not available.
    """
    raw_word = self.frame_size
    if raw_word is None:
        return None
    return raw_word & 0x0FFFFFF

897 

@property
def decimation_node_id(self) -> int | None:
    """
    Decimation node identifier read from the binary header.

    Returns
    -------
    int or None
        First unpacked value of the ``decimation_node_id`` field, or None
        when no header is loaded.
    """
    if not self._has_header():
        return None
    fields = self._unpack_value("decimation_node_id")
    return None if fields is None else fields[0]

913 

@property
def frame_rollover_count(self) -> int | None:
    """
    Frame rollover count read from the binary header.

    Returns
    -------
    int or None
        First unpacked value of the ``frame_rollover_count`` field, or
        None when no header is loaded.
    """
    if not self._has_header():
        return None
    fields = self._unpack_value("frame_rollover_count")
    return None if fields is None else fields[0]

929 

@property
def gps_long(self) -> float | None:
    """
    GPS longitude read from the binary header.

    Returns
    -------
    float or None
        Longitude in degrees, or None when no header is loaded.
    """
    if not self._has_header():
        return None
    fields = self._unpack_value("gps_long")
    return None if fields is None else fields[0]

945 

@property
def gps_lat(self) -> float | None:
    """
    GPS latitude read from the binary header.

    Returns
    -------
    float or None
        Latitude in degrees, or None when no header is loaded.
    """
    if not self._has_header():
        return None
    fields = self._unpack_value("gps_lat")
    return None if fields is None else fields[0]

961 

@property
def gps_elevation(self) -> float | None:
    """
    GPS elevation read from the binary header.

    Returns
    -------
    float or None
        Elevation in meters, or None when no header is loaded.
    """
    if not self._has_header():
        return None
    fields = self._unpack_value("gps_elevation")
    return None if fields is None else fields[0]

977 

@property
def gps_horizontal_accuracy(self) -> float | None:
    """
    GPS horizontal accuracy.

    Returns
    -------
    float or None
        Stored value divided by 1000 (millimeters to meters), or None
        when no header is loaded.
    """
    if not self._has_header():
        return None
    fields = self._unpack_value("gps_horizontal_accuracy")
    if fields is None:
        return None
    millimeters = fields[0]
    return millimeters / 1000

993 

@property
def gps_vertical_accuracy(self) -> float | None:
    """
    GPS vertical accuracy.

    Returns
    -------
    float or None
        Stored value divided by 1000 (millimeters to meters), or None
        when no header is loaded.
    """
    if not self._has_header():
        return None
    fields = self._unpack_value("gps_vertical_accuracy")
    if fields is None:
        return None
    millimeters = fields[0]
    return millimeters / 1000

1009 

@property
def timing_status(self) -> tuple[Any, ...] | None:
    """
    Timing status information read from the binary header.

    Returns
    -------
    tuple of Any or None
        The full unpacked tuple of the ``timing_status`` field, or None
        when no header is loaded.
    """
    return self._unpack_value("timing_status") if self._has_header() else None

1023 

@property
def timing_flags(self) -> Any | None:
    """
    Timing flags taken from the timing status.

    Returns
    -------
    Any or None
        Element 0 of ``timing_status``, or None when the timing status is
        not available.
    """
    status = self.timing_status
    return status[0] if status is not None else None

1038 

@property
def timing_sat_count(self) -> Any | None:
    """
    Satellite count taken from the timing status.

    Returns
    -------
    Any or None
        Element 1 of ``timing_status``, or None when the timing status is
        not available.
    """
    status = self.timing_status
    return status[1] if status is not None else None

1053 

@property
def timing_stability(self) -> Any | None:
    """
    Timing stability taken from the timing status.

    Returns
    -------
    Any or None
        Element 2 of ``timing_status``, or None when the timing status is
        not available.
    """
    status = self.timing_status
    return status[2] if status is not None else None

1068 

@property
def future1(self) -> Any | None:
    """
    Reserved field 1 read from the binary header.

    Returns
    -------
    Any or None
        First unpacked value of the ``future1`` field, or None when no
        header is loaded.
    """
    if not self._has_header():
        return None
    fields = self._unpack_value("future1")
    return None if fields is None else fields[0]

1084 

@property
def future2(self) -> Any | None:
    """
    Reserved field 2 read from the binary header.

    Returns
    -------
    Any or None
        First unpacked value of the ``future2`` field, or None when no
        header is loaded.
    """
    if not self._has_header():
        return None
    fields = self._unpack_value("future2")
    return None if fields is None else fields[0]

1100 

@property
def saturated_frames(self) -> int | None:
    """
    Number of saturated frames.

    When bit 0x80 of the stored value is set, the value is masked with
    0x7F and shifted left by four bits before being returned.

    Returns
    -------
    int or None
        Saturated frame count, or None when no header is loaded.
    """
    if not self._has_header():
        return None
    fields = self._unpack_value("saturated_frames")
    if fields is None:
        return None
    count = fields[0]
    if count & 0x80:
        count = (count & 0x7F) << 4
    return count

1120 

@property
def missing_frames(self) -> int | None:
    """
    Number of missing frames read from the binary header.

    Returns
    -------
    int or None
        First unpacked value of the ``missing_frames`` field, or None
        when no header is loaded.
    """
    if not self._has_header():
        return None
    fields = self._unpack_value("missing_frames")
    return None if fields is None else fields[0]

1136 

@property
def battery_voltage_v(self) -> float | None:
    """
    Battery voltage expressed in volts.

    Returns
    -------
    float or None
        The unpacked "battery_voltage_mv" value divided by 1000, or None
        when no header is loaded or the field could not be unpacked.
    """
    if not self._has_header():
        return None
    millivolts = self._unpack_value("battery_voltage_mv")
    if millivolts is None:
        return None
    # header stores millivolts; report volts
    return millivolts[0] / 1000

1152 

@property
def min_signal(self) -> Any | None:
    """
    Minimum signal value stored in the header.

    Returns
    -------
    Any or None
        First element of the unpacked "min_signal" value, or None when
        no header is loaded or the field could not be unpacked.
    """
    if not self._has_header():
        return None
    raw = self._unpack_value("min_signal")
    return None if raw is None else raw[0]

1168 

@property
def max_signal(self) -> Any | None:
    """
    Maximum signal value stored in the header.

    Returns
    -------
    Any or None
        First element of the unpacked "max_signal" value, or None when
        no header is loaded or the field could not be unpacked.
    """
    if not self._has_header():
        return None
    raw = self._unpack_value("max_signal")
    return None if raw is None else raw[0]

1184 

def unpack_header(self, stream: BinaryIO) -> None:
    """
    Load the raw header bytes from the start of a binary stream.

    The stream is rewound to offset 0 and ``header_length`` bytes are read
    into ``self._header``.  Nothing is read, and the stream position is left
    untouched, when ``header_length`` is not greater than zero.

    Parameters
    ----------
    stream : BinaryIO
        Binary stream to read the header from.
    """
    if not self.header_length > 0:
        return

    # the header always sits at the very beginning of the file
    stream.seek(0)
    self._header = stream.read(self.header_length)

1198 

def get_channel_metadata(self) -> Magnetic | Electric:
    """
    Build a channel metadata object from the header fields.

    A ``Magnetic`` object is created for channel type "h" and an
    ``Electric`` object for channel type "e" (compared case-insensitively).
    Only the magnetic branch copies the GPS latitude, longitude and
    elevation onto the channel location.  Component, channel number, start
    time, sample rate and measurement azimuth are filled in when the
    corresponding header values are available.

    Returns
    -------
    Magnetic or Electric
        Channel metadata object populated with header data.

    Raises
    ------
    KeyError
        If the channel ID is not found in ``channel_map``.
    ValueError
        If the channel type is None or is neither "h" nor "e".
    """
    channel_type = self.channel_type
    if channel_type is None:
        raise ValueError("Channel type not available")

    kind = channel_type.lower()
    if kind == "h":
        ch = Magnetic()  # type: ignore[call-arg]
        # read all three GPS values first, then copy the ones that exist
        gps_fields = (
            ("latitude", self.gps_lat),
            ("longitude", self.gps_long),
            ("elevation", self.gps_elevation),
        )
        for field_name, field_value in gps_fields:
            if field_value is not None:
                setattr(ch.location, field_name, field_value)
    elif kind == "e":
        ch = Electric()  # type: ignore[call-arg]
    else:
        raise ValueError(f"Unknown channel type: {channel_type}")

    channel_id = self.channel_id
    if channel_id is not None:
        try:
            ch.component = self.channel_map[channel_id]
        except KeyError:
            self.logger.error(f"Could not find {channel_id} in channel_map")
            raise
        else:
            ch.channel_number = channel_id

    start_time = self.recording_start_time
    if start_time is not None:
        ch.time_period.start = start_time

    rate = self.sample_rate
    if rate is not None:
        ch.sample_rate = rate

    # the azimuth lookup is keyed by component, so it needs one to be set
    if getattr(ch, "component", None):
        ch.measurement_azimuth = self.channel_azimuths[ch.component]

    return ch

1256 

def get_run_metadata(self) -> Run:
    """
    Build a run metadata object from the header fields.

    Data logger type, serial number, timing uncertainty, sample rate, run
    id and battery voltage are copied when the corresponding header values
    are available.  The manufacturer is always "Phoenix Geophysics".  The
    channel metadata from ``get_channel_metadata`` is appended to the run
    before its time period is updated.

    Returns
    -------
    Run
        Run metadata object populated with header data.

    Raises
    ------
    ValueError
        Propagated from ``get_channel_metadata`` when the channel type is
        missing or unknown.
    """
    run_md = Run()  # type: ignore[call-arg]

    # --- data logger identification
    logger_type = self.instrument_type
    if logger_type is not None:
        run_md.data_logger.type = logger_type

    serial_number = self.instrument_serial_number
    if serial_number is not None:
        run_md.data_logger.id = serial_number

    run_md.data_logger.manufacturer = "Phoenix Geophysics"

    # --- timing
    stability = self.timing_stability
    if stability is not None:
        run_md.data_logger.timing_system.uncertainty = stability

    # --- sampling; the run id is derived from the integer sample rate
    sample_rate = self.sample_rate
    if sample_rate is not None:
        run_md.sample_rate = sample_rate
        run_md.id = f"sr{int(sample_rate)}_0001"

    # --- power
    volts = self.battery_voltage_v
    if volts is not None:
        run_md.data_logger.power_source.voltage.start = volts

    # --- single channel described by this header
    run_md.channels.append(self.get_channel_metadata())  # type: ignore[attr-defined]
    run_md.update_time_period()

    return run_md

1301 

def get_station_metadata(self) -> Station:
    """
    Build a station metadata object from the header fields.

    GPS latitude, longitude and elevation are copied onto the station
    location when they are not None.  The run metadata from
    ``get_run_metadata`` is appended to the station before its time period
    is updated.

    Returns
    -------
    Station
        Station metadata object populated with header data.
    """
    station_md = Station()  # type: ignore[call-arg]

    # (location attribute, header property) pairs, handled one at a time
    location_sources = (
        ("latitude", "gps_lat"),
        ("longitude", "gps_long"),
        ("elevation", "gps_elevation"),
    )
    for location_attr, header_attr in location_sources:
        header_value = getattr(self, header_attr)
        if header_value is not None:
            setattr(station_md.location, location_attr, header_value)

    station_md.runs.append(self.get_run_metadata())  # type: ignore[attr-defined]
    station_md.update_time_period()

    return station_md