Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mt_metadata \ mt_metadata \ timeseries \ survey.py: 77%
234 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:11 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:11 -0800
1# =====================================================
2# Imports
3# =====================================================
4from collections import OrderedDict
5from typing import Annotated
7from loguru import logger
8from pydantic import computed_field, Field, field_validator, ValidationInfo
9from pyproj import CRS
11from mt_metadata.base import MetadataBase
12from mt_metadata.common import (
13 AuthorPerson,
14 BasicLocationNoDatum,
15 Citation,
16 Comment,
17 Copyright,
18 Fdsn,
19 FundingSource,
20 TimePeriodDate,
21)
22from mt_metadata.common.list_dict import ListDict
23from mt_metadata.timeseries import Station
24from mt_metadata.timeseries.filters import (
25 CoefficientFilter,
26 FIRFilter,
27 FrequencyResponseTableFilter,
28 PoleZeroFilter,
29 TimeDelayFilter,
30)
33# =====================================================
36class Survey(MetadataBase):
37 id: Annotated[
38 str,
39 Field(
40 default="",
41 description="Alpha numeric ID that will be unique for archiving.",
42 alias=None,
43 pattern=r"^[a-zA-Z0-9_\- ]*$", # Allow empty string (zero or more chars)
44 json_schema_extra={
45 "units": None,
46 "required": True,
47 "examples": ["EMT20"],
48 },
49 ),
50 ]
52 comments: Annotated[
53 Comment,
54 Field(
55 default_factory=lambda: Comment(),
56 description="Any comments about the survey.",
57 alias=None,
58 json_schema_extra={
59 "units": None,
60 "required": False,
61 "examples": ["long survey"],
62 },
63 ),
64 ]
66 datum: Annotated[
67 str | int,
68 Field(
69 default="WGS 84",
70 description="Datum of latitude and longitude coordinates. Should be a well-known datum, such as WGS84, and will be the reference datum for all locations. This is important for the user, they need to make sure all coordinates in the survey and child items (i.e. stations, channels) are referenced to this datum.",
71 alias=None,
72 json_schema_extra={
73 "units": None,
74 "required": True,
75 "examples": ["WGS 84"],
76 },
77 ),
78 ]
80 geographic_name: Annotated[
81 str,
82 Field(
83 default="",
84 description="Closest geographic reference to survey, usually a city but could be a landmark or some other common geographic reference point.",
85 alias=None,
86 json_schema_extra={
87 "units": None,
88 "required": True,
89 "examples": ["Yukon"],
90 },
91 ),
92 ]
94 name: Annotated[
95 str,
96 Field(
97 default="",
98 description="Descriptive name of the survey.",
99 alias=None,
100 json_schema_extra={
101 "units": None,
102 "required": True,
103 "examples": ["MT Characterization of Yukon Terrane"],
104 },
105 ),
106 ]
108 project: Annotated[
109 str,
110 Field(
111 default="",
112 description="Alpha numeric name for the project e.g USGS-GEOMAG.",
113 alias=None,
114 json_schema_extra={
115 "units": None,
116 "required": True,
117 "examples": ["YUTOO"],
118 },
119 ),
120 ]
122 stations: Annotated[
123 ListDict | list | dict | OrderedDict | tuple,
124 Field(
125 default_factory=ListDict,
126 description="List of stations recorded in the survey.",
127 alias=None,
128 json_schema_extra={
129 "units": None,
130 "required": False,
131 "examples": ["ListDict[Station(id=id)]"],
132 },
133 ),
134 ]
136 filters: Annotated[
137 ListDict | list | dict | OrderedDict | tuple,
138 Field(
139 default_factory=ListDict,
140 description="List of filters for channel responses.",
141 alias=None,
142 json_schema_extra={
143 "units": None,
144 "required": False,
145 "examples": ["ListDict[Filter()]"],
146 },
147 ),
148 ]
150 summary: Annotated[
151 str,
152 Field(
153 default="",
154 description="Summary paragraph of survey including the purpose; difficulties; data quality; summary of outcomes if the data have been processed and modeled.",
155 alias=None,
156 json_schema_extra={
157 "units": None,
158 "required": True,
159 "examples": [
160 "long project of characterizing mineral resources in Yukon"
161 ],
162 },
163 ),
164 ]
166 time_period: Annotated[
167 TimePeriodDate,
168 Field(
169 default_factory=TimePeriodDate,
170 description="End date of the survey in UTC.",
171 alias=None,
172 json_schema_extra={
173 "units": None,
174 "required": True,
175 "examples": [
176 "TimePeriodDate(start_date='2000-01-01', end_date='2000-01-31')"
177 ],
178 },
179 ),
180 ]
182 fdsn: Annotated[
183 Fdsn,
184 Field(
185 default_factory=Fdsn,
186 description="FDSN web service information.",
187 alias=None,
188 json_schema_extra={
189 "units": None,
190 "required": False,
191 "examples": ["Fdsn()"],
192 },
193 ),
194 ]
196 acquired_by: Annotated[
197 AuthorPerson,
198 Field(
199 default_factory=AuthorPerson,
200 description="Person or group that acquired the data.",
201 alias=None,
202 json_schema_extra={
203 "units": None,
204 "required": False,
205 "examples": ["Person()"],
206 },
207 ),
208 ]
210 funding_source: Annotated[
211 FundingSource,
212 Field(
213 default_factory=FundingSource,
214 description="Funding source for the survey.",
215 alias=None,
216 json_schema_extra={
217 "units": None,
218 "required": False,
219 "examples": ["FundingSource()"],
220 },
221 ),
222 ]
224 citation_dataset: Annotated[
225 Citation,
226 Field(
227 default_factory=Citation,
228 description="Citation for the dataset.",
229 alias=None,
230 json_schema_extra={
231 "units": None,
232 "required": False,
233 "examples": ["Citation()"],
234 },
235 ),
236 ]
238 citation_journal: Annotated[
239 Citation,
240 Field(
241 default_factory=Citation,
242 description="Citation for the journal.",
243 alias=None,
244 json_schema_extra={
245 "units": None,
246 "required": False,
247 "examples": ["Citation()"],
248 },
249 ),
250 ]
252 northwest_corner: Annotated[
253 BasicLocationNoDatum,
254 Field(
255 default_factory=BasicLocationNoDatum,
256 description="Northwest corner of the survey area.",
257 alias=None,
258 json_schema_extra={
259 "units": "degrees",
260 "required": False,
261 "examples": ["BasicLocationNoDatum()"],
262 },
263 ),
264 ]
266 southeast_corner: Annotated[
267 BasicLocationNoDatum,
268 Field(
269 default_factory=BasicLocationNoDatum,
270 description="Southeast corner of the survey area.",
271 alias=None,
272 json_schema_extra={
273 "units": "degrees",
274 "required": False,
275 "examples": ["BasicLocationNoDatum()"],
276 },
277 ),
278 ]
280 country: Annotated[
281 list[str] | str | None,
282 Field(
283 default=None,
284 description="Country where the survey was conducted.",
285 alias=None,
286 json_schema_extra={
287 "units": None,
288 "required": False,
289 "examples": ["Canada"],
290 },
291 ),
292 ]
294 state: Annotated[
295 list[str] | str | None,
296 Field(
297 default=None,
298 description="State or province where the survey was conducted.",
299 alias=None,
300 json_schema_extra={
301 "units": None,
302 "required": False,
303 "examples": ["Yukon"],
304 },
305 ),
306 ]
308 project_lead: Annotated[
309 AuthorPerson,
310 Field(
311 default_factory=AuthorPerson,
312 description="Person or group that led the project.",
313 alias=None,
314 json_schema_extra={
315 "units": None,
316 "required": False,
317 "examples": ["Person()"],
318 },
319 ),
320 ]
322 release_license: Annotated[
323 str,
324 Field(
325 default="CC-BY-4.0",
326 description="Release license for the data.",
327 alias=None,
328 json_schema_extra={
329 "units": None,
330 "required": True,
331 "examples": ["CC-BY-4.0"],
332 },
333 ),
334 ]
336 @field_validator("comments", mode="before")
337 @classmethod
338 def validate_comments(cls, value, info: ValidationInfo) -> Comment:
339 if isinstance(value, str):
340 return Comment(value=value)
341 return value
343 @field_validator("datum", mode="before")
344 @classmethod
345 def validate_datum(cls, value: str | int) -> str:
346 """
347 Validate the datum value and convert it to the appropriate enum type.
348 """
349 try:
350 datum_crs = CRS.from_user_input(value)
351 return datum_crs.name
352 except Exception:
353 raise ValueError(
354 f"Invalid datum value: {value}. Must be a valid CRS string or identifier."
355 )
357 @field_validator("release_license", mode="before")
358 @classmethod
359 def validate_release_license(cls, value: str, info: ValidationInfo) -> str:
360 """
361 Validate that the value is a valid license.
362 """
363 if isinstance(value, str):
364 copyright_object = Copyright(release_license=value)
365 return copyright_object.release_license
367 @field_validator("country", "state", mode="before")
368 @classmethod
369 def validate_areas(cls, value) -> list[str]:
370 """validate country and state to be a list"""
371 if isinstance(value, str):
372 return [item.strip() for item in value.split(",")]
373 elif isinstance(value, (list, tuple)):
374 return list(value)
375 elif value == None:
376 return None
377 else:
378 raise TypeError(f"Cannot make a list from types {type(value)}.")
380 @field_validator("stations", mode="before")
381 @classmethod
382 def validate_stations(cls, value, info: ValidationInfo) -> ListDict:
383 if not isinstance(value, (list, tuple, dict, ListDict, OrderedDict)):
384 msg = (
385 "input stations must be an iterable, should be a list or dict "
386 f"not {type(value)}"
387 )
388 logger.error(msg)
389 raise TypeError(msg)
391 fails = []
392 stations = ListDict()
393 if isinstance(value, (dict, ListDict, OrderedDict)):
394 value_list = value.values()
396 elif isinstance(value, (list, tuple)):
397 value_list = value
399 for ii, station_entry in enumerate(value_list):
400 if isinstance(station_entry, (dict, OrderedDict)):
401 try:
402 station = Station()
403 station.from_dict(station_entry)
404 stations.append(station)
405 except KeyError:
406 msg = f"Item {ii} is not type(Station); type={type(station_entry)}"
407 fails.append(msg)
408 logger.error(msg)
409 elif not isinstance(station_entry, (Station)):
410 msg = f"Item {ii} is not type(Run); type={type(station_entry)}"
411 fails.append(msg)
412 logger.error(msg)
413 else:
414 stations.append(station_entry)
415 if len(fails) > 0:
416 raise TypeError("\n".join(fails))
418 return stations
420 @field_validator("filters", mode="before")
421 @classmethod
422 def validate_filters(
423 cls, value: str | list | ListDict, info: ValidationInfo
424 ) -> ListDict:
425 """
427 Parameters
428 ----------
429 value : _type_
430 _description_
431 info : ValidationInfo
432 _description_
434 Returns
435 -------
436 ListDict
437 _description_
438 """
439 filters = ListDict()
440 fails = []
441 if value is None:
442 return
444 if isinstance(value, list):
445 if len(value) > 0:
446 for ff in value:
447 if isinstance(ff, (dict, OrderedDict, ListDict)):
448 f_type = ff["type"]
449 if f_type is None:
450 msg = (
451 "filter type is None do not know how to read the filter"
452 )
453 fails.append(msg)
454 logger.error(msg)
455 if f_type.lower() in ["zpk"]:
456 f = PoleZeroFilter()
457 elif f_type.lower() in ["coefficient"]:
458 f = CoefficientFilter()
459 elif f_type.lower() in ["time delay"]:
460 f = TimeDelayFilter()
461 elif f_type.lower() in ["fir"]:
462 f = FIRFilter()
463 elif f_type.lower() in ["frequency response table"]:
464 f = FrequencyResponseTableFilter()
465 else:
466 msg = f"filter type {f_type} not supported."
467 fails.append(msg)
468 logger.error(msg)
470 f.from_dict(ff)
471 filters[f.name] = f
472 elif isinstance(
473 ff,
474 (
475 PoleZeroFilter,
476 CoefficientFilter,
477 FrequencyResponseTableFilter,
478 TimeDelayFilter,
479 FIRFilter,
480 ),
481 ):
482 filters[ff.name] = ff
483 else:
484 msg = f"Item {ff} is not Filter type; type={type(ff)}"
485 fails.append(msg)
486 logger.error(msg)
488 elif not isinstance(value, (dict, OrderedDict, ListDict)):
489 msg = (
490 "Filters must be a dictionary with keys = names of filters, "
491 f"not {type(value)}"
492 )
493 logger.error(msg)
494 raise TypeError(msg)
495 else:
496 for k, v in value.items():
497 if not isinstance(
498 v,
499 (
500 PoleZeroFilter,
501 CoefficientFilter,
502 TimeDelayFilter,
503 FrequencyResponseTableFilter,
504 FIRFilter,
505 ),
506 ):
507 msg = f"Item {k} is not Filter type; type={type(v)}"
508 fails.append(msg)
509 logger.error(msg)
510 else:
511 filters[k.lower()] = v
512 if len(fails) > 0:
513 raise TypeError("\n".join(fails))
515 return filters
517 @computed_field
518 @property
519 def survey_extent(self) -> dict:
520 """
521 Return the survey extent as a dictionary with keys 'northwest' and 'southeast'.
522 """
523 return {
524 "latitude": {
525 "min": self.southeast_corner.latitude,
526 "max": self.northwest_corner.latitude,
527 },
528 "longitude": {
529 "min": self.northwest_corner.longitude,
530 "max": self.southeast_corner.longitude,
531 },
532 }
534 def merge(self, other: "Survey", inplace=False) -> "Survey":
535 """
536 Merge surveys together using the original metadata but adding other's stations.
538 Parameters
539 ----------
540 other : Survey
541 Survey object
542 inplace : bool, optional
543 merge in place, by default False
545 Returns
546 -------
547 Survey
548 merged surveys
550 Raises
551 ------
552 TypeError
553 If items cannot be merged.
554 """
555 if isinstance(other, Survey):
556 self.stations.extend(other.stations)
557 self.update_all()
558 if not inplace:
559 return self
560 else:
561 msg = f"Can only merge Survey objects, not {type(other)}"
562 logger.error(msg)
563 raise TypeError(msg)
565 @property
566 def n_stations(self) -> int:
567 """
568 Return the number of stations in the station.
570 :return: number of runs in the station
571 :rtype: int
573 """
574 return len(self.stations)
576 @property
577 def station_names(self):
578 """Return names of station in survey"""
579 return self.stations.keys()
581 @property
582 def filter_names(self):
583 """return a list of filter names"""
584 return list(self.filters.keys())
586 def has_station(self, station_id):
587 """
588 Has station id
590 :param station_id: station id verbatim
591 :type station_id: string
592 :return: True if exists or False if not
593 :rtype: boolean
595 """
596 if station_id in self.station_names:
597 return True
598 return False
600 def station_index(self, station_id):
601 """
602 Get station index
604 :param station_id: station id verbatim
605 :type station_id: string
606 :return: index value if station is found
607 :rtype: integer
609 """
611 if self.has_station(station_id):
612 return self.station_names.index(station_id)
613 return None
615 def add_station(self, station_obj, update=True):
616 """
617 Add a station, if has the same name update that object.
619 :param station_obj: station object to add
620 :type station_obj: `:class:`mt_metadata.timeseries.Station`
622 """
624 if not isinstance(station_obj, Station):
625 raise TypeError(
626 f"Input must be a mt_metadata.timeseries.Station object not {type(station_obj)}"
627 )
629 if self.has_station(station_obj.id):
630 self.stations[station_obj.id].update(station_obj)
631 logger.warning(
632 f"Station {station_obj.id} already exists, updating metadata"
633 )
634 else:
635 self.stations.append(station_obj)
637 if update:
638 self.update_bounding_box()
639 self.update_time_period()
640 self.update_station_keys()
642 def get_station(self, station_id):
643 """
644 Get a station from the station id
646 :param station_id: station id verbatim
647 :type station_id: string
648 :return: station object
649 :rtype: :class:`mt_metadata.timeseries.Station`
651 """
653 if self.has_station(station_id):
654 return self.stations[station_id]
655 else:
656 logger.warning(f"Could not find station {station_id}")
657 return None
659 def remove_station(self, station_id, update=True):
660 """
661 remove a station from the survey
663 :param station_id: station id verbatim
664 :type station_id: string
666 """
668 if self.has_station(station_id):
669 self.stations.remove(station_id)
670 if update:
671 self.update_bounding_box()
672 self.update_time_period()
673 else:
674 logger.warning(f"Could not find {station_id} to remove.")
676 def update_station_keys(self):
677 """
678 Update the keys in the stations ListDict to match current station IDs.
680 This is useful when station IDs have been modified after stations were
681 added to the survey, ensuring that stations can be accessed by their
682 current ID values.
684 :returns: mapping of old keys to new keys
685 :rtype: dict
687 Example:
688 >>> survey = Survey()
689 >>> station = Station()
690 >>> station.id = "" # empty ID initially
691 >>> survey.add_station(station)
692 >>> station.id = "MT001" # update the ID
693 >>> key_mapping = survey.update_station_keys()
694 >>> print(key_mapping) # {'': 'MT001'}
695 >>> # Now station can be accessed as survey.stations['MT001']
696 """
697 return self.stations.update_keys()
699 def update_bounding_box(self):
700 """
701 Update the bounding box of the survey from the station information
703 """
704 if self.n_stations > 0:
705 lat = []
706 lon = []
707 for station in self.stations:
708 if station.location.latitude is not None:
709 lat.append(station.location.latitude)
710 if station.location.longitude is not None:
711 lon.append(station.location.longitude)
713 if not len(lat) == 0:
714 self.southeast_corner.latitude = min(lat)
715 self.northwest_corner.latitude = max(lat)
716 if not len(lon) == 0:
717 self.southeast_corner.longitude = max(lon)
718 self.northwest_corner.longitude = min(lon)
720 def update_time_period(self):
721 """
722 Update the start and end time of the survey based on the stations
723 """
724 if self.__len__() > 0:
725 start = []
726 end = []
727 for station in self.stations:
728 if not station.time_period.start_is_default():
729 start.append(station.time_period.start)
730 if not station.time_period.end_is_default():
731 end.append(station.time_period.end)
733 if start:
734 if self.time_period.start_is_default():
735 self.time_period.start_date = min(start)
736 else:
737 if self.time_period.start_date > min(start):
738 self.time_period.start_date = min(start)
740 if end:
741 if self.time_period.end_is_default():
742 self.time_period.end_date = max(end)
743 else:
744 if self.time_period.end_date < max(end):
745 self.time_period.end_date = max(end)
747 def update_all(self):
748 """
749 Update time period and bounding box
750 """
751 self.update_time_period()
752 self.update_bounding_box()