Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mth5 \ mth5 \ io \ phoenix \ phoenix_collection.py: 92%

113 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-10 00:01 -0800

1# -*- coding: utf-8 -*- 

2""" 

3Phoenix file collection module for organizing and processing Phoenix MTU data files. 

4 

5This module provides the PhoenixCollection class for discovering, organizing, 

6and managing Phoenix magnetotelluric receiver files within a directory structure. 

7 

8Created on Thu Aug 4 16:48:47 2022 

9 

10@author: jpeacock 

11""" 

12from __future__ import annotations 

13 

14from collections import OrderedDict 

15from pathlib import Path 

16 

17import numpy as np 

18import pandas as pd 

19 

20from mth5.io import Collection 

21from mth5.io.phoenix import open_phoenix, PhoenixReceiverMetadata 

22 

23 

24# ============================================================================= 

25 

26 

27class PhoenixCollection(Collection): 

28 """ 

29 Collection manager for Phoenix MTU data files. 

30 

31 Organizes Phoenix magnetotelluric receiver files into runs based on 

32 timing and sample rates. Handles multiple sample rates (30, 150, 2400, 

33 24000, 96000 Hz) and manages receiver metadata. 

34 

35 Parameters 

36 ---------- 

37 file_path : str | Path | None, optional 

38 Path to the directory containing Phoenix data files. Can be the 

39 station folder or a parent folder containing multiple stations. 

40 **kwargs 

41 Additional keyword arguments passed to parent Collection class. 

42 

43 Attributes 

44 ---------- 

45 metadata_dict : dict[str, PhoenixReceiverMetadata] 

46 Dictionary mapping station IDs to their receiver metadata. 

47 

48 Examples 

49 -------- 

50 Create a collection from a station directory: 

51 

52 >>> from mth5.io.phoenix import PhoenixCollection 

53 >>> collection = PhoenixCollection(r"/path/to/station") 

54 >>> runs = collection.get_runs(sample_rates=[150, 24000]) 

55 >>> print(runs.keys()) 

56 odict_keys(['MT001']) 

57 

58 Process multiple sample rates: 

59 

60 >>> df = collection.to_dataframe(sample_rates=[150, 2400, 24000]) 

61 >>> print(df.columns) 

62 Index(['survey', 'station', 'run', 'start', 'end', ...]) 

63 

64 Notes 

65 ----- 

66 The class automatically discovers station folders by locating 

67 'recmeta.json' files and organizes time series files by sample rate. 

68 

69 File extensions are mapped as: 

70 

71 - 30 Hz: td_30 

72 - 150 Hz: td_150 

73 - 2400 Hz: td_2400 

74 - 24000 Hz: td_24k 

75 - 96000 Hz: td_96k 

76 

77 See Also 

78 -------- 

79 mth5.io.Collection : Base collection class 

80 mth5.io.phoenix.PhoenixReceiverMetadata : Receiver metadata handler 

81 

82 """ 

83 

def __init__(self, file_path: str | Path | None = None, **kwargs) -> None:
    """
    Build the lookup tables and initialise the parent ``Collection``.

    Parameters
    ----------
    file_path : str | Path | None, optional
        Directory holding Phoenix data; forwarded to the parent class.
    **kwargs
        Extra keyword arguments forwarded to the parent class.

    """
    # sample rate in Hz -> suffix of the matching time series files
    self._file_extension_map = dict(
        zip(
            (30, 150, 2400, 24000, 96000),
            ("td_30", "td_150", "td_2400", "td_24k", "td_96k"),
        )
    )

    # channel index -> default component label
    self._default_channel_map = dict(
        enumerate(("E1", "H3", "H2", "H1", "H4", "H5", "H6", "E2"))
    )

    # the two maps above are deliberately in place before the parent
    # initialiser runs, exactly as in the original statement order
    super().__init__(file_path=file_path, **kwargs)

    # station id -> receiver metadata, filled in by ``to_dataframe``
    self.metadata_dict = {}
    self._receiver_metadata_name = "recmeta.json"

109 

110 def _read_receiver_metadata_json( 

111 self, rec_fn: str | Path 

112 ) -> PhoenixReceiverMetadata | None: 

113 """ 

114 Read receiver metadata from JSON file. 

115 

116 Loads and parses the recmeta.json file containing station and 

117 channel configuration information. 

118 

119 Parameters 

120 ---------- 

121 rec_fn : str | Path 

122 Path to the recmeta.json metadata file. 

123 

124 Returns 

125 ------- 

126 PhoenixReceiverMetadata | None 

127 Receiver metadata object if file exists, None otherwise. 

128 

129 Examples 

130 -------- 

131 >>> metadata = collection._read_receiver_metadata_json( 

132 ... Path("/data/station/recmeta.json") 

133 ... ) 

134 >>> print(metadata.station_metadata.id) 

135 'MT001' 

136 

137 """ 

138 

139 if Path(rec_fn).is_file(): 

140 return PhoenixReceiverMetadata(fn=rec_fn) 

141 else: 

142 self.logger.warning( 

143 f"Could not find {self._receiver_metadata_name} in {self.file_path}" 

144 ) 

145 return None 

146 

147 def _locate_station_folders(self) -> list[Path]: 

148 """ 

149 Locate all station folders containing recmeta.json files. 

150 

151 Recursively searches the collection path for directories containing 

152 the receiver metadata file (recmeta.json), which identifies a valid 

153 Phoenix station folder. 

154 

155 Returns 

156 ------- 

157 list[Path] 

158 List of Path objects pointing to station folders. 

159 

160 Examples 

161 -------- 

162 >>> folders = collection._locate_station_folders() 

163 >>> print([f.name for f in folders]) 

164 ['MT001', 'MT002', 'MT003'] 

165 

166 Notes 

167 ----- 

168 Each station folder must contain a recmeta.json file to be recognized. 

169 The search is recursive, allowing for nested directory structures. 

170 

171 """ 

172 station_folders = [] 

173 for folder in self.file_path.rglob("**/"): 

174 rec_fn = folder.joinpath("recmeta.json") 

175 if rec_fn.exists(): 

176 station_folders.append(folder) 

177 

178 return station_folders 

179 

def to_dataframe(
    self,
    sample_rates: list[int] | int = [150, 24000],
    run_name_zeros: int = 4,
    calibration_path: str | Path | None = None,
) -> pd.DataFrame:
    """
    Create a DataFrame cataloging all Phoenix files in the collection.

    Scans every station folder for time series files at the requested
    sample rates and builds one row of metadata per file.

    Parameters
    ----------
    sample_rates : list[int] | int, optional
        Sample rate(s) to include in Hz. Must be keys of
        ``self._file_extension_map`` (30, 150, 2400, 24000, 96000). Can be a
        single integer or a list/tuple (default is [150, 24000]).
    run_name_zeros : int, optional
        Zero-padding width for run names (default is 4); forwarded to
        ``self._sort_df``.
    calibration_path : str | Path | None, optional
        Currently unused; reserved for future functionality.

    Returns
    -------
    pd.DataFrame
        One row per file. The columns filled in here are:

        - survey: Survey ID from the receiver metadata
        - station: Station ID from the receiver metadata
        - run: initialised to None
        - start: File start time (ISO format string)
        - end: File end time (ISO format string)
        - channel_id: Numeric channel identifier
        - component: ``receiver_metadata.channel_map[channel_id]``
        - fn: Full file path
        - sample_rate: Sample rate in Hz
        - file_size: File size as reported by the reader
        - n_samples: Number of samples in the file
        - sequence_number: File sequence number
        - instrument_id: Recording ID reported by the reader
        - calibration_fn: always None

    Raises
    ------
    KeyError
        If a requested sample rate is not in ``self._file_extension_map``.
        Raised before any file is opened.

    Examples
    --------
    >>> df = collection.to_dataframe(sample_rates=[150, 24000])
    >>> print(df.station.unique())
    ['MT001']

    >>> df_150 = collection.to_dataframe(sample_rates=150)
    >>> print(df_150.sample_rate.unique())
    [150.]

    Notes
    -----
    - Files whose path contains 'calibration' (case-insensitive) are skipped.
    - Files that cannot be opened or read (``OSError``) are logged and
      skipped.
    - Station folders whose receiver metadata cannot be read are skipped.
    - Dtype coercion and final ordering are delegated to the inherited
      ``_set_df_dtypes`` and ``_sort_df`` helpers, which are not defined in
      this module. ``get_runs`` reads the ``run`` column straight from the
      result, so ``_sort_df`` presumably assigns run names -- TODO confirm.

    See Also
    --------
    assign_run_names : Assign run identifiers based on timing
    get_runs : Get organized runs directly

    """
    # NOTE: the list default is never mutated (only rebound below), so
    # sharing it between calls is harmless.
    if not isinstance(sample_rates, (list, tuple)):
        sample_rates = [sample_rates]

    # Resolve the file suffixes once, up front: an unsupported rate fails
    # fast instead of after part of the collection has been scanned.
    suffixes = [self._file_extension_map[int(sr)] for sr in sample_rates]

    entries = []
    for folder in self._locate_station_folders():
        rec_fn = folder.joinpath(self._receiver_metadata_name)
        receiver_metadata = self._read_receiver_metadata_json(rec_fn)
        if receiver_metadata is None:
            # the reader has already logged a warning; without metadata the
            # survey/station/component fields cannot be filled in
            continue
        self.metadata_dict[receiver_metadata.station_metadata.id] = receiver_metadata

        for suffix in suffixes:
            for fn in folder.rglob(f"*{suffix}"):
                if "calibration" in fn.as_posix().lower():
                    self.logger.debug(f"skipping calibration time series {fn}")
                    continue
                try:
                    phx_obj = open_phoenix(fn)
                except OSError:
                    self.logger.warning(f"Skipping {fn.name}")
                    continue

                if hasattr(phx_obj, "read_segment"):
                    # segmented reader: timing lives on the segment header.
                    # The read itself is inside the try so that an
                    # unreadable file is skipped rather than aborting the
                    # whole scan.
                    try:
                        segment = phx_obj.read_segment(metadata_only=True)
                        start = segment.segment_start_time.isoformat()
                    except OSError:
                        self.logger.warning(f"Could not read file {fn}, SKIPPING")
                        continue
                    end = segment.segment_end_time.isoformat()
                    n_samples = segment.n_samples
                else:
                    # reader without segments exposes timing directly
                    start = phx_obj.segment_start_time.isoformat()
                    end = phx_obj.segment_end_time.isoformat()
                    n_samples = phx_obj.max_samples

                entry = self.get_empty_entry_dict()
                entry["survey"] = receiver_metadata.survey_metadata.id
                entry["station"] = receiver_metadata.station_metadata.id
                # plain None -- a trailing comma here used to store the
                # one-element tuple ``(None,)``
                entry["run"] = None
                entry["start"] = start
                entry["end"] = end
                entry["channel_id"] = phx_obj.channel_id
                entry["component"] = receiver_metadata.channel_map[phx_obj.channel_id]
                entry["fn"] = fn
                entry["sample_rate"] = phx_obj.sample_rate
                entry["file_size"] = phx_obj.file_size
                entry["n_samples"] = n_samples
                entry["sequence_number"] = phx_obj.seq
                entry["instrument_id"] = phx_obj.recording_id
                entry["calibration_fn"] = None
                entries.append(entry)

    return self._sort_df(self._set_df_dtypes(pd.DataFrame(entries)), run_name_zeros)

324 

def assign_run_names(self, df: pd.DataFrame, zeros: int = 4) -> pd.DataFrame:
    """
    Assign run names based on temporal continuity.

    For continuous data (< 1000 Hz) files stay in the same run as long as
    they are contiguous in time. For segmented data (>= 1000 Hz) every
    distinct start time becomes its own run. Runs are numbered from 1,
    independently for each station and sample rate.

    Parameters
    ----------
    df : pd.DataFrame
        File inventory with at least the columns ``station``,
        ``sample_rate``, ``start`` and ``end``.
    zeros : int, optional
        Zero-padding width of the run counter (default is 4).

    Returns
    -------
    pd.DataFrame
        Copy of ``df`` with the ``run`` column populated. Names follow
        ``'sr{stem}_{number:0{zeros}}'``, e.g. ``'sr150_0001'``, where
        ``stem`` is the part after the underscore of the matching
        ``self._file_extension_map`` entry. The input frame is not modified.

    Raises
    ------
    KeyError
        If a sample rate in ``df`` is not in ``self._file_extension_map``.

    Examples
    --------
    >>> df = collection.to_dataframe(sample_rates=[150, 24000])
    >>> df_with_runs = collection.assign_run_names(df, zeros=4)
    >>> print(df_with_runs.run.unique())
    ['sr150_0001', 'sr24k_0001', 'sr24k_0002', ...]

    Notes
    -----
    **Continuous data (< 1000 Hz):** unique start times are walked in
    chronological order and each is compared with the latest end time of
    the files that began at the previous start. A difference of one second
    or more (gap or overlap) opens a new run; smaller differences are
    treated as contiguous. Every file receives a run name, including the
    files before the first gap and after the last one.

    **Segmented data (>= 1000 Hz):** each unique start time, in
    chronological order, receives the next run number.

    Run stems by sample rate: 30 -> 'sr30', 150 -> 'sr150',
    2400 -> 'sr2400', 24000 -> 'sr24k', 96000 -> 'sr96k'.

    """
    rdf = df.copy()

    for station in rdf.station.unique():
        at_station = rdf.station == station
        for sr in rdf.loc[at_station, "sample_rate"].unique():
            run_stem = self._file_extension_map[int(sr)].split("_")[-1]
            # always restrict to this station: start times are routinely
            # shared between stations recording on the same schedule
            selected = at_station & (rdf.sample_rate == sr)

            # positional copy of the timing columns; dropping the index
            # keeps the logic safe for frames with duplicate index labels
            times = rdf.loc[selected, ["start", "end"]].reset_index(drop=True)
            times = times.apply(pd.to_datetime)
            first_starts = pd.Index(times["start"]).unique().sort_values()

            if sr < 1000:
                # continuous data: a new run begins wherever a start is at
                # least one second away from the previous file's end
                last_end = times.groupby("start")["end"].max().reindex(first_starts)
                gap = first_starts.to_series() - last_end.shift(1)
                # the first entry has no predecessor (NaT) and compares False
                starts_new_run = gap.abs() >= pd.Timedelta(seconds=1)
                run_numbers = starts_new_run.cumsum() + 1
            else:
                # segmented data: one run per distinct start time
                run_numbers = pd.Series(
                    np.arange(1, len(first_starts) + 1), index=first_starts
                )

            run_names = run_numbers.map(
                lambda number: f"sr{run_stem}_{number:0{zeros}}"
            )
            rdf.loc[selected, "run"] = times["start"].map(run_names).to_numpy()

    return rdf

444 

def get_runs(
    self,
    sample_rates: list[int] | int,
    run_name_zeros: int = 4,
    calibration_path: str | Path | None = None,
) -> OrderedDict[str, OrderedDict[str, pd.DataFrame]]:
    """
    Organize Phoenix files into runs ready for reading.

    Builds the file inventory with ``to_dataframe`` and, for every station
    and run, keeps only the earliest file(s) of each component.

    Parameters
    ----------
    sample_rates : list[int] | int
        Sample rate(s) to include in Hz; forwarded to ``to_dataframe``.
    run_name_zeros : int, optional
        Zero-padding width for run names (default is 4); forwarded to
        ``to_dataframe``.
    calibration_path : str | Path | None, optional
        Forwarded to ``to_dataframe``, which currently does not use it.

    Returns
    -------
    OrderedDict[str, OrderedDict[str, pd.DataFrame]]
        Nested mapping:

        - Keys: station IDs, sorted
        - Values: OrderedDict of runs

          - Keys: run IDs (e.g., 'sr150_0001'), ordered by run number
          - Values: DataFrame holding, per component, the row(s) with the
            earliest start time in that run

    Examples
    --------
    >>> from mth5.io.phoenix import PhoenixCollection
    >>> collection = PhoenixCollection(r"/path/to/station")
    >>> runs = collection.get_runs(sample_rates=[150, 24000])
    >>> print(runs.keys())
    odict_keys(['MT001'])

    >>> print(list(runs['MT001'].keys()))
    ['sr150_0001', 'sr24k_0001', 'sr24k_0002', ...]

    >>> for station_id, station_runs in runs.items():
    ...     for run_id, run_df in station_runs.items():
    ...         print(f"{station_id}/{run_id}: {len(run_df)} channels")
    MT001/sr150_0001: 5 channels
    MT001/sr24k_0001: 5 channels

    Notes
    -----
    Runs are ordered by the integer after the last underscore of the run
    ID, so the order stays correct when the counter outgrows the padding
    width (or when ``run_name_zeros`` is 0). Runs with the same number keep
    the order in which they appear in the inventory. Run IDs without a
    numeric suffix are placed last, ordered by that suffix.

    The earliest file is selected per component rather than per run
    because components of one run do not necessarily share the same first
    start time.

    See Also
    --------
    to_dataframe : Create complete file inventory
    assign_run_names : Group files into runs

    """

    def run_order(run_id: str) -> tuple[int, int, str]:
        # numeric comparison of the run counter; non-numeric suffixes sort
        # after all numbered runs instead of raising
        suffix = run_id.rsplit("_", 1)[-1]
        if suffix.isdigit():
            return (0, int(suffix), "")
        return (1, 0, suffix)

    df = self.to_dataframe(
        sample_rates=sample_rates,
        run_name_zeros=run_name_zeros,
        calibration_path=calibration_path,
    )

    run_dict = OrderedDict()
    for station in sorted(df.station.unique()):
        station_df = df[df.station == station]
        station_runs = OrderedDict()

        for run_id in sorted(station_df.run.unique(), key=run_order):
            run_df = station_df[station_df.run == run_id]

            # earliest file(s) of each component, in order of appearance
            first_files = []
            for comp in run_df.component.unique():
                comp_df = run_df[run_df.component == comp]
                first_files.append(comp_df[comp_df.start == comp_df.start.min()])

            station_runs[run_id] = pd.concat(first_files)

        run_dict[station] = station_runs

    return run_dict