Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mth5 \ mth5 \ io \ nims \ nims.py: 81%
459 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:01 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:01 -0800
1# -*- coding: utf-8 -*-
2"""
3===============
4NIMS
5===============
7 * deals with reading in NIMS DATA.BIN files
9 This is a translation from Matlab codes written and edited by:
10 * Anna Kelbert
11 * Paul Bedrosian
12 * Esteban Bowles-Martinez
13 * Possibly others.
15 I've tested it against a version, and it matches. The data/GPS gaps I
16 still don't understand so for now the time series is just
17 made continuous and the number of missing seconds is clipped from the
18 end of the time series.
20 .. note:: this only works for 8Hz data for now
23:copyright:
24 Jared Peacock (jpeacock@usgs.gov)
26:license:
27 MIT
28"""
30from __future__ import annotations
32import datetime
33import struct
34from pathlib import Path
35from typing import Any, Optional, Union
37# =============================================================================
38# Imports
39# =============================================================================
40import numpy as np
41import pandas as pd
42from mt_metadata.common.mttime import MTime
43from mt_metadata.timeseries import Auxiliary, Electric, Magnetic, Run, Station
45from mth5 import timeseries
46from mth5.io.nims.gps import GPS
47from mth5.io.nims.header import NIMSHeader
48from mth5.io.nims.response_filters import Response
51# =============================================================================
52# Exceptions
53# =============================================================================
56class NIMS(NIMSHeader):
57 """
58 NIMS Class for reading NIMS DATA.BIN files.
60 A fast way to read the binary files are to first read in the GPS strings,
61 the third byte in each block as a character and parse that into valid
62 GPS stamps.
64 Then read in the entire data set as unsigned 8 bit integers and reshape
65 the data to be n seconds x block size. Then parse that array into the
66 status information and data.
68 Parameters
69 ----------
70 fn : str or Path, optional
71 Path to the NIMS DATA.BIN file to read, by default None
73 Attributes
74 ----------
75 block_size : int
76 Size of data blocks (default 131 for 8 Hz data)
77 block_sequence : list of int
78 Sequence pattern to locate [1, 131]
79 sample_rate : int
80 Sample rate in samples/second (default 8)
81 e_conversion_factor : float
82 Electric field conversion factor
83 h_conversion_factor : float
84 Magnetic field conversion factor
85 t_conversion_factor : float
86 Temperature conversion factor
87 t_offset : int
88 Temperature offset value
89 info_array : ndarray or None
90 Structured array of block information
91 stamps : list or None
92 List of valid GPS stamps
93 ts_data : DataFrame or None
94 Time series data as pandas DataFrame
95 gaps : list or None
96 List of timing gaps found in data
97 duplicate_list : list or None
98 List of duplicate blocks found
100 Notes
101 -----
102 I only have a limited amount of .BIN files to test so this will likely
103 break if there are issues such as data gaps. This has been tested against the
104 matlab program loadNIMS by Anna Kelbert and the match for all the .bin files
105 I have. If something looks weird check it against that program.
107 .. todo:: Deal with timing issues, right now a warning is sent to the user
108 need to figure out a way to find where the gap is and adjust
109 accordingly.
111 .. warning::
112 Currently Only 8 Hz data is supported
114 Examples
115 --------
116 >>> from mth5.io.nims import nims
117 >>> n = nims.NIMS(r"/home/mt_data/nims/mt001.bin")
118 >>> n.read_nims()
119 """
121 def __init__(self, fn: Optional[Union[str, Path]] = None) -> None:
122 super().__init__(fn)
124 # change thes if the sample rate is different
125 self.block_size = 131
126 self.block_sequence = [1, self.block_size]
127 self.sample_rate = 8 ### samples/second
128 self.e_conversion_factor = 2.44141221047903e-06
129 self.h_conversion_factor = 0.01
130 self.t_conversion_factor = 70
131 self.t_offset = 18048
132 self._int_max = 8388608
133 self._int_factor = 16777216
134 self._block_dict = {
135 "soh": 0,
136 "block_len": 1,
137 "status": 2,
138 "gps": 3,
139 "sequence": 4,
140 "box_temp": (5, 6),
141 "head_temp": (7, 8),
142 "logic": 81,
143 "end": 130,
144 }
145 self.info_array = None
146 self.stamps = None
147 self.ts_data = None
148 self.gaps = None
149 self.duplicate_list = None
151 self._raw_string = None
153 self.indices = self._make_index_values()
155 def __str__(self) -> str:
156 """
157 Return string representation of NIMS object.
159 Returns
160 -------
161 str
162 Formatted string with NIMS station information and data summary
163 """
164 lines = [f"NIMS Station: {self.site_name}", "-" * 20]
165 lines.append(f"NIMS ID: {self.box_id}")
166 lines.append(f"magnetometer ID: {self.mag_id}")
167 lines.append(f"operator: {self.operator}")
168 lines.append(f"location: {self.state_province}, {self.country}")
169 lines.append(f"latitude: {self.latitude} (degrees)")
170 lines.append(f"longitude: {self.longitude} (degrees)")
171 lines.append(f"elevation: {self.elevation} m")
172 lines.append(f"gps stamp: {self.header_gps_stamp}")
173 lines.append(f"EX: length = {self.ex_length} m; azimuth = {self.ex_azimuth}")
174 lines.append(f"EY: length = {self.ey_length} m; azimuth = {self.ey_azimuth}")
175 lines.append(f"comments: {self.comments}")
177 if self.has_data():
178 lines.append("")
179 lines.append(f"Start: {self.start_time.isoformat()}")
180 lines.append(f"End: {self.end_time.isoformat()}")
181 lines.append(f"Data shape: {self.ts_data.shape}")
182 lines.append(f"Found {len(self.stamps)} GPS stamps")
183 return "\n".join(lines)
185 def __repr__(self) -> str:
186 """
187 Return string representation for debugging.
189 Returns
190 -------
191 str
192 String representation of the NIMS object
193 """
194 return self.__str__()
196 def has_data(self) -> bool:
197 """
198 Check if the NIMS object contains time series data.
200 Returns
201 -------
202 bool
203 True if ts_data is not None, False otherwise
204 """
205 if self.ts_data is not None:
206 return True
207 return False
209 @property
210 def n_samples(self) -> Optional[int]:
211 """
212 Number of samples in the time series.
214 Returns
215 -------
216 int or None
217 Number of samples if data is loaded, estimated from file size
218 if file exists, None otherwise
219 """
220 if self.has_data():
221 return self.ts_data.shape[0]
222 elif self.fn is not None:
223 return int(self.file_size / 16.375)
225 @property
226 def latitude(self) -> Optional[float]:
227 """
228 Median latitude value from all GPS stamps.
230 Returns
231 -------
232 float or None
233 Median latitude in decimal degrees (WGS84) from GPRMC stamps,
234 or header GPS latitude if no stamps available
236 Notes
237 -----
238 Only uses GPRMC stamps as they should be duplicates of GPGGA stamps
239 but include additional validation.
240 """
241 if self.stamps is not None:
242 latitude = np.zeros(len(self.stamps))
243 for ii, stamp in enumerate(self.stamps):
244 latitude[ii] = stamp[1][0].latitude
245 return np.median(latitude[np.nonzero(latitude)])
246 return self.header_gps_latitude
248 @property
249 def longitude(self) -> Optional[float]:
250 """
251 Median longitude value from all GPS stamps.
253 Returns
254 -------
255 float or None
256 Median longitude in decimal degrees (WGS84) from GPS stamps,
257 or header GPS longitude if no stamps available
259 Notes
260 -----
261 Uses the first stamp within each GPS stamp set.
262 """
263 if self.stamps is not None:
264 longitude = np.zeros(len(self.stamps))
265 for ii, stamp in enumerate(self.stamps):
266 longitude[ii] = stamp[1][0].longitude
267 return np.median(longitude[np.nonzero(longitude)])
268 return self.header_gps_longitude
270 @property
271 def elevation(self) -> Optional[float]:
272 """
273 Median elevation value from all GPS stamps.
275 Returns
276 -------
277 float or None
278 Median elevation in meters (WGS84) from GPS stamps,
279 or header GPS elevation if no stamps available
281 Notes
282 -----
283 Uses the first stamp within each GPS stamp set. For paired stamps
284 (GPRMC/GPGGA), uses the GPGGA elevation if available.
285 """
286 if self.stamps is not None:
287 elevation = np.zeros(len(self.stamps))
288 for ii, stamp in enumerate(self.stamps):
289 if len(stamp[1]) == 1:
290 elev = stamp[1][0].elevation
291 if len(stamp[1]) == 2:
292 elev = stamp[1][1].elevation
293 if elev is None:
294 continue
295 elevation[ii] = elev
296 return np.median(elevation[np.nonzero(elevation)])
297 return self.header_gps_elevation
299 @property
300 def declination(self) -> Optional[float]:
301 """
302 Median magnetic declination value from all GPS stamps.
304 Returns
305 -------
306 float or None
307 Median magnetic declination in decimal degrees from GPRMC stamps,
308 or None if no declination data available
310 Notes
311 -----
312 Only uses GPRMC stamps as they contain declination information.
313 """
314 if self.stamps is not None:
315 declination = np.zeros(len(self.stamps))
316 for ii, stamp in enumerate(self.stamps):
317 if stamp[1][0].gps_type == "GPRMC":
318 dec = stamp[1][0].declination
319 if dec is None:
320 continue
321 declination[ii] = dec
322 return np.median(declination[np.nonzero(declination)])
323 return None
325 @property
326 def start_time(self) -> MTime:
327 """
328 Start time of the time series data.
330 Returns
331 -------
332 MTime
333 Start time derived from the first GPS time stamp index,
334 or header GPS stamp if no time series data available
336 Notes
337 -----
338 The start time is calculated from the first good GPS time stamp
339 minus the seconds to the beginning of the time series.
340 """
341 if self.stamps is not None:
342 return MTime(time_stamp=self.ts_data.index[0])
343 return self.header_gps_stamp
345 @property
346 def end_time(self) -> MTime:
347 """
348 End time of the time series data.
350 Returns
351 -------
352 MTime
353 End time derived from the last time series index,
354 or estimated from start time and number of samples
356 Notes
357 -----
358 If time series data is available, uses the last timestamp.
359 Otherwise estimates end time from start time plus duration
360 calculated from number of samples and sample rate.
361 """
362 if self.stamps is not None:
363 return MTime(time_stamp=self.ts_data.index[-1])
364 self.logger.warning("Estimating end time from n_samples")
365 return self.start_time + int(self.n_samples / self.sample_rate)
367 @property
368 def box_temperature(self) -> Optional[timeseries.ChannelTS]:
369 """
370 Data logger temperature channel.
372 Returns
373 -------
374 ChannelTS or None
375 Temperature channel sampled at 1 second, interpolated to match
376 the time series sample rate, or None if no time series data
378 Notes
379 -----
380 Temperature is measured in Celsius and interpolated onto the same
381 time grid as the magnetic and electric field channels.
382 """
384 if self.ts_data is not None:
385 auxiliary_metadata = Auxiliary()
386 auxiliary_metadata.from_dict(
387 {
388 "channel_number": 6,
389 "component": "temperature",
390 "measurement_azimuth": 0,
391 "measurement_tilt": 0,
392 "sample_rate": 1,
393 "time_period.start": self.start_time.isoformat(),
394 "time_period.end": self.end_time.isoformat(),
395 "type": "auxiliary",
396 "units": "celsius",
397 }
398 )
400 temp = timeseries.ChannelTS(
401 channel_type="auxiliary",
402 data=self.info_array["box_temp"],
403 channel_metadata=auxiliary_metadata,
404 run_metadata=self.run_metadata,
405 station_metadata=self.station_metadata,
406 )
407 # interpolate temperature onto the same sample rate as the channels.
408 temp.data_array = temp.data_array.interp_like(self.hx.data_array)
409 temp.channel_metadata.sample_rate = self.sample_rate
410 temp.channel_metadata.time_period.end = self.end_time.isoformat()
412 return temp
413 return None
415 def get_channel_response(self, channel: str, dipole_length: float = 1) -> Any:
416 """
417 Get the channel response for a given channel.
419 Parameters
420 ----------
421 channel : str
422 Channel identifier (e.g., 'hx', 'hy', 'hz', 'ex', 'ey')
423 dipole_length : float, optional
424 Dipole length for electric field channels, by default 1
426 Returns
427 -------
428 Any
429 Channel response object from the NIMS response filters
431 Notes
432 -----
433 Uses the NIMS response filters to generate appropriate response
434 functions for magnetic and electric field channels at the current
435 sample rate.
436 """
438 nims_filters = Response(sample_rate=self.sample_rate)
439 return nims_filters.get_channel_response(channel, dipole_length=dipole_length)
441 @property
442 def hx_metadata(self) -> Optional[Magnetic]:
443 """
444 Metadata for the HX magnetic field channel.
446 Returns
447 -------
448 Magnetic or None
449 Magnetic field metadata object for the HX channel,
450 or None if no time series data is loaded
451 """
452 if self.ts_data is not None:
453 hx_metadata = Magnetic()
454 hx_metadata.from_dict(
455 {
456 "channel_number": 1,
457 "component": "hx",
458 "measurement_azimuth": 0,
459 "measurement_tilt": 0,
460 "sample_rate": self.sample_rate,
461 "time_period.start": self.start_time.isoformat(),
462 "time_period.end": self.end_time.isoformat(),
463 "type": "magnetic",
464 "units": "counts",
465 "sensor.id": self.mag_id,
466 "sensor.manufacturer": "Barry Narod",
467 "sensor.type": "fluxgate triaxial magnetometer",
468 }
469 )
470 return hx_metadata
472 @property
473 def hx(self) -> Optional[timeseries.ChannelTS]:
474 """
475 HX magnetic field channel time series.
477 Returns
478 -------
479 ChannelTS or None
480 Time series data for the HX magnetic field component,
481 or None if no time series data is loaded
482 """
483 if self.ts_data is not None:
484 return timeseries.ChannelTS(
485 channel_type="magnetic",
486 data=self.ts_data.hx.to_numpy(),
487 channel_metadata=self.hx_metadata,
488 run_metadata=self.run_metadata,
489 station_metadata=self.station_metadata,
490 channel_response=self.get_channel_response("hx"),
491 )
492 return None
494 @property
495 def hy_metadata(self) -> Optional[Magnetic]:
496 """
497 Metadata for the HY magnetic field channel.
499 Returns
500 -------
501 Magnetic or None
502 Magnetic field metadata object for the HY channel,
503 or None if no time series data is loaded
504 """
505 if self.ts_data is not None:
506 hy_metadata = Magnetic()
507 hy_metadata.from_dict(
508 {
509 "channel_number": 2,
510 "component": "hy",
511 "measurement_azimuth": 90,
512 "measurement_tilt": 0,
513 "sample_rate": self.sample_rate,
514 "time_period.start": self.start_time.isoformat(),
515 "time_period.end": self.end_time.isoformat(),
516 "type": "magnetic",
517 "units": "counts",
518 "sensor.id": self.mag_id,
519 "sensor.manufacturer": "Barry Narod",
520 "sensor.type": "fluxgate triaxial magnetometer",
521 }
522 )
523 return hy_metadata
525 @property
526 def hy(self) -> Optional[timeseries.ChannelTS]:
527 """
528 HY magnetic field channel time series.
530 Returns
531 -------
532 ChannelTS or None
533 Time series data for the HY magnetic field component,
534 or None if no time series data is loaded
535 """
536 if self.ts_data is not None:
537 return timeseries.ChannelTS(
538 channel_type="magnetic",
539 data=self.ts_data.hy.to_numpy(),
540 channel_metadata=self.hy_metadata,
541 run_metadata=self.run_metadata,
542 station_metadata=self.station_metadata,
543 channel_response=self.get_channel_response("hy"),
544 )
545 return None
547 @property
548 def hz_metadata(self) -> Optional[Magnetic]:
549 """
550 Metadata for the HZ magnetic field channel.
552 Returns
553 -------
554 Magnetic or None
555 Magnetic field metadata object for the HZ channel,
556 or None if no time series data is loaded
557 """
558 if self.ts_data is not None:
559 hz_metadata = Magnetic()
560 hz_metadata.from_dict(
561 {
562 "channel_number": 3,
563 "component": "hz",
564 "measurement_azimuth": 0,
565 "measurement_tilt": 0,
566 "sample_rate": self.sample_rate,
567 "time_period.start": self.start_time.isoformat(),
568 "time_period.end": self.end_time.isoformat(),
569 "type": "magnetic",
570 "units": "counts",
571 "sensor.id": self.mag_id,
572 "sensor.manufacturer": "Barry Narod",
573 "sensor.type": "fluxgate triaxial magnetometer",
574 }
575 )
576 return hz_metadata
578 @property
579 def hz(self) -> Optional[timeseries.ChannelTS]:
580 """
581 HZ magnetic field channel time series.
583 Returns
584 -------
585 ChannelTS or None
586 Time series data for the HZ magnetic field component,
587 or None if no time series data is loaded
588 """
589 """HZ"""
590 if self.ts_data is not None:
591 return timeseries.ChannelTS(
592 channel_type="magnetic",
593 data=self.ts_data.hz.to_numpy(),
594 channel_metadata=self.hz_metadata,
595 run_metadata=self.run_metadata,
596 station_metadata=self.station_metadata,
597 channel_response=self.get_channel_response("hz"),
598 )
599 return None
601 @property
602 def ex_metadata(self) -> Optional[Electric]:
603 """
604 Metadata for the EX electric field channel.
606 Returns
607 -------
608 Electric or None
609 Electric field metadata object for the EX channel,
610 or None if no time series data is loaded
611 """
612 if self.ts_data is not None:
613 ex_metadata = Electric()
614 ex_metadata.from_dict(
615 {
616 "channel_number": 4,
617 "component": "ex",
618 "measurement_azimuth": self.ex_azimuth,
619 "measurement_tilt": 0,
620 "sample_rate": self.sample_rate,
621 "dipole_length": self.ex_length,
622 "time_period.start": self.start_time.isoformat(),
623 "time_period.end": self.end_time.isoformat(),
624 "type": "electric",
625 "units": "counts",
626 "negative.id": self.s_electrode_id,
627 "positive.id": self.n_electrode_id,
628 }
629 )
631 return ex_metadata
633 @property
634 def ex(self) -> Optional[timeseries.ChannelTS]:
635 """
636 EX electric field channel time series.
638 Returns
639 -------
640 ChannelTS or None
641 Time series data for the EX electric field component,
642 or None if no time series data is loaded
643 """
644 if self.ts_data is not None:
645 return timeseries.ChannelTS(
646 channel_type="electric",
647 data=self.ts_data.ex.to_numpy(),
648 channel_metadata=self.ex_metadata,
649 run_metadata=self.run_metadata,
650 station_metadata=self.station_metadata,
651 channel_response=self.get_channel_response("ex", self.ex_length),
652 )
653 return None
655 @property
656 def ey_metadata(self) -> Optional[Electric]:
657 """
658 Metadata for the EY electric field channel.
660 Returns
661 -------
662 Electric or None
663 Electric field metadata object for the EY channel,
664 or None if no time series data is loaded
665 """
666 if self.ts_data is not None:
667 ey_metadata = Electric()
668 ey_metadata.from_dict(
669 {
670 "channel_number": 5,
671 "component": "ey",
672 "measurement_azimuth": self.ey_azimuth,
673 "measurement_tilt": 0,
674 "sample_rate": self.sample_rate,
675 "dipole_length": self.ey_length,
676 "time_period.start": self.start_time.isoformat(),
677 "time_period.end": self.end_time.isoformat(),
678 "type": "electric",
679 "units": "counts",
680 "negative.id": self.w_electrode_id,
681 "positive.id": self.e_electrode_id,
682 }
683 )
685 return ey_metadata
687 @property
688 def ey(self) -> Optional[timeseries.ChannelTS]:
689 """
690 EY electric field channel time series.
692 Returns
693 -------
694 ChannelTS or None
695 Time series data for the EY electric field component,
696 or None if no time series data is loaded
697 """
698 if self.ts_data is not None:
699 return timeseries.ChannelTS(
700 channel_type="electric",
701 data=self.ts_data.ey.to_numpy(),
702 channel_metadata=self.ey_metadata,
703 run_metadata=self.run_metadata,
704 station_metadata=self.station_metadata,
705 channel_response=self.get_channel_response("ey", self.ey_length),
706 )
707 return None
709 @property
710 def run_metadata(self) -> Optional[Run]:
711 """
712 Run metadata for the NIMS data collection.
714 Returns
715 -------
716 Run or None
717 MT run metadata including data logger information, timing,
718 and channel metadata, or None if no time series data is loaded
719 """
721 if self.ts_data is not None:
722 run_metadata = Run()
723 run_metadata.from_dict(
724 {
725 "Run": {
726 "comments": self.comments,
727 "data_logger.firmware.author": "B. Narod",
728 "data_logger.firmware.name": "nims",
729 "data_logger.firmware.version": "1.0",
730 "data_logger.manufacturer": "Narod",
731 "data_logger.model": self.box_id,
732 "data_logger.id": self.box_id,
733 "data_logger.type": "long period",
734 "id": self.run_id,
735 "data_type": "LPMT",
736 "sample_rate": self.sample_rate,
737 "time_period.end": self.end_time.isoformat(),
738 "time_period.start": self.start_time.isoformat(),
739 }
740 }
741 )
742 for comp in ["hx", "hy", "hz", "ex", "ey"]:
743 run_metadata.channels.append(getattr(self, f"{comp}_metadata"))
744 return run_metadata
745 return None
747 @property
748 def station_metadata(self) -> Optional[Station]:
749 """
750 Station metadata from NIMS file.
752 Returns
753 -------
754 Station or None
755 MT station metadata including geographic information and location data,
756 or None if no time series data is loaded
757 """
758 if self.ts_data is not None:
759 station_metadata = Station()
760 if self.run_id is None:
761 self.run_id = f"001"
762 station_metadata.from_dict(
763 {
764 "Station": {
765 "geographic_name": f"{self.site_name}, {self.state_province}, {self.country}",
766 "location.declination.value": self.declination,
767 "location.elevation": self.elevation,
768 "location.latitude": self.latitude,
769 "location.longitude": self.longitude,
770 "id": self.run_id[0:-1],
771 "orientation.reference_frame": "geomagnetic",
772 }
773 }
774 )
775 station_metadata.runs.append(self.run_metadata)
776 return station_metadata
777 return None
779 def to_runts(self, calibrate: bool = False) -> Optional[timeseries.RunTS]:
780 """
781 Get xarray RunTS object for the NIMS data.
783 Parameters
784 ----------
785 calibrate : bool, optional
786 Whether to apply calibration to the data, by default False
788 Returns
789 -------
790 RunTS or None
791 Time series run object containing all channels and metadata,
792 or None if no time series data is loaded
794 Notes
795 -----
796 Includes all magnetic field channels (hx, hy, hz), electric field
797 channels (ex, ey), and box temperature data.
798 """
800 if self.ts_data is not None:
801 run = timeseries.RunTS(
802 array_list=[
803 self.hx,
804 self.hy,
805 self.hz,
806 self.ex,
807 self.ey,
808 self.box_temperature,
809 ],
810 run_metadata=self.run_metadata,
811 station_metadata=self.station_metadata,
812 )
813 if calibrate:
814 return run.calibrate()
815 else:
816 return run
817 return None
819 def _make_index_values(self) -> np.ndarray:
820 """
821 Create index values for the recorded channels.
823 Returns
824 -------
825 ndarray
826 Array of shape (8, 5) containing byte indices for extracting
827 magnetic (first 3 columns) and electric (last 2 columns)
828 channel data from each 8-sample block
830 Notes
831 -----
832 Creates an array of index values for magnetics and electrics.
833 Each row corresponds to one sample within an 8-sample block.
834 Columns 0-2 are for magnetic channels (hx, hy, hz).
835 Columns 3-4 are for electric channels (ex, ey).
836 """
837 ### make an array of index values for magnetics and electrics
838 indices = np.zeros((8, 5), dtype=int)
839 for kk in range(8):
840 ### magnetic blocks
841 for ii in range(3):
842 indices[kk, ii] = 9 + (kk) * 9 + (ii) * 3
843 ### electric blocks
844 for ii in range(2):
845 indices[kk, 3 + ii] = 82 + (kk) * 6 + (ii) * 3
846 return indices
848 def _get_gps_string_list(
849 self, nims_string: bytes
850 ) -> tuple[list[float], list[bytes]]:
851 """
852 Extract GPS strings from raw NIMS binary data.
854 This method takes the 3rd byte in each block, concatenates into a long
855 string and creates a list by splitting by '$'. The index values of
856 where the '$' characters are found are also calculated.
858 Parameters
859 ----------
860 nims_string : bytes
861 Raw binary string output by NIMS
863 Returns
864 -------
865 list of float
866 List of index values associated with the location of the '$' characters
867 list of bytes
868 List of possible raw GPS strings split by '$'
870 Notes
871 -----
872 This assumes that there are an even amount of data blocks.
873 This might be a bad assumption in some cases.
874 """
875 ### get index values of $ and gps_strings
876 index_values = []
877 gps_str_list = []
878 for ii in range(int(len(nims_string) / self.block_size)):
879 index = ii * self.block_size + 3
880 g_char = struct.unpack("c", nims_string[index : index + 1])[0]
881 if g_char == b"$":
882 index_values.append((index - 3) / self.block_size)
883 gps_str_list.append(g_char)
884 gps_raw_stamp_list = b"".join(gps_str_list).split(b"$")
885 return index_values, gps_raw_stamp_list
887 def get_stamps(self, nims_string: bytes) -> list[tuple[Any, list[GPS]]]:
888 """
889 Extract and parse valid GPS strings, matching GPRMC with GPGGA stamps.
891 Parameters
892 ----------
893 nims_string : bytes
894 Raw GPS binary string output by NIMS
896 Returns
897 -------
898 list of tuple
899 List of matched GPS stamps where each element is a tuple containing
900 index and list of GPS objects [GPRMC, GPGGA] (or just [GPRMC])
902 Notes
903 -----
904 Skips the first entry as it tends to be incomplete. Attempts to match
905 synchronous GPRMC with GPGGA stamps when possible.
906 """
907 ### read in GPS strings into a list to be parsed later
908 index_list, gps_raw_stamp_list = self._get_gps_string_list(nims_string)
910 gprmc_list = []
911 gpgga_list = []
912 ### note we are skipping the first entry, it tends to be not
913 ### complete anyway
914 for ii, index, raw_stamp in zip(
915 range(len(index_list)), index_list, gps_raw_stamp_list[1:]
916 ):
917 gps_obj = GPS(raw_stamp, index)
918 if gps_obj.valid:
919 if gps_obj.gps_type == "GPRMC":
920 gprmc_list.append(gps_obj)
921 elif gps_obj.gps_type == "GPGGA":
922 gpgga_list.append(gps_obj)
923 else:
924 self.logger.debug(f"GPS Error: file index {index}, stamp number {ii}")
925 max_len = min([len(raw_stamp), 15])
926 self.logger.debug(f"GPS Raw Stamp: {raw_stamp[0:max_len]}")
927 return self._gps_match_gprmc_gpgga_strings(gprmc_list, gpgga_list)
929 def _gps_match_gprmc_gpgga_strings(self, gprmc_list, gpgga_list):
930 """
931 match GPRMC and GPGGA strings together into a list
933 [[GPRMC, GPGGA], ...]
935 :param list gprmc_list: list of GPS objects for the GPRMC stamps
936 :param list gpgga_list: list of GPS objects for the GPGGA stamps
938 :returns: list of matched GPRMC and GPGGA stamps
940 """
941 ### match up the GPRMC and GPGGA together
942 gps_match_list = []
943 for gprmc in gprmc_list:
944 find = False
945 for ii, gpgga in enumerate(gpgga_list):
946 if gprmc.time_stamp.time() == gpgga.time_stamp.time():
947 gps_match_list.append([gprmc, gpgga])
948 find = True
949 del gpgga_list[ii]
950 break
951 if not find:
952 gps_match_list.append([gprmc])
953 return gps_match_list
955 def _get_gps_stamp_indices_from_status(self, status_array):
956 """
957 get the index location of the stamps from the status array assuming
958 that 0 indicates GPS lock.
960 :param :class:`np.ndarray` status_array: an array of status values from data blocks
962 :returns: array of index values where GPS lock was acquired ignoring
963 sequential locks.
964 """
966 index_values = np.where(status_array == 0)[0]
967 status_index = np.zeros_like(index_values)
968 for ii in range(index_values.size):
969 if index_values[ii] - index_values[ii - 1] == 1:
970 continue
971 else:
972 status_index[ii] = index_values[ii]
973 status_index = status_index[np.nonzero(status_index)]
975 return status_index
977 def match_status_with_gps_stamps(self, status_array, gps_list):
978 """
979 Match the index values from the status array with the index values of
980 the GPS stamps. There appears to be a bit of wiggle room between when the
981 lock is recorded and the stamp was actually recorded. This is typically 1
982 second and sometimes 2.
984 :param array status_array: array of status values from each data block
985 :param list gps_list: list of valid GPS stamps [[GPRMC, GPGGA], ...]
987 .. note:: I think there is a 2 second gap between the lock and the
988 first stamp character.
989 """
991 stamp_indices = self._get_gps_stamp_indices_from_status(status_array)
992 gps_stamps = []
993 for index in stamp_indices:
994 stamp_find = False
995 for ii, stamps in enumerate(gps_list):
996 index_diff = stamps[0].index - index
997 ### check the index value, should be 2 or 74, if it is off by
998 ### a value left or right apply a correction.
999 if index_diff == 1 or index_diff == 73:
1000 index += 1
1001 stamps[0].index += 1
1002 elif index_diff == 2 or index_diff == 74:
1003 index = index
1004 elif index_diff == 3 or index_diff == 75:
1005 index -= 1
1006 stamps[0].index -= 1
1007 elif index_diff == 4 or index_diff == 76:
1008 index -= 2
1009 stamps[0].index -= 2
1010 if stamps[0].gps_type in ["GPRMC", "gprmc"]:
1011 if index_diff in [1, 2, 3, 4]:
1012 gps_stamps.append((index, stamps))
1013 stamp_find = True
1014 del gps_list[ii]
1015 break
1016 elif stamps[0].gps_type in ["GPGGA", "gpgga"]:
1017 if index_diff in [73, 74, 75, 76]:
1018 gps_stamps.append((index, stamps))
1019 stamp_find = True
1020 del gps_list[ii]
1021 break
1022 if not stamp_find:
1023 self.logger.debug(f"GPS Error: No good GPS stamp at {index} seconds")
1024 return gps_stamps
1026 def find_sequence(
1027 self, data_array: np.ndarray, block_sequence: Optional[list[int]] = None
1028 ) -> np.ndarray:
1029 """
1030 Find a sequence pattern in the data array.
1032 Parameters
1033 ----------
1034 data_array : ndarray
1035 Array of the data with shape [n, m] where n is the number of
1036 seconds recorded and m is the block length for a given sampling rate
1037 block_sequence : list of int, optional
1038 Sequence pattern to locate, by default [1, 131] (start of data block)
1040 Returns
1041 -------
1042 ndarray
1043 Array of index locations where the sequence is found
1045 Notes
1046 -----
1047 Uses numpy rolling and comparison to find all occurrences of the
1048 specified sequence pattern in the data array.
1049 """
1050 if block_sequence is not None:
1051 self.block_sequence = block_sequence
1052 # want to find the index there the test data is equal to the test sequence
1053 t = np.vstack(
1054 [
1055 np.roll(data_array, shift)
1056 for shift in -np.arange(len(self.block_sequence))
1057 ]
1058 ).T
1059 return np.where(np.all(t == self.block_sequence, axis=1))[0]
1061 def unwrap_sequence(self, sequence: np.ndarray) -> np.ndarray:
1062 """
1063 Unwrap sequence to sequential numbers instead of modulo 256.
1065 Parameters
1066 ----------
1067 sequence : ndarray
1068 Sequence of byte numbers (0-255) to unwrap
1070 Returns
1071 -------
1072 ndarray
1073 Unwrapped sequence with first number set to 0 and subsequent
1074 values forming a continuous count
1076 Notes
1077 -----
1078 Handles the fact that sequence numbers are stored as single bytes
1079 (0-255) but represent a continuous count. When a value of 255 is
1080 encountered, the next rollover is anticipated.
1081 """
1082 count = 0
1083 unwrapped = np.zeros_like(sequence)
1084 for ii, seq in enumerate(sequence):
1085 unwrapped[ii] = seq + count * 256
1086 if seq == 255:
1087 count += 1
1088 unwrapped -= unwrapped[0]
1090 return unwrapped
1092 def _locate_duplicate_blocks(self, sequence):
1093 """
1094 locate the sequence number where the duplicates exist
1096 :param list sequence: sequence to match duplicate numbers.
1097 :returns: list of duplicate index values.
1098 """
1100 duplicates = np.where(np.abs(np.diff(sequence)) == 0)[0]
1101 if len(duplicates) == 0:
1102 return None
1103 duplicate_list = []
1104 for dup in duplicates:
1105 dup_dict = {}
1106 dup_dict["sequence_index"] = dup
1107 dup_dict["ts_index_0"] = dup * self.sample_rate
1108 dup_dict["ts_index_1"] = dup * self.sample_rate + self.sample_rate
1109 dup_dict["ts_index_2"] = (dup + 1) * self.sample_rate
1110 dup_dict["ts_index_3"] = (dup + 1) * self.sample_rate + self.sample_rate
1111 duplicate_list.append(dup_dict)
1112 return duplicate_list
1114 def _check_duplicate_blocks(self, block_01, block_02, info_01, info_02):
1115 """
1116 make sure the blocks are truly duplicates
1118 :param np.array block_01: block of data to compare
1119 :param np.array block_02: block of data to compare
1120 :param np.array info_01: information array from info_array[sequence_index]
1121 :param np.array info_02: information array from info_array[sequence_index]
1123 :returns: boolean if the blocks and information match
1125 """
1126 if np.array_equal(block_01, block_02):
1127 if np.array_equal(info_01, info_02):
1128 return True
1129 else:
1130 return False
1131 else:
1132 return False
1134 def remove_duplicates(self, info_array, data_array):
1135 """
1136 remove duplicate blocks, removing the first duplicate as suggested by
1137 Paul and Anna. Checks to make sure that the mag data are identical for
1138 the duplicate blocks. Removes the blocks from the information and
1139 data arrays and returns the reduced arrays. This should sync up the
1140 timing of GPS stamps and index values.
1142 :param np.array info_array: structured array of block information
1143 :param np.array data_array: structured array of the data
1145 :returns: reduced information array
1146 :returns: reduced data array
1147 :returns: index of duplicates in raw data
1149 """
1150 ### locate
1151 duplicate_test_list = self._locate_duplicate_blocks(self.info_array["sequence"])
1152 if duplicate_test_list is None:
1153 return info_array, data_array, None
1154 duplicate_list = []
1155 for d in duplicate_test_list:
1156 if self._check_duplicate_blocks(
1157 data_array[d["ts_index_0"] : d["ts_index_1"]],
1158 data_array[d["ts_index_2"] : d["ts_index_3"]],
1159 info_array[d["sequence_index"]],
1160 info_array[d["sequence_index"] + 1],
1161 ):
1162 duplicate_list.append(d)
1164 if len(duplicate_list) == 0:
1165 self.logger.info(f"Duplicate block count is zero")
1166 return info_array, data_array, None
1167 self.logger.debug(f"Deleting {len(duplicate_list)} duplicate blocks")
1168 ### get the index of the blocks to be removed, namely the 1st duplicate
1169 ### block
1170 remove_sequence_index = [d["sequence_index"] for d in duplicate_list]
1171 remove_data_index = np.concatenate(
1172 [np.arange(d["ts_index_0"], d["ts_index_1"]) for d in duplicate_list]
1173 ).astype(int)
1175 ### remove the data
1176 return_info_array = np.delete(info_array, remove_sequence_index)
1177 return_data_array = np.delete(data_array, remove_data_index)
1179 ### set sequence to be monotonic
1180 return_info_array["sequence"][:] = np.arange(return_info_array.shape[0])
1182 return return_info_array, return_data_array, duplicate_list
1184 def read_nims(self, fn: Optional[Union[str, Path]] = None) -> None:
1185 """
1186 Read NIMS DATA.BIN file and parse all data.
1188 This method performs the complete data reading and processing workflow:
1190 1. Read header information and store as attributes
1191 2. Locate data block beginning by finding first [1, 131, ...] sequence
1192 3. Ensure data is multiple of block length, trim excess bits
1193 4. Extract GPS data (3rd byte of each block) and parse GPS stamps
1194 5. Read data as unsigned 8-bit integers, reshape to [N, block_length]
1195 6. Remove duplicate blocks (first of each duplicate pair)
1196 7. Match GPS status locks with valid GPS stamps
1197 8. Verify timing between first/last GPS stamps, trim excess seconds
1199 Parameters
1200 ----------
1201 fn : str or Path, optional
1202 Path to NIMS DATA.BIN file. Uses self.fn if not provided.
1204 Notes
1205 -----
1206 The data and information arrays returned have duplicates removed
1207 and sequence reset to be monotonic. Extra seconds due to timing
1208 gaps are trimmed from the end of the time series.
1210 Examples
1211 --------
1212 >>> from mth5.io import nims
1213 >>> n = nims.NIMS(r"/home/mt_data/nims/mt001.bin")
1214 >>> n.read_nims()
1215 """
1217 if fn is not None:
1218 self.fn = fn
1219 st = datetime.datetime.now()
1220 ### read in header information and get the location of end of header
1221 self.read_header(self.fn)
1223 ### load in the entire file, its not too big, start from the
1224 ### end of the header information.
1225 with open(self.fn, "rb") as fid:
1226 fid.seek(self.data_start_seek)
1227 self._raw_string = fid.read()
1228 ### read in full string as unsigned integers
1229 data = np.frombuffer(self._raw_string, dtype=np.uint8)
1231 ### need to make sure that the data starts with a full block
1232 find_first = self.find_sequence(data[0 : self.block_size * 10])[0]
1233 data = data[find_first:]
1235 ### get GPS stamps from the binary string first
1236 self.gps_list = self.get_stamps(self._raw_string[find_first:])
1238 ### check the size of the data, should have an equal amount of blocks
1239 if (data.size % self.block_size) != 0:
1240 self.logger.warning(
1241 f"odd number of bytes {data.size}, not even blocks "
1242 f"cutting down the data by {data.size % self.block_size} bits"
1243 )
1244 end_data = data.size - (data.size % self.block_size)
1245 data = data[0:end_data]
1246 # resized the data into an even amount of blocks
1247 data = data.reshape((int(data.size / self.block_size), self.block_size))
1249 ### need to parse the data
1250 ### first get the status information
1251 self.info_array = np.zeros(
1252 data.shape[0],
1253 dtype=[
1254 ("soh", int),
1255 ("block_len", int),
1256 ("status", int),
1257 ("gps", int),
1258 ("sequence", int),
1259 ("box_temp", float),
1260 ("head_temp", float),
1261 ("logic", int),
1262 ("end", int),
1263 ],
1264 )
1266 for key, index in self._block_dict.items():
1267 if "temp" in key:
1268 # compute temperature - cast to int32 to avoid uint8 overflow
1269 t_value = data[:, index[0]].astype(np.int32) * 256 + data[
1270 :, index[1]
1271 ].astype(np.int32)
1273 # something to do with the bits where you have to subtract
1274 t_value[np.where(t_value > 32768)] -= 65536
1275 value = (t_value - self.t_offset) / self.t_conversion_factor
1276 else:
1277 value = data[:, index]
1278 self.info_array[key][:] = value
1279 ### unwrap sequence
1280 self.info_array["sequence"] = self.unwrap_sequence(self.info_array["sequence"])
1282 ### get data
1283 data_array = np.zeros(
1284 data.shape[0] * self.sample_rate,
1285 dtype=[
1286 ("hx", float),
1287 ("hy", float),
1288 ("hz", float),
1289 ("ex", float),
1290 ("ey", float),
1291 ],
1292 )
1294 ### fill the data
1295 for cc, comp in enumerate(["hx", "hy", "hz", "ex", "ey"]):
1296 channel_arr = np.zeros((data.shape[0], 8), dtype=float)
1297 for kk in range(self.sample_rate):
1298 index = self.indices[kk, cc]
1299 # Cast to int32 to avoid uint8 overflow
1300 value = (
1301 data[:, index].astype(np.int32) * 256
1302 + data[:, index + 1].astype(np.int32)
1303 ) * np.array([256]) + data[:, index + 2].astype(np.int32)
1304 value[np.where(value > self._int_max)] -= self._int_factor
1305 channel_arr[:, kk] = value
1306 data_array[comp][:] = channel_arr.flatten()
1307 ### clean things up
1308 ### I guess that the E channels are opposite phase?
1309 for comp in ["ex", "ey"]:
1310 data_array[comp] *= -1
1311 ### remove duplicates
1312 (
1313 self.info_array,
1314 data_array,
1315 self.duplicate_list,
1316 ) = self.remove_duplicates(self.info_array, data_array)
1317 ### get GPS stamps with index values
1318 self.stamps = self.match_status_with_gps_stamps(
1319 self.info_array["status"], self.gps_list
1320 )
1321 ### align data checking for timing gaps
1322 self.ts_data = self.align_data(data_array, self.stamps)
1324 et = datetime.datetime.now()
1325 read_time = (et - st).total_seconds()
1326 self.logger.debug(f"Reading took {read_time:.2f} seconds")
1328 def _get_first_gps_stamp(self, stamps):
1329 """
1330 get the first GPRMC stamp
1331 """
1332 for stamp in stamps:
1333 if stamp[1][0].gps_type in ["gprmc", "GPRMC"]:
1334 return stamp
1335 return None
1337 def _get_last_gps_stamp(self, stamps):
1338 """
1339 get the last gprmc stamp
1340 """
1341 for stamp in stamps[::-1]:
1342 if stamp[1][0].gps_type in ["gprmc", "GPRMC"]:
1343 return stamp
1344 return None
1346 def _locate_timing_gaps(self, stamps):
1347 """
1348 locate timing gaps in the data by comparing the stamp index with the
1349 GPS time stamp. The number of points and seconds should be the same
1351 :param list stamps: list of GPS stamps [[status_index, [GPRMC, GPGGA]]]
1353 :returns: list of gap index values
1354 """
1355 stamp_01 = self._get_first_gps_stamp(stamps)[1][0]
1356 # current_gap = 0
1357 current_stamp = stamp_01
1358 gap_beginning = []
1359 total_gap = 0
1360 for ii, stamp in enumerate(stamps[1:], 1):
1361 stamp = stamp[1][0]
1362 # can only compare those with a date and time.
1363 if stamp.gps_type == "GPGGA":
1364 continue
1365 time_diff = (stamp.time_stamp - current_stamp.time_stamp).total_seconds()
1366 index_diff = stamp.index - current_stamp.index
1368 time_gap = index_diff - time_diff
1369 if time_gap == 0:
1370 continue
1371 elif time_gap > 0:
1372 total_gap += time_gap
1373 current_stamp = stamp
1374 gap_beginning.append(stamp.index)
1375 self.logger.debug(
1376 f"GPS tamp at {stamp.time_stamp.isoformat()} is off "
1377 f"from previous time by { time_gap} seconds"
1378 )
1379 self.logger.warning(f"Timing is off by {total_gap} seconds")
1380 return gap_beginning
1382 def check_timing(self, stamps):
1383 """
1384 make sure that there are the correct number of seconds in between
1385 the first and last GPS GPRMC stamps
1387 :param list stamps: list of GPS stamps [[status_index, [GPRMC, GPGGA]]]
1389 :returns: [ True | False ] if data is valid or not.
1390 :returns: gap index locations
1392 .. note:: currently it is assumed that if a data gap occurs the data can be
1393 squeezed to remove them. Probably a more elegant way of doing it.
1394 """
1395 gaps = None
1396 first_stamp = self._get_first_gps_stamp(stamps)[1][0]
1397 last_stamp = self._get_last_gps_stamp(stamps)[1][0]
1399 time_diff = last_stamp.time_stamp - first_stamp.time_stamp
1400 index_diff = last_stamp.index - first_stamp.index
1402 difference = index_diff - time_diff.total_seconds()
1403 if difference != 0:
1404 gaps = self._locate_timing_gaps(stamps)
1405 return False, gaps, difference
1406 return True, gaps, difference
1408 def align_data(self, data_array, stamps):
1409 """
1410 Need to match up the first good GPS stamp with the data
1412 Do this by using the first GPS stamp and assuming that the time from
1413 the first time stamp to the start is the index value.
1415 put the data into a pandas data frame that is indexed by time
1417 :param array data_array: structure array with columns for each
1418 component [hx, hy, hz, ex, ey]
1419 :param list stamps: list of GPS stamps [[status_index, [GPRMC, GPGGA]]]
1421 :returns: pandas DataFrame with colums of components and indexed by
1422 time initialized by the start time.
1424 .. note:: Data gaps are squeezed cause not sure what a gap actually means.
1425 """
1426 ### check timing first to make sure there is no drift
1427 timing_valid, self.gaps, time_difference = self.check_timing(stamps)
1429 ### first GPS stamp within the data is at a given index that is
1430 ### assumed to be the number of seconds from the start of the run.
1431 ### therefore make the start time the first GPS stamp time minus
1432 ### the index value for that stamp.
1433 ### need to be sure that the first GPS stamp has a date, need GPRMC
1434 first_stamp = self._get_first_gps_stamp(stamps)
1435 first_index = first_stamp[0]
1436 start_time = first_stamp[1][0].time_stamp - datetime.timedelta(
1437 seconds=int(first_index)
1438 )
1440 dt_index = self.make_dt_index(
1441 start_time.isoformat(),
1442 self.sample_rate,
1443 n_samples=data_array.shape[0],
1444 )
1446 return pd.DataFrame(data_array, index=dt_index)
1448 def make_dt_index(
1449 self,
1450 start_time: str,
1451 sample_rate: float,
1452 stop_time: Optional[str] = None,
1453 n_samples: Optional[int] = None,
1454 ) -> pd.DatetimeIndex:
1455 """
1456 Create datetime index array for time series data.
1458 Parameters
1459 ----------
1460 start_time : str
1461 Start time in format YYYY-MM-DDThh:mm:ss.ms UTC
1462 sample_rate : float
1463 Sample rate in samples/second
1464 stop_time : str, optional
1465 End time in same format as start_time
1466 n_samples : int, optional
1467 Number of samples to generate
1469 Returns
1470 -------
1471 DatetimeIndex
1472 Pandas datetime index with UTC timezone
1474 Notes
1475 -----
1476 Either stop_time or n_samples must be provided. The datetime format
1477 should be YYYY-MM-DDThh:mm:ss.ms UTC.
1479 Raises
1480 ------
1481 ValueError
1482 If neither stop_time nor n_samples is provided
1483 """
1485 # set the index to be UTC time
1486 dt_freq = "{0:.0f}ns".format(1.0 / (sample_rate) * 1e9)
1487 if stop_time is not None:
1488 dt_index = pd.date_range(
1489 start=start_time,
1490 end=stop_time,
1491 freq=dt_freq,
1492 closed="left",
1493 tz="UTC",
1494 )
1495 elif n_samples is not None:
1496 dt_index = pd.date_range(
1497 start=start_time, periods=n_samples, freq=dt_freq, tz="UTC"
1498 )
1499 else:
1500 raise ValueError("Need to input either stop_time or n_samples")
1501 return dt_index
1504# =============================================================================
1505# convenience read
1506# =============================================================================
1507def read_nims(fn: Union[str, Path]) -> Optional[timeseries.RunTS]:
1508 """
1509 Convenience function to read a NIMS DATA.BIN file.
1511 Parameters
1512 ----------
1513 fn : str or Path
1514 Path to the NIMS DATA.BIN file
1516 Returns
1517 -------
1518 RunTS or None
1519 Time series run object containing all channels and metadata,
1520 or None if reading fails
1522 Examples
1523 --------
1524 >>> from mth5.io.nims import nims
1525 >>> run_ts = nims.read_nims("/path/to/data.bin")
1526 """
1528 nims_obj = NIMS(fn)
1529 nims_obj.read_nims()
1531 return nims_obj.to_runts()