Coverage for C:\Users\peaco\OneDrive\Documents\GitHub\mt_metadata\mt_metadata\processing\aurora\station.py: 95%

78 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-10 00:11 -0800

1# ===================================================== 

2# Imports 

3# ===================================================== 

4from pathlib import Path 

5from typing import Annotated, Union 

6 

7import pandas as pd 

8from pydantic import computed_field, Field, field_validator, ValidationInfo 

9 

10from mt_metadata.base import MetadataBase 

11from mt_metadata.common import TimePeriod 

12from mt_metadata.processing.aurora.run import Run 

13 

14 

15# ===================================================== 

# Station-level processing metadata: the station ID, the MTH5 file that holds
# its data, whether it is a remote station, and the runs to process.
# NOTE(review): documented with comments instead of a class docstring because
# pydantic copies a model's docstring into its JSON schema description --
# confirm before converting this to a docstring.
class Station(MetadataBase):
    # Station identifier, e.g. "mt001".
    id: Annotated[
        str,
        Field(
            default="",
            description="Station ID",
            alias=None,
            json_schema_extra={
                "units": None,
                "required": True,
                "examples": ["mt001"],
            },
        ),
    ]

    # Location of the MTH5 file; assigned values are passed through
    # ``validate_mth5_path`` and stored as a ``Path``.
    mth5_path: Annotated[
        str | Path,
        Field(
            default="",
            description="full path to MTH5 file where the station data is contained",
            alias=None,
            json_schema_extra={
                "units": None,
                "required": True,
                "examples": ["/home/mt/experiment_01.h5"],
            },
        ),
    ]

    # True for a remote station, False for a local station.
    remote: Annotated[
        bool,
        Field(
            default=False,
            description="remote station (True) or local station (False)",
            alias=None,
            json_schema_extra={
                "units": None,
                "required": True,
                "examples": ["False"],
            },
        ),
    ]

    # Runs to process; input is normalised by ``validate_runs``.
    runs: Annotated[
        list[Run],
        Field(
            default_factory=list,
            description="List of runs to process",
            alias=None,
            json_schema_extra={
                "units": None,
                "required": True,
                "examples": ["001"],
            },
        ),
    ]

    @field_validator("mth5_path", mode="before")
    @classmethod
    def validate_mth5_path(cls, value: str | Path, info: ValidationInfo) -> Path:
        """
        Convert the incoming ``mth5_path`` value to a ``pathlib.Path``.

        Parameters
        ----------
        value : str | Path
            Value supplied for ``mth5_path``.
        info : ValidationInfo
            Validation context supplied by pydantic (unused).

        Returns
        -------
        Path
            ``value`` wrapped in a ``Path``.

        Raises
        ------
        ValueError
            If ``Path(value)`` fails for any reason; the original exception
            is chained as the cause.
        """
        try:
            return Path(value)
        except Exception as e:
            raise ValueError(f"could not convert {value} to Path") from e

    @field_validator("runs", mode="before")
    @classmethod
    def validate_runs(
        cls, values: Union[list, str, Run, dict], info: ValidationInfo
    ) -> list[Run]:
        """
        Normalise the incoming ``runs`` value to a list of ``Run`` objects.

        A value that is not a list is treated as a single item.  Each item
        may be a run ID string, a ``Run`` instance, or a dictionary that is
        loaded with ``Run.from_dict``.

        Parameters
        ----------
        values : list | str | Run | dict
            One item or a list of items describing runs.
        info : ValidationInfo
            Validation context supplied by pydantic (unused).

        Returns
        -------
        list[Run]
            One ``Run`` per input item, in input order.

        Raises
        ------
        TypeError
            If an item is not a ``str``, ``Run`` or ``dict``.
        """
        if not isinstance(values, list):
            values = [values]

        runs = []
        for item in values:
            if isinstance(item, str):
                run = Run(id=item)
            elif isinstance(item, Run):
                run = item
            elif isinstance(item, dict):
                run = Run()
                run.from_dict(item)
            else:
                raise TypeError(f"not sure what to do with type {type(item)}")

            runs.append(run)

        return runs

    def get_run(self, run_id) -> Run | None:
        """
        Get a run by ID.

        Parameters
        ----------
        run_id : str
            ID of the run to look up.

        Returns
        -------
        Run | None
            The run in ``runs`` whose ``id`` equals ``run_id`` (the last one
            if several share the ID), or ``None`` if there is no such run.
        """
        return self.run_dict.get(run_id)

    @computed_field
    @property
    def run_list(self) -> list[str]:
        """list of run names"""
        # Docstring text left unchanged on purpose: pydantic can use a
        # computed field's docstring as its schema description.
        return [r.id for r in self.runs]

    @computed_field
    @property
    def run_dict(self) -> dict[str, Run]:
        """
        need to have a dictionary, but it can't be an attribute cause that
        gets confusing when reading in a json file

        Returns
        -------
        dict[str, Run]
        DESCRIPTION
        """
        # Docstring text left unchanged on purpose (see ``run_list``).
        # Rebuilt from ``self.runs`` on every access; when several runs share
        # an ID the last one in the list wins.
        return {rr.id: rr for rr in self.runs}

    def to_dataset_dataframe(self) -> pd.DataFrame:
        """
        Create a dataset definition dataframe that can be used in the
        processing.

        One row is produced per time period of each run, with the columns::

            [
                "station",
                "run",
                "start",
                "end",
                "mth5_path",
                "sample_rate",
                "input_channels",
                "output_channels",
                "remote",
                "channel_scale_factors",
            ]

        Returns
        -------
        pd.DataFrame
            The dataset definition.  ``start`` and ``end`` are converted with
            ``pd.to_datetime`` when there is at least one row; with no time
            periods the returned frame is empty and has no columns.
        """
        data_list = [
            {
                "station": self.id,
                "run": run.id,
                "start": str(tp.start),  # Convert to string to avoid MTime issues
                "end": str(tp.end),  # Convert to string to avoid MTime issues
                "mth5_path": self.mth5_path,
                "sample_rate": run.sample_rate,
                "input_channels": run.input_channel_names,
                "output_channels": run.output_channel_names,
                "remote": self.remote,
                "channel_scale_factors": run.channel_scale_factors,
            }
            for run in self.runs
            for tp in run.time_periods
        ]

        df = pd.DataFrame(data_list)
        if not df.empty:
            df["start"] = pd.to_datetime(df["start"])
            df["end"] = pd.to_datetime(df["end"])

        return df

    def from_dataset_dataframe(self, df: pd.DataFrame) -> None:
        """
        Populate this station from a dataset definition dataframe.

        Expected columns::

            [
                "station",
                "run",
                "start",
                "end",
                "mth5_path",
                "sample_rate",
                "input_channels",
                "output_channels",
                "remote",
            ]

        A ``channel_scale_factors`` column is optional; when it is absent an
        empty dictionary is used for every run.

        Existing runs are always cleared first.  ``id``, ``mth5_path`` and
        ``remote`` are taken from the first unique value of their columns,
        and every row of ``df`` is used (rows are not filtered by station).
        Rows that share a run ID are merged into one ``Run`` with one time
        period per row; the remaining run settings come from the first row
        seen for that run.

        Parameters
        ----------
        df : pd.DataFrame
            Dataset definition.  If it is empty, the runs are cleared and the
            other attributes are left untouched.

        Returns
        -------
        None
        """
        self.runs = []

        # Handle empty DataFrame case
        if df.empty:
            return

        self.id = df.station.unique()[0]
        self.mth5_path = df.mth5_path.unique()[0]
        self.remote = df.remote.unique()[0]

        # Local lookup of the runs created so far, keyed by each run's own
        # ``id``.  This mirrors ``run_dict`` without rebuilding that
        # dictionary from ``self.runs`` for every row.
        runs_by_id: dict[str, Run] = {}

        for entry in df.itertuples():
            period = TimePeriod(start=str(entry.start), end=str(entry.end))

            existing = runs_by_id.get(entry.run)
            if existing is not None:
                existing.time_periods.append(period)
                continue

            channel_scale_factors = getattr(entry, "channel_scale_factors", {})
            r = Run(
                id=entry.run,
                sample_rate=entry.sample_rate,
                input_channels=entry.input_channels,
                output_channels=entry.output_channels,
                time_periods=[period],
            )
            r.set_channel_scale_factors(channel_scale_factors)
            self.runs.append(r)
            runs_by_id[r.id] = r

248 self.runs.append(r)