Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mth5 \ mth5 \ groups \ fc_dataset.py: 57%

109 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-27 20:09 -0800

1# -*- coding: utf-8 -*- 

2""" 

3Created on Thu Mar 10 09:02:16 2022 

4 

5@author: jpeacock 

6""" 

7 

8# ============================================================================= 

9# Imports 

10# ============================================================================= 

11from __future__ import annotations 

12 

13import weakref 

14from typing import Any 

15 

16import h5py 

17import numpy as np 

18import xarray as xr 

19from loguru import logger 

20from mt_metadata.processing.fourier_coefficients import FCChannel 

21 

22from mth5.helpers import add_attributes_to_metadata_class_pydantic, to_numpy_type 

23from mth5.timeseries.ts_helpers import make_dt_coordinates 

24from mth5.utils.exceptions import MTH5Error 

25 

26 

27# ============================================================================= 

28 

29 

30class FCChannelDataset: 

31 """ 

32 Container for Fourier coefficients (FC) from windowed FFT analysis. 

33 

34 Holds multi-dimensional Fourier coefficient data representing time-frequency 

35 analysis results. Data is uniformly sampled in both frequency (via harmonic 

36 index) and time (via uniform FFT window step size). 

37 

38 Parameters 

39 ---------- 

40 dataset : h5py.Dataset 

41 HDF5 dataset containing the Fourier coefficient data. 

42 dataset_metadata : FCChannel | None, optional 

43 Metadata object containing FC channel properties like start time, 

44 end time, sample rates, units, and frequency method. If provided, 

45 metadata will be written to HDF5 attributes. Defaults to None. 

46 **kwargs : Any 

47 Additional keyword arguments (reserved for future use). 

48 

49 Attributes 

50 ---------- 

51 hdf5_dataset : h5py.Dataset 

52 Weak reference to the HDF5 dataset. 

53 metadata : FCChannel 

54 Metadata container for the Fourier coefficients. 

55 logger : loguru.logger 

56 Logger instance for reporting messages. 

57 

58 Raises 

59 ------ 

60 MTH5Error 

61 If dataset_metadata is provided but is not of type FCChannel. 

62 TypeError 

63 If input data cannot be converted to numpy array or has 

64 incompatible dtype/shape. 

65 

66 Notes 

67 ----- 

68 The data array has shape (n_windows, n_frequencies) where: 

69 - n_windows: Number of time windows in the FFT moving window analysis 

70 - n_frequencies: Number of frequency bins determined by window size 

71 

72 Data is typically complex-valued representing Fourier coefficients. 

73 Time windows are uniformly spaced with interval 1/sample_rate_window_step. 

74 Frequencies are uniformly spaced from frequency_min to frequency_max. 

75 

76 Metadata includes: 

77 - Time period (start and end) 

78 - Acquisition and decimated sample rates 

79 - Window sample rate (delta_t within window) 

80 - Units 

81 - Frequency method (integer harmonic index calculation) 

82 - Component name (channel designation) 

83 

84 Examples 

85 -------- 

86 Create an FC dataset from HDF5 group: 

87 

88 >>> import h5py 

89 >>> import numpy as np 

90 >>> from mt_metadata.processing.fourier_coefficients import FCChannel 

91 >>> with h5py.File('fc.h5', 'w') as f: 

92 ... # Create 2D array: 50 time windows, 256 frequencies 

93 ... data = np.random.rand(50, 256) + 1j * np.random.rand(50, 256) 

94 ... dset = f.create_dataset('Ex', data=data, dtype=np.complex128) 

95 ... # Create FCChannelDataset 

96 ... fc = FCChannelDataset(dset, write_metadata=True) 

97 

98 Convert to xarray and access time-frequency data: 

99 

100 >>> xr_data = fc.to_xarray() 

101 >>> print(xr_data.dims) # ('time', 'frequency') 

102 >>> # Access data at specific time and frequency 

103 >>> subset = xr_data.sel(time='2023-01-01T12:00:00', method='nearest') 

104 

105 Inspect properties: 

106 

107 >>> print(f"Windows: {fc.n_windows}, Frequencies: {fc.n_frequencies}") 

108 >>> print(f"Frequency range: {fc.frequency.min():.2f}-{fc.frequency.max():.2f} Hz") 

109 

110 """ 

111 

112 def __init__( 

113 self, 

114 dataset: h5py.Dataset, 

115 dataset_metadata: FCChannel | None = None, 

116 **kwargs: Any, 

117 ) -> None: 

118 """ 

119 Initialize an FCChannelDataset. 

120 

121 Parameters 

122 ---------- 

123 dataset : h5py.Dataset 

124 HDF5 dataset for storing Fourier coefficient data. 

125 dataset_metadata : FCChannel | None, optional 

126 Metadata object. If provided, updates internal metadata and 

127 writes to HDF5 (unless file is read-only). Defaults to None. 

128 **kwargs : Any 

129 Additional keyword arguments (reserved for future use). 

130 

131 Raises 

132 ------ 

133 MTH5Error 

134 If dataset_metadata type doesn't match FCChannel. 

135 

136 Notes 

137 ----- 

138 Metadata is automatically read from HDF5 attributes if 'mth5_type' 

139 attribute exists. Write operations are wrapped in try-except to 

140 gracefully handle read-only files. 

141 

142 Examples 

143 -------- 

144 Create and initialize an FC dataset: 

145 

146 >>> import h5py 

147 >>> import numpy as np 

148 >>> with h5py.File('fc.h5', 'w') as f: 

149 ... data = np.random.rand(20, 128) + 1j * np.random.rand(20, 128) 

150 ... dset = f.create_dataset('Ex', data=data) 

151 ... fc = FCChannelDataset(dset) # Auto-initialize metadata 

152 

153 """ 

154 if dataset is not None and isinstance(dataset, (h5py.Dataset)): 

155 self.hdf5_dataset = weakref.ref(dataset)() 

156 self.logger = logger 

157 

158 # set metadata to the appropriate class. Standards is not a 

159 # Base object so should be skipped. If the class name is not 

160 # defined yet set to Base class. 

161 metadata_obj = FCChannel 

162 self.metadata = add_attributes_to_metadata_class_pydantic(metadata_obj) 

163 self.metadata.hdf5_reference = self.hdf5_dataset.ref 

164 self.metadata.mth5_type = self._class_name 

165 

166 # if the input data set already has filled attributes, namely if the 

167 # channel data already exists then read them in with our writing back 

168 if "mth5_type" in list(self.hdf5_dataset.attrs.keys()): 

169 self.metadata.from_dict( 

170 {self.hdf5_dataset.attrs["mth5_type"]: dict(self.hdf5_dataset.attrs)} 

171 ) 

172 # if metadata is input, make sure that its the same class type amd write 

173 # to the hdf5 dataset 

174 if dataset_metadata is not None: 

175 if not isinstance(self.metadata, type(dataset_metadata)): 

176 msg = ( 

177 f"metadata must be type metadata.{self._class_name} not " 

178 "{type(dataset_metadata)}" 

179 ) 

180 self.logger.error(msg) 

181 raise MTH5Error(msg) 

182 # load from dict because of the extra attributes for MTH5 

183 self.metadata.from_dict(dataset_metadata.to_dict()) 

184 self.metadata.hdf5_reference = self.hdf5_dataset.ref 

185 self.metadata.mth5_type = self._class_name 

186 

187 # write out metadata to make sure that its in the file. 

188 try: 

189 self.write_metadata() 

190 except (RuntimeError, KeyError, OSError): 

191 # file is read only 

192 pass 

193 

194 # if the attrs don't have the proper metadata keys yet write them 

195 if not "mth5_type" in list(self.hdf5_dataset.attrs.keys()): 

196 self.write_metadata() 

197 

198 def __str__(self) -> str: 

199 """ 

200 Return string representation of the FC dataset as JSON. 

201 

202 Returns 

203 ------- 

204 str 

205 JSON representation of the FC metadata. 

206 

207 Examples 

208 -------- 

209 >>> fc_str = str(fc) 

210 >>> print(fc_str[:50]) # Print first 50 characters 

211 {"fcchannel": {"component": "Ex", ... 

212 

213 """ 

214 return self.metadata.to_json() 

215 

216 def __repr__(self) -> str: 

217 """ 

218 Return official string representation of the FC dataset. 

219 

220 Returns 

221 ------- 

222 str 

223 JSON representation of the FC metadata. 

224 

225 Examples 

226 -------- 

227 >>> repr(fc) == str(fc) 

228 True 

229 

230 """ 

231 return self.__str__() 

232 

233 @property 

234 def _class_name(self) -> str: 

235 """ 

236 Extract the class name without 'Dataset' suffix. 

237 

238 Returns 

239 ------- 

240 str 

241 Class name with 'Dataset' suffix removed. 

242 

243 Examples 

244 -------- 

245 >>> fc._class_name 

246 'FCChannel' 

247 

248 """ 

249 return self.__class__.__name__.split("Dataset")[0] 

250 

251 def read_metadata(self) -> None: 

252 """ 

253 Read metadata from HDF5 attributes into metadata container. 

254 

255 Reads all attributes from the HDF5 dataset and loads them into 

256 the internal metadata object for validation and access. 

257 

258 Returns 

259 ------- 

260 None 

261 

262 Notes 

263 ----- 

264 This is automatically called during initialization if 'mth5_type' 

265 attribute exists in the HDF5 dataset. 

266 

267 Examples 

268 -------- 

269 Reload metadata from HDF5 after external modification: 

270 

271 >>> # Metadata was modified in HDF5 

272 >>> fc.read_metadata() # Reload changes 

273 >>> print(fc.metadata.component) # Access updated component 

274 

275 """ 

276 self.metadata.from_dict({self._class_name: dict(self.hdf5_dataset.attrs)}) 

277 

278 def write_metadata(self) -> None: 

279 """ 

280 Write metadata from container to HDF5 dataset attributes. 

281 

282 Converts the pydantic metadata model to a dictionary and writes 

283 each field as an HDF5 attribute. Values are converted to appropriate 

284 numpy types for compatibility. Always ensures 'mth5_type' attribute 

285 is set to 'FCChannel'. 

286 

287 Returns 

288 ------- 

289 None 

290 

291 Notes 

292 ----- 

293 All existing attributes with the same names will be overwritten. 

294 This is called automatically during initialization and after 

295 metadata updates. Read-only files will silently skip writes. 

296 

297 Examples 

298 -------- 

299 Save updated metadata to HDF5: 

300 

301 >>> fc.metadata.component = "Ey" 

302 >>> fc.write_metadata() # Persist to file 

303 >>> # Verify write 

304 >>> print(fc.hdf5_dataset.attrs['component']) 

305 b'Ey' 

306 

307 """ 

308 meta_dict = self.metadata.to_dict()[self.metadata._class_name.lower()] 

309 for key, value in meta_dict.items(): 

310 value = to_numpy_type(value) 

311 self.hdf5_dataset.attrs.create(key, value) 

312 

313 # Add the mth5_type attribute that is expected by channel_summary 

314 if "mth5_type" not in self.hdf5_dataset.attrs: 

315 self.hdf5_dataset.attrs.create("mth5_type", "FCChannel") 

316 

317 @property 

318 def n_windows(self) -> int: 

319 """ 

320 Number of time windows in the FFT analysis. 

321 

322 Returns 

323 ------- 

324 int 

325 Number of time windows (first dimension of data array). 

326 

327 Notes 

328 ----- 

329 This corresponds to the number of rows in the 2D spectrogram data. 

330 Each window represents a uniform time interval determined by the 

331 window step size (1/sample_rate_window_step). 

332 

333 Examples 

334 -------- 

335 >>> print(f"Time windows: {fc.n_windows}") 

336 Time windows: 50 

337 

338 """ 

339 return self.hdf5_dataset.shape[0] 

340 

341 @property 

342 def time(self) -> np.ndarray: 

343 """ 

344 Time array including the start of each time window. 

345 

346 Generates uniformly spaced time coordinates based on the start time, 

347 window step rate, and number of windows. Uses metadata time period 

348 to determine bounds. 

349 

350 Returns 

351 ------- 

352 np.ndarray 

353 Array of datetime64 values for each window start time. 

354 

355 Notes 

356 ----- 

357 Time coordinates are generated using make_dt_coordinates, which 

358 ensures consistency between specified start/end times and the 

359 number of windows. 

360 

361 Examples 

362 -------- 

363 Access time array for time-based indexing: 

364 

365 >>> time_array = fc.time 

366 >>> print(time_array.shape) # (n_windows,) 

367 >>> print(time_array[0]) # First window time 

368 2023-01-01T00:00:00.000000 

369 

370 """ 

371 return make_dt_coordinates( 

372 self.metadata.time_period.start, 

373 1.0 / self.metadata.sample_rate_window_step, 

374 self.n_windows, 

375 end_time=self.metadata.time_period.end, 

376 ) 

377 

378 @property 

379 def n_frequencies(self) -> int: 

380 """ 

381 Number of frequency bins in the Fourier analysis. 

382 

383 Returns 

384 ------- 

385 int 

386 Number of frequency bins (second dimension of data array). 

387 

388 Notes 

389 ----- 

390 This corresponds to the number of columns in the 2D spectrogram data. 

391 Determined by the FFT window size and relates to the frequency 

392 resolution of the analysis. 

393 

394 Examples 

395 -------- 

396 >>> print(f"Frequency bins: {fc.n_frequencies}") 

397 Frequency bins: 256 

398 

399 """ 

400 return self.hdf5_dataset.shape[1] 

401 

402 @property 

403 def frequency(self) -> np.ndarray: 

404 """ 

405 Frequency array from metadata frequency bounds. 

406 

407 Generates uniformly spaced frequency coordinates based on the 

408 metadata frequency range and number of frequency bins. 

409 

410 Returns 

411 ------- 

412 np.ndarray 

413 Array of frequency values, linearly spaced from frequency_min 

414 to frequency_max. 

415 

416 Notes 

417 ----- 

418 Frequencies represent harmonic indices or actual frequency values 

419 depending on the frequency method specified in metadata. 

420 Spacing is determined by n_frequencies bins over the range. 

421 

422 Examples 

423 -------- 

424 Access frequency array for frequency-based indexing: 

425 

426 >>> freq_array = fc.frequency 

427 >>> print(freq_array.shape) # (n_frequencies,) 

428 >>> print(f"Frequency range: {freq_array.min():.2f} to {freq_array.max():.2f} Hz") 

429 Frequency range: 0.00 to 64.00 Hz 

430 

431 """ 

432 return np.linspace( 

433 self.metadata.frequency_min, 

434 self.metadata.frequency_max, 

435 self.n_frequencies, 

436 ) 

437 

438 def replace_dataset(self, new_data_array: np.ndarray) -> None: 

439 """ 

440 Replace entire dataset with new data. 

441 

442 Resizes the HDF5 dataset if necessary and replaces all data. 

443 Converts input to numpy array if needed. 

444 

445 Parameters 

446 ---------- 

447 new_data_array : np.ndarray 

448 New FC data to store. Should have shape (n_windows, n_frequencies) 

449 and typically complex-valued. 

450 

451 Returns 

452 ------- 

453 None 

454 

455 Raises 

456 ------ 

457 TypeError 

458 If input cannot be converted to numpy array. 

459 

460 Notes 

461 ----- 

462 If new data has different shape, HDF5 dataset will be resized. 

463 This is generally safe but may fragment the HDF5 file. 

464 

465 Examples 

466 -------- 

467 Replace FC data with new analysis results: 

468 

469 >>> import numpy as np 

470 >>> new_fc = np.random.rand(30, 256) + 1j * np.random.rand(30, 256) 

471 >>> fc.replace_dataset(new_fc) 

472 >>> print(fc.to_numpy().shape) 

473 (30, 256) 

474 

475 Replace with data from list (auto-converted to array): 

476 

477 >>> data_list = [[[1+1j, 2+2j]], [[3+3j, 4+4j]]] * 15 

478 >>> fc.replace_dataset(data_list) 

479 >>> fc.to_numpy().shape 

480 (30, 2) 

481 

482 """ 

483 if not isinstance(new_data_array, np.ndarray): 

484 try: 

485 new_data_array = np.array(new_data_array) 

486 except (ValueError, TypeError) as error: 

487 msg = f"{error} Input must be a numpy array not {type(new_data_array)}" 

488 self.logger.exception(msg) 

489 raise TypeError(msg) 

490 if new_data_array.shape != self.hdf5_dataset.shape: 

491 self.hdf5_dataset.resize(new_data_array.shape) 

492 self.hdf5_dataset[...] = new_data_array 

493 

494 def to_xarray(self) -> xr.DataArray: 

495 """ 

496 Convert FC data to xarray DataArray. 

497 

498 Creates an xarray DataArray with proper coordinates for time and 

499 frequency. Includes metadata as attributes. 

500 

501 Returns 

502 ------- 

503 xr.DataArray 

504 DataArray with dimensions (time, frequency) and coordinates 

505 from metadata and computed properties. 

506 

507 Notes 

508 ----- 

509 Metadata changes in xarray are not validated and will not be 

510 synchronized back to HDF5 without explicit call to from_xarray(). 

511 Data is loaded entirely into memory. 

512 

513 Examples 

514 -------- 

515 Convert to xarray with automatic coordinates: 

516 

517 >>> xr_data = fc.to_xarray() 

518 >>> print(xr_data.dims) 

519 ('time', 'frequency') 

520 >>> print(xr_data.shape) 

521 (50, 256) 

522 

523 Select data by time and frequency range: 

524 

525 >>> subset = xr_data.sel( 

526 ... time=slice('2023-01-01T00:00:00', '2023-01-01T12:00:00'), 

527 ... frequency=slice(0, 10) 

528 ... ) 

529 >>> print(subset.shape) # Subset shape 

530 

531 """ 

532 return xr.DataArray( 

533 data=self.hdf5_dataset[()], 

534 dims=["time", "frequency"], 

535 name=self.metadata.component, 

536 coords=[ 

537 ("time", self.time), 

538 ("frequency", self.frequency), 

539 ], 

540 attrs=self.metadata.to_dict(single=True), 

541 ) 

542 

543 def to_numpy(self) -> np.ndarray: 

544 """ 

545 Convert FC data to numpy array. 

546 

547 Returns the HDF5 dataset as a numpy array. Data is loaded 

548 entirely into memory. 

549 

550 Returns 

551 ------- 

552 np.ndarray 

553 2D complex array with shape (n_windows, n_frequencies). 

554 

555 Notes 

556 ----- 

557 For large spectrograms, this loads all data into RAM. Consider using 

558 HDF5 slicing for memory-efficient access to subsets. 

559 

560 Examples 

561 -------- 

562 Get full FC data as numpy array: 

563 

564 >>> data = fc.to_numpy() 

565 >>> print(data.shape) 

566 (50, 256) 

567 >>> print(data.dtype) 

568 complex128 

569 

570 Access specific time window and frequency: 

571 

572 >>> data = fc.to_numpy() 

573 >>> # Get first 10 windows, frequency bin 100 

574 >>> subset = data[:10, 100] 

575 >>> print(subset.shape) 

576 (10,) 

577 

578 """ 

579 return self.hdf5_dataset[()] 

580 

581 def from_numpy(self, new_estimate: np.ndarray) -> None: 

582 """ 

583 Load FC data from numpy array. 

584 

585 Validates dtype and shape compatibility, resizes dataset if needed, 

586 and stores the data. 

587 

588 Parameters 

589 ---------- 

590 new_estimate : np.ndarray 

591 FC data to load. Should have shape (n_windows, n_frequencies). 

592 Typically complex-valued array. 

593 

594 Returns 

595 ------- 

596 None 

597 

598 Raises 

599 ------ 

600 TypeError 

601 If dtype doesn't match existing dataset or input cannot 

602 be converted to numpy array. 

603 

604 Notes 

605 ----- 

606 'data' is a built-in Python function and cannot be used as parameter name. 

607 The dataset will be resized if shape doesn't match. 

608 Dtype compatibility is strictly enforced. 

609 

610 Examples 

611 -------- 

612 Load FC data from numpy array: 

613 

614 >>> import numpy as np 

615 >>> new_data = np.random.rand(25, 128) + 1j * np.random.rand(25, 128) 

616 >>> fc.from_numpy(new_data) 

617 >>> print(fc.to_numpy().shape) 

618 (25, 128) 

619 

620 Load with magnitude and phase separation: 

621 

622 >>> magnitude = np.random.rand(20, 256) 

623 >>> phase = np.random.rand(20, 256) * 2 * np.pi 

624 >>> fc_data = magnitude * np.exp(1j * phase) 

625 >>> fc.from_numpy(fc_data) 

626 

627 """ 

628 if not isinstance(new_estimate, np.ndarray): 

629 try: 

630 new_estimate = np.array(new_estimate) 

631 except (ValueError, TypeError) as error: 

632 msg = f"{error} Input must be a numpy array not {type(new_estimate)}" 

633 self.logger.exception(msg) 

634 raise TypeError(msg) 

635 if new_estimate.dtype != self.hdf5_dataset.dtype: 

636 msg = ( 

637 f"Input array must be type {new_estimate.dtype} not " 

638 "{self.hdf5_dataset.dtype}" 

639 ) 

640 self.logger.error(msg) 

641 raise TypeError(msg) 

642 if new_estimate.shape != self.hdf5_dataset.shape: 

643 self.hdf5_dataset.resize(new_estimate.shape) 

644 self.hdf5_dataset[...] = new_estimate 

645 

646 def from_xarray( 

647 self, 

648 data: xr.DataArray, 

649 sample_rate_decimation_level: int | float, 

650 ) -> None: 

651 """ 

652 Load FC data from xarray DataArray. 

653 

654 Updates metadata from xarray coordinates and attributes, then 

655 stores the data. Computes frequency and time parameters from 

656 the provided xarray object. 

657 

658 Parameters 

659 ---------- 

660 data : xr.DataArray 

661 DataArray containing FC data. Expected dimensions: 

662 (time, frequency). 

663 sample_rate_decimation_level : int | float 

664 Decimation level applied to original sample rate. 

665 Used to track processing history. 

666 

667 Returns 

668 ------- 

669 None 

670 

671 Notes 

672 ----- 

673 This will update time_period (start/end), frequency bounds, 

674 window step rate, decimation level, component name, and units 

675 from the xarray object. All changes are persisted to HDF5. 

676 

677 Examples 

678 -------- 

679 Load FC data from modified xarray: 

680 

681 >>> xr_data = fc.to_xarray() 

682 >>> # Modify data (e.g., apply filter) 

683 >>> modified = xr_data * np.hamming(256) # Apply frequency window 

684 >>> fc.from_xarray(modified, sample_rate_decimation_level=4) 

685 >>> print(fc.metadata.sample_rate_decimation_level) 

686 4 

687 

688 Load with updated metadata from another analysis: 

689 

690 >>> import xarray as xr 

691 >>> import pandas as pd 

692 >>> time_coords = pd.date_range('2023-01-01', periods=30, freq='1H') 

693 >>> freq_coords = np.arange(0, 128) 

694 >>> new_fc = xr.DataArray( 

695 ... data=np.random.rand(30, 128) + 1j * np.random.rand(30, 128), 

696 ... coords={'time': time_coords, 'frequency': freq_coords}, 

697 ... dims=['time', 'frequency'], 

698 ... name='Ey', 

699 ... attrs={'units': 'mV/km'} 

700 ... ) 

701 >>> fc.from_xarray(new_fc, sample_rate_decimation_level=1) 

702 >>> print(fc.metadata.component) 

703 Ey 

704 

705 """ 

706 self.metadata.time_period.start = data.time[0].values 

707 self.metadata.time_period.end = data.time[-1].values 

708 

709 self.metadata.sample_rate_decimation_level = sample_rate_decimation_level 

710 self.metadata.frequency_min = data.coords["frequency"].data.min() 

711 self.metadata.frequency_max = data.coords["frequency"].data.max() 

712 step_size = data.coords["time"].data[1] - data.coords["time"].data[0] 

713 self.metadata.sample_rate_window_step = step_size / np.timedelta64(1, "s") 

714 self.metadata.component = data.name 

715 try: 

716 self.metadata.units = data.units 

717 except AttributeError: 

718 self.logger.debug("Could not find 'units' in xarray") 

719 self.write_metadata() 

720 

721 self.from_numpy(data.to_numpy())