Coverage for C:\Users\peaco\OneDrive\Documents\GitHub\mth5\mth5\tables\channel_table.py: 92%

92 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-27 20:09 -0800

1# -*- coding: utf-8 -*- 

2from __future__ import annotations 

3 

4 

5"""Channel summary utilities for MTH5 tables.""" 

6 

7from typing import Any, Iterable 

8 

9import h5py 

10import numpy as np 

11 

12# ============================================================================= 

13# Imports 

14# ============================================================================= 

15import pandas as pd 

16from mt_metadata.transfer_functions import ( 

17 ALLOWED_INPUT_CHANNELS, 

18 ALLOWED_OUTPUT_CHANNELS, 

19) 

20 

21from mth5 import CHANNEL_DTYPE, RUN_SUMMARY_COLUMNS 

22from mth5.tables import MTH5Table 

23 

24 

25# ============================================================================= 

26 

27 

class ChannelSummaryTable(MTH5Table):
    """Convenience wrapper around the channel summary dataset.

    Provides helpers to summarize channels, convert to pandas, and derive
    run-level summaries.

    Examples
    --------
    >>> ch_table = ChannelSummaryTable(hdf5_dataset)  # doctest: +SKIP
    >>> df = ch_table.to_dataframe()  # doctest: +SKIP
    >>> run_df = ch_table.to_run_summary()  # doctest: +SKIP
    """

    # Columns that are decoded from bytes to ``str`` in ``to_dataframe``.
    _STRING_COLUMNS = (
        "survey",
        "station",
        "run",
        "component",
        "measurement_type",
        "units",
    )

    # Values of a dataset's ``type`` attribute that mark it as a channel.
    _CHANNEL_TYPES = ("electric", "magnetic", "auxiliary")

    def __init__(self, hdf5_dataset: h5py.Dataset) -> None:
        """Wrap ``hdf5_dataset`` as a table using the ``CHANNEL_DTYPE`` layout."""
        super().__init__(hdf5_dataset, CHANNEL_DTYPE)

    def _has_entries(self) -> bool:
        """Return ``True`` if the summary table contains data.

        A table with no rows, or with a single row whose first two fields are
        empty byte strings, is reported as empty.
        """
        n_rows = len(self.array)
        if n_rows == 0:
            return False
        if n_rows == 1:
            # Presumably the placeholder row left by ``clear_table`` -- the
            # check itself only looks at the first two fields being empty.
            first_row = self.array[0]
            if first_row[0] == b"" and first_row[1] == b"":
                return False
        return True

    def to_dataframe(self) -> pd.DataFrame:
        """Convert the channel summary to a pandas DataFrame.

        Returns
        -------
        pandas.DataFrame
            Channel summary with decoded string columns and parsed datetimes.

        Examples
        --------
        >>> df = ch_table.to_dataframe()  # doctest: +SKIP
        >>> df.head()  # doctest: +SKIP
        """
        df = pd.DataFrame(self.array[()])

        # Item access avoids any clash between column names and DataFrame
        # attributes that attribute-style assignment could run into.
        for column in self._STRING_COLUMNS:
            df[column] = df[column].str.decode("utf-8")

        # Decode once; both parsing attempts below reuse the decoded text.
        start_text = df["start"].str.decode("utf-8")
        end_text = df["end"].str.decode("utf-8")
        try:
            start = pd.to_datetime(start_text, format="mixed")
            end = pd.to_datetime(end_text, format="mixed")
        except ValueError:
            # NOTE(review): presumably covers pandas versions that do not
            # understand ``format="mixed"`` -- confirm.
            start = pd.to_datetime(start_text)
            end = pd.to_datetime(end_text)
        df["start"] = start
        df["end"] = end

        return df

    def summarize(self) -> None:
        """Populate the summary table from channel datasets in the file.

        The table is cleared, then the HDF5 tree is walked starting at the
        parent of the summary dataset.  Every dataset whose ``type`` attribute
        is ``electric``, ``magnetic`` or ``auxiliary`` contributes one row.
        Datasets whose metadata cannot be read, or whose row cannot be added,
        are skipped with a logged warning.
        """
        self.clear_table()

        def has_data(h5_dataset: h5py.Dataset) -> bool:
            """Return True when the dataset has any non-zero data."""
            if len(h5_dataset) == 0:
                return False
            # ``np.any`` gives the same answer as counting ``np.nonzero``
            # indices without allocating the index array.
            return bool(np.any(h5_dataset[()]))

        def get_channel_entry(
            channel: h5py.Dataset, dtype: Any = CHANNEL_DTYPE
        ) -> np.ndarray:
            """Build a one-row structured array describing ``channel``."""
            run_group = channel.parent
            station_group = run_group.parent
            # The survey id is read two levels above the station group.
            survey_group = station_group.parent.parent
            return np.array(
                [
                    (
                        survey_group.attrs["id"].encode("utf-8"),
                        station_group.attrs["id"].encode("utf-8"),
                        run_group.attrs["id"].encode("utf-8"),
                        station_group.attrs["location.latitude"],
                        station_group.attrs["location.longitude"],
                        station_group.attrs["location.elevation"],
                        channel.attrs["component"],
                        channel.attrs["time_period.start"],
                        channel.attrs["time_period.end"],
                        channel.size,
                        channel.attrs["sample_rate"],
                        channel.attrs["type"],
                        channel.attrs["measurement_azimuth"],
                        channel.attrs["measurement_tilt"],
                        channel.attrs["units"],
                        has_data(channel),
                        channel.ref,
                        run_group.ref,
                        station_group.ref,
                    )
                ],
                dtype=dtype,
            )

        def recursive_get_channel_entry(node: Any) -> None:
            """Traverse HDF5 tree and collect channel entries."""
            # ``h5py.File`` is a subclass of ``h5py.Group``.
            if isinstance(node, h5py.Group):
                for child in node.values():
                    recursive_get_channel_entry(child)
                return
            if not isinstance(node, h5py.Dataset):
                return

            try:
                ch_type = node.attrs["type"]
            except KeyError:
                # No ``type`` attribute: not a channel dataset.
                return
            if ch_type not in self._CHANNEL_TYPES:
                return

            try:
                ch_entry = get_channel_entry(node)
                try:
                    self.add_row(ch_entry)
                except ValueError as error:
                    msg = (
                        f"{error}. "
                        "it is possible that the OS that made the table is not the OS operating on it."
                    )
                    self.logger.warning(msg)
            except KeyError as error:
                # Still best-effort (the channel is skipped), but no longer
                # silent, so an incomplete summary can be diagnosed.
                self.logger.warning(
                    f"Skipping channel dataset {node.name!r}; "
                    f"required metadata could not be read: {error}"
                )

        recursive_get_channel_entry(self.array.parent)

    def to_run_summary(
        self,
        allowed_input_channels: Iterable[str] = ALLOWED_INPUT_CHANNELS,
        allowed_output_channels: Iterable[str] = ALLOWED_OUTPUT_CHANNELS,
        sortby: list[str] | None = None,
    ) -> pd.DataFrame:
        """Compress channel summary into a run-level summary (one row per run).

        If the table has no entries it is populated with ``summarize`` first.

        Parameters
        ----------
        allowed_input_channels : Iterable[str], optional
            Allowed input channel names, by default ``ALLOWED_INPUT_CHANNELS``.
        allowed_output_channels : Iterable[str], optional
            Allowed output channel names, by default ``ALLOWED_OUTPUT_CHANNELS``.
        sortby : list of str or None, optional
            Columns to sort by; defaults to ``["station", "start"]`` when ``None``.
            An empty list disables sorting.

        Returns
        -------
        pandas.DataFrame
            Run-level summary including channels, durations, and references.

        Examples
        --------
        >>> run_df = ch_table.to_run_summary()  # doctest: +SKIP
        >>> run_df.columns[:4].tolist()  # doctest: +SKIP
        ['survey', 'station', 'run', 'start']
        """
        if not self._has_entries():
            self.summarize()
        ch_summary_df = self.to_dataframe()

        # Membership is tested once per run group, so a one-shot iterable
        # (e.g. a generator) must be materialized or it would be exhausted
        # after the first group.  Real containers are used unchanged.
        if not hasattr(allowed_input_channels, "__contains__"):
            allowed_input_channels = tuple(allowed_input_channels)
        if not hasattr(allowed_output_channels, "__contains__"):
            allowed_output_channels = tuple(allowed_output_channels)

        row_list = []
        for _, group in ch_summary_df.groupby(["survey", "station", "run"]):
            channels_list = group.component.to_list()

            row = dict.fromkeys(RUN_SUMMARY_COLUMNS)
            row["survey"] = group.survey.iloc[0]
            row["station"] = group.station.iloc[0]
            row["run"] = group.run.iloc[0]
            # NOTE(review): start/end/sample_rate come from the first channel
            # of the run, not a min/max across channels -- confirm intended.
            row["start"] = group.start.iloc[0]
            row["end"] = group.end.iloc[0]
            row["sample_rate"] = group.sample_rate.iloc[0]
            # Longest channel in the run.
            row["n_samples"] = group.n_samples.max()
            row["input_channels"] = [
                x for x in channels_list if x in allowed_input_channels
            ]
            row["output_channels"] = [
                x for x in channels_list if x in allowed_output_channels
            ]
            row["channel_scale_factors"] = dict.fromkeys(channels_list, 1.0)
            # A run has data only when every one of its channels does.
            row["has_data"] = bool(group.has_data.all())
            row["run_hdf5_reference"] = group.run_hdf5_reference.iloc[0]
            row["station_hdf5_reference"] = group.station_hdf5_reference.iloc[0]

            row_list.append(row)

        run_summary_df = pd.DataFrame(data=row_list)
        if sortby is None:
            sortby = ["station", "start"]
        if sortby:
            run_summary_df.sort_values(by=sortby, inplace=True)

        # Add durations in seconds.  Element-wise on purpose: it also works
        # if the datetime columns end up as object dtype.
        timedeltas = run_summary_df.end - run_summary_df.start
        run_summary_df["duration"] = [x.total_seconds() for x in timedeltas]

        return run_summary_df

229 run_summary_df["duration"] = durations 

230 

231 return run_summary_df