Coverage for C:\Users\peaco\OneDrive\Documents\GitHub\mth5\mth5\processing\kernel_dataset.py: 75%
448 statements
coverage.py v7.13.1, created at 2026-01-27 20:09 -0800
1"""
2Magnetotelluric kernel dataset processing module.
4This module contains a class for representing a dataset that can be processed.
6The module provides functionality for:
7- Managing magnetotelluric time series intervals
8- Supporting single station and remote reference processing
9- Handling run combination and time interval restrictions
10- Interfacing with MTH5 data structures
12Development Notes
13-----------------
14Players on the stage: One or more mth5s.
16Each mth5 has a "run_summary" dataframe available. Run_summary provides options for
17the local and possibly remote reference stations. Candidates for local station are
18the unique values in the station column.
20For any candidate station, there are some integer n runs available.
21This yields 2^n - 1 possible combinations that can be processed, neglecting any
22flagging of time intervals within any run, or any joining of runs.
23(There are actually 2**n, but we ignore the empty set, so -1)
25Intuition suggests default ought to be to process n runs in n+1 configurations:
26{all runs} + each run individually. This will give a bulk answer, and bad runs can
27be flagged by comparing them. After an initial processing, the tfs can be reviewed
28and the problematic runs can be addressed.
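
As a rough illustration of the counting above (a sketch only: the run labels and
both helper names are hypothetical and not part of this module), the candidate
run combinations for one station can be enumerated with itertools:

```python
from itertools import combinations


def all_run_combinations(run_ids):
    # Every non-empty subset of run_ids; there are 2**n - 1 of them.
    subsets = []
    for size in range(1, len(run_ids) + 1):
        subsets.extend(combinations(run_ids, size))
    return subsets


def default_configurations(run_ids):
    # The n + 1 suggested defaults: all runs together, then each run alone.
    return [tuple(run_ids)] + [(run_id,) for run_id in run_ids]


runs = ["001", "002", "003"]  # hypothetical run labels
print(len(all_run_combinations(runs)))  # 7 == 2**3 - 1
print(default_configurations(runs))
```

The full enumeration grows exponentially with the number of runs, while the
default list grows linearly, which is why the n + 1 scheme is a practical first pass.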
30The user can interact with the run_summary_df, selecting sub dataframes via querying,
31and in future maybe via some GUI (or a spreadsheet).
The intended usage process is as follows:

 0. Start with a list of mth5s
 1. Extract a run_summary
 2. Stare at the run_summary_df, and select a station "S" to process
 3. Select a non-empty set of runs for station "S"
 4. Select a remote reference "RR" (this is allowed to be None)
 5. Extract the sub-dataframe corresponding to acquisition_runs from "S" and "RR"
 7. If the remote is not None:
    - Drop the runs (rows) associated with RR that do not intersect with S
    - Restrict start/end times of RR runs that intersect with S so overlap is complete
    - Restrict start/end times of S runs so that they intersect with remote
 8. This is now a TFKernel Dataset Definition (ish). Initialize a default processing
    object and pass it this df (see Examples).
 9. Edit the Processing Config appropriately.

Examples
--------
>>> cc = ConfigCreator()
>>> p = cc.create_from_kernel_dataset(kernel_dataset)

Optionally pass emtf_band_file=emtf_band_setup_file.

54TODO: Consider supporting a default value for 'channel_scale_factors' that is None,
55TODO: Might need to groupby survey & station, for now consider station_id unique.
56"""
58from __future__ import annotations
60import copy
62# =============================================================================
63# Imports
64# =============================================================================
65from pathlib import Path
66from typing import Any
68import mt_metadata.timeseries
69import pandas as pd
70from loguru import logger
71from mt_metadata.common.list_dict import ListDict
72from mt_metadata.timeseries import Survey
73from mt_metadata.transfer_functions.tf import Station
75import mth5.timeseries.run_ts
76from mth5.mth5 import MTH5
77from mth5.processing import KERNEL_DATASET_DTYPE, MINI_SUMMARY_COLUMNS
78from mth5.processing.run_summary import RunSummary
79from mth5.utils.helpers import initialize_mth5
82# =============================================================================
85class KernelDataset:
86 """
87 Magnetotelluric kernel dataset for time series processing.
89 This class works with mth5-derived channel_summary or run_summary dataframes
90 that specify time series intervals. It manages acquisition "runs" that can be
91 merged into processing runs, with support for both single station and remote
92 reference processing configurations.
94 Parameters
95 ----------
96 df : pd.DataFrame | None, optional
97 Pre-formed dataframe with dataset configuration. Normally built from a
98 run_summary, by default None
99 local_station_id : str, optional
100 Local station identifier for the dataset. Normally passed via
101 from_run_summary method, by default ""
102 remote_station_id : str | None, optional
103 Remote reference station identifier. Normally passed via from_run_summary
104 method, by default None
105 **kwargs : dict
106 Additional keyword arguments to set as attributes
108 Attributes
109 ----------
110 df : pd.DataFrame | None
111 Main dataset dataframe with time series intervals
112 local_station_id : str | None
113 Local station identifier
114 remote_station_id : str | None
115 Remote reference station identifier
116 survey_metadata : dict
117 Survey metadata container
118 initialized : bool
119 Whether MTH5 objects have been initialized
120 local_mth5_obj : Any | None
121 Local station MTH5 object
122 remote_mth5_obj : Any | None
123 Remote station MTH5 object
125 Notes
126 -----
127 The class is closely related to (may actually be an extension of) RunSummary.
128 The main idea is to specify one or two stations, and a list of acquisition "runs"
129 that can be merged into a "processing run". Each acquisition run can be further
130 divided into non-overlapping chunks by specifying time-intervals associated with
131 that acquisition run.
133 The time intervals can be used for several purposes but primarily:
134 - STFT processing for merged FC data structures
135 - Binding together into xarray time series for gap filling
136 - Managing and analyzing availability of reference time series
138 Examples
139 --------
140 Create a kernel dataset from run summary:
142 >>> from mth5.processing.run_summary import RunSummary
143 >>> run_summary = RunSummary()
144 >>> dataset = KernelDataset()
145 >>> dataset.from_run_summary(run_summary, "station01", "station02")
147 Process single station data:
149 >>> single_dataset = KernelDataset()
150 >>> single_dataset.from_run_summary(run_summary, "station01")
152 See Also
153 --------
154 RunSummary : Data summary for processing configuration
155 """
157 def __init__(
158 self,
159 df: pd.DataFrame | None = None,
160 local_station_id: str = "",
161 remote_station_id: str | None = None,
162 **kwargs: Any,
163 ) -> None:
164 """
165 Initialize KernelDataset instance.
167 Parameters
168 ----------
169 df : pd.DataFrame | None, optional
170 Pre-formed dataframe with dataset configuration, by default None
171 local_station_id : str, optional
172 Local station identifier, by default ""
173 remote_station_id : str | None, optional
174 Remote reference station identifier, by default None
175 **kwargs : dict
176 Additional keyword arguments to set as attributes
177 """
178 self.df = df
179 self.local_station_id = local_station_id
180 self.remote_station_id = remote_station_id
181 self._mini_summary_columns = MINI_SUMMARY_COLUMNS
182 self.survey_metadata: dict[str, Any] = {}
183 self.initialized: bool = False
184 self.local_mth5_obj: Any = None
185 self.remote_mth5_obj: Any = None
186 self._local_mth5_path: Path | None = None
187 self._remote_mth5_path: Path | None = None
189 for key, value in kwargs.items():
190 setattr(self, key, value)
192 def __str__(self) -> str:
193 """
194 Return string representation of the dataset.
196 Returns
197 -------
198 str
199 String representation showing mini summary
200 """
201 return str(self.mini_summary)
203 def __repr__(self) -> str:
204 """
205 Return detailed string representation.
207 Returns
208 -------
209 str
210 Detailed string representation
211 """
212 return self.__str__()
214 @property
215 def df(self) -> pd.DataFrame | None:
216 """
217 Main dataset dataframe.
219 Returns
220 -------
221 pd.DataFrame | None
222 Dataset dataframe with time series intervals, or None if not set
223 """
224 return self._df
226 @df.setter
227 def df(self, value: pd.DataFrame | None) -> None:
228 """
229 Set the dataset dataframe with proper validation.
231 Parameters
232 ----------
233 value : pd.DataFrame | None
234 Dataframe to set, or None to clear
236 Raises
237 ------
238 TypeError
239 If value is not a DataFrame or None
240 """
241 if value is None:
242 self._df = None
243 return
245 if not isinstance(value, pd.DataFrame):
246 msg = f"Need to set df with a Pandas.DataFrame not type({type(value)})"
247 logger.error(msg)
248 raise TypeError(msg)
250 self._df = self._add_duration_column(
251 self._set_datetime_columns(self._add_columns(value)), inplace=False
252 )
    def _has_df(self) -> bool:
        """
        Check if dataframe is set and not empty.

        Returns
        -------
        bool
            True if dataframe exists and is not empty
        """
        return self._df is not None and not self._df.empty

269 def _df_has_local_station_id(self, df: pd.DataFrame) -> bool:
270 """
271 Check if dataframe contains the local station ID.
273 Parameters
274 ----------
275 df : pd.DataFrame
276 Dataframe to check
278 Returns
279 -------
280 bool
281 True if local station ID exists in dataframe
282 """
283 return (df.station == self.local_station_id).any()
285 def _df_has_remote_station_id(self, df: pd.DataFrame) -> bool:
286 """
287 Check if dataframe contains the remote station ID.
289 Parameters
290 ----------
291 df : pd.DataFrame
292 Dataframe to check
294 Returns
295 -------
296 bool
297 True if remote station ID exists in dataframe
298 """
299 return (df.station == self.remote_station_id).any()
301 def _set_datetime_columns(self, df: pd.DataFrame) -> pd.DataFrame:
302 """
303 Ensure start and end columns are datetime objects.
305 Parameters
306 ----------
307 df : pd.DataFrame
308 Input dataframe
310 Returns
311 -------
312 pd.DataFrame
313 Dataframe with datetime columns properly set
314 """
315 try:
316 df.start = pd.to_datetime(df.start, format="mixed")
317 df.end = pd.to_datetime(df.end, format="mixed")
318 except ValueError:
319 df.start = pd.to_datetime(df.start)
320 df.end = pd.to_datetime(df.end)
322 return df
324 def clone(self) -> "KernelDataset":
325 """
326 Create a deep copy of the dataset.
328 Returns
329 -------
330 KernelDataset
331 Deep copy of this instance
332 """
333 return copy.deepcopy(self)
335 def clone_dataframe(self) -> pd.DataFrame | None:
336 """
337 Create a deep copy of the dataframe.
339 Returns
340 -------
341 pd.DataFrame | None
342 Deep copy of the dataframe, or None if dataframe is not set
343 """
344 return copy.deepcopy(self.df)
346 def _add_columns(
347 self,
348 df: pd.DataFrame,
349 null_columns: list[str] | tuple[str, ...] = ("fc",),
350 ) -> pd.DataFrame:
351 """
352 Add missing columns with appropriate dtypes.
354 Parameters
355 ----------
356 df : pd.DataFrame
357 Kernel dataset dataframe, possibly missing some columns
358 null_columns : list[str] | tuple[str, ...], optional
359 Columns that will initialize to null rather than their expected dtype,
360 by default ("fc",)
362 Returns
363 -------
364 pd.DataFrame
365 Kernel dataset dataframe with all required columns present
367 Raises
368 ------
369 ValueError
370 If required columns (survey, station, run, start, end) are missing
371 """
372 for col, dtype in KERNEL_DATASET_DTYPE:
373 if col not in df.columns:
374 if col in ["survey", "station", "run", "start", "end"]:
375 raise ValueError(f"{col} must be a filled column in the dataframe")
377 try:
378 df[col] = dtype(0)
379 assigned_dtype = dtype
380 except TypeError:
381 df[col] = None # TODO: update to pd.NA
382 assigned_dtype = type(None)
384 if col in null_columns:
385 df[col] = pd.NA
386 assigned_dtype = type(pd.NA)
388 msg = (
389 f"KernelDataset DataFrame needs column {col}, adding and "
390 f"setting dtype to {assigned_dtype}."
391 )
392 logger.debug(msg)
394 return df
396 @property
397 def local_station_id(self) -> str | None:
398 """
399 Local station identifier.
401 Returns
402 -------
403 str | None
404 Local station identifier
405 """
406 return self._local_station_id
408 @local_station_id.setter
409 def local_station_id(self, value: str | None) -> None:
410 """
411 Set local station identifier.
413 Parameters
414 ----------
415 value : str | None
416 Station identifier to set
418 Raises
419 ------
420 ValueError
421 If value cannot be converted to string
422 NameError
423 If station ID is not found in dataframe when dataframe exists
424 """
425 if value is None:
426 self._local_station_id = None
427 else:
428 try:
429 self._local_station_id = str(value)
430 except ValueError:
431 raise ValueError(
432 f"Bad type {type(value)}. "
433 "Cannot convert local_station_id value to string."
434 )
435 if self._has_df() and self.df is not None:
436 if not self._df_has_local_station_id(self.df):
437 raise NameError(
438 f"Could not find {self._local_station_id} in dataframe"
439 )
441 @property
442 def local_mth5_path(self) -> Path | None:
443 """
444 Local station MTH5 file path.
446 Returns
447 -------
448 Path | None
449 Path to local station MTH5 file, extracted from dataframe or
450 stored path, or None if not available
451 """
452 if self._has_df() and self._df is not None:
453 unique_paths = self._df.loc[
454 self._df.station == self.local_station_id, "mth5_path"
455 ].unique()
456 if len(unique_paths) > 0:
457 return Path(unique_paths[0])
458 return None
459 else:
460 return self._local_mth5_path
462 @local_mth5_path.setter
463 def local_mth5_path(self, value: str | Path | None) -> None:
464 """
465 Set local MTH5 path.
467 Parameters
468 ----------
469 value : str | Path | None
470 Path to MTH5 file
471 """
472 self._local_mth5_path = self.set_path(value)
474 def has_local_mth5(self) -> bool:
475 """
476 Check if local MTH5 file exists.
478 Returns
479 -------
480 bool
481 True if local MTH5 file exists on filesystem
482 """
483 if self.local_mth5_path is None:
484 return False
485 else:
486 return self.local_mth5_path.exists()
488 @property
489 def remote_station_id(self) -> str | None:
490 """
491 Remote reference station identifier.
493 Returns
494 -------
495 str | None
496 Remote station identifier
497 """
498 return self._remote_station_id
500 @remote_station_id.setter
501 def remote_station_id(self, value: str | None) -> None:
502 """
503 Set remote station identifier.
505 Parameters
506 ----------
507 value : str | None
508 Remote station identifier
510 Raises
511 ------
512 ValueError
513 If value cannot be converted to string
514 NameError
515 If station ID is not found in dataframe when dataframe exists
516 """
517 if value is None:
518 self._remote_station_id = None
519 else:
520 try:
521 self._remote_station_id = str(value)
522 except ValueError:
523 raise ValueError(
524 f"Bad type {type(value)}. "
525 "Cannot convert remote_station_id value to string."
526 )
527 if self._has_df():
528 if not self._df_has_remote_station_id(self.df):
529 raise NameError(
530 f"Could not find {self._remote_station_id} in dataframe"
531 )
    @property
    def remote_mth5_path(self) -> Path | None:
        """
        Remote station MTH5 file path.

        Returns
        -------
        Path | None
            Path to remote station MTH5 file, extracted from dataframe or
            stored path, or None if not available
        """
        if self._has_df() and self.remote_station_id is not None:
            unique_paths = self._df.loc[
                self._df.station == self.remote_station_id, "mth5_path"
            ].unique()
            # Avoid an IndexError when no row matches the remote station
            if len(unique_paths) > 0:
                return Path(unique_paths[0])
            return None
        else:
            return self._remote_mth5_path

548 @remote_mth5_path.setter
549 def remote_mth5_path(self, value: str | Path | None):
550 """
551 Set the remote mth5 path.
553 Parameters
554 ----------
555 value : str | Path | None
556 Path to the remote mth5 file
557 """
558 self._remote_mth5_path = self.set_path(value)
560 def has_remote_mth5(self) -> bool:
561 """Test if remote mth5 exists."""
562 if self.remote_mth5_path is None:
563 return False
564 else:
565 return self.remote_mth5_path.exists()
567 @property
568 def processing_id(self) -> str:
        """It's difficult to come up with unique ids without crazy long names,
        so this is a generic id of local-remote; the station metadata
        will have run information and the config parameters.
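
The naming pattern can be sketched on its own, assuming only the f-strings
used by this property (build_processing_id is a hypothetical stand-alone helper,
not part of the class, and the station labels are made up):

```python
def build_processing_id(local_station_id, sample_rate, remote_station_id=None):
    # Hypothetical helper mirroring the id pattern: local[_rr_remote]_sr<rate>
    suffix = f"sr{int(sample_rate)}"
    if remote_station_id is not None:
        return f"{local_station_id}_rr_{remote_station_id}_{suffix}"
    return f"{local_station_id}_{suffix}"


print(build_processing_id("mt01", 1.0))          # mt01_sr1
print(build_processing_id("mt01", 1.0, "mt02"))  # mt01_rr_mt02_sr1
```

Because the rate passes through int(), fractional sample rates are truncated,
so rates below 1 Hz would all map to the same "sr0" suffix.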
572 """
573 if self.remote_station_id is not None:
574 return (
575 f"{self.local_station_id}_rr_{self.remote_station_id}_"
576 f"sr{int(self.sample_rate)}"
577 )
578 else:
579 return f"{self.local_station_id}_sr{int(self.sample_rate)}"
581 @property
582 def input_channels(self) -> list[str]:
583 """
584 Get input channels from dataframe.
586 Returns
587 -------
588 list[str]
589 Input channel identifiers (sources)
591 Raises
592 ------
593 AttributeError
594 If dataframe is not available or local_df has no input_channels
595 """
596 if self._has_df() and self.df is not None:
597 local_data = self.local_df
598 if local_data is not None and not local_data.empty:
599 return local_data.input_channels.iat[0]
600 return []
602 @property
603 def output_channels(self) -> list[str]:
604 """
605 Get output channels from dataframe.
607 Returns
608 -------
609 list[str]
610 Output channel identifiers
612 Raises
613 ------
614 AttributeError
615 If dataframe is not available or local_df has no output_channels
616 """
617 if self._has_df() and self.df is not None:
618 local_data = self.local_df
619 if local_data is not None and not local_data.empty:
620 return local_data.output_channels.iat[0]
621 return []
623 @property
624 def remote_channels(self) -> list[str]:
625 """
626 Get remote reference channels from dataframe.
628 Returns
629 -------
630 list[str]
631 Remote reference channel identifiers
633 Raises
634 ------
635 AttributeError
636 If dataframe is not available or remote_df has no remote_channels
637 """
638 if (
639 self._has_df()
640 and self.df is not None
641 and self.remote_station_id is not None
642 ):
643 remote_data = self.remote_df
644 if remote_data is not None and not remote_data.empty:
645 return remote_data.input_channels.iat[0]
646 return []
648 @property
649 def local_df(self) -> pd.DataFrame | None:
650 """
651 Get dataframe subset for local station runs.
653 Returns
654 -------
655 pd.DataFrame | None
656 Local station runs data, or None if dataframe not available
657 """
658 if self._has_df() and self.df is not None:
659 return self.df[self.df.station == self.local_station_id]
660 return None
662 @property
663 def remote_df(self) -> pd.DataFrame | None:
664 """
665 Get dataframe subset for remote station runs.
667 Returns
668 -------
669 pd.DataFrame | None
670 Remote station runs data, or None if dataframe not available
671 or no remote station configured
672 """
673 if (
674 self._has_df()
675 and self.df is not None
676 and self.remote_station_id is not None
677 ):
678 return self.df[self.df.station == self.remote_station_id]
679 return None
681 @classmethod
682 def set_path(cls, value: str | Path | None) -> Path | None:
683 """
684 Set and validate a file path.
686 Parameters
687 ----------
688 value : str | Path | None
689 Path value to set and validate
691 Returns
692 -------
693 Path | None
694 Validated Path object, or None if input is None
696 Raises
697 ------
698 IOError
699 If path does not exist on filesystem
700 ValueError
701 If value cannot be converted to Path
702 """
703 if value is None:
704 return None
706 if isinstance(value, (str, Path)):
707 return_path = Path(value)
708 if not return_path.exists():
709 raise IOError(f"Cannot find file: {return_path}")
710 return return_path
711 else:
712 raise ValueError(f"Cannot convert type {type(value)} to Path")
714 def from_run_summary(
715 self,
716 run_summary: RunSummary,
717 local_station_id: str | None = None,
718 remote_station_id: str | None = None,
719 sample_rate: float | int | None = None,
720 ) -> None:
721 """
722 Initialize the dataframe from a run summary.
724 Parameters
725 ----------
726 run_summary : RunSummary
727 Summary of available data for processing from one or more stations
728 local_station_id : str | None, optional
729 Label of the station for which an estimate will be computed,
730 by default None
731 remote_station_id : str | None, optional
732 Label of the remote reference station, by default None
733 sample_rate : float | int | None, optional
734 Sample rate to filter data by, by default None
736 Raises
737 ------
738 ValueError
739 If restricting to specified stations yields empty dataset or
740 if local and remote stations do not overlap for remote reference
741 """
        self.df = None

        if local_station_id is not None:
            self.local_station_id = local_station_id
        if remote_station_id is not None:
            self.remote_station_id = remote_station_id

        if sample_rate is not None:
            run_summary = run_summary.set_sample_rate(sample_rate)

        # Build the station list from the stored ids (these include any value
        # set before this call) and skip ids that are unset
        station_ids = [
            sid for sid in (self.local_station_id, self.remote_station_id) if sid
        ]
        df = restrict_to_station_list(run_summary.df, station_ids, inplace=False)

        # Check df is non-empty
        if len(df) == 0:
            msg = f"Restricting run_summary df to {station_ids} yields an empty set"
            logger.critical(msg)
            raise ValueError(msg)

        # Drop rows (runs) that have no data; copy so that later column
        # assignments do not trigger pandas chained-assignment warnings
        n_rows_before_drop = len(df)
        df = df[df.has_data].copy()
        n_dropped = n_rows_before_drop - len(df)
        if n_dropped:
            msg = f"Dropped {n_dropped} rows from Kernel Dataset due to missing data"
            logger.warning(msg)

        # Check df is non-empty (again)
        if len(df) == 0:
            msg = (
                "Restricting run_summary df to runs that have data yields an empty set"
            )
            logger.critical(msg)
            raise ValueError(msg)

        # Add any missing kernel dataset columns
        df = self._add_columns(df)

        # Flag the rows that belong to the remote reference station
        if self.remote_station_id:
            df["remote"] = df.station == self.remote_station_id

        # Be sure to set datetime columns and restrict to simultaneous runs
        df = self._set_datetime_columns(df)
        if self.remote_station_id:
            df = self.restrict_run_intervals_to_simultaneous(df)

        # Again check df is non-empty
        if len(df) == 0:
            msg = (
                f"Local: {self.local_station_id} and remote: "
                f"{self.remote_station_id} do not overlap. Remote reference "
                "processing not a valid option."
            )
            logger.error(msg)
            raise ValueError(msg)

        self.df = df

        self.survey_metadata = self.get_metadata_from_df(self.local_df)

    def get_metadata_from_df(self, df: pd.DataFrame | None) -> Survey | dict:
        """
        Extract survey metadata from the dataframe.

        The dataframe should only include one station, so use self.local_df or
        self.remote_df. Note that the station metadata is looked up with
        self.local_station_id.

        Parameters
        ----------
        df : pd.DataFrame | None
            Single-station dataframe to extract metadata from

        Returns
        -------
        Survey | dict
            Survey metadata restricted to the runs in df, or an empty dict if
            df is None, empty, or has no station reference
        """
        if df is None or df.empty:
            return {}

        mth5_path = df["mth5_path"].unique()[0]
        # mth5_path may be a str or a Path, so avoid calling len() on it
        if mth5_path is None or not str(mth5_path):
            raise ValueError(
                f"Cannot find MTH5 path for local station {self.local_station_id}"
            )

        h5_station_reference = df["station_hdf5_reference"].unique()[0]

        if h5_station_reference is None:
            logger.warning(
                f"Cannot find h5_station_reference for local station {self.local_station_id}"
            )
            return {}
        with MTH5() as m:
            m.open_mth5(mth5_path)
            station_group = m.from_reference(h5_station_reference)
            survey_metadata = station_group.survey_metadata

            # survey metadata returns a time series station, so need to update to a
            # transfer function object
            tf_station = Station()
            tf_station.update(survey_metadata.stations[self.local_station_id])

            # remove runs that are not in the dataframe; iterate over a snapshot
            # of the keys because runs are removed inside the loop
            processing_runs = df.run.unique()
            for run in list(tf_station.runs.keys()):
                if run not in processing_runs:
                    tf_station.remove_run(run)
                else:
                    tf_station.runs[run].update(
                        survey_metadata.stations[self.local_station_id].runs[run]
                    )

            # add to survey metadata by removing the old one first
            survey_metadata.remove_station(self.local_station_id)
            survey_metadata.add_station(tf_station)

        return survey_metadata

869 @property
870 def mini_summary(self) -> pd.DataFrame:
871 """
872 Return a dataframe that fits in terminal display.
874 Returns
875 -------
876 pd.DataFrame
877 Subset of the main dataframe with key columns for summary display
878 """
879 return self.df[self._mini_summary_columns]
881 @property
882 def local_survey_id(self) -> str:
883 """
884 Return string label for local survey id.
886 Returns
887 -------
888 str
889 Survey ID for the local station
890 """
891 survey_id = self.df.loc[~self.df.remote].survey.unique()[0]
892 if survey_id in ["none"]:
893 survey_id = "0"
894 return survey_id
896 @property
897 def local_survey_metadata(self) -> mt_metadata.timeseries.Survey:
898 """Return survey metadata for local station."""
899 return self.survey_metadata
900 # except KeyError:
901 # msg = f"Unexpected key {self.local_survey_id} not found in survey_metadata"
902 # msg += f"{msg} WARNING -- Maybe old MTH5 -- trying to use key '0'"
903 # logger.warning(msg)
904 # return self.survey_metadata["0"]
    def _add_duration_column(
        self, df: pd.DataFrame, inplace: bool = True
    ) -> pd.DataFrame:
        """Add a "duration" column to df holding end - start (in seconds).

        Returns df itself if inplace is True, otherwise a modified copy.
        """
        timedeltas = df.end - df.start
        durations = [x.total_seconds() for x in timedeltas]
        if inplace:
            df["duration"] = durations
            return df
        else:
            new_df = df.copy()
            new_df["duration"] = durations
            return new_df

    def _update_duration_column(self, inplace: bool = True) -> pd.DataFrame | None:
        """Call _add_duration_column (after possible manual manipulation of start/end).

        Returns None if inplace is True, otherwise an updated copy of the dataframe.
        """
        if inplace:
            self._df = self._add_duration_column(self._df, inplace)
            return None
        else:
            return self._add_duration_column(self._df, inplace)

927 def drop_runs_shorter_than(
928 self,
929 minimum_duration: float,
930 units: str = "s",
931 inplace: bool = True,
932 ) -> pd.DataFrame | None:
933 """
934 Drop runs from dataframe that are shorter than minimum duration.
936 Parameters
937 ----------
938 minimum_duration : float
939 The minimum allowed duration for a run (in units of units)
940 units : str, optional
941 Time units, by default "s". Currently only seconds are supported
942 inplace : bool, optional
943 Whether to modify dataframe in place, by default True
945 Returns
946 -------
947 pd.DataFrame | None
948 Modified dataframe if inplace=False, None if inplace=True
950 Raises
951 ------
952 NotImplementedError
953 If units other than seconds are specified
        Notes
        -----
        Durations are refreshed from start/end before any runs are dropped.
        """
        if units != "s":
            msg = "Expected units are seconds : units='s'"
            raise NotImplementedError(msg)

        if inplace:
            # Refresh durations before evaluating the drop condition
            self._update_duration_column(inplace=True)
            drop_cond = self.df.duration < minimum_duration
            self.df.drop(self.df[drop_cond].index, inplace=True)
            self.df.reset_index(drop=True, inplace=True)
            return None
        else:
            # Work on a copy that carries freshly computed durations
            new_df = self._update_duration_column(inplace=False)
            drop_cond = new_df.duration < minimum_duration
            new_df = new_df.drop(new_df[drop_cond].index)
            new_df.reset_index(drop=True, inplace=True)
            return new_df

975 def select_station_runs(
976 self,
977 station_runs_dict: dict,
978 keep_or_drop: bool,
979 inplace: bool = True,
980 ) -> pd.DataFrame | None:
981 """
982 Partition dataframe based on station_runs_dict and return one partition.
984 Parameters
985 ----------
986 station_runs_dict : dict
987 Keys are string IDs of stations to keep/drop.
988 Values are lists of string labels for run_ids to keep/drop.
989 Example: {"mt01": ["0001", "0003"]}
990 keep_or_drop : bool
991 If True: returns df with only the station-runs specified
992 If False: returns df with station_runs_dict entries removed
993 inplace : bool, optional
994 If True, modifies dataframe in place, by default True
996 Returns
997 -------
998 pd.DataFrame | None
999 Modified dataframe if inplace=False, None if inplace=True
1000 """
        # Accept the documented bool as well as the string "keep" that this
        # check previously compared against
        keep = keep_or_drop is True or keep_or_drop == "keep"

        # Accumulate the rows to drop across every station in the dict
        drop_mask = pd.Series(False, index=self.df.index)
        for station_id, run_ids in station_runs_dict.items():
            if isinstance(run_ids, str):
                run_ids = [run_ids]
            cond1 = self.df["station"] == station_id
            cond2 = self.df["run"].isin(run_ids)
            if keep:
                drop_mask |= cond1 & ~cond2
            else:
                drop_mask |= cond1 & cond2
        drop_index = self.df.index[drop_mask]

        if inplace:
            self.df.drop(drop_index, inplace=True)
            self.df.reset_index(drop=True, inplace=True)
            return None
        else:
            # reset_index returns the new frame when inplace is left False
            df = self.df.drop(drop_index)
            return df.reset_index(drop=True)

1022 def set_run_times(self, run_time_dict: dict, inplace: bool = True):
1023 """
1024 Set run times from a dictionary.
1026 Parameters
1027 ----------
1028 run_time_dict : dict
1029 Dictionary formatted as {run_id: {start, end}}
1030 inplace : bool, optional
1031 Whether to modify dataframe in place, by default True
1033 Returns
1034 -------
1035 pd.DataFrame | None
1036 Modified dataframe if inplace=False, None if inplace=True
1037 """
        msg = "Need to set run time with a dictionary in the form of {run_id: {start, end}}"
        if not isinstance(run_time_dict, dict):
            raise TypeError(msg)

        # Work on a copy when the caller does not want self.df modified
        df = self.df if inplace else self.df.copy()
        for key, times in run_time_dict.items():
            if not isinstance(times, dict):
                raise TypeError(msg)
            if "start" not in times or "end" not in times:
                raise KeyError(msg)

            # Only shrink runs whose current span contains the requested times
            cond = (
                (df.run == key)
                & (df.start <= times["start"])
                & (df.end >= times["end"])
            )
            df.loc[cond, "start"] = times["start"]
            df.loc[cond, "end"] = times["end"]
        df = self._add_duration_column(df, inplace=True)
        # Simultaneity only applies when a remote reference is configured
        if self.remote_station_id:
            df = self.restrict_run_intervals_to_simultaneous(df)
        if inplace:
            self.df = df
            return None
        return df

1056 @property
1057 def is_single_station(self) -> bool:
1058 """Returns True if no RR station."""
1059 if self.local_station_id:
1060 if self.remote_station_id:
1061 return False
1062 else:
1063 return True
1064 else:
1065 return False
    def restrict_run_intervals_to_simultaneous(self, df: pd.DataFrame) -> pd.DataFrame:
        """For each run in local_station_id, check if it overlaps with any remote run
        and keep only the simultaneous intervals. Returns the restricted dataframe
        and raises ValueError if nothing overlaps.

        There is room for optimization here.

        Note that you can wind up splitting runs here. For example, in the case where
        local is running continuously but remote is intermittent, the local
        run may break into several chunks.

        :rtype: pd.DataFrame
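
The run-splitting behaviour can be sketched with plain datetimes. This is an
illustration only: interval_intersection is a hypothetical helper, the times are
made up, and treating intervals that merely touch as non-overlapping is an
assumption that may differ from the helpers this method calls.

```python
from datetime import datetime


def interval_intersection(start_1, end_1, start_2, end_2):
    # Latest start and earliest end bound the shared window, if there is one.
    start = max(start_1, start_2)
    end = min(end_1, end_2)
    if start < end:
        return start, end
    return None


# Hypothetical times: local records continuously, remote records in bursts.
local = (datetime(2024, 1, 1, 0), datetime(2024, 1, 1, 12))
remote_runs = [
    (datetime(2024, 1, 1, 1), datetime(2024, 1, 1, 3)),
    (datetime(2024, 1, 1, 6), datetime(2024, 1, 1, 8)),
    (datetime(2024, 1, 2, 0), datetime(2024, 1, 2, 2)),  # no overlap with local
]

chunks = [interval_intersection(*local, *remote) for remote in remote_runs]
chunks = [chunk for chunk in chunks if chunk is not None]
print(len(chunks))  # 2: the single local run is split into two sub-runs
```

Each surviving chunk corresponds to one local row and one remote row in the
returned dataframe, both trimmed to the same start and end.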
1076 """
        local_df = df[df.station == self.local_station_id]
        remote_df = df[df.station == self.remote_station_id]
        output_sub_runs = []
        for _, local_row in local_df.iterrows():
            for _, remote_row in remote_df.iterrows():
                if not intervals_overlap(
                    local_row.start,
                    local_row.end,
                    remote_row.start,
                    remote_row.end,
                ):
                    continue
                olap_start, olap_end = overlap(
                    local_row.start,
                    local_row.end,
                    remote_row.start,
                    remote_row.end,
                )

                # Trim copies of both rows to the shared interval
                local_sub_run = local_row.copy(deep=True)
                remote_sub_run = remote_row.copy(deep=True)
                local_sub_run.start = olap_start
                local_sub_run.end = olap_end
                remote_sub_run.start = olap_start
                remote_sub_run.end = olap_end
                output_sub_runs.append(local_sub_run)
                output_sub_runs.append(remote_sub_run)
        new_df = pd.DataFrame(output_sub_runs)
        new_df = new_df.reset_index(drop=True)

        if new_df.empty:
            msg = (
                f"Local: {self.local_station_id} and "
                f"remote: {self.remote_station_id} do "
                "not overlap. Remote reference processing not a valid option."
            )
            logger.error(msg)
            raise ValueError(msg)

        return new_df

1121 def get_station_metadata(
1122 self, local_station_id: str
1123 ) -> mt_metadata.timeseries.Station:
1124 """Returns the station metadata.
1126 Development Notes:
1127 TODO: This appears to be unused. Was probably a precursor to the
1128 update_survey_metadata() method. Delete if unused. If used fill out doc:
1129 "Helper function for archiving the TF -- returns an object we can use to populate
1130 station metadata in the _____"
1131 :param local_station_id: The name of the local station.
1132 :type local_station_id: str
1133 :rtype: mt_metadata.timeseries.Station
1134 """
1135 # get a list of local runs:
1136 cond = self.df["station"] == local_station_id
1137 sub_df = self.df[cond]
1138 sub_df = sub_df.drop_duplicates(subset="run")  # avoid in-place edit of a slice
1140 # sanity check:
1141 run_ids = sub_df.run.unique()
1142 assert len(run_ids) == len(sub_df)
1144 station_metadata = sub_df.mth5_obj.iloc[0].from_reference(
1145 sub_df.station_hdf5_reference.iloc[0]  # positional: index label 0 may be absent
1146 )
1147 station_metadata.runs = ListDict()
1148 for i, row in sub_df.iterrows():
1149 local_run_obj = self.get_run_object(row)
1150 station_metadata.add_run(local_run_obj.metadata)
1151 return station_metadata
1153 def get_run_object(
1154 self, index_or_row: int | pd.Series
1155 ) -> mt_metadata.timeseries.Run:
1156 """
1157 Get the run object associated with a row of the dataframe.
1159 Parameters
1160 ----------
1161 index_or_row : int | pd.Series
1162 Row index or row Series from the dataframe
1164 Returns
1165 -------
1166 mt_metadata.timeseries.Run
1167 The run object associated with the row
1169 Notes
1170 -----
1171 This method may be deprecated in favor of direct calls to
1172 run_obj = row.mth5_obj.from_reference(row.run_hdf5_reference) in pipelines.
1173 """
1174 if isinstance(index_or_row, int):
1175 row = self.df.loc[index_or_row]
1176 else:
1177 row = index_or_row
1178 run_obj = row.mth5_obj.from_reference(row.run_hdf5_reference)
1179 return run_obj
1181 @property
1182 def num_sample_rates(self) -> int:
1183 """Returns the number of unique sample rates in the dataframe."""
1184 return len(self.df.sample_rate.unique())
1186 @property
1187 def sample_rate(self) -> float:
1188 r"""Returns the sample rate of the data in the dataframe."""
1189 if self.num_sample_rates != 1:
1190 msg = "Aurora does not yet process data from mixed sample rates"
1191 logger.error(msg)
1192 raise NotImplementedError(msg)
1193 sample_rate = self.df.sample_rate.unique()[0]
1194 return sample_rate
1196 # this should be deprecated in the future in favor of using get_metadata_from_df
1197 def update_survey_metadata(
1198 self, i: int, row: pd.Series, run_ts: mth5.timeseries.run_ts.RunTS
1199 ) -> None:
1200 """Wrangle survey_metadata into kernel_dataset.
1202 Development Notes:
1203 - The survey metadata needs to be passed to TF before exporting data.
1204 - This was factored out of initialize_dataframe_for_processing
1205 - TODO: It looks like we don't need to pass the whole run_ts, just its metadata
1206 There may be some performance implications to passing the whole object.
1207 Consider passing run_ts.survey_metadata, run_ts.run_metadata,
1208 run_ts.station_metadata only
1209 :param i: This would be the index of row, if we were sure that the dataframe was cleanly indexed.
1210 :type i: int
1211 :param row: Row of the dataframe associated with run_ts.
1212 :type row: pd.Series
1213 :param run_ts: Mth5 object having the survey_metadata.
1214 :type run_ts: mth5.timeseries.run_ts.RunTS
1215 :rtype: None
1216 """
1217 survey_id = run_ts.survey_metadata.id
1218 if survey_id not in self.survey_metadata.keys():
1219 self.survey_metadata[survey_id] = run_ts.survey_metadata
1220 else:
1221 if row.station in self.survey_metadata[survey_id].stations.keys():
1222 self.survey_metadata[survey_id].stations[row.station].add_run(
1223 run_ts.run_metadata
1224 )
1225 else:
1226 self.survey_metadata[survey_id].add_station(run_ts.station_metadata)
1227 if len(self.survey_metadata.keys()) > 1:
1228 raise NotImplementedError
1230 @property
1231 def mth5_objs(self):
1232 """Open mth5 objects, keyed by station id.
1233 :return: Dictionary [station_id: mth5_obj].
1234 :rtype: dict
1235 """
1236 mth5_obj_dict = {}
1237 mth5_obj_dict[self.local_station_id] = self.local_mth5_obj
1238 if self.remote_station_id is not None:
1239 mth5_obj_dict[self.remote_station_id] = self.remote_mth5_obj
1240 return mth5_obj_dict
1242 def initialize_mth5s(self, mode: str = "r"):
1243 """
1244 Return a dictionary of open mth5 objects, keyed by station_id.
1246 Parameters
1247 ----------
1248 mode : str, optional
1249 File opening mode, by default "r" (read-only)
1251 Returns
1252 -------
1253 dict
1254 Dictionary keyed by station IDs containing MTH5 objects:
1255 - local station id: mth5.mth5.MTH5
1256 - remote station id: mth5.mth5.MTH5 (if present)
1258 Notes
1259 -----
1260 Future versions for multiple station processing may need
1261 nested dict structure with [survey_id][station].
1262 """
1263 self.local_mth5_obj = initialize_mth5(self.local_mth5_path, mode=mode)
1264 if self.remote_station_id:
1265 self.remote_mth5_obj = initialize_mth5(self.remote_mth5_path, mode="r")
1267 self.initialized = True
1269 return self.mth5_objs
1271 def initialize_dataframe_for_processing(self) -> None:
1272 """Adds extra columns needed for processing to the dataframe.
1274 Populates them with mth5 objects, run_hdf5_reference, and xr.Datasets.
1276 Development Notes:
1277 Note #1: When assigning xarrays to dataframe cells, df dislikes xr.Dataset,
1278 so we convert to xr.DataArray before packing df
1280 Note #2: [OPTIMIZATION] By accessing the run_ts and packing the "run_dataarray" column of the df, we
1281 perform a non-lazy operation, essentially forcing the entire decimation_level=0 dataset to be
1282 loaded into memory. Seeking a lazy method to handle this may be worthwhile. For example, using
1283 a df.apply() approach to initialize only one row at a time would allow us to generate the FCs one
1284 row at a time and never ingest more than one run of data at a time ...
1286 Note #3: Uncommenting the continue statement here is desirable and will speed things up, but
1287 is not yet tested. A nice test would be to have two stations, some runs having FCs built
1288 and others not having FCs built. What goes wrong is in update_survey_metadata.
1289 Need a way to get the survey metadata from a run, not a run_ts, if possible.
1290 """
1292 self.add_columns_for_processing()
1294 for index, row in self.df.iterrows():
1295 run_obj = row.mth5_obj.get_run(row.station, row.run, survey=row.survey)
1296 self.df.loc[index, "run_hdf5_reference"] = run_obj.hdf5_group.ref
1298 if pd.notna(row.fc) and row.fc:
1299 msg = f"row {row} already has fcs prescribed by processing config"
1300 msg += " -- skipping time series initialisation"
1301 logger.info(msg)
1302 # see Note #3
1303 # continue
1304 # the line below is not lazy, See Note #2
1305 run_ts = run_obj.to_runts(start=row.start, end=row.end)
1306 self.df.at[index, "run_dataarray"] = run_ts.dataset.to_array("channel")
1308 # self.update_survey_metadata(i, row, run_ts)
1310 logger.info("Dataset dataframe initialized successfully, updated metadata.")
1312 def add_columns_for_processing(self) -> None:
1313 """Add columns to the dataframe used during processing.
1315 Development Notes:
1316 - This was originally in pipelines.
1317 - Q: Should mth5_objs be keyed by survey-station?
1318 - A: Yes, and ...
1319 since the KernelDataset dataframe will be iterated over, should probably
1320 write an iterator method. This can iterate over survey-station tuples
1321 for multiple station processing.
1322 - Currently the model of keeping all these data objects "live" in the df
1323 seems to work OK, but is not well suited to HPC or lazy processing.
1324 - This method takes no arguments; the mth5 objects come from
1325 self.local_mth5_obj and self.remote_mth5_obj.
1326 """
1327 if not self.initialized:
1328 raise ValueError("mth5 objects have not been initialized yet.")
1330 if self._has_df():
1331 self._df.loc[self._df.station == self.local_station_id, "mth5_obj"] = (
1332 self.local_mth5_obj
1333 )
1334 if self.remote_station_id is not None:
1335 self._df.loc[self._df.station == self.remote_station_id, "mth5_obj"] = (
1336 self.remote_mth5_obj
1337 )
1339 def close_mth5s(self) -> None:
1340 """Loop over all unique mth5_objs in dataset df and make sure they are closed."""
1341 mth5_objs = self.df["mth5_obj"].unique()
1342 for mth5_obj in mth5_objs:
1343 mth5_obj.close_mth5()
1344 return
1347def restrict_to_station_list(
1348 df: pd.DataFrame,
1349 station_ids: str | list[str],
1350 inplace: bool = True,
1351) -> pd.DataFrame:
1352 """
1353 Drop all rows where station_ids are NOT in the provided list.
1355 Operates on a deepcopy of dataframe if inplace=False.
1357 Parameters
1358 ----------
1359 df : pd.DataFrame
1360 A run summary dataframe
1361 station_ids : str | list[str]
1362 Station ids to keep, normally local and remote
1363 inplace : bool, optional
1364 If True, modifies dataframe in place, by default True
1366 Returns
1367 -------
1368 pd.DataFrame
1369 Filtered dataframe with only specified stations
1370 """
1371 if isinstance(station_ids, str):
1372 station_ids = [station_ids]
1373 if not inplace:
1374 df = copy.deepcopy(df)
1375 cond1 = ~df["station"].isin(station_ids)
1376 df.drop(df[cond1].index, inplace=True)
1377 df = df.reset_index(drop=True)
1378 return df
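
# Illustrative sketch (not part of the module): a non-mutating way to restrict a
# run summary to a set of stations. The helper name keep_stations and the toy
# run_summary frame are assumptions made for this example only.

```python
import pandas as pd


def keep_stations(df, station_ids):
    """Hypothetical helper: return a re-indexed copy restricted to station_ids."""
    if isinstance(station_ids, str):
        station_ids = [station_ids]
    # Boolean-mask selection returns a new frame, so the input is left untouched.
    return df[df["station"].isin(station_ids)].reset_index(drop=True)


run_summary = pd.DataFrame(
    {
        "station": ["a", "b", "c", "a"],
        "run": ["001", "001", "001", "002"],
    }
)

subset = keep_stations(run_summary, ["a", "c"])
single = keep_stations(run_summary, "b")
```

# Mask-based selection sidesteps the question of whether the caller's dataframe
# is modified, at the cost of always allocating a new frame.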
1381def intervals_overlap(
1382 start1: pd.Timestamp,
1383 end1: pd.Timestamp,
1384 start2: pd.Timestamp,
1385 end2: pd.Timestamp,
1386) -> bool:
1387 """Checks if intervals 1 and 2 overlap.
1389 Interval 1 is (start1, end1), Interval 2 is (start2, end2).
1391 Development Notes:
1392 This may work vectorized out of the box but has not been tested.
1393 Also, it is intended to work with pd.Timestamp objects, but should work
1394 for many objects that have an ordering associated.
1395 This website was used as a reference when writing the method:
1396 https://stackoverflow.com/questions/3721249/python-date-interval-intersection
1397 :param start1: Start of interval 1.
1398 :type start1: pd.Timestamp
1399 :param end1: End of interval 1.
1400 :type end1: pd.Timestamp
1401 :param start2: Start of interval 2.
1402 :type start2: pd.Timestamp
1403 :param end2: End of interval 2.
1404 :type end2: pd.Timestamp
1405 :return cond: True if the intervals overlap, False if they do not.
1406 :rtype cond: bool
1407 """
1408 cond = (start1 <= start2 <= end1) or (start2 <= start1 <= end2)
1409 return cond
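
# Illustrative sketch (not part of the module): the overlap test treats intervals
# as closed, so intervals that only share an endpoint count as overlapping. The
# function name closed_intervals_overlap is an assumption for this example; the
# boolean expression is copied from intervals_overlap above.

```python
import pandas as pd


def closed_intervals_overlap(start1, end1, start2, end2):
    """Same chained comparison as intervals_overlap, reproduced for the demo."""
    return (start1 <= start2 <= end1) or (start2 <= start1 <= end2)


t = pd.Timestamp

# Interval 2 starts exactly where interval 1 ends
touching = closed_intervals_overlap(
    t("2020-01-01"), t("2020-01-02"), t("2020-01-02"), t("2020-01-03")
)
# A one-day gap between the two intervals
disjoint = closed_intervals_overlap(
    t("2020-01-01"), t("2020-01-02"), t("2020-01-03"), t("2020-01-04")
)
# Interval 2 lies entirely inside interval 1
nested = closed_intervals_overlap(
    t("2020-01-01"), t("2020-01-10"), t("2020-01-03"), t("2020-01-04")
)
# Any ordered type works for scalars, for example plain integers
ints = closed_intervals_overlap(1, 5, 5, 9)
```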
1412def overlap(
1413 t1_start: pd.Timestamp,
1414 t1_end: pd.Timestamp,
1415 t2_start: pd.Timestamp,
1416 t2_end: pd.Timestamp,
1417) -> tuple:
1418 """Get the start and end times of the overlap between two intervals.
1420 Interval 1 is (t1_start, t1_end), Interval 2 is (t2_start, t2_end).
1422 Development Notes:
1423 Possibly some nicer syntax in this discussion:
1424 https://stackoverflow.com/questions/3721249/python-date-interval-intersection
1425 - Intended to work with pd.Timestamp objects, but should work for many objects
1426 that have an ordering associated.
1427 :param t1_start: The start of interval 1.
1428 :type t1_start: pd.Timestamp
1429 :param t1_end: The end of interval 1.
1430 :type t1_end: pd.Timestamp
1431 :param t2_start: The start of interval 2.
1432 :type t2_start: pd.Timestamp
1433 :param t2_end: The end of interval 2.
1434 :type t2_end: pd.Timestamp
1435 :return start, end: Start and end of the overlap, of the same type as the inputs, or (None, None) if there is no overlap.
1436 :rtype start, end: tuple
1437 """
1438 if t1_start <= t2_start <= t2_end <= t1_end:
1439 return t2_start, t2_end
1440 elif t1_start <= t2_start <= t1_end:
1441 return t2_start, t1_end
1442 elif t1_start <= t2_end <= t1_end:
1443 return t1_start, t2_end
1444 elif t2_start <= t1_start <= t1_end <= t2_end:
1445 return t1_start, t1_end
1446 else:
1447 return None, None
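
# Illustrative sketch (not part of the module): a more compact formulation of the
# overlap bounds using max/min, in the spirit of the "nicer syntax" mentioned in
# the docstring above. The name overlap_bounds is an assumption for this example,
# and well-formed intervals (start <= end) are assumed.

```python
def overlap_bounds(t1_start, t1_end, t2_start, t2_end):
    """Hypothetical alternative: overlap of two closed intervals via max/min."""
    start = max(t1_start, t2_start)
    end = min(t1_end, t2_end)
    if start <= end:
        return start, end
    # No common span
    return None, None


partial = overlap_bounds(0, 10, 5, 15)   # interval 2 starts inside interval 1
nested = overlap_bounds(0, 10, 2, 3)     # interval 2 inside interval 1
disjoint = overlap_bounds(0, 1, 2, 3)    # no overlap
touching = overlap_bounds(0, 5, 5, 9)    # shared endpoint only
```

# Integers are used here for brevity; pd.Timestamp inputs behave the same way
# because only ordering comparisons are needed.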