Coverage for C:\Users\peaco\OneDrive\Documents\GitHub\mth5\mth5\processing\kernel_dataset.py: 75%

448 statements  

coverage.py v7.13.1, created at 2026-01-27 20:09 -0800

1""" 

2Magnetotelluric kernel dataset processing module. 

3 

4This module contains a class for representing a dataset that can be processed. 

5 

6The module provides functionality for: 

7- Managing magnetotelluric time series intervals 

8- Supporting single station and remote reference processing 

9- Handling run combination and time interval restrictions 

10- Interfacing with MTH5 data structures 

11 

12Development Notes 

13----------------- 

14Players on the stage: One or more mth5s. 

15 

16Each mth5 has a "run_summary" dataframe available. Run_summary provides options for 

17the local and possibly remote reference stations. Candidates for local station are 

18the unique values in the station column. 

19 

20For any candidate station, there are some integer n runs available. 

21This yields 2^n - 1 possible combinations that can be processed, neglecting any 

22flagging of time intervals within any run, or any joining of runs. 

23(There are actually 2**n, but we ignore the empty set, so -1) 

24 

25Intuition suggests default ought to be to process n runs in n+1 configurations: 

26{all runs} + each run individually. This will give a bulk answer, and bad runs can 

27be flagged by comparing them. After an initial processing, the tfs can be reviewed 

28and the problematic runs can be addressed. 

29 

30The user can interact with the run_summary_df, selecting sub dataframes via querying, 

31and in future maybe via some GUI (or a spreadsheet). 

32 

33The intended usage process is as follows: 

34 0. Start with a list of mth5s 

35 1. Extract a run_summary 

36 2. Stare at the run_summary_df, and select a station "S" to process 

37 3. Select a non-empty set of runs for station "S" 

38 4. Select a remote reference "RR", (this is allowed to be None) 

39 5. Extract the sub-dataframe corresponding to acquisition_runs from "S" and "RR" 

40 6. If the remote is not None:

41 - Drop the runs (rows) associated with RR that do not intersect with S

42 - Restrict start/end times of RR runs that intersect with S so overlap is complete.

43 - Restrict start/end times of S runs so that they intersect with remote

44 7. This is now a TFKernel Dataset Definition (ish). Initialize a default processing

45 object and pass it this df (see Examples).

46 8. Edit the Processing Config appropriately.

47 

48Examples

49--------

50>>> cc = ConfigCreator()

51>>> p = cc.create_from_kernel_dataset(kernel_dataset)

52- Optionally pass emtf_band_file=emtf_band_setup_file

53 

54TODO: Consider supporting a default value for 'channel_scale_factors' that is None.

55TODO: Might need to groupby survey & station, for now consider station_id unique. 

56""" 

57 
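The run-combination count in the development notes can be checked with a few lines of standard-library Python. This is only an illustrative sketch: `all_run_combinations`, `default_configurations`, and the run labels are hypothetical names and are not part of mth5.

```python
from itertools import combinations


def all_run_combinations(run_ids):
    """Return every non-empty subset of run_ids (2**n - 1 subsets for n runs)."""
    subsets = []
    for size in range(1, len(run_ids) + 1):
        subsets.extend(combinations(run_ids, size))
    return subsets


def default_configurations(run_ids):
    """Return {all runs} followed by each run on its own (n + 1 entries)."""
    configs = [tuple(run_ids)]
    configs.extend((run_id,) for run_id in run_ids)
    return configs


runs = ["001", "002", "003"]  # hypothetical run labels for one station
print(len(all_run_combinations(runs)))  # 7 == 2**3 - 1
print(default_configurations(runs))
```

With a single run the "all runs" entry and the individual entry coincide, so the n + 1 default is mainly useful when a station has two or more runs.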

58from __future__ import annotations 

59 

60import copy 

61 

62# ============================================================================= 

63# Imports 

64# ============================================================================= 

65from pathlib import Path 

66from typing import Any 

67 

68import mt_metadata.timeseries 

69import pandas as pd 

70from loguru import logger 

71from mt_metadata.common.list_dict import ListDict 

72from mt_metadata.timeseries import Survey 

73from mt_metadata.transfer_functions.tf import Station 

74 

75import mth5.timeseries.run_ts 

76from mth5.mth5 import MTH5 

77from mth5.processing import KERNEL_DATASET_DTYPE, MINI_SUMMARY_COLUMNS 

78from mth5.processing.run_summary import RunSummary 

79from mth5.utils.helpers import initialize_mth5 

80 

81 

82# ============================================================================= 

83 

84 

85class KernelDataset: 

86 """ 

87 Magnetotelluric kernel dataset for time series processing. 

88 

89 This class works with mth5-derived channel_summary or run_summary dataframes 

90 that specify time series intervals. It manages acquisition "runs" that can be 

91 merged into processing runs, with support for both single station and remote 

92 reference processing configurations. 

93 

94 Parameters 

95 ---------- 

96 df : pd.DataFrame | None, optional 

97 Pre-formed dataframe with dataset configuration. Normally built from a 

98 run_summary, by default None 

99 local_station_id : str, optional 

100 Local station identifier for the dataset. Normally passed via 

101 from_run_summary method, by default "" 

102 remote_station_id : str | None, optional 

103 Remote reference station identifier. Normally passed via from_run_summary 

104 method, by default None 

105 **kwargs : dict 

106 Additional keyword arguments to set as attributes 

107 

108 Attributes 

109 ---------- 

110 df : pd.DataFrame | None 

111 Main dataset dataframe with time series intervals 

112 local_station_id : str | None 

113 Local station identifier 

114 remote_station_id : str | None 

115 Remote reference station identifier 

116 survey_metadata : dict 

117 Survey metadata container 

118 initialized : bool 

119 Whether MTH5 objects have been initialized 

120 local_mth5_obj : Any | None 

121 Local station MTH5 object 

122 remote_mth5_obj : Any | None 

123 Remote station MTH5 object 

124 

125 Notes 

126 ----- 

127 The class is closely related to (may actually be an extension of) RunSummary. 

128 The main idea is to specify one or two stations, and a list of acquisition "runs" 

129 that can be merged into a "processing run". Each acquisition run can be further 

130 divided into non-overlapping chunks by specifying time-intervals associated with 

131 that acquisition run. 

132 

133 The time intervals can be used for several purposes but primarily: 

134 - STFT processing for merged FC data structures 

135 - Binding together into xarray time series for gap filling 

136 - Managing and analyzing availability of reference time series 

137 

138 Examples 

139 -------- 

140 Create a kernel dataset from run summary: 

141 

142 >>> from mth5.processing.run_summary import RunSummary 

143 >>> run_summary = RunSummary() 

144 >>> dataset = KernelDataset() 

145 >>> dataset.from_run_summary(run_summary, "station01", "station02") 

146 

147 Process single station data: 

148 

149 >>> single_dataset = KernelDataset() 

150 >>> single_dataset.from_run_summary(run_summary, "station01") 

151 

152 See Also 

153 -------- 

154 RunSummary : Data summary for processing configuration 

155 """ 

156 

157 def __init__( 

158 self, 

159 df: pd.DataFrame | None = None, 

160 local_station_id: str = "", 

161 remote_station_id: str | None = None, 

162 **kwargs: Any, 

163 ) -> None: 

164 """ 

165 Initialize KernelDataset instance. 

166 

167 Parameters 

168 ---------- 

169 df : pd.DataFrame | None, optional 

170 Pre-formed dataframe with dataset configuration, by default None 

171 local_station_id : str, optional 

172 Local station identifier, by default "" 

173 remote_station_id : str | None, optional 

174 Remote reference station identifier, by default None 

175 **kwargs : dict 

176 Additional keyword arguments to set as attributes 

177 """ 

178 self.df = df 

179 self.local_station_id = local_station_id 

180 self.remote_station_id = remote_station_id 

181 self._mini_summary_columns = MINI_SUMMARY_COLUMNS 

182 self.survey_metadata: dict[str, Any] = {} 

183 self.initialized: bool = False 

184 self.local_mth5_obj: Any = None 

185 self.remote_mth5_obj: Any = None 

186 self._local_mth5_path: Path | None = None 

187 self._remote_mth5_path: Path | None = None 

188 

189 for key, value in kwargs.items(): 

190 setattr(self, key, value) 

191 

192 def __str__(self) -> str: 

193 """ 

194 Return string representation of the dataset. 

195 

196 Returns 

197 ------- 

198 str 

199 String representation showing mini summary 

200 """ 

201 return str(self.mini_summary) 

202 

203 def __repr__(self) -> str: 

204 """ 

205 Return detailed string representation. 

206 

207 Returns 

208 ------- 

209 str 

210 Detailed string representation 

211 """ 

212 return self.__str__() 

213 

214 @property 

215 def df(self) -> pd.DataFrame | None: 

216 """ 

217 Main dataset dataframe. 

218 

219 Returns 

220 ------- 

221 pd.DataFrame | None 

222 Dataset dataframe with time series intervals, or None if not set 

223 """ 

224 return self._df 

225 

226 @df.setter 

227 def df(self, value: pd.DataFrame | None) -> None: 

228 """ 

229 Set the dataset dataframe with proper validation. 

230 

231 Parameters 

232 ---------- 

233 value : pd.DataFrame | None 

234 Dataframe to set, or None to clear 

235 

236 Raises 

237 ------ 

238 TypeError 

239 If value is not a DataFrame or None 

240 """ 

241 if value is None: 

242 self._df = None 

243 return 

244 

245 if not isinstance(value, pd.DataFrame): 

246 msg = f"Need to set df with a pandas.DataFrame, not {type(value)}"

247 logger.error(msg) 

248 raise TypeError(msg) 

249 

250 self._df = self._add_duration_column( 

251 self._set_datetime_columns(self._add_columns(value)), inplace=False 

252 ) 

253 

254 def _has_df(self) -> bool: 

255 """ 

256 Check if dataframe is set and not empty. 

257 

258 Returns 

259 ------- 

260 bool 

261 True if dataframe exists and is not empty 

262 """ 

263 if self._df is not None: 

264 if not self._df.empty: 

265 return True 

266 return False 

267 return False 

268 

269 def _df_has_local_station_id(self, df: pd.DataFrame) -> bool: 

270 """ 

271 Check if dataframe contains the local station ID. 

272 

273 Parameters 

274 ---------- 

275 df : pd.DataFrame 

276 Dataframe to check 

277 

278 Returns 

279 ------- 

280 bool 

281 True if local station ID exists in dataframe 

282 """ 

283 return (df.station == self.local_station_id).any() 

284 

285 def _df_has_remote_station_id(self, df: pd.DataFrame) -> bool: 

286 """ 

287 Check if dataframe contains the remote station ID. 

288 

289 Parameters 

290 ---------- 

291 df : pd.DataFrame 

292 Dataframe to check 

293 

294 Returns 

295 ------- 

296 bool 

297 True if remote station ID exists in dataframe 

298 """ 

299 return (df.station == self.remote_station_id).any() 

300 

301 def _set_datetime_columns(self, df: pd.DataFrame) -> pd.DataFrame: 

302 """ 

303 Ensure start and end columns are datetime objects. 

304 

305 Parameters 

306 ---------- 

307 df : pd.DataFrame 

308 Input dataframe 

309 

310 Returns 

311 ------- 

312 pd.DataFrame 

313 Dataframe with datetime columns properly set 

314 """ 

315 try: 

316 df.start = pd.to_datetime(df.start, format="mixed") 

317 df.end = pd.to_datetime(df.end, format="mixed") 

318 except ValueError: 

319 df.start = pd.to_datetime(df.start) 

320 df.end = pd.to_datetime(df.end) 

321 

322 return df 

323 

324 def clone(self) -> "KernelDataset": 

325 """ 

326 Create a deep copy of the dataset. 

327 

328 Returns 

329 ------- 

330 KernelDataset 

331 Deep copy of this instance 

332 """ 

333 return copy.deepcopy(self) 

334 

335 def clone_dataframe(self) -> pd.DataFrame | None: 

336 """ 

337 Create a deep copy of the dataframe. 

338 

339 Returns 

340 ------- 

341 pd.DataFrame | None 

342 Deep copy of the dataframe, or None if dataframe is not set 

343 """ 

344 return copy.deepcopy(self.df) 

345 

346 def _add_columns( 

347 self, 

348 df: pd.DataFrame, 

349 null_columns: list[str] | tuple[str, ...] = ("fc",), 

350 ) -> pd.DataFrame: 

351 """ 

352 Add missing columns with appropriate dtypes. 

353 

354 Parameters 

355 ---------- 

356 df : pd.DataFrame 

357 Kernel dataset dataframe, possibly missing some columns 

358 null_columns : list[str] | tuple[str, ...], optional 

359 Columns that will initialize to null rather than their expected dtype, 

360 by default ("fc",) 

361 

362 Returns 

363 ------- 

364 pd.DataFrame 

365 Kernel dataset dataframe with all required columns present 

366 

367 Raises 

368 ------ 

369 ValueError 

370 If required columns (survey, station, run, start, end) are missing 

371 """ 

372 for col, dtype in KERNEL_DATASET_DTYPE: 

373 if col not in df.columns: 

374 if col in ["survey", "station", "run", "start", "end"]: 

375 raise ValueError(f"{col} must be a filled column in the dataframe") 

376 

377 try: 

378 df[col] = dtype(0) 

379 assigned_dtype = dtype 

380 except TypeError: 

381 df[col] = None # TODO: update to pd.NA 

382 assigned_dtype = type(None) 

383 

384 if col in null_columns: 

385 df[col] = pd.NA 

386 assigned_dtype = type(pd.NA) 

387 

388 msg = ( 

389 f"KernelDataset DataFrame needs column {col}, adding and " 

390 f"setting dtype to {assigned_dtype}." 

391 ) 

392 logger.debug(msg) 

393 

394 return df 

395 

396 @property 

397 def local_station_id(self) -> str | None: 

398 """ 

399 Local station identifier. 

400 

401 Returns 

402 ------- 

403 str | None 

404 Local station identifier 

405 """ 

406 return self._local_station_id 

407 

408 @local_station_id.setter 

409 def local_station_id(self, value: str | None) -> None: 

410 """ 

411 Set local station identifier. 

412 

413 Parameters 

414 ---------- 

415 value : str | None 

416 Station identifier to set 

417 

418 Raises 

419 ------ 

420 ValueError 

421 If value cannot be converted to string 

422 NameError 

423 If station ID is not found in dataframe when dataframe exists 

424 """ 

425 if value is None: 

426 self._local_station_id = None 

427 else: 

428 try: 

429 self._local_station_id = str(value) 

430 except ValueError: 

431 raise ValueError( 

432 f"Bad type {type(value)}. " 

433 "Cannot convert local_station_id value to string." 

434 ) 

435 if self._has_df() and self.df is not None: 

436 if not self._df_has_local_station_id(self.df): 

437 raise NameError( 

438 f"Could not find {self._local_station_id} in dataframe" 

439 ) 

440 

441 @property 

442 def local_mth5_path(self) -> Path | None: 

443 """ 

444 Local station MTH5 file path. 

445 

446 Returns 

447 ------- 

448 Path | None 

449 Path to local station MTH5 file, extracted from dataframe or 

450 stored path, or None if not available 

451 """ 

452 if self._has_df() and self._df is not None: 

453 unique_paths = self._df.loc[ 

454 self._df.station == self.local_station_id, "mth5_path" 

455 ].unique() 

456 if len(unique_paths) > 0: 

457 return Path(unique_paths[0]) 

458 return None 

459 else: 

460 return self._local_mth5_path 

461 

462 @local_mth5_path.setter 

463 def local_mth5_path(self, value: str | Path | None) -> None: 

464 """ 

465 Set local MTH5 path. 

466 

467 Parameters 

468 ---------- 

469 value : str | Path | None 

470 Path to MTH5 file 

471 """ 

472 self._local_mth5_path = self.set_path(value) 

473 

474 def has_local_mth5(self) -> bool: 

475 """ 

476 Check if local MTH5 file exists. 

477 

478 Returns 

479 ------- 

480 bool 

481 True if local MTH5 file exists on filesystem 

482 """ 

483 if self.local_mth5_path is None: 

484 return False 

485 else: 

486 return self.local_mth5_path.exists() 

487 

488 @property 

489 def remote_station_id(self) -> str | None: 

490 """ 

491 Remote reference station identifier. 

492 

493 Returns 

494 ------- 

495 str | None 

496 Remote station identifier 

497 """ 

498 return self._remote_station_id 

499 

500 @remote_station_id.setter 

501 def remote_station_id(self, value: str | None) -> None: 

502 """ 

503 Set remote station identifier. 

504 

505 Parameters 

506 ---------- 

507 value : str | None 

508 Remote station identifier 

509 

510 Raises 

511 ------ 

512 ValueError 

513 If value cannot be converted to string 

514 NameError 

515 If station ID is not found in dataframe when dataframe exists 

516 """ 

517 if value is None: 

518 self._remote_station_id = None 

519 else: 

520 try: 

521 self._remote_station_id = str(value) 

522 except ValueError: 

523 raise ValueError( 

524 f"Bad type {type(value)}. " 

525 "Cannot convert remote_station_id value to string." 

526 ) 

527 if self._has_df(): 

528 if not self._df_has_remote_station_id(self.df): 

529 raise NameError( 

530 f"Could not find {self._remote_station_id} in dataframe" 

531 ) 

532 

533 @property 

534 def remote_mth5_path(self) -> Path | None:

535 """Remote mth5 path.

536 :return: Remote station MTH5 path, extracted from the dataframe, or the stored path (possibly None).

537 :rtype: Path | None

538 """

539 if self._has_df() and self.remote_station_id is not None: 

540 return Path( 

541 self._df.loc[ 

542 self._df.station == self.remote_station_id, "mth5_path" 

543 ].unique()[0] 

544 ) 

545 else: 

546 return self._remote_mth5_path 

547 

548 @remote_mth5_path.setter 

549 def remote_mth5_path(self, value: str | Path | None): 

550 """ 

551 Set the remote mth5 path. 

552 

553 Parameters 

554 ---------- 

555 value : str | Path | None 

556 Path to the remote mth5 file 

557 """ 

558 self._remote_mth5_path = self.set_path(value) 

559 

560 def has_remote_mth5(self) -> bool: 

561 """Test if remote mth5 exists.""" 

562 if self.remote_mth5_path is None: 

563 return False 

564 else: 

565 return self.remote_mth5_path.exists() 

566 

567 @property 

568 def processing_id(self) -> str: 

569 """It is difficult to come up with unique ids without very long names,

570 so this is a generic id of the form local[_rr_remote]_sr<sample_rate>. The station metadata

571 will have run information and the config parameters.

572 """ 

573 if self.remote_station_id is not None: 

574 return ( 

575 f"{self.local_station_id}_rr_{self.remote_station_id}_" 

576 f"sr{int(self.sample_rate)}" 

577 ) 

578 else: 

579 return f"{self.local_station_id}_sr{int(self.sample_rate)}" 

580 
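The identifier format built by `processing_id` can be reproduced in isolation. The sketch below is a standalone illustration: `make_processing_id` is a hypothetical helper, not an mth5 function, and it assumes the sample rate is a number that `int()` can truncate.

```python
def make_processing_id(local_station_id, remote_station_id, sample_rate):
    """Mirror the local[_rr_remote]_sr<rate> pattern used by processing_id."""
    if remote_station_id is not None:
        return f"{local_station_id}_rr_{remote_station_id}_sr{int(sample_rate)}"
    return f"{local_station_id}_sr{int(sample_rate)}"


print(make_processing_id("mt01", "mt02", 1.0))
print(make_processing_id("mt01", None, 4096.0))
```

Because the rate is truncated with `int()`, two datasets that differ only in a fractional sample rate would share the same identifier.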

581 @property 

582 def input_channels(self) -> list[str]: 

583 """ 

584 Get input channels from dataframe. 

585 

586 Returns 

587 ------- 

588 list[str] 

589 Input channel identifiers (sources) 

590 

591 Raises 

592 ------ 

593 AttributeError 

594 If dataframe is not available or local_df has no input_channels 

595 """ 

596 if self._has_df() and self.df is not None: 

597 local_data = self.local_df 

598 if local_data is not None and not local_data.empty: 

599 return local_data.input_channels.iat[0] 

600 return [] 

601 

602 @property 

603 def output_channels(self) -> list[str]: 

604 """ 

605 Get output channels from dataframe. 

606 

607 Returns 

608 ------- 

609 list[str] 

610 Output channel identifiers 

611 

612 Raises 

613 ------ 

614 AttributeError 

615 If dataframe is not available or local_df has no output_channels 

616 """ 

617 if self._has_df() and self.df is not None: 

618 local_data = self.local_df 

619 if local_data is not None and not local_data.empty: 

620 return local_data.output_channels.iat[0] 

621 return [] 

622 

623 @property 

624 def remote_channels(self) -> list[str]: 

625 """ 

626 Get remote reference channels from dataframe. 

627 

628 Returns 

629 ------- 

630 list[str] 

631 Remote reference channel identifiers 

632 

633 Raises 

634 ------ 

635 AttributeError 

636 If dataframe is not available or remote_df has no remote_channels 

637 """ 

638 if ( 

639 self._has_df() 

640 and self.df is not None 

641 and self.remote_station_id is not None 

642 ): 

643 remote_data = self.remote_df 

644 if remote_data is not None and not remote_data.empty: 

645 return remote_data.input_channels.iat[0] 

646 return [] 

647 

648 @property 

649 def local_df(self) -> pd.DataFrame | None: 

650 """ 

651 Get dataframe subset for local station runs. 

652 

653 Returns 

654 ------- 

655 pd.DataFrame | None 

656 Local station runs data, or None if dataframe not available 

657 """ 

658 if self._has_df() and self.df is not None: 

659 return self.df[self.df.station == self.local_station_id] 

660 return None 

661 

662 @property 

663 def remote_df(self) -> pd.DataFrame | None: 

664 """ 

665 Get dataframe subset for remote station runs. 

666 

667 Returns 

668 ------- 

669 pd.DataFrame | None 

670 Remote station runs data, or None if dataframe not available 

671 or no remote station configured 

672 """ 

673 if ( 

674 self._has_df() 

675 and self.df is not None 

676 and self.remote_station_id is not None 

677 ): 

678 return self.df[self.df.station == self.remote_station_id] 

679 return None 

680 

681 @classmethod 

682 def set_path(cls, value: str | Path | None) -> Path | None: 

683 """ 

684 Set and validate a file path. 

685 

686 Parameters 

687 ---------- 

688 value : str | Path | None 

689 Path value to set and validate 

690 

691 Returns 

692 ------- 

693 Path | None 

694 Validated Path object, or None if input is None 

695 

696 Raises 

697 ------ 

698 IOError 

699 If path does not exist on filesystem 

700 ValueError 

701 If value cannot be converted to Path 

702 """ 

703 if value is None: 

704 return None 

705 

706 if isinstance(value, (str, Path)): 

707 return_path = Path(value) 

708 if not return_path.exists(): 

709 raise IOError(f"Cannot find file: {return_path}") 

710 return return_path 

711 else: 

712 raise ValueError(f"Cannot convert type {type(value)} to Path") 

713 

714 def from_run_summary( 

715 self, 

716 run_summary: RunSummary, 

717 local_station_id: str | None = None, 

718 remote_station_id: str | None = None, 

719 sample_rate: float | int | None = None, 

720 ) -> None: 

721 """ 

722 Initialize the dataframe from a run summary. 

723 

724 Parameters 

725 ---------- 

726 run_summary : RunSummary 

727 Summary of available data for processing from one or more stations 

728 local_station_id : str | None, optional 

729 Label of the station for which an estimate will be computed, 

730 by default None 

731 remote_station_id : str | None, optional 

732 Label of the remote reference station, by default None 

733 sample_rate : float | int | None, optional 

734 Sample rate to filter data by, by default None 

735 

736 Raises 

737 ------ 

738 ValueError 

739 If restricting to specified stations yields empty dataset or 

740 if local and remote stations do not overlap for remote reference 

741 """ 

742 

743 self.df = None 

744 

745 if local_station_id is not None: 

746 self.local_station_id = local_station_id 

747 if remote_station_id is not None: 

748 self.remote_station_id = remote_station_id 

749 

750 if sample_rate is not None: 

751 run_summary = run_summary.set_sample_rate(sample_rate) 

752 

753 station_ids = [self.local_station_id]

754 if self.remote_station_id:

755 station_ids.append(self.remote_station_id)

756 # Filter out None values before passing to function 

757 station_ids_filtered = [sid for sid in station_ids if sid is not None] 

758 df = restrict_to_station_list( 

759 run_summary.df, station_ids_filtered, inplace=False 

760 ) 

761 

762 # Check df is non-empty 

763 if len(df) == 0: 

764 msg = f"Restricting run_summary df to {station_ids} yields an empty set" 

765 logger.critical(msg) 

766 raise ValueError(msg) 

767 

768 # Check that the columns have data 

769 len_df_before_drop_dataless_rows = len(df) 

770 df = df[df.has_data] 

771 n_dropped = len_df_before_drop_dataless_rows - len(df) 

772 if n_dropped: 

773 msg = f"Dropped {n_dropped} rows from Kernel Dataset due to missing data" 

774 logger.warning(msg) 

775 

776 # Check df is non-empty (again) 

777 if len(df) == 0: 

778 msg = ( 

779 "Restricting run_summary df to runs that have data yields an empty set" 

780 ) 

781 logger.critical(msg) 

782 raise ValueError(msg) 

783 

784 # add any missing columns

785 df = self._add_columns(df) 

786 

787 # set remote reference 

788 if self.remote_station_id: 

789 cond = df.station == self.remote_station_id

790 df["remote"] = cond

791 

792 # be sure to set date time columns and restrict to simultaneous runs 

793 df = self._set_datetime_columns(df) 

794 if self.remote_station_id: 

795 df = self.restrict_run_intervals_to_simultaneous(df) 

796 

797 # Again check df is non-empty 

798 if len(df) == 0: 

799 msg = ( 

800 f"Local: {local_station_id} and remote: " 

801 f"{remote_station_id} do not overlap. Remote reference " 

802 "processing not a valid option." 

803 ) 

804 logger.error(msg) 

805 raise ValueError(msg) 

806 

807 self.df = df 

808 

809 self.survey_metadata = self.get_metadata_from_df(self.local_df) 

810 

811 def get_metadata_from_df(self, df: pd.DataFrame | None) -> Survey | dict:

812 """

813 Extract survey metadata for the station in the dataframe. The dataframe should

814 include only one station, so use self.local_df or self.remote_df. (Run Summary)

815 

816 Parameters

817 ----------

818 df : pd.DataFrame | None

819 Single-station dataframe to extract metadata from

820 

821 Returns

822 -------

823 Survey | dict

824 Survey metadata, or an empty dict if df is None or empty, or if no station HDF5 reference is found

825 """ 

826 if df is None or df.empty: 

827 return {} 

828 

829 mth5_path = df["mth5_path"].unique()[0] 

830 if len(mth5_path) == 0: 

831 raise ValueError( 

832 f"Cannot find MTH5 path for local station {self.local_station_id}" 

833 ) 

834 

835 h5_station_reference = df["station_hdf5_reference"].unique()[0] 

836 

837 if h5_station_reference is None: 

838 logger.warning( 

839 f"Cannot find h5_station_reference for local station {self.local_station_id}" 

840 ) 

841 return {} 

842 with MTH5() as m: 

843 m.open_mth5(mth5_path) 

844 station_group = m.from_reference(h5_station_reference) 

845 survey_metadata = station_group.survey_metadata 

846 

847 # survey metadata returns a time series station, so need to update to a 

848 # transfer function object 

849 

850 tf_station = Station() 

851 tf_station.update(survey_metadata.stations[self.local_station_id]) 

852 

853 # remove runs that are not in the dataframe 

854 processing_runs = df.run.unique() 

855 for run in list(tf_station.runs.keys()): # copy keys; runs are removed inside the loop

856 if run not in processing_runs: 

857 tf_station.remove_run(run) 

858 else: 

859 tf_station.runs[run].update( 

860 survey_metadata.stations[self.local_station_id].runs[run] 

861 ) 

862 

863 # add to survey metadata by removing the old one first 

864 survey_metadata.remove_station(self.local_station_id) 

865 survey_metadata.add_station(tf_station) 

866 

867 return survey_metadata 

868 

869 @property 

870 def mini_summary(self) -> pd.DataFrame: 

871 """ 

872 Return a dataframe that fits in terminal display. 

873 

874 Returns 

875 ------- 

876 pd.DataFrame 

877 Subset of the main dataframe with key columns for summary display 

878 """ 

879 return self.df[self._mini_summary_columns] 

880 

881 @property 

882 def local_survey_id(self) -> str: 

883 """ 

884 Return string label for local survey id. 

885 

886 Returns 

887 ------- 

888 str 

889 Survey ID for the local station 

890 """ 

891 survey_id = self.df.loc[~self.df.remote].survey.unique()[0] 

892 if survey_id in ["none"]: 

893 survey_id = "0" 

894 return survey_id 

895 

896 @property 

897 def local_survey_metadata(self) -> mt_metadata.timeseries.Survey: 

898 """Return survey metadata for local station.""" 

899 return self.survey_metadata 

900 # except KeyError: 

901 # msg = f"Unexpected key {self.local_survey_id} not found in survey_metadata" 

902 # msg += f"{msg} WARNING -- Maybe old MTH5 -- trying to use key '0'" 

903 # logger.warning(msg) 

904 # return self.survey_metadata["0"] 

905 

906 def _add_duration_column(self, df: pd.DataFrame, inplace: bool = True) -> pd.DataFrame:

907 """Add a duration column (end - start, in seconds) to df and return the dataframe."""

908 

909 timedeltas = df.end - df.start 

910 durations = [x.total_seconds() for x in timedeltas] 

911 if inplace: 

912 df["duration"] = durations 

913 return df 

914 else: 

915 new_df = df.copy() 

916 new_df["duration"] = durations 

917 return new_df 

918 

919 def _update_duration_column(self, inplace: bool = True) -> pd.DataFrame | None:

920 """Call _add_duration_column (after possible manual manipulation of start/end)."""

921 

922 if inplace: 

923 self._df = self._add_duration_column(self._df, inplace) 

924 else: 

925 return self._add_duration_column(self._df, inplace) 

926 

927 def drop_runs_shorter_than( 

928 self, 

929 minimum_duration: float, 

930 units: str = "s", 

931 inplace: bool = True, 

932 ) -> pd.DataFrame | None: 

933 """ 

934 Drop runs from dataframe that are shorter than minimum duration. 

935 

936 Parameters 

937 ---------- 

938 minimum_duration : float 

939 The minimum allowed duration for a run (in units of units) 

940 units : str, optional 

941 Time units, by default "s". Currently only seconds are supported 

942 inplace : bool, optional 

943 Whether to modify dataframe in place, by default True 

944 

945 Returns 

946 ------- 

947 pd.DataFrame | None 

948 Modified dataframe if inplace=False, None if inplace=True 

949 

950 Raises 

951 ------ 

952 NotImplementedError 

953 If units other than seconds are specified 

954 

955 Notes 

956 ----- 

957        The duration column is refreshed from start/end before the comparison.

958        """

959        if units != "s":

960            msg = "Expected units are seconds : units='s'"

961            raise NotImplementedError(msg)

962 

963        if inplace:

964            self._update_duration_column(inplace=True)

965            drop_cond = self.df.duration < minimum_duration

966            self.df.drop(self.df[drop_cond].index, inplace=True)

967            self.df.reset_index(drop=True, inplace=True)

968            return None

969        else:

970            new_df = self._update_duration_column(inplace=False)

971            drop_cond = new_df.duration < minimum_duration

972            new_df = new_df.drop(new_df[drop_cond].index)

973            return new_df.reset_index(drop=True)

974 

975 def select_station_runs( 

976 self, 

977 station_runs_dict: dict, 

978 keep_or_drop: bool, 

979 inplace: bool = True, 

980 ) -> pd.DataFrame | None: 

981 """ 

982 Partition dataframe based on station_runs_dict and return one partition. 

983 

984 Parameters 

985 ---------- 

986 station_runs_dict : dict 

987 Keys are string IDs of stations to keep/drop. 

988 Values are lists of string labels for run_ids to keep/drop. 

989 Example: {"mt01": ["0001", "0003"]} 

990 keep_or_drop : bool 

991 If True: returns df with only the station-runs specified 

992 If False: returns df with station_runs_dict entries removed 

993 inplace : bool, optional 

994 If True, modifies dataframe in place, by default True 

995 

996 Returns 

997 ------- 

998 pd.DataFrame | None 

999 Modified dataframe if inplace=False, None if inplace=True 

1000 """ 

1001 

1002        # Work on the stored dataframe when inplace, otherwise on a copy.

1003        df = self.df if inplace else self.df.copy()

1004        # Accept the documented bool as well as the legacy string "keep".

1005        keep = keep_or_drop is True or keep_or_drop == "keep"

1006        for station_id, run_ids in station_runs_dict.items():

1007            if isinstance(run_ids, str):

1008                run_ids = [run_ids]

1009            cond1 = df["station"] == station_id

1010            cond2 = df["run"].isin(run_ids)

1011            if keep:

1012                drop_df = df[cond1 & ~cond2]

1013            else:

1014                drop_df = df[cond1 & cond2]

1015            df.drop(drop_df.index, inplace=True)

1016            df.reset_index(drop=True, inplace=True)

1017        return None if inplace else df

1021 
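The keep/drop semantics of `select_station_runs` can be illustrated without pandas. The sketch below uses plain `(station, run)` tuples; `select_rows` and the sample rows are hypothetical and only mimic the partitioning rule: rows for stations that are not named in the dictionary always pass through unchanged.

```python
def select_rows(rows, station_runs, keep):
    """Filter (station, run) tuples the way the keep/drop partition is described.

    keep=True  -> for listed stations, retain only the listed runs.
    keep=False -> for listed stations, remove the listed runs.
    """
    selected = []
    for station, run in rows:
        if station not in station_runs:
            selected.append((station, run))  # unlisted stations are untouched
            continue
        listed = run in station_runs[station]
        if listed == keep:
            selected.append((station, run))
    return selected


rows = [("mt01", "0001"), ("mt01", "0002"), ("mt01", "0003"), ("mt02", "0001")]
selection = {"mt01": ["0001", "0003"]}
kept = select_rows(rows, selection, keep=True)
dropped = select_rows(rows, selection, keep=False)
print(kept)
print(dropped)
```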

1022 def set_run_times(self, run_time_dict: dict, inplace: bool = True): 

1023 """ 

1024 Set run times from a dictionary. 

1025 

1026 Parameters 

1027 ---------- 

1028 run_time_dict : dict 

1029 Dictionary formatted as {run_id: {start, end}} 

1030 inplace : bool, optional

1031 Currently unused; the dataframe is always modified in place, by default True

1032 

1033 Returns

1034 -------

1035 None

1036 The dataframe is updated in place

1037 """ 

1038 msg = "Need to set run time with a dictionary in the form of {run_id: {start, end}}" 

1039 if not isinstance(run_time_dict, dict): 

1040 raise TypeError(msg) 

1041 

1042 for key, times in run_time_dict.items(): 

1043 if not isinstance(times, dict): 

1044 raise TypeError(msg) 

1045 if "start" not in times or "end" not in times:

1046 raise KeyError(msg) 

1047 

1048 cond1 = self.df.run == key 

1049 cond2 = self.df.start <= times["start"] 

1050 cond3 = self.df.end >= times["end"] 

1051 self.df.loc[cond1 & cond2 & cond3, "start"] = times["start"] 

1052 self.df.loc[cond1 & cond2 & cond3, "end"] = times["end"] 

1053 self._update_duration_column() 

1054 self.df = self.restrict_run_intervals_to_simultaneous(self.df) 

1055 

1056 @property 

1057 def is_single_station(self) -> bool: 

1058 """Returns True if no RR station.""" 

1059 if self.local_station_id: 

1060 if self.remote_station_id: 

1061 return False 

1062 else: 

1063 return True 

1064 else: 

1065 return False 

1066 

1067 def restrict_run_intervals_to_simultaneous(self, df: pd.DataFrame) -> pd.DataFrame: 

1068 """For each run in local_station_id, check whether it overlaps with any remote run. 

1069 

1070 There is room for optimization here. 

1071 

1072 Note that you can wind up splitting runs here. For example, in the case where 

1073 local is running continuously, but remote is intermittent, the local 

1074 run may break into several chunks. 

1075 :rtype: pd.DataFrame 

1076 """ 

1077 local_df = df[df.station == self.local_station_id] 

1078 remote_df = df[df.station == self.remote_station_id] 

1079 output_sub_runs = [] 

1080 for i_local, local_row in local_df.iterrows(): 

1081 for i_remote, remote_row in remote_df.iterrows(): 

1082 if intervals_overlap( 

1083 local_row.start, 

1084 local_row.end, 

1085 remote_row.start, 

1086 remote_row.end, 

1087 ): 

1088 # print(f"OVERLAP {i_local}, {i_remote}") 

1089 olap_start, olap_end = overlap( 

1090 local_row.start, 

1091 local_row.end, 

1092 remote_row.start, 

1093 remote_row.end, 

1094 ) 

1095 

1096 local_sub_run = local_row.copy(deep=True) 

1097 remote_sub_run = remote_row.copy(deep=True) 

1098 local_sub_run.start = olap_start 

1099 local_sub_run.end = olap_end 

1100 remote_sub_run.start = olap_start 

1101 remote_sub_run.end = olap_end 

1102 output_sub_runs.append(local_sub_run) 

1103 output_sub_runs.append(remote_sub_run) 

1104 else: 

1105 pass 

1106 # print(f"NOVERLAP {i_local}, {i_remote}") 

1107 new_df = pd.DataFrame(output_sub_runs) 

1108 new_df = new_df.reset_index(drop=True) 

1109 

1110 if new_df.empty: 

1111 msg = ( 

1112 f"Local: {self.local_station_id} and " 

1113 f"remote: {self.remote_station_id} do " 

1114 f"not overlap, Remote reference processing not a valid option." 

1115 ) 

1116 logger.error(msg) 

1117 raise ValueError(msg) 

1118 

1119 return new_df 

1120 

1121 def get_station_metadata( 

1122 self, local_station_id: str 

1123 ) -> mt_metadata.timeseries.Station: 

1124 """Returns the station metadata. 

1125 

1126 Development Notes: 

1127 TODO: This appears to be unused. Was probably a precursor to the 

1128 update_survey_metadata() method. Delete if unused. If used fill out doc: 

1129 "Helper function for archiving the TF -- returns an object we can use to populate 

1130 station metadata in the _____" 

1131 :param local_station_id: The name of the local station. 

1132 :type local_station_id: str 

1133 :rtype: mt_metadata.timeseries.Station 

1134 """ 

1135 # get a list of local runs: 

1136 cond = self.df["station"] == local_station_id 

1137 sub_df = self.df[cond] 

1138 sub_df = sub_df.drop_duplicates(subset="run") 

1139 

1140 # sanity check: 

1141 run_ids = sub_df.run.unique() 

1142 assert len(run_ids) == len(sub_df) 

1143 

1144 station_metadata = sub_df.mth5_obj.iloc[0].from_reference( 

1145 sub_df.station_hdf5_reference.iloc[0] 

1146 ) 

1147 station_metadata.runs = ListDict() 

1148 for i, row in sub_df.iterrows(): 

1149 local_run_obj = self.get_run_object(row) 

1150 station_metadata.add_run(local_run_obj.metadata) 

1151 return station_metadata 

1152 

1153 def get_run_object( 

1154 self, index_or_row: int | pd.Series 

1155 ) -> mt_metadata.timeseries.Run: 

1156 """ 

1157 Get the run object associated with a row of the dataframe. 

1158 

1159 Parameters 

1160 ---------- 

1161 index_or_row : int | pd.Series 

1162 Row index or row Series from the dataframe 

1163 

1164 Returns 

1165 ------- 

1166 mt_metadata.timeseries.Run 

1167 The run object associated with the row 

1168 

1169 Notes 

1170 ----- 

1171 This method may be deprecated in favor of direct calls to 

1172 run_obj = row.mth5_obj.from_reference(row.run_hdf5_reference) in pipelines. 

1173 """ 

1174 if isinstance(index_or_row, int): 

1175 row = self.df.loc[index_or_row] 

1176 else: 

1177 row = index_or_row 

1178 run_obj = row.mth5_obj.from_reference(row.run_hdf5_reference) 

1179 return run_obj 

1180 

1181 @property 

1182 def num_sample_rates(self) -> int: 

1183 """Returns the number of unique sample rates in the dataframe.""" 

1184 return len(self.df.sample_rate.unique()) 

1185 

1186 @property 

1187 def sample_rate(self) -> float: 

1188 r"""Returns the sample rate of the data in the dataframe.""" 

1189 if self.num_sample_rates != 1: 

1190 msg = "Aurora does not yet process data from mixed sample rates" 

1191 logger.error(f"{msg}") 

1192 raise NotImplementedError(msg) 

1193 sample_rate = self.df.sample_rate.unique()[0] 

1194 return sample_rate 

1195 

1196 # this should be deprecated in the future in favor of using get_metadata_from_df 

1197 def update_survey_metadata( 

1198 self, i: int, row: pd.Series, run_ts: mth5.timeseries.run_ts.RunTS 

1199 ) -> None: 

1200 """Wrangle survey_metadata into kernel_dataset. 

1201 

1202 Development Notes: 

1203 - The survey metadata needs to be passed to TF before exporting data. 

1204 - This was factored out of initialize_dataframe_for_processing 

1205 - TODO: It looks like we don't need to pass the whole run_ts, just its metadata 

1206 There may be some performance implications to passing the whole object. 

1207 Consider passing run_ts.survey_metadata, run_ts.run_metadata, 

1208 run_ts.station_metadata only 

1209 :param i: This would be the index of row, if we were sure that the dataframe was cleanly indexed. 

1210 :type i: int 

1211 :param row: 

1212 :type row: pd.Series 

1213 :param run_ts: Mth5 object having the survey_metadata. 

1214 :type run_ts: mth5.timeseries.run_ts.RunTS 

1215 :rtype: None 

1216 """ 

1217 survey_id = run_ts.survey_metadata.id 

1218 if survey_id not in self.survey_metadata.keys(): 

1219 self.survey_metadata[survey_id] = run_ts.survey_metadata 

1220 else: 

1221 if row.station in self.survey_metadata[survey_id].stations.keys(): 

1222 self.survey_metadata[survey_id].stations[row.station].add_run( 

1223 run_ts.run_metadata 

1224 ) 

1225 else: 

1226 self.survey_metadata[survey_id].add_station(run_ts.station_metadata) 

1227 if len(self.survey_metadata.keys()) > 1: 

1228 raise NotImplementedError 

1229 

1230 @property 

1231 def mth5_objs(self): 

1232 """Mth5 objs. 

1233 :return: Dictionary [station_id: mth5_obj]. 

1234 :rtype: dict 

1235 """ 

1236 mth5_obj_dict = {} 

1237 mth5_obj_dict[self.local_station_id] = self.local_mth5_obj 

1238 if self.remote_station_id is not None: 

1239 mth5_obj_dict[self.remote_station_id] = self.remote_mth5_obj 

1240 return mth5_obj_dict 

1241 

1242 def initialize_mth5s(self, mode: str = "r"): 

1243 """ 

1244 Return a dictionary of open mth5 objects, keyed by station_id. 

1245 

1246 Parameters 

1247 ---------- 

1248 mode : str, optional 

1249 File opening mode, by default "r" (read-only) 

1250 

1251 Returns 

1252 ------- 

1253 dict 

1254 Dictionary keyed by station IDs containing MTH5 objects: 

1255 - local station id: mth5.mth5.MTH5 

1256 - remote station id: mth5.mth5.MTH5 (if present) 

1257 

1258 Notes 

1259 ----- 

1260 Future versions for multiple station processing may need 

1261 nested dict structure with [survey_id][station]. 

1262 """ 

1263 self.local_mth5_obj = initialize_mth5(self.local_mth5_path, mode=mode) 

1264 if self.remote_station_id: 

1265 self.remote_mth5_obj = initialize_mth5(self.remote_mth5_path, mode="r") 

1266 

1267 self.initialized = True 

1268 

1269 return self.mth5_objs 

1270 

1271 def initialize_dataframe_for_processing(self) -> None: 

1272 """Adds extra columns needed for processing to the dataframe. 

1273 

1274 Populates them with mth5 objects, run_hdf5_reference, and xr.Datasets. 

1275 

1276 Development Notes: 

1277 Note #1: When assigning xarrays to dataframe cells, df dislikes xr.Dataset, 

1278 so we convert to xr.DataArray before packing df 

1279 

1280 Note #2: [OPTIMIZATION] By accessing the run_ts and packing the "run_dataarray" column of the df, we 

1281 perform a non-lazy operation, essentially forcing the entire decimation_level=0 dataset to be 

1282 loaded into memory. Seeking a lazy method to handle this may be worthwhile. For example, using 

1283 a df.apply() approach to initialize only one row at a time would allow us to generate the FCs one 

1284 row at a time and never ingest more than one run of data at a time. 

1285 

1286 Note #3: Uncommenting the continue statement here is desirable and will speed things up, but 

1287 is not yet tested. A nice test would be to have two stations, some runs having FCs built 

1288 and others not having FCs built. What goes wrong is in update_survey_metadata. 

1289 Need a way to get the survey metadata from a run, not a run_ts if possible 

1290 """ 

1291 

1292 self.add_columns_for_processing() 

1293 

1294 for index, row in self.df.iterrows(): 

1295 run_obj = row.mth5_obj.get_run(row.station, row.run, survey=row.survey) 

1296 self.df.loc[index, "run_hdf5_reference"] = run_obj.hdf5_group.ref 

1297 

1298 if pd.notna(row.fc) and row.fc: 

1299 msg = f"row {row} already has fcs prescribed by processing config" 

1300 msg += " -- skipping time series initialisation" 

1301 logger.info(msg) 

1302 # see Note #3 

1303 # continue 

1304 # the line below is not lazy, See Note #2 

1305 run_ts = run_obj.to_runts(start=row.start, end=row.end) 

1306 self.df.at[index, "run_dataarray"] = run_ts.dataset.to_array("channel") 

1307 

1308 # self.update_survey_metadata(i, row, run_ts) 

1309 

1310 logger.info("Dataset dataframe initialized successfully, updated metadata.") 

1311 

1312 def add_columns_for_processing(self) -> None: 

1313 """Add columns to the dataframe used during processing. 

1314 

1315 Development Notes: 

1316 - This was originally in pipelines. 

1317 - Q: Should mth5_objs be keyed by survey-station? 

1318 - A: Yes, and ... 

1319 since the KernelDataset dataframe will be iterated over, should probably 

1320 write an iterator method. This can iterate over survey-station tuples 

1321 for multiple station processing. 

1322 - Currently the model of keeping all these data objects "live" in the df 

1323 seems to work OK, but is not well suited to HPC or lazy processing. 

1324 :param mth5_objs: Keys are station_id, values are MTH5 objects. 

1325 :type mth5_objs: dict, 

1326 """ 

1327 if not self.initialized: 

1328 raise ValueError("mth5 objects have not been initialized yet.") 

1329 

1330 if self._has_df(): 

1331 self._df.loc[self._df.station == self.local_station_id, "mth5_obj"] = ( 

1332 self.local_mth5_obj 

1333 ) 

1334 if self.remote_station_id is not None: 

1335 self._df.loc[self._df.station == self.remote_station_id, "mth5_obj"] = ( 

1336 self.remote_mth5_obj 

1337 ) 

1338 

1339 def close_mth5s(self) -> None: 

1340 """Loop over all unique mth5_objs in dataset df and make sure they are closed.""" 

1341 mth5_objs = self.df["mth5_obj"].unique() 

1342 for mth5_obj in mth5_objs: 

1343 mth5_obj.close_mth5() 

1344 return 

1345 

1346 

1347def restrict_to_station_list( 

1348 df: pd.DataFrame, 

1349 station_ids: str | list[str], 

1350 inplace: bool = True, 

1351) -> pd.DataFrame: 

1352 """ 

1353 Drop all rows where station_ids are NOT in the provided list. 

1354 

1355 Operates on a deepcopy of dataframe if inplace=False. 

1356 

1357 Parameters 

1358 ---------- 

1359 df : pd.DataFrame 

1360 A run summary dataframe 

1361 station_ids : str | list[str] 

1362 Station ids to keep, normally local and remote 

1363 inplace : bool, optional 

1364 If True, modifies dataframe in place, by default True 

1365 

1366 Returns 

1367 ------- 

1368 pd.DataFrame 

1369 Filtered dataframe with only specified stations 

1370 """ 

1371 if isinstance(station_ids, str): 

1372 station_ids = [station_ids] 

1373 if not inplace: 

1374 df = copy.deepcopy(df) 

1375 cond1 = ~df["station"].isin(station_ids) 

1376 df.drop(df[cond1].index, inplace=True) 

1377 df.reset_index(drop=True, inplace=True) 

1378 return df 

1379 

1380 

1381def intervals_overlap( 

1382 start1: pd.Timestamp, 

1383 end1: pd.Timestamp, 

1384 start2: pd.Timestamp, 

1385 end2: pd.Timestamp, 

1386) -> bool: 

1387 """Checks if intervals 1 and 2 overlap. 

1388 

1389 Interval 1 is (start1, end1), Interval 2 is (start2, end2). 

1390 

1391 Development Notes: 

1392 This may work vectorized out of the box but has not been tested. 

1393 Also, it is intended to work with pd.Timestamp objects, but should work 

1394 for many objects that have an ordering associated. 

1395 This website was used as a reference when writing the method: 

1396 https://stackoverflow.com/questions/3721249/python-date-interval-intersection 

1397 :param start1: Start of interval 1. 

1398 :type start1: pd.Timestamp 

1399 :param end1: End of interval 1. 

1400 :type end1: pd.Timestamp 

1401 :param start2: Start of interval 2. 

1402 :type start2: pd.Timestamp 

1403 :param end2: End of interval 2. 

1404 :type end2: pd.Timestamp 

1405 :return cond: True if the intervals overlap, False if they do not. 

1406 :rtype cond: bool 

1407 """ 

1408 cond = (start1 <= start2 <= end1) or (start2 <= start1 <= end2) 

1409 return cond 

1410 

1411 

1412def overlap( 

1413 t1_start: pd.Timestamp, 

1414 t1_end: pd.Timestamp, 

1415 t2_start: pd.Timestamp, 

1416 t2_end: pd.Timestamp, 

1417) -> tuple: 

1418 """Get the start and end times of the overlap between two intervals. 

1419 

1420 Interval 1 is (t1_start, t1_end), Interval 2 is (t2_start, t2_end). 

1421 

1422 Development Notes: 

1423 Possibly some nicer syntax in this discussion: 

1424 https://stackoverflow.com/questions/3721249/python-date-interval-intersection 

1425 - Intended to work with pd.Timestamp objects, but should work for many objects 

1426 that have an ordering associated. 

1427 :param t1_start: The start of interval 1. 

1428 :type t1_start: pd.Timestamp 

1429 :param t1_end: The end of interval 1. 

1430 :type t1_end: pd.Timestamp 

1431 :param t2_start: The start of interval 2. 

1432 :type t2_start: pd.Timestamp 

1433 :param t2_end: The end of interval 2. 

1434 :type t2_end: pd.Timestamp 

1435 :return start, end: Start, end are either same type as input, or they are None,None. 

1436 :rtype start, end: tuple 

1437 """ 

1438 if t1_start <= t2_start <= t2_end <= t1_end: 

1439 return t2_start, t2_end 

1440 elif t1_start <= t2_start <= t1_end: 

1441 return t2_start, t1_end 

1442 elif t1_start <= t2_end <= t1_end: 

1443 return t1_start, t2_end 

1444 elif t2_start <= t1_start <= t1_end <= t2_end: 

1445 return t1_start, t1_end 

1446 else: 

1447 return None, None
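
The development notes for overlap() mention that a nicer syntax may exist. As a minimal sketch of one such alternative (not the module's implementation), the intersection of two closed intervals can be computed as the later of the two starts and the earlier of the two ends. The name overlap_max_min is hypothetical, and datetime objects stand in for pd.Timestamp, relying on the note above that any ordered type should work.

```python
from datetime import datetime


def overlap_max_min(t1_start, t1_end, t2_start, t2_end):
    """Return (start, end) of the overlap of two closed intervals, or (None, None).

    Hypothetical helper for illustration only; it is not part of kernel_dataset.py.
    """
    start = max(t1_start, t2_start)  # the overlap cannot begin before either interval begins
    end = min(t1_end, t2_end)  # the overlap cannot extend past either interval's end
    if start <= end:
        return start, end
    return None, None


# Local station records 00:00-12:00, remote records 06:00-18:00 on the same day.
local = (datetime(2020, 1, 1, 0), datetime(2020, 1, 1, 12))
remote = (datetime(2020, 1, 1, 6), datetime(2020, 1, 1, 18))
olap = overlap_max_min(*local, *remote)

# A remote run on the following day shares no time with the local run.
disjoint = overlap_max_min(*local, datetime(2020, 1, 2), datetime(2020, 1, 3))
```

Intervals that touch at a single instant are treated as overlapping (start == end), which matches the closed-interval comparisons used in intervals_overlap() above.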