Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mth5 \ mth5 \ timeseries \ spectre \ helpers.py: 82%

109 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-10 00:01 -0800

1""" 

2This is a placeholder module for functions that are used in testing and development of spectrograms. 

3""" 

4 

5import pathlib 

6from typing import List, Literal, Optional, Union 

7 

8import xarray as xr 

9from loguru import logger 

10from mt_metadata.processing.fourier_coefficients import Decimation as FCDecimation 

11from mt_metadata.processing.fourier_coefficients.decimation import ( 

12 fc_decimations_creator, 

13 get_degenerate_fc_decimation, 

14) 

15 

16import mth5 

17from mth5.mth5 import MTH5 

18from mth5.processing.spectre.stft import run_ts_to_stft_scipy 

19from mth5.utils.helpers import path_or_mth5_object 

20 

21 

22GROUPBY_COLUMNS = ["survey", "station", "sample_rate"] 

23 

24 

25@path_or_mth5_object 

26def add_fcs_to_mth5( 

27 m: MTH5, 

28 fc_decimations: Optional[Union[str, list]] = None, 

29 groupby_columns: List[str] = GROUPBY_COLUMNS, 

30) -> None: 

31 """ 

32 Add Fourier Coefficient Levels ot an existing MTH5. 

33 

34 TODO: This method currently loops the heirarcy of the h5, and then calls an operator. 

35 How about making a single table that represents the loop up front and then looping once that 

36 table instead of this nested loop business? We would need a function that takes as input 

37 the groupby_columns. 

38 

39 **Notes:** 

40 

41 - This module computes the FCs differently than the legacy aurora pipeline. It uses scipy.signal.spectrogram. 

42 There is a test in Aurora to confirm that there are equivalent if we are not using fancy pre-whitening. 

43 

44 Parameters 

45 ---------- 

46 m: MTH5 object 

47 The mth5 file, open in append mode. 

48 fc_decimations: Optional[Union[str, list]] 

49 This specifies the scheme to use for decimating the time series when building the FC layer. 

50 None: Just use default (something like four decimation levels, decimated by 4 each time say.) 

51 String: Controlled Vocabulary, values are a work in progress, that will allow custom definition of 

52 the fc_decimations for some common cases. For example, say you have stored already decimated time 

53 series, then you want simply the zeroth decimation for each run, because the decimated time series live 

54 under another run container, and that will get its own FCs. This is experimental. 

55 List: (**UNTESTED**) -- This means that the user thought about the decimations that they want to create and is 

56 passing them explicitly. -- probably will need to be a dictionary actually, since this 

57 would get redefined at each sample rate. 

58 

59 """ 

60 # Group the channel summary by survey, station, sample_rate 

61 channel_summary_df = m.channel_summary.to_dataframe() 

62 grouper = channel_summary_df.groupby(groupby_columns) 

63 logger.debug(f"Detected {len(grouper)} unique station-sample_rate instances") 

64 

65 # loop over groups 

66 for ( 

67 survey, 

68 station, 

69 sample_rate, 

70 ), group in ( 

71 grouper 

72 ): # TODO: is there a way to use the groupby_columns var instead of this tuple? 

73 msg = f"\n\n\nsurvey: {survey}, station: {station}, sample_rate {sample_rate}" 

74 logger.info(msg) 

75 station_obj = m.get_station(station, survey) 

76 run_summary = station_obj.run_summary 

77 

78 # Get the FC decimation schemes if not provided -- note that this depends only on sample rate 

79 fc_decimations = _fc_decimations_from_sample_rate( 

80 fc_decimations=fc_decimations, sample_rate=sample_rate 

81 ) 

82 

83 # TODO: Make this a function that can be done using df.apply() 

84 for i_run_row, run_row in run_summary.iterrows(): 

85 logger.info( 

86 f"survey: {survey}, station: {station}, sample_rate {sample_rate}, i_run_row {i_run_row}" 

87 ) 

88 # Access Run 

89 run_obj = m.from_reference(run_row.hdf5_reference) 

90 

91 # Set the time period: 

92 # TODO: Should this be over-writing time period if it is already there? 

93 for fc_decimation in fc_decimations: 

94 fc_decimation.time_period = run_obj.metadata.time_period 

95 

96 # Access the data to Fourier transform 

97 runts = run_obj.to_runts( 

98 start=fc_decimation.time_period.start, 

99 end=fc_decimation.time_period.end, 

100 ) 

101 run_xrds = runts.dataset 

102 

103 # access container for FCs 

104 fc_group = station_obj.fourier_coefficients_group.add_fc_group( 

105 run_obj.metadata.id 

106 ) 

107 

108 # If timing corrections were needed they could go here, right before STFT 

109 

110 # TODO: replace i_dec_level with ts_decimation.level in the following 

111 for i_dec_level, fc_decimation in enumerate(fc_decimations): 

112 ts_decimation = fc_decimation.time_series_decimation 

113 

114 # Temporary check that i_dec_level and ts_decimation.level are the same 

115 try: 

116 assert i_dec_level == ts_decimation.level 

117 except: 

118 msg = "decimation level has unexpected value" 

119 raise ValueError(msg) 

120 

121 if ts_decimation.level != 0: # Apply decimation 

122 target_sample_rate = run_xrds.sample_rate / ts_decimation.factor 

123 run_xrds.sps_filters.decimate(target_sample_rate=target_sample_rate) 

124 

125 _add_spectrogram_to_mth5( 

126 fc_decimation=fc_decimation, 

127 run_obj=run_obj, 

128 run_xrds=run_xrds, 

129 fc_group=fc_group, 

130 ) 

131 

132 return 

133 

134 

135def _fc_decimations_from_sample_rate( 

136 sample_rate: float, 

137 fc_decimations: Optional[Union[str, list]] = None, 

138) -> Union[str, list]: 

139 """ 

140 Helper function to get some fc_decimations. 

141 Really only seems to be used by add_fcs_to_mth5. 

142 

143 Development Notes: 

144 This function is probably overslicing the add_fcs_to_mth5 function 

145 

146 :return fc_decimations: This is an iterable of 

147 """ 

148 # Get the FC decimation schemes if not provided -- note that this depend only on sample rate 

149 if not fc_decimations: 

150 msg = "FC Decimations not supplied, creating defaults on the fly" 

151 logger.info(f"{msg}") 

152 fc_decimations = fc_decimations_creator( 

153 initial_sample_rate=sample_rate, time_period=None 

154 ) 

155 elif isinstance(fc_decimations, str): 

156 if fc_decimations == "degenerate": 

157 fc_decimations = get_degenerate_fc_decimation(sample_rate) 

158 

159 return fc_decimations 

160 

161 

162def _add_spectrogram_to_mth5( 

163 fc_decimation: FCDecimation, 

164 run_obj: mth5.groups.RunGroup, 

165 run_xrds: xr.Dataset, 

166 fc_group: mth5.groups.FCGroup, 

167) -> None: 

168 """ 

169 

170 This function has been factored out of add_fcs_to_mth5. 

171 This is the most atomic level of adding FCs and may be useful as standalone method. 

172 

173 Parameters 

174 ---------- 

175 fc_decimation : FCDecimation 

176 Metadata about how the decimation level is to be processed 

177 run_xrds : xarray.core.dataset.Dataset 

178 Time series to be converted to a spectrogram and stored in MTH5. 

179 

180 Returns 

181 ------- 

182 run_xrds : xarray.core.dataset.Dataset 

183 pre-whitened time series 

184 

185 """ 

186 

187 # check if this decimation level yields a valid spectrogram 

188 if not fc_decimation.is_valid_for_time_series_length(run_xrds.time.shape[0]): 

189 logger.info( 

190 f"Decimation Level {fc_decimation.time_series_decimation.level} invalid, TS of {run_xrds.time.shape[0]} samples too short" 

191 ) 

192 return 

193 

194 spectrogram = run_ts_to_stft_scipy(fc_decimation, run_xrds) 

195 stft_obj = calibrate_stft_obj(spectrogram.dataset, run_obj) 

196 

197 # Pack FCs into h5 and update metadata 

198 fc_decimation_group: FCDecimationGroup = fc_group.add_decimation_level( 

199 f"{fc_decimation.time_series_decimation.level}", 

200 decimation_level_metadata=fc_decimation, 

201 ) 

202 fc_decimation_group.from_xarray( 

203 stft_obj, fc_decimation_group.metadata.decimation.sample_rate 

204 ) 

205 fc_decimation_group.update_metadata() 

206 fc_group.update_metadata() 

207 

208 

209@path_or_mth5_object 

210def read_back_fcs( 

211 m: Union[MTH5, pathlib.Path, str], 

212 mode: str = "r", 

213 groupby_columns: List[str] = GROUPBY_COLUMNS, 

214) -> None: 

215 """ 

216 Loops over stations in the channel summary of input (m) grouping by common sample_rate. 

217 Then loop over the runs in the corresponding FC Group. Finally, within an fc_group, 

218 loop decimation levels and read data to xarray. Log info about the shape of the xarray. 

219 

220 This is a helper function for tests. It was used as a sanity check while debugging the FC files, and 

221 also is a good example for how to access the data at each level for each channel. 

222 

223 Development Notes: 

224 The Time axis of the FC array changes from decimation_level to decimation_level. 

225 The frequency axis will shape will depend on the window length that was used to perform STFT. 

226 This is currently storing all (positive frequency) fcs by default, but future versions can 

227 also have selected bands within an FC container. 

228 

229 Parameters 

230 ---------- 

231 m: Union[MTH5, pathlib.Path, str] 

232 Either a path to an mth5, or an MTH5 object that the FCs will be read back from. 

233 mode: str 

234 The mode to open the MTH5 file in. Defualts to (r)ead only. 

235 

236 

237 """ 

238 channel_summary_df = m.channel_summary.to_dataframe() 

239 logger.debug(channel_summary_df) 

240 grouper = channel_summary_df.groupby(groupby_columns) 

241 for (survey, station, sample_rate), group in grouper: 

242 logger.info(f"survey: {survey}, station: {station}, sample_rate {sample_rate}") 

243 station_obj = m.get_station(station, survey) 

244 fc_groups = station_obj.fourier_coefficients_group.groups_list 

245 logger.info(f"FC Groups: {fc_groups}") 

246 for run_id in fc_groups: 

247 fc_group = station_obj.fourier_coefficients_group.get_fc_group(run_id) 

248 dec_level_ids = fc_group.groups_list 

249 for dec_level_id in dec_level_ids: 

250 dec_level = fc_group.get_decimation_level(dec_level_id) 

251 xrds = dec_level.to_xarray(["hx", "hy"]) 

252 msg = f"dec_level {dec_level_id}" 

253 msg = f"{msg} \n Time axis shape {xrds.time.data.shape}" 

254 msg = f"{msg} \n Freq axis shape {xrds.frequency.data.shape}" 

255 logger.debug(msg) 

256 

257 return 

258 

259 

260def calibrate_stft_obj( 

261 stft_obj: xr.Dataset, 

262 run_obj: mth5.groups.RunGroup, 

263 units: Literal["MT", "SI"] = "MT", 

264 channel_scale_factors: Optional[dict] = None, 

265) -> xr.Dataset: 

266 """ 

267 Calibrates frequency domain data into MT units. 

268 

269 Development Notes: 

270 The calibration often raises a runtime warning due to DC term in calibration response = 0. 

271 TODO: It would be nice to suppress this, maybe by only calibrating the non-dc terms and directly assigning np.nan to the dc component when DC-response is zero. 

272 

273 Parameters 

274 ---------- 

275 stft_obj : xarray.core.dataset.Dataset 

276 Time series of Fourier coefficients to be calibrated 

277 run_obj : mth5.groups.master_station_run_channel.RunGroup 

278 Provides information about filters for calibration 

279 units : string 

280 usually "MT", contemplating supporting "SI" 

281 scale_factors : dict or None 

282 keyed by channel, supports a single scalar to apply to that channels data 

283 Useful for debugging. Should not be used in production and should throw a 

284 warning if it is not None 

285 

286 Returns 

287 ------- 

288 stft_obj : xarray.core.dataset.Dataset 

289 Time series of calibrated Fourier coefficients 

290 """ 

291 for channel_id in stft_obj.keys(): 

292 channel = run_obj.get_channel(channel_id) 

293 channel_response = channel.channel_response 

294 if not channel_response.filters_list: 

295 msg = f"Channel {channel_id} with empty filters list detected" 

296 logger.warning(msg) 

297 if channel_id == "hy": 

298 msg = "Channel hy has no filters, try using filters from hx" 

299 logger.warning(msg) 

300 channel_response = run_obj.get_channel("hx").channel_response 

301 

302 indices_to_flip = channel_response.get_indices_of_filters_to_remove( 

303 include_decimation=False, include_delay=False 

304 ) 

305 indices_to_flip = [ 

306 i for i in indices_to_flip if channel.metadata.filters[i].applied 

307 ] 

308 filters_to_remove = [channel_response.filters_list[i] for i in indices_to_flip] 

309 if not filters_to_remove: 

310 logger.warning("No filters to remove") 

311 

312 calibration_response = channel_response.complex_response( 

313 stft_obj.frequency.data, filters_list=filters_to_remove 

314 ) 

315 

316 if channel_scale_factors: 

317 try: 

318 channel_scale_factor = channel_scale_factors[channel_id] 

319 except KeyError: 

320 channel_scale_factor = 1.0 

321 calibration_response /= channel_scale_factor 

322 

323 if units == "SI": 

324 logger.warning("Warning: SI Units are not robustly supported issue #36") 

325 

326 # TODO: FIXME Sometimes raises a runtime warning due to DC term in calibration response = 0 

327 stft_obj[channel_id].data /= calibration_response 

328 return stft_obj