Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mth5 \ mth5 \ groups \ fc_dataset.py: 57%
109 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-27 20:09 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-27 20:09 -0800
1# -*- coding: utf-8 -*-
2"""
3Created on Thu Mar 10 09:02:16 2022
5@author: jpeacock
6"""
8# =============================================================================
9# Imports
10# =============================================================================
11from __future__ import annotations
13import weakref
14from typing import Any
16import h5py
17import numpy as np
18import xarray as xr
19from loguru import logger
20from mt_metadata.processing.fourier_coefficients import FCChannel
22from mth5.helpers import add_attributes_to_metadata_class_pydantic, to_numpy_type
23from mth5.timeseries.ts_helpers import make_dt_coordinates
24from mth5.utils.exceptions import MTH5Error
27# =============================================================================
30class FCChannelDataset:
31 """
32 Container for Fourier coefficients (FC) from windowed FFT analysis.
34 Holds multi-dimensional Fourier coefficient data representing time-frequency
35 analysis results. Data is uniformly sampled in both frequency (via harmonic
36 index) and time (via uniform FFT window step size).
38 Parameters
39 ----------
40 dataset : h5py.Dataset
41 HDF5 dataset containing the Fourier coefficient data.
42 dataset_metadata : FCChannel | None, optional
43 Metadata object containing FC channel properties like start time,
44 end time, sample rates, units, and frequency method. If provided,
45 metadata will be written to HDF5 attributes. Defaults to None.
46 **kwargs : Any
47 Additional keyword arguments (reserved for future use).
49 Attributes
50 ----------
51 hdf5_dataset : h5py.Dataset
52 Weak reference to the HDF5 dataset.
53 metadata : FCChannel
54 Metadata container for the Fourier coefficients.
55 logger : loguru.logger
56 Logger instance for reporting messages.
58 Raises
59 ------
60 MTH5Error
61 If dataset_metadata is provided but is not of type FCChannel.
62 TypeError
63 If input data cannot be converted to numpy array or has
64 incompatible dtype/shape.
66 Notes
67 -----
68 The data array has shape (n_windows, n_frequencies) where:
69 - n_windows: Number of time windows in the FFT moving window analysis
70 - n_frequencies: Number of frequency bins determined by window size
72 Data is typically complex-valued representing Fourier coefficients.
73 Time windows are uniformly spaced with interval 1/sample_rate_window_step.
74 Frequencies are uniformly spaced from frequency_min to frequency_max.
76 Metadata includes:
77 - Time period (start and end)
78 - Acquisition and decimated sample rates
79 - Window sample rate (delta_t within window)
80 - Units
81 - Frequency method (integer harmonic index calculation)
82 - Component name (channel designation)
84 Examples
85 --------
86 Create an FC dataset from HDF5 group:
88 >>> import h5py
89 >>> import numpy as np
90 >>> from mt_metadata.processing.fourier_coefficients import FCChannel
91 >>> with h5py.File('fc.h5', 'w') as f:
92 ... # Create 2D array: 50 time windows, 256 frequencies
93 ... data = np.random.rand(50, 256) + 1j * np.random.rand(50, 256)
94 ... dset = f.create_dataset('Ex', data=data, dtype=np.complex128)
95 ... # Create FCChannelDataset
96 ... fc = FCChannelDataset(dset, write_metadata=True)
98 Convert to xarray and access time-frequency data:
100 >>> xr_data = fc.to_xarray()
101 >>> print(xr_data.dims) # ('time', 'frequency')
102 >>> # Access data at specific time and frequency
103 >>> subset = xr_data.sel(time='2023-01-01T12:00:00', method='nearest')
105 Inspect properties:
107 >>> print(f"Windows: {fc.n_windows}, Frequencies: {fc.n_frequencies}")
108 >>> print(f"Frequency range: {fc.frequency.min():.2f}-{fc.frequency.max():.2f} Hz")
110 """
112 def __init__(
113 self,
114 dataset: h5py.Dataset,
115 dataset_metadata: FCChannel | None = None,
116 **kwargs: Any,
117 ) -> None:
118 """
119 Initialize an FCChannelDataset.
121 Parameters
122 ----------
123 dataset : h5py.Dataset
124 HDF5 dataset for storing Fourier coefficient data.
125 dataset_metadata : FCChannel | None, optional
126 Metadata object. If provided, updates internal metadata and
127 writes to HDF5 (unless file is read-only). Defaults to None.
128 **kwargs : Any
129 Additional keyword arguments (reserved for future use).
131 Raises
132 ------
133 MTH5Error
134 If dataset_metadata type doesn't match FCChannel.
136 Notes
137 -----
138 Metadata is automatically read from HDF5 attributes if 'mth5_type'
139 attribute exists. Write operations are wrapped in try-except to
140 gracefully handle read-only files.
142 Examples
143 --------
144 Create and initialize an FC dataset:
146 >>> import h5py
147 >>> import numpy as np
148 >>> with h5py.File('fc.h5', 'w') as f:
149 ... data = np.random.rand(20, 128) + 1j * np.random.rand(20, 128)
150 ... dset = f.create_dataset('Ex', data=data)
151 ... fc = FCChannelDataset(dset) # Auto-initialize metadata
153 """
154 if dataset is not None and isinstance(dataset, (h5py.Dataset)):
155 self.hdf5_dataset = weakref.ref(dataset)()
156 self.logger = logger
158 # set metadata to the appropriate class. Standards is not a
159 # Base object so should be skipped. If the class name is not
160 # defined yet set to Base class.
161 metadata_obj = FCChannel
162 self.metadata = add_attributes_to_metadata_class_pydantic(metadata_obj)
163 self.metadata.hdf5_reference = self.hdf5_dataset.ref
164 self.metadata.mth5_type = self._class_name
166 # if the input data set already has filled attributes, namely if the
167 # channel data already exists then read them in with our writing back
168 if "mth5_type" in list(self.hdf5_dataset.attrs.keys()):
169 self.metadata.from_dict(
170 {self.hdf5_dataset.attrs["mth5_type"]: dict(self.hdf5_dataset.attrs)}
171 )
172 # if metadata is input, make sure that its the same class type amd write
173 # to the hdf5 dataset
174 if dataset_metadata is not None:
175 if not isinstance(self.metadata, type(dataset_metadata)):
176 msg = (
177 f"metadata must be type metadata.{self._class_name} not "
178 "{type(dataset_metadata)}"
179 )
180 self.logger.error(msg)
181 raise MTH5Error(msg)
182 # load from dict because of the extra attributes for MTH5
183 self.metadata.from_dict(dataset_metadata.to_dict())
184 self.metadata.hdf5_reference = self.hdf5_dataset.ref
185 self.metadata.mth5_type = self._class_name
187 # write out metadata to make sure that its in the file.
188 try:
189 self.write_metadata()
190 except (RuntimeError, KeyError, OSError):
191 # file is read only
192 pass
194 # if the attrs don't have the proper metadata keys yet write them
195 if not "mth5_type" in list(self.hdf5_dataset.attrs.keys()):
196 self.write_metadata()
198 def __str__(self) -> str:
199 """
200 Return string representation of the FC dataset as JSON.
202 Returns
203 -------
204 str
205 JSON representation of the FC metadata.
207 Examples
208 --------
209 >>> fc_str = str(fc)
210 >>> print(fc_str[:50]) # Print first 50 characters
211 {"fcchannel": {"component": "Ex", ...
213 """
214 return self.metadata.to_json()
216 def __repr__(self) -> str:
217 """
218 Return official string representation of the FC dataset.
220 Returns
221 -------
222 str
223 JSON representation of the FC metadata.
225 Examples
226 --------
227 >>> repr(fc) == str(fc)
228 True
230 """
231 return self.__str__()
233 @property
234 def _class_name(self) -> str:
235 """
236 Extract the class name without 'Dataset' suffix.
238 Returns
239 -------
240 str
241 Class name with 'Dataset' suffix removed.
243 Examples
244 --------
245 >>> fc._class_name
246 'FCChannel'
248 """
249 return self.__class__.__name__.split("Dataset")[0]
251 def read_metadata(self) -> None:
252 """
253 Read metadata from HDF5 attributes into metadata container.
255 Reads all attributes from the HDF5 dataset and loads them into
256 the internal metadata object for validation and access.
258 Returns
259 -------
260 None
262 Notes
263 -----
264 This is automatically called during initialization if 'mth5_type'
265 attribute exists in the HDF5 dataset.
267 Examples
268 --------
269 Reload metadata from HDF5 after external modification:
271 >>> # Metadata was modified in HDF5
272 >>> fc.read_metadata() # Reload changes
273 >>> print(fc.metadata.component) # Access updated component
275 """
276 self.metadata.from_dict({self._class_name: dict(self.hdf5_dataset.attrs)})
278 def write_metadata(self) -> None:
279 """
280 Write metadata from container to HDF5 dataset attributes.
282 Converts the pydantic metadata model to a dictionary and writes
283 each field as an HDF5 attribute. Values are converted to appropriate
284 numpy types for compatibility. Always ensures 'mth5_type' attribute
285 is set to 'FCChannel'.
287 Returns
288 -------
289 None
291 Notes
292 -----
293 All existing attributes with the same names will be overwritten.
294 This is called automatically during initialization and after
295 metadata updates. Read-only files will silently skip writes.
297 Examples
298 --------
299 Save updated metadata to HDF5:
301 >>> fc.metadata.component = "Ey"
302 >>> fc.write_metadata() # Persist to file
303 >>> # Verify write
304 >>> print(fc.hdf5_dataset.attrs['component'])
305 b'Ey'
307 """
308 meta_dict = self.metadata.to_dict()[self.metadata._class_name.lower()]
309 for key, value in meta_dict.items():
310 value = to_numpy_type(value)
311 self.hdf5_dataset.attrs.create(key, value)
313 # Add the mth5_type attribute that is expected by channel_summary
314 if "mth5_type" not in self.hdf5_dataset.attrs:
315 self.hdf5_dataset.attrs.create("mth5_type", "FCChannel")
317 @property
318 def n_windows(self) -> int:
319 """
320 Number of time windows in the FFT analysis.
322 Returns
323 -------
324 int
325 Number of time windows (first dimension of data array).
327 Notes
328 -----
329 This corresponds to the number of rows in the 2D spectrogram data.
330 Each window represents a uniform time interval determined by the
331 window step size (1/sample_rate_window_step).
333 Examples
334 --------
335 >>> print(f"Time windows: {fc.n_windows}")
336 Time windows: 50
338 """
339 return self.hdf5_dataset.shape[0]
341 @property
342 def time(self) -> np.ndarray:
343 """
344 Time array including the start of each time window.
346 Generates uniformly spaced time coordinates based on the start time,
347 window step rate, and number of windows. Uses metadata time period
348 to determine bounds.
350 Returns
351 -------
352 np.ndarray
353 Array of datetime64 values for each window start time.
355 Notes
356 -----
357 Time coordinates are generated using make_dt_coordinates, which
358 ensures consistency between specified start/end times and the
359 number of windows.
361 Examples
362 --------
363 Access time array for time-based indexing:
365 >>> time_array = fc.time
366 >>> print(time_array.shape) # (n_windows,)
367 >>> print(time_array[0]) # First window time
368 2023-01-01T00:00:00.000000
370 """
371 return make_dt_coordinates(
372 self.metadata.time_period.start,
373 1.0 / self.metadata.sample_rate_window_step,
374 self.n_windows,
375 end_time=self.metadata.time_period.end,
376 )
378 @property
379 def n_frequencies(self) -> int:
380 """
381 Number of frequency bins in the Fourier analysis.
383 Returns
384 -------
385 int
386 Number of frequency bins (second dimension of data array).
388 Notes
389 -----
390 This corresponds to the number of columns in the 2D spectrogram data.
391 Determined by the FFT window size and relates to the frequency
392 resolution of the analysis.
394 Examples
395 --------
396 >>> print(f"Frequency bins: {fc.n_frequencies}")
397 Frequency bins: 256
399 """
400 return self.hdf5_dataset.shape[1]
402 @property
403 def frequency(self) -> np.ndarray:
404 """
405 Frequency array from metadata frequency bounds.
407 Generates uniformly spaced frequency coordinates based on the
408 metadata frequency range and number of frequency bins.
410 Returns
411 -------
412 np.ndarray
413 Array of frequency values, linearly spaced from frequency_min
414 to frequency_max.
416 Notes
417 -----
418 Frequencies represent harmonic indices or actual frequency values
419 depending on the frequency method specified in metadata.
420 Spacing is determined by n_frequencies bins over the range.
422 Examples
423 --------
424 Access frequency array for frequency-based indexing:
426 >>> freq_array = fc.frequency
427 >>> print(freq_array.shape) # (n_frequencies,)
428 >>> print(f"Frequency range: {freq_array.min():.2f} to {freq_array.max():.2f} Hz")
429 Frequency range: 0.00 to 64.00 Hz
431 """
432 return np.linspace(
433 self.metadata.frequency_min,
434 self.metadata.frequency_max,
435 self.n_frequencies,
436 )
438 def replace_dataset(self, new_data_array: np.ndarray) -> None:
439 """
440 Replace entire dataset with new data.
442 Resizes the HDF5 dataset if necessary and replaces all data.
443 Converts input to numpy array if needed.
445 Parameters
446 ----------
447 new_data_array : np.ndarray
448 New FC data to store. Should have shape (n_windows, n_frequencies)
449 and typically complex-valued.
451 Returns
452 -------
453 None
455 Raises
456 ------
457 TypeError
458 If input cannot be converted to numpy array.
460 Notes
461 -----
462 If new data has different shape, HDF5 dataset will be resized.
463 This is generally safe but may fragment the HDF5 file.
465 Examples
466 --------
467 Replace FC data with new analysis results:
469 >>> import numpy as np
470 >>> new_fc = np.random.rand(30, 256) + 1j * np.random.rand(30, 256)
471 >>> fc.replace_dataset(new_fc)
472 >>> print(fc.to_numpy().shape)
473 (30, 256)
475 Replace with data from list (auto-converted to array):
477 >>> data_list = [[[1+1j, 2+2j]], [[3+3j, 4+4j]]] * 15
478 >>> fc.replace_dataset(data_list)
479 >>> fc.to_numpy().shape
480 (30, 2)
482 """
483 if not isinstance(new_data_array, np.ndarray):
484 try:
485 new_data_array = np.array(new_data_array)
486 except (ValueError, TypeError) as error:
487 msg = f"{error} Input must be a numpy array not {type(new_data_array)}"
488 self.logger.exception(msg)
489 raise TypeError(msg)
490 if new_data_array.shape != self.hdf5_dataset.shape:
491 self.hdf5_dataset.resize(new_data_array.shape)
492 self.hdf5_dataset[...] = new_data_array
494 def to_xarray(self) -> xr.DataArray:
495 """
496 Convert FC data to xarray DataArray.
498 Creates an xarray DataArray with proper coordinates for time and
499 frequency. Includes metadata as attributes.
501 Returns
502 -------
503 xr.DataArray
504 DataArray with dimensions (time, frequency) and coordinates
505 from metadata and computed properties.
507 Notes
508 -----
509 Metadata changes in xarray are not validated and will not be
510 synchronized back to HDF5 without explicit call to from_xarray().
511 Data is loaded entirely into memory.
513 Examples
514 --------
515 Convert to xarray with automatic coordinates:
517 >>> xr_data = fc.to_xarray()
518 >>> print(xr_data.dims)
519 ('time', 'frequency')
520 >>> print(xr_data.shape)
521 (50, 256)
523 Select data by time and frequency range:
525 >>> subset = xr_data.sel(
526 ... time=slice('2023-01-01T00:00:00', '2023-01-01T12:00:00'),
527 ... frequency=slice(0, 10)
528 ... )
529 >>> print(subset.shape) # Subset shape
531 """
532 return xr.DataArray(
533 data=self.hdf5_dataset[()],
534 dims=["time", "frequency"],
535 name=self.metadata.component,
536 coords=[
537 ("time", self.time),
538 ("frequency", self.frequency),
539 ],
540 attrs=self.metadata.to_dict(single=True),
541 )
543 def to_numpy(self) -> np.ndarray:
544 """
545 Convert FC data to numpy array.
547 Returns the HDF5 dataset as a numpy array. Data is loaded
548 entirely into memory.
550 Returns
551 -------
552 np.ndarray
553 2D complex array with shape (n_windows, n_frequencies).
555 Notes
556 -----
557 For large spectrograms, this loads all data into RAM. Consider using
558 HDF5 slicing for memory-efficient access to subsets.
560 Examples
561 --------
562 Get full FC data as numpy array:
564 >>> data = fc.to_numpy()
565 >>> print(data.shape)
566 (50, 256)
567 >>> print(data.dtype)
568 complex128
570 Access specific time window and frequency:
572 >>> data = fc.to_numpy()
573 >>> # Get first 10 windows, frequency bin 100
574 >>> subset = data[:10, 100]
575 >>> print(subset.shape)
576 (10,)
578 """
579 return self.hdf5_dataset[()]
581 def from_numpy(self, new_estimate: np.ndarray) -> None:
582 """
583 Load FC data from numpy array.
585 Validates dtype and shape compatibility, resizes dataset if needed,
586 and stores the data.
588 Parameters
589 ----------
590 new_estimate : np.ndarray
591 FC data to load. Should have shape (n_windows, n_frequencies).
592 Typically complex-valued array.
594 Returns
595 -------
596 None
598 Raises
599 ------
600 TypeError
601 If dtype doesn't match existing dataset or input cannot
602 be converted to numpy array.
604 Notes
605 -----
606 'data' is a built-in Python function and cannot be used as parameter name.
607 The dataset will be resized if shape doesn't match.
608 Dtype compatibility is strictly enforced.
610 Examples
611 --------
612 Load FC data from numpy array:
614 >>> import numpy as np
615 >>> new_data = np.random.rand(25, 128) + 1j * np.random.rand(25, 128)
616 >>> fc.from_numpy(new_data)
617 >>> print(fc.to_numpy().shape)
618 (25, 128)
620 Load with magnitude and phase separation:
622 >>> magnitude = np.random.rand(20, 256)
623 >>> phase = np.random.rand(20, 256) * 2 * np.pi
624 >>> fc_data = magnitude * np.exp(1j * phase)
625 >>> fc.from_numpy(fc_data)
627 """
628 if not isinstance(new_estimate, np.ndarray):
629 try:
630 new_estimate = np.array(new_estimate)
631 except (ValueError, TypeError) as error:
632 msg = f"{error} Input must be a numpy array not {type(new_estimate)}"
633 self.logger.exception(msg)
634 raise TypeError(msg)
635 if new_estimate.dtype != self.hdf5_dataset.dtype:
636 msg = (
637 f"Input array must be type {new_estimate.dtype} not "
638 "{self.hdf5_dataset.dtype}"
639 )
640 self.logger.error(msg)
641 raise TypeError(msg)
642 if new_estimate.shape != self.hdf5_dataset.shape:
643 self.hdf5_dataset.resize(new_estimate.shape)
644 self.hdf5_dataset[...] = new_estimate
646 def from_xarray(
647 self,
648 data: xr.DataArray,
649 sample_rate_decimation_level: int | float,
650 ) -> None:
651 """
652 Load FC data from xarray DataArray.
654 Updates metadata from xarray coordinates and attributes, then
655 stores the data. Computes frequency and time parameters from
656 the provided xarray object.
658 Parameters
659 ----------
660 data : xr.DataArray
661 DataArray containing FC data. Expected dimensions:
662 (time, frequency).
663 sample_rate_decimation_level : int | float
664 Decimation level applied to original sample rate.
665 Used to track processing history.
667 Returns
668 -------
669 None
671 Notes
672 -----
673 This will update time_period (start/end), frequency bounds,
674 window step rate, decimation level, component name, and units
675 from the xarray object. All changes are persisted to HDF5.
677 Examples
678 --------
679 Load FC data from modified xarray:
681 >>> xr_data = fc.to_xarray()
682 >>> # Modify data (e.g., apply filter)
683 >>> modified = xr_data * np.hamming(256) # Apply frequency window
684 >>> fc.from_xarray(modified, sample_rate_decimation_level=4)
685 >>> print(fc.metadata.sample_rate_decimation_level)
686 4
688 Load with updated metadata from another analysis:
690 >>> import xarray as xr
691 >>> import pandas as pd
692 >>> time_coords = pd.date_range('2023-01-01', periods=30, freq='1H')
693 >>> freq_coords = np.arange(0, 128)
694 >>> new_fc = xr.DataArray(
695 ... data=np.random.rand(30, 128) + 1j * np.random.rand(30, 128),
696 ... coords={'time': time_coords, 'frequency': freq_coords},
697 ... dims=['time', 'frequency'],
698 ... name='Ey',
699 ... attrs={'units': 'mV/km'}
700 ... )
701 >>> fc.from_xarray(new_fc, sample_rate_decimation_level=1)
702 >>> print(fc.metadata.component)
703 Ey
705 """
706 self.metadata.time_period.start = data.time[0].values
707 self.metadata.time_period.end = data.time[-1].values
709 self.metadata.sample_rate_decimation_level = sample_rate_decimation_level
710 self.metadata.frequency_min = data.coords["frequency"].data.min()
711 self.metadata.frequency_max = data.coords["frequency"].data.max()
712 step_size = data.coords["time"].data[1] - data.coords["time"].data[0]
713 self.metadata.sample_rate_window_step = step_size / np.timedelta64(1, "s")
714 self.metadata.component = data.name
715 try:
716 self.metadata.units = data.units
717 except AttributeError:
718 self.logger.debug("Could not find 'units' in xarray")
719 self.write_metadata()
721 self.from_numpy(data.to_numpy())