Coverage for C:\Users\peaco\OneDrive\Documents\GitHub\mt_metadata\mt_metadata\timeseries\station.py: 85%

193 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-10 00:11 -0800

1# ===================================================== 

2# Imports 

3# ===================================================== 

4from collections import OrderedDict 

5from typing import Annotated 

6 

7import numpy as np 

8from loguru import logger 

9from pydantic import Field, field_validator, model_validator, ValidationInfo 

10from typing_extensions import Self 

11 

12from mt_metadata import NULL_VALUES 

13from mt_metadata.base import MetadataBase 

14from mt_metadata.common import ( 

15 AuthorPerson, 

16 ChannelLayoutEnum, 

17 Comment, 

18 DataTypeEnum, 

19 Fdsn, 

20 Orientation, 

21 Provenance, 

22 StationLocation, 

23 TimePeriod, 

24) 

25from mt_metadata.common.list_dict import ListDict 

26from mt_metadata.timeseries import Run 

27 

28 

29# ===================================================== 

30 

31 

32class Station(MetadataBase): 

# ---------------------------------------------------------------------
# Station metadata fields.
#
# Every field is declared as ``Annotated[<type>, Field(...)]``.  The
# ``json_schema_extra`` dictionary of each field carries its ``units``,
# ``required`` and ``examples`` entries.
# ---------------------------------------------------------------------

# Layout of the station channels.
channel_layout: Annotated[
    ChannelLayoutEnum,
    Field(
        default=ChannelLayoutEnum.X,
        description="How the station channels were laid out.",
        alias=None,
        json_schema_extra={"units": None, "required": False, "examples": ["X"]},
    ),
]

# Summary of the components recorded by the station.
channels_recorded: Annotated[
    list[str],
    Field(
        default_factory=list,
        description=(
            "List of components recorded by the station. Should be a "
            "summary of all channels recorded. Dropped channels will be "
            "recorded in Run metadata."
        ),
        alias=None,
        json_schema_extra={
            "units": None,
            "required": True,
            "examples": ['"[ Ex, Ey, Hx, Hy, Hz, T]"'],
        },
    ),
]

# Free-form comments; plain strings are wrapped by ``validate_comments``.
comments: Annotated[
    Comment,
    Field(
        default_factory=Comment,  # type: ignore
        description="Any comments on the station.",
        alias=None,
        json_schema_extra={
            "units": None,
            "required": False,
            "examples": ["cows chewed cables"],
        },
    ),
]

# Type of data recorded.
data_type: Annotated[
    DataTypeEnum,
    Field(
        default="BBMT",
        description=(
            "Type of data recorded. If multiple types input as a comma "
            "separated list."
        ),
        alias=None,
        json_schema_extra={"units": None, "required": True, "examples": ["BBMT"]},
    ),
]

# FDSN information.
fdsn: Annotated[
    Fdsn,
    Field(
        default_factory=Fdsn,
        description="FDSN information for the station.",
        alias=None,
        json_schema_extra={"units": None, "required": False, "examples": ["Fdsn()"]},
    ),
]

# Closest geographic name.
geographic_name: Annotated[
    str,
    Field(
        default="",
        description=(
            "Closest geographic name to the station, usually a city, but "
            "could be another common geographic location."
        ),
        alias=None,
        json_schema_extra={
            "units": None,
            "required": True,
            "examples": ["Whitehorse, YK"],
        },
    ),
]

# Station identifier; restricted by ``pattern`` to letters, digits and "_".
id: Annotated[
    str,
    Field(
        default="",
        description=(
            "Station ID name. This should be an alpha numeric name that is "
            "typically 5-6 characters long. Commonly the project name in 2 "
            "or 3 letters and the station number."
        ),
        alias=None,
        pattern="^[a-zA-Z0-9_]*$",
        json_schema_extra={"units": None, "required": True, "examples": ["MT001"]},
    ),
]

# Summary list of run ids.
run_list: Annotated[
    list[str],
    Field(
        default_factory=list,
        description=(
            "List of runs recorded by the station. Should be a summary of "
            "all runs recorded."
        ),
        alias=None,
        json_schema_extra={
            "units": None,
            "required": True,
            "examples": ["[ mt001a, mt001b, mt001c ]"],
        },
    ),
]

# Station location.
location: Annotated[
    StationLocation,
    Field(
        default_factory=StationLocation,  # type: ignore
        description="Location of the station.",
        alias=None,
        json_schema_extra={
            "units": None,
            "required": False,
            "examples": ["StationLocation(latitude=60.0, longitude=-135.0)"],
        },
    ),
]

# Station orientation.
orientation: Annotated[
    Orientation,
    Field(
        default_factory=Orientation,  # type: ignore
        description="Orientation of the station.",
        alias=None,
        json_schema_extra={
            "units": None,
            "required": False,
            "examples": ["Orientation(north=0, east=0, vertical=1)"],
        },
    ),
]

# Who acquired the data.
acquired_by: Annotated[
    AuthorPerson,
    Field(
        default_factory=AuthorPerson,  # type: ignore
        description="Group or person who acquired the data.",
        alias=None,
        json_schema_extra={"units": None, "required": False, "examples": ["Person()"]},
    ),
]

# Data provenance.
provenance: Annotated[
    Provenance,
    Field(
        default_factory=Provenance,  # type: ignore
        description="Provenance of the data.",
        alias=None,
        json_schema_extra={
            "units": None,
            "required": False,
            "examples": ["Provenance()"],
        },
    ),
]

# Time period of the data; see ``update_time_period``.
time_period: Annotated[
    TimePeriod,
    Field(
        default_factory=TimePeriod,  # type: ignore
        description="Time period of the data.",
        alias=None,
        json_schema_extra={
            "units": None,
            "required": False,
            "examples": ["TimePeriod(start='2020-01-01', end='2020-12-31')"],
        },
    ),
]

# Run objects; input is converted to a ListDict by ``validate_runs``.
runs: Annotated[
    ListDict | list | dict | OrderedDict | tuple,
    Field(
        default_factory=ListDict,
        description="List of runs recorded by the station.",
        alias=None,
        json_schema_extra={
            "units": None,
            "required": False,
            "examples": ["[Run(id='mt001a'), Run(id='mt001b'), Run(id='mt001c')]"],
        },
    ),
]

229 

@field_validator("comments", mode="before")
@classmethod
def validate_comments(cls, value, info: ValidationInfo) -> Comment:
    """
    Wrap a plain string in a :class:`Comment` before field validation.

    :param value: raw input given for ``comments``
    :param info: pydantic validation context (not used)
    :return: ``Comment(value=value)`` if ``value`` is a string, otherwise
        ``value`` unchanged

    """
    return Comment(value=value) if isinstance(value, str) else value

236 

@field_validator("channels_recorded", "run_list", mode="before")
@classmethod
def validate_list_of_strings(cls, value, info: ValidationInfo) -> list[str]:
    """
    Coerce the input of ``channels_recorded`` / ``run_list`` into a list
    of strings.

    :param value: raw input; a numpy array, list, tuple, comma separated
        string, or one of ``NULL_VALUES``
    :param info: pydantic validation context, used to name the field in
        the error message
    :return: list of strings (empty when ``value`` is a null value)
    :rtype: list[str]
    :raises TypeError: if ``value`` is none of the supported types

    """
    # Arrays are handled before the NULL_VALUES membership test: the
    # membership test compares with ``==``, which is element-wise for
    # arrays and raises ValueError when the array has more than one item.
    if isinstance(value, np.ndarray):
        return value.astype(str).tolist()

    # A null input means "nothing recorded"; return an empty list so the
    # result matches the declared ``list[str]`` type instead of ``None``.
    if value in NULL_VALUES:
        return []

    if isinstance(value, (list, tuple)):
        return [str(item) for item in value]

    if isinstance(value, str):
        return [item.strip() for item in value.split(",")]

    # Name the field actually being validated rather than always
    # reporting 'channels_recorded'.
    raise TypeError(
        f"'{info.field_name}' must be set with a list of strings not "
        f"{type(value)}."
    )

261 

262 @model_validator(mode="after") 

263 def validate_runs_and_channels_recorded(self) -> Self: 

264 """ 

265 Reconcile ``run_list`` with the keys of ``runs`` and ``channels_recorded`` with the channels reported by the runs, then return ``self``. 

266 """ 

267 

268 # need to make each another object list() otherwise the contents 

269 # get overwritten with the new channel. 

270 if self.run_list != list(self.runs.keys()): 

271 if len(self.run_list) > len(self.runs.keys()): 

272 for run_id in self.run_list: 

273 if run_id not in self.runs.keys(): 

274 self.runs.append(Run(id=run_id)) 

275 else: 

276 self.update_all() 

277 estimate_channels_recorded = self._get_channels_recorded() 

278 if self.channels_recorded != estimate_channels_recorded: 

279 if len(self.channels_recorded) > len(estimate_channels_recorded): 

280 if len(self.runs) > 0: 

281 for channel in self.channels_recorded: 

282 if channel not in estimate_channels_recorded: 

283 self.runs[0].add_channel(channel) 

284 else: 

285 self.update_channels_recorded() 

286 return self 

287 

@model_validator(mode="after")
def validate_station_id(self) -> Self:
    """
    Fall back to the FDSN id when the station id is a null value.

    If ``self.id`` is one of ``NULL_VALUES`` and ``self.fdsn.id`` is not
    ``None``, the FDSN id is copied into ``self.id``.

    :return: the validated station
    :rtype: Self

    """
    id_is_null = self.id in NULL_VALUES
    if id_is_null and self.fdsn.id is not None:
        # Use object.__setattr__ to avoid triggering validation recursively
        object.__setattr__(self, "id", self.fdsn.id)
    return self

299 

@field_validator("runs", mode="before")
@classmethod
def validate_runs(cls, value, info: ValidationInfo) -> ListDict:
    """
    Convert the input given for ``runs`` into a ``ListDict`` of ``Run``
    objects.

    Mapping inputs contribute their values, sequences are used as given.
    Dictionary entries are loaded into a new ``Run`` via ``from_dict``;
    ``Run`` entries are appended as they are.

    :param value: list, tuple, dict, OrderedDict or ListDict of runs
    :param info: pydantic validation context (not used)
    :return: the collected runs
    :rtype: ListDict
    :raises TypeError: if ``value`` is not one of the accepted containers,
        or if any entry could not be turned into a ``Run`` (one message
        line per failing entry)

    """
    if not isinstance(value, (list, tuple, dict, ListDict, OrderedDict)):
        msg = (
            "input runs must be an iterable, should be a list or dict "
            f"not {type(value)}"
        )
        logger.error(msg)
        raise TypeError(msg)

    # After the guard above anything that is not a mapping type is a
    # list or tuple.
    if isinstance(value, (dict, ListDict, OrderedDict)):
        entries = value.values()
    else:
        entries = value

    problems = []
    collected = ListDict()
    for position, entry in enumerate(entries):
        rejected = False
        if isinstance(entry, (dict, OrderedDict)):
            try:
                new_run = Run()
                new_run.from_dict(entry)
                collected.append(new_run)
            except KeyError:
                rejected = True
        elif isinstance(entry, Run):
            collected.append(entry)
        else:
            rejected = True

        if rejected:
            problem = f"Item {position} is not type(Run); type={type(entry)}"
            problems.append(problem)
            logger.error(problem)

    if problems:
        raise TypeError("\n".join(problems))

    return collected

339 

def merge(self, other, inplace=False):
    """
    Append the runs of another station to this station.

    NOTE(review): ``self`` is modified in both cases; ``inplace`` only
    decides whether ``self`` is returned afterwards.

    :param other: station whose runs are added
    :type other: Station
    :param inplace: if False return ``self`` after merging, if True
        return None, defaults to False
    :type inplace: bool, optional
    :return: ``self`` when ``inplace`` is False, otherwise None
    :raises TypeError: if ``other`` is not a Station

    """
    if not isinstance(other, Station):
        msg = f"Can only merge Station objects, not {type(other)}"
        logger.error(msg)
        raise TypeError(msg)

    self.runs.extend(other.runs)
    self.update_all()
    return None if inplace else self

350 

@property
def n_runs(self) -> int:
    """
    Number of entries in ``self.runs``.

    :return: number of runs in the station
    :rtype: int

    """
    run_count = len(self.runs)
    return run_count

361 

def has_run(self, run_id):
    """
    Tell whether ``run_id`` is listed in ``self.run_list``.

    :param run_id: run id verbatim
    :type run_id: string
    :return: True if the id is in the run list, False if not
    :rtype: boolean

    """
    return run_id in self.run_list

375 

def run_index(self, run_id):
    """
    Position of ``run_id`` within ``self.run_list``.

    :param run_id: run id verbatim
    :type run_id: string
    :return: index of the run, or None when the station has no such run
    :rtype: integer

    """
    if not self.has_run(run_id):
        return None
    return self.run_list.index(run_id)

390 

391 def _empty_channels_recorded(self): 

392 """ 

393 Empty the channels recorded lists. 

394 """ 

395 self.channels_recorded.clear() 

396 

397 def _empty_run_list(self): 

398 """ 

399 Empty the runs lists. 

400 """ 

401 self.run_list.clear() 

402 

403 def _get_channels_recorded(self) -> list[str]: 

404 """ 

405 Get the channels recorded list. 

406 

407 :return: channels recorded list 

408 :rtype: list[str] 

409 

410 """ 

411 ch_list = [] 

412 for run in self.runs: 

413 ch_list += run.channels_recorded_all 

414 return sorted(set([cc for cc in ch_list if cc is not None])) 

415 

def update_channels_recorded(self) -> None:
    """
    Replace ``channels_recorded`` with the channels found in the runs.

    The current list is emptied first, then the attribute is assigned the
    result of ``_get_channels_recorded``.
    """
    self._empty_channels_recorded()
    refreshed = self._get_channels_recorded()
    self.channels_recorded = refreshed

422 

def update_run_list(self) -> None:
    """
    Replace ``run_list`` with the keys of ``self.runs``.

    The current list is emptied first, then the attribute is assigned a
    new list of the run keys.
    """
    self._empty_run_list()
    current_ids = [*self.runs.keys()]
    self.run_list = current_ids

429 

def update_time_period(self):
    """
    Widen the station time period so that it covers all runs.

    Run start/end values equal to ``1980-01-01T00:00:00+00:00`` are
    skipped.  The station start is set to the earliest remaining run
    start when it is itself that placeholder value or later than the
    earliest run start; the station end is handled the same way with the
    latest run end.  Nothing happens when ``self.__len__()`` is not
    greater than zero.
    """
    unset_time = "1980-01-01T00:00:00+00:00"

    if not (self.__len__() > 0):
        return

    run_starts = []
    run_ends = []
    for run in self.runs:
        period = run.time_period
        if period.start != unset_time:
            run_starts.append(period.start)
        if period.end != unset_time:
            run_ends.append(period.end)

    if run_starts:
        earliest = min(run_starts)
        current_start = self.time_period.start
        if current_start == unset_time or current_start > earliest:
            self.time_period.start = earliest

    if run_ends:
        latest = max(run_ends)
        current_end = self.time_period.end
        if current_end == unset_time or current_end < latest:
            self.time_period.end = latest

454 

def update_all(self):
    """
    Refresh the time period and the run list, in that order.

    ``channels_recorded`` is not refreshed here: the call to
    ``update_channels_recorded`` is commented out.

    """
    # self.update_channels_recorded()
    for refresh in (self.update_time_period, self.update_run_list):
        refresh()

463 

def add_run(self, run_obj, update=True):
    """
    Add a run to the station.

    If ``has_run`` reports that a run with the same id already exists,
    that run is updated from ``run_obj`` (``self.runs[id].update``);
    otherwise ``run_obj`` is appended to ``self.runs``.

    :param run_obj: run object to add
    :type run_obj: :class:`mt_metadata.timeseries.Run`
    :param update: call ``update_all`` afterwards, defaults to True
    :type update: bool, optional
    :raises TypeError: if ``run_obj`` is not a Run
    :raises ValueError: if ``run_obj.id`` is None

    """
    if not isinstance(run_obj, Run):
        raise TypeError(
            f"Input must be a mt_metadata.timeseries.Run object not {type(run_obj)}"
        )

    if run_obj.id is None:
        raise ValueError("The input run id is None. Input a string or integer.")

    if self.has_run(run_obj.id):
        self.runs[run_obj.id].update(run_obj)
        # The object that already exists is a run, not a station.
        logger.debug(f"Run {run_obj.id} already exists, updating metadata")
    else:
        self.runs.append(run_obj)

    if update:
        self.update_all()

488 

def get_run(self, run_id):
    """
    Look up a :class:`mt_metadata.timeseries.Run` object by its id.

    :param run_id: run id verbatim
    :type run_id: string
    :return: ``self.runs[run_id]`` when ``has_run`` finds the id,
        otherwise None (a warning is logged)

    """
    if not self.has_run(run_id):
        logger.warning(f"Could not find {run_id} in runs.")
        return None
    return self.runs[run_id]

503 

def remove_run(self, run_id, update=True):
    """
    Remove a run from the station.

    A warning is logged and nothing is changed when ``has_run`` does not
    find ``run_id``.

    :param run_id: run id verbatim
    :type run_id: string
    :param update: call ``update_all`` after removing, defaults to True
    :type update: bool, optional

    """
    if not self.has_run(run_id):
        logger.warning(f"Could not find {run_id} to remove.")
        return

    self.runs.remove(run_id)
    if update:
        self.update_all()

519 

def update_run_keys(self):
    """
    Re-key the runs ListDict so that each key matches the run's current id.

    Use this after run ids were changed once the runs had already been
    added to the station, so the runs can be reached through their
    current ids.  The work is delegated to ``self.runs.update_keys()``.

    :returns: mapping of old keys to new keys
    :rtype: dict

    Example:
        >>> station = Station()
        >>> run = Run()
        >>> run.id = ""  # empty ID initially
        >>> station.add_run(run)
        >>> run.id = "001"  # update the ID
        >>> key_mapping = station.update_run_keys()
        >>> print(key_mapping)  # {'': '001'}
        >>> # Now run can be accessed as station.runs['001']
    """
    key_mapping = self.runs.update_keys()
    return key_mapping

542 

def sort_runs_by_time(self, inplace=True, ascending=True):
    """
    Order the runs by their start time.

    :param inplace: if True assign the sorted runs to ``self.runs`` and
        return None; if False leave ``self.runs`` untouched and return
        the sorted runs, defaults to True
    :type inplace: bool, optional
    :param ascending: sort earliest first when True, latest first when
        False, defaults to True
    :type ascending: bool, optional
    :return: the sorted runs when ``inplace`` is False, otherwise None
    :rtype: ListDict or None

    """
    run_ids = []
    run_objs = []
    run_starts = []
    for run_key, run_obj in self.runs.items():
        run_ids.append(run_key)
        run_objs.append(run_obj)
        # keep only the part before "+" (drops a "+HH:MM" offset) so the
        # text can be converted to np.datetime64
        run_starts.append(run_obj.time_period.start.split("+")[0])

    # a stable sort keeps runs with identical start times in their
    # existing relative order
    order = np.argsort(np.array(run_starts, dtype=np.datetime64), kind="stable")
    if not ascending:
        # honour the ``ascending`` flag, which was previously ignored
        order = order[::-1]

    new_runs = ListDict()
    for ii in order:
        new_runs[run_ids[ii]] = run_objs[ii]

    if inplace:
        self.runs = new_runs
        return None
    return new_runs