Coverage for C:\Users\peaco\OneDrive\Documents\GitHub\mt_metadata\mt_metadata\timeseries\station.py: 85%
193 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:11 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:11 -0800
1# =====================================================
2# Imports
3# =====================================================
4from collections import OrderedDict
5from typing import Annotated
7import numpy as np
8from loguru import logger
9from pydantic import Field, field_validator, model_validator, ValidationInfo
10from typing_extensions import Self
12from mt_metadata import NULL_VALUES
13from mt_metadata.base import MetadataBase
14from mt_metadata.common import (
15 AuthorPerson,
16 ChannelLayoutEnum,
17 Comment,
18 DataTypeEnum,
19 Fdsn,
20 Orientation,
21 Provenance,
22 StationLocation,
23 TimePeriod,
24)
25from mt_metadata.common.list_dict import ListDict
26from mt_metadata.timeseries import Run
29# =====================================================
# Station metadata model built on MetadataBase.
#
# Every attribute below is declared as ``Annotated[<type>, Field(...)]``.  Each
# Field carries a default (or default factory), a human readable description
# and a ``json_schema_extra`` dict with the keys "units", "required" and
# "examples".
class Station(MetadataBase):
    # Channel layout of the station; defaults to ChannelLayoutEnum.X.
    channel_layout: Annotated[
        ChannelLayoutEnum,
        Field(
            default=ChannelLayoutEnum.X,
            description="How the station channels were laid out.",
            alias=None,
            json_schema_extra={
                "units": None,
                "required": False,
                "examples": ["X"],
            },
        ),
    ]

    # Summary list of channel component names; starts out as an empty list.
    channels_recorded: Annotated[
        list[str],
        Field(
            default_factory=list,
            description="List of components recorded by the station. Should be a summary of all channels recorded. Dropped channels will be recorded in Run metadata.",
            alias=None,
            json_schema_extra={
                "units": None,
                "required": True,
                "examples": ['"[ Ex, Ey, Hx, Hy, Hz, T]"'],
            },
        ),
    ]

    # Free-form comments; a new Comment object is created by default.
    comments: Annotated[
        Comment,
        Field(
            default_factory=Comment,  # type: ignore
            description="Any comments on the station.",
            alias=None,
            json_schema_extra={
                "units": None,
                "required": False,
                "examples": ["cows chewed cables"],
            },
        ),
    ]

    # Kind of data recorded; the default is given as the string "BBMT".
    data_type: Annotated[
        DataTypeEnum,
        Field(
            default="BBMT",
            description="Type of data recorded. If multiple types input as a comma separated list.",
            alias=None,
            json_schema_extra={
                "units": None,
                "required": True,
                "examples": ["BBMT"],
            },
        ),
    ]

    # FDSN information; a new Fdsn object is created by default.
    fdsn: Annotated[
        Fdsn,
        Field(
            default_factory=Fdsn,
            description="FDSN information for the station.",
            alias=None,
            json_schema_extra={
                "units": None,
                "required": False,
                "examples": ["Fdsn()"],
            },
        ),
    ]

    # Nearest geographic place name; empty string by default.
    geographic_name: Annotated[
        str,
        Field(
            default="",
            description="Closest geographic name to the station, usually a city, but could be another common geographic location.",
            alias=None,
            json_schema_extra={
                "units": None,
                "required": True,
                "examples": ["Whitehorse, YK"],
            },
        ),
    ]

    # Station identifier; the pattern only admits letters, digits and
    # underscores (the empty string also matches).
    id: Annotated[
        str,
        Field(
            default="",
            description="Station ID name. This should be an alpha numeric name that is typically 5-6 characters long. Commonly the project name in 2 or 3 letters and the station number.",
            alias=None,
            pattern="^[a-zA-Z0-9_]*$",
            json_schema_extra={
                "units": None,
                "required": True,
                "examples": ["MT001"],
            },
        ),
    ]

    # Summary list of run ids; starts out as an empty list.
    run_list: Annotated[
        list[str],
        Field(
            default_factory=list,
            description="List of runs recorded by the station. Should be a summary of all runs recorded.",
            alias=None,
            json_schema_extra={
                "units": None,
                "required": True,
                "examples": ["[ mt001a, mt001b, mt001c ]"],
            },
        ),
    ]

    # Station location; a new StationLocation object is created by default.
    location: Annotated[
        StationLocation,
        Field(
            default_factory=StationLocation,  # type: ignore
            description="Location of the station.",
            alias=None,
            json_schema_extra={
                "units": None,
                "required": False,
                "examples": ["StationLocation(latitude=60.0, longitude=-135.0)"],
            },
        ),
    ]

    # Station orientation; a new Orientation object is created by default.
    orientation: Annotated[
        Orientation,
        Field(
            default_factory=Orientation,  # type: ignore
            description="Orientation of the station.",
            alias=None,
            json_schema_extra={
                "units": None,
                "required": False,
                "examples": ["Orientation(north=0, east=0, vertical=1)"],
            },
        ),
    ]

    # Who acquired the data; a new AuthorPerson object is created by default.
    acquired_by: Annotated[
        AuthorPerson,
        Field(
            default_factory=AuthorPerson,  # type: ignore
            description="Group or person who acquired the data.",
            alias=None,
            json_schema_extra={
                "units": None,
                "required": False,
                "examples": ["Person()"],
            },
        ),
    ]

    # Data provenance; a new Provenance object is created by default.
    provenance: Annotated[
        Provenance,
        Field(
            default_factory=Provenance,  # type: ignore
            description="Provenance of the data.",
            alias=None,
            json_schema_extra={
                "units": None,
                "required": False,
                "examples": ["Provenance()"],
            },
        ),
    ]

    # Start/end of the recorded data; a new TimePeriod object is created by
    # default.
    time_period: Annotated[
        TimePeriod,
        Field(
            default_factory=TimePeriod,  # type: ignore
            description="Time period of the data.",
            alias=None,
            json_schema_extra={
                "units": None,
                "required": False,
                "examples": ["TimePeriod(start='2020-01-01', end='2020-12-31')"],
            },
        ),
    ]

    # Run objects held by the station.  Several container types are accepted
    # by the annotation; the default is an empty ListDict.
    runs: Annotated[
        ListDict | list | dict | OrderedDict | tuple,
        Field(
            default_factory=ListDict,
            description="List of runs recorded by the station.",
            alias=None,
            json_schema_extra={
                "units": None,
                "required": False,
                "examples": ["[Run(id='mt001a'), Run(id='mt001b'), Run(id='mt001c')]"],
            },
        ),
    ]
@field_validator("comments", mode="before")
@classmethod
def validate_comments(cls, value, info: ValidationInfo) -> Comment:
    """
    Wrap a plain string in a :class:`Comment` before field validation.

    :param value: raw input supplied for ``comments``
    :param info: pydantic validation context (not used here)
    :return: ``Comment(value=value)`` when ``value`` is a ``str``, otherwise
        ``value`` unchanged
    """
    return Comment(value=value) if isinstance(value, str) else value
@field_validator("channels_recorded", "run_list", mode="before")
@classmethod
def validate_list_of_strings(cls, value, info: ValidationInfo) -> list[str]:
    """
    Normalize the input for ``channels_recorded`` / ``run_list`` into a list
    of strings before field validation.

    Accepted inputs:

    * ``numpy.ndarray`` -- converted with ``astype(str).tolist()``
    * any value found in ``NULL_VALUES`` -- becomes an empty list
    * ``list`` / ``tuple`` -- every element is passed through ``str``
    * ``str`` -- split on commas, each piece stripped of whitespace

    :param value: raw input supplied for the field
    :param info: pydantic validation context, used to name the field in the
        error message
    :return: list of strings
    :rtype: list[str]
    :raises TypeError: if ``value`` is none of the accepted types
    """
    # Arrays must be handled before the NULL_VALUES membership test: the
    # ``in`` operator compares with ``==``, which is element-wise for arrays
    # and raises on multi-element arrays when coerced to a bool.
    if isinstance(value, np.ndarray):
        return value.astype(str).tolist()

    # Both fields are declared ``list[str]``, so a null input maps to an
    # empty list rather than ``None`` (which would fail list validation).
    if value in NULL_VALUES:
        return []

    if isinstance(value, (list, tuple)):
        return [str(v) for v in value]

    if isinstance(value, str):
        return [v.strip() for v in value.split(",")]

    # Report the field actually being validated; this validator is shared by
    # ``channels_recorded`` and ``run_list``.
    raise TypeError(
        f"'{info.field_name}' must be set with a list of strings not "
        f"{type(value)}."
    )
@model_validator(mode="after")
def validate_runs_and_channels_recorded(self) -> Self:
    """
    Reconcile ``run_list`` and ``channels_recorded`` with the contents of
    ``runs`` after the model has been validated.

    Run ids: when ``run_list`` differs from the keys of ``runs`` and names
    more runs than ``runs`` holds, a ``Run(id=run_id)`` is appended for every
    id that is missing from ``runs``.  Otherwise ``update_all`` is called.

    Channels: when ``channels_recorded`` differs from the channels collected
    from ``runs`` and lists more channels than the runs provide, each missing
    channel is added to the first run (only if at least one run exists).
    Otherwise ``update_channels_recorded`` is called.

    :return: the validated instance
    :rtype: Self
    """

    # need to make each another object list() otherwise the contents
    # get overwritten with the new channel.
    if self.run_list != list(self.runs.keys()):
        if len(self.run_list) > len(self.runs.keys()):
            # run_list names more runs than are stored: create the missing ones
            for run_id in self.run_list:
                if run_id not in self.runs.keys():
                    self.runs.append(Run(id=run_id))
        else:
            self.update_all()
    # channels as reported by the runs themselves
    estimate_channels_recorded = self._get_channels_recorded()
    if self.channels_recorded != estimate_channels_recorded:
        if len(self.channels_recorded) > len(estimate_channels_recorded):
            # extra channels can only be attached if a run exists
            if len(self.runs) > 0:
                for channel in self.channels_recorded:
                    if channel not in estimate_channels_recorded:
                        self.runs[0].add_channel(channel)
        else:
            self.update_channels_recorded()
    return self
@model_validator(mode="after")
def validate_station_id(self) -> Self:
    """
    Fall back to the FDSN id when the station id is a null value.

    If ``id`` is found in ``NULL_VALUES`` and ``fdsn.id`` is not ``None``,
    ``id`` is overwritten with ``fdsn.id``.

    :return: the validated instance
    :rtype: Self
    """
    id_is_null = self.id in NULL_VALUES
    if id_is_null and self.fdsn.id is not None:
        # object.__setattr__ is used so that assigning the id does not
        # trigger validation recursively
        object.__setattr__(self, "id", self.fdsn.id)

    return self
@field_validator("runs", mode="before")
@classmethod
def validate_runs(cls, value, info: ValidationInfo) -> ListDict:
    """
    Convert the raw ``runs`` input into a ``ListDict`` of ``Run`` objects.

    Mapping inputs contribute their values, sequence inputs their elements.
    A ``dict`` entry is loaded into a new ``Run`` via ``from_dict``; a ``Run``
    entry is stored as is; anything else is recorded as a failure.

    :param value: list, tuple, dict, OrderedDict or ListDict of runs
    :param info: pydantic validation context (not used here)
    :return: the collected runs
    :rtype: ListDict
    :raises TypeError: if ``value`` is not one of the accepted containers, or
        if one or more entries could not be converted (all failures are
        reported together, one per line)
    """
    if not isinstance(value, (list, tuple, dict, ListDict, OrderedDict)):
        msg = (
            "input runs must be an iterable, should be a list or dict "
            f"not {type(value)}"
        )
        logger.error(msg)
        raise TypeError(msg)

    # after the guard above, anything that is not a mapping type is a
    # list or tuple
    if isinstance(value, (dict, ListDict, OrderedDict)):
        entries = value.values()
    else:
        entries = value

    problems = []
    collected = ListDict()
    for position, entry in enumerate(entries):
        problem = f"Item {position} is not type(Run); type={type(entry)}"
        if isinstance(entry, (dict, OrderedDict)):
            try:
                loaded = Run()
                loaded.from_dict(entry)
                collected.append(loaded)
            except KeyError:
                problems.append(problem)
                logger.error(problem)
        elif isinstance(entry, Run):
            collected.append(entry)
        else:
            problems.append(problem)
            logger.error(problem)

    if problems:
        raise TypeError("\n".join(problems))

    return collected
def merge(self, other, inplace=False):
    """
    Merge the runs of another station into this one.

    The runs of ``other`` are appended to ``self.runs`` and ``update_all`` is
    called.

    NOTE(review): ``self`` is modified regardless of ``inplace``; the flag
    only decides whether ``self`` is returned.

    :param other: station whose runs are merged in
    :type other: Station
    :param inplace: if False (default) return ``self``, if True return None
    :type inplace: bool
    :return: ``self`` when ``inplace`` is False, otherwise None
    :raises TypeError: if ``other`` is not a ``Station``
    """
    if not isinstance(other, Station):
        msg = f"Can only merge Station objects, not {type(other)}"
        logger.error(msg)
        raise TypeError(msg)

    self.runs.extend(other.runs)
    self.update_all()
    return None if inplace else self
@property
def n_runs(self) -> int:
    """
    Number of entries currently held in ``runs``.

    :return: ``len(self.runs)``
    :rtype: int
    """
    run_count = len(self.runs)
    return run_count
def has_run(self, run_id):
    """
    Tell whether a run id is listed in ``run_list``.

    :param run_id: run id verbatim
    :type run_id: string
    :return: True if ``run_id`` is in ``run_list``, False if not
    :rtype: boolean
    """
    return run_id in self.run_list
def run_index(self, run_id):
    """
    Look up the position of a run id in ``run_list``.

    :param run_id: run id verbatim
    :type run_id: string
    :return: index of the run in ``run_list``, or None when ``has_run``
        reports that the id is unknown
    :rtype: integer
    """
    if not self.has_run(run_id):
        return None
    return self.run_list.index(run_id)
391 def _empty_channels_recorded(self):
392 """
393 Empty the channels recorded lists.
394 """
395 self.channels_recorded.clear()
397 def _empty_run_list(self):
398 """
399 Empty the runs lists.
400 """
401 self.run_list.clear()
403 def _get_channels_recorded(self) -> list[str]:
404 """
405 Get the channels recorded list.
407 :return: channels recorded list
408 :rtype: list[str]
410 """
411 ch_list = []
412 for run in self.runs:
413 ch_list += run.channels_recorded_all
414 return sorted(set([cc for cc in ch_list if cc is not None]))
def update_channels_recorded(self) -> None:
    """
    Rebuild ``channels_recorded`` from the channels found in the runs.

    The current list is emptied first, then replaced by the result of
    ``_get_channels_recorded``.
    """
    self._empty_channels_recorded()
    refreshed = self._get_channels_recorded()
    self.channels_recorded = refreshed
def update_run_list(self) -> None:
    """
    Rebuild ``run_list`` from the keys of ``runs``.

    The current list is emptied first, then replaced by a new list of the
    keys.
    """
    self._empty_run_list()
    current_keys = [*self.runs.keys()]
    self.run_list = current_keys
def update_time_period(self):
    """
    Widen the station time period so that it covers its runs.

    Run start/end values equal to ``"1980-01-01T00:00:00+00:00"`` are treated
    as unset and skipped.  The station start is replaced by the earliest run
    start when it is itself unset or later than that value; the station end
    is replaced by the latest run end when it is itself unset or earlier.
    Nothing happens when ``self.__len__()`` is not greater than zero.
    """
    if not self.__len__() > 0:
        return

    unset = "1980-01-01T00:00:00+00:00"
    run_starts = []
    run_ends = []
    for run in self.runs:
        if run.time_period.start != unset:
            run_starts.append(run.time_period.start)
        if run.time_period.end != unset:
            run_ends.append(run.time_period.end)

    if run_starts:
        earliest = min(run_starts)
        if self.time_period.start == unset or self.time_period.start > earliest:
            self.time_period.start = earliest

    if run_ends:
        latest = max(run_ends)
        if self.time_period.end == unset or self.time_period.end < latest:
            self.time_period.end = latest
def update_all(self):
    """
    Refresh the time period and the run list from the runs.

    Calls ``update_time_period`` followed by ``update_run_list``.
    ``channels_recorded`` is not refreshed by this method.
    """
    self.update_time_period()
    # NOTE(review): the channels_recorded refresh is disabled here; confirm
    # whether that is intentional before re-enabling it.
    # self.update_channels_recorded()
    self.update_run_list()
def add_run(self, run_obj, update=True):
    """
    Add a run; if one with the same id is already listed, update it.

    When ``has_run(run_obj.id)`` is true the stored run is updated from
    ``run_obj``; otherwise ``run_obj`` is appended to ``runs``.

    :param run_obj: run object to add
    :type run_obj: :class:`mt_metadata.timeseries.Run`
    :param update: if True (default) call ``update_all`` afterwards
    :type update: bool
    :raises TypeError: if ``run_obj`` is not a ``Run``
    :raises ValueError: if ``run_obj.id`` is None
    """

    if not isinstance(run_obj, Run):
        raise TypeError(
            f"Input must be a mt_metadata.timeseries.Run object not {type(run_obj)}"
        )

    if run_obj.id is None:
        raise ValueError("The input run id is None. Input a string or integer.")

    # NOTE(review): has_run consults run_list, which is only refreshed by
    # update_all; with update=False it may lag behind runs -- confirm callers.
    if self.has_run(run_obj.id):
        self.runs[run_obj.id].update(run_obj)
        # the object that already exists is a run, not a station
        logger.debug(f"Run {run_obj.id} already exists, updating metadata")
    else:
        self.runs.append(run_obj)

    if update:
        self.update_all()
def get_run(self, run_id):
    """
    Fetch the run stored under the given id.

    :param run_id: run id verbatim
    :type run_id: string
    :return: ``self.runs[run_id]`` when ``has_run(run_id)`` is true;
        otherwise a warning is logged and None is returned
    """
    if not self.has_run(run_id):
        logger.warning(f"Could not find {run_id} in runs.")
        return None
    return self.runs[run_id]
def remove_run(self, run_id, update=True):
    """
    Remove a run from the station.

    When ``has_run(run_id)`` is false a warning is logged and nothing else
    happens.

    :param run_id: run id verbatim
    :type run_id: string
    :param update: if True (default) call ``update_all`` after removing
    :type update: bool
    """
    if not self.has_run(run_id):
        logger.warning(f"Could not find {run_id} to remove.")
        return

    self.runs.remove(run_id)
    if update:
        self.update_all()
def update_run_keys(self):
    """
    Re-key the ``runs`` container so that keys match the current run ids.

    Delegates to ``self.runs.update_keys()`` and returns its result.  Useful
    when run ids were changed after the runs had been added to the station.

    :returns: whatever ``update_keys`` returns (documented upstream as a
        mapping of old keys to new keys)
    :rtype: dict

    Example:
        >>> station = Station()
        >>> run = Run()
        >>> run.id = ""  # empty ID initially
        >>> station.add_run(run)
        >>> run.id = "001"  # update the ID
        >>> key_mapping = station.update_run_keys()
        >>> print(key_mapping)  # {'': '001'}
        >>> # Now run can be accessed as station.runs['001']
    """
    key_mapping = self.runs.update_keys()
    return key_mapping
def sort_runs_by_time(self, inplace=True, ascending=True):
    """
    Order the runs by their start time.

    A new ``ListDict`` is built with the runs in sorted order.  Runs with
    equal start times keep their relative order when sorting ascending.

    :param inplace: if True (default) replace ``self.runs`` with the sorted
        container and return None; if False leave ``self.runs`` untouched and
        return the sorted container
    :type inplace: bool, optional
    :param ascending: sort earliest first when True (default), latest first
        when False
    :type ascending: bool, optional
    :return: sorted runs when ``inplace`` is False, otherwise None
    :rtype: ListDict or None
    """

    run_ids = []
    run_starts = []
    for run_key, run_obj in self.runs.items():
        run_ids.append(run_key)
        # keep only the part before a "+HH:MM" offset so the value can be
        # parsed as a numpy datetime64
        run_starts.append(run_obj.time_period.start.split("+")[0])

    # a stable sort keeps ties in insertion order, making the result
    # deterministic
    order = np.argsort(np.array(run_starts, dtype=np.datetime64), kind="stable")
    if not ascending:
        # honor the ``ascending`` flag (it was previously ignored)
        order = order[::-1]

    new_runs = ListDict()
    for ii in order:
        new_runs[run_ids[ii]] = self.runs[run_ids[ii]]

    if inplace:
        self.runs = new_runs
        return None
    return new_runs