Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mt_metadata \ mt_metadata \ timeseries \ channel.py: 67%
286 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:11 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:11 -0800
1# =====================================================
2# Imports
3# =====================================================
5from collections import OrderedDict
6from typing import Annotated
8import numpy as np
9from loguru import logger
10from pydantic import (
11 AliasChoices,
12 computed_field,
13 Field,
14 field_validator,
15 PrivateAttr,
16 ValidationInfo,
17)
19from mt_metadata import NULL_VALUES
20from mt_metadata.base import helpers, MetadataBase
21from mt_metadata.common import (
22 BasicLocation,
23 Comment,
24 DataQuality,
25 Fdsn,
26 Instrument,
27 TimePeriod,
28)
29from mt_metadata.common.units import get_unit_object, Unit
30from mt_metadata.timeseries import AppliedFilter
31from mt_metadata.timeseries.filters import ChannelResponse
32from mt_metadata.utils.exceptions import MTSchemaError
33from mt_metadata.utils.validators import validate_name
36# =====================================================
39# this is a channel base for channels that have multiple sensors and locations like an
40# electric dipole.
class ChannelBase(MetadataBase):
    # Base metadata model shared by all channel types. It carries orientation,
    # sampling, units, data quality, time period, FDSN information and the
    # ordered list of applied filters. Sensor and location are added by
    # subclasses.
    #
    # NOTE: a class docstring is intentionally not used here because pydantic
    # would publish it as the model description in the generated JSON schema.

    # Expected value of ``type`` for this class; subclasses override it.
    _channel_type: str = PrivateAttr("base")

    channel_number: Annotated[
        int,
        Field(
            default=0,
            description="Channel number on the data logger.",
            alias=None,
            json_schema_extra={
                "units": None,
                "required": True,
                "examples": ["1"],
            },
        ),
    ]

    channel_id: Annotated[
        str | None,
        Field(
            default=None,
            description="channel id given by the user or data logger",
            alias=None,
            json_schema_extra={
                "units": None,
                "required": False,
                "examples": ["1001.11"],
            },
        ),
    ]

    comments: Annotated[
        Comment,
        Field(
            default_factory=Comment,
            description="Any comments about the channel.",
            alias=None,
            json_schema_extra={
                "units": None,
                "required": False,
                "examples": ["ambient air temperature was chilly, ice on cables"],
            },
        ),
    ]

    component: Annotated[
        str,
        Field(
            default="auxiliary_default",
            description="Name of the component measured, can be uppercase and/or lowercase. For now electric channels should start with an 'e' and magnetic channels start with an 'h', followed by the component. If there are multiples of the same channel the name could include an integer. {type}{component}{number} --> Ex01.",
            alias=None,
            pattern=r"\w+",  # At least one word character
            json_schema_extra={
                "units": None,
                "required": True,
                "examples": ["ex"],
            },
        ),
    ]

    measurement_azimuth: Annotated[
        float,
        Field(
            default=0.0,
            description="Horizontal azimuth of the channel in measurement coordinate system spcified in station.orientation.reference_frame. Default reference frame is a geographic right-handed coordinate system with north=0, east=90, vertical=+ downward.",
            validation_alias=AliasChoices("measurement_azimuth", "azimuth"),
            json_schema_extra={
                "units": "degrees",
                "required": True,
                "examples": [0.0],
            },
        ),
    ]

    measurement_tilt: Annotated[
        float,
        Field(
            default=0.0,
            description="Vertical tilt of the channel in measurement coordinate system specified in station.orientation.reference_frame. Default reference frame is a geographic right-handed coordinate system with north=0, east=90, vertical=+ downward.",
            validation_alias=AliasChoices("measurement_tilt", "dip"),
            json_schema_extra={
                "units": "degrees",
                "required": True,
                "examples": [0],
            },
        ),
    ]

    sample_rate: Annotated[
        float,
        Field(
            default=0.0,
            description="Digital sample rate",
            validation_alias=AliasChoices("sample_rate", "sampling_rate"),
            json_schema_extra={
                "units": "samples per second",
                "required": True,
                "examples": [8.0],
            },
        ),
    ]

    translated_azimuth: Annotated[
        float | None,
        Field(
            default=None,
            description="Horizontal azimuth of the channel in translated coordinate system, this should only be used for derived product. For instance if you collected your data in geomagnetic coordinates and then translated them to geographic coordinates you would set measurement_azimuth=0, translated_azimuth=-12.5 for a declination angle of N12.5E.",
            alias=None,
            json_schema_extra={
                "units": "degrees",
                "required": False,
                "examples": [0.0],
            },
        ),
    ]

    translated_tilt: Annotated[
        float | None,
        Field(
            default=None,
            description="Tilt of channel in translated coordinate system, this should only be used for derived product. For instance if you collected your data using a tripod you would set measurement_tilt=45, translated_tilt=0 for a vertical component.",
            alias=None,
            json_schema_extra={
                "units": "degrees",
                "required": False,
                "examples": [0.0],
            },
        ),
    ]

    type: Annotated[
        str,
        Field(
            default="base",
            description="Data type for the channel, should be a descriptive word that a user can understand.",
            alias=None,
            json_schema_extra={
                "units": None,
                "required": True,
                "examples": ["temperature"],
            },
        ),
    ]

    units: Annotated[
        str,
        Field(
            default="",
            description="Units of the data, should be in SI units and represented as the full name of the unit all lowercase. If a complex unit use 'per' and '-'.",
            alias=None,
            json_schema_extra={
                "units": None,
                "required": True,
                "examples": ["celsius"],
            },
        ),
    ]

    data_quality: Annotated[
        DataQuality,
        Field(
            default_factory=DataQuality,
            description="Data quality for the channel.",
            alias=None,
            json_schema_extra={
                "units": None,
                "required": False,
                "examples": ["DataQuality()"],
            },
        ),
    ]

    filters: Annotated[
        list[AppliedFilter],
        Field(
            default_factory=list,
            description="Filter data for the channel.",
            alias=None,
            json_schema_extra={
                "units": None,
                "required": True,
                "examples": [
                    "AppliedFilter(name='filter_name', applied=True, stage=1)"
                ],
            },
        ),
    ]

    time_period: Annotated[
        TimePeriod,
        Field(
            default_factory=TimePeriod,
            description="Time period for the channel.",
            alias=None,
            json_schema_extra={
                "units": None,
                "required": False,
                "examples": ["TimePeriod(start='2020-01-01', end='2020-12-31')"],
            },
        ),
    ]

    fdsn: Annotated[
        Fdsn,
        Field(
            default_factory=Fdsn,
            description="FDSN information for the channel.",
            alias=None,
            json_schema_extra={
                "units": None,
                "required": False,
                "examples": ["Fdsn()"],
            },
        ),
    ]

    @field_validator("component", mode="before")
    @classmethod
    def validate_component(cls, value: str) -> str:
        """
        Lower-case the component name before field validation runs.

        Raises
        ------
        TypeError
            If ``value`` is not a string.
        """
        if not isinstance(value, str):
            raise TypeError(f"Component must be a string not {type(value)}")

        return value.lower()

    @field_validator("comments", mode="before")
    @classmethod
    def validate_comments(cls, value, info: ValidationInfo) -> Comment:
        """
        Wrap a plain string or list in a ``Comment``; pass anything else
        through for normal field validation.
        """
        if isinstance(value, (str, list)):
            return Comment(value=value)
        return value

    @field_validator("units", mode="before")
    @classmethod
    def validate_units(cls, value: str, info: ValidationInfo) -> str:
        """
        Normalize a unit string to the name reported by ``get_unit_object``.

        Parameters
        ----------
        value : units string
            unit string separated by either '/' for division or ' ' for
            multiplication. Or 'per' and ' ', respectively
        info : ValidationInfo
            Validation context supplied by pydantic (unused).

        Returns
        -------
        str
            The ``name`` of the resolved unit object, or an empty string when
            ``value`` is ``None`` or empty.

        Raises
        ------
        KeyError
            If the unit lookup raises ``ValueError`` or ``KeyError``. The
            original exception is attached as the cause.
        """
        if value in [None, ""]:
            return ""
        try:
            return get_unit_object(value).name
        except (ValueError, KeyError) as error:
            # KeyError is kept as the outward exception type for existing
            # callers; chaining preserves the original traceback.
            raise KeyError(error) from error

    @field_validator("type", mode="before")
    @classmethod
    def validate_type(cls, value, info: ValidationInfo) -> str:
        """
        Force ``type`` to the channel type declared by the class.

        A warning is logged when the incoming value differs from the expected
        one; the expected value is always returned.
        """
        # Derived classes declare their own ``_channel_type`` private
        # attribute. On the class it is exposed as a private-attribute
        # descriptor whose ``default`` holds the declared value; a plain
        # string or a missing attribute is tolerated as well.
        declared = getattr(cls, "_channel_type", None)
        if isinstance(declared, str):
            expected_type = declared
        else:
            expected_type = getattr(declared, "default", "base")

        if value != expected_type:
            logger.warning(
                f"Channel type is set to {value}, but should be "
                f"{expected_type} for {cls.__name__}."
            )
        return expected_type

    @field_validator("filters", mode="before")
    @classmethod
    def parse_filters_string(cls, value):
        """
        Parse string or list input into a list of AppliedFilter objects.

        Strings are expected to be a bracketed list of dictionaries. JSON is
        tried first, then Python literal syntax, then a restricted ``eval``
        for ``OrderedDict(...)`` style text. Empty or unparseable strings give
        an empty list. Lists may contain ``AppliedFilter`` objects or
        dictionaries. Any other input is returned unchanged for pydantic to
        validate.
        """
        if isinstance(value, str):
            import ast
            import json
            import re

            text = value.strip()
            if not text:
                # An empty string means "no filters"; returning the string
                # itself would fail list validation.
                return []

            if not (text.startswith("[") and text.endswith("]")):
                logger.warning(f"Unknown filter string format: {value}")
                return []

            try:
                # JSON-style strings (handles true/false/null).
                parsed_data = json.loads(text)
            except ValueError:  # json.JSONDecodeError subclasses ValueError
                try:
                    # Python-style literals.
                    parsed_data = ast.literal_eval(text)
                except (ValueError, SyntaxError):
                    try:
                        # Dictionaries separated only by whitespace/newlines
                        # get a comma inserted between them.
                        cleaned = re.sub(r"}\s+\{", "}, {", text)

                        # SECURITY NOTE: emptying ``__builtins__`` limits the
                        # available names but is not a sandbox. Do not feed
                        # untrusted text through this path.
                        safe_namespace = {
                            "__builtins__": {},
                            "OrderedDict": OrderedDict,
                            "True": True,
                            "False": False,
                            "None": None,
                            "true": True,  # JSON-style booleans
                            "false": False,
                            "null": None,
                        }

                        parsed_data = eval(cleaned, safe_namespace)
                    except Exception as e:
                        logger.warning(f"Failed to parse filters string: {e}")
                        return []

            # Convert to AppliedFilter objects
            filters = []
            for item in parsed_data:
                if not isinstance(item, dict):
                    logger.warning(f"Unknown filter list item type: {type(item)}")
                    continue
                if "applied_filter" in item:
                    filter_data = item["applied_filter"]
                    # Convert OrderedDict (or any mapping) to a regular dict
                    if hasattr(filter_data, "items"):
                        filter_data = dict(filter_data)
                    filters.append(AppliedFilter(**filter_data))
                else:
                    # Direct dict representation
                    filters.append(AppliedFilter(**item))

            return filters

        if isinstance(value, list):
            # List of AppliedFilter objects or dicts
            filters = []
            for item in value:
                if isinstance(item, AppliedFilter):
                    filters.append(item)
                elif isinstance(item, dict):
                    filters.append(AppliedFilter(**item))
                else:
                    logger.warning(f"Unknown filter list item type: {type(item)}")
            return filters

        return value

    @field_validator("filters", mode="after")
    @classmethod
    def validate_filters(cls, value, info):
        """Sort the filters in place by stage number (``None`` sorts as 0)."""
        value.sort(key=lambda f: f.stage if f.stage is not None else 0)

        # TEMPORARILY DISABLED: Check for duplicates
        # There's a known issue in MTH5 serialization that causes filter duplication
        # This will be re-enabled once the MTH5 duplication bug is fixed
        # TODO: Re-enable duplicate validation after fixing MTH5 filter duplication
        # seen = set()
        # for f in value:
        #     if f.name in seen:
        #         raise ValueError(f"Duplicate filter found: {f.name}")
        #     seen.add(f.name)

        return value

    def add_filter(
        self,
        applied_filter: AppliedFilter | None = None,
        name: str | None = None,
        applied: bool = True,
        stage: int | None = None,
        comments: Comment | str | None = None,
    ) -> None:
        """
        Add a filter to the filter list.

        Either pass a ready-made ``applied_filter`` or describe one with
        ``name``/``applied``/``stage``/``comments``. A filter whose name is
        already present is skipped. The list is re-sorted by stage afterwards.

        Parameters
        ----------
        applied_filter : AppliedFilter | None, optional
            Filter object to add. When given, the remaining arguments are
            ignored. Its ``stage`` is set to the next position if it is None.
        name : str
            Name of the filter. Required when ``applied_filter`` is None.
        applied : bool, optional
            Whether the filter has been applied, by default True.
        stage : int | None, optional
            Stage of the filter in the processing chain, by default None,
            which assigns the next position in the list.
        comments : Comment | str | None, optional
            Comments for the filter. Omitted from the constructor call when
            None so the AppliedFilter default is used.

        Raises
        ------
        TypeError
            If ``applied_filter`` is not an AppliedFilter or ``name`` is not
            a string.
        ValueError
            If neither ``applied_filter`` nor ``name`` is provided.
        """
        if applied_filter is not None:
            if not isinstance(applied_filter, AppliedFilter):
                raise TypeError("applied_filter must be an instance of AppliedFilter")

            # Check if filter with this name already exists
            if any(f.name == applied_filter.name for f in self.filters):
                logger.debug(
                    f"Filter '{applied_filter.name}' already exists, skipping duplicate"
                )
                return

            if applied_filter.stage is None:
                applied_filter.stage = len(self.filters) + 1
            self.filters.append(applied_filter)
        else:
            if name is None:
                raise ValueError("name must be provided if applied_filter is None")
            if not isinstance(name, str):
                raise TypeError("name must be a string")

            # Check if filter with this name already exists
            if any(f.name == name for f in self.filters):
                logger.debug(f"Filter '{name}' already exists, skipping duplicate")
                return

            if stage is None:
                stage = len(self.filters) + 1

            # Build kwargs for AppliedFilter, excluding None comments to use default
            filter_kwargs = {"name": name, "applied": applied, "stage": stage}
            if comments is not None:
                filter_kwargs["comments"] = comments

            self.filters.append(AppliedFilter(**filter_kwargs))

        # Duplicates were already rejected above, so only sorting is needed.
        self._sort_filters()

    @computed_field
    @property
    def filter_names(self) -> list[str]:
        """
        List of filter names applied to the channel.

        Returns
        -------
        list[str]
            List of filter names.
        """
        return [f.name for f in self.filters]

    def remove_filter(self, name: str, reset_stages: bool = True) -> None:
        """
        Remove a filter from the filter list.

        Parameters
        ----------
        name : str
            Name of the filter to remove.
        reset_stages : bool, optional
            Whether to renumber the stages of the remaining filters
            consecutively from 1, by default True.
        """
        new_list = []
        for f in self.filters:
            if f.name == name:
                continue
            if reset_stages:
                f.stage = len(new_list) + 1
            new_list.append(f)
        self.filters = new_list

    def get_filter(self, name: str) -> AppliedFilter | None:
        """
        Get a filter from the filter list by name.

        Parameters
        ----------
        name : str
            Name of the filter to get.

        Returns
        -------
        AppliedFilter | None
            The first filter with the given name, or None (after logging a
            warning) if not found.
        """
        for f in self.filters:
            if f.name == name:
                return f
        logger.warning(f"Could not find filter {name} in channel filters")
        return None

    def _sort_filters(self) -> None:
        """
        Sort the list of filters in place by stage number.

        Filters whose stage is None sort as stage 0.
        """
        self.filters.sort(key=lambda f: f.stage if f.stage is not None else 0)

    def _validate_no_duplicates(self) -> None:
        """
        Check for duplicate filter names and raise an error if found.

        Raises
        ------
        ValueError
            If duplicate filter names are found.
        """
        seen = set()
        for f in self.filters:
            if f.name in seen:
                raise ValueError(f"Duplicate filter found: {f.name}")
            seen.add(f.name)

    def channel_response(self, filters_dict):
        """
        Build the full channel response from a dictionary of filter objects.

        Parameters
        ----------
        filters_dict : dict
            Filter objects keyed by filter name.

        Returns
        -------
        ChannelResponse
            Response built from the filters found in ``filters_dict``, in the
            order of ``self.filters``. Missing names are logged and skipped.
        """
        mt_filter_list = []
        for applied_filter in self.filters:
            try:
                mt_filter_list.append(filters_dict[applied_filter.name])
            except KeyError:
                msg = f"Could not find {applied_filter.name} in filters dictionary, skipping"
                logger.error(msg)
        return ChannelResponse(filters_list=mt_filter_list)

    @property
    def unit_object(self) -> Unit:
        """
        Unit object built from the ``units`` attribute of the channel.

        Returns
        -------
        Unit
            BaseModel object with unit attributes
        """
        return get_unit_object(self.units)

    def _validate_filtered_applied(
        self, applied: list | np.typing.NDArray | str | None
    ) -> list:
        """
        Normalize a legacy ``filtered.applied`` value to a list.

        Parameters
        ----------
        applied : list | numpy.ndarray | str | tuple
            Arrays, lists and tuples are returned as lists without converting
            their elements. A single recognised token (e.g. "true", "0",
            "none") becomes a one-element list of bool. Any other string is
            split on commas (or whitespace) into lower-cased tokens, after
            dropping square brackets.

        Returns
        -------
        list

        Raises
        ------
        MTSchemaError
            For any other input type.
        """
        # the returned type from a hdf5 dataset is a numpy array.
        if isinstance(applied, np.ndarray):
            return applied.tolist()

        if isinstance(applied, list):
            return applied

        if isinstance(applied, tuple):
            return list(applied)

        if isinstance(applied, str):
            applied_values = _applied_values_map(treat_null_values_as=False)
            # Handle simple strings
            if applied in applied_values:
                return [applied_values[applied]]

            # Handle string-lists (e.g. from json)
            if applied.find("[") >= 0:
                applied = applied.replace("[", "").replace("]", "")
            if applied.count(",") > 0:
                return [ss.strip().lower() for ss in applied.split(",")]
            return [ss.lower() for ss in applied.split()]

        # NOTE(review): None and bare scalars (bool/int) end up here and raise,
        # although the annotation lists None and the lookup map has entries
        # for them. Confirm whether they should be mapped instead.
        msg = f"Input applied cannot be of type {type(applied)}"
        logger.error(msg)
        raise MTSchemaError(msg)

    def _validate_filtered_name(
        self, names: list | np.typing.NDArray | str | None
    ) -> list:
        """
        Normalize a legacy ``filtered.name`` value to a list of names.

        Parameters
        ----------
        names : list | tuple | numpy.ndarray | str | None
            A string is split on commas; arrays are cast to str first. Every
            name is stripped and lower-cased. None gives an empty list.

        Returns
        -------
        list

        Raises
        ------
        MTSchemaError
            For any other input type.
        """
        if names is None:
            return []

        if isinstance(names, str):
            return [ss.strip().lower() for ss in names.split(",")]
        # Tuples are accepted so this agrees with _validate_filtered_applied.
        if isinstance(names, (list, tuple)):
            return [ss.strip().lower() for ss in names]
        if isinstance(names, np.ndarray):
            names = names.astype(np.str_)
            return [ss.strip().lower() for ss in names]

        msg = "names must be a string or list of strings not {0}, type {1}"
        logger.error(msg.format(names, type(names)))
        raise MTSchemaError(msg.format(names, type(names)))

    def _find_filter_keys(self, meta_dict: dict) -> str | None:
        """
        Search for filter-related keys in the meta_dict.

        Parameters
        ----------
        meta_dict : dict
            Dictionary to search for filter keys.

        Returns
        -------
        str | None
            'filter' or 'filtered', whichever is the base (text before the
            first '.') of the first matching key in iteration order, or None
            if no key has either base.
        """
        for key in meta_dict:
            # Split by '.' and check the base key
            base_key = key.split(".")[0]

            # 'filter' is the legacy format, 'filtered' the old format
            if base_key in ("filter", "filtered"):
                return base_key

        return None

    def from_dict(self, meta_dict: dict, skip_none: bool = False) -> None:
        """
        Fill attributes from a dictionary.

        Backwards compatible with the deprecated ``filtered.applied`` and
        ``filtered.name`` lists, which are converted into filters.

        Parameters
        ----------
        meta_dict : dict
            dictionary of attributes to set. May be wrapped in a single
            top-level key holding the attribute dictionary.
        skip_none : bool, optional
            If True, skip attributes whose value is in NULL_VALUES, by
            default False.

        Raises
        -------
        MTSchemaError
            If the input is not a dictionary, or if the deprecated
            ``filtered.applied`` and ``filtered.name`` differ in length.

        """
        if not isinstance(meta_dict, dict):
            msg = f"Input must be a dictionary not {type(meta_dict)}"
            logger.error(msg)
            raise MTSchemaError(msg)

        keys = list(meta_dict.keys())
        if len(keys) == 1:
            if isinstance(meta_dict[keys[0]], dict):
                # Single wrapper key: unwrap it, noting a name mismatch.
                class_name = keys[0]
                if class_name.lower() != validate_name(self.__class__.__name__):
                    msg = (
                        "name of input dictionary is not the same as class type "
                        f"input = {class_name}, class type = {self.__class__.__name__}"
                    )
                    logger.debug(msg)
                meta_dict = helpers.flatten_dict(meta_dict[class_name])
            else:
                meta_dict = helpers.flatten_dict(meta_dict)

        else:
            logger.debug(
                f"Assuming input dictionary is of type {self.__class__.__name__}",
            )
            meta_dict = helpers.flatten_dict(meta_dict)

        # Use helper method to detect filter format
        filter_format = self._find_filter_keys(meta_dict)

        # Handle different filter formats based on detection
        if filter_format == "filtered":
            # Both old-format keys are removed even if only one is usable.
            old_format_applied = meta_dict.pop(f"{filter_format}.applied", None)
            old_format_names = meta_dict.pop(f"{filter_format}.name", None)

            if old_format_applied is not None and old_format_names is not None:
                filter_applied = self._validate_filtered_applied(old_format_applied)
                filter_name = self._validate_filtered_name(old_format_names)
                # Nothing is added unless both lists are non-empty.
                if filter_applied and filter_name:
                    logger.warning(
                        f"{filter_format}.applied and {filter_format}.name are deprecated, use filters as a list of AppliedFilter objects instead"
                    )
                    if len(filter_applied) != len(filter_name):
                        msg = (
                            f"{filter_format}.applied and {filter_format}.name must be the same length, "
                            f"got {len(filter_applied)} and {len(filter_name)}"
                        )
                        logger.error(msg)
                        raise MTSchemaError(msg)
                    for name, applied in zip(filter_name, filter_applied):
                        self.add_filter(name=name, applied=applied)

        elif filter_format == "filter":
            # Handle legacy single 'filter' attribute - just remove and warn
            # NOTE(review): only the exact key 'filter' is removed; dotted
            # keys such as 'filter.name' stay in meta_dict and are passed to
            # update_attribute below. Confirm that is intended.
            legacy_filter = meta_dict.pop(filter_format, None)
            if legacy_filter is not None:
                logger.warning(
                    f"The '{filter_format}' attribute is deprecated and will be ignored. Use 'filters' as a list of AppliedFilter objects instead."
                )

        # Handle new format filters separately to combine with old format
        new_format_filters = meta_dict.pop("filters", None)

        for name, value in meta_dict.items():
            if skip_none and value in NULL_VALUES:
                continue
            self.update_attribute(name, value)

        # Process new format filters after other attributes, adding to existing filters
        if new_format_filters is not None:
            if isinstance(new_format_filters, str):
                # presumably assignment validation routes this through
                # parse_filters_string; verify against MetadataBase config
                self.filters = new_format_filters
            else:
                for filter_dict in new_format_filters:
                    if isinstance(filter_dict, dict):
                        # Create AppliedFilter from dict using from_dict method to handle nested attributes
                        applied_filter = AppliedFilter()
                        applied_filter.from_dict(filter_dict)
                        self.add_filter(applied_filter=applied_filter)
                    elif isinstance(filter_dict, AppliedFilter):
                        self.add_filter(applied_filter=filter_dict)
                    elif isinstance(filter_dict, str):
                        logger.warning(
                            f"String filter format not supported in add_filters: {filter_dict}"
                        )
                    else:
                        logger.warning(f"Unknown filter format: {type(filter_dict)}")
786# this would be a normal channel that has a single sensor and location.
class Channel(ChannelBase):
    # Channel with a single sensor and a single location, added on top of the
    # fields inherited from ChannelBase.
    #
    # NOTE: a class docstring is intentionally not used here because pydantic
    # would publish it as the model description in the generated JSON schema.

    sensor: Annotated[
        Instrument,
        Field(
            default_factory=Instrument,
            description="Sensor for the channel.",
            alias=None,
            json_schema_extra={
                "units": None,
                "required": False,
                # A list, like the examples of every other field in this file.
                "examples": ["Instrument()"],
            },
        ),
    ]

    location: Annotated[
        BasicLocation,
        Field(
            default_factory=BasicLocation,
            description="Location information for the channel.",
            alias=None,
            json_schema_extra={
                "units": None,
                "required": False,
                "examples": [
                    "BasicLocation(latitude=0.0, longitude=0.0, elevation=0.0)"
                ],
            },
        ),
    ]
def _applied_values_map(treat_null_values_as: bool = True) -> dict:
    """
    Build the lookup of tolerated single values for an ``applied`` flag.

    Each accepted spelling of null, true and false is mapped to a bool so
    callers can resolve a scalar input with one dictionary lookup.

    Parameters
    ----------
    treat_null_values_as : bool, optional
        Boolean assigned to the null spellings (None, "none", "None", "NONE",
        "null"), by default True.

    Returns
    -------
    dict
        Mapping of all tolerated single-values to booleans. ``True``/``1``
        and ``False``/``0`` hash alike, so each pair shares one key.
    """
    # Insertion order is nulls, then truthy spellings, then falsy spellings.
    values_map = dict.fromkeys(
        (None, "none", "None", "NONE", "null"), treat_null_values_as
    )
    values_map.update(dict.fromkeys((True, 1, "1", "True", "true"), True))
    values_map.update(dict.fromkeys((False, 0, "0", "False", "false"), False))
    return values_map