Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mt_metadata \ mt_metadata \ timeseries \ survey.py: 77%

234 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-10 00:11 -0800

1# ===================================================== 

2# Imports 

3# ===================================================== 

4from collections import OrderedDict 

5from typing import Annotated 

6 

7from loguru import logger 

8from pydantic import computed_field, Field, field_validator, ValidationInfo 

9from pyproj import CRS 

10 

11from mt_metadata.base import MetadataBase 

12from mt_metadata.common import ( 

13 AuthorPerson, 

14 BasicLocationNoDatum, 

15 Citation, 

16 Comment, 

17 Copyright, 

18 Fdsn, 

19 FundingSource, 

20 TimePeriodDate, 

21) 

22from mt_metadata.common.list_dict import ListDict 

23from mt_metadata.timeseries import Station 

24from mt_metadata.timeseries.filters import ( 

25 CoefficientFilter, 

26 FIRFilter, 

27 FrequencyResponseTableFilter, 

28 PoleZeroFilter, 

29 TimeDelayFilter, 

30) 

31 

32 

33# ===================================================== 

34 

35 

36class Survey(MetadataBase): 

37 id: Annotated[ 

38 str, 

39 Field( 

40 default="", 

41 description="Alpha numeric ID that will be unique for archiving.", 

42 alias=None, 

43 pattern=r"^[a-zA-Z0-9_\- ]*$", # Allow empty string (zero or more chars) 

44 json_schema_extra={ 

45 "units": None, 

46 "required": True, 

47 "examples": ["EMT20"], 

48 }, 

49 ), 

50 ] 

51 

52 comments: Annotated[ 

53 Comment, 

54 Field( 

55 default_factory=lambda: Comment(), 

56 description="Any comments about the survey.", 

57 alias=None, 

58 json_schema_extra={ 

59 "units": None, 

60 "required": False, 

61 "examples": ["long survey"], 

62 }, 

63 ), 

64 ] 

65 

66 datum: Annotated[ 

67 str | int, 

68 Field( 

69 default="WGS 84", 

70 description="Datum of latitude and longitude coordinates. Should be a well-known datum, such as WGS84, and will be the reference datum for all locations. This is important for the user, they need to make sure all coordinates in the survey and child items (i.e. stations, channels) are referenced to this datum.", 

71 alias=None, 

72 json_schema_extra={ 

73 "units": None, 

74 "required": True, 

75 "examples": ["WGS 84"], 

76 }, 

77 ), 

78 ] 

79 

80 geographic_name: Annotated[ 

81 str, 

82 Field( 

83 default="", 

84 description="Closest geographic reference to survey, usually a city but could be a landmark or some other common geographic reference point.", 

85 alias=None, 

86 json_schema_extra={ 

87 "units": None, 

88 "required": True, 

89 "examples": ["Yukon"], 

90 }, 

91 ), 

92 ] 

93 

94 name: Annotated[ 

95 str, 

96 Field( 

97 default="", 

98 description="Descriptive name of the survey.", 

99 alias=None, 

100 json_schema_extra={ 

101 "units": None, 

102 "required": True, 

103 "examples": ["MT Characterization of Yukon Terrane"], 

104 }, 

105 ), 

106 ] 

107 

108 project: Annotated[ 

109 str, 

110 Field( 

111 default="", 

112 description="Alpha numeric name for the project e.g USGS-GEOMAG.", 

113 alias=None, 

114 json_schema_extra={ 

115 "units": None, 

116 "required": True, 

117 "examples": ["YUTOO"], 

118 }, 

119 ), 

120 ] 

121 

122 stations: Annotated[ 

123 ListDict | list | dict | OrderedDict | tuple, 

124 Field( 

125 default_factory=ListDict, 

126 description="List of stations recorded in the survey.", 

127 alias=None, 

128 json_schema_extra={ 

129 "units": None, 

130 "required": False, 

131 "examples": ["ListDict[Station(id=id)]"], 

132 }, 

133 ), 

134 ] 

135 

136 filters: Annotated[ 

137 ListDict | list | dict | OrderedDict | tuple, 

138 Field( 

139 default_factory=ListDict, 

140 description="List of filters for channel responses.", 

141 alias=None, 

142 json_schema_extra={ 

143 "units": None, 

144 "required": False, 

145 "examples": ["ListDict[Filter()]"], 

146 }, 

147 ), 

148 ] 

149 

150 summary: Annotated[ 

151 str, 

152 Field( 

153 default="", 

154 description="Summary paragraph of survey including the purpose; difficulties; data quality; summary of outcomes if the data have been processed and modeled.", 

155 alias=None, 

156 json_schema_extra={ 

157 "units": None, 

158 "required": True, 

159 "examples": [ 

160 "long project of characterizing mineral resources in Yukon" 

161 ], 

162 }, 

163 ), 

164 ] 

165 

166 time_period: Annotated[ 

167 TimePeriodDate, 

168 Field( 

169 default_factory=TimePeriodDate, 

170 description="End date of the survey in UTC.", 

171 alias=None, 

172 json_schema_extra={ 

173 "units": None, 

174 "required": True, 

175 "examples": [ 

176 "TimePeriodDate(start_date='2000-01-01', end_date='2000-01-31')" 

177 ], 

178 }, 

179 ), 

180 ] 

181 

182 fdsn: Annotated[ 

183 Fdsn, 

184 Field( 

185 default_factory=Fdsn, 

186 description="FDSN web service information.", 

187 alias=None, 

188 json_schema_extra={ 

189 "units": None, 

190 "required": False, 

191 "examples": ["Fdsn()"], 

192 }, 

193 ), 

194 ] 

195 

196 acquired_by: Annotated[ 

197 AuthorPerson, 

198 Field( 

199 default_factory=AuthorPerson, 

200 description="Person or group that acquired the data.", 

201 alias=None, 

202 json_schema_extra={ 

203 "units": None, 

204 "required": False, 

205 "examples": ["Person()"], 

206 }, 

207 ), 

208 ] 

209 

210 funding_source: Annotated[ 

211 FundingSource, 

212 Field( 

213 default_factory=FundingSource, 

214 description="Funding source for the survey.", 

215 alias=None, 

216 json_schema_extra={ 

217 "units": None, 

218 "required": False, 

219 "examples": ["FundingSource()"], 

220 }, 

221 ), 

222 ] 

223 

224 citation_dataset: Annotated[ 

225 Citation, 

226 Field( 

227 default_factory=Citation, 

228 description="Citation for the dataset.", 

229 alias=None, 

230 json_schema_extra={ 

231 "units": None, 

232 "required": False, 

233 "examples": ["Citation()"], 

234 }, 

235 ), 

236 ] 

237 

238 citation_journal: Annotated[ 

239 Citation, 

240 Field( 

241 default_factory=Citation, 

242 description="Citation for the journal.", 

243 alias=None, 

244 json_schema_extra={ 

245 "units": None, 

246 "required": False, 

247 "examples": ["Citation()"], 

248 }, 

249 ), 

250 ] 

251 

252 northwest_corner: Annotated[ 

253 BasicLocationNoDatum, 

254 Field( 

255 default_factory=BasicLocationNoDatum, 

256 description="Northwest corner of the survey area.", 

257 alias=None, 

258 json_schema_extra={ 

259 "units": "degrees", 

260 "required": False, 

261 "examples": ["BasicLocationNoDatum()"], 

262 }, 

263 ), 

264 ] 

265 

266 southeast_corner: Annotated[ 

267 BasicLocationNoDatum, 

268 Field( 

269 default_factory=BasicLocationNoDatum, 

270 description="Southeast corner of the survey area.", 

271 alias=None, 

272 json_schema_extra={ 

273 "units": "degrees", 

274 "required": False, 

275 "examples": ["BasicLocationNoDatum()"], 

276 }, 

277 ), 

278 ] 

279 

280 country: Annotated[ 

281 list[str] | str | None, 

282 Field( 

283 default=None, 

284 description="Country where the survey was conducted.", 

285 alias=None, 

286 json_schema_extra={ 

287 "units": None, 

288 "required": False, 

289 "examples": ["Canada"], 

290 }, 

291 ), 

292 ] 

293 

294 state: Annotated[ 

295 list[str] | str | None, 

296 Field( 

297 default=None, 

298 description="State or province where the survey was conducted.", 

299 alias=None, 

300 json_schema_extra={ 

301 "units": None, 

302 "required": False, 

303 "examples": ["Yukon"], 

304 }, 

305 ), 

306 ] 

307 

308 project_lead: Annotated[ 

309 AuthorPerson, 

310 Field( 

311 default_factory=AuthorPerson, 

312 description="Person or group that led the project.", 

313 alias=None, 

314 json_schema_extra={ 

315 "units": None, 

316 "required": False, 

317 "examples": ["Person()"], 

318 }, 

319 ), 

320 ] 

321 

322 release_license: Annotated[ 

323 str, 

324 Field( 

325 default="CC-BY-4.0", 

326 description="Release license for the data.", 

327 alias=None, 

328 json_schema_extra={ 

329 "units": None, 

330 "required": True, 

331 "examples": ["CC-BY-4.0"], 

332 }, 

333 ), 

334 ] 

335 

336 @field_validator("comments", mode="before") 

337 @classmethod 

338 def validate_comments(cls, value, info: ValidationInfo) -> Comment: 

339 if isinstance(value, str): 

340 return Comment(value=value) 

341 return value 

342 

343 @field_validator("datum", mode="before") 

344 @classmethod 

345 def validate_datum(cls, value: str | int) -> str: 

346 """ 

347 Validate the datum value and convert it to the appropriate enum type. 

348 """ 

349 try: 

350 datum_crs = CRS.from_user_input(value) 

351 return datum_crs.name 

352 except Exception: 

353 raise ValueError( 

354 f"Invalid datum value: {value}. Must be a valid CRS string or identifier." 

355 ) 

356 

357 @field_validator("release_license", mode="before") 

358 @classmethod 

359 def validate_release_license(cls, value: str, info: ValidationInfo) -> str: 

360 """ 

361 Validate that the value is a valid license. 

362 """ 

363 if isinstance(value, str): 

364 copyright_object = Copyright(release_license=value) 

365 return copyright_object.release_license 

366 

367 @field_validator("country", "state", mode="before") 

368 @classmethod 

369 def validate_areas(cls, value) -> list[str]: 

370 """validate country and state to be a list""" 

371 if isinstance(value, str): 

372 return [item.strip() for item in value.split(",")] 

373 elif isinstance(value, (list, tuple)): 

374 return list(value) 

375 elif value == None: 

376 return None 

377 else: 

378 raise TypeError(f"Cannot make a list from types {type(value)}.") 

379 

380 @field_validator("stations", mode="before") 

381 @classmethod 

382 def validate_stations(cls, value, info: ValidationInfo) -> ListDict: 

383 if not isinstance(value, (list, tuple, dict, ListDict, OrderedDict)): 

384 msg = ( 

385 "input stations must be an iterable, should be a list or dict " 

386 f"not {type(value)}" 

387 ) 

388 logger.error(msg) 

389 raise TypeError(msg) 

390 

391 fails = [] 

392 stations = ListDict() 

393 if isinstance(value, (dict, ListDict, OrderedDict)): 

394 value_list = value.values() 

395 

396 elif isinstance(value, (list, tuple)): 

397 value_list = value 

398 

399 for ii, station_entry in enumerate(value_list): 

400 if isinstance(station_entry, (dict, OrderedDict)): 

401 try: 

402 station = Station() 

403 station.from_dict(station_entry) 

404 stations.append(station) 

405 except KeyError: 

406 msg = f"Item {ii} is not type(Station); type={type(station_entry)}" 

407 fails.append(msg) 

408 logger.error(msg) 

409 elif not isinstance(station_entry, (Station)): 

410 msg = f"Item {ii} is not type(Run); type={type(station_entry)}" 

411 fails.append(msg) 

412 logger.error(msg) 

413 else: 

414 stations.append(station_entry) 

415 if len(fails) > 0: 

416 raise TypeError("\n".join(fails)) 

417 

418 return stations 

419 

420 @field_validator("filters", mode="before") 

421 @classmethod 

422 def validate_filters( 

423 cls, value: str | list | ListDict, info: ValidationInfo 

424 ) -> ListDict: 

425 """ 

426 

427 Parameters 

428 ---------- 

429 value : _type_ 

430 _description_ 

431 info : ValidationInfo 

432 _description_ 

433 

434 Returns 

435 ------- 

436 ListDict 

437 _description_ 

438 """ 

439 filters = ListDict() 

440 fails = [] 

441 if value is None: 

442 return 

443 

444 if isinstance(value, list): 

445 if len(value) > 0: 

446 for ff in value: 

447 if isinstance(ff, (dict, OrderedDict, ListDict)): 

448 f_type = ff["type"] 

449 if f_type is None: 

450 msg = ( 

451 "filter type is None do not know how to read the filter" 

452 ) 

453 fails.append(msg) 

454 logger.error(msg) 

455 if f_type.lower() in ["zpk"]: 

456 f = PoleZeroFilter() 

457 elif f_type.lower() in ["coefficient"]: 

458 f = CoefficientFilter() 

459 elif f_type.lower() in ["time delay"]: 

460 f = TimeDelayFilter() 

461 elif f_type.lower() in ["fir"]: 

462 f = FIRFilter() 

463 elif f_type.lower() in ["frequency response table"]: 

464 f = FrequencyResponseTableFilter() 

465 else: 

466 msg = f"filter type {f_type} not supported." 

467 fails.append(msg) 

468 logger.error(msg) 

469 

470 f.from_dict(ff) 

471 filters[f.name] = f 

472 elif isinstance( 

473 ff, 

474 ( 

475 PoleZeroFilter, 

476 CoefficientFilter, 

477 FrequencyResponseTableFilter, 

478 TimeDelayFilter, 

479 FIRFilter, 

480 ), 

481 ): 

482 filters[ff.name] = ff 

483 else: 

484 msg = f"Item {ff} is not Filter type; type={type(ff)}" 

485 fails.append(msg) 

486 logger.error(msg) 

487 

488 elif not isinstance(value, (dict, OrderedDict, ListDict)): 

489 msg = ( 

490 "Filters must be a dictionary with keys = names of filters, " 

491 f"not {type(value)}" 

492 ) 

493 logger.error(msg) 

494 raise TypeError(msg) 

495 else: 

496 for k, v in value.items(): 

497 if not isinstance( 

498 v, 

499 ( 

500 PoleZeroFilter, 

501 CoefficientFilter, 

502 TimeDelayFilter, 

503 FrequencyResponseTableFilter, 

504 FIRFilter, 

505 ), 

506 ): 

507 msg = f"Item {k} is not Filter type; type={type(v)}" 

508 fails.append(msg) 

509 logger.error(msg) 

510 else: 

511 filters[k.lower()] = v 

512 if len(fails) > 0: 

513 raise TypeError("\n".join(fails)) 

514 

515 return filters 

516 

517 @computed_field 

518 @property 

519 def survey_extent(self) -> dict: 

520 """ 

521 Return the survey extent as a dictionary with keys 'northwest' and 'southeast'. 

522 """ 

523 return { 

524 "latitude": { 

525 "min": self.southeast_corner.latitude, 

526 "max": self.northwest_corner.latitude, 

527 }, 

528 "longitude": { 

529 "min": self.northwest_corner.longitude, 

530 "max": self.southeast_corner.longitude, 

531 }, 

532 } 

533 

534 def merge(self, other: "Survey", inplace=False) -> "Survey": 

535 """ 

536 Merge surveys together using the original metadata but adding other's stations. 

537 

538 Parameters 

539 ---------- 

540 other : Survey 

541 Survey object 

542 inplace : bool, optional 

543 merge in place, by default False 

544 

545 Returns 

546 ------- 

547 Survey 

548 merged surveys 

549 

550 Raises 

551 ------ 

552 TypeError 

553 If items cannot be merged. 

554 """ 

555 if isinstance(other, Survey): 

556 self.stations.extend(other.stations) 

557 self.update_all() 

558 if not inplace: 

559 return self 

560 else: 

561 msg = f"Can only merge Survey objects, not {type(other)}" 

562 logger.error(msg) 

563 raise TypeError(msg) 

564 

565 @property 

566 def n_stations(self) -> int: 

567 """ 

568 Return the number of stations in the station. 

569 

570 :return: number of runs in the station 

571 :rtype: int 

572 

573 """ 

574 return len(self.stations) 

575 

576 @property 

577 def station_names(self): 

578 """Return names of station in survey""" 

579 return self.stations.keys() 

580 

581 @property 

582 def filter_names(self): 

583 """return a list of filter names""" 

584 return list(self.filters.keys()) 

585 

586 def has_station(self, station_id): 

587 """ 

588 Has station id 

589 

590 :param station_id: station id verbatim 

591 :type station_id: string 

592 :return: True if exists or False if not 

593 :rtype: boolean 

594 

595 """ 

596 if station_id in self.station_names: 

597 return True 

598 return False 

599 

600 def station_index(self, station_id): 

601 """ 

602 Get station index 

603 

604 :param station_id: station id verbatim 

605 :type station_id: string 

606 :return: index value if station is found 

607 :rtype: integer 

608 

609 """ 

610 

611 if self.has_station(station_id): 

612 return self.station_names.index(station_id) 

613 return None 

614 

615 def add_station(self, station_obj, update=True): 

616 """ 

617 Add a station, if has the same name update that object. 

618 

619 :param station_obj: station object to add 

620 :type station_obj: `:class:`mt_metadata.timeseries.Station` 

621 

622 """ 

623 

624 if not isinstance(station_obj, Station): 

625 raise TypeError( 

626 f"Input must be a mt_metadata.timeseries.Station object not {type(station_obj)}" 

627 ) 

628 

629 if self.has_station(station_obj.id): 

630 self.stations[station_obj.id].update(station_obj) 

631 logger.warning( 

632 f"Station {station_obj.id} already exists, updating metadata" 

633 ) 

634 else: 

635 self.stations.append(station_obj) 

636 

637 if update: 

638 self.update_bounding_box() 

639 self.update_time_period() 

640 self.update_station_keys() 

641 

642 def get_station(self, station_id): 

643 """ 

644 Get a station from the station id 

645 

646 :param station_id: station id verbatim 

647 :type station_id: string 

648 :return: station object 

649 :rtype: :class:`mt_metadata.timeseries.Station` 

650 

651 """ 

652 

653 if self.has_station(station_id): 

654 return self.stations[station_id] 

655 else: 

656 logger.warning(f"Could not find station {station_id}") 

657 return None 

658 

659 def remove_station(self, station_id, update=True): 

660 """ 

661 remove a station from the survey 

662 

663 :param station_id: station id verbatim 

664 :type station_id: string 

665 

666 """ 

667 

668 if self.has_station(station_id): 

669 self.stations.remove(station_id) 

670 if update: 

671 self.update_bounding_box() 

672 self.update_time_period() 

673 else: 

674 logger.warning(f"Could not find {station_id} to remove.") 

675 

676 def update_station_keys(self): 

677 """ 

678 Update the keys in the stations ListDict to match current station IDs. 

679 

680 This is useful when station IDs have been modified after stations were 

681 added to the survey, ensuring that stations can be accessed by their 

682 current ID values. 

683 

684 :returns: mapping of old keys to new keys 

685 :rtype: dict 

686 

687 Example: 

688 >>> survey = Survey() 

689 >>> station = Station() 

690 >>> station.id = "" # empty ID initially 

691 >>> survey.add_station(station) 

692 >>> station.id = "MT001" # update the ID 

693 >>> key_mapping = survey.update_station_keys() 

694 >>> print(key_mapping) # {'': 'MT001'} 

695 >>> # Now station can be accessed as survey.stations['MT001'] 

696 """ 

697 return self.stations.update_keys() 

698 

699 def update_bounding_box(self): 

700 """ 

701 Update the bounding box of the survey from the station information 

702 

703 """ 

704 if self.n_stations > 0: 

705 lat = [] 

706 lon = [] 

707 for station in self.stations: 

708 if station.location.latitude is not None: 

709 lat.append(station.location.latitude) 

710 if station.location.longitude is not None: 

711 lon.append(station.location.longitude) 

712 

713 if not len(lat) == 0: 

714 self.southeast_corner.latitude = min(lat) 

715 self.northwest_corner.latitude = max(lat) 

716 if not len(lon) == 0: 

717 self.southeast_corner.longitude = max(lon) 

718 self.northwest_corner.longitude = min(lon) 

719 

720 def update_time_period(self): 

721 """ 

722 Update the start and end time of the survey based on the stations 

723 """ 

724 if self.__len__() > 0: 

725 start = [] 

726 end = [] 

727 for station in self.stations: 

728 if not station.time_period.start_is_default(): 

729 start.append(station.time_period.start) 

730 if not station.time_period.end_is_default(): 

731 end.append(station.time_period.end) 

732 

733 if start: 

734 if self.time_period.start_is_default(): 

735 self.time_period.start_date = min(start) 

736 else: 

737 if self.time_period.start_date > min(start): 

738 self.time_period.start_date = min(start) 

739 

740 if end: 

741 if self.time_period.end_is_default(): 

742 self.time_period.end_date = max(end) 

743 else: 

744 if self.time_period.end_date < max(end): 

745 self.time_period.end_date = max(end) 

746 

747 def update_all(self): 

748 """ 

749 Update time period and bounding box 

750 """ 

751 self.update_time_period() 

752 self.update_bounding_box()