Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mth5 \ mth5 \ timeseries \ run_ts.py: 73%

610 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-27 20:09 -0800

1# -*- coding: utf-8 -*- 

2""" 

3.. module:: timeseries 

4 :synopsis: Deal with MT time series 

5 

6.. todo:: Check the conversion to netcdf. There are some weird serializations of 

7lists and arrays that goes on, seems easiest to convert all lists to strings and then 

8convert them back if read in. 

9 

10 

11:copyright: 

12 Jared Peacock (jpeacock@usgs.gov) 

13 

14:license: 

15 MIT 

16""" 

17 

18# ============================================================================== 

19# Imports 

20# ============================================================================== 

21from __future__ import annotations 

22 

23import inspect 

24 

25import numpy as np 

26import scipy 

27import xarray as xr 

28from loguru import logger 

29from matplotlib import pyplot as plt 

30from matplotlib.figure import Figure 

31from mt_metadata import timeseries 

32from mt_metadata.common.list_dict import ListDict 

33from mt_metadata.common.mttime import MTime 

34from mt_metadata.timeseries.filters import ChannelResponse 

35from obspy.core import Stream 

36 

37from .channel_ts import ChannelTS 

38from .ts_helpers import get_decimation_sample_rates, make_dt_coordinates 

39 

40 

41# ============================================================================= 

42# make a dictionary of available metadata classes 

43# ============================================================================= 

44meta_classes = dict(inspect.getmembers(timeseries, inspect.isclass)) 

45 

46 

47# ============================================================================= 

48# run container 

49# ============================================================================= 

50class RunTS: 

51 """ 

52 Container for MT time series data from a single run. 

53 

54 Holds all run time series in one aligned xarray Dataset with channels as 

55 data variables and time as the coordinate. Manages metadata for survey, 

56 station, and run levels. 

57 

58 Parameters 

59 ---------- 

60 array_list : list[ChannelTS] | list[xr.DataArray] | xr.Dataset | None, optional 

61 List of ChannelTS objects, xarray DataArrays, or an xarray Dataset 

62 containing the time series data. All channels will be aligned to a 

63 common time index. 

64 run_metadata : timeseries.Run | dict | None, optional 

65 Metadata for the run. Can be a Run object or dictionary. 

66 station_metadata : timeseries.Station | dict | None, optional 

67 Metadata for the station. Can be a Station object or dictionary. 

68 survey_metadata : timeseries.Survey | dict | None, optional 

69 Metadata for the survey. Can be a Survey object or dictionary. 

70 

71 Attributes 

72 ---------- 

73 dataset : xr.Dataset 

74 xarray Dataset containing all channel data with time coordinate 

75 survey_metadata : timeseries.Survey 

76 Survey-level metadata 

77 station_metadata : timeseries.Station 

78 Station-level metadata 

79 run_metadata : timeseries.Run 

80 Run-level metadata 

81 filters : dict[str, Filter] 

82 Dictionary of channel response filters keyed by filter name 

83 sample_rate : float 

84 Sample rate in samples per second 

85 channels : list[str] 

86 List of channel names in the dataset 

87 

88 Examples 

89 -------- 

90 Create an empty RunTS: 

91 

92 >>> from mth5.timeseries import RunTS 

93 >>> run = RunTS() 

94 

95 Create RunTS from ChannelTS objects: 

96 

97 >>> from mth5.timeseries import ChannelTS, RunTS 

98 >>> ex = ChannelTS('electric', data=ex_data, 

99 ... channel_metadata={'component': 'ex'}) 

100 >>> ey = ChannelTS('electric', data=ey_data, 

101 ... channel_metadata={'component': 'ey'}) 

102 >>> run = RunTS(array_list=[ex, ey]) 

103 >>> print(run.channels) 

104 ['ex', 'ey'] 

105 

106 Access individual channels: 

107 

108 >>> ex_channel = run.ex # Returns ChannelTS object 

109 >>> print(ex_channel.sample_rate) 

110 256.0 

111 

112 See Also 

113 -------- 

114 ChannelTS : Individual channel time series container 

115 

116 Notes 

117 ----- 

118 When multiple channels are provided with different start/end times, 

119 they will be automatically aligned using the earliest start and latest 

120 end times, with NaN values filling gaps. 

121 

122 """ 

123 

124 def __init__( 

125 self, 

126 array_list: list[ChannelTS] | list[xr.DataArray] | xr.Dataset | None = None, 

127 run_metadata: timeseries.Run | dict | None = None, 

128 station_metadata: timeseries.Station | dict | None = None, 

129 survey_metadata: timeseries.Survey | dict | None = None, 

130 ) -> None: 

131 self.logger = logger 

132 self._survey_metadata = self._initialize_metadata() 

133 self._dataset = xr.Dataset() 

134 self._filters: dict[str, ChannelResponse] = {} 

135 

136 self.survey_metadata = survey_metadata 

137 self.station_metadata = station_metadata 

138 self.run_metadata = run_metadata 

139 

140 self._sample_rate: float | None = self._check_sample_rate_at_init() 

141 

142 # load the arrays first this will write run and station metadata 

143 if array_list is not None: 

144 self.dataset = array_list 

145 

146 def _check_sample_rate_at_init(self) -> float | None: 

147 """ 

148 Check if sample_rate is specified in run_metadata at initialization. 

149 

150 Returns 

151 ------- 

152 float | None 

153 Sample rate from run_metadata if available, otherwise None. 

154 

155 Notes 

156 ----- 

157 If data is subsequently loaded, a check will be done to ensure 

158 sample rates match. If they don't, the data sample_rate is used. 

159 

160 """ 

161 sr = None 

162 if self.run_metadata is not None: 

163 sr = self.run_metadata.sample_rate 

164 

165 return sr 

166 

167 def __str__(self) -> str: 

168 """String representation of RunTS.""" 

169 s_list = [ 

170 f"Survey: {self.survey_metadata.id}", 

171 f"Station: {self.station_metadata.id}", 

172 f"Run: {self.run_metadata.id}", 

173 f"Start: {self.start}", 

174 f"End: {self.end}", 

175 f"Sample Rate: {self.sample_rate}", 

176 f"Components: {self.channels}", 

177 ] 

178 return "\n\t".join(["RunTS Summary:"] + s_list) 

179 

180 def __repr__(self) -> str: 

181 """String representation of RunTS.""" 

182 return self.__str__() 

183 

184 def __eq__(self, other: object) -> bool: 

185 """ 

186 Test equality between two RunTS objects. 

187 

188 Parameters 

189 ---------- 

190 other : object 

191 Object to compare with. 

192 

193 Returns 

194 ------- 

195 bool 

196 True if objects are equal, False otherwise. 

197 

198 Raises 

199 ------ 

200 TypeError 

201 If other is not a RunTS object. 

202 """ 

203 if not isinstance(other, RunTS): 

204 raise TypeError(f"Cannot compare RunTS with {type(other)}.") 

205 if not other.survey_metadata == self.survey_metadata: 

206 return False 

207 if not other.station_metadata == self.station_metadata: 

208 return False 

209 if not other.run_metadata == self.run_metadata: 

210 return False 

211 if self.dataset.equals(other.dataset) is False: 

212 return False 

213 return True 

214 

215 def __neq__(self, other: object) -> bool: 

216 """Test inequality between two RunTS objects.""" 

217 return not self.__eq__(other) 

218 

219 def __add__(self, other: RunTS) -> RunTS: 

220 """ 

221 Add two runs together to create a combined run. 

222 

223 Combines runs using the following steps: 

224 

225 1. xr.combine_by_coords([original, other]) 

226 2. Compute monotonic time index spanning full time range 

227 3. Interpolate to new time index using slinear method 

228 

229 Parameters 

230 ---------- 

231 other : RunTS 

232 Another RunTS object to combine with this one. 

233 

234 Returns 

235 ------- 

236 RunTS 

237 Combined run with monotonic time index and metadata from the 

238 first run. 

239 

240 Raises 

241 ------ 

242 TypeError 

243 If input is not a RunTS object. 

244 

245 Examples 

246 -------- 

247 >>> run1 = RunTS(array_list=[ex1, ey1]) 

248 >>> run2 = RunTS(array_list=[ex2, ey2]) 

249 >>> combined = run1 + run2 

250 >>> print(combined.start, combined.end) 

251 

252 Notes 

253 ----- 

254 For more control over the merging process (gap filling method, 

255 resampling, etc.), use the `merge()` method instead. 

256 

257 See Also 

258 -------- 

259 merge : More flexible merging with customization options 

260 

261 """ 

262 if not isinstance(other, RunTS): 

263 raise TypeError(f"Cannot combine {type(other)} with RunTS.") 

264 # combine into a data set use override to keep attrs from original 

265 combined_ds = xr.combine_by_coords( 

266 [self.dataset, other.dataset], combine_attrs="override" 

267 ) 

268 

269 # Handle datetime.timedelta for Python 3.12+ compatibility 

270 duration = combined_ds.time.max().values - combined_ds.time.min().values 

271 if hasattr(duration, "total_seconds"): 

272 # Python datetime.timedelta 

273 duration_ns = duration.total_seconds() * 1e9 

274 elif hasattr(duration, "view"): 

275 # numpy timedelta64 - convert to nanoseconds 

276 duration_ns = float(duration / np.timedelta64(1, "ns")) 

277 else: 

278 # Already numeric 

279 duration_ns = float(duration) 

280 

281 n_samples = (self.sample_rate * duration_ns / 1e9) + 1 

282 

283 new_dt_index = make_dt_coordinates( 

284 combined_ds.time.min().values, self.sample_rate, n_samples 

285 ) 

286 

287 new_run = RunTS( 

288 run_metadata=self.run_metadata, 

289 station_metadata=self.station_metadata, 

290 survey_metadata=self.survey_metadata, 

291 ) 

292 

293 new_run.dataset = combined_ds.interp(time=new_dt_index, method="slinear") 

294 

295 new_run.run_metadata.update_time_period() 

296 new_run.station_metadata.update_time_period() 

297 new_run.survey_metadata.update_time_period() 

298 new_run.filters = self.filters 

299 new_run.filters.update(other.filters) 

300 

301 return new_run 

302 

303 def _initialize_metadata(self) -> timeseries.Survey: 

304 """ 

305 Create a hierarchical metadata structure with default values. 

306 

307 Creates a Survey object containing a Station which contains a Run, 

308 all with default IDs of "0". This provides the base structure for 

309 storing metadata at all levels. 

310 

311 Returns 

312 ------- 

313 timeseries.Survey 

314 Survey metadata object with nested station and run. 

315 

316 """ 

317 

318 survey_metadata = timeseries.Survey(id="0") 

319 survey_metadata.stations.append(timeseries.Station(id="0")) 

320 survey_metadata.stations[0].runs.append(timeseries.Run(id="0")) 

321 

322 return survey_metadata 

323 

324 def _validate_run_metadata( 

325 self, run_metadata: timeseries.Run | dict 

326 ) -> timeseries.Run: 

327 """ 

328 Validate and convert run metadata to proper format. 

329 

330 Parameters 

331 ---------- 

332 run_metadata : timeseries.Run | dict 

333 Run metadata as a Run object or dictionary. 

334 

335 Returns 

336 ------- 

337 timeseries.Run 

338 Validated Run metadata object (copy of input). 

339 

340 Raises 

341 ------ 

342 TypeError 

343 If input is neither a Run object nor a dictionary. 

344 

345 """ 

346 

347 if not isinstance(run_metadata, timeseries.Run): 

348 if isinstance(run_metadata, dict): 

349 if "run" not in [cc.lower() for cc in run_metadata.keys()]: 

350 run_metadata = {"Run": run_metadata} 

351 r_metadata = timeseries.Run() 

352 r_metadata.from_dict(run_metadata) 

353 self.logger.debug("Loading from metadata dict") 

354 return r_metadata 

355 else: 

356 msg = ( 

357 "input metadata must be type {type(self.run_metadata)} " 

358 "or dict, not {type(run_metadata)}" 

359 ) 

360 self.logger.error(msg) 

361 raise TypeError(msg) 

362 return run_metadata.copy() 

363 

364 def _validate_station_metadata( 

365 self, station_metadata: timeseries.Station | dict 

366 ) -> timeseries.Station: 

367 """ 

368 Validate and convert station metadata to proper format. 

369 

370 Parameters 

371 ---------- 

372 station_metadata : timeseries.Station | dict 

373 Station metadata as a Station object or dictionary. 

374 

375 Returns 

376 ------- 

377 timeseries.Station 

378 Validated Station metadata object (copy of input). 

379 

380 Raises 

381 ------ 

382 TypeError 

383 If input is neither a Station object nor a dictionary. 

384 """ 

385 

386 if isinstance(station_metadata, timeseries.Station): 

387 return station_metadata.copy() 

388 

389 if isinstance(station_metadata, dict): 

390 if "station" not in [cc.lower() for cc in station_metadata.keys()]: 

391 station_metadata = {"Station": station_metadata} 

392 st_metadata = timeseries.Station() # type: ignore 

393 st_metadata.from_dict(station_metadata) 

394 self.logger.debug("Loading from metadata dict") 

395 return st_metadata 

396 else: 

397 msg = ( 

398 f"input metadata must be type {type(self.station_metadata)} " 

399 "or dict, not {type(station_metadata)}" 

400 ) 

401 self.logger.error(msg) 

402 raise TypeError(msg) 

403 

404 def _validate_survey_metadata( 

405 self, survey_metadata: timeseries.Survey | dict 

406 ) -> timeseries.Survey: 

407 """ 

408 Validate and convert survey metadata to proper format. 

409 

410 Parameters 

411 ---------- 

412 survey_metadata : timeseries.Survey | dict 

413 Survey metadata as a Survey object or dictionary. 

414 

415 Returns 

416 ------- 

417 timeseries.Survey 

418 Validated Survey metadata object (copy of input). 

419 

420 Raises 

421 ------ 

422 TypeError 

423 If input is neither a Survey object nor a dictionary. 

424 """ 

425 if isinstance(survey_metadata, timeseries.Survey): 

426 return survey_metadata.copy() 

427 

428 if isinstance(survey_metadata, dict): 

429 if "survey" not in [cc.lower() for cc in survey_metadata.keys()]: 

430 survey_metadata = {"Survey": survey_metadata} 

431 sv_metadata = timeseries.Survey() # type: ignore 

432 sv_metadata.from_dict(survey_metadata) 

433 self.logger.debug("Loading from metadata dict") 

434 return sv_metadata 

435 else: 

436 msg = ( 

437 f"input metadata must be type {type(self.survey_metadata)} " 

438 "or dict, not {type(survey_metadata)}" 

439 ) 

440 self.logger.error(msg) 

441 raise TypeError(msg) 

442 

443 def _validate_array_list( 

444 self, array_list: list[ChannelTS] | list[xr.DataArray] | tuple 

445 ) -> list[xr.DataArray]: 

446 """ 

447 Validate and convert array list to proper format. 

448 

449 Checks that all entries are ChannelTS or xarray.DataArray objects, 

450 converts to DataArray format, extracts metadata and filters, and 

451 aligns all channels to a common time index. 

452 

453 Parameters 

454 ---------- 

455 array_list : list[ChannelTS] | list[xr.DataArray] | tuple 

456 List or tuple of ChannelTS objects or xarray DataArrays. 

457 

458 Returns 

459 ------- 

460 list[xr.DataArray] 

461 List of validated and aligned xarray DataArrays. 

462 

463 Raises 

464 ------ 

465 TypeError 

466 If array_list is not a list or tuple, or if any element is not 

467 a ChannelTS or DataArray object. 

468 

469 Notes 

470 ----- 

471 This method also updates the station and run metadata from the 

472 ChannelTS objects if present, and extracts channel response filters. 

473 

474 """ 

475 if not isinstance(array_list, (tuple, list)): 

476 msg = f"array_list must be a list or tuple, not {type(array_list)}" 

477 self.logger.error(msg) 

478 raise TypeError(msg) 

479 valid_list = [] 

480 station_metadata = timeseries.Station() # type: ignore 

481 run_metadata = timeseries.Run() # type: ignore 

482 channels = ListDict() # type: ignore 

483 

484 for index, item in enumerate(array_list): 

485 if not isinstance(item, (ChannelTS, xr.DataArray)): 

486 msg = f"array entry {index} must be ChannelTS object not {type(item)}" 

487 self.logger.error(msg) 

488 raise TypeError(msg) 

489 if isinstance(item, ChannelTS): 

490 valid_list.append(item.to_xarray()) 

491 

492 # if a channelTS is input then it comes with run and station metadata 

493 # use those first, then the user can update later. 

494 

495 if item.station_metadata.id not in ["0", None, ""]: 

496 if station_metadata.id not in ["0", None, ""]: 

497 station_metadata.update(item.station_metadata, match=["id"]) 

498 else: 

499 station_metadata.update(item.station_metadata) 

500 if item.run_metadata.id not in ["0", None, ""]: 

501 if run_metadata.id not in ["0", None, ""]: 

502 run_metadata.update(item.run_metadata, match=["id"]) 

503 else: 

504 run_metadata.update(item.run_metadata) 

505 channels.append(item.channel_metadata) 

506 

507 # get the filters from the channel 

508 if item.channel_response.filters_list != []: 

509 for ff in item.channel_response.filters_list: 

510 self._filters[ff.name] = ff 

511 else: 

512 valid_list.append(item) 

513 # need to make sure that the station metadata was actually updated, 

514 # should have an ID. 

515 run_metadata.channels = channels 

516 if station_metadata.id not in ["0", None, ""]: 

517 station_metadata.runs = ListDict() 

518 station_metadata.runs.append(run_metadata) 

519 # need to add the other runs that are in the metadata for 

520 # completeness. 

521 for run in self.station_metadata.runs: 

522 if run.id not in [run_metadata.id, "0", None, ""]: 

523 station_metadata.add_run(run) 

524 

525 if self.station_metadata.id != station_metadata.id: 

526 logger.warning( 

527 f"Station ID {station_metadata.id} from ChannelTS does " 

528 "not match original station ID {self.station_metadata.id}. " 

529 "Updating ID to match." 

530 ) 

531 self.station_metadata = station_metadata 

532 # if the run metadata was updated 

533 elif run_metadata.id not in ["0", None, ""]: 

534 if self.run_metadata.id != run_metadata.id: 

535 logger.warning( 

536 f"Run ID {run_metadata.id} from ChannelTS does " 

537 "not match original run ID {self.run_metadata.id}. " 

538 "Updating ID to match." 

539 ) 

540 self.run_metadata = run_metadata 

541 # if the run metadata or station metadata was not updated from channel 

542 # metadata, then update just the channels. 

543 else: 

544 self.run_metadata.channels = channels 

545 # need to align the time series. 

546 valid_list = self._align_channels(valid_list) 

547 

548 return valid_list 

549 

550 def _align_channels(self, valid_list: list[xr.DataArray]) -> list[xr.DataArray]: 

551 """ 

552 Align channels to a common time index. 

553 

554 Checks for common start and end times across all channels. If not 

555 common, reindexes each channel to a new time index spanning from 

556 the earliest start to latest end at the common sample rate. 

557 

558 Parameters 

559 ---------- 

560 valid_list : list[xr.DataArray] 

561 List of channel DataArrays to align. 

562 

563 Returns 

564 ------- 

565 list[xr.DataArray] 

566 List of aligned DataArrays with common time index. 

567 

568 Notes 

569 ----- 

570 Uses 'nearest' method for reindexing with tolerance of one sample. 

571 Missing data is filled with NaN values. 

572 

573 """ 

574 

575 earliest_start = self._get_earliest_start(valid_list) 

576 latest_end = self._get_latest_end(valid_list) 

577 reindex = False 

578 if not self._check_common_start(valid_list): 

579 self.logger.info( 

580 f"Channels do not have a common start, using earliest: {earliest_start}" 

581 ) 

582 reindex = True 

583 if not self._check_common_end(valid_list): 

584 self.logger.info( 

585 f"Channels do not have a common end, using latest: {latest_end}" 

586 ) 

587 reindex = True 

588 if reindex: 

589 sample_rate = self._check_sample_rate(valid_list) 

590 

591 new_time_index = self._get_common_time_index( 

592 earliest_start, latest_end, sample_rate 

593 ) 

594 tolerance = f"{(1e9 / sample_rate):.0f}ns" 

595 aligned_list = [] 

596 for ch in valid_list: 

597 aligned_list.append( 

598 ch.reindex( 

599 time=new_time_index, 

600 method="nearest", 

601 tolerance=tolerance, 

602 ) 

603 ) 

604 else: 

605 aligned_list = valid_list 

606 return aligned_list 

607 

608 def _check_sample_rate(self, valid_list: list[xr.DataArray]) -> float: 

609 """ 

610 Check that all channels have the same sample rate. 

611 

612 Parameters 

613 ---------- 

614 valid_list : list[xr.DataArray] 

615 List of channel DataArrays. 

616 

617 Returns 

618 ------- 

619 float 

620 The common sample rate. 

621 

622 Raises 

623 ------ 

624 ValueError 

625 If channels have different sample rates. 

626 """ 

627 sr_test = list( 

628 set( 

629 [(item.sample_rate) for item in valid_list] 

630 + [np.round(item.sps_filters.fs, 3) for item in valid_list] 

631 ) 

632 ) 

633 

634 if len(sr_test) != 1: 

635 msg = f"sample rates are not all the same {sr_test}" 

636 self.logger.error(msg) 

637 raise ValueError(msg) 

638 return sr_test[0] 

639 

640 def _check_common_start(self, valid_list: list[xr.DataArray]) -> bool: 

641 """ 

642 Check if all channels have the same start time. 

643 

644 Parameters 

645 ---------- 

646 valid_list : list[xr.DataArray] 

647 List of channel DataArrays. 

648 

649 Returns 

650 ------- 

651 bool 

652 True if all channels start at the same time, False otherwise. 

653 

654 """ 

655 start_list = list(set([item.coords["time"].values[0] for item in valid_list])) 

656 if len(start_list) != 1: 

657 return False 

658 return True 

659 

660 def _check_common_end(self, valid_list: list[xr.DataArray]) -> bool: 

661 """ 

662 Check if all channels have the same end time. 

663 

664 Parameters 

665 ---------- 

666 valid_list : list[xr.DataArray] 

667 List of channel DataArrays. 

668 

669 Returns 

670 ------- 

671 bool 

672 True if all channels end at the same time, False otherwise. 

673 

674 """ 

675 end_list = list(set([item.coords["time"].values[-1] for item in valid_list])) 

676 if len(end_list) != 1: 

677 return False 

678 return True 

679 

680 def _get_earliest_start(self, valid_list: list[xr.DataArray]) -> np.datetime64: 

681 """ 

682 Get the earliest start time from all channels. 

683 

684 Parameters 

685 ---------- 

686 valid_list : list[xr.DataArray] 

687 List of channel DataArrays. 

688 

689 Returns 

690 ------- 

691 np.datetime64 

692 Earliest start time. 

693 """ 

694 

695 return min([item.coords["time"].values[0] for item in valid_list]) 

696 

697 def _get_latest_end(self, valid_list: list[xr.DataArray]) -> np.datetime64: 

698 """ 

699 Get the latest end time from all channels. 

700 

701 Parameters 

702 ---------- 

703 valid_list : list[xr.DataArray] 

704 List of channel DataArrays. 

705 

706 Returns 

707 ------- 

708 np.datetime64 

709 Latest end time. 

710 """ 

711 

712 return max([item.coords["time"].values[-1] for item in valid_list]) 

713 

714 def _get_common_time_index( 

715 self, start: np.datetime64, end: np.datetime64, sample_rate: float 

716 ) -> np.ndarray: 

717 """ 

718 Generate a common time index for channel alignment. 

719 

720 Parameters 

721 ---------- 

722 start : np.datetime64 

723 Start time. 

724 end : np.datetime64 

725 End time. 

726 sample_rate : float 

727 Sample rate in samples per second. 

728 

729 Returns 

730 ------- 

731 np.ndarray 

732 Array of datetime64 timestamps. 

733 """ 

734 # Handle datetime.timedelta for Python 3.12+ compatibility 

735 duration = end - start 

736 if hasattr(duration, "total_seconds"): 

737 # Python datetime.timedelta 

738 duration_ns = duration.total_seconds() * 1e9 

739 elif hasattr(duration, "view"): 

740 # numpy timedelta64 - convert to nanoseconds 

741 # duration / np.timedelta64(1, 'ns') gives us the value in nanoseconds 

742 duration_ns = float(duration / np.timedelta64(1, "ns")) 

743 else: 

744 # Already numeric 

745 duration_ns = float(duration) 

746 

747 n_samples = int(sample_rate * duration_ns / 1e9) + 1 

748 

749 return make_dt_coordinates(start, sample_rate, n_samples) 

750 

751 def _get_channel_response(self, ch_name: str) -> ChannelResponse: 

752 """ 

753 Get the channel response filter from the filter dictionary. 

754 

755 Parameters 

756 ---------- 

757 ch_name : str 

758 Name of the channel. 

759 

760 Returns 

761 ------- 

762 ChannelResponse 

763 ChannelResponse object containing the filter list for the channel. 

764 

765 Notes 

766 ----- 

767 Looks for filters in the dataset attributes under 'filter.name' or 

768 'filters' keys and retrieves them from the internal filters dictionary. 

769 

770 """ 

771 

772 filter_list = [] 

773 if ch_name in self.dataset.keys(): 

774 if "filter.name" in self.dataset[ch_name].attrs.keys(): 

775 for filter_name in self.dataset[ch_name].attrs["filter.name"]: 

776 try: 

777 filter_list.append(self.filters[filter_name]) 

778 except KeyError: 

779 self.logger.debug(f"Could not find {filter_name} in filters") 

780 elif "filters" in self.dataset[ch_name].attrs.keys(): 

781 for ch_filter in self.dataset[ch_name].attrs["filters"]: 

782 try: 

783 filter_list.append( 

784 self.filters[ch_filter["applied_filter"]["name"]] 

785 ) 

786 except KeyError: 

787 self.logger.debug( 

788 f"Could not find {ch_filter['applied_filter']['name']} in filters" 

789 ) 

790 return ChannelResponse(filters_list=filter_list) 

791 

792 def __getattr__(self, name: str) -> ChannelTS | None: 

793 """Enable accessing channels as attributes (e.g., run.ex).""" 

794 # change to look for keys directly and use type to set channel type 

795 if name in self.dataset.keys(): 

796 ch_response_filter = self._get_channel_response(name) 

797 # if cannot get filters, but the filters name indicates that 

798 # filters should be there don't input the channel response filter 

799 # cause then an empty filters_list will set filter.name to [] 

800 if ch_response_filter.filters_list == []: 

801 ch_response_filter = None 

802 return ChannelTS( 

803 self.dataset[name].attrs["type"], 

804 self.dataset[name], 

805 run_metadata=self.run_metadata.copy(), 

806 station_metadata=self.station_metadata.copy(), 

807 channel_response=ch_response_filter, 

808 ) 

809 else: 

810 # this is a hack for now until figure out who is calling shape, size 

811 if name[0] == "_": 

812 return None 

813 if name not in ["shape", "size"]: 

814 try: 

815 return super().__getattribute__(name) 

816 except AttributeError: 

817 msg = f"RunTS has no attribute {name}" 

818 self.logger.error(msg) 

819 raise NameError(msg) 

820 

821 def copy(self, data: bool = True) -> RunTS: 

822 """ 

823 Create a copy of the RunTS object. 

824 

825 Parameters 

826 ---------- 

827 data : bool, optional 

828 If True, copy the data along with timeseries. If False, only 

829 copy the metadata (default is True). 

830 

831 Returns 

832 ------- 

833 RunTS 

834 A copy of the RunTS object. 

835 

836 Examples 

837 -------- 

838 Create a copy with data: 

839 

840 >>> run_copy = run.copy() 

841 

842 Create a metadata-only copy: 

843 

844 >>> run_meta = run.copy(data=False) 

845 >>> print(run_meta.has_data()) 

846 False 

847 

848 """ 

849 

850 if not data: 

851 return RunTS( 

852 run_metadata=self.run_metadata.copy(), 

853 station_metadata=self.station_metadata.copy(), 

854 survey_metadata=self.survey_metadata.copy(), 

855 ) 

856 else: 

857 return RunTS( 

858 array_list=self.dataset, 

859 run_metadata=self.run_metadata.copy(), 

860 station_metadata=self.station_metadata.copy(), 

861 survey_metadata=self.survey_metadata.copy(), 

862 ) 

863 

864 ### Properties ------------------------------------------------------------ 

865 @property 

866 def survey_metadata(self) -> timeseries.Survey: 

867 """ 

868 Survey timeseries. 

869 

870 Returns 

871 ------- 

872 timeseries.Survey 

873 Survey-level metadata object. 

874 """ 

875 return self._survey_metadata 

876 

877 @survey_metadata.setter 

878 def survey_metadata(self, survey_metadata: timeseries.Survey | dict | None) -> None: 

879 """ 

880 Set survey timeseries. 

881 

882 Parameters 

883 ---------- 

884 survey_metadata : timeseries.Survey | dict | None 

885 Survey metadata object or dictionary. If None, no action is taken. 

886 

887 """ 

888 if survey_metadata is None: 

889 return 

890 

891 survey_metadata = self._validate_survey_metadata(survey_metadata) 

892 self._survey_metadata.update(survey_metadata) 

893 for station in survey_metadata.stations: 

894 if station.id not in self._survey_metadata.stations.keys(): 

895 self._survey_metadata.add_station( 

896 self._validate_station_metadata(station), update=False 

897 ) 

898 

899 @property 

900 def station_metadata(self) -> timeseries.Station: 

901 """ 

902 Station timeseries. 

903 

904 Returns 

905 ------- 

906 timeseries.Station 

907 Station-level metadata object (first station in survey). 

908 """ 

909 

910 return self.survey_metadata.stations[0] 

911 

912 @station_metadata.setter 

913 def station_metadata( 

914 self, station_metadata: timeseries.Station | dict | None 

915 ) -> None: 

916 """ 

917 Set station timeseries. 

918 

919 Parameters 

920 ---------- 

921 station_metadata : timeseries.Station | dict | None 

922 Station metadata object or dictionary. If None, no action is taken. 

923 

924 Notes 

925 ----- 

926 Preserves existing run metadata and merges with new station timeseries. 

927 

928 """ 

929 

930 if station_metadata is not None: 

931 station_metadata = self._validate_station_metadata(station_metadata) 

932 

933 runs = ListDict() 

934 if self.run_metadata.id not in ["0", 0]: 

935 runs.append(self.run_metadata.copy()) 

936 runs.extend(station_metadata.runs) 

937 if len(runs) == 0: 

938 runs[0] = timeseries.Run(id="0") 

939 # be sure there is a level below 

940 if len(runs[0].channels) == 0: 

941 ch_metadata = timeseries.Auxiliary() 

942 ch_metadata.type = "auxiliary" 

943 runs[0].channels.append(ch_metadata) 

944 stations = ListDict() 

945 stations.append(station_metadata) 

946 stations[0].runs = runs 

947 

948 self.survey_metadata.stations = stations 

949 

950 @property 

951 def run_metadata(self) -> timeseries.Run: 

952 """ 

953 Run timeseries. 

954 

955 Returns 

956 ------- 

957 timeseries.Run 

958 Run-level metadata object (first run in first station). 

959 """ 

960 run_metadata = self.survey_metadata.stations[0].runs[0] 

961 

962 return run_metadata 

963 

964 @run_metadata.setter 

965 def run_metadata(self, run_metadata: timeseries.Run | dict | None) -> None: 

966 """ 

967 Set run timeseries. 

968 

969 Parameters 

970 ---------- 

971 run_metadata : timeseries.Run | dict | None 

972 Run metadata object or dictionary. If None, no action is taken. 

973 

974 Notes 

975 ----- 

976 Updates the runs list while preserving other existing runs. 

977 

978 """ 

979 

980 if run_metadata is not None: 

981 run_metadata = self._validate_run_metadata(run_metadata) 

982 runs = ListDict() 

983 runs.append(run_metadata) 

984 runs.extend(self.station_metadata.runs, skip_keys=[run_metadata.id, "0"]) 

985 self._survey_metadata.stations[0].runs = runs 

986 

987 def has_data(self) -> bool: 

988 """ 

989 Check if the RunTS contains any data. 

990 

991 Returns 

992 ------- 

993 bool 

994 True if channels with data exist, False otherwise. 

995 

996 Examples 

997 -------- 

998 >>> run = RunTS() 

999 >>> print(run.has_data()) 

1000 False 

1001 >>> run.add_channel(ex_channel) 

1002 >>> print(run.has_data()) 

1003 True 

1004 """ 

1005 if len(self.channels) > 0: 

1006 return True 

1007 return False 

1008 

1009 @property 

1010 def summarize_metadata(self) -> dict[str, any]: 

1011 """ 

1012 Get a summary of all channel timeseries. 

1013 

1014 Flattens the metadata from all channels into a single dictionary 

1015 with keys in the format 'channel.attribute'. 

1016 

1017 Returns 

1018 ------- 

1019 dict[str, any] 

1020 Dictionary with flattened metadata from all channels. 

1021 

1022 Examples 

1023 -------- 

1024 >>> meta_summary = run.summarize_metadata 

1025 >>> print(meta_summary.keys()) 

1026 dict_keys(['ex.time_period.start', 'ex.sample_rate', ...]) 

1027 

1028 """ 

1029 meta_dict = {} 

1030 for comp in self.dataset.data_vars: 

1031 for mkey, mvalue in self.dataset[comp].attrs.items(): 

1032 meta_dict[f"{comp}.{mkey}"] = mvalue 

1033 return meta_dict 

1034 

1035 def validate_metadata(self) -> None: 

1036 """ 

1037 Check to make sure that the metadata matches what is in the data set. 

1038 

1039 updates metadata from the data. 

1040 

1041 Check the start and end times, channels recorded 

1042 

1043 """ 

1044 

1045 # check sampling rate 

1046 if self.has_data(): 

1047 # check start time 

1048 if self.start != self.run_metadata.time_period.start: 

1049 if self.run_metadata.time_period.start != "1980-01-01T00:00:00+00:00": 

1050 msg = ( 

1051 f"start time of dataset {self.start} does not " 

1052 f"match metadata start {self.run_metadata.time_period.start} " 

1053 f"updating metatdata value to {self.start}" 

1054 ) 

1055 self.logger.warning(msg) 

1056 self.run_metadata.time_period.start = self.start.isoformat() 

1057 # check end time 

1058 if self.end != self.run_metadata.time_period.end: 

1059 if self.run_metadata.time_period.end != "1980-01-01T00:00:00+00:00": 

1060 msg = ( 

1061 f"end time of dataset {self.end} does not " 

1062 f"match metadata end {self.run_metadata.time_period.end} " 

1063 f"updating metatdata value to {self.end}" 

1064 ) 

1065 self.logger.warning(msg) 

1066 self.run_metadata.time_period.end = self.end.isoformat() 

1067 # check sample rate 

1068 # get the data sample rate and check against metadata 

1069 data_sr = self._compute_sample_rate() 

1070 # if sample rate is not set, use data value 

1071 if self.sample_rate in [0.0, None]: 

1072 self._sample_rate = data_sr 

1073 

1074 # if sample rates don't match, update to data value 

1075 elif self.sample_rate != data_sr: 

1076 msg = ( 

1077 f"sample rate of dataset {data_sr} does not " 

1078 f"match metadata sample rate {self.sample_rate} " 

1079 f"updating metatdata value to {data_sr}" 

1080 ) 

1081 self.logger.critical(msg) 

1082 self._sample_rate = data_sr 

1083 self.run_metadata.sample_rate = data_sr 

1084 # need to check that the run metadata sample rate matches, 

1085 # data sample rate overules 

1086 if self.run_metadata.sample_rate != self._sample_rate: 

1087 msg = ( 

1088 f"sample rate of dataset {data_sr} is different than " 

1089 f"metadata sample rate {self.run_metadata.sample_rate} " 

1090 f"updating metatdata value to {data_sr}" 

1091 ) 

1092 self.logger.warning(msg) 

1093 self.run_metadata.sample_rate = data_sr 

1094 

1095 # update station and survey time periods 

1096 if self.run_metadata.id not in self.station_metadata.runs.keys(): 

1097 self.station_metadata.runs[0].update(self.run_metadata) 

1098 self.station_metadata.update_time_period() 

1099 self.survey_metadata.update_time_period() 

1100 

1101 def set_dataset( 

1102 self, 

1103 array_list: list[ChannelTS] | list[xr.DataArray] | xr.Dataset, 

1104 align_type: str = "outer", 

1105 ) -> None: 

1106 """ 

1107 Set the dataset from a list of channels or existing dataset. 

1108 

1109 Creates an xarray Dataset from the input channels or dataset, validates 

1110 metadata consistency, and updates dataset attributes with run metadata. 

1111 

1112 Parameters 

1113 ---------- 

1114 array_list : list[ChannelTS] | list[xr.DataArray] | xr.Dataset 

1115 Input data as a list of ChannelTS objects, list of xarray DataArrays, 

1116 or an existing xarray Dataset. 

1117 align_type : str, optional 

1118 Method for aligning channels with different time indexes: 

1119 

1120 * 'outer' - use union of all time indexes (default) 

1121 * 'inner' - use intersection of time indexes 

1122 * 'left' - use time index from first channel 

1123 * 'right' - use time index from last channel 

1124 * 'exact' - raise ValueError if indexes don't match exactly 

1125 * 'override' - rewrite indexes to match first channel (requires same size) 

1126 

1127 Notes 

1128 ----- 

1129 This method performs the following operations: 

1130 

1131 1. Validates the input array_list 

1132 2. Converts ChannelTS objects to xarray format 

1133 3. Combines channels into a single Dataset 

1134 4. Validates metadata consistency 

1135 5. Updates dataset attributes with run metadata 

1136 

1137 When providing ChannelTS objects, their metadata and filters are 

1138 automatically extracted and merged into the run's metadata structure. 

1139 

1140 Examples 

1141 -------- 

1142 Set dataset from ChannelTS objects: 

1143 

1144 >>> ex = ChannelTS('electric', data=ex_data, 

1145 ... channel_metadata={'component': 'ex'}) 

1146 >>> ey = ChannelTS('electric', data=ey_data, 

1147 ... channel_metadata={'component': 'ey'}) 

1148 >>> run.set_dataset([ex, ey]) 

1149 

1150 Set dataset with custom alignment: 

1151 

1152 >>> run.set_dataset([ex, ey, hx], align_type='inner') 

1153 

1154 Set dataset from existing xarray Dataset: 

1155 

1156 >>> import xarray as xr 

1157 >>> ds = xr.Dataset({'ex': ex_da, 'ey': ey_da}) 

1158 >>> run.set_dataset(ds) 

1159 

1160 See Also 

1161 -------- 

1162 dataset : Property for setting dataset with default alignment 

1163 add_channel : Add a single channel to existing dataset 

1164 _validate_array_list : Validation and conversion of array list 

1165 

1166 """ 

1167 if isinstance(array_list, (list, tuple)): 

1168 x_array_list = self._validate_array_list(array_list) 

1169 

1170 # input as a dictionary 

1171 xdict = dict([(x.component.lower(), x) for x in x_array_list]) 

1172 self._dataset = xr.Dataset(xdict) 

1173 elif isinstance(array_list, xr.Dataset): 

1174 self._dataset = array_list 

1175 self.validate_metadata() 

1176 self._dataset.attrs.update(self.run_metadata.to_dict(single=True)) 

1177 

1178 def add_channel(self, channel: xr.DataArray | ChannelTS) -> None: 

1179 """ 

1180 Add a channel to the dataset. 

1181 

1182 The channel must have the same sample rate and time coordinates that 

1183 are compatible with the existing dataset. If start times don't match, 

1184 NaN values will be placed where timing doesn't align. 

1185 

1186 Parameters 

1187 ---------- 

1188 channel : xr.DataArray | ChannelTS 

1189 A channel as an xarray DataArray or ChannelTS object to add. 

1190 

1191 Raises 

1192 ------ 

1193 ValueError 

1194 If the channel has a different sample rate than the run, or if 

1195 the input is not a DataArray or ChannelTS. 

1196 

1197 Examples 

1198 -------- 

1199 Add a ChannelTS: 

1200 

1201 >>> hz = ChannelTS('magnetic', data=hz_data, 

1202 ... channel_metadata={'component': 'hz'}) 

1203 >>> run.add_channel(hz) 

1204 >>> print(run.channels) 

1205 ['ex', 'ey', 'hx', 'hy', 'hz'] 

1206 

1207 Add an xarray DataArray: 

1208 

1209 >>> import xarray as xr 

1210 >>> data_array = xr.DataArray(data, coords={'time': times}) 

1211 >>> run.add_channel(data_array) 

1212 

1213 """ 

1214 

1215 if isinstance(channel, xr.DataArray): 

1216 c = ChannelTS() 

1217 c.ts = channel 

1218 elif isinstance(channel, ChannelTS): 

1219 c = channel 

1220 self.run_metadata.channels.append(c.channel_metadata) 

1221 for ff in c.channel_response.filters_list: 

1222 self._filters[ff.name] = ff 

1223 else: 

1224 raise ValueError("Input Channel must be type xarray.DataArray or ChannelTS") 

1225 ### need to validate the channel to make sure sample rate is the same 

1226 if c.sample_rate != self.sample_rate: 

1227 msg = ( 

1228 f"Channel sample rate is not correct, current {self.sample_rate} " 

1229 + f"input {c.sample_rate}" 

1230 ) 

1231 self.logger.error(msg) 

1232 raise ValueError(msg) 

1233 ### should probably check for other metadata like station and run? 

1234 if len(self.dataset.dims) == 0: 

1235 self.dataset = c.data_array.to_dataset() 

1236 else: 

1237 self.dataset = xr.merge([self.dataset, c.data_array.to_dataset()]) 

1238 

1239 @property 

1240 def dataset(self) -> xr.Dataset: 

1241 """ 

1242 The xarray Dataset containing all channel data. 

1243 

1244 Returns 

1245 ------- 

1246 xr.Dataset 

1247 Dataset with channels as data variables and time as coordinate. 

1248 

1249 Examples 

1250 -------- 

1251 >>> print(run.dataset) 

1252 <xarray.Dataset> 

1253 Dimensions: (time: 4096) 

1254 Coordinates: 

1255 * time (time) datetime64[ns] ... 

1256 Data variables: 

1257 ex (time) float64 ... 

1258 ey (time) float64 ... 

1259 """ 

1260 return self._dataset 

1261 

1262 @dataset.setter 

1263 def dataset( 

1264 self, array_list: list[ChannelTS] | list[xr.DataArray] | xr.Dataset 

1265 ) -> None: 

1266 """ 

1267 Set the dataset. 

1268 

1269 Parameters 

1270 ---------- 

1271 array_list : list[ChannelTS] | list[xr.DataArray] | xr.Dataset 

1272 Data to set as the dataset. 

1273 

1274 Notes 

1275 ----- 

1276 Data will be aligned using min and max times. For different alignment, 

1277 use set_dataset() with the align_type parameter. 

1278 """ 

1279 msg = ( 

1280 "Data will be aligned using the min and max time. " 

1281 "If that is not correct use set_dataset and change the alignment type." 

1282 ) 

1283 self.logger.debug(msg) 

1284 self.set_dataset(array_list) 

1285 

1286 @property 

1287 def start(self) -> MTime: 

1288 """ 

1289 Start time of the run in UTC. 

1290 

1291 Returns 

1292 ------- 

1293 MTime 

1294 Start time from the dataset if data exists, otherwise from 

1295 run_metadata. 

1296 

1297 Examples 

1298 -------- 

1299 >>> print(run.start) 

1300 2020-01-01T00:00:00+00:00 

1301 """ 

1302 if self.has_data(): 

1303 return MTime( 

1304 time_stamp=self.dataset.coords["time"].to_index()[0].isoformat() 

1305 ) 

1306 return self.run_metadata.time_period.start 

1307 

1308 @property 

1309 def end(self) -> MTime: 

1310 """ 

1311 End time of the run in UTC. 

1312 

1313 Returns 

1314 ------- 

1315 MTime 

1316 End time from the dataset if data exists, otherwise from 

1317 run_metadata. 

1318 

1319 Examples 

1320 -------- 

1321 >>> print(run.end) 

1322 2020-01-01T01:00:00+00:00 

1323 """ 

1324 if self.has_data(): 

1325 return MTime( 

1326 time_stamp=self.dataset.coords["time"].to_index()[-1].isoformat() 

1327 ) 

1328 return self.run_metadata.time_period.end 

1329 

1330 def _compute_sample_rate(self) -> float: 

1331 """ 

1332 Compute sample rate from the time coordinate. 

1333 

1334 Returns 

1335 ------- 

1336 float 

1337 Sample rate in samples per second, rounded to nearest integer. 

1338 

1339 Raises 

1340 ------ 

1341 ValueError 

1342 If time indexing fails. 

1343 

1344 Notes 

1345 ----- 

1346 Uses scipy.stats.mode to find the most common time difference. 

1347 

1348 """ 

1349 

1350 try: 

1351 dt_array = np.diff(self.dataset.coords["time"].to_index()) / np.timedelta64( 

1352 1, "s" 

1353 ) 

1354 best_dt, counts = scipy.stats.mode(dt_array) 

1355 return round( 

1356 1.0 / np.float64(best_dt), 

1357 0, 

1358 ) 

1359 except AttributeError: 

1360 self.logger.warning("Something weird happend with xarray time indexing") 

1361 

1362 raise ValueError("Something weird happend with xarray time indexing") 

1363 

1364 @property 

1365 def sample_rate(self) -> float: 

1366 """ 

1367 Sample rate in samples per second. 

1368 

1369 Returns 

1370 ------- 

1371 float 

1372 Sample rate estimated from time differences if data exists, 

1373 otherwise from timeseries. 

1374 

1375 Examples 

1376 -------- 

1377 >>> print(run.sample_rate) 

1378 256.0 

1379 """ 

1380 if self.has_data(): 

1381 if self._sample_rate is None: 

1382 self._sample_rate = self._compute_sample_rate() 

1383 else: 

1384 if self._sample_rate is None: 

1385 self._sample_rate = self.run_metadata.sample_rate 

1386 

1387 return self._sample_rate 

1388 

1389 @property 

1390 def sample_interval(self) -> float: 

1391 """ 

1392 Sample interval in seconds (inverse of sample_rate). 

1393 

1394 Returns 

1395 ------- 

1396 float 

1397 Sample interval = 1 / sample_rate, or 0.0 if sample_rate is 0. 

1398 

1399 Examples 

1400 -------- 

1401 >>> print(run.sample_interval) 

1402 0.00390625 # for 256 Hz 

1403 

1404 """ 

1405 

1406 if self.sample_rate != 0: 

1407 return 1.0 / self.sample_rate 

1408 return 0.0 

1409 

1410 @property 

1411 def channels(self) -> list[str]: 

1412 """ 

1413 List of channel names in the dataset. 

1414 

1415 Returns 

1416 ------- 

1417 list[str] 

1418 List of channel component names (e.g., ['ex', 'ey', 'hx']). 

1419 

1420 Examples 

1421 -------- 

1422 >>> print(run.channels) 

1423 ['ex', 'ey', 'hx', 'hy', 'hz'] 

1424 """ 

1425 return [cc for cc in list(self.dataset.data_vars)] 

1426 

1427 @property 

1428 def filters(self) -> dict[str, ChannelResponse]: 

1429 """ 

1430 Dictionary of channel response filters. 

1431 

1432 Returns 

1433 ------- 

1434 dict[str, ChannelResponse] 

1435 Dictionary keyed by filter name containing ChannelResponse objects. 

1436 

1437 Examples 

1438 -------- 

1439 >>> print(run.filters.keys()) 

1440 dict_keys(['v_to_counts', 'dipole_100m']) 

1441 """ 

1442 return self._filters 

1443 

1444 @filters.setter 

1445 def filters(self, value: dict[str, ChannelResponse]) -> None: 

1446 """ 

1447 Set the filters dictionary. 

1448 

1449 Parameters 

1450 ---------- 

1451 value : dict[str, ChannelResponse] 

1452 Dictionary of filter name to ChannelResponse object mappings. 

1453 

1454 Raises 

1455 ------ 

1456 TypeError 

1457 If value is not a dictionary. 

1458 

1459 Notes 

1460 ----- 

1461 Use dictionary methods (update, etc.) to modify the filters dict. 

1462 

1463 """ 

1464 if not isinstance(value, dict): 

1465 raise TypeError("input must be a dictionary") 

1466 self._filters = value 

1467 

1468 def to_obspy_stream( 

1469 self, network_code: str | None = None, encoding: str | None = None 

1470 ) -> Stream: 

1471 """ 

1472 Convert time series to an ObsPy Stream object. 

1473 

1474 Creates an ObsPy Stream containing a Trace for each channel in the run. 

1475 

1476 Parameters 

1477 ---------- 

1478 network_code : str | None, optional 

1479 Two-letter network code provided by FDSN DMC. If None, uses 

1480 station timeseries. 

1481 encoding : str | None, optional 

1482 Data encoding format (e.g., 'STEIM2', 'FLOAT64'). If None, uses 

1483 default encoding. 

1484 

1485 Returns 

1486 ------- 

1487 obspy.core.Stream 

1488 Stream object containing Trace objects for all channels. 

1489 

1490 Examples 

1491 -------- 

1492 >>> stream = run.to_obspy_stream(network_code='MT') 

1493 >>> print(stream) 

1494 3 Trace(s) in Stream: 

1495 MT.MT001..EX | 2020-01-01T00:00:00 - ... | 256.0 Hz, 4096 samples 

1496 

1497 See Also 

1498 -------- 

1499 from_obspy_stream : Create RunTS from ObsPy Stream 

1500 ChannelTS.to_obspy_trace : Convert single channel 

1501 

1502 """ 

1503 

1504 trace_list = [] 

1505 for channel in self.channels: 

1506 ts_obj = getattr(self, channel) 

1507 trace_list.append( 

1508 ts_obj.to_obspy_trace(network_code=network_code, encoding=encoding) 

1509 ) 

1510 return Stream(traces=trace_list) 

1511 

1512 def wrangle_leap_seconds_from_obspy( 

1513 self, array_list: list[ChannelTS] 

1514 ) -> list[ChannelTS]: 

1515 """ 

1516 Handle potential leap second issues from ObsPy streams. 

1517 

1518 Removes runs with only one sample that are numerically identical to 

1519 adjacent samples, which may be artifacts of leap second handling. 

1520 

1521 Parameters 

1522 ---------- 

1523 array_list : list[ChannelTS] 

1524 List of ChannelTS objects from ObsPy conversion. 

1525 

1526 Returns 

1527 ------- 

1528 list[ChannelTS] 

1529 Filtered list with single-sample runs removed. 

1530 

1531 Notes 

1532 ----- 

1533 This is experimental handling for issue #169. The exact behavior of 

1534 ObsPy's leap second handling is not fully documented. 

1535 

1536 """ 

1537 msg = f"Possible Leap Second Bug -- see issue #169" 

1538 self.logger.warning(msg) 

1539 return [x for x in array_list if x.n_samples != 1] 

1540 

1541 def from_obspy_stream( 

1542 self, obspy_stream: Stream, run_metadata: timeseries.Run | None = None 

1543 ) -> None: 

1544 """ 

1545 Get a run from an :class:`obspy.core.stream` which is a list of 

1546 :class:`obspy.core.Trace` objects. 

1547 

1548 :param obspy_stream: Obspy Stream object 

1549 :type obspy_stream: :class:`obspy.core.Stream` 

1550 

1551 Development Notes: 

1552 - There is a baked in assumption here that the channel nomenclature 

1553 in obspy is e1,e2,h1,h2,h3 and we want to convert to mth5 conventions 

1554 ex,ey,hx,hy,hz. This should be made more flexible in the future. 

1555 - A bug was found that was creating channels e1, ex, ey in the same run 

1556 when reading from obspy -- this is fixed here by renaming the components and a workaround 

1557 to reset the station's channels_recorded list. 

1558 

1559 

1560 """ 

1561 # mapping from obspy to mth5 conventions 

1562 OBSPY_RENAMER = { 

1563 "e1": "ex", 

1564 "e2": "ey", 

1565 "h1": "hx", 

1566 "h2": "hy", 

1567 "h3": "hz", 

1568 } 

1569 

1570 if not isinstance(obspy_stream, Stream): 

1571 msg = f"Input must be obspy.core.Stream not {type(obspy_stream)}" 

1572 self.logger.error(msg) 

1573 raise TypeError(msg) 

1574 

1575 array_list = [] 

1576 station_list = [] 

1577 for obs_trace in obspy_stream: 

1578 channel_ts = ChannelTS() 

1579 channel_ts.from_obspy_trace(obs_trace) 

1580 

1581 if channel_ts.channel_metadata.component in OBSPY_RENAMER.keys(): 

1582 channel_ts.channel_metadata.component = OBSPY_RENAMER[ 

1583 channel_ts.channel_metadata.component 

1584 ] 

1585 

1586 # If run_metadata is provided then it will likely have channel metadata 

1587 # that we want to use to update the channel_ts metadata. Then use 

1588 # the provided channel_metadata to update the channel_ts metadata. 

1589 if run_metadata: 

1590 if run_metadata.has_channel(channel_ts.component): 

1591 ch = run_metadata.get_channel(channel_ts.component) 

1592 channel_ts.channel_metadata.update(ch) 

1593 channel_ts.run_metadata.update(run_metadata) 

1594 else: 

1595 self.logger.warning(f"could not find {channel_ts.component}") 

1596 

1597 # workaround to reset channel's station.metadata -- (handles obspy renaming). 

1598 channels_recorded = [] 

1599 for ch in channel_ts.station_metadata.channels_recorded: 

1600 if ch in OBSPY_RENAMER.keys(): 

1601 channels_recorded.append(OBSPY_RENAMER[ch]) 

1602 else: 

1603 channels_recorded.append(ch) 

1604 channel_ts.station_metadata.channels_recorded = channels_recorded 

1605 

1606 station_list.append(channel_ts.station_metadata.fdsn.id) 

1607 array_list.append(channel_ts) 

1608 

1609 try: 

1610 station = list(set([ss for ss in station_list if ss is not None]))[0] 

1611 except IndexError: 

1612 station = None 

1613 msg = "Could not find station name" 

1614 self.logger.warning(msg) 

1615 self.station_metadata.fdsn.id = station 

1616 

1617 # handle leap second issue -- if number of channels in run metadata 

1618 if run_metadata is not None: 

1619 if len(run_metadata.channels) != len(array_list): 

1620 array_list = self.wrangle_leap_seconds_from_obspy(array_list) 

1621 self.set_dataset(array_list) 

1622 

1623 # need to be sure update any input timeseries. 

1624 if run_metadata is not None: 

1625 self.station_metadata.runs = ListDict() 

1626 self.station_metadata.add_run(run_metadata) 

1627 self.validate_metadata() 

1628 

1629 def get_slice( 

1630 self, 

1631 start: str | MTime, 

1632 end: str | MTime | None = None, 

1633 n_samples: int | None = None, 

1634 ) -> RunTS: 

1635 """ 

1636 Extract a time slice from the run. 

1637 

1638 Gets a chunk of data from the run, finding the closest points to the 

1639 given parameters. Uses pandas slice_indexer for robust slicing. 

1640 

1641 Parameters 

1642 ---------- 

1643 start : str | MTime 

1644 Start time of the slice (ISO format string or MTime object). 

1645 end : str | MTime | None, optional 

1646 End time of the slice. Required if n_samples not provided. 

1647 n_samples : int | None, optional 

1648 Number of samples to get. Required if end not provided. 

1649 

1650 Returns 

1651 ------- 

1652 RunTS 

1653 New RunTS object containing the requested slice with copies of 

1654 metadata and filters. 

1655 

1656 Raises 

1657 ------ 

1658 ValueError 

1659 If neither end nor n_samples is provided. 

1660 

1661 Examples 

1662 -------- 

1663 Get slice by start and end times: 

1664 

1665 >>> slice1 = run.get_slice('2020-01-01T00:00:00', 

1666 ... '2020-01-01T00:01:00') 

1667 >>> print(slice1.start, slice1.end) 

1668 

1669 Get slice by start time and number of samples: 

1670 

1671 >>> slice2 = run.get_slice('2020-01-01T00:00:00', n_samples=1024) 

1672 >>> print(len(slice2.dataset.time)) 

1673 1024 

1674 

1675 Notes 

1676 ----- 

1677 Uses pandas slice_indexer which handles near-matches better than 

1678 xarray's native slicing. The actual slice may be slightly adjusted 

1679 to match available data points. 

1680 

1681 """ 

1682 if not isinstance(start, MTime): 

1683 start = MTime(time_stamp=start) 

1684 if n_samples is not None: 

1685 seconds = (n_samples - 1) / self.sample_rate 

1686 end = start + seconds 

1687 elif end is not None: 

1688 if not isinstance(end, MTime): 

1689 end = MTime(time_stamp=end) 

1690 else: 

1691 raise ValueError("Must input n_samples or end") 

1692 chunk = self.dataset.indexes["time"].slice_indexer( 

1693 start=np.datetime64(start.iso_no_tz), 

1694 end=np.datetime64(end.iso_no_tz), 

1695 ) 

1696 

1697 new_runts = RunTS() 

1698 new_runts.station_metadata = self.station_metadata 

1699 new_runts.run_metadata = self.run_metadata 

1700 new_runts.filters = self.filters 

1701 new_runts.dataset = self._dataset.isel(indexers={"time": chunk}) 

1702 

1703 return new_runts 

1704 

1705 def calibrate(self, **kwargs) -> RunTS: 

1706 """ 

1707 Remove instrument response from all channels. 

1708 

1709 Applies the channel response filters to calibrate each channel, 

1710 creating a new run with calibrated data. 

1711 

1712 Parameters 

1713 ---------- 

1714 **kwargs 

1715 Additional keyword arguments passed to each channel's 

1716 remove_instrument_response method. 

1717 

1718 Returns 

1719 ------- 

1720 RunTS 

1721 New RunTS object with calibrated channels. 

1722 

1723 Examples 

1724 -------- 

1725 >>> calibrated_run = run.calibrate() 

1726 >>> # Calibration typically converts from counts to physical units 

1727 

1728 See Also 

1729 -------- 

1730 ChannelTS.remove_instrument_response : Calibrate single channel 

1731 

1732 """ 

1733 

1734 new_run = RunTS(run_metadata=self.run_metadata) 

1735 new_run.station_metadata = self.station_metadata 

1736 

1737 for channel in self.channels: 

1738 ch_ts = getattr(self, channel) 

1739 calibrated_ch_ts = ch_ts.remove_instrument_response(**kwargs) 

1740 new_run.add_channel(calibrated_ch_ts) 

1741 return new_run 

1742 

1743 def decimate( 

1744 self, new_sample_rate: float, inplace: bool = False, max_decimation: int = 8 

1745 ) -> RunTS | None: 

1746 """ 

1747 Decimate data to a new sample rate using multi-stage decimation. 

1748 

1749 Applies FIR filtering and downsampling in multiple stages to achieve 

1750 the target sample rate while preventing aliasing. 

1751 

1752 Parameters 

1753 ---------- 

1754 new_sample_rate : float 

1755 Target sample rate in samples per second. 

1756 inplace : bool, optional 

1757 If True, modify the current run. If False, return a new run 

1758 (default is False). 

1759 max_decimation : int, optional 

1760 Maximum decimation factor for each stage (default is 8). 

1761 

1762 Returns 

1763 ------- 

1764 RunTS | None 

1765 If inplace is False, returns new decimated RunTS. Otherwise None. 

1766 

1767 Examples 

1768 -------- 

1769 Decimate from 256 Hz to 1 Hz: 

1770 

1771 >>> decimated_run = run.decimate(1.0) 

1772 >>> print(decimated_run.sample_rate) 

1773 1.0 

1774 

1775 Decimate in place: 

1776 

1777 >>> run.decimate(16.0, inplace=True) 

1778 >>> print(run.sample_rate) 

1779 16.0 

1780 

1781 Notes 

1782 ----- 

1783 NaN values are filled with 0 before decimation to prevent NaN 

1784 propagation. Multi-stage decimation is used to maintain signal 

1785 quality and prevent aliasing. 

1786 

1787 See Also 

1788 -------- 

1789 resample_poly : Alternative resampling using polyphase filtering 

1790 resample : Simple resampling without anti-aliasing 

1791 

1792 """ 

1793 

1794 sr_list = get_decimation_sample_rates( 

1795 self.sample_rate, new_sample_rate, max_decimation 

1796 ) 

1797 # need to fill nans with 0 otherwise they wipeout the decimation values 

1798 # and all becomes nan. 

1799 new_ds = self.dataset.fillna(0) 

1800 for step_sr in sr_list: 

1801 new_ds = new_ds.sps_filters.decimate(step_sr) 

1802 new_ds.attrs["sample_rate"] = new_sample_rate 

1803 self.run_metadata.sample_rate = new_ds.attrs["sample_rate"] 

1804 

1805 if inplace: 

1806 self.dataset = new_ds 

1807 else: 

1808 # return new_ds 

1809 return RunTS( 

1810 new_ds, 

1811 run_metadata=self.run_metadata, 

1812 station_metadata=self.station_metadata, 

1813 survey_metadata=self.survey_metadata, 

1814 ) 

1815 

1816 def resample_poly( 

1817 self, new_sample_rate: float, pad_type: str = "mean", inplace: bool = False 

1818 ) -> RunTS | None: 

1819 """ 

1820 Resample data using polyphase filtering. 

1821 

1822 Uses scipy.signal.resample_poly to resample while applying an FIR 

1823 filter to remove aliasing. Generally more accurate than simple 

1824 resampling but slower than decimation. 

1825 

1826 Parameters 

1827 ---------- 

1828 new_sample_rate : float 

1829 Target sample rate in samples per second. 

1830 pad_type : str, optional 

1831 Padding method for edge effects: 'mean', 'median', 'zero' 

1832 (default is 'mean'). 

1833 inplace : bool, optional 

1834 If True, modify current run. If False, return new run 

1835 (default is False). 

1836 

1837 Returns 

1838 ------- 

1839 RunTS | None 

1840 If inplace is False, returns new resampled RunTS. Otherwise None. 

1841 

1842 Examples 

1843 -------- 

1844 Resample from 256 Hz to 100 Hz: 

1845 

1846 >>> resampled_run = run.resample_poly(100.0) 

1847 >>> print(resampled_run.sample_rate) 

1848 100.0 

1849 

1850 Notes 

1851 ----- 

1852 NaN values are filled with 0 before resampling. The polyphase method 

1853 is particularly good for arbitrary sample rate ratios. 

1854 

1855 See Also 

1856 -------- 

1857 decimate : Multi-stage decimation for downsampling 

1858 resample : Simple nearest-neighbor resampling 

1859 

1860 """ 

1861 

1862 # need to fill nans with 0 otherwise they wipeout the decimation values 

1863 # and all becomes nan. 

1864 new_ds = self.dataset.fillna(0) 

1865 new_ds = new_ds.sps_filters.resample_poly(new_sample_rate, pad_type=pad_type) 

1866 

1867 new_ds.attrs["sample_rate"] = new_sample_rate 

1868 self.run_metadata.sample_rate = new_ds.attrs["sample_rate"] 

1869 

1870 if inplace: 

1871 self.dataset = new_ds 

1872 else: 

1873 return RunTS( 

1874 new_ds, 

1875 run_metadata=self.run_metadata, 

1876 station_metadata=self.station_metadata, 

1877 survey_metadata=self.survey_metadata, 

1878 ) 

1879 

1880 def resample(self, new_sample_rate: float, inplace: bool = False) -> RunTS | None: 

1881 """ 

1882 Resample data to a new sample rate using nearest-neighbor method. 

1883 

1884 Simple resampling without anti-aliasing filtering. Use decimate or 

1885 resample_poly for better quality when downsampling. 

1886 

1887 Parameters 

1888 ---------- 

1889 new_sample_rate : float 

1890 Target sample rate in samples per second. 

1891 inplace : bool, optional 

1892 If True, modify current run. If False, return new run 

1893 (default is False). 

1894 

1895 Returns 

1896 ------- 

1897 RunTS | None 

1898 If inplace is False, returns new resampled RunTS. Otherwise None. 

1899 

1900 Examples 

1901 -------- 

1902 >>> resampled_run = run.resample(128.0) 

1903 >>> print(resampled_run.sample_rate) 

1904 128.0 

1905 

1906 Warnings 

1907 -------- 

1908 This method does not apply anti-aliasing filtering. For downsampling, 

1909 consider using decimate() or resample_poly() instead. 

1910 

1911 See Also 

1912 -------- 

1913 decimate : Proper downsampling with anti-aliasing 

1914 resample_poly : High-quality resampling with polyphase filtering 

1915 

1916 """ 

1917 

1918 new_dt_freq = "{0:.0f}N".format(1e9 / (new_sample_rate)) 

1919 

1920 new_ds = self.dataset.resample(time=new_dt_freq).nearest(tolerance=new_dt_freq) 

1921 new_ds.attrs["sample_rate"] = new_sample_rate 

1922 self.run_metadata.sample_rate = new_ds.attrs["sample_rate"] 

1923 

1924 if inplace: 

1925 self.dataset = new_ds 

1926 else: 

1927 # return new_ds 

1928 return RunTS( 

1929 new_ds, 

1930 run_metadata=self.run_metadata, 

1931 station_metadata=self.station_metadata, 

1932 survey_metadata=self.survey_metadata, 

1933 ) 

1934 

1935 def merge( 

1936 self, 

1937 other: RunTS | list[RunTS], 

1938 gap_method: str = "slinear", 

1939 new_sample_rate: float | None = None, 

1940 resample_method: str = "poly", 

1941 ) -> RunTS: 

1942 """ 

1943 Merge multiple runs into a single run. 

1944 

1945 Combines this run with one or more other runs, optionally resampling 

1946 to a common sample rate and filling gaps with interpolation. 

1947 

1948 Parameters 

1949 ---------- 

1950 other : RunTS | list[RunTS] 

1951 Another RunTS object or list of RunTS objects to merge. 

1952 gap_method : str, optional 

1953 Interpolation method for filling gaps: 'linear', 'nearest', 

1954 'zero', 'slinear', 'quadratic', 'cubic' (default is 'slinear'). 

1955 new_sample_rate : float | None, optional 

1956 If provided, all runs will be resampled to this rate before 

1957 merging. If None, uses the sample rate of the first run. 

1958 resample_method : str, optional 

1959 Resampling method if new_sample_rate is provided: 'decimate' 

1960 or 'poly' (default is 'poly'). 

1961 

1962 Returns 

1963 ------- 

1964 RunTS 

1965 New merged RunTS object with monotonic time index. 

1966 

1967 Raises 

1968 ------ 

1969 TypeError 

1970 If other is not a RunTS or list of RunTS objects. 

1971 

1972 Examples 

1973 -------- 

1974 Merge two runs: 

1975 

1976 >>> run1 = RunTS(array_list=[ex1, ey1]) 

1977 >>> run2 = RunTS(array_list=[ex2, ey2]) 

1978 >>> merged = run1.merge(run2) 

1979 

1980 Merge multiple runs with resampling: 

1981 

1982 >>> runs = [run1, run2, run3] 

1983 >>> merged = run1.merge(runs, new_sample_rate=1.0, 

1984 ... resample_method='poly') 

1985 

1986 Notes 

1987 ----- 

1988 The merge process: 

1989 

1990 1. Optionally resample all runs to common sample rate 

1991 2. Combine datasets using xr.combine_by_coords 

1992 3. Create monotonic time index spanning full range 

1993 4. Interpolate to new index filling gaps 

1994 5. Merge all filter dictionaries 

1995 

1996 Metadata is taken from the first run (self). 

1997 

1998 See Also 

1999 -------- 

2000 __add__ : Simple merging with + operator 

2001 

2002 """ 

2003 if new_sample_rate is not None: 

2004 merge_sample_rate = new_sample_rate 

2005 if resample_method == "decimate": 

2006 combine_list = [self.decimate(new_sample_rate).dataset] 

2007 elif resample_method == "poly": 

2008 combine_list = [self.resample_poly(new_sample_rate).dataset] 

2009 else: 

2010 merge_sample_rate = self.sample_rate 

2011 combine_list = [self.dataset] 

2012 ts_filters = self.filters 

2013 if isinstance(other, (list, tuple)): 

2014 for run in other: 

2015 if not isinstance(run, RunTS): 

2016 raise TypeError(f"Cannot combine {type(run)} with RunTS.") 

2017 if new_sample_rate is not None: 

2018 if resample_method == "decimate": 

2019 run = run.decimate(new_sample_rate) 

2020 elif resample_method == "poly": 

2021 run = run.resample_poly(new_sample_rate) 

2022 combine_list.append(run.dataset) 

2023 ts_filters.update(run.filters) 

2024 else: 

2025 if not isinstance(other, RunTS): 

2026 raise TypeError(f"Cannot combine {type(other)} with RunTS.") 

2027 if new_sample_rate is not None: 

2028 if resample_method == "decimate": 

2029 other = other.decimate(new_sample_rate) 

2030 elif resample_method == "poly": 

2031 other = other.resample_poly(new_sample_rate) 

2032 combine_list.append(other.dataset) 

2033 ts_filters.update(other.filters) 

2034 # combine into a data set use override to keep attrs from original 

2035 

2036 combined_ds = xr.combine_by_coords(combine_list, combine_attrs="override") 

2037 

2038 # Handle datetime.timedelta for Python 3.12+ compatibility 

2039 duration = combined_ds.time.max().values - combined_ds.time.min().values 

2040 if hasattr(duration, "total_seconds"): 

2041 # Python datetime.timedelta 

2042 duration_ns = duration.total_seconds() * 1e9 

2043 elif hasattr(duration, "view"): 

2044 # numpy timedelta64 - convert to nanoseconds 

2045 duration_ns = float(duration / np.timedelta64(1, "ns")) 

2046 else: 

2047 # Already numeric 

2048 duration_ns = float(duration) 

2049 

2050 n_samples = (merge_sample_rate * duration_ns / 1e9) + 1 

2051 

2052 new_dt_index = make_dt_coordinates( 

2053 combined_ds.time.min().values, merge_sample_rate, n_samples 

2054 ) 

2055 

2056 run_metadata = self.run_metadata.copy() 

2057 run_metadata.sample_rate = merge_sample_rate 

2058 

2059 new_run = RunTS( 

2060 run_metadata=self.run_metadata, 

2061 station_metadata=self.station_metadata, 

2062 survey_metadata=self.survey_metadata, 

2063 ) 

2064 

2065 ## tried reindex then interpolate_na, but that has issues if the 

2066 ## intial time index does not exactly match up with the new time index 

2067 ## and then get a bunch of Nan, unless use nearest or pad, but then 

2068 ## gaps are not filled correctly, just do a interp seems easier. 

2069 new_run.dataset = combined_ds.interp(time=new_dt_index, method=gap_method) 

2070 

2071 # update channel attributes 

2072 for ch in new_run.channels: 

2073 new_run.dataset[ch].attrs["time_period.start"] = new_run.start 

2074 new_run.dataset[ch].attrs["time_period.end"] = new_run.end 

2075 new_run.run_metadata.update_time_period() 

2076 new_run.station_metadata.update_time_period() 

2077 new_run.survey_metadata.update_time_period() 

2078 new_run.filters = ts_filters 

2079 

2080 return new_run 

2081 

2082 def plot( 

2083 self, 

2084 color_map: dict[str, tuple[float, float, float]] | None = None, 

2085 channel_order: list[str] | None = None, 

2086 ) -> Figure: 

2087 """ 

2088 Plot all channels as time series. 

2089 

2090 Creates a multi-panel figure with each channel in its own subplot, 

2091 sharing a common time axis. 

2092 

2093 Parameters 

2094 ---------- 

2095 color_map : dict[str, tuple[float, float, float]] | None, optional 

2096 Dictionary mapping channel names to RGB color tuples (values 0-1). 

2097 Default colors: 

2098 

2099 - ex: (1, 0.2, 0.2) - red 

2100 - ey: (1, 0.5, 0) - orange 

2101 - hx: (0, 0.5, 1) - blue 

2102 - hy: (0.5, 0.2, 1) - purple 

2103 - hz: (0.2, 1, 1) - cyan 

2104 

2105 channel_order : list[str] | None, optional 

2106 Order of channels from top to bottom. If None, uses order from 

2107 self.channels. 

2108 

2109 Returns 

2110 ------- 

2111 matplotlib.figure.Figure 

2112 Figure object containing the plot. 

2113 

2114 Examples 

2115 -------- 

2116 Plot with default settings: 

2117 

2118 >>> fig = run.plot() 

2119 >>> fig.savefig('timeseries.png') 

2120 

2121 Plot with custom colors and order: 

2122 

2123 >>> colors = {'ex': (1, 0, 0), 'ey': (0, 1, 0)} 

2124 >>> fig = run.plot(color_map=colors, channel_order=['ey', 'ex']) 

2125 

2126 Warnings 

2127 -------- 

2128 May be slow for large datasets (millions of points). Consider 

2129 using get_slice() first to plot a subset. 

2130 

2131 """ 

2132 

2133 if color_map is None: 

2134 color_map = { 

2135 "ex": (1, 0.2, 0.2), 

2136 "ey": (1, 0.5, 0), 

2137 "hx": (0, 0.5, 1), 

2138 "hy": (0.5, 0.2, 1), 

2139 "hz": (0.2, 1, 1), 

2140 } 

2141 

2142 if channel_order is not None: 

2143 ch_list = channel_order 

2144 else: 

2145 ch_list = self.channels 

2146 n_channels = len(self.channels) 

2147 

2148 fig = plt.figure() 

2149 fig.subplots_adjust(hspace=0) 

2150 ax_list = [] 

2151 for ii, comp in enumerate(ch_list, 1): 

2152 try: 

2153 color = color_map[comp] 

2154 except KeyError: 

2155 color = (0, 0.4, 0.8) 

2156 if ii == 1: 

2157 ax = plt.subplot(n_channels, 1, ii) 

2158 else: 

2159 ax = plt.subplot(n_channels, 1, ii, sharex=ax_list[0]) 

2160 self.dataset[comp].plot.line(ax=ax, color=color) 

2161 ax.grid(which="major", color=(0.65, 0.65, 0.65), ls="--", lw=0.75) 

2162 ax.grid(which="minor", color=(0.85, 0.85, 0.85), ls="--", lw=0.5) 

2163 ax.set_axisbelow(True) 

2164 if ii != len(ch_list): 

2165 plt.setp(ax.get_xticklabels(), visible=False) 

2166 ax_list.append(ax) 

2167 return fig 

2168 

2169 def plot_spectra( 

2170 self, 

2171 spectra_type: str = "welch", 

2172 color_map: dict[str, tuple[float, float, float]] | None = None, 

2173 **kwargs, 

2174 ) -> Figure: 

2175 """ 

2176 Plot power spectral density for all channels. 

2177 

2178 Computes and plots the power spectrum of each channel on a single 

2179 log-log plot with period on x-axis. 

2180 

2181 Parameters 

2182 ---------- 

2183 spectra_type : str, optional 

2184 Type of spectral estimate to compute. Currently only 'welch' 

2185 is supported (default is 'welch'). 

2186 color_map : dict[str, tuple[float, float, float]] | None, optional 

2187 Dictionary mapping channel names to RGB color tuples (values 0-1). 

2188 Uses same defaults as plot(). 

2189 **kwargs 

2190 Additional keyword arguments passed to the spectra computation 

2191 method (e.g., nperseg, window for Welch's method). 

2192 

2193 Returns 

2194 ------- 

2195 matplotlib.figure.Figure 

2196 Figure object containing the spectra plot. 

2197 

2198 Examples 

2199 -------- 

2200 Plot spectra with default settings: 

2201 

2202 >>> fig = run.plot_spectra() 

2203 

2204 Plot with custom Welch parameters: 

2205 

2206 >>> fig = run.plot_spectra(nperseg=1024, window='hann') 

2207 

2208 Notes 

2209 ----- 

2210 The plot shows: 

2211 

2212 - Period (seconds) on bottom x-axis 

2213 - Frequency (Hz) on top x-axis 

2214 - Power (dB) on y-axis 

2215 

2216 See Also 

2217 -------- 

2218 ChannelTS.welch_spectra : Compute Welch power spectrum 

2219 

2220 """ 

2221 

2222 if color_map is None: 

2223 color_map = { 

2224 "ex": (1, 0.2, 0.2), 

2225 "ey": (1, 0.5, 0), 

2226 "hx": (0, 0.5, 1), 

2227 "hy": (0.5, 0.2, 1), 

2228 "hz": (0.2, 1, 1), 

2229 } 

2230 

2231 fig = plt.figure() 

2232 ax = fig.add_subplot(1, 1, 1) 

2233 line_list = [] 

2234 label_list = [] 

2235 for comp in self.channels: 

2236 ch = getattr(self, comp) 

2237 plot_freq, power = ch.welch_spectra(**kwargs) 

2238 (l1,) = ax.loglog(1.0 / plot_freq, power, lw=1.5, color=color_map[comp]) 

2239 line_list.append(l1) 

2240 label_list.append(comp) 

2241 ax.set_xlabel("Period (s)", fontdict={"size": 10, "weight": "bold"}) 

2242 ax.set_ylabel("Power (dB)", fontdict={"size": 10, "weight": "bold"}) 

2243 ax.axis("tight") 

2244 ax.grid(which="both") 

2245 

2246 ax2 = ax.twiny() 

2247 ax2.loglog(plot_freq, power, lw=0) 

2248 ax2.set_xlabel("Frequency (Hz)", fontdict={"size": 10, "weight": "bold"}) 

2249 ax2.set_xlim([1 / cc for cc in ax.get_xlim()]) 

2250 

2251 ax.legend(line_list, label_list) 

2252 

2253 plt.show() 

2254 

2255 return fig