Coverage for C:\Users\peaco\OneDrive\Documents\GitHub\mth5\mth5\tables\channel_table.py: 92%
92 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-27 20:09 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-27 20:09 -0800
1# -*- coding: utf-8 -*-
2from __future__ import annotations
5"""Channel summary utilities for MTH5 tables."""
7from typing import Any, Iterable
9import h5py
10import numpy as np
12# =============================================================================
13# Imports
14# =============================================================================
15import pandas as pd
16from mt_metadata.transfer_functions import (
17 ALLOWED_INPUT_CHANNELS,
18 ALLOWED_OUTPUT_CHANNELS,
19)
21from mth5 import CHANNEL_DTYPE, RUN_SUMMARY_COLUMNS
22from mth5.tables import MTH5Table
25# =============================================================================
class ChannelSummaryTable(MTH5Table):
    """Convenience wrapper around the channel summary dataset.

    Provides helpers to summarize channels, convert to pandas, and derive
    run-level summaries.

    Examples
    --------
    >>> ch_table = ChannelSummaryTable(hdf5_dataset)
    >>> df = ch_table.to_dataframe()  # doctest: +SKIP
    >>> run_df = ch_table.to_run_summary()  # doctest: +SKIP
    """

    def __init__(self, hdf5_dataset: h5py.Dataset) -> None:
        super().__init__(hdf5_dataset, CHANNEL_DTYPE)

    def _has_entries(self) -> bool:
        """Return ``True`` if the summary table contains data.

        A table with no rows, or with a single row whose first two fields are
        both empty byte strings, is treated as having no entries.
        """
        n_rows = len(self.array)
        if n_rows == 0:
            # Nothing stored at all: there is nothing to summarize from.
            return False
        if n_rows == 1:
            first_row = self.array[0]
            # A lone row with blank leading fields is a placeholder, not data.
            if first_row[0] == b"" and first_row[1] == b"":
                return False
        return True

    def to_dataframe(self) -> pd.DataFrame:
        """Convert the channel summary to a pandas DataFrame.

        Returns
        -------
        pandas.DataFrame
            Channel summary with decoded string columns and parsed datetimes.

        Examples
        --------
        >>> df = ch_table.to_dataframe()  # doctest: +SKIP
        >>> df.head()  # doctest: +SKIP
        """
        df = pd.DataFrame(self.array[()])
        for key in (
            "survey",
            "station",
            "run",
            "component",
            "measurement_type",
            "units",
        ):
            df[key] = df[key].str.decode("utf-8")

        # Decode once into locals and only assign the columns after parsing
        # succeeded.  Assigning ``start`` before ``end`` is parsed would leave
        # ``start`` as datetimes, and the fallback's ``.str.decode`` would then
        # fail on it.
        start_text = df["start"].str.decode("utf-8")
        end_text = df["end"].str.decode("utf-8")
        try:
            start = pd.to_datetime(start_text, format="mixed")
            end = pd.to_datetime(end_text, format="mixed")
        except ValueError:
            # Fall back to default format inference when "mixed" is rejected.
            start = pd.to_datetime(start_text)
            end = pd.to_datetime(end_text)
        df["start"] = start
        df["end"] = end

        return df

    def summarize(self) -> None:
        """Populate the summary table from channel datasets in the file.

        The table is cleared, then the HDF5 tree is walked starting at the
        parent of the summary dataset.  Every dataset whose ``type`` attribute
        is ``electric``, ``magnetic`` or ``auxiliary`` contributes one row.
        Datasets without a ``type`` attribute are ignored; channel datasets
        that are missing a required attribute are skipped with a warning.
        """
        self.clear_table()

        def has_data(h5_dataset: h5py.Dataset) -> bool:
            """Return True when the dataset has any non-zero data."""
            return len(h5_dataset) > 0 and len(np.nonzero(h5_dataset)[0]) > 0

        def get_channel_entry(
            group: h5py.Dataset, dtype: Any = CHANNEL_DTYPE
        ) -> np.ndarray:
            """Build a one-row structured array describing a channel dataset."""
            # NOTE(review): presumably parent = run, grandparent = station and
            # the fourth ancestor = survey, matching the column names used in
            # ``to_dataframe`` -- confirm against the file layout.
            run_group = group.parent
            station_group = run_group.parent
            survey_group = station_group.parent.parent
            return np.array(
                [
                    (
                        survey_group.attrs["id"].encode("utf-8"),
                        station_group.attrs["id"].encode("utf-8"),
                        run_group.attrs["id"].encode("utf-8"),
                        station_group.attrs["location.latitude"],
                        station_group.attrs["location.longitude"],
                        station_group.attrs["location.elevation"],
                        group.attrs["component"],
                        group.attrs["time_period.start"],
                        group.attrs["time_period.end"],
                        group.size,
                        group.attrs["sample_rate"],
                        group.attrs["type"],
                        group.attrs["measurement_azimuth"],
                        group.attrs["measurement_tilt"],
                        group.attrs["units"],
                        has_data(group),
                        group.ref,
                        run_group.ref,
                        station_group.ref,
                    )
                ],
                dtype=dtype,
            )

        def recursive_get_channel_entry(group: h5py.Group | h5py.File) -> None:
            """Traverse HDF5 tree and collect channel entries."""
            # ``h5py.File`` subclasses ``h5py.Group``, so one check covers both.
            if isinstance(group, h5py.Group):
                for node in group.values():
                    recursive_get_channel_entry(node)
                return
            if not isinstance(group, h5py.Dataset):
                return

            try:
                ch_type = group.attrs["type"]
            except KeyError:
                # Datasets without a ``type`` attribute are not channels.
                return
            if ch_type not in ["electric", "magnetic", "auxiliary"]:
                return

            try:
                ch_entry = get_channel_entry(group)
                try:
                    self.add_row(ch_entry)
                except ValueError as error:
                    msg = (
                        f"{error}. "
                        "it is possible that the OS that made the table is not the OS operating on it."
                    )
                    self.logger.warning(msg)
            except KeyError as error:
                # Keep the best-effort behaviour (skip the channel) but make
                # the omission visible instead of dropping it silently.
                self.logger.warning(
                    f"Skipping channel {group.name}: missing metadata {error}."
                )

        recursive_get_channel_entry(self.array.parent)

    def to_run_summary(
        self,
        allowed_input_channels: Iterable[str] = ALLOWED_INPUT_CHANNELS,
        allowed_output_channels: Iterable[str] = ALLOWED_OUTPUT_CHANNELS,
        sortby: list[str] | None = None,
    ) -> pd.DataFrame:
        """Compress channel summary into a run-level summary (one row per run).

        Parameters
        ----------
        allowed_input_channels : Iterable[str], optional
            Allowed input channel names, by default ``ALLOWED_INPUT_CHANNELS``.
        allowed_output_channels : Iterable[str], optional
            Allowed output channel names, by default ``ALLOWED_OUTPUT_CHANNELS``.
        sortby : list of str or None, optional
            Columns to sort by; defaults to ``["station", "start"]`` when ``None``.
            An empty list disables sorting.

        Returns
        -------
        pandas.DataFrame
            Run-level summary including channels, durations, and references.
            When the channel summary has no rows an empty frame with the
            run-summary columns is returned.

        Examples
        --------
        >>> run_df = ch_table.to_run_summary()  # doctest: +SKIP
        >>> run_df.columns[:4].tolist()  # doctest: +SKIP
        ['survey', 'station', 'run', 'start']
        """
        if not self._has_entries():
            self.summarize()
        ch_summary_df = self.to_dataframe()

        # Materialise once: the membership tests below run for every run, which
        # would exhaust a one-shot iterable after the first group.
        allowed_inputs = tuple(allowed_input_channels)
        allowed_outputs = tuple(allowed_output_channels)

        row_list = []
        for _, group in ch_summary_df.groupby(["survey", "station", "run"]):
            row = dict.fromkeys(RUN_SUMMARY_COLUMNS)
            # Run-level values are taken from the first channel of the run.
            row["survey"] = group["survey"].iloc[0]
            row["station"] = group["station"].iloc[0]
            row["run"] = group["run"].iloc[0]
            row["start"] = group["start"].iloc[0]
            row["end"] = group["end"].iloc[0]
            row["sample_rate"] = group["sample_rate"].iloc[0]
            # The longest channel defines the run length.
            row["n_samples"] = group["n_samples"].max()

            channels_list = group["component"].to_list()
            row["input_channels"] = [
                x for x in channels_list if x in allowed_inputs
            ]
            row["output_channels"] = [
                x for x in channels_list if x in allowed_outputs
            ]
            row["channel_scale_factors"] = dict.fromkeys(channels_list, 1.0)
            # A run only counts as having data when every channel does.
            row["has_data"] = False not in group["has_data"].values

            row["run_hdf5_reference"] = group["run_hdf5_reference"].iloc[0]
            row["station_hdf5_reference"] = group[
                "station_hdf5_reference"
            ].iloc[0]

            row_list.append(row)

        if not row_list:
            # No runs: a frame built from an empty list has no columns, so
            # sorting and the duration arithmetic below would raise.
            columns = list(RUN_SUMMARY_COLUMNS)
            if "duration" not in columns:
                columns.append("duration")
            return pd.DataFrame(columns=columns)

        run_summary_df = pd.DataFrame(data=row_list)
        if sortby is None:
            sortby = ["station", "start"]
        if sortby:
            run_summary_df = run_summary_df.sort_values(by=sortby)

        # add durations
        timedeltas = run_summary_df["end"] - run_summary_df["start"]
        run_summary_df["duration"] = [x.total_seconds() for x in timedeltas]

        return run_summary_df