Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mth5 \ mth5 \ timeseries \ spectre \ helpers.py: 82%
109 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:01 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:01 -0800
1"""
2This is a placeholder module for functions that are used in testing and development of spectrograms.
3"""
5import pathlib
6from typing import List, Literal, Optional, Union
8import xarray as xr
9from loguru import logger
10from mt_metadata.processing.fourier_coefficients import Decimation as FCDecimation
11from mt_metadata.processing.fourier_coefficients.decimation import (
12 fc_decimations_creator,
13 get_degenerate_fc_decimation,
14)
16import mth5
17from mth5.mth5 import MTH5
18from mth5.processing.spectre.stft import run_ts_to_stft_scipy
19from mth5.utils.helpers import path_or_mth5_object
22GROUPBY_COLUMNS = ["survey", "station", "sample_rate"]
25@path_or_mth5_object
26def add_fcs_to_mth5(
27 m: MTH5,
28 fc_decimations: Optional[Union[str, list]] = None,
29 groupby_columns: List[str] = GROUPBY_COLUMNS,
30) -> None:
31 """
32 Add Fourier Coefficient Levels ot an existing MTH5.
34 TODO: This method currently loops the heirarcy of the h5, and then calls an operator.
35 How about making a single table that represents the loop up front and then looping once that
36 table instead of this nested loop business? We would need a function that takes as input
37 the groupby_columns.
39 **Notes:**
41 - This module computes the FCs differently than the legacy aurora pipeline. It uses scipy.signal.spectrogram.
42 There is a test in Aurora to confirm that there are equivalent if we are not using fancy pre-whitening.
44 Parameters
45 ----------
46 m: MTH5 object
47 The mth5 file, open in append mode.
48 fc_decimations: Optional[Union[str, list]]
49 This specifies the scheme to use for decimating the time series when building the FC layer.
50 None: Just use default (something like four decimation levels, decimated by 4 each time say.)
51 String: Controlled Vocabulary, values are a work in progress, that will allow custom definition of
52 the fc_decimations for some common cases. For example, say you have stored already decimated time
53 series, then you want simply the zeroth decimation for each run, because the decimated time series live
54 under another run container, and that will get its own FCs. This is experimental.
55 List: (**UNTESTED**) -- This means that the user thought about the decimations that they want to create and is
56 passing them explicitly. -- probably will need to be a dictionary actually, since this
57 would get redefined at each sample rate.
59 """
60 # Group the channel summary by survey, station, sample_rate
61 channel_summary_df = m.channel_summary.to_dataframe()
62 grouper = channel_summary_df.groupby(groupby_columns)
63 logger.debug(f"Detected {len(grouper)} unique station-sample_rate instances")
65 # loop over groups
66 for (
67 survey,
68 station,
69 sample_rate,
70 ), group in (
71 grouper
72 ): # TODO: is there a way to use the groupby_columns var instead of this tuple?
73 msg = f"\n\n\nsurvey: {survey}, station: {station}, sample_rate {sample_rate}"
74 logger.info(msg)
75 station_obj = m.get_station(station, survey)
76 run_summary = station_obj.run_summary
78 # Get the FC decimation schemes if not provided -- note that this depends only on sample rate
79 fc_decimations = _fc_decimations_from_sample_rate(
80 fc_decimations=fc_decimations, sample_rate=sample_rate
81 )
83 # TODO: Make this a function that can be done using df.apply()
84 for i_run_row, run_row in run_summary.iterrows():
85 logger.info(
86 f"survey: {survey}, station: {station}, sample_rate {sample_rate}, i_run_row {i_run_row}"
87 )
88 # Access Run
89 run_obj = m.from_reference(run_row.hdf5_reference)
91 # Set the time period:
92 # TODO: Should this be over-writing time period if it is already there?
93 for fc_decimation in fc_decimations:
94 fc_decimation.time_period = run_obj.metadata.time_period
96 # Access the data to Fourier transform
97 runts = run_obj.to_runts(
98 start=fc_decimation.time_period.start,
99 end=fc_decimation.time_period.end,
100 )
101 run_xrds = runts.dataset
103 # access container for FCs
104 fc_group = station_obj.fourier_coefficients_group.add_fc_group(
105 run_obj.metadata.id
106 )
108 # If timing corrections were needed they could go here, right before STFT
110 # TODO: replace i_dec_level with ts_decimation.level in the following
111 for i_dec_level, fc_decimation in enumerate(fc_decimations):
112 ts_decimation = fc_decimation.time_series_decimation
114 # Temporary check that i_dec_level and ts_decimation.level are the same
115 try:
116 assert i_dec_level == ts_decimation.level
117 except:
118 msg = "decimation level has unexpected value"
119 raise ValueError(msg)
121 if ts_decimation.level != 0: # Apply decimation
122 target_sample_rate = run_xrds.sample_rate / ts_decimation.factor
123 run_xrds.sps_filters.decimate(target_sample_rate=target_sample_rate)
125 _add_spectrogram_to_mth5(
126 fc_decimation=fc_decimation,
127 run_obj=run_obj,
128 run_xrds=run_xrds,
129 fc_group=fc_group,
130 )
132 return
135def _fc_decimations_from_sample_rate(
136 sample_rate: float,
137 fc_decimations: Optional[Union[str, list]] = None,
138) -> Union[str, list]:
139 """
140 Helper function to get some fc_decimations.
141 Really only seems to be used by add_fcs_to_mth5.
143 Development Notes:
144 This function is probably overslicing the add_fcs_to_mth5 function
146 :return fc_decimations: This is an iterable of
147 """
148 # Get the FC decimation schemes if not provided -- note that this depend only on sample rate
149 if not fc_decimations:
150 msg = "FC Decimations not supplied, creating defaults on the fly"
151 logger.info(f"{msg}")
152 fc_decimations = fc_decimations_creator(
153 initial_sample_rate=sample_rate, time_period=None
154 )
155 elif isinstance(fc_decimations, str):
156 if fc_decimations == "degenerate":
157 fc_decimations = get_degenerate_fc_decimation(sample_rate)
159 return fc_decimations
162def _add_spectrogram_to_mth5(
163 fc_decimation: FCDecimation,
164 run_obj: mth5.groups.RunGroup,
165 run_xrds: xr.Dataset,
166 fc_group: mth5.groups.FCGroup,
167) -> None:
168 """
170 This function has been factored out of add_fcs_to_mth5.
171 This is the most atomic level of adding FCs and may be useful as standalone method.
173 Parameters
174 ----------
175 fc_decimation : FCDecimation
176 Metadata about how the decimation level is to be processed
177 run_xrds : xarray.core.dataset.Dataset
178 Time series to be converted to a spectrogram and stored in MTH5.
180 Returns
181 -------
182 run_xrds : xarray.core.dataset.Dataset
183 pre-whitened time series
185 """
187 # check if this decimation level yields a valid spectrogram
188 if not fc_decimation.is_valid_for_time_series_length(run_xrds.time.shape[0]):
189 logger.info(
190 f"Decimation Level {fc_decimation.time_series_decimation.level} invalid, TS of {run_xrds.time.shape[0]} samples too short"
191 )
192 return
194 spectrogram = run_ts_to_stft_scipy(fc_decimation, run_xrds)
195 stft_obj = calibrate_stft_obj(spectrogram.dataset, run_obj)
197 # Pack FCs into h5 and update metadata
198 fc_decimation_group: FCDecimationGroup = fc_group.add_decimation_level(
199 f"{fc_decimation.time_series_decimation.level}",
200 decimation_level_metadata=fc_decimation,
201 )
202 fc_decimation_group.from_xarray(
203 stft_obj, fc_decimation_group.metadata.decimation.sample_rate
204 )
205 fc_decimation_group.update_metadata()
206 fc_group.update_metadata()
209@path_or_mth5_object
210def read_back_fcs(
211 m: Union[MTH5, pathlib.Path, str],
212 mode: str = "r",
213 groupby_columns: List[str] = GROUPBY_COLUMNS,
214) -> None:
215 """
216 Loops over stations in the channel summary of input (m) grouping by common sample_rate.
217 Then loop over the runs in the corresponding FC Group. Finally, within an fc_group,
218 loop decimation levels and read data to xarray. Log info about the shape of the xarray.
220 This is a helper function for tests. It was used as a sanity check while debugging the FC files, and
221 also is a good example for how to access the data at each level for each channel.
223 Development Notes:
224 The Time axis of the FC array changes from decimation_level to decimation_level.
225 The frequency axis will shape will depend on the window length that was used to perform STFT.
226 This is currently storing all (positive frequency) fcs by default, but future versions can
227 also have selected bands within an FC container.
229 Parameters
230 ----------
231 m: Union[MTH5, pathlib.Path, str]
232 Either a path to an mth5, or an MTH5 object that the FCs will be read back from.
233 mode: str
234 The mode to open the MTH5 file in. Defualts to (r)ead only.
237 """
238 channel_summary_df = m.channel_summary.to_dataframe()
239 logger.debug(channel_summary_df)
240 grouper = channel_summary_df.groupby(groupby_columns)
241 for (survey, station, sample_rate), group in grouper:
242 logger.info(f"survey: {survey}, station: {station}, sample_rate {sample_rate}")
243 station_obj = m.get_station(station, survey)
244 fc_groups = station_obj.fourier_coefficients_group.groups_list
245 logger.info(f"FC Groups: {fc_groups}")
246 for run_id in fc_groups:
247 fc_group = station_obj.fourier_coefficients_group.get_fc_group(run_id)
248 dec_level_ids = fc_group.groups_list
249 for dec_level_id in dec_level_ids:
250 dec_level = fc_group.get_decimation_level(dec_level_id)
251 xrds = dec_level.to_xarray(["hx", "hy"])
252 msg = f"dec_level {dec_level_id}"
253 msg = f"{msg} \n Time axis shape {xrds.time.data.shape}"
254 msg = f"{msg} \n Freq axis shape {xrds.frequency.data.shape}"
255 logger.debug(msg)
257 return
260def calibrate_stft_obj(
261 stft_obj: xr.Dataset,
262 run_obj: mth5.groups.RunGroup,
263 units: Literal["MT", "SI"] = "MT",
264 channel_scale_factors: Optional[dict] = None,
265) -> xr.Dataset:
266 """
267 Calibrates frequency domain data into MT units.
269 Development Notes:
270 The calibration often raises a runtime warning due to DC term in calibration response = 0.
271 TODO: It would be nice to suppress this, maybe by only calibrating the non-dc terms and directly assigning np.nan to the dc component when DC-response is zero.
273 Parameters
274 ----------
275 stft_obj : xarray.core.dataset.Dataset
276 Time series of Fourier coefficients to be calibrated
277 run_obj : mth5.groups.master_station_run_channel.RunGroup
278 Provides information about filters for calibration
279 units : string
280 usually "MT", contemplating supporting "SI"
281 scale_factors : dict or None
282 keyed by channel, supports a single scalar to apply to that channels data
283 Useful for debugging. Should not be used in production and should throw a
284 warning if it is not None
286 Returns
287 -------
288 stft_obj : xarray.core.dataset.Dataset
289 Time series of calibrated Fourier coefficients
290 """
291 for channel_id in stft_obj.keys():
292 channel = run_obj.get_channel(channel_id)
293 channel_response = channel.channel_response
294 if not channel_response.filters_list:
295 msg = f"Channel {channel_id} with empty filters list detected"
296 logger.warning(msg)
297 if channel_id == "hy":
298 msg = "Channel hy has no filters, try using filters from hx"
299 logger.warning(msg)
300 channel_response = run_obj.get_channel("hx").channel_response
302 indices_to_flip = channel_response.get_indices_of_filters_to_remove(
303 include_decimation=False, include_delay=False
304 )
305 indices_to_flip = [
306 i for i in indices_to_flip if channel.metadata.filters[i].applied
307 ]
308 filters_to_remove = [channel_response.filters_list[i] for i in indices_to_flip]
309 if not filters_to_remove:
310 logger.warning("No filters to remove")
312 calibration_response = channel_response.complex_response(
313 stft_obj.frequency.data, filters_list=filters_to_remove
314 )
316 if channel_scale_factors:
317 try:
318 channel_scale_factor = channel_scale_factors[channel_id]
319 except KeyError:
320 channel_scale_factor = 1.0
321 calibration_response /= channel_scale_factor
323 if units == "SI":
324 logger.warning("Warning: SI Units are not robustly supported issue #36")
326 # TODO: FIXME Sometimes raises a runtime warning due to DC term in calibration response = 0
327 stft_obj[channel_id].data /= calibration_response
328 return stft_obj