Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mth5 \ mth5 \ io \ lemi \ lemi424.py: 90%
270 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:01 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:01 -0800
1# -*- coding: utf-8 -*-
2"""
3Created on Tue May 11 15:31:31 2021
5:copyright:
6 Jared Peacock (jpeacock@usgs.gov)
8:license: MIT
10"""
11import json
12import warnings
13from io import StringIO
15# =============================================================================
16# Imports
17# =============================================================================
18from pathlib import Path
19from typing import Any
21import numpy as np
24# supress the future warning from pandas about using datetime parser.
25warnings.simplefilter(action="ignore", category=FutureWarning)
28import pandas as pd
29from loguru import logger
30from mt_metadata.common.mttime import MTime
31from mt_metadata.timeseries import Auxiliary, Electric, Magnetic, Run, Station
32from mt_metadata.timeseries.filters import ChannelResponse, FrequencyResponseTableFilter
34from mth5.timeseries import ChannelTS, RunTS
37# =============================================================================
38def lemi_date_parser(
39 year: int, month: int, day: int, hour: int, minute: int, second: int
40) -> pd.Series:
41 """
42 Combine the date-time columns that are output by LEMI into a single column.
44 Assumes UTC timezone.
46 Parameters
47 ----------
48 year : int
49 Year value.
50 month : int
51 Month value (1-12).
52 day : int
53 Day of the month (1-31).
54 hour : int
55 Hour in 24-hour format (0-23).
56 minute : int
57 Minutes in the hour (0-59).
58 second : int
59 Seconds in the minute (0-59).
61 Returns
62 -------
63 pd.DatetimeIndex
64 Combined date-time as a pandas DatetimeIndex.
66 """
67 dt_df = pd.DataFrame(
68 {
69 "year": year,
70 "month": month,
71 "day": day,
72 "hour": hour,
73 "minute": minute,
74 "second": second,
75 }
76 )
78 for key in ["year", "month", "day", "hour", "minute", "second"]:
79 dt_df[key] = dt_df[key].astype(int)
81 return pd.to_datetime(dt_df)
84def lemi_position_parser(position: float) -> float:
85 """
86 Parse LEMI location strings into decimal degrees.
88 Uses the hemisphere for the sign.
90 Notes
91 -----
92 The format of the location is odd in that it is multiplied by
93 100 within the LEMI to provide a single floating point value that
94 includes the degrees and decimal degrees --> {degrees}{degrees[mm.ss]}.
95 For example 40.50166 would be represented as 4030.1.
97 Parameters
98 ----------
99 position : float
100 LEMI position value to parse.
102 Returns
103 -------
104 float
105 Decimal degrees position.
107 """
108 pos = f"{float(position) / 100}".split(".")
109 degrees = int(pos[0])
110 decimals = float(f"{pos[1][0:2]}.{pos[1][2:]}") / 60
112 location = degrees + decimals
113 return location
116def lemi_hemisphere_parser(hemisphere: str) -> int:
117 """
118 Convert hemisphere into a value [-1, 1].
120 Assumes the prime meridian is 0.
122 Parameters
123 ----------
124 hemisphere : str
125 Hemisphere string. Valid values are 'N', 'S', 'E', 'W'.
127 Returns
128 -------
129 int
130 Unity with a sign for the given hemisphere.
131 Returns -1 for 'S' or 'W', 1 for 'N' or 'E'.
133 """
134 if hemisphere in ["S", "W"]:
135 return -1
136 return 1
139class LEMI424:
140 """
141 Read and process LEMI424 magnetotelluric data files.
143 This is a placeholder until IRIS finalizes their reader.
145 Parameters
146 ----------
147 fn : str or pathlib.Path, optional
148 Full path to LEMI424 file, by default None.
149 **kwargs : dict
150 Additional keyword arguments for configuration.
152 Attributes
153 ----------
154 sample_rate : float
155 Sample rate of the file, default is 1.0.
156 chunk_size : int
157 Chunk size for pandas to use, default is 8640.
158 file_column_names : list of str
159 Column names of the LEMI424 file.
160 dtypes : dict
161 Data types for each column.
162 data_column_names : list of str
163 Same as file_column_names with an added column for date.
164 data : pd.DataFrame or None
165 The loaded data.
167 Notes
168 -----
169 LEMI424 File Column Names:
170 year, month, day, hour, minute, second, bx, by, bz,
171 temperature_e, temperature_h, e1, e2, e3, e4, battery,
172 elevation, latitude, lat_hemisphere, longitude,
173 lon_hemisphere, n_satellites, gps_fix, time_diff
175 Data Column Names:
176 date, bx, by, bz, temperature_e, temperature_h, e1, e2,
177 e3, e4, battery, elevation, latitude, lat_hemisphere,
178 longitude, lon_hemisphere, n_satellites, gps_fix, time_diff
180 """
182 def __init__(self, fn: str | Path | None = None, **kwargs: Any) -> None:
183 self.logger = logger
184 self.fn = fn
185 self.sample_rate = 1.0
186 self.chunk_size = 8640
187 self.data = None
188 self.file_column_names = [
189 "year",
190 "month",
191 "day",
192 "hour",
193 "minute",
194 "second",
195 "bx",
196 "by",
197 "bz",
198 "temperature_e",
199 "temperature_h",
200 "e1",
201 "e2",
202 "e3",
203 "e4",
204 "battery",
205 "elevation",
206 "latitude",
207 "lat_hemisphere",
208 "longitude",
209 "lon_hemisphere",
210 "n_satellites",
211 "gps_fix",
212 "time_diff",
213 ]
215 self.dtypes = dict(
216 [
217 ("year", int),
218 ("month", int),
219 ("day", int),
220 ("hour", int),
221 ("minute", int),
222 ("second", int),
223 ("bx", float),
224 ("by", float),
225 ("bz", float),
226 ("temperature_e", float),
227 ("temperature_h", float),
228 ("e1", float),
229 ("e2", float),
230 ("e3", float),
231 ("e4", float),
232 ("battery", float),
233 ("elevation", float),
234 ("n_satellites", int),
235 ("gps_fix", int),
236 ("time_diff", float),
237 ]
238 )
240 self.data_column_names = ["date"] + self.file_column_names[6:]
242 def __str__(self) -> str:
243 """Return string representation of LEMI424 object."""
244 lines = ["LEMI 424 data", "-" * 20]
245 lines.append(f"start: {self.start.isoformat()}")
246 lines.append(f"end: {self.end.isoformat()}")
247 lines.append(f"N samples: {self.n_samples}")
248 lines.append(f"latitude: {self.latitude} (degrees)")
249 lines.append(f"longitude: {self.longitude} (degrees)")
250 lines.append(f"elevation: {self.elevation} m")
252 return "\n".join(lines)
254 def __repr__(self) -> str:
255 """Return string representation of LEMI424 object."""
256 return self.__str__()
258 def __add__(self, other: "LEMI424 | pd.DataFrame") -> "LEMI424":
259 """
260 Append other LEMI424 objects or DataFrames together.
262 The start and end times should match up for proper concatenation.
264 Parameters
265 ----------
266 other : LEMI424 or pd.DataFrame
267 Object to append to this LEMI424 instance.
269 Returns
270 -------
271 LEMI424
272 New LEMI424 object with combined data.
274 Raises
275 ------
276 ValueError
277 If data is None or if DataFrame columns don't match.
279 """
280 if not self._has_data():
281 raise ValueError("Data is None cannot append to. Read file first")
282 if isinstance(other, LEMI424):
283 new = LEMI424()
284 new.__dict__.update(self.__dict__)
285 new.data = pd.concat([new.data, other.data])
286 return new
287 elif isinstance(other, pd.DataFrame):
288 if not other.columns.equals(self.data.columns):
289 raise ValueError("DataFrame columns are not the same.")
290 new = LEMI424()
291 new.__dict__.update(self.__dict__)
292 new.data = pd.concat([new.data, other])
293 return new
294 else:
295 raise ValueError(f"Cannot add {type(other)} to pd.DataFrame.")
297 @property
298 def data(self) -> pd.DataFrame | None:
299 """
300 Data represented as a pandas DataFrame with data column names.
302 Returns
303 -------
304 pd.DataFrame or None
305 The loaded data or None if no data is loaded.
307 """
308 return self._data
310 @data.setter
311 def data(self, data: pd.DataFrame | None) -> None:
312 """
313 Set data ensuring it is a pandas DataFrame with proper column names.
315 Parameters
316 ----------
317 data : pd.DataFrame or None
318 Data to set. Must have proper column names if not None.
320 Raises
321 ------
322 ValueError
323 If column names don't match expected format.
325 """
326 if data is None:
327 self._data = None
328 elif isinstance(data, pd.DataFrame):
329 if len(data.columns) == len(self.file_column_names):
330 if not data.columns.equals(pd.Index(self.file_column_names)):
331 raise ValueError(
332 "Column names are not the same. "
333 "Check LEMI424.column_names for accepted names"
334 )
335 elif len(data.columns) == len(self.data_column_names):
336 if not data.columns.equals(pd.Index(self.data_column_names)):
337 raise ValueError(
338 "Column names are not the same. "
339 "Check LEMI424.column_names for accepted names"
340 )
341 self._data = data
343 def _has_data(self) -> bool:
344 """
345 Check if object has data loaded.
347 Returns
348 -------
349 bool
350 True if data is loaded, False otherwise.
352 """
353 if self.data is not None:
354 return True
355 return False
357 @property
358 def fn(self) -> Path | None:
359 """
360 Full path to LEMI424 file.
362 Returns
363 -------
364 pathlib.Path or None
365 Path to the file or None if not set.
367 """
368 return self._fn
370 @fn.setter
371 def fn(self, value: str | Path | None) -> None:
372 """
373 Set the file path.
375 Parameters
376 ----------
377 value : str, pathlib.Path, or None
378 Path to the LEMI424 file.
380 Raises
381 ------
382 IOError
383 If the file does not exist.
385 """
386 if value is not None:
387 value = Path(value)
388 if not value.exists():
389 raise IOError(f"Could not find {value}")
390 self._fn = value
392 @property
393 def file_size(self) -> int | None:
394 """
395 Size of file in bytes.
397 Returns
398 -------
399 int or None
400 File size in bytes or None if no file is set.
402 """
403 if self.fn is not None:
404 return self.fn.stat().st_size
406 @property
407 def start(self) -> MTime | None:
408 """
409 Start time of data collection in the LEMI424 file.
411 Returns
412 -------
413 MTime or None
414 Start time or None if no data is loaded.
416 """
417 if self._has_data():
418 return MTime(time_stamp=self.data.index[0])
420 @property
421 def end(self) -> MTime | None:
422 """
423 End time of data collection in the LEMI424 file.
425 Returns
426 -------
427 MTime or None
428 End time or None if no data is loaded.
430 """
431 if self._has_data():
432 return MTime(time_stamp=self.data.index[-1])
434 @property
435 def latitude(self) -> float | None:
436 """
437 Median latitude where data have been collected.
439 Returns
440 -------
441 float or None
442 Median latitude in degrees or None if no data is loaded.
444 """
445 if self._has_data():
446 return self.data.latitude.median() * self.data.lat_hemisphere.median()
448 @property
449 def longitude(self) -> float | None:
450 """
451 Median longitude where data have been collected.
453 Returns
454 -------
455 float or None
456 Median longitude in degrees or None if no data is loaded.
458 """
459 if self._has_data():
460 return self.data.longitude.median() * self.data.lon_hemisphere.median()
462 @property
463 def elevation(self) -> float | None:
464 """
465 Median elevation where data have been collected.
467 Returns
468 -------
469 float or None
470 Median elevation in meters or None if no data is loaded.
472 """
473 if self._has_data():
474 return self.data.elevation.median()
476 @property
477 def n_samples(self) -> int | None:
478 """
479 Number of samples in the file.
481 Returns
482 -------
483 int or None
484 Number of samples or None if no data/file available.
486 """
487 if self._has_data():
488 return self.data.shape[0]
489 elif self.fn is not None and self.fn.exists():
490 return round(self.fn.stat().st_size / 152.0)
492 @property
493 def gps_lock(self) -> Any | None:
494 """
495 GPS lock status array.
497 Returns
498 -------
499 numpy.ndarray or None
500 GPS fix values or None if no data is loaded.
502 """
503 if self._has_data():
504 return self.data.gps_fix.values
506 @property
507 def station_metadata(self) -> Station:
508 """
509 Station metadata as mt_metadata.timeseries.Station object.
511 Returns
512 -------
513 mt_metadata.timeseries.Station
514 Station metadata object.
516 """
517 s = Station()
518 if self._has_data():
519 s.location.latitude = self.latitude
520 s.location.longitude = self.longitude
521 s.location.elevation = self.elevation
522 s.time_period.start = self.start
523 s.time_period.end = self.end
524 s.add_run(self.run_metadata)
525 return s
527 @property
528 def run_metadata(self) -> Run:
529 """
530 Run metadata as mt_metadata.timeseries.Run object.
532 Returns
533 -------
534 mt_metadata.timeseries.Run
535 Run metadata object.
537 """
538 r = Run()
539 r.id = "a"
540 r.sample_rate = self.sample_rate
541 r.data_logger.model = "LEMI424"
542 r.data_logger.manufacturer = "LEMI"
543 if self._has_data():
544 r.data_logger.power_source.voltage.start = self.data.battery.max()
545 r.data_logger.power_source.voltage.end = self.data.battery.min()
546 r.time_period.start = self.start
547 r.time_period.end = self.end
549 for ch_aux in ["temperature_e", "temperature_h"]:
550 r.add_channel(Auxiliary(component=ch_aux))
551 for ch_e in ["e1", "e2"]:
552 r.add_channel(Electric(component=ch_e))
553 for ch_h in ["bx", "by", "bz"]:
554 r.add_channel(Magnetic(component=ch_h))
555 return r
557 def read(self, fn: str | Path | None = None, fast: bool = True) -> None:
558 """
559 Read a LEMI424 file using pandas.
561 The `fast` way will read in the first and last line to get the start
562 and end time to make a time index. Then it will read in the data
563 skipping parsing the date time columns. It will check to make sure
564 the expected amount of points are correct. If not then it will read
565 in the slower way which uses the date time parser to ensure any
566 time gaps are respected.
568 Parameters
569 ----------
570 fn : str, pathlib.Path, or None, optional
571 Full path to file. Uses LEMI424.fn if not provided, by default None.
572 fast : bool, optional
573 Read the fast way (True) or not (False), by default True.
575 Raises
576 ------
577 IOError
578 If file cannot be found.
580 """
581 if fn is not None:
582 self.fn = fn
583 if not self.fn.exists():
584 msg = f"Could not find file {self.fn}"
585 self.logger.error(msg)
586 raise IOError(msg)
587 if fast:
588 try:
589 self.read_metadata()
590 data = pd.read_csv(
591 self.fn,
592 delimiter=r"\s+",
593 names=self.file_column_names,
594 dtype=self.dtypes,
595 usecols=(
596 "bx",
597 "by",
598 "bz",
599 "temperature_e",
600 "temperature_h",
601 "e1",
602 "e2",
603 "e3",
604 "e4",
605 "battery",
606 "elevation",
607 "latitude",
608 "lat_hemisphere",
609 "longitude",
610 "lon_hemisphere",
611 "n_satellites",
612 "gps_fix",
613 "time_diff",
614 ),
615 converters={
616 "latitude": lemi_position_parser,
617 "longitude": lemi_position_parser,
618 "lat_hemisphere": lemi_hemisphere_parser,
619 "lon_hemisphere": lemi_hemisphere_parser,
620 },
621 )
622 time_index = pd.date_range(
623 start=self.start.iso_no_tz,
624 end=self.end.iso_no_tz,
625 freq="1000000000ns",
626 )
627 if time_index.size != data.shape[0]:
628 raise ValueError("Missing a time stamp use read with fast=False")
629 data.index = time_index
630 self.data = data
631 return
632 except ValueError:
633 self.logger.warning(
634 "Data is missing a time stamp, reading in slow mode"
635 )
636 # read in chunks, this doesnt really speed up much as most of the
637 # compute time is used in the date time parsing.
638 if self.n_samples > self.chunk_size:
639 st = MTime(time_stamp=None).now()
640 dfs = []
641 for chunk in pd.read_csv(
642 self.fn,
643 delimiter=r"\s+",
644 names=self.file_column_names,
645 dtype=self.dtypes,
646 converters={
647 "latitude": lemi_position_parser,
648 "longitude": lemi_position_parser,
649 "lat_hemisphere": lemi_hemisphere_parser,
650 "lon_hemisphere": lemi_hemisphere_parser,
651 },
652 chunksize=self.chunk_size,
653 ):
654 # Create date index for this chunk
655 chunk.index = lemi_date_parser(
656 chunk["year"],
657 chunk["month"],
658 chunk["day"],
659 chunk["hour"],
660 chunk["minute"],
661 chunk["second"],
662 )
663 chunk.index.name = "date"
664 chunk = chunk.drop(
665 columns=["year", "month", "day", "hour", "minute", "second"]
666 )
667 dfs.append(chunk)
669 if not dfs:
670 raise ValueError("File is empty or contains no valid data")
672 self.data = pd.concat(dfs)
673 et = MTime(time_stamp=None).now()
674 self.logger.debug(f"Reading {self.fn.name} took {et - st:.2f} seconds")
675 else:
676 st = MTime(time_stamp=None).now()
677 self.data = pd.read_csv(
678 self.fn,
679 delimiter=r"\s+",
680 names=self.file_column_names,
681 dtype=self.dtypes,
682 converters={
683 "latitude": lemi_position_parser,
684 "longitude": lemi_position_parser,
685 "lat_hemisphere": lemi_hemisphere_parser,
686 "lon_hemisphere": lemi_hemisphere_parser,
687 },
688 )
690 if self.data.empty:
691 raise ValueError("File is empty or contains no valid data")
693 # Create date index from individual date/time columns
694 self.data.index = lemi_date_parser(
695 self.data["year"],
696 self.data["month"],
697 self.data["day"],
698 self.data["hour"],
699 self.data["minute"],
700 self.data["second"],
701 )
702 self.data.index.name = "date"
703 self.data = self.data.drop(
704 columns=["year", "month", "day", "hour", "minute", "second"]
705 )
707 et = MTime(time_stamp=None).now()
708 self.logger.debug(f"Reading {self.fn.name} took {et - st:.2f} seconds")
710 def read_metadata(self) -> None:
711 """
712 Read only first and last rows to get important metadata.
714 This method is used to extract essential metadata from the collection
715 without loading the entire dataset.
717 """
719 with open(self.fn) as fid:
720 first_line = fid.readline()
721 last_line = first_line # Default to first line for single-line files
722 for line in fid:
723 last_line = line # Update for multi-line files
725 if not first_line.strip():
726 raise ValueError("File is empty or contains no valid data")
728 lines = StringIO(f"{first_line}\n{last_line}")
730 self.data = pd.read_csv(
731 lines,
732 delimiter=r"\s+",
733 names=self.file_column_names,
734 dtype=self.dtypes,
735 converters={
736 "latitude": lemi_position_parser,
737 "longitude": lemi_position_parser,
738 "lat_hemisphere": lemi_hemisphere_parser,
739 "lon_hemisphere": lemi_hemisphere_parser,
740 },
741 )
743 # Create date index from individual date/time columns
744 self.data.index = lemi_date_parser(
745 self.data["year"],
746 self.data["month"],
747 self.data["day"],
748 self.data["hour"],
749 self.data["minute"],
750 self.data["second"],
751 )
752 self.data.index.name = "date"
753 self.data = self.data.drop(
754 columns=["year", "month", "day", "hour", "minute", "second"]
755 )
757 def read_calibration(self, fn: str | Path) -> FrequencyResponseTableFilter:
758 """
759 Read a LEMI424 calibration file.
761 Calibration files are assumed to be JSON files with the following format:
762 {
763 "Calibration": {
764 "gain": float,
765 "Freq": [float],
766 "Re": [float],
767 "Im": [float]
768 }
769 }
772 Parameters
773 ----------
774 fn : str or pathlib.Path
775 Full path to calibration file.
777 Returns
778 -------
779 mt_metadata.timeseries.filters.FrequencyResponseTableFilter
780 Calibration filter object.
782 """
784 with open(fn, "r") as cf:
785 try:
786 cal_data = json.load(cf)["Calibration"]
788 except json.JSONDecodeError as e:
789 self.logger.error(f"Error reading calibration file {fn}: {e}")
790 raise
791 except KeyError as e:
792 self.logger.error(f"Calibration key not found in file {fn}: {e}")
793 raise
795 gain = cal_data.get("gain", 1.0)
796 frequencies = np.array(cal_data.get("Freq", []))
797 real = np.array(cal_data.get("Re", []))
798 imag = np.array(cal_data.get("Im", []))
800 if real.size > 0 and imag.size > 0:
801 amplitudes = np.sqrt(real**2 + imag**2)
802 phases = np.degrees(np.arctan2(imag, real))
804 cal_filter = FrequencyResponseTableFilter(
805 frequencies=frequencies,
806 amplitudes=amplitudes,
807 phases=phases,
808 gain=gain,
809 instrument_type="flux gate magnetometer",
810 units_in="nanoTesla",
811 units_out="nanoTesla",
812 sequence_number=1,
813 )
814 return cal_filter
816 def to_run_ts(
817 self,
818 fn: str | Path | None = None,
819 e_channels: list[str] = ["e1", "e2"],
820 calibration_dict: dict | None = None,
821 ) -> RunTS:
822 """
823 Create a RunTS object from the data.
825 Parameters
826 ----------
827 fn : str, pathlib.Path, or None, optional
828 Full path to file. Will use LEMI424.fn if None, by default None.
829 e_channels : list of str, optional
830 Column names for the electric channels to use, by default ["e1", "e2"].
831 calibration_dict : dict, optional
832 Calibration dictionary to apply to the data, by default {}. Keys are
833 the channel names and values are the calibration file path. The file
834 path is assumed to be in the format `lemi-{component}.sr.json`.
836 Returns
837 -------
838 mth5.timeseries.RunTS
839 RunTS object containing the data.
841 """
843 ch_list = []
844 if calibration_dict is None:
845 calibration_dict = {}
846 if not isinstance(calibration_dict, dict):
847 raise ValueError("calibration_dict must be a dictionary")
849 for comp in (
850 ["bx", "by", "bz"] + e_channels + ["temperature_e", "temperature_h"]
851 ):
852 channel_response = None
853 if comp in calibration_dict.keys():
854 fap_filter = self.read_calibration(calibration_dict[comp])
855 fap_filter.name = f"lemi424_{comp}_calibration"
856 channel_response = ChannelResponse(filters_list=[fap_filter])
858 if comp[0] in ["h", "b"]:
859 ch = ChannelTS("magnetic")
860 ch.channel_metadata.units = "nT"
861 elif comp[0] in ["e"]:
862 ch = ChannelTS("electric")
863 ch.channel_metadata.units = "mV/km"
864 else:
865 ch = ChannelTS("auxiliary")
866 ch.channel_metadata.units = "C"
867 ch.sample_rate = self.sample_rate
868 ch.start = self.start
869 ch.ts = self.data[comp].values
870 ch.component = comp
871 if channel_response is not None:
872 ch.channel_response = channel_response
874 ch_list.append(ch)
875 return RunTS(
876 array_list=ch_list,
877 station_metadata=self.station_metadata,
878 run_metadata=self.run_metadata,
879 )
882# =============================================================================
883# define the reader
884# =============================================================================
885def read_lemi424(
886 fn: str | Path | list[str | Path],
887 e_channels: list[str] = ["e1", "e2"],
888 fast: bool = True,
889 calibration_dict: dict | None = None,
890) -> RunTS:
891 """
892 Read a LEMI 424 TXT file.
894 Parameters
895 ----------
896 fn : str or pathlib.Path
897 Input file name.
898 e_channels : list of str, optional
899 A list of electric channels to read, by default ["e1", "e2"].
900 fast : bool, optional
901 Use fast reading method, by default True.
902 calibration_dict : dict, optional
903 Calibration dictionary to apply to the data, by default None. Keys are
904 the channel names and values are the calibration file path.
906 Returns
907 -------
908 mth5.timeseries.RunTS
909 A RunTS object with appropriate metadata.
911 """
913 if not isinstance(fn, (list, tuple)):
914 fn = [fn]
915 txt_obj = LEMI424(fn[0])
916 txt_obj.read(fast=fast)
918 # read a list of files into a single run
919 if len(fn) > 1:
920 for txt_file in fn:
921 other = LEMI424(txt_file)
922 other.read()
923 txt_obj += other
924 return txt_obj.to_run_ts(e_channels=e_channels, calibration_dict=calibration_dict)