Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mth5 \ mth5 \ groups \ channel_dataset.py: 52%
413 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-27 20:09 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-27 20:09 -0800
1# -*- coding: utf-8 -*-
2"""
3Created on Sat May 27 10:03:23 2023
5@author: jpeacock
6"""
8# =============================================================================
9# Imports
10# =============================================================================
11from __future__ import annotations
13import inspect
14import weakref
15from typing import Any
17import h5py
18import numpy as np
19import pandas as pd
20import xarray as xr
21from loguru import logger
22from mt_metadata import timeseries as metadata
23from mt_metadata.base import MetadataBase
24from mt_metadata.common.mttime import MTime
25from mt_metadata.timeseries.filters import ChannelResponse
27from mth5 import CHANNEL_DTYPE
28from mth5.groups import FiltersGroup
29from mth5.helpers import (
30 add_attributes_to_metadata_class_pydantic,
31 from_numpy_type,
32 inherit_doc_string,
33 read_attrs_to_dict,
34 to_numpy_type,
35)
36from mth5.timeseries import ChannelTS
37from mth5.timeseries.channel_ts import make_dt_coordinates
38from mth5.utils.exceptions import MTH5Error
41meta_classes = dict(inspect.getmembers(metadata, inspect.isclass))
44# =============================================================================
45class ChannelDataset:
46 """
47 A container for channel time series data stored in HDF5 format.
49 This class provides a flexible interface to work with magnetotelluric channel data,
50 allowing conversion to various formats (xarray, pandas, numpy) while maintaining
51 metadata integrity.
53 Parameters
54 ----------
55 dataset : h5py.Dataset or None
56 HDF5 dataset object containing the channel time series data.
57 dataset_metadata : MetadataBase, optional
58 Metadata container for Electric, Magnetic, or Auxiliary channel types.
59 Default is None.
60 write_metadata : bool, optional
61 Whether to write metadata to the HDF5 dataset on initialization.
62 Default is True.
63 **kwargs : dict
64 Additional keyword arguments to set as instance attributes.
66 Attributes
67 ----------
68 hdf5_dataset : h5py.Dataset
69 Weak reference to the underlying HDF5 dataset.
70 metadata : MetadataBase
71 Channel metadata object with validation.
72 logger : loguru.Logger
73 Logger instance for tracking operations.
75 Raises
76 ------
77 MTH5Error
78 If the dataset is not of the correct type or metadata validation fails.
80 See Also
81 --------
82 ElectricDataset : Specialized container for electric field channels.
83 MagneticDataset : Specialized container for magnetic field channels.
84 AuxiliaryDataset : Specialized container for auxiliary channels.
86 Examples
87 --------
88 >>> from mth5 import mth5
89 >>> mth5_obj = mth5.MTH5()
90 >>> mth5_obj.open_mth5(r"/test.mth5", mode='a')
91 >>> run = mth5_obj.stations_group.get_station('MT001').get_run('MT001a')
92 >>> channel = run.get_channel('Ex')
93 >>> channel
94 Channel Electric:
95 -------------------
96 component: Ex
97 data type: electric
98 data format: float32
99 data shape: (4096,)
100 start: 1980-01-01T00:00:00+00:00
101 end: 1980-01-01T00:00:01+00:00
102 sample rate: 4096
104 Access time series data
106 >>> ts_data = channel.to_channel_ts()
107 >>> print(f"Mean: {ts_data.ts.mean():.2f}, Std: {ts_data.ts.std():.2f}")
109 Convert to xarray for time-based indexing
111 >>> xr_data = channel.to_xarray()
112 >>> subset = xr_data.sel(time=slice('1980-01-01T00:00:00', '1980-01-01T00:00:10'))
114 """
116 def __init__(
117 self,
118 dataset: h5py.Dataset | None,
119 dataset_metadata: MetadataBase | None = None,
120 write_metadata: bool = True,
121 **kwargs: Any,
122 ) -> None:
123 for key, value in kwargs.items():
124 setattr(self, key, value)
125 if dataset is not None and isinstance(dataset, (h5py.Dataset)):
126 self.hdf5_dataset = weakref.ref(dataset)()
127 self.logger = logger
129 # set metadata to the appropriate class. Standards is not a
130 # Base object so should be skipped. If the class name is not
131 # defined yet set to Base class.
132 try:
133 metadata_obj = meta_classes[self._class_name]
134 except KeyError:
135 metadata_obj = MetadataBase
136 # add mth5 attributes to the metadata class
137 self.metadata = add_attributes_to_metadata_class_pydantic(metadata_obj)
138 self.metadata.hdf5_reference = self.hdf5_dataset.ref
139 self.metadata.mth5_type = self._class_name
140 # if the input data set already has filled attributes, namely if the
141 # channel data already exists then read them in without writing back
142 if "mth5_type" in list(self.hdf5_dataset.attrs.keys()):
143 self.read_metadata()
145 # this causes issues because the attrs are in binary format
146 # self.metadata.from_dict(
147 # {self.hdf5_dataset.attrs["mth5_type"]: dict(self.hdf5_dataset.attrs)}
148 # )
149 # if metadata is input, make sure that its the same class type amd write
150 # to the hdf5 dataset
151 if dataset_metadata is not None:
152 if not isinstance(self.metadata, type(dataset_metadata)):
153 msg = (
154 f"metadata must be type metadata.{self._class_name} not "
155 f"{type(dataset_metadata)}"
156 )
157 self.logger.error(msg)
158 raise MTH5Error(msg)
159 # load from dict because of the extra attributes for MTH5
160 # Filter out None values for mth5_type to avoid pydantic validation errors
161 metadata_dict = dataset_metadata.to_dict()
162 # Clean the dict to remove None values for mth5_type that might cause validation errors
163 for class_key, class_data in metadata_dict.items():
164 if isinstance(class_data, dict) and class_data.get("mth5_type") is None:
165 class_data.pop("mth5_type", None)
167 self.metadata.from_dict(metadata_dict)
168 # Always set these critical properties regardless of the path
169 self.metadata.hdf5_reference = self.hdf5_dataset.ref
170 self.metadata.mth5_type = self._class_name
172 # write out metadata to make sure that its in the file.
173 if write_metadata:
174 self.write_metadata()
175 else:
176 self.read_metadata()
177 # if the attrs don't have the proper metadata keys yet write them
178 if not "mth5_type" in list(self.hdf5_dataset.attrs.keys()):
179 self.hdf5_dataset.attrs["mth5_type"] = self._class_name
180 self.write_metadata()
182 def __str__(self) -> str:
183 """
184 Generate a human-readable string representation of the channel.
186 Returns
187 -------
188 str
189 Formatted string with channel metadata and data information.
191 Examples
192 --------
193 >>> print(channel)
194 Channel Electric:
195 -------------------
196 component: Ex
197 data type: electric
198 data format: float32
199 data shape: (4096,)
200 start: 1980-01-01T00:00:00+00:00
201 end: 1980-01-01T00:00:01+00:00
202 sample rate: 4096
203 """
204 try:
205 lines = ["Channel {0}:".format(self._class_name)]
206 lines.append("-" * (len(lines[0]) + 2))
207 info_str = "\t{0:<18}{1}"
208 lines.append(info_str.format("component:", self.metadata.component))
209 lines.append(info_str.format("data type:", self.metadata.type))
210 lines.append(info_str.format("data format:", self.hdf5_dataset.dtype))
211 lines.append(info_str.format("data shape:", self.hdf5_dataset.shape))
212 lines.append(info_str.format("start:", self.metadata.time_period.start))
213 lines.append(info_str.format("end:", self.metadata.time_period.end))
214 lines.append(info_str.format("sample rate:", self.metadata.sample_rate))
215 return "\n".join(lines)
216 except ValueError:
217 return "MTH5 file is closed and cannot be accessed."
219 def __repr__(self) -> str:
220 """
221 Return the string representation of the channel.
223 Returns
224 -------
225 str
226 String representation identical to __str__.
227 """
228 return self.__str__()
230 @property
231 def _class_name(self) -> str:
232 """
233 Extract the base class name without 'Dataset' suffix.
235 Returns
236 -------
237 str
238 Base class name (e.g., 'Electric', 'Magnetic', 'Auxiliary').
239 """
240 return self.__class__.__name__.split("Dataset")[0]
242 @property
243 def run_metadata(self) -> metadata.Run:
244 """
245 Get the run-level metadata containing this channel.
247 Returns
248 -------
249 metadata.Run
250 Run metadata object with channel information included.
252 Examples
253 --------
254 >>> run_meta = channel.run_metadata
255 >>> print(run_meta.id)
256 'MT001a'
257 >>> print(run_meta.channels_recorded_electric)
258 ['Ex', 'Ey']
259 """
260 meta_dict = dict(self.hdf5_dataset.parent.attrs)
261 for key, value in meta_dict.items():
262 meta_dict[key] = from_numpy_type(value)
263 run_metadata = metadata.Run()
264 run_metadata.from_dict({"run": meta_dict})
265 run_metadata.add_channel(self.metadata)
266 return run_metadata
268 @property
269 def station_metadata(self) -> metadata.Station:
270 """
271 Get the station-level metadata containing this channel.
273 Returns
274 -------
275 metadata.Station
276 Station metadata object with run and channel information.
278 Examples
279 --------
280 >>> station_meta = channel.station_metadata
281 >>> print(f"{station_meta.id}: {station_meta.location.latitude}, {station_meta.location.longitude}")
282 'MT001: 40.5, -112.3'
283 """
284 station_metadata = metadata.Station()
285 station_metadata.from_dict(
286 {
287 "station": read_attrs_to_dict(
288 dict(self.hdf5_dataset.parent.parent.attrs), station_metadata
289 )
290 }
291 )
292 station_metadata.add_run(self.run_metadata)
293 return station_metadata
295 @property
296 def survey_metadata(self) -> metadata.Survey:
297 """
298 Get the survey-level metadata containing this channel.
300 Returns
301 -------
302 metadata.Survey
303 Complete survey metadata hierarchy including this channel.
305 Examples
306 --------
307 >>> survey_meta = channel.survey_metadata
308 >>> print(survey_meta.id)
309 'MT Survey 2023'
310 >>> print(f"Stations: {len(survey_meta.stations)}")
311 Stations: 15
312 """
313 survey_metadata = metadata.Survey()
314 survey_metadata.from_dict(
315 {
316 "survey": read_attrs_to_dict(
317 dict(self.hdf5_dataset.parent.parent.parent.parent.attrs),
318 survey_metadata,
319 )
320 }
321 )
322 survey_metadata.add_station(self.station_metadata)
323 return survey_metadata
325 @property
326 def survey_id(self) -> str:
327 """
328 Get the survey identifier.
330 Returns
331 -------
332 str
333 Survey ID string.
335 Examples
336 --------
337 >>> print(channel.survey_id)
338 'MT_Survey_2023'
339 """
340 return self.hdf5_dataset.parent.parent.parent.parent.attrs["id"]
342 @property
343 def channel_response(self) -> ChannelResponse:
344 """
345 Get the complete channel response from applied filters.
347 Constructs a ChannelResponse object by retrieving all filters referenced
348 in the channel metadata from the survey's Filters group.
350 Returns
351 -------
352 ChannelResponse
353 Channel response object containing all applied filters in sequence.
355 Notes
356 -----
357 Filters are applied in the order specified by their sequence_number.
358 Filter names are normalized by replacing '/' with ' per ' and converting
359 to lowercase.
361 Examples
362 --------
363 >>> response = channel.channel_response
364 >>> print(f"Number of filters: {len(response.filters_list)}")
365 Number of filters: 3
366 >>> for filt in response.filters_list:
367 ... print(f"{filt.name}: {filt.type}")
368 zpk: zpk
369 coefficient: coefficient
370 time delay: time_delay
371 """
372 # get the filters to make a channel response
373 filters_group = FiltersGroup(
374 self.hdf5_dataset.parent.parent.parent.parent["Filters"]
375 )
376 f_list = []
377 # Check if filters field exists (new version with AppliedFilter objects)
378 if hasattr(self.metadata, "filters") and self.metadata.filters:
379 for count, applied_filter in enumerate(self.metadata.filters, 1):
380 if hasattr(applied_filter, "name"):
381 name = applied_filter.name.replace("/", " per ").lower()
382 try:
383 filt_obj = filters_group.to_filter_object(name)
384 filt_obj.sequence_number = count
385 f_list.append(filt_obj)
386 except KeyError:
387 self.logger.warning(f"Could not locate filter {name}")
388 continue
389 # Fallback to old filter field for backward compatibility
390 elif hasattr(self.metadata, "filter") and hasattr(self.metadata.filter, "name"):
391 for count, name in enumerate(self.metadata.filter.name, 1):
392 name = name.replace("/", " per ").lower()
393 try:
394 filt_obj = filters_group.to_filter_object(name)
395 filt_obj.sequence_number = count
396 f_list.append(filt_obj)
397 except KeyError:
398 self.logger.warning(f"Could not locate filter {name}")
399 continue
400 return ChannelResponse(filters_list=f_list)
402 @property
403 def start(self) -> MTime:
404 """
405 Get the start time of the channel data.
407 Returns
408 -------
409 MTime
410 Start time from metadata.time_period.start.
412 Examples
413 --------
414 >>> print(channel.start)
415 1980-01-01T00:00:00+00:00
416 >>> print(channel.start.iso_str)
417 '1980-01-01T00:00:00.000000+00:00'
418 """
419 return self.metadata.time_period.start
421 @start.setter
422 def start(self, value: str | MTime) -> None:
423 """
424 Set the start time with validation.
426 Parameters
427 ----------
428 value : str or MTime
429 New start time in ISO format string or MTime object.
431 Examples
432 --------
433 >>> channel.start = '1980-01-01T12:00:00'
434 >>> channel.start = MTime('1980-01-01T12:00:00')
435 """
436 if isinstance(value, MTime):
437 self.metadata.time_period.start = value.isoformat()
438 else:
439 self.metadata.time_period.start = value
441 @property
442 def end(self) -> MTime:
443 """
444 Calculate the end time based on start time, sample rate, and number of samples.
446 Returns
447 -------
448 MTime
449 Calculated end time of the data.
451 Notes
452 -----
453 End time is calculated as: start + (n_samples - 1) / sample_rate
454 The -1 ensures the last sample falls exactly at the end time.
456 Examples
457 --------
458 >>> print(f"Duration: {channel.end - channel.start} seconds")
459 Duration: 3600.0 seconds
460 >>> print(channel.end.iso_str)
461 '1980-01-01T01:00:00.000000+00:00'
462 """
463 return self.start + ((self.n_samples - 1) / self.sample_rate)
465 @property
466 def sample_rate(self) -> float:
467 """
468 Get the sample rate in samples per second.
470 Returns
471 -------
472 float
473 Sample rate in Hz.
475 Examples
476 --------
477 >>> print(f"Sample rate: {channel.sample_rate} Hz")
478 Sample rate: 256.0 Hz
479 """
480 return self.metadata.sample_rate
482 @sample_rate.setter
483 def sample_rate(self, value: float) -> None:
484 """
485 Set the sample rate with validation through metadata.
487 Parameters
488 ----------
489 value : float
490 New sample rate in Hz.
492 Examples
493 --------
494 >>> channel.sample_rate = 256.0
495 """
496 self.metadata.sample_rate = value
498 @property
499 def n_samples(self) -> int:
500 """
501 Get the total number of samples in the dataset.
503 Returns
504 -------
505 int
506 Number of data points in the time series.
508 Examples
509 --------
510 >>> print(f"Total samples: {channel.n_samples:,}")
511 Total samples: 921,600
512 >>> duration = channel.n_samples / channel.sample_rate
513 >>> print(f"Duration: {duration/3600:.1f} hours")
514 Duration: 1.0 hours
515 """
516 return self.hdf5_dataset.size
518 @property
519 def time_index(self) -> pd.DatetimeIndex:
520 """
521 Create a time index for the dataset based on metadata.
523 Returns
524 -------
525 pd.DatetimeIndex
526 Pandas datetime index spanning the entire dataset.
528 Notes
529 -----
530 The time index is useful for time-based queries and slicing operations.
531 It is generated dynamically from start time, sample rate, and number of samples.
533 Examples
534 --------
535 >>> time_idx = channel.time_index
536 >>> print(time_idx[0], time_idx[-1])
537 1980-01-01 00:00:00 1980-01-01 00:59:59.996093750
538 >>> print(f"Index length: {len(time_idx)}")
539 Index length: 921600
540 """
541 return make_dt_coordinates(self.start, self.sample_rate, self.n_samples)
543 def read_metadata(self) -> None:
544 """
545 Read metadata from HDF5 attributes into the metadata container.
547 Loads all HDF5 attributes from the dataset and converts them to the
548 appropriate Python types before populating the metadata object.
550 For older MTH5 files, this method attempts to coerce values to the
551 expected types based on the metadata schema to maintain backwards
552 compatibility.
554 Notes
555 -----
556 This method automatically validates metadata through the metadata
557 container's validators. Type coercion is applied to handle older
558 file formats that may have stored metadata with different types.
560 Examples
561 --------
562 >>> channel.read_metadata()
563 >>> print(channel.metadata.component)
564 'Ex'
565 >>> print(channel.metadata.sample_rate)
566 256.0
568 Handles type coercion for older files
570 >>> # If sample_rate was stored as string '256.0' in old file
571 >>> channel.read_metadata()
572 >>> print(type(channel.metadata.sample_rate))
573 <class 'float'>
574 """
575 meta_dict = read_attrs_to_dict(dict(self.hdf5_dataset.attrs), self.metadata)
576 # Defensive check: skip if meta_dict is empty
577 if not meta_dict:
578 self.logger.debug(
579 f"No metadata found for {self._class_name}, skipping from_dict."
580 )
581 return
582 self.metadata.from_dict({self._class_name: meta_dict})
583 self._has_read_metadata = True
585 def write_metadata(self) -> None:
586 """
587 Write metadata from the container to HDF5 dataset attributes.
589 Converts all metadata values to numpy-compatible types before writing
590 to HDF5 attributes. Falls back to string conversion if direct conversion fails.
592 Notes
593 -----
594 This method is automatically called during initialization and when
595 metadata is updated.
597 Examples
598 --------
599 >>> channel.metadata.component = 'Ey'
600 >>> channel.metadata.measurement_azimuth = 90.0
601 >>> channel.write_metadata()
602 """
603 meta_dict = self.metadata.to_dict()[self.metadata._class_name.lower()]
604 for key, value in meta_dict.items():
605 try:
606 value = to_numpy_type(value)
607 self.hdf5_dataset.attrs.create(key, value)
608 except Exception as e:
609 # Convert problematic values to string as fallback
610 self.hdf5_dataset.attrs.create(key, str(value))
612 def replace_dataset(self, new_data_array: np.ndarray) -> None:
613 """
614 Replace the entire dataset with new data.
616 Parameters
617 ----------
618 new_data_array : np.ndarray
619 New data array with shape (npts,). Must be 1-dimensional.
621 Raises
622 ------
623 TypeError
624 If new_data_array cannot be converted to numpy array.
626 Notes
627 -----
628 The HDF5 dataset will be resized if the new array has a different shape.
629 All existing data will be overwritten.
631 Examples
632 --------
633 Replace with synthetic data
635 >>> import numpy as np
636 >>> new_data = np.sin(2 * np.pi * 1.0 * np.linspace(0, 10, 2560))
637 >>> channel.replace_dataset(new_data)
638 >>> print(f"New shape: {channel.hdf5_dataset.shape}")
639 New shape: (2560,)
641 Replace with processed data
643 >>> original = channel.hdf5_dataset[:]
644 >>> filtered = np.convolve(original, np.ones(5)/5, mode='same')
645 >>> channel.replace_dataset(filtered)
646 """
647 if not isinstance(new_data_array, np.ndarray):
648 try:
649 new_data_array = np.array(new_data_array)
650 except (ValueError, TypeError) as error:
651 msg = f"{error} Input must be a numpy array not {type(new_data_array)}"
652 self.logger.exception(msg)
653 raise TypeError(msg)
654 if new_data_array.shape != self.hdf5_dataset.shape:
655 self.hdf5_dataset.resize(new_data_array.shape)
656 self.hdf5_dataset[...] = new_data_array
658 def extend_dataset(
659 self,
660 new_data_array: np.ndarray,
661 start_time: str | MTime,
662 sample_rate: float,
663 fill: str | float | int | None = None,
664 max_gap_seconds: float | int = 1,
665 fill_window: int = 10,
666 ) -> None:
667 """
668 Extend or prepend data to the existing dataset with gap handling.
670 Intelligently adds new data before, after, or within the existing time series.
671 Handles time alignment, overlaps, and gaps with configurable fill strategies.
673 Parameters
674 ----------
675 new_data_array : np.ndarray
676 New data array with shape (npts,).
677 start_time : str or MTime
678 Start time of the new data array in UTC.
679 sample_rate : float
680 Sample rate of the new data array in Hz. Must match existing sample rate.
681 fill : str, float, int, or None, optional
682 Strategy for filling data gaps:
684 - None : Raise MTH5Error if gap exists (default)
685 - 'mean' : Fill with mean of both datasets within fill_window
686 - 'median' : Fill with median of both datasets within fill_window
687 - 'nan' : Fill with NaN values
688 - numeric value : Fill with specified constant
690 max_gap_seconds : float or int, optional
691 Maximum allowed gap in seconds. Exceeding this raises MTH5Error.
692 Default is 1 second.
693 fill_window : int, optional
694 Number of points from each dataset edge to estimate fill values.
695 Default is 10 points.
697 Raises
698 ------
699 MTH5Error
700 If sample rates don't match, gap exceeds max_gap_seconds, or
701 fill strategy is invalid.
702 TypeError
703 If new_data_array cannot be converted to numpy array.
705 Notes
706 -----
707 - **Prepend**: New data start < existing start
708 - **Append**: New data start > existing end
709 - **Overwrite**: New data overlaps existing data
711 The dataset is automatically resized to accommodate new data.
713 Examples
714 --------
715 Append data with a small gap
717 >>> ex = mth5_obj.get_channel('MT001', 'MT001a', 'Ex')
718 >>> print(f"Original: {ex.n_samples} samples, ends {ex.end}")
719 Original: 4096 samples, ends 2015-01-08T19:32:09.500000+00:00
720 >>> new_data = np.random.randn(4096)
721 >>> new_start = (ex.end + 0.5).isoformat() # 0.5s gap
722 >>> ex.extend_dataset(new_data, new_start, ex.sample_rate,
723 ... fill='median', max_gap_seconds=2)
724 >>> print(f"Extended: {ex.n_samples} samples, ends {ex.end}")
725 Extended: 8200 samples, ends 2015-01-08T19:40:42.500000+00:00
727 Prepend data seamlessly
729 >>> prepend_data = np.random.randn(2048)
730 >>> prepend_start = (ex.start - 2048/ex.sample_rate).isoformat()
731 >>> ex.extend_dataset(prepend_data, prepend_start, ex.sample_rate)
732 >>> print(f"New start: {ex.start}")
734 Overwrite section of existing data
736 >>> replacement_data = np.zeros(1024)
737 >>> replace_start = (ex.start + 1.0).isoformat() # 1s after start
738 >>> ex.extend_dataset(replacement_data, replace_start, ex.sample_rate)
740 """
741 fw = fill_window
742 # check input parameters
743 if sample_rate != self.sample_rate:
744 msg = (
745 "new data must have the same sampling rate as existing data.\n"
746 + f"\tnew sample rate = {sample_rate}\n"
747 + f"\texisting sample rate = {self.sample_rate}"
748 )
749 self.logger.error(msg)
750 raise MTH5Error(msg)
751 if not isinstance(new_data_array, np.ndarray):
752 try:
753 new_data_array = np.array(new_data_array)
754 except (ValueError, TypeError) as error:
755 msg = f"{error} Input must be a numpy array not {type(new_data_array)}"
756 self.logger.exception(msg)
757 raise TypeError(msg)
758 if not isinstance(start_time, MTime):
759 start_time = MTime(time_stamp=start_time)
760 # get end time will need later
761 end_time = start_time + (new_data_array.size / sample_rate)
763 # check start time
764 start_t_diff = self._get_diff_new_array_start(start_time)
765 end_t_diff = self._get_diff_new_array_end(end_time)
767 self.logger.info("Extending data.")
768 self.logger.info(f"Existing start time {self.start}")
769 self.logger.info(f"New start time {start_time}")
770 self.logger.info(f"Existing end time {self.end}")
771 self.logger.info(f"New end time {end_time}")
773 # prepend data
774 if start_t_diff < 0:
775 self.logger.info("Prepending: ")
776 self.logger.info(
777 f"new start time {start_time} is before existing {self.start}"
778 )
779 if end_time.iso_no_tz not in self.time_index:
780 gap = abs(end_time - self.start)
781 if gap > 0:
782 if gap > max_gap_seconds:
783 msg = (
784 f"Time gap of {gap} seconds "
785 + f"is more than max_gap_seconds = {max_gap_seconds}."
786 + " Consider making a new run."
787 )
788 self.logger.error(msg)
789 raise MTH5Error(msg)
790 if fill is None:
791 msg = (
792 f"A time gap of {gap} seconds is found "
793 + "between new and existing data sets. \n"
794 + f"\tnew end time: {end_time}\n"
795 + f"\texisting start time: {self.start}"
796 )
797 self.logger.error(msg)
798 raise MTH5Error(msg)
799 # set new start time
800 old_slice = self.time_slice(self.start, end_time=self.end)
801 old_start = self.start.copy()
802 self.start = start_time
804 # resize the existing data to make room for new data
805 self.hdf5_dataset.resize(
806 (
807 int(
808 new_data_array.size
809 + self.hdf5_dataset.size
810 + gap * sample_rate
811 ),
812 )
813 )
815 # fill based on time, refill existing data first
816 self.hdf5_dataset[
817 self.get_index_from_time(old_start) :
818 ] = old_slice.ts.values
819 self.hdf5_dataset[
820 0 : self.get_index_from_time(end_time)
821 ] = new_data_array
823 if fill == "mean":
824 fill_value = np.mean(
825 np.array(
826 [
827 new_data_array[-fw:].mean(),
828 float(old_slice.ts[0:fw].mean()),
829 ]
830 )
831 )
832 elif fill == "median":
833 fill_value = np.median(
834 np.array(
835 [
836 np.median(new_data_array[-fw:]),
837 np.median(old_slice.ts[0:fw]),
838 ]
839 )
840 )
841 elif fill == "nan":
842 fill_value = np.nan
843 elif isinstance(fill, (int, float)):
844 fill_value = fill
845 else:
846 msg = f"fill value {fill} is not understood"
847 self.logger.error(msg)
848 raise MTH5Error(msg)
849 self.logger.info(f"filling data gap with {fill_value}")
850 self.hdf5_dataset[
851 self.get_index_from_time(end_time) : self.get_index_from_time(
852 old_start
853 )
854 ] = fill_value
855 else:
856 new_size = (self.n_samples + int(abs(start_t_diff) * sample_rate),)
857 overlap = abs(end_time - self.start)
858 self.logger.warning(
859 f"New data is overlapping by {overlap} s."
860 + " Any overlap will be overwritten."
861 )
862 # set new start time
863 old_slice = self.time_slice(self.start, end_time=self.end)
864 old_start = self.start.copy()
865 self.start = start_time
866 self.logger.debug(
867 f"resizing data set from {self.n_samples} to {new_size}"
868 )
869 self.hdf5_dataset.resize(new_size)
871 # put back the existing data, which any overlapping times
872 # will be overwritten
873 self.hdf5_dataset[
874 self.get_index_from_time(old_start) :
875 ] = old_slice.ts.values
876 self.hdf5_dataset[
877 0 : self.get_index_from_time(end_time)
878 ] = new_data_array
879 # append data
880 elif start_t_diff > 0:
881 old_end = self.end.copy()
882 if start_time.iso_no_tz not in self.time_index:
883 gap = abs(self.end - start_time)
884 if gap > 0:
885 if gap > max_gap_seconds:
886 msg = (
887 f"Time gap of {gap} seconds "
888 + f"is more than max_gap_seconds = {max_gap_seconds}."
889 + " Consider making a new run."
890 )
891 self.logger.error(msg)
892 raise MTH5Error(msg)
893 if fill is None:
894 msg = (
895 f"A time gap of {gap} seconds is found "
896 + "between new and existing data sets. \n"
897 + f"\tnew start time: {start_time}\n"
898 + f"\texisting end time: {self.end}"
899 )
900 self.logger.error(msg)
901 raise MTH5Error(msg)
902 # resize the existing data to make room for new data
903 self.hdf5_dataset.resize(
904 (
905 int(
906 new_data_array.size
907 + self.hdf5_dataset.size
908 + gap * sample_rate
909 ),
910 )
911 )
913 self.hdf5_dataset[
914 self.get_index_from_time(start_time) :
915 ] = new_data_array
916 old_index = self.get_index_from_time(old_end)
917 if fill == "mean":
918 fill_value = np.mean(
919 np.array(
920 [
921 new_data_array[0:fw].mean(),
922 np.mean(self.hdf5_dataset[old_index - fw :]),
923 ]
924 )
925 )
926 elif fill == "median":
927 fill_value = np.median(
928 np.array(
929 [
930 np.median(new_data_array[0:fw]),
931 np.median(self.hdf5_dataset[old_index - fw :]),
932 ]
933 )
934 )
935 elif fill == "nan":
936 fill_value = np.nan
937 elif isinstance(fill, (int, float)):
938 fill_value = fill
939 else:
940 msg = f"fill value {fill} is not understood"
941 self.logger.error(msg)
942 raise MTH5Error(msg)
943 self.logger.info(f"filling data gap with {fill_value}")
944 self.hdf5_dataset[
945 self.get_index_from_time(old_end) : self.get_index_from_time(
946 start_time
947 )
948 ] = fill_value
949 else:
950 # if the new data fits within the extisting time span
951 if end_t_diff < 0:
952 self.logger.debug(
953 "New data fits within existing time span"
954 + " all data in the window : "
955 f"{start_time} -- {end_time} " + "will be overwritten."
956 )
957 self.hdf5_dataset[
958 self.get_index_from_time(start_time) : self.get_index_from_time(
959 end_time
960 )
961 ] = new_data_array
962 else:
963 new_size = (self.n_samples + int(abs(start_t_diff) * sample_rate),)
964 overlap = abs(self.end - start_time)
965 self.logger.warning(
966 f"New data is overlapping by {overlap} s."
967 + " Any overlap will be overwritten."
968 )
970 self.logger.debug(
971 f"resizing data set from {self.n_samples} to {new_size}"
972 )
973 self.hdf5_dataset.resize(new_size)
975 # put back the existing data, which any overlapping times
976 # will be overwritten
977 self.hdf5_dataset[
978 self.get_index_from_time(start_time) :
979 ] = new_data_array
981 def has_data(self) -> bool:
982 """
983 Check if the channel contains non-zero data.
985 Returns
986 -------
987 bool
988 True if dataset has non-zero values, False if all zeros or empty.
990 Examples
991 --------
992 >>> if channel.has_data():
993 ... print("Channel has valid data")
994 ... else:
995 ... print("Channel is empty or all zeros")
996 Channel has valid data
998 >>> empty_channel.has_data()
999 False
1000 """
1001 if len(self.hdf5_dataset) > 0:
1002 if len(np.nonzero(self.hdf5_dataset)[0]) > 0:
1003 return True
1004 else:
1005 return False
1006 return False
1008 def to_channel_ts(self) -> ChannelTS:
1009 """
1010 Convert the dataset to a ChannelTS object with full metadata.
1012 Returns
1013 -------
1014 ChannelTS
1015 Time series object with data, metadata, and channel response.
1017 Notes
1018 -----
1019 Data is loaded into memory. The resulting ChannelTS object is independent
1020 of the HDF5 file and can be modified without affecting the original dataset.
1022 Examples
1023 --------
1024 >>> ts = channel.to_channel_ts()
1025 >>> print(f"Type: {type(ts)}")
1026 Type: <class 'mth5.timeseries.channel_ts.ChannelTS'>
1027 >>> print(f"Shape: {ts.ts.shape}, Mean: {ts.ts.mean():.2f}")
1028 Shape: (4096,), Mean: 0.15
1030 Process the time series
1032 >>> filtered_ts = ts.low_pass_filter(cutoff=10.0)
1033 >>> detrended_ts = ts.detrend('linear')
1034 >>> ts.plot()
1035 """
1036 # Now that copy() method is robust, we can use direct copying
1037 return ChannelTS(
1038 channel_type=self.metadata.type,
1039 data=self.hdf5_dataset[()],
1040 channel_metadata=self.metadata.copy(),
1041 run_metadata=self.run_metadata.copy(),
1042 station_metadata=self.station_metadata.copy(),
1043 survey_metadata=self.survey_metadata.copy(),
1044 channel_response=self.channel_response,
1045 )
1047 def to_xarray(self) -> xr.DataArray:
1048 """
1049 Convert the dataset to an xarray DataArray with time coordinates.
1051 Returns
1052 -------
1053 xr.DataArray
1054 DataArray with time index and metadata as attributes.
1056 Notes
1057 -----
1058 Data is loaded into memory. Metadata is stored in the attrs dictionary
1059 and will not be validated if modified.
1061 Examples
1062 --------
1063 >>> xr_data = channel.to_xarray()
1064 >>> print(xr_data)
1065 <xarray.DataArray (time: 4096)>
1066 array([0.931, 0.142, ..., 0.882])
1067 Coordinates:
1068 * time (time) datetime64[ns] 1980-01-01 ... 1980-01-01T00:00:15.996
1069 Attributes:
1070 component: Ex
1071 sample_rate: 256.0
1072 ...
1074 Use xarray's powerful selection
1076 >>> morning = xr_data.sel(time=slice('1980-01-01T06:00', '1980-01-01T12:00'))
1077 >>> daily_mean = xr_data.resample(time='1D').mean()
1078 >>> xr_data.plot()
1079 """
1080 return xr.DataArray(
1081 self.hdf5_dataset[()],
1082 coords=[("time", self.time_index)],
1083 attrs=self.metadata.to_dict(single=True),
1084 )
1086 def to_dataframe(self) -> pd.DataFrame:
1087 """
1088 Convert the dataset to a pandas DataFrame with time index.
1090 Returns
1091 -------
1092 pd.DataFrame
1093 DataFrame with 'data' column and time index. Metadata stored in attrs.
1095 Notes
1096 -----
1097 Data is loaded into memory. Metadata is stored in the experimental
1098 attrs attribute and will not be validated if modified.
1100 Examples
1101 --------
1102 >>> df = channel.to_dataframe()
1103 >>> print(df.head())
1104 data
1105 time
1106 1980-01-01 00:00:00 0.931
1107 1980-01-01 00:00:00 0.142
1108 ...
1110 Use pandas operations
1112 >>> df['data'].describe()
1113 >>> df.resample('1H').mean()
1114 >>> df.plot(y='data', figsize=(12, 4))
1116 Access metadata
1118 >>> print(df.attrs['component'])
1119 'Ex'
1120 >>> print(df.attrs['sample_rate'])
1121 256.0
1122 """
1123 df = pd.DataFrame({"data": self.hdf5_dataset[()]}, index=self.time_index)
1124 df.attrs.update(self.metadata.to_dict(single=True))
1126 return df
1128 def to_numpy(self) -> np.recarray:
1129 """
1130 Convert the dataset to a numpy structured array with time and data columns.
1132 Returns
1133 -------
1134 np.recarray
1135 Record array with 'time' and 'channel_data' fields.
1137 Notes
1138 -----
1139 Data is loaded into memory. The 'data' name is avoided as it's a
1140 builtin to numpy.
1142 Examples
1143 --------
1144 >>> arr = channel.to_numpy()
1145 >>> print(arr.dtype.names)
1146 ('time', 'channel_data')
1147 >>> print(arr['time'][0])
1148 1980-01-01T00:00:00.000000000
1149 >>> print(arr['channel_data'].mean())
1150 0.152
1152 Access fields
1154 >>> times = arr['time']
1155 >>> data = arr['channel_data']
1156 >>> import matplotlib.pyplot as plt
1157 >>> plt.plot(times, data)
1158 """
1159 return np.core.records.fromarrays(
1160 [self.time_index.to_numpy(), self.hdf5_dataset[()]],
1161 names="time,channel_data",
1162 )
1164 def from_channel_ts(
1165 self,
1166 channel_ts_obj: ChannelTS,
1167 how: str = "replace",
1168 fill: str | float | int | None = None,
1169 max_gap_seconds: float | int = 1,
1170 fill_window: int = 10,
1171 ) -> None:
1172 """
1173 Populate the dataset from a ChannelTS object.
1175 Parameters
1176 ----------
1177 channel_ts_obj : ChannelTS
1178 Time series object containing data and metadata.
1179 how : {'replace', 'extend'}, optional
1180 Method for adding data:
1182 - 'replace' : Replace entire dataset (default)
1183 - 'extend' : Append/prepend to existing data with gap handling
1185 fill : str, float, int, or None, optional
1186 Gap filling strategy (only used with how='extend'):
1188 - None : Raise error on gaps (default)
1189 - 'mean' : Fill with mean of both datasets
1190 - 'median' : Fill with median of both datasets
1191 - 'nan' : Fill with NaN
1192 - numeric : Fill with constant value
1194 max_gap_seconds : float or int, optional
1195 Maximum allowed gap in seconds. Default is 1.
1196 fill_window : int, optional
1197 Points to use for estimating fill values. Default is 10.
1199 Raises
1200 ------
1201 TypeError
1202 If channel_ts_obj is not a ChannelTS instance.
1203 MTH5Error
1204 If time alignment or metadata validation fails.
1206 Examples
1207 --------
1208 Replace entire dataset
1210 >>> from mth5.timeseries import ChannelTS
1211 >>> import numpy as np
1212 >>> ts = ChannelTS(
1213 ... channel_type='electric',
1214 ... data=np.random.randn(1000),
1215 ... channel_metadata={'electric': {
1216 ... 'component': 'ex',
1217 ... 'sample_rate': 256.0
1218 ... }}
1219 ... )
1220 >>> channel.from_channel_ts(ts, how='replace')
1221 >>> print(channel.n_samples)
1222 1000
1224 Extend existing dataset
1226 >>> new_ts = ChannelTS(
1227 ... channel_type='electric',
1228 ... data=np.random.randn(500),
1229 ... channel_metadata={'electric': {
1230 ... 'component': 'ex',
1231 ... 'sample_rate': 256.0,
1232 ... 'time_period.start': channel.end.isoformat()
1233 ... }}
1234 ... )
1235 >>> channel.from_channel_ts(new_ts, how='extend', fill='median')
1236 >>> print(channel.n_samples)
1237 1500
1238 """
1239 if not isinstance(channel_ts_obj, ChannelTS):
1240 msg = f"Input must be a ChannelTS object not {type(channel_ts_obj)}"
1241 self.logger.error(msg)
1242 raise TypeError(msg)
1243 if how == "replace":
1244 # Get the metadata dict first to avoid deepcopy issues with HDF5 references
1245 # channel_ts_obj.channel_metadata.mth5_type = (
1246 # channel_ts_obj.channel_metadata._class_name
1247 # )
1248 metadata_dict = channel_ts_obj.channel_metadata.to_dict()
1249 metadata_dict[channel_ts_obj.channel_metadata._class_name][
1250 "mth5_type"
1251 ] = channel_ts_obj.channel_metadata._class_name
1252 # metadata_dict[channel_ts_obj.channel_metadata._class_name][
1253 # "hdf5_reference"
1254 # ] = self.hdf5_dataset.ref
1255 # Update metadata with MTH5-specific attributes
1256 self.metadata.from_dict(metadata_dict)
1257 self.metadata.mth5_type = self._class_name
1258 self.metadata.hdf5_reference = self.hdf5_dataset.ref
1259 self.replace_dataset(channel_ts_obj.ts)
1260 # # apparently need to reset these otherwise they get overwritten with None
1261 # self.metadata.hdf5_reference = self.hdf5_dataset.ref
1262 # self.metadata.mth5_type = self._class_name
1263 self.write_metadata()
1264 elif how == "extend":
1265 self.extend_dataset(
1266 channel_ts_obj.ts,
1267 channel_ts_obj.start,
1268 channel_ts_obj.sample_rate,
1269 fill=fill,
1270 )
1271 #
1272 # TODO need to check on metadata.
1274 def from_xarray(
1275 self,
1276 data_array: xr.DataArray,
1277 how: str = "replace",
1278 fill: str | float | int | None = None,
1279 max_gap_seconds: float | int = 1,
1280 fill_window: int = 10,
1281 ) -> None:
1282 """
1283 Populate the dataset from an xarray DataArray.
1285 Parameters
1286 ----------
1287 data_array : xr.DataArray
1288 DataArray with time coordinate and metadata in attrs.
1289 how : {'replace', 'extend'}, optional
1290 Method for adding data:
1292 - 'replace' : Replace entire dataset (default)
1293 - 'extend' : Append/prepend to existing data with gap handling
1295 fill : str, float, int, or None, optional
1296 Gap filling strategy (only used with how='extend'):
1298 - None : Raise error on gaps (default)
1299 - 'mean' : Fill with mean of both datasets
1300 - 'median' : Fill with median of both datasets
1301 - 'nan' : Fill with NaN
1302 - numeric : Fill with constant value
1304 max_gap_seconds : float or int, optional
1305 Maximum allowed gap in seconds. Default is 1.
1306 fill_window : int, optional
1307 Points to use for estimating fill values. Default is 10.
1309 Raises
1310 ------
1311 TypeError
1312 If data_array is not an xarray.DataArray.
1313 MTH5Error
1314 If time alignment fails.
1316 Examples
1317 --------
1318 Replace from xarray
1320 >>> import xarray as xr
1321 >>> import numpy as np
1322 >>> import pandas as pd
1323 >>> time = pd.date_range('2020-01-01', periods=1000, freq='0.004S')
1324 >>> data = xr.DataArray(
1325 ... np.random.randn(1000),
1326 ... coords=[('time', time)],
1327 ... attrs={'component': 'ex', 'sample_rate': 256.0}
1328 ... )
1329 >>> channel.from_xarray(data, how='replace')
1330 >>> print(channel.n_samples)
1331 1000
1333 Extend from xarray with gap
1335 >>> time2 = pd.date_range('2020-01-01T00:00:05', periods=500, freq='0.004S')
1336 >>> data2 = xr.DataArray(np.random.randn(500), coords=[('time', time2)])
1337 >>> channel.from_xarray(data2, how='extend', fill='mean')
1338 """
1339 if not isinstance(data_array, xr.DataArray):
1340 msg = f"Input must be a xarray.DataArray object not {type(data_array)}"
1341 self.logger.error(msg)
1342 raise TypeError(msg)
1343 if how == "replace":
1344 self.metadata.from_dict({self.metadata._class_name: data_array.attrs})
1345 self.replace_dataset(data_array.values)
1346 self.write_metadata()
1347 elif how == "extend":
1348 self.extend_dataset(
1349 data_array.values,
1350 data_array.coords.indexes["time"][0].isoformat(),
1351 1e9 / data_array.coords.indexes["time"][0].freq.nanos,
1352 fill=fill,
1353 )
1354 # TODO need to check on metadata.
1356 def _get_diff_new_array_start(self, start_time: str | MTime) -> float:
1357 """
1358 Calculate time difference between new array start and existing start.
1360 Parameters
1361 ----------
1362 start_time : str or MTime
1363 Start time of the new array.
1365 Returns
1366 -------
1367 float
1368 Time difference in seconds (new_start - existing_start).
1369 Positive means new starts later, negative means new starts earlier.
1371 Examples
1372 --------
1373 >>> diff = channel._get_diff_new_array_start('1980-01-01T00:05:00')
1374 >>> print(f"New data starts {diff} seconds after existing")
1375 New data starts 300.0 seconds after existing
1376 """
1377 if not isinstance(start_time, MTime):
1378 start_time = MTime(time_stamp=start_time)
1379 t_diff = 0
1380 if start_time != self.start:
1381 t_diff = start_time - self.start
1382 return t_diff
1384 def _get_diff_new_array_end(self, end_time: str | MTime) -> float:
1385 """
1386 Calculate time difference between new array end and existing end.
1388 Parameters
1389 ----------
1390 end_time : str or MTime
1391 End time of the new array.
1393 Returns
1394 -------
1395 float
1396 Time difference in seconds (new_end - existing_end).
1397 Positive means new ends later, negative means new ends earlier.
1399 Examples
1400 --------
1401 >>> diff = channel._get_diff_new_array_end('1980-01-01T00:05:00')
1402 >>> print(f"Difference: {diff} seconds")
1403 Difference: -3300.0 seconds
1404 """
1405 if not isinstance(end_time, MTime):
1406 end_time = MTime(time_stamp=end_time)
1407 t_diff = 0
1408 if end_time != self.end:
1409 t_diff = end_time - self.end
1410 return t_diff
1412 @property
1413 def channel_entry(self) -> np.ndarray:
1414 """
1415 Create a structured array entry for channel summary tables.
1417 Returns
1418 -------
1419 np.ndarray
1420 Structured array with dtype=CHANNEL_DTYPE containing channel metadata
1421 and HDF5 references for survey-wide summaries.
1423 Notes
1424 -----
1425 This entry includes survey ID, station ID, run ID, location, component,
1426 time period, sample rate, and HDF5 references for navigation.
1428 Examples
1429 --------
1430 >>> entry = channel.channel_entry
1431 >>> print(entry['component'][0])
1432 'Ex'
1433 >>> print(entry['sample_rate'][0])
1434 256.0
1435 >>> print(entry['station'][0])
1436 'MT001'
1437 """
1438 return np.array(
1439 [
1440 (
1441 self.survey_id,
1442 self.hdf5_dataset.parent.parent.attrs["id"],
1443 self.hdf5_dataset.parent.attrs["id"],
1444 self.hdf5_dataset.parent.parent.attrs["location.latitude"],
1445 self.hdf5_dataset.parent.parent.attrs["location.longitude"],
1446 self.hdf5_dataset.parent.parent.attrs["location.elevation"],
1447 self.metadata.component,
1448 self.metadata.time_period.start,
1449 self.metadata.time_period.end,
1450 self.hdf5_dataset.size,
1451 self.metadata.sample_rate,
1452 self.metadata.type,
1453 self.metadata.measurement_azimuth,
1454 self.metadata.measurement_tilt,
1455 self.metadata.units,
1456 self.hdf5_dataset.ref,
1457 self.hdf5_dataset.parent.ref,
1458 self.hdf5_dataset.parent.parent.ref,
1459 )
1460 ],
1461 dtype=CHANNEL_DTYPE,
1462 )
1464 def time_slice(
1465 self,
1466 start: str | MTime,
1467 end: str | MTime | None = None,
1468 n_samples: int | None = None,
1469 return_type: str = "channel_ts",
1470 ) -> ChannelTS | xr.DataArray | pd.DataFrame | np.ndarray:
1471 """
1472 Extract a time slice from the channel dataset.
1474 Parameters
1475 ----------
1476 start : str or MTime
1477 Start time of the slice in UTC.
1478 end : str or MTime, optional
1479 End time of the slice. Mutually exclusive with n_samples.
1480 n_samples : int, optional
1481 Number of samples to extract. Mutually exclusive with end.
1482 return_type : {'channel_ts', 'xarray', 'pandas', 'numpy'}, optional
1483 Format for returned data. Default is 'channel_ts'.
1485 Returns
1486 -------
1487 ChannelTS or xr.DataArray or pd.DataFrame or np.ndarray
1488 Time slice in the requested format with appropriate metadata.
1490 Raises
1491 ------
1492 ValueError
1493 If both end and n_samples are provided or neither is provided.
1495 Notes
1496 -----
1497 - If the requested slice extends beyond available data, it will be
1498 automatically truncated with a warning.
1499 - Regional HDF5 references are used when possible for efficiency.
1501 Examples
1502 --------
1503 Extract by number of samples
1505 >>> ex = mth5_obj.get_channel('FL001', 'FL001a', 'Ex')
1506 >>> ex_slice = ex.time_slice(\"2015-01-08T19:49:15\", n_samples=4096)
1507 >>> print(type(ex_slice))
1508 <class 'mth5.timeseries.channel_ts.ChannelTS'>
1509 >>> print(f\"Slice shape: {ex_slice.ts.shape}\")\n Slice shape: (4096,)
1510 >>> ex_slice.plot()
1512 Extract by time range
1514 >>> ex_slice = ex.time_slice(\n ... \"2015-01-08T19:49:15\",
1515 ... end=\"2015-01-08T20:49:15\"\n ... )
1516 >>> print(f\"Duration: {ex_slice.end - ex_slice.start} seconds\")
1517 Duration: 3600.0 seconds
1519 Return as xarray for analysis
1521 >>> xr_slice = ex.time_slice(\n ... \"2015-01-08T19:49:15\",
1522 ... n_samples=1000,
1523 ... return_type='xarray'\n ... )
1524 >>> print(xr_slice.mean().values)
1525 0.152
1526 >>> xr_slice.plot()
1528 Return as pandas for tabular ops
1530 >>> df_slice = ex.time_slice(\n ... \"2015-01-08T19:49:15\",
1531 ... n_samples=500,
1532 ... return_type='pandas'\n ... )
1533 >>> df_slice['data'].describe()
1534 >>> df_slice.resample('10S').mean()
1536 Return as numpy for computation
1538 >>> np_slice = ex.time_slice(\n ... \"2015-01-08T19:49:15\",
1539 ... n_samples=100,
1540 ... return_type='numpy'\n ... )
1541 >>> np.fft.fft(np_slice)
1543 """
1545 start_index, end_index, npts = self._get_slice_index_values(
1546 start, end, n_samples
1547 )
1548 if npts > self.hdf5_dataset.size or end_index > self.hdf5_dataset.size:
1549 # leave as +1 to be inclusive
1550 end_index = self.hdf5_dataset.size
1551 msg = (
1552 "Requested slice is larger than data. "
1553 f"Slice length = {npts}, data length = {self.hdf5_dataset.shape}. "
1554 f"Setting end_index to {end_index}"
1555 )
1557 self.logger.warning(msg)
1558 if start_index < 0:
1559 # leave as +1 to be inclusive
1560 start_index = 0
1562 msg = (
1563 f"Requested start {start} is before data start {self.start}. "
1564 f"Setting start_index to {start_index} and start to {self.start}"
1565 )
1567 self.logger.warning(msg)
1568 start = self.start
1570 # create a regional reference that can be used
1571 try:
1572 regional_ref = self.hdf5_dataset.regionref[start_index:end_index]
1573 except (OSError, RuntimeError):
1574 self.logger.debug(
1575 "file is in read mode cannot set an internal reference, using index values"
1576 )
1577 regional_ref = slice(start_index, end_index)
1578 dt_index = make_dt_coordinates(start, self.sample_rate, npts)
1580 meta_dict = self.metadata.to_dict()[self.metadata._class_name]
1581 meta_dict["time_period.start"] = dt_index[0].isoformat()
1582 meta_dict["time_period.end"] = dt_index[-1].isoformat()
1584 data = None
1585 if return_type == "xarray":
1586 # need the +1 to be inclusive of the last point
1587 data = xr.DataArray(
1588 self.hdf5_dataset[regional_ref], coords=[("time", dt_index)]
1589 )
1590 data.attrs.update(meta_dict)
1591 elif return_type == "pandas":
1592 data = pd.DataFrame(
1593 {"data": self.hdf5_dataset[regional_ref]}, index=dt_index
1594 )
1595 data.attrs.update(meta_dict)
1596 elif return_type == "numpy":
1597 data = self.hdf5_dataset[regional_ref]
1598 elif return_type == "channel_ts":
1599 data = ChannelTS(
1600 self.metadata.type,
1601 data=self.hdf5_dataset[regional_ref],
1602 survey_metadata=self.survey_metadata.copy(),
1603 station_metadata=self.station_metadata.copy(),
1604 run_metadata=self.run_metadata.copy(),
1605 channel_metadata={self.metadata.type: meta_dict},
1606 channel_response=self.channel_response,
1607 )
1608 else:
1609 msg = "return_type not understood, must be [ pandas | numpy | channel_ts ]"
1610 self.logger.error(msg)
1611 raise ValueError(msg)
1612 return data
1614 def get_index_from_time(self, given_time: str | MTime) -> int:
1615 """
1616 Calculate the array index for a given time.
1618 Parameters
1619 ----------
1620 given_time : str or MTime
1621 Time to convert to index.
1623 Returns
1624 -------
1625 int
1626 Array index corresponding to the given time.
1628 Notes
1629 -----
1630 Index is calculated as: (time - start_time) * sample_rate
1631 and rounded to nearest integer.
1633 Examples
1634 --------
1635 >>> idx = channel.get_index_from_time('1980-01-01T00:00:10')
1636 >>> print(f"Index for 10 seconds: {idx}")
1637 Index for 10 seconds: 2560
1638 >>> # With 256 Hz sample rate: 10 * 256 = 2560
1640 >>> start_idx = channel.get_index_from_time(channel.start)
1641 >>> print(start_idx)
1642 0
1643 """
1644 if not isinstance(given_time, MTime):
1645 given_time = MTime(time_stamp=given_time)
1646 index = (
1647 given_time - self.metadata.time_period.start
1648 ) * self.metadata.sample_rate
1650 return int(round(index))
1652 def get_index_from_end_time(self, given_time: str | MTime) -> int:
1653 """
1654 Get the end index value (inclusive) for a given time.
1656 Parameters
1657 ----------
1658 given_time : str or MTime
1659 Time to convert to end index.
1661 Returns
1662 -------
1663 int
1664 Array index + 1 for inclusive slicing.
1666 Notes
1667 -----
1668 Adds 1 to the calculated index to make it suitable for
1669 inclusive end slicing (e.g., array[start:end]).
1671 Examples
1672 --------
1673 >>> end_idx = channel.get_index_from_end_time('1980-01-01T00:00:10')
1674 >>> data_slice = channel.hdf5_dataset[0:end_idx]
1675 >>> # Includes sample at exactly 10 seconds
1676 """
1677 return self.get_index_from_time(given_time) + 1
1679 def _get_slice_index_values(
1680 self,
1681 start: str | MTime,
1682 end: str | MTime | None = None,
1683 n_samples: int | None = None,
1684 ) -> tuple[int, int, int]:
1685 """
1686 Calculate start index, end index, and number of points for a time slice.
1688 Parameters
1689 ----------
1690 start : str or MTime
1691 Start time of the slice.
1692 end : str or MTime, optional
1693 End time of the slice. Mutually exclusive with n_samples.
1694 n_samples : int, optional
1695 Number of samples. Mutually exclusive with end.
1697 Returns
1698 -------
1699 tuple of (int, int, int)
1700 (start_index, end_index, n_points) for array slicing.
1702 Raises
1703 ------
1704 ValueError
1705 If both end and n_samples are provided or neither is provided.
1707 Examples
1708 --------
1709 >>> start_idx, end_idx, npts = channel._get_slice_index_values(
1710 ... '1980-01-01T00:00:05',
1711 ... n_samples=1000
1712 ... )
1713 >>> print(f"Indices: {start_idx} to {end_idx}, {npts} points")
1714 Indices: 1280 to 2280, 1000 points
1716 >>> start_idx, end_idx, npts = channel._get_slice_index_values(
1717 ... '1980-01-01T00:00:00',
1718 ... end='1980-01-01T00:00:10'
1719 ... )
1720 >>> print(f"10 second slice: {npts} points")
1721 10 second slice: 2560 points
1722 """
1723 start = MTime(time_stamp=start)
1724 if end is not None:
1725 end = MTime(time_stamp=end)
1726 if n_samples is not None:
1727 n_samples = int(n_samples)
1729 if n_samples is None and end is None:
1730 msg = "Must input either end_time or n_samples."
1731 self.logger.error(msg)
1732 raise ValueError(msg)
1734 if n_samples is not None and end is not None:
1735 msg = "Must input either end_time or n_samples, not both."
1736 self.logger.error(msg)
1737 raise ValueError(msg)
1739 # if end time is given
1740 if end is not None and n_samples is None:
1741 start_index = self.get_index_from_time(start)
1742 end_index = self.get_index_from_end_time(end)
1743 npts = int(end_index - start_index)
1744 # if n_samples are given
1745 elif end is None and n_samples is not None:
1746 start_index = self.get_index_from_time(start)
1747 # leave as +1 to be inclusive
1748 end_index = start_index + n_samples
1749 npts = n_samples
1751 return start_index, end_index, npts
1754@inherit_doc_string
1755class ElectricDataset(ChannelDataset):
1756 """
1757 Specialized container for electric field channel data.
1759 Inherits all functionality from ChannelDataset with electric field
1760 specific metadata handling.
1762 Parameters
1763 ----------
1764 group : h5py.Dataset
1765 HDF5 dataset containing electric field data.
1766 **kwargs : dict
1767 Additional keyword arguments passed to ChannelDataset.
1769 Examples
1770 --------
1771 >>> ex_dataset = run_group.get_channel('Ex')
1772 >>> print(type(ex_dataset))
1773 <class 'mth5.groups.channel_dataset.ElectricDataset'>
1774 >>> print(ex_dataset.metadata.type)
1775 'electric'
1776 >>> print(ex_dataset.metadata.units)
1777 'mV/km'
1778 """
1780 def __init__(self, group: h5py.Dataset, **kwargs: Any) -> None:
1781 super().__init__(group, **kwargs)
1784@inherit_doc_string
1785class MagneticDataset(ChannelDataset):
1786 """
1787 Specialized container for magnetic field channel data.
1789 Inherits all functionality from ChannelDataset with magnetic field
1790 specific metadata handling.
1792 Parameters
1793 ----------
1794 group : h5py.Dataset
1795 HDF5 dataset containing magnetic field data.
1796 **kwargs : dict
1797 Additional keyword arguments passed to ChannelDataset.
1799 Examples
1800 --------
1801 >>> hx_dataset = run_group.get_channel('Hx')
1802 >>> print(type(hx_dataset))
1803 <class 'mth5.groups.channel_dataset.MagneticDataset'>
1804 >>> print(hx_dataset.metadata.type)
1805 'magnetic'
1806 >>> print(hx_dataset.metadata.units)
1807 'nT'
1808 """
1810 def __init__(self, group: h5py.Dataset, **kwargs: Any) -> None:
1811 super().__init__(group, **kwargs)
1814@inherit_doc_string
1815class AuxiliaryDataset(ChannelDataset):
1816 """
1817 Specialized container for auxiliary channel data.
1819 Inherits all functionality from ChannelDataset with auxiliary channel
1820 specific metadata handling. Used for temperature, battery voltage, etc.
1822 Parameters
1823 ----------
1824 group : h5py.Dataset
1825 HDF5 dataset containing auxiliary data.
1826 **kwargs : dict
1827 Additional keyword arguments passed to ChannelDataset.
1829 Examples
1830 --------
1831 >>> temp_dataset = run_group.get_channel('Temperature')
1832 >>> print(type(temp_dataset))
1833 <class 'mth5.groups.channel_dataset.AuxiliaryDataset'>
1834 >>> print(temp_dataset.metadata.type)
1835 'auxiliary'
1836 >>> print(temp_dataset.metadata.units)
1837 'celsius'
1838 """
1840 def __init__(self, group: h5py.Dataset, **kwargs: Any) -> None:
1841 super().__init__(group, **kwargs)