Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mt_metadata \ mt_metadata \ processing \ aurora \ station.py: 95%
78 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:11 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:11 -0800
1# =====================================================
2# Imports
3# =====================================================
4from pathlib import Path
5from typing import Annotated, Union
7import pandas as pd
8from pydantic import computed_field, Field, field_validator, ValidationInfo
10from mt_metadata.base import MetadataBase
11from mt_metadata.common import TimePeriod
12from mt_metadata.processing.aurora.run import Run
15# =====================================================
# Station entry for the aurora processing metadata: a station ID, the MTH5
# file that holds its data, a remote/local flag, and the runs to process.
# It converts to and from a "dataset definition" pandas DataFrame with one
# row per (run, time period).
# NOTE: documented with comments rather than a class docstring so that the
# model's ``__doc__`` (and anything derived from it) is left unchanged.
class Station(MetadataBase):
    id: Annotated[
        str,
        Field(
            default="",
            description="Station ID",
            alias=None,
            json_schema_extra={
                "units": None,
                "required": True,
                "examples": ["mt001"],
            },
        ),
    ]

    mth5_path: Annotated[
        str | Path,
        Field(
            default="",
            description="full path to MTH5 file where the station data is contained",
            alias=None,
            json_schema_extra={
                "units": None,
                "required": True,
                "examples": ["/home/mt/experiment_01.h5"],
            },
        ),
    ]

    remote: Annotated[
        bool,
        Field(
            default=False,
            description="remote station (True) or local station (False)",
            alias=None,
            json_schema_extra={
                "units": None,
                "required": True,
                "examples": ["False"],
            },
        ),
    ]

    runs: Annotated[
        list[Run],
        Field(
            default_factory=list,
            description="List of runs to process",
            alias=None,
            json_schema_extra={
                "units": None,
                "required": True,
                "examples": ["001"],
            },
        ),
    ]

    @field_validator("mth5_path", mode="before")
    @classmethod
    def validate_mth5_path(cls, value: str | Path, info: ValidationInfo) -> str | Path:
        """
        Coerce the incoming value to a :class:`pathlib.Path`.

        Parameters
        ----------
        value : str | Path
            Value supplied for ``mth5_path``.
        info : ValidationInfo
            Validation context (unused).

        Returns
        -------
        Path
            ``Path(value)``.

        Raises
        ------
        ValueError
            If ``value`` cannot be converted to a ``Path``.
        """
        try:
            return Path(value)
        # Deliberately broad: any conversion failure is re-raised as a
        # ValueError, chained to the original cause.
        except Exception as e:
            raise ValueError(f"could not convert {value} to Path") from e

    @field_validator("runs", mode="before")
    @classmethod
    def validate_runs(
        cls, values: Union[list, str, Run, dict], info: ValidationInfo
    ) -> list[Run]:
        """
        Normalize the input for ``runs`` into a list of :class:`Run` objects.

        Parameters
        ----------
        values : list | str | Run | dict
            A single item or a list of items.  A ``str`` is used as the run
            ID, a ``Run`` is kept as is, and a ``dict`` is loaded through
            ``Run.from_dict``.
        info : ValidationInfo
            Validation context (unused).

        Returns
        -------
        list[Run]
            One ``Run`` per input item, in input order.

        Raises
        ------
        TypeError
            If an item is not a ``str``, ``Run`` or ``dict``.
        """
        # A lone item is treated as a one-element list.
        if not isinstance(values, list):
            values = [values]

        runs = []
        for item in values:
            if isinstance(item, str):
                run = Run(id=item)
            elif isinstance(item, Run):
                run = item
            elif isinstance(item, dict):
                run = Run()
                run.from_dict(item)
            else:
                raise TypeError(f"not sure what to do with type {type(item)}")

            runs.append(run)

        return runs

    def get_run(self, run_id: str) -> Run | None:
        """
        Get a run by ID.

        Parameters
        ----------
        run_id : str
            ID of the run to look up.

        Returns
        -------
        Run | None
            The run in ``runs`` whose ``id`` equals ``run_id``, or ``None``
            if there is no such run.
        """
        return self.run_dict.get(run_id)

    @computed_field
    @property
    def run_list(self) -> list[str]:
        """list of run names"""
        return [r.id for r in self.runs]

    @computed_field
    @property
    def run_dict(self) -> dict[str, Run]:
        """
        need to have a dictionary, but it can't be an attribute cause that
        gets confusing when reading in a json file

        Returns
        -------
        dict[str, Run]
            Mapping of run ID to run, rebuilt from ``runs`` on every access.
            If several runs share an ID the last one wins.
        """
        return {rr.id: rr for rr in self.runs}

    def to_dataset_dataframe(self) -> pd.DataFrame:
        """
        Create a dataset definition dataframe that can be used in the
        processing.  One row is produced per time period of each run, with
        columns

        [
            "station",
            "run",
            "start",
            "end",
            "mth5_path",
            "sample_rate",
            "input_channels",
            "output_channels",
            "remote",
            "channel_scale_factors",
        ]

        Returns
        -------
        pd.DataFrame
            The dataset definition.  ``start`` and ``end`` are converted with
            ``pd.to_datetime`` when there is at least one row; with no rows
            an empty dataframe is returned.
        """
        data_list = []

        for run in self.runs:
            for tp in run.time_periods:
                entry = {
                    "station": self.id,
                    "run": run.id,
                    "start": str(tp.start),  # Convert to string to avoid MTime issues
                    "end": str(tp.end),  # Convert to string to avoid MTime issues
                    "mth5_path": self.mth5_path,
                    "sample_rate": run.sample_rate,
                    "input_channels": run.input_channel_names,
                    "output_channels": run.output_channel_names,
                    "remote": self.remote,
                    "channel_scale_factors": run.channel_scale_factors,
                }
                data_list.append(entry)

        df = pd.DataFrame(data_list)
        if not df.empty:
            df["start"] = pd.to_datetime(df["start"])
            df["end"] = pd.to_datetime(df["end"])

        return df

    def from_dataset_dataframe(self, df: pd.DataFrame) -> None:
        """
        Populate this station from a dataset definition dataframe.

        Expected columns:

        [
            "station",
            "run",
            "start",
            "end",
            "mth5_path",
            "sample_rate",
            "input_channels",
            "output_channels",
            "remote",
        ]

        An optional "channel_scale_factors" column is applied to each new
        run when present.

        Parameters
        ----------
        df : pd.DataFrame
            Dataset definition, one row per (run, time period).  Only the
            first unique value of ``station``, ``mth5_path`` and ``remote``
            is used.

        Returns
        -------
        None
            The station is updated in place.  ``runs`` is always reset; an
            empty dataframe leaves every other attribute untouched.
        """
        self.runs = []

        # Handle empty DataFrame case
        if df.empty:
            return

        self.id = df.station.unique()[0]
        self.mth5_path = df.mth5_path.unique()[0]
        self.remote = df.remote.unique()[0]

        # Local index of the runs created so far, keyed by the ID each run
        # reports, so ``run_dict`` is not rebuilt for every row.
        runs_by_id: dict[str, Run] = {}

        for entry in df.itertuples():
            period = TimePeriod(start=str(entry.start), end=str(entry.end))

            existing = runs_by_id.get(entry.run)
            if existing is not None:
                # Further rows for a known run only add a time period.
                existing.time_periods.append(period)
                continue

            new_run = Run(
                id=entry.run,
                sample_rate=entry.sample_rate,
                input_channels=entry.input_channels,
                output_channels=entry.output_channels,
                time_periods=[period],
            )
            # Column is optional; fall back to no scale factors.
            new_run.set_channel_scale_factors(
                getattr(entry, "channel_scale_factors", {})
            )
            self.runs.append(new_run)
            runs_by_id[new_run.id] = new_run