Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mth5 \ mth5 \ io \ nims \ nims.py: 81%

459 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-10 00:01 -0800

1# -*- coding: utf-8 -*- 

2""" 

3=============== 

4NIMS 

5=============== 

6 

7 * deals with reading in NIMS DATA.BIN files 

8 

9 This is a translation from Matlab codes written and edited by: 

10 * Anna Kelbert 

11 * Paul Bedrosian 

12 * Esteban Bowles-Martinez 

13 * Possibly others. 

14 

15 I've tested it against a version, and it matches. The data/GPS gaps I 

16 still don't understand so for now the time series is just 

17 made continuous and the number of missing seconds is clipped from the 

18 end of the time series. 

19 

20 .. note:: this only works for 8Hz data for now 

21 

22 

23:copyright: 

24 Jared Peacock (jpeacock@usgs.gov) 

25 

26:license: 

27 MIT 

28""" 

29 

30from __future__ import annotations 

31 

32import datetime 

33import struct 

34from pathlib import Path 

35from typing import Any, Optional, Union 

36 

37# ============================================================================= 

38# Imports 

39# ============================================================================= 

40import numpy as np 

41import pandas as pd 

42from mt_metadata.common.mttime import MTime 

43from mt_metadata.timeseries import Auxiliary, Electric, Magnetic, Run, Station 

44 

45from mth5 import timeseries 

46from mth5.io.nims.gps import GPS 

47from mth5.io.nims.header import NIMSHeader 

48from mth5.io.nims.response_filters import Response 

49 

50 

51# ============================================================================= 

52# Exceptions 

53# ============================================================================= 

54 

55 

56class NIMS(NIMSHeader): 

57 """ 

58 NIMS Class for reading NIMS DATA.BIN files. 

59 

60 A fast way to read the binary files are to first read in the GPS strings, 

61 the third byte in each block as a character and parse that into valid 

62 GPS stamps. 

63 

64 Then read in the entire data set as unsigned 8 bit integers and reshape 

65 the data to be n seconds x block size. Then parse that array into the 

66 status information and data. 

67 

68 Parameters 

69 ---------- 

70 fn : str or Path, optional 

71 Path to the NIMS DATA.BIN file to read, by default None 

72 

73 Attributes 

74 ---------- 

75 block_size : int 

76 Size of data blocks (default 131 for 8 Hz data) 

77 block_sequence : list of int 

78 Sequence pattern to locate [1, 131] 

79 sample_rate : int 

80 Sample rate in samples/second (default 8) 

81 e_conversion_factor : float 

82 Electric field conversion factor 

83 h_conversion_factor : float 

84 Magnetic field conversion factor 

85 t_conversion_factor : float 

86 Temperature conversion factor 

87 t_offset : int 

88 Temperature offset value 

89 info_array : ndarray or None 

90 Structured array of block information 

91 stamps : list or None 

92 List of valid GPS stamps 

93 ts_data : DataFrame or None 

94 Time series data as pandas DataFrame 

95 gaps : list or None 

96 List of timing gaps found in data 

97 duplicate_list : list or None 

98 List of duplicate blocks found 

99 

100 Notes 

101 ----- 

102 I only have a limited amount of .BIN files to test so this will likely 

103 break if there are issues such as data gaps. This has been tested against the 

104 matlab program loadNIMS by Anna Kelbert and the match for all the .bin files 

105 I have. If something looks weird check it against that program. 

106 

107 .. todo:: Deal with timing issues, right now a warning is sent to the user 

108 need to figure out a way to find where the gap is and adjust 

109 accordingly. 

110 

111 .. warning:: 

112 Currently Only 8 Hz data is supported 

113 

114 Examples 

115 -------- 

116 >>> from mth5.io.nims import nims 

117 >>> n = nims.NIMS(r"/home/mt_data/nims/mt001.bin") 

118 >>> n.read_nims() 

119 """ 

120 

121 def __init__(self, fn: Optional[Union[str, Path]] = None) -> None: 

122 super().__init__(fn) 

123 

124 # change thes if the sample rate is different 

125 self.block_size = 131 

126 self.block_sequence = [1, self.block_size] 

127 self.sample_rate = 8 ### samples/second 

128 self.e_conversion_factor = 2.44141221047903e-06 

129 self.h_conversion_factor = 0.01 

130 self.t_conversion_factor = 70 

131 self.t_offset = 18048 

132 self._int_max = 8388608 

133 self._int_factor = 16777216 

134 self._block_dict = { 

135 "soh": 0, 

136 "block_len": 1, 

137 "status": 2, 

138 "gps": 3, 

139 "sequence": 4, 

140 "box_temp": (5, 6), 

141 "head_temp": (7, 8), 

142 "logic": 81, 

143 "end": 130, 

144 } 

145 self.info_array = None 

146 self.stamps = None 

147 self.ts_data = None 

148 self.gaps = None 

149 self.duplicate_list = None 

150 

151 self._raw_string = None 

152 

153 self.indices = self._make_index_values() 

154 

155 def __str__(self) -> str: 

156 """ 

157 Return string representation of NIMS object. 

158 

159 Returns 

160 ------- 

161 str 

162 Formatted string with NIMS station information and data summary 

163 """ 

164 lines = [f"NIMS Station: {self.site_name}", "-" * 20] 

165 lines.append(f"NIMS ID: {self.box_id}") 

166 lines.append(f"magnetometer ID: {self.mag_id}") 

167 lines.append(f"operator: {self.operator}") 

168 lines.append(f"location: {self.state_province}, {self.country}") 

169 lines.append(f"latitude: {self.latitude} (degrees)") 

170 lines.append(f"longitude: {self.longitude} (degrees)") 

171 lines.append(f"elevation: {self.elevation} m") 

172 lines.append(f"gps stamp: {self.header_gps_stamp}") 

173 lines.append(f"EX: length = {self.ex_length} m; azimuth = {self.ex_azimuth}") 

174 lines.append(f"EY: length = {self.ey_length} m; azimuth = {self.ey_azimuth}") 

175 lines.append(f"comments: {self.comments}") 

176 

177 if self.has_data(): 

178 lines.append("") 

179 lines.append(f"Start: {self.start_time.isoformat()}") 

180 lines.append(f"End: {self.end_time.isoformat()}") 

181 lines.append(f"Data shape: {self.ts_data.shape}") 

182 lines.append(f"Found {len(self.stamps)} GPS stamps") 

183 return "\n".join(lines) 

184 

185 def __repr__(self) -> str: 

186 """ 

187 Return string representation for debugging. 

188 

189 Returns 

190 ------- 

191 str 

192 String representation of the NIMS object 

193 """ 

194 return self.__str__() 

195 

196 def has_data(self) -> bool: 

197 """ 

198 Check if the NIMS object contains time series data. 

199 

200 Returns 

201 ------- 

202 bool 

203 True if ts_data is not None, False otherwise 

204 """ 

205 if self.ts_data is not None: 

206 return True 

207 return False 

208 

209 @property 

210 def n_samples(self) -> Optional[int]: 

211 """ 

212 Number of samples in the time series. 

213 

214 Returns 

215 ------- 

216 int or None 

217 Number of samples if data is loaded, estimated from file size 

218 if file exists, None otherwise 

219 """ 

220 if self.has_data(): 

221 return self.ts_data.shape[0] 

222 elif self.fn is not None: 

223 return int(self.file_size / 16.375) 

224 

225 @property 

226 def latitude(self) -> Optional[float]: 

227 """ 

228 Median latitude value from all GPS stamps. 

229 

230 Returns 

231 ------- 

232 float or None 

233 Median latitude in decimal degrees (WGS84) from GPRMC stamps, 

234 or header GPS latitude if no stamps available 

235 

236 Notes 

237 ----- 

238 Only uses GPRMC stamps as they should be duplicates of GPGGA stamps 

239 but include additional validation. 

240 """ 

241 if self.stamps is not None: 

242 latitude = np.zeros(len(self.stamps)) 

243 for ii, stamp in enumerate(self.stamps): 

244 latitude[ii] = stamp[1][0].latitude 

245 return np.median(latitude[np.nonzero(latitude)]) 

246 return self.header_gps_latitude 

247 

248 @property 

249 def longitude(self) -> Optional[float]: 

250 """ 

251 Median longitude value from all GPS stamps. 

252 

253 Returns 

254 ------- 

255 float or None 

256 Median longitude in decimal degrees (WGS84) from GPS stamps, 

257 or header GPS longitude if no stamps available 

258 

259 Notes 

260 ----- 

261 Uses the first stamp within each GPS stamp set. 

262 """ 

263 if self.stamps is not None: 

264 longitude = np.zeros(len(self.stamps)) 

265 for ii, stamp in enumerate(self.stamps): 

266 longitude[ii] = stamp[1][0].longitude 

267 return np.median(longitude[np.nonzero(longitude)]) 

268 return self.header_gps_longitude 

269 

270 @property 

271 def elevation(self) -> Optional[float]: 

272 """ 

273 Median elevation value from all GPS stamps. 

274 

275 Returns 

276 ------- 

277 float or None 

278 Median elevation in meters (WGS84) from GPS stamps, 

279 or header GPS elevation if no stamps available 

280 

281 Notes 

282 ----- 

283 Uses the first stamp within each GPS stamp set. For paired stamps 

284 (GPRMC/GPGGA), uses the GPGGA elevation if available. 

285 """ 

286 if self.stamps is not None: 

287 elevation = np.zeros(len(self.stamps)) 

288 for ii, stamp in enumerate(self.stamps): 

289 if len(stamp[1]) == 1: 

290 elev = stamp[1][0].elevation 

291 if len(stamp[1]) == 2: 

292 elev = stamp[1][1].elevation 

293 if elev is None: 

294 continue 

295 elevation[ii] = elev 

296 return np.median(elevation[np.nonzero(elevation)]) 

297 return self.header_gps_elevation 

298 

299 @property 

300 def declination(self) -> Optional[float]: 

301 """ 

302 Median magnetic declination value from all GPS stamps. 

303 

304 Returns 

305 ------- 

306 float or None 

307 Median magnetic declination in decimal degrees from GPRMC stamps, 

308 or None if no declination data available 

309 

310 Notes 

311 ----- 

312 Only uses GPRMC stamps as they contain declination information. 

313 """ 

314 if self.stamps is not None: 

315 declination = np.zeros(len(self.stamps)) 

316 for ii, stamp in enumerate(self.stamps): 

317 if stamp[1][0].gps_type == "GPRMC": 

318 dec = stamp[1][0].declination 

319 if dec is None: 

320 continue 

321 declination[ii] = dec 

322 return np.median(declination[np.nonzero(declination)]) 

323 return None 

324 

325 @property 

326 def start_time(self) -> MTime: 

327 """ 

328 Start time of the time series data. 

329 

330 Returns 

331 ------- 

332 MTime 

333 Start time derived from the first GPS time stamp index, 

334 or header GPS stamp if no time series data available 

335 

336 Notes 

337 ----- 

338 The start time is calculated from the first good GPS time stamp 

339 minus the seconds to the beginning of the time series. 

340 """ 

341 if self.stamps is not None: 

342 return MTime(time_stamp=self.ts_data.index[0]) 

343 return self.header_gps_stamp 

344 

345 @property 

346 def end_time(self) -> MTime: 

347 """ 

348 End time of the time series data. 

349 

350 Returns 

351 ------- 

352 MTime 

353 End time derived from the last time series index, 

354 or estimated from start time and number of samples 

355 

356 Notes 

357 ----- 

358 If time series data is available, uses the last timestamp. 

359 Otherwise estimates end time from start time plus duration 

360 calculated from number of samples and sample rate. 

361 """ 

362 if self.stamps is not None: 

363 return MTime(time_stamp=self.ts_data.index[-1]) 

364 self.logger.warning("Estimating end time from n_samples") 

365 return self.start_time + int(self.n_samples / self.sample_rate) 

366 

367 @property 

368 def box_temperature(self) -> Optional[timeseries.ChannelTS]: 

369 """ 

370 Data logger temperature channel. 

371 

372 Returns 

373 ------- 

374 ChannelTS or None 

375 Temperature channel sampled at 1 second, interpolated to match 

376 the time series sample rate, or None if no time series data 

377 

378 Notes 

379 ----- 

380 Temperature is measured in Celsius and interpolated onto the same 

381 time grid as the magnetic and electric field channels. 

382 """ 

383 

384 if self.ts_data is not None: 

385 auxiliary_metadata = Auxiliary() 

386 auxiliary_metadata.from_dict( 

387 { 

388 "channel_number": 6, 

389 "component": "temperature", 

390 "measurement_azimuth": 0, 

391 "measurement_tilt": 0, 

392 "sample_rate": 1, 

393 "time_period.start": self.start_time.isoformat(), 

394 "time_period.end": self.end_time.isoformat(), 

395 "type": "auxiliary", 

396 "units": "celsius", 

397 } 

398 ) 

399 

400 temp = timeseries.ChannelTS( 

401 channel_type="auxiliary", 

402 data=self.info_array["box_temp"], 

403 channel_metadata=auxiliary_metadata, 

404 run_metadata=self.run_metadata, 

405 station_metadata=self.station_metadata, 

406 ) 

407 # interpolate temperature onto the same sample rate as the channels. 

408 temp.data_array = temp.data_array.interp_like(self.hx.data_array) 

409 temp.channel_metadata.sample_rate = self.sample_rate 

410 temp.channel_metadata.time_period.end = self.end_time.isoformat() 

411 

412 return temp 

413 return None 

414 

415 def get_channel_response(self, channel: str, dipole_length: float = 1) -> Any: 

416 """ 

417 Get the channel response for a given channel. 

418 

419 Parameters 

420 ---------- 

421 channel : str 

422 Channel identifier (e.g., 'hx', 'hy', 'hz', 'ex', 'ey') 

423 dipole_length : float, optional 

424 Dipole length for electric field channels, by default 1 

425 

426 Returns 

427 ------- 

428 Any 

429 Channel response object from the NIMS response filters 

430 

431 Notes 

432 ----- 

433 Uses the NIMS response filters to generate appropriate response 

434 functions for magnetic and electric field channels at the current 

435 sample rate. 

436 """ 

437 

438 nims_filters = Response(sample_rate=self.sample_rate) 

439 return nims_filters.get_channel_response(channel, dipole_length=dipole_length) 

440 

441 @property 

442 def hx_metadata(self) -> Optional[Magnetic]: 

443 """ 

444 Metadata for the HX magnetic field channel. 

445 

446 Returns 

447 ------- 

448 Magnetic or None 

449 Magnetic field metadata object for the HX channel, 

450 or None if no time series data is loaded 

451 """ 

452 if self.ts_data is not None: 

453 hx_metadata = Magnetic() 

454 hx_metadata.from_dict( 

455 { 

456 "channel_number": 1, 

457 "component": "hx", 

458 "measurement_azimuth": 0, 

459 "measurement_tilt": 0, 

460 "sample_rate": self.sample_rate, 

461 "time_period.start": self.start_time.isoformat(), 

462 "time_period.end": self.end_time.isoformat(), 

463 "type": "magnetic", 

464 "units": "counts", 

465 "sensor.id": self.mag_id, 

466 "sensor.manufacturer": "Barry Narod", 

467 "sensor.type": "fluxgate triaxial magnetometer", 

468 } 

469 ) 

470 return hx_metadata 

471 

472 @property 

473 def hx(self) -> Optional[timeseries.ChannelTS]: 

474 """ 

475 HX magnetic field channel time series. 

476 

477 Returns 

478 ------- 

479 ChannelTS or None 

480 Time series data for the HX magnetic field component, 

481 or None if no time series data is loaded 

482 """ 

483 if self.ts_data is not None: 

484 return timeseries.ChannelTS( 

485 channel_type="magnetic", 

486 data=self.ts_data.hx.to_numpy(), 

487 channel_metadata=self.hx_metadata, 

488 run_metadata=self.run_metadata, 

489 station_metadata=self.station_metadata, 

490 channel_response=self.get_channel_response("hx"), 

491 ) 

492 return None 

493 

494 @property 

495 def hy_metadata(self) -> Optional[Magnetic]: 

496 """ 

497 Metadata for the HY magnetic field channel. 

498 

499 Returns 

500 ------- 

501 Magnetic or None 

502 Magnetic field metadata object for the HY channel, 

503 or None if no time series data is loaded 

504 """ 

505 if self.ts_data is not None: 

506 hy_metadata = Magnetic() 

507 hy_metadata.from_dict( 

508 { 

509 "channel_number": 2, 

510 "component": "hy", 

511 "measurement_azimuth": 90, 

512 "measurement_tilt": 0, 

513 "sample_rate": self.sample_rate, 

514 "time_period.start": self.start_time.isoformat(), 

515 "time_period.end": self.end_time.isoformat(), 

516 "type": "magnetic", 

517 "units": "counts", 

518 "sensor.id": self.mag_id, 

519 "sensor.manufacturer": "Barry Narod", 

520 "sensor.type": "fluxgate triaxial magnetometer", 

521 } 

522 ) 

523 return hy_metadata 

524 

525 @property 

526 def hy(self) -> Optional[timeseries.ChannelTS]: 

527 """ 

528 HY magnetic field channel time series. 

529 

530 Returns 

531 ------- 

532 ChannelTS or None 

533 Time series data for the HY magnetic field component, 

534 or None if no time series data is loaded 

535 """ 

536 if self.ts_data is not None: 

537 return timeseries.ChannelTS( 

538 channel_type="magnetic", 

539 data=self.ts_data.hy.to_numpy(), 

540 channel_metadata=self.hy_metadata, 

541 run_metadata=self.run_metadata, 

542 station_metadata=self.station_metadata, 

543 channel_response=self.get_channel_response("hy"), 

544 ) 

545 return None 

546 

547 @property 

548 def hz_metadata(self) -> Optional[Magnetic]: 

549 """ 

550 Metadata for the HZ magnetic field channel. 

551 

552 Returns 

553 ------- 

554 Magnetic or None 

555 Magnetic field metadata object for the HZ channel, 

556 or None if no time series data is loaded 

557 """ 

558 if self.ts_data is not None: 

559 hz_metadata = Magnetic() 

560 hz_metadata.from_dict( 

561 { 

562 "channel_number": 3, 

563 "component": "hz", 

564 "measurement_azimuth": 0, 

565 "measurement_tilt": 0, 

566 "sample_rate": self.sample_rate, 

567 "time_period.start": self.start_time.isoformat(), 

568 "time_period.end": self.end_time.isoformat(), 

569 "type": "magnetic", 

570 "units": "counts", 

571 "sensor.id": self.mag_id, 

572 "sensor.manufacturer": "Barry Narod", 

573 "sensor.type": "fluxgate triaxial magnetometer", 

574 } 

575 ) 

576 return hz_metadata 

577 

578 @property 

579 def hz(self) -> Optional[timeseries.ChannelTS]: 

580 """ 

581 HZ magnetic field channel time series. 

582 

583 Returns 

584 ------- 

585 ChannelTS or None 

586 Time series data for the HZ magnetic field component, 

587 or None if no time series data is loaded 

588 """ 

589 """HZ""" 

590 if self.ts_data is not None: 

591 return timeseries.ChannelTS( 

592 channel_type="magnetic", 

593 data=self.ts_data.hz.to_numpy(), 

594 channel_metadata=self.hz_metadata, 

595 run_metadata=self.run_metadata, 

596 station_metadata=self.station_metadata, 

597 channel_response=self.get_channel_response("hz"), 

598 ) 

599 return None 

600 

601 @property 

602 def ex_metadata(self) -> Optional[Electric]: 

603 """ 

604 Metadata for the EX electric field channel. 

605 

606 Returns 

607 ------- 

608 Electric or None 

609 Electric field metadata object for the EX channel, 

610 or None if no time series data is loaded 

611 """ 

612 if self.ts_data is not None: 

613 ex_metadata = Electric() 

614 ex_metadata.from_dict( 

615 { 

616 "channel_number": 4, 

617 "component": "ex", 

618 "measurement_azimuth": self.ex_azimuth, 

619 "measurement_tilt": 0, 

620 "sample_rate": self.sample_rate, 

621 "dipole_length": self.ex_length, 

622 "time_period.start": self.start_time.isoformat(), 

623 "time_period.end": self.end_time.isoformat(), 

624 "type": "electric", 

625 "units": "counts", 

626 "negative.id": self.s_electrode_id, 

627 "positive.id": self.n_electrode_id, 

628 } 

629 ) 

630 

631 return ex_metadata 

632 

633 @property 

634 def ex(self) -> Optional[timeseries.ChannelTS]: 

635 """ 

636 EX electric field channel time series. 

637 

638 Returns 

639 ------- 

640 ChannelTS or None 

641 Time series data for the EX electric field component, 

642 or None if no time series data is loaded 

643 """ 

644 if self.ts_data is not None: 

645 return timeseries.ChannelTS( 

646 channel_type="electric", 

647 data=self.ts_data.ex.to_numpy(), 

648 channel_metadata=self.ex_metadata, 

649 run_metadata=self.run_metadata, 

650 station_metadata=self.station_metadata, 

651 channel_response=self.get_channel_response("ex", self.ex_length), 

652 ) 

653 return None 

654 

655 @property 

656 def ey_metadata(self) -> Optional[Electric]: 

657 """ 

658 Metadata for the EY electric field channel. 

659 

660 Returns 

661 ------- 

662 Electric or None 

663 Electric field metadata object for the EY channel, 

664 or None if no time series data is loaded 

665 """ 

666 if self.ts_data is not None: 

667 ey_metadata = Electric() 

668 ey_metadata.from_dict( 

669 { 

670 "channel_number": 5, 

671 "component": "ey", 

672 "measurement_azimuth": self.ey_azimuth, 

673 "measurement_tilt": 0, 

674 "sample_rate": self.sample_rate, 

675 "dipole_length": self.ey_length, 

676 "time_period.start": self.start_time.isoformat(), 

677 "time_period.end": self.end_time.isoformat(), 

678 "type": "electric", 

679 "units": "counts", 

680 "negative.id": self.w_electrode_id, 

681 "positive.id": self.e_electrode_id, 

682 } 

683 ) 

684 

685 return ey_metadata 

686 

687 @property 

688 def ey(self) -> Optional[timeseries.ChannelTS]: 

689 """ 

690 EY electric field channel time series. 

691 

692 Returns 

693 ------- 

694 ChannelTS or None 

695 Time series data for the EY electric field component, 

696 or None if no time series data is loaded 

697 """ 

698 if self.ts_data is not None: 

699 return timeseries.ChannelTS( 

700 channel_type="electric", 

701 data=self.ts_data.ey.to_numpy(), 

702 channel_metadata=self.ey_metadata, 

703 run_metadata=self.run_metadata, 

704 station_metadata=self.station_metadata, 

705 channel_response=self.get_channel_response("ey", self.ey_length), 

706 ) 

707 return None 

708 

709 @property 

710 def run_metadata(self) -> Optional[Run]: 

711 """ 

712 Run metadata for the NIMS data collection. 

713 

714 Returns 

715 ------- 

716 Run or None 

717 MT run metadata including data logger information, timing, 

718 and channel metadata, or None if no time series data is loaded 

719 """ 

720 

721 if self.ts_data is not None: 

722 run_metadata = Run() 

723 run_metadata.from_dict( 

724 { 

725 "Run": { 

726 "comments": self.comments, 

727 "data_logger.firmware.author": "B. Narod", 

728 "data_logger.firmware.name": "nims", 

729 "data_logger.firmware.version": "1.0", 

730 "data_logger.manufacturer": "Narod", 

731 "data_logger.model": self.box_id, 

732 "data_logger.id": self.box_id, 

733 "data_logger.type": "long period", 

734 "id": self.run_id, 

735 "data_type": "LPMT", 

736 "sample_rate": self.sample_rate, 

737 "time_period.end": self.end_time.isoformat(), 

738 "time_period.start": self.start_time.isoformat(), 

739 } 

740 } 

741 ) 

742 for comp in ["hx", "hy", "hz", "ex", "ey"]: 

743 run_metadata.channels.append(getattr(self, f"{comp}_metadata")) 

744 return run_metadata 

745 return None 

746 

747 @property 

748 def station_metadata(self) -> Optional[Station]: 

749 """ 

750 Station metadata from NIMS file. 

751 

752 Returns 

753 ------- 

754 Station or None 

755 MT station metadata including geographic information and location data, 

756 or None if no time series data is loaded 

757 """ 

758 if self.ts_data is not None: 

759 station_metadata = Station() 

760 if self.run_id is None: 

761 self.run_id = f"001" 

762 station_metadata.from_dict( 

763 { 

764 "Station": { 

765 "geographic_name": f"{self.site_name}, {self.state_province}, {self.country}", 

766 "location.declination.value": self.declination, 

767 "location.elevation": self.elevation, 

768 "location.latitude": self.latitude, 

769 "location.longitude": self.longitude, 

770 "id": self.run_id[0:-1], 

771 "orientation.reference_frame": "geomagnetic", 

772 } 

773 } 

774 ) 

775 station_metadata.runs.append(self.run_metadata) 

776 return station_metadata 

777 return None 

778 

779 def to_runts(self, calibrate: bool = False) -> Optional[timeseries.RunTS]: 

780 """ 

781 Get xarray RunTS object for the NIMS data. 

782 

783 Parameters 

784 ---------- 

785 calibrate : bool, optional 

786 Whether to apply calibration to the data, by default False 

787 

788 Returns 

789 ------- 

790 RunTS or None 

791 Time series run object containing all channels and metadata, 

792 or None if no time series data is loaded 

793 

794 Notes 

795 ----- 

796 Includes all magnetic field channels (hx, hy, hz), electric field 

797 channels (ex, ey), and box temperature data. 

798 """ 

799 

800 if self.ts_data is not None: 

801 run = timeseries.RunTS( 

802 array_list=[ 

803 self.hx, 

804 self.hy, 

805 self.hz, 

806 self.ex, 

807 self.ey, 

808 self.box_temperature, 

809 ], 

810 run_metadata=self.run_metadata, 

811 station_metadata=self.station_metadata, 

812 ) 

813 if calibrate: 

814 return run.calibrate() 

815 else: 

816 return run 

817 return None 

818 

819 def _make_index_values(self) -> np.ndarray: 

820 """ 

821 Create index values for the recorded channels. 

822 

823 Returns 

824 ------- 

825 ndarray 

826 Array of shape (8, 5) containing byte indices for extracting 

827 magnetic (first 3 columns) and electric (last 2 columns) 

828 channel data from each 8-sample block 

829 

830 Notes 

831 ----- 

832 Creates an array of index values for magnetics and electrics. 

833 Each row corresponds to one sample within an 8-sample block. 

834 Columns 0-2 are for magnetic channels (hx, hy, hz). 

835 Columns 3-4 are for electric channels (ex, ey). 

836 """ 

837 ### make an array of index values for magnetics and electrics 

838 indices = np.zeros((8, 5), dtype=int) 

839 for kk in range(8): 

840 ### magnetic blocks 

841 for ii in range(3): 

842 indices[kk, ii] = 9 + (kk) * 9 + (ii) * 3 

843 ### electric blocks 

844 for ii in range(2): 

845 indices[kk, 3 + ii] = 82 + (kk) * 6 + (ii) * 3 

846 return indices 

847 

848 def _get_gps_string_list( 

849 self, nims_string: bytes 

850 ) -> tuple[list[float], list[bytes]]: 

851 """ 

852 Extract GPS strings from raw NIMS binary data. 

853 

854 This method takes the 3rd byte in each block, concatenates into a long 

855 string and creates a list by splitting by '$'. The index values of 

856 where the '$' characters are found are also calculated. 

857 

858 Parameters 

859 ---------- 

860 nims_string : bytes 

861 Raw binary string output by NIMS 

862 

863 Returns 

864 ------- 

865 list of float 

866 List of index values associated with the location of the '$' characters 

867 list of bytes 

868 List of possible raw GPS strings split by '$' 

869 

870 Notes 

871 ----- 

872 This assumes that there are an even amount of data blocks. 

873 This might be a bad assumption in some cases. 

874 """ 

875 ### get index values of $ and gps_strings 

876 index_values = [] 

877 gps_str_list = [] 

878 for ii in range(int(len(nims_string) / self.block_size)): 

879 index = ii * self.block_size + 3 

880 g_char = struct.unpack("c", nims_string[index : index + 1])[0] 

881 if g_char == b"$": 

882 index_values.append((index - 3) / self.block_size) 

883 gps_str_list.append(g_char) 

884 gps_raw_stamp_list = b"".join(gps_str_list).split(b"$") 

885 return index_values, gps_raw_stamp_list 

886 

887 def get_stamps(self, nims_string: bytes) -> list[tuple[Any, list[GPS]]]: 

888 """ 

889 Extract and parse valid GPS strings, matching GPRMC with GPGGA stamps. 

890 

891 Parameters 

892 ---------- 

893 nims_string : bytes 

894 Raw GPS binary string output by NIMS 

895 

896 Returns 

897 ------- 

898 list of tuple 

899 List of matched GPS stamps where each element is a tuple containing 

900 index and list of GPS objects [GPRMC, GPGGA] (or just [GPRMC]) 

901 

902 Notes 

903 ----- 

904 Skips the first entry as it tends to be incomplete. Attempts to match 

905 synchronous GPRMC with GPGGA stamps when possible. 

906 """ 

907 ### read in GPS strings into a list to be parsed later 

908 index_list, gps_raw_stamp_list = self._get_gps_string_list(nims_string) 

909 

910 gprmc_list = [] 

911 gpgga_list = [] 

912 ### note we are skipping the first entry, it tends to be not 

913 ### complete anyway 

914 for ii, index, raw_stamp in zip( 

915 range(len(index_list)), index_list, gps_raw_stamp_list[1:] 

916 ): 

917 gps_obj = GPS(raw_stamp, index) 

918 if gps_obj.valid: 

919 if gps_obj.gps_type == "GPRMC": 

920 gprmc_list.append(gps_obj) 

921 elif gps_obj.gps_type == "GPGGA": 

922 gpgga_list.append(gps_obj) 

923 else: 

924 self.logger.debug(f"GPS Error: file index {index}, stamp number {ii}") 

925 max_len = min([len(raw_stamp), 15]) 

926 self.logger.debug(f"GPS Raw Stamp: {raw_stamp[0:max_len]}") 

927 return self._gps_match_gprmc_gpgga_strings(gprmc_list, gpgga_list) 

928 

929 def _gps_match_gprmc_gpgga_strings(self, gprmc_list, gpgga_list): 

930 """ 

931 match GPRMC and GPGGA strings together into a list 

932 

933 [[GPRMC, GPGGA], ...] 

934 

935 :param list gprmc_list: list of GPS objects for the GPRMC stamps 

936 :param list gpgga_list: list of GPS objects for the GPGGA stamps 

937 

938 :returns: list of matched GPRMC and GPGGA stamps 

939 

940 """ 

941 ### match up the GPRMC and GPGGA together 

942 gps_match_list = [] 

943 for gprmc in gprmc_list: 

944 find = False 

945 for ii, gpgga in enumerate(gpgga_list): 

946 if gprmc.time_stamp.time() == gpgga.time_stamp.time(): 

947 gps_match_list.append([gprmc, gpgga]) 

948 find = True 

949 del gpgga_list[ii] 

950 break 

951 if not find: 

952 gps_match_list.append([gprmc]) 

953 return gps_match_list 

954 

955 def _get_gps_stamp_indices_from_status(self, status_array): 

956 """ 

957 get the index location of the stamps from the status array assuming 

958 that 0 indicates GPS lock. 

959 

960 :param :class:`np.ndarray` status_array: an array of status values from data blocks 

961 

962 :returns: array of index values where GPS lock was acquired ignoring 

963 sequential locks. 

964 """ 

965 

966 index_values = np.where(status_array == 0)[0] 

967 status_index = np.zeros_like(index_values) 

968 for ii in range(index_values.size): 

969 if index_values[ii] - index_values[ii - 1] == 1: 

970 continue 

971 else: 

972 status_index[ii] = index_values[ii] 

973 status_index = status_index[np.nonzero(status_index)] 

974 

975 return status_index 

976 

977 def match_status_with_gps_stamps(self, status_array, gps_list): 

978 """ 

979 Match the index values from the status array with the index values of 

980 the GPS stamps. There appears to be a bit of wiggle room between when the 

981 lock is recorded and the stamp was actually recorded. This is typically 1 

982 second and sometimes 2. 

983 

984 :param array status_array: array of status values from each data block 

985 :param list gps_list: list of valid GPS stamps [[GPRMC, GPGGA], ...] 

986 

987 .. note:: I think there is a 2 second gap between the lock and the 

988 first stamp character. 

989 """ 

990 

991 stamp_indices = self._get_gps_stamp_indices_from_status(status_array) 

992 gps_stamps = [] 

993 for index in stamp_indices: 

994 stamp_find = False 

995 for ii, stamps in enumerate(gps_list): 

996 index_diff = stamps[0].index - index 

997 ### check the index value, should be 2 or 74, if it is off by 

998 ### a value left or right apply a correction. 

999 if index_diff == 1 or index_diff == 73: 

1000 index += 1 

1001 stamps[0].index += 1 

1002 elif index_diff == 2 or index_diff == 74: 

1003 index = index 

1004 elif index_diff == 3 or index_diff == 75: 

1005 index -= 1 

1006 stamps[0].index -= 1 

1007 elif index_diff == 4 or index_diff == 76: 

1008 index -= 2 

1009 stamps[0].index -= 2 

1010 if stamps[0].gps_type in ["GPRMC", "gprmc"]: 

1011 if index_diff in [1, 2, 3, 4]: 

1012 gps_stamps.append((index, stamps)) 

1013 stamp_find = True 

1014 del gps_list[ii] 

1015 break 

1016 elif stamps[0].gps_type in ["GPGGA", "gpgga"]: 

1017 if index_diff in [73, 74, 75, 76]: 

1018 gps_stamps.append((index, stamps)) 

1019 stamp_find = True 

1020 del gps_list[ii] 

1021 break 

1022 if not stamp_find: 

1023 self.logger.debug(f"GPS Error: No good GPS stamp at {index} seconds") 

1024 return gps_stamps 

1025 

1026 def find_sequence( 

1027 self, data_array: np.ndarray, block_sequence: Optional[list[int]] = None 

1028 ) -> np.ndarray: 

1029 """ 

1030 Find a sequence pattern in the data array. 

1031 

1032 Parameters 

1033 ---------- 

1034 data_array : ndarray 

1035 Array of the data with shape [n, m] where n is the number of 

1036 seconds recorded and m is the block length for a given sampling rate 

1037 block_sequence : list of int, optional 

1038 Sequence pattern to locate, by default [1, 131] (start of data block) 

1039 

1040 Returns 

1041 ------- 

1042 ndarray 

1043 Array of index locations where the sequence is found 

1044 

1045 Notes 

1046 ----- 

1047 Uses numpy rolling and comparison to find all occurrences of the 

1048 specified sequence pattern in the data array. 

1049 """ 

1050 if block_sequence is not None: 

1051 self.block_sequence = block_sequence 

1052 # want to find the index there the test data is equal to the test sequence 

1053 t = np.vstack( 

1054 [ 

1055 np.roll(data_array, shift) 

1056 for shift in -np.arange(len(self.block_sequence)) 

1057 ] 

1058 ).T 

1059 return np.where(np.all(t == self.block_sequence, axis=1))[0] 

1060 

1061 def unwrap_sequence(self, sequence: np.ndarray) -> np.ndarray: 

1062 """ 

1063 Unwrap sequence to sequential numbers instead of modulo 256. 

1064 

1065 Parameters 

1066 ---------- 

1067 sequence : ndarray 

1068 Sequence of byte numbers (0-255) to unwrap 

1069 

1070 Returns 

1071 ------- 

1072 ndarray 

1073 Unwrapped sequence with first number set to 0 and subsequent 

1074 values forming a continuous count 

1075 

1076 Notes 

1077 ----- 

1078 Handles the fact that sequence numbers are stored as single bytes 

1079 (0-255) but represent a continuous count. When a value of 255 is 

1080 encountered, the next rollover is anticipated. 

1081 """ 

1082 count = 0 

1083 unwrapped = np.zeros_like(sequence) 

1084 for ii, seq in enumerate(sequence): 

1085 unwrapped[ii] = seq + count * 256 

1086 if seq == 255: 

1087 count += 1 

1088 unwrapped -= unwrapped[0] 

1089 

1090 return unwrapped 

1091 

1092 def _locate_duplicate_blocks(self, sequence): 

1093 """ 

1094 locate the sequence number where the duplicates exist 

1095 

1096 :param list sequence: sequence to match duplicate numbers. 

1097 :returns: list of duplicate index values. 

1098 """ 

1099 

1100 duplicates = np.where(np.abs(np.diff(sequence)) == 0)[0] 

1101 if len(duplicates) == 0: 

1102 return None 

1103 duplicate_list = [] 

1104 for dup in duplicates: 

1105 dup_dict = {} 

1106 dup_dict["sequence_index"] = dup 

1107 dup_dict["ts_index_0"] = dup * self.sample_rate 

1108 dup_dict["ts_index_1"] = dup * self.sample_rate + self.sample_rate 

1109 dup_dict["ts_index_2"] = (dup + 1) * self.sample_rate 

1110 dup_dict["ts_index_3"] = (dup + 1) * self.sample_rate + self.sample_rate 

1111 duplicate_list.append(dup_dict) 

1112 return duplicate_list 

1113 

1114 def _check_duplicate_blocks(self, block_01, block_02, info_01, info_02): 

1115 """ 

1116 make sure the blocks are truly duplicates 

1117 

1118 :param np.array block_01: block of data to compare 

1119 :param np.array block_02: block of data to compare 

1120 :param np.array info_01: information array from info_array[sequence_index] 

1121 :param np.array info_02: information array from info_array[sequence_index] 

1122 

1123 :returns: boolean if the blocks and information match 

1124 

1125 """ 

1126 if np.array_equal(block_01, block_02): 

1127 if np.array_equal(info_01, info_02): 

1128 return True 

1129 else: 

1130 return False 

1131 else: 

1132 return False 

1133 

1134 def remove_duplicates(self, info_array, data_array): 

1135 """ 

1136 remove duplicate blocks, removing the first duplicate as suggested by 

1137 Paul and Anna. Checks to make sure that the mag data are identical for 

1138 the duplicate blocks. Removes the blocks from the information and 

1139 data arrays and returns the reduced arrays. This should sync up the 

1140 timing of GPS stamps and index values. 

1141 

1142 :param np.array info_array: structured array of block information 

1143 :param np.array data_array: structured array of the data 

1144 

1145 :returns: reduced information array 

1146 :returns: reduced data array 

1147 :returns: index of duplicates in raw data 

1148 

1149 """ 

1150 ### locate 

1151 duplicate_test_list = self._locate_duplicate_blocks(self.info_array["sequence"]) 

1152 if duplicate_test_list is None: 

1153 return info_array, data_array, None 

1154 duplicate_list = [] 

1155 for d in duplicate_test_list: 

1156 if self._check_duplicate_blocks( 

1157 data_array[d["ts_index_0"] : d["ts_index_1"]], 

1158 data_array[d["ts_index_2"] : d["ts_index_3"]], 

1159 info_array[d["sequence_index"]], 

1160 info_array[d["sequence_index"] + 1], 

1161 ): 

1162 duplicate_list.append(d) 

1163 

1164 if len(duplicate_list) == 0: 

1165 self.logger.info(f"Duplicate block count is zero") 

1166 return info_array, data_array, None 

1167 self.logger.debug(f"Deleting {len(duplicate_list)} duplicate blocks") 

1168 ### get the index of the blocks to be removed, namely the 1st duplicate 

1169 ### block 

1170 remove_sequence_index = [d["sequence_index"] for d in duplicate_list] 

1171 remove_data_index = np.concatenate( 

1172 [np.arange(d["ts_index_0"], d["ts_index_1"]) for d in duplicate_list] 

1173 ).astype(int) 

1174 

1175 ### remove the data 

1176 return_info_array = np.delete(info_array, remove_sequence_index) 

1177 return_data_array = np.delete(data_array, remove_data_index) 

1178 

1179 ### set sequence to be monotonic 

1180 return_info_array["sequence"][:] = np.arange(return_info_array.shape[0]) 

1181 

1182 return return_info_array, return_data_array, duplicate_list 

1183 

1184 def read_nims(self, fn: Optional[Union[str, Path]] = None) -> None: 

1185 """ 

1186 Read NIMS DATA.BIN file and parse all data. 

1187 

1188 This method performs the complete data reading and processing workflow: 

1189 

1190 1. Read header information and store as attributes 

1191 2. Locate data block beginning by finding first [1, 131, ...] sequence 

1192 3. Ensure data is multiple of block length, trim excess bits 

1193 4. Extract GPS data (3rd byte of each block) and parse GPS stamps 

1194 5. Read data as unsigned 8-bit integers, reshape to [N, block_length] 

1195 6. Remove duplicate blocks (first of each duplicate pair) 

1196 7. Match GPS status locks with valid GPS stamps 

1197 8. Verify timing between first/last GPS stamps, trim excess seconds 

1198 

1199 Parameters 

1200 ---------- 

1201 fn : str or Path, optional 

1202 Path to NIMS DATA.BIN file. Uses self.fn if not provided. 

1203 

1204 Notes 

1205 ----- 

1206 The data and information arrays returned have duplicates removed 

1207 and sequence reset to be monotonic. Extra seconds due to timing 

1208 gaps are trimmed from the end of the time series. 

1209 

1210 Examples 

1211 -------- 

1212 >>> from mth5.io import nims 

1213 >>> n = nims.NIMS(r"/home/mt_data/nims/mt001.bin") 

1214 >>> n.read_nims() 

1215 """ 

1216 

1217 if fn is not None: 

1218 self.fn = fn 

1219 st = datetime.datetime.now() 

1220 ### read in header information and get the location of end of header 

1221 self.read_header(self.fn) 

1222 

1223 ### load in the entire file, its not too big, start from the 

1224 ### end of the header information. 

1225 with open(self.fn, "rb") as fid: 

1226 fid.seek(self.data_start_seek) 

1227 self._raw_string = fid.read() 

1228 ### read in full string as unsigned integers 

1229 data = np.frombuffer(self._raw_string, dtype=np.uint8) 

1230 

1231 ### need to make sure that the data starts with a full block 

1232 find_first = self.find_sequence(data[0 : self.block_size * 10])[0] 

1233 data = data[find_first:] 

1234 

1235 ### get GPS stamps from the binary string first 

1236 self.gps_list = self.get_stamps(self._raw_string[find_first:]) 

1237 

1238 ### check the size of the data, should have an equal amount of blocks 

1239 if (data.size % self.block_size) != 0: 

1240 self.logger.warning( 

1241 f"odd number of bytes {data.size}, not even blocks " 

1242 f"cutting down the data by {data.size % self.block_size} bits" 

1243 ) 

1244 end_data = data.size - (data.size % self.block_size) 

1245 data = data[0:end_data] 

1246 # resized the data into an even amount of blocks 

1247 data = data.reshape((int(data.size / self.block_size), self.block_size)) 

1248 

1249 ### need to parse the data 

1250 ### first get the status information 

1251 self.info_array = np.zeros( 

1252 data.shape[0], 

1253 dtype=[ 

1254 ("soh", int), 

1255 ("block_len", int), 

1256 ("status", int), 

1257 ("gps", int), 

1258 ("sequence", int), 

1259 ("box_temp", float), 

1260 ("head_temp", float), 

1261 ("logic", int), 

1262 ("end", int), 

1263 ], 

1264 ) 

1265 

1266 for key, index in self._block_dict.items(): 

1267 if "temp" in key: 

1268 # compute temperature - cast to int32 to avoid uint8 overflow 

1269 t_value = data[:, index[0]].astype(np.int32) * 256 + data[ 

1270 :, index[1] 

1271 ].astype(np.int32) 

1272 

1273 # something to do with the bits where you have to subtract 

1274 t_value[np.where(t_value > 32768)] -= 65536 

1275 value = (t_value - self.t_offset) / self.t_conversion_factor 

1276 else: 

1277 value = data[:, index] 

1278 self.info_array[key][:] = value 

1279 ### unwrap sequence 

1280 self.info_array["sequence"] = self.unwrap_sequence(self.info_array["sequence"]) 

1281 

1282 ### get data 

1283 data_array = np.zeros( 

1284 data.shape[0] * self.sample_rate, 

1285 dtype=[ 

1286 ("hx", float), 

1287 ("hy", float), 

1288 ("hz", float), 

1289 ("ex", float), 

1290 ("ey", float), 

1291 ], 

1292 ) 

1293 

1294 ### fill the data 

1295 for cc, comp in enumerate(["hx", "hy", "hz", "ex", "ey"]): 

1296 channel_arr = np.zeros((data.shape[0], 8), dtype=float) 

1297 for kk in range(self.sample_rate): 

1298 index = self.indices[kk, cc] 

1299 # Cast to int32 to avoid uint8 overflow 

1300 value = ( 

1301 data[:, index].astype(np.int32) * 256 

1302 + data[:, index + 1].astype(np.int32) 

1303 ) * np.array([256]) + data[:, index + 2].astype(np.int32) 

1304 value[np.where(value > self._int_max)] -= self._int_factor 

1305 channel_arr[:, kk] = value 

1306 data_array[comp][:] = channel_arr.flatten() 

1307 ### clean things up 

1308 ### I guess that the E channels are opposite phase? 

1309 for comp in ["ex", "ey"]: 

1310 data_array[comp] *= -1 

1311 ### remove duplicates 

1312 ( 

1313 self.info_array, 

1314 data_array, 

1315 self.duplicate_list, 

1316 ) = self.remove_duplicates(self.info_array, data_array) 

1317 ### get GPS stamps with index values 

1318 self.stamps = self.match_status_with_gps_stamps( 

1319 self.info_array["status"], self.gps_list 

1320 ) 

1321 ### align data checking for timing gaps 

1322 self.ts_data = self.align_data(data_array, self.stamps) 

1323 

1324 et = datetime.datetime.now() 

1325 read_time = (et - st).total_seconds() 

1326 self.logger.debug(f"Reading took {read_time:.2f} seconds") 

1327 

1328 def _get_first_gps_stamp(self, stamps): 

1329 """ 

1330 get the first GPRMC stamp 

1331 """ 

1332 for stamp in stamps: 

1333 if stamp[1][0].gps_type in ["gprmc", "GPRMC"]: 

1334 return stamp 

1335 return None 

1336 

1337 def _get_last_gps_stamp(self, stamps): 

1338 """ 

1339 get the last gprmc stamp 

1340 """ 

1341 for stamp in stamps[::-1]: 

1342 if stamp[1][0].gps_type in ["gprmc", "GPRMC"]: 

1343 return stamp 

1344 return None 

1345 

1346 def _locate_timing_gaps(self, stamps): 

1347 """ 

1348 locate timing gaps in the data by comparing the stamp index with the 

1349 GPS time stamp. The number of points and seconds should be the same 

1350 

1351 :param list stamps: list of GPS stamps [[status_index, [GPRMC, GPGGA]]] 

1352 

1353 :returns: list of gap index values 

1354 """ 

1355 stamp_01 = self._get_first_gps_stamp(stamps)[1][0] 

1356 # current_gap = 0 

1357 current_stamp = stamp_01 

1358 gap_beginning = [] 

1359 total_gap = 0 

1360 for ii, stamp in enumerate(stamps[1:], 1): 

1361 stamp = stamp[1][0] 

1362 # can only compare those with a date and time. 

1363 if stamp.gps_type == "GPGGA": 

1364 continue 

1365 time_diff = (stamp.time_stamp - current_stamp.time_stamp).total_seconds() 

1366 index_diff = stamp.index - current_stamp.index 

1367 

1368 time_gap = index_diff - time_diff 

1369 if time_gap == 0: 

1370 continue 

1371 elif time_gap > 0: 

1372 total_gap += time_gap 

1373 current_stamp = stamp 

1374 gap_beginning.append(stamp.index) 

1375 self.logger.debug( 

1376 f"GPS tamp at {stamp.time_stamp.isoformat()} is off " 

1377 f"from previous time by { time_gap} seconds" 

1378 ) 

1379 self.logger.warning(f"Timing is off by {total_gap} seconds") 

1380 return gap_beginning 

1381 

1382 def check_timing(self, stamps): 

1383 """ 

1384 make sure that there are the correct number of seconds in between 

1385 the first and last GPS GPRMC stamps 

1386 

1387 :param list stamps: list of GPS stamps [[status_index, [GPRMC, GPGGA]]] 

1388 

1389 :returns: [ True | False ] if data is valid or not. 

1390 :returns: gap index locations 

1391 

1392 .. note:: currently it is assumed that if a data gap occurs the data can be 

1393 squeezed to remove them. Probably a more elegant way of doing it. 

1394 """ 

1395 gaps = None 

1396 first_stamp = self._get_first_gps_stamp(stamps)[1][0] 

1397 last_stamp = self._get_last_gps_stamp(stamps)[1][0] 

1398 

1399 time_diff = last_stamp.time_stamp - first_stamp.time_stamp 

1400 index_diff = last_stamp.index - first_stamp.index 

1401 

1402 difference = index_diff - time_diff.total_seconds() 

1403 if difference != 0: 

1404 gaps = self._locate_timing_gaps(stamps) 

1405 return False, gaps, difference 

1406 return True, gaps, difference 

1407 

1408 def align_data(self, data_array, stamps): 

1409 """ 

1410 Need to match up the first good GPS stamp with the data 

1411 

1412 Do this by using the first GPS stamp and assuming that the time from 

1413 the first time stamp to the start is the index value. 

1414 

1415 put the data into a pandas data frame that is indexed by time 

1416 

1417 :param array data_array: structure array with columns for each 

1418 component [hx, hy, hz, ex, ey] 

1419 :param list stamps: list of GPS stamps [[status_index, [GPRMC, GPGGA]]] 

1420 

1421 :returns: pandas DataFrame with colums of components and indexed by 

1422 time initialized by the start time. 

1423 

1424 .. note:: Data gaps are squeezed cause not sure what a gap actually means. 

1425 """ 

1426 ### check timing first to make sure there is no drift 

1427 timing_valid, self.gaps, time_difference = self.check_timing(stamps) 

1428 

1429 ### first GPS stamp within the data is at a given index that is 

1430 ### assumed to be the number of seconds from the start of the run. 

1431 ### therefore make the start time the first GPS stamp time minus 

1432 ### the index value for that stamp. 

1433 ### need to be sure that the first GPS stamp has a date, need GPRMC 

1434 first_stamp = self._get_first_gps_stamp(stamps) 

1435 first_index = first_stamp[0] 

1436 start_time = first_stamp[1][0].time_stamp - datetime.timedelta( 

1437 seconds=int(first_index) 

1438 ) 

1439 

1440 dt_index = self.make_dt_index( 

1441 start_time.isoformat(), 

1442 self.sample_rate, 

1443 n_samples=data_array.shape[0], 

1444 ) 

1445 

1446 return pd.DataFrame(data_array, index=dt_index) 

1447 

1448 def make_dt_index( 

1449 self, 

1450 start_time: str, 

1451 sample_rate: float, 

1452 stop_time: Optional[str] = None, 

1453 n_samples: Optional[int] = None, 

1454 ) -> pd.DatetimeIndex: 

1455 """ 

1456 Create datetime index array for time series data. 

1457 

1458 Parameters 

1459 ---------- 

1460 start_time : str 

1461 Start time in format YYYY-MM-DDThh:mm:ss.ms UTC 

1462 sample_rate : float 

1463 Sample rate in samples/second 

1464 stop_time : str, optional 

1465 End time in same format as start_time 

1466 n_samples : int, optional 

1467 Number of samples to generate 

1468 

1469 Returns 

1470 ------- 

1471 DatetimeIndex 

1472 Pandas datetime index with UTC timezone 

1473 

1474 Notes 

1475 ----- 

1476 Either stop_time or n_samples must be provided. The datetime format 

1477 should be YYYY-MM-DDThh:mm:ss.ms UTC. 

1478 

1479 Raises 

1480 ------ 

1481 ValueError 

1482 If neither stop_time nor n_samples is provided 

1483 """ 

1484 

1485 # set the index to be UTC time 

1486 dt_freq = "{0:.0f}ns".format(1.0 / (sample_rate) * 1e9) 

1487 if stop_time is not None: 

1488 dt_index = pd.date_range( 

1489 start=start_time, 

1490 end=stop_time, 

1491 freq=dt_freq, 

1492 closed="left", 

1493 tz="UTC", 

1494 ) 

1495 elif n_samples is not None: 

1496 dt_index = pd.date_range( 

1497 start=start_time, periods=n_samples, freq=dt_freq, tz="UTC" 

1498 ) 

1499 else: 

1500 raise ValueError("Need to input either stop_time or n_samples") 

1501 return dt_index 

1502 

1503 

1504# ============================================================================= 

1505# convenience read 

1506# ============================================================================= 

1507def read_nims(fn: Union[str, Path]) -> Optional[timeseries.RunTS]: 

1508 """ 

1509 Convenience function to read a NIMS DATA.BIN file. 

1510 

1511 Parameters 

1512 ---------- 

1513 fn : str or Path 

1514 Path to the NIMS DATA.BIN file 

1515 

1516 Returns 

1517 ------- 

1518 RunTS or None 

1519 Time series run object containing all channels and metadata, 

1520 or None if reading fails 

1521 

1522 Examples 

1523 -------- 

1524 >>> from mth5.io.nims import nims 

1525 >>> run_ts = nims.read_nims("/path/to/data.bin") 

1526 """ 

1527 

1528 nims_obj = NIMS(fn) 

1529 nims_obj.read_nims() 

1530 

1531 return nims_obj.to_runts()