Coverage for C:\Users\peaco\OneDrive\Documents\GitHub\mth5\mth5\io\phoenix\phoenix_collection.py: 92%
113 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:01 -0800
1# -*- coding: utf-8 -*-
2"""
3Phoenix file collection module for organizing and processing Phoenix MTU data files.
5This module provides the PhoenixCollection class for discovering, organizing,
6and managing Phoenix magnetotelluric receiver files within a directory structure.
8Created on Thu Aug 4 16:48:47 2022
10@author: jpeacock
11"""
12from __future__ import annotations
14from collections import OrderedDict
15from pathlib import Path
17import numpy as np
18import pandas as pd
20from mth5.io import Collection
21from mth5.io.phoenix import open_phoenix, PhoenixReceiverMetadata
24# =============================================================================
class PhoenixCollection(Collection):
    """
    Collection manager for Phoenix MTU data files.

    Organizes Phoenix magnetotelluric receiver files into runs based on
    timing and sample rates. Handles multiple sample rates (30, 150, 2400,
    24000, 96000 Hz) and manages receiver metadata.

    Parameters
    ----------
    file_path : str | Path | None, optional
        Path to the directory containing Phoenix data files. Can be the
        station folder or a parent folder containing multiple stations.
    **kwargs
        Additional keyword arguments passed to parent Collection class.

    Attributes
    ----------
    metadata_dict : dict[str, PhoenixReceiverMetadata]
        Dictionary mapping station IDs to their receiver metadata. It is
        filled in by :meth:`to_dataframe`.

    Examples
    --------
    Create a collection from a station directory:

    >>> from mth5.io.phoenix import PhoenixCollection
    >>> collection = PhoenixCollection(r"/path/to/station")
    >>> runs = collection.get_runs(sample_rates=[150, 24000])
    >>> print(runs.keys())
    odict_keys(['MT001'])

    Process multiple sample rates:

    >>> df = collection.to_dataframe(sample_rates=[150, 2400, 24000])
    >>> print(df.columns)
    Index(['survey', 'station', 'run', 'start', 'end', ...])

    Notes
    -----
    Station folders are discovered by locating 'recmeta.json' files, and
    time series files are organized by sample rate.

    File extensions are mapped as:

    - 30 Hz: td_30
    - 150 Hz: td_150
    - 2400 Hz: td_2400
    - 24000 Hz: td_24k
    - 96000 Hz: td_96k

    See Also
    --------
    mth5.io.Collection : Base collection class
    mth5.io.phoenix.PhoenixReceiverMetadata : Receiver metadata handler
    """

    # Sample rates (Hz) below this value are treated as continuous data;
    # rates at or above it are treated as segmented data.
    _continuous_cutoff = 1000

    # Consecutive continuous files whose boundaries differ by at least this
    # much (a gap or an overlap) are split into separate runs.
    _run_gap_tolerance = pd.Timedelta(seconds=1)

    def __init__(self, file_path: str | Path | None = None, **kwargs) -> None:
        # sample rate (Hz) -> file-name suffix of the Phoenix time series
        self._file_extension_map = {
            30: "td_30",
            150: "td_150",
            2400: "td_2400",
            24000: "td_24k",
            96000: "td_96k",
        }

        # channel id -> default component label. Not referenced elsewhere in
        # this class; components are taken from the receiver metadata.
        self._default_channel_map = {
            0: "E1",
            1: "H3",
            2: "H2",
            3: "H1",
            4: "H4",
            5: "H5",
            6: "H6",
            7: "E2",
        }

        super().__init__(file_path=file_path, **kwargs)

        self.metadata_dict = {}

        self._receiver_metadata_name = "recmeta.json"

    def _read_receiver_metadata_json(
        self, rec_fn: str | Path
    ) -> PhoenixReceiverMetadata | None:
        """
        Read receiver metadata from a JSON file.

        Loads and parses the recmeta.json file containing station and
        channel configuration information.

        Parameters
        ----------
        rec_fn : str | Path
            Path to the recmeta.json metadata file.

        Returns
        -------
        PhoenixReceiverMetadata | None
            Receiver metadata object if the file exists. Otherwise None,
            after logging a warning.

        Examples
        --------
        >>> metadata = collection._read_receiver_metadata_json(
        ...     Path("/data/station/recmeta.json")
        ... )
        >>> print(metadata.station_metadata.id)
        'MT001'
        """
        rec_fn = Path(rec_fn)
        if rec_fn.is_file():
            return PhoenixReceiverMetadata(fn=rec_fn)

        # report the folder that was actually searched, not the collection root
        self.logger.warning(
            f"Could not find {self._receiver_metadata_name} in {rec_fn.parent}"
        )
        return None

    def _locate_station_folders(self) -> list[Path]:
        """
        Locate all station folders containing a receiver metadata file.

        Recursively searches the collection path (including the path itself)
        for files named ``self._receiver_metadata_name`` ('recmeta.json').
        Each containing folder is treated as a Phoenix station folder.

        Returns
        -------
        list[Path]
            Sorted, de-duplicated list of station folders.

        Examples
        --------
        >>> folders = collection._locate_station_folders()
        >>> print([f.name for f in folders])
        ['MT001', 'MT002', 'MT003']
        """
        metadata_files = self.file_path.rglob(self._receiver_metadata_name)
        # is_file() ignores any directory that happens to share the name
        return sorted({fn.parent for fn in metadata_files if fn.is_file()})

    def _build_file_entry(
        self, fn: Path, receiver_metadata: PhoenixReceiverMetadata
    ) -> dict | None:
        """
        Build one inventory row for a single Phoenix time series file.

        Parameters
        ----------
        fn : Path
            Time series file to describe.
        receiver_metadata : PhoenixReceiverMetadata
            Metadata of the station folder the file belongs to.

        Returns
        -------
        dict | None
            Entry dictionary (see :meth:`to_dataframe` for the keys), or
            None when the file is skipped. Skips are logged.
        """
        # matches anywhere in the path, so files under a folder whose name
        # contains "calibration" are skipped as well
        if "calibration" in fn.as_posix().lower():
            self.logger.debug(f"skipping calibration time series {fn}")
            return None

        try:
            phx_obj = open_phoenix(fn)
        except OSError:
            self.logger.warning(f"Skipping {fn.name}")
            return None

        if hasattr(phx_obj, "read_segment"):
            # Reading the segment header is the step that can fail on a
            # damaged file, so it belongs inside the try.
            try:
                segment = phx_obj.read_segment(metadata_only=True)
                start = segment.segment_start_time.isoformat()
            except OSError:
                self.logger.warning(f"Could not read file {fn}, SKIPPING")
                return None
            end = segment.segment_end_time.isoformat()
            n_samples = segment.n_samples
        else:
            start = phx_obj.segment_start_time.isoformat()
            end = phx_obj.segment_end_time.isoformat()
            n_samples = phx_obj.max_samples

        entry = self.get_empty_entry_dict()
        entry["survey"] = receiver_metadata.survey_metadata.id
        entry["station"] = receiver_metadata.station_metadata.id
        entry["run"] = None
        entry["start"] = start
        entry["end"] = end
        entry["channel_id"] = phx_obj.channel_id
        entry["component"] = receiver_metadata.channel_map[phx_obj.channel_id]
        entry["fn"] = fn
        entry["sample_rate"] = phx_obj.sample_rate
        entry["file_size"] = phx_obj.file_size
        entry["n_samples"] = n_samples
        entry["sequence_number"] = phx_obj.seq
        entry["instrument_id"] = phx_obj.recording_id
        entry["calibration_fn"] = None
        return entry

    def to_dataframe(
        self,
        sample_rates: list[int] | tuple[int, ...] | int = (150, 24000),
        run_name_zeros: int = 4,
        calibration_path: str | Path | None = None,
    ) -> pd.DataFrame:
        """
        Create a DataFrame cataloging all Phoenix files in the collection.

        Scans all station folders for time series files at the specified
        sample rates and builds an inventory with one row per file.

        Parameters
        ----------
        sample_rates : list[int] | tuple[int, ...] | int, optional
            Sample rate(s) to include in Hz. Valid values are 30, 150, 2400,
            24000 and 96000; any other value raises KeyError. Can be a single
            number or a sequence (default is (150, 24000)).
        run_name_zeros : int, optional
            Number of zeros for zero-padding run names (default is 4).
            For example, 4 produces 'sr150_0001'.
        calibration_path : str | Path | None, optional
            Path to calibration files. Currently unused but reserved for
            future functionality.

        Returns
        -------
        pd.DataFrame
            DataFrame with one row per file containing columns:

            - survey: Survey ID from metadata
            - station: Station ID from metadata
            - run: Run ID
            - start: File start time
            - end: File end time
            - channel_id: Numeric channel identifier
            - component: Component name from the receiver metadata channel_map
            - fn: Full file path
            - sample_rate: Sample rate in Hz
            - file_size: File size in bytes
            - n_samples: Number of samples in file
            - sequence_number: File sequence number for continuous data
            - instrument_id: Recording/receiver ID
            - calibration_fn: Path to calibration file (currently None)

        Examples
        --------
        Get DataFrame for standard sample rates:

        >>> df = collection.to_dataframe(sample_rates=[150, 24000])
        >>> print(df.station.unique())
        ['MT001']

        Process single sample rate:

        >>> df_150 = collection.to_dataframe(sample_rates=150)
        >>> print(df_150.sample_rate.unique())
        [150.]

        Notes
        -----
        - Files with 'calibration' anywhere in their path are skipped.
        - Files that cannot be opened or read are logged and skipped.
        - Station folders whose metadata cannot be read are skipped.
        - The final ordering and the ``run`` column come from the base class
          ``_sort_df`` step, which receives ``run_name_zeros``. This
          presumably delegates to :meth:`assign_run_names`.
          :meth:`get_runs` relies on the ``run`` column being populated.

        See Also
        --------
        assign_run_names : Assign run identifiers based on timing
        get_runs : Get organized runs directly
        """
        if not isinstance(sample_rates, (list, tuple, np.ndarray)):
            sample_rates = [sample_rates]

        entries = []
        for folder in self._locate_station_folders():
            receiver_metadata = self._read_receiver_metadata_json(
                folder.joinpath(self._receiver_metadata_name)
            )
            if receiver_metadata is None:
                continue
            self.metadata_dict[receiver_metadata.station_metadata.id] = (
                receiver_metadata
            )

            for sr in sample_rates:
                extension = self._file_extension_map[int(sr)]
                for fn in folder.rglob(f"*{extension}"):
                    entry = self._build_file_entry(fn, receiver_metadata)
                    if entry is not None:
                        entries.append(entry)

        return self._sort_df(
            self._set_df_dtypes(pd.DataFrame(entries)), run_name_zeros
        )

    def _continuous_run_numbers(self, sdf: pd.DataFrame) -> pd.Series:
        """
        Number the contiguous stretches of continuous files, starting at 1.

        Files that share a start time (one per channel) form one time slot.
        The slot ends at the latest end time among those files. A new run
        begins whenever a slot's start differs from the previous slot's end
        by at least ``self._run_gap_tolerance``.

        Parameters
        ----------
        sdf : pd.DataFrame
            Rows of a single station and sample rate, with datetime
            'start' and 'end' columns.

        Returns
        -------
        pd.Series
            Integer run number per row, aligned with ``sdf``.
        """
        slot_end = sdf.groupby("start")["end"].max().sort_index()
        slot_start = slot_end.index.to_series()
        # The first slot compares against NaT, which evaluates to False.
        jump = (slot_start - slot_end.shift()).abs() >= self._run_gap_tolerance
        run_by_start = jump.astype(int).cumsum() + 1
        return sdf["start"].map(run_by_start)

    @staticmethod
    def _segment_run_numbers(sdf: pd.DataFrame) -> pd.Series:
        """
        Number segments 1..N in chronological order of their start time.

        Parameters
        ----------
        sdf : pd.DataFrame
            Rows of a single station and sample rate.

        Returns
        -------
        pd.Series
            Integer run number per row, aligned with ``sdf``. Rows sharing a
            start time share a number.
        """
        return sdf.groupby("start", sort=True).ngroup() + 1

    def assign_run_names(self, df: pd.DataFrame, zeros: int = 4) -> pd.DataFrame:
        """
        Assign run names based on temporal continuity.

        Analyzes file timing to group files into runs, separately for every
        station and sample rate. Continuous data (< 1000 Hz) keeps a single
        run as long as files are contiguous. Segmented data (>= 1000 Hz)
        gets a unique run for each segment start time.

        Parameters
        ----------
        df : pd.DataFrame
            DataFrame returned by `to_dataframe` method with file inventory.
        zeros : int, optional
            Number of zeros for zero-padding run names (default is 4).

        Returns
        -------
        pd.DataFrame
            Copy of ``df`` with every row's 'run' column populated. Run names
            follow the format 'sr{stem}_{number:0{zeros}}', e.g. 'sr150_0001'.

        Examples
        --------
        >>> df = collection.to_dataframe(sample_rates=[150, 24000])
        >>> df_with_runs = collection.assign_run_names(df, zeros=4)
        >>> print(df_with_runs.run.unique())
        ['sr150_0001', 'sr24k_0001', 'sr24k_0002', ...]

        Notes
        -----
        **Continuous Data (< 1000 Hz):**

        - Files are grouped into time slots by start time.
        - A new run starts when a slot's start differs from the previous
          slot's end by one second or more, whether that is a gap or an
          overlap.

        **Segmented Data (>= 1000 Hz):**

        - Each unique start time, in chronological order, receives a new run
          number.

        The run naming scheme uses the sample rate in the identifier:

        - 30 Hz → 'sr30_NNNN'
        - 150 Hz → 'sr150_NNNN'
        - 2400 Hz → 'sr2400_NNNN'
        - 24000 Hz → 'sr24k_NNNN'
        - 96000 Hz → 'sr96k_NNNN'
        """
        rdf = df.copy()

        for station in rdf.station.unique():
            station_mask = rdf.station == station
            for sr in rdf.loc[station_mask, "sample_rate"].unique():
                # restrict to this station so stations never overwrite each other
                mask = station_mask & (rdf.sample_rate == sr)
                run_stem = self._file_extension_map[int(sr)].split("_")[-1]

                if sr < self._continuous_cutoff:
                    run_numbers = self._continuous_run_numbers(rdf.loc[mask])
                else:
                    run_numbers = self._segment_run_numbers(rdf.loc[mask])

                names = run_numbers.map(
                    lambda n: f"sr{run_stem}_{int(n):0{zeros}}"
                )
                # positional assignment: names are in the row order of rdf.loc[mask]
                rdf.loc[mask, "run"] = names.to_numpy()

        return rdf

    def get_runs(
        self,
        sample_rates: list[int] | int,
        run_name_zeros: int = 4,
        calibration_path: str | Path | None = None,
    ) -> OrderedDict[str, OrderedDict[str, pd.DataFrame]]:
        """
        Organize Phoenix files into runs ready for reading.

        Creates a nested dictionary organizing files by station and run. For
        each run, only the earliest file(s) of every component are kept.

        Parameters
        ----------
        sample_rates : list[int] | int
            Sample rate(s) to include in Hz. Valid values are 30, 150, 2400,
            24000, 96000. Can be a single integer or list.
        run_name_zeros : int, optional
            Number of zeros for zero-padding run names (default is 4).
        calibration_path : str | Path | None, optional
            Path to calibration files. Currently unused but reserved for
            future functionality.

        Returns
        -------
        OrderedDict[str, OrderedDict[str, pd.DataFrame]]
            Nested OrderedDict:

            - Keys: station IDs (sorted)
            - Values: OrderedDict of runs, ordered by the trailing
              ``run_name_zeros`` characters of the run ID

              - Keys: run IDs (e.g., 'sr150_0001')
              - Values: DataFrame with the earliest file(s) for each
                component

            Empty when no files were found.

        Examples
        --------
        >>> from mth5.io.phoenix import PhoenixCollection
        >>> collection = PhoenixCollection(r"/path/to/station")
        >>> runs = collection.get_runs(sample_rates=[150, 24000])
        >>> print(runs.keys())
        odict_keys(['MT001'])

        >>> for station_id, station_runs in runs.items():
        ...     for run_id, run_df in station_runs.items():
        ...         print(f"{station_id}/{run_id}: {len(run_df)} channels")
        MT001/sr150_0001: 5 channels
        MT001/sr24k_0001: 5 channels

        Notes
        -----
        The method:

        1. Calls to_dataframe() to inventory all files; its ``run`` column
           groups files into runs.
        2. Selects, per run and component, the rows with the earliest start.
        3. Returns the organized structure for easy iteration.

        See Also
        --------
        to_dataframe : Create complete file inventory
        assign_run_names : Group files into runs
        mth5.io.phoenix.read_phoenix : Read Phoenix files
        """
        df = self.to_dataframe(
            sample_rates=sample_rates,
            run_name_zeros=run_name_zeros,
            calibration_path=calibration_path,
        )

        run_dict = OrderedDict()
        if df.empty:
            return run_dict

        for station in sorted(df.station.unique()):
            station_df = df[df.station == station]
            run_dict[station] = OrderedDict()

            for run_id in sorted(
                station_df.run.unique(),
                key=lambda x: x[-run_name_zeros:],
            ):
                run_df = station_df[station_df.run == run_id]
                # Take the earliest file of each component separately, so
                # every channel starts at its own first file in the run.
                first_rows = [
                    comp_df[comp_df.start == comp_df.start.min()]
                    for _, comp_df in run_df.groupby("component", sort=False)
                ]
                run_dict[station][run_id] = pd.concat(first_rows)

        return run_dict