Coverage for C:\Users\peaco\OneDrive\Documents\GitHub\mth5\mth5\io\zen\z3d_collection.py: 97%
79 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:01 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:01 -0800
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3"""
4Z3DCollection
5=================
7An object to hold Z3D file information to make processing easier.
10Created on Sat Apr 4 12:40:40 2020
12@author: peacock
13"""
14# =============================================================================
15# Imports
16# =============================================================================
17from __future__ import annotations
19from pathlib import Path
20from typing import Any
22import pandas as pd
23from mt_metadata.timeseries import Station
25from mth5.io.collection import Collection
26from mth5.io.zen import Z3D
27from mth5.io.zen.coil_response import CoilResponse
30# =============================================================================
31# Collection of Z3D Files
32# =============================================================================
class Z3DCollection(Collection):
    """
    Collection manager for Z3D file operations and metadata processing.

    This class provides functionality to handle collections of Z3D files,
    including metadata extraction, station information management, and
    dataframe creation for analysis workflows.

    Parameters
    ----------
    file_path : str or Path, optional
        Path to directory containing Z3D files, by default None
    **kwargs : dict
        Additional keyword arguments passed to parent Collection class

    Attributes
    ----------
    station_metadata_dict : dict[str, Station]
        Dictionary mapping station IDs to Station metadata objects.
        Empty until :meth:`to_dataframe` has been called.
    file_ext : str
        File extension for Z3D files ("z3d")

    Examples
    --------
    >>> zc = Z3DCollection("/path/to/z3d/files")
    >>> df = zc.to_dataframe(sample_rates=[256, 4096])
    >>> print(df.head())
    """

    def __init__(self, file_path: str | Path | None = None, **kwargs: Any) -> None:
        """
        Initialize Z3DCollection with optional file path.

        Parameters
        ----------
        file_path : str or Path, optional
            Path to directory containing Z3D files, by default None
        **kwargs : dict
            Additional keyword arguments passed to parent Collection class
        """
        super().__init__(file_path=file_path, **kwargs)
        self.station_metadata_dict: dict[str, Station] = {}
        self.file_ext: str = "z3d"

    def get_calibrations(self, antenna_calibration_file: str | Path) -> CoilResponse:
        """
        Load coil calibration data from antenna calibration file.

        Parameters
        ----------
        antenna_calibration_file : str or Path
            Path to the antenna.cal file containing coil calibration data

        Returns
        -------
        CoilResponse
            CoilResponse object built from ``antenna_calibration_file``

        Examples
        --------
        >>> zc = Z3DCollection("/path/to/z3d/files")
        >>> cal_obj = zc.get_calibrations("/path/to/antenna.cal")
        >>> print(cal_obj.has_coil_number("2324"))
        """
        return CoilResponse(antenna_calibration_file)

    def _sort_station_metadata(
        self, station_list: list[dict[str, Any]]
    ) -> dict[str, Station]:
        """
        Process and consolidate station metadata from multiple Z3D files.

        Takes a list of station metadata dictionaries and consolidates them
        by station ID, computing median values for coordinates when multiple
        measurements exist for the same station.

        Parameters
        ----------
        station_list : list of dict
            List of flat station metadata dictionaries. Each must provide
            the keys 'id', 'location.latitude', 'location.longitude' and
            'location.elevation'.

        Returns
        -------
        dict[str, Station]
            Dictionary mapping station IDs to Station metadata objects
            with consolidated location information. Empty when
            ``station_list`` is empty.

        Raises
        ------
        KeyError
            If one of the required location columns is missing from every
            dictionary in ``station_list``.

        Examples
        --------
        >>> station_data = [
        ...     {'id': '001', 'location.latitude': 40.5,
        ...      'location.longitude': -116.8, 'location.elevation': 1200.0},
        ...     {'id': '001', 'location.latitude': 40.6,
        ...      'location.longitude': -116.9, 'location.elevation': 1210.0},
        ... ]
        >>> zc = Z3DCollection()
        >>> stations = zc._sort_station_metadata(station_data)
        >>> print(stations['001'].location.latitude)  # Median value
        """
        # An empty list produces a frame without an "id" column; return early
        # rather than failing on the column lookup below.
        if not station_list:
            return {}

        sdf = pd.DataFrame(station_list)
        info: dict[str, Station] = {}
        # unique() keeps first-appearance order, so the returned dict follows
        # the order in which stations were first seen.
        for station in sdf["id"].unique():
            station_df = sdf[sdf["id"] == station]

            station_metadata = Station()
            station_metadata.id = station
            # The median is used as the single representative coordinate
            # when several files report a position for the same station.
            station_metadata.location.latitude = station_df[
                "location.latitude"
            ].median()
            station_metadata.location.longitude = station_df[
                "location.longitude"
            ].median()
            station_metadata.location.elevation = station_df[
                "location.elevation"
            ].median()

            info[station] = station_metadata

        return info

    def _build_entry(
        self,
        z3d_obj: Z3D,
        z3d_fn: str | Path,
        cal_obj: CoilResponse | None,
    ) -> dict[str, Any]:
        """
        Build one dataframe row from a Z3D object whose info has been read.

        Parameters
        ----------
        z3d_obj : Z3D
            Z3D object on which ``read_all_info`` has already been called
        z3d_fn : str or Path
            File name the object was created from, stored in the 'fn' field
        cal_obj : CoilResponse or None
            Loaded calibration object, or None when no calibration file
            was supplied

        Returns
        -------
        dict[str, Any]
            Entry dictionary from ``get_empty_entry_dict`` with the Z3D
            fields filled in
        """
        entry = self.get_empty_entry_dict()
        entry["survey"] = z3d_obj.metadata.job_name
        entry["station"] = z3d_obj.station
        # Run names are assigned later from the sorted start times.
        entry["run"] = None
        entry["start"] = z3d_obj.start.isoformat()
        # The end time is only ISO-formatted when the object supports it;
        # otherwise its plain string form is stored.
        entry["end"] = (
            z3d_obj.end.isoformat()
            if hasattr(z3d_obj.end, "isoformat")
            else str(z3d_obj.end)
        )
        entry["channel_id"] = z3d_obj.channel_number
        entry["component"] = z3d_obj.component
        entry["fn"] = z3d_fn
        entry["sample_rate"] = z3d_obj.sample_rate
        entry["file_size"] = z3d_obj.file_size
        entry["n_samples"] = z3d_obj.n_samples
        entry["sequence_number"] = 0
        entry["dipole"] = z3d_obj.dipole_length
        entry["coil_number"] = z3d_obj.coil_number
        entry["latitude"] = z3d_obj.latitude
        entry["longitude"] = z3d_obj.longitude
        entry["elevation"] = z3d_obj.elevation
        entry["instrument_id"] = f"ZEN_{int(z3d_obj.header.box_number):03}"

        # A calibration file is recorded only when one was loaded, the file
        # reports a coil number, and the calibration object knows that coil.
        has_calibration = (
            cal_obj is not None
            and z3d_obj.coil_number
            and cal_obj.has_coil_number(z3d_obj.coil_number)
        )
        entry["calibration_fn"] = cal_obj.calibration_file if has_calibration else None

        return entry

    def to_dataframe(
        self,
        sample_rates: list[int] | None = None,
        run_name_zeros: int = 4,
        calibration_path: str | Path | None = None,
    ) -> pd.DataFrame:
        """
        Extract Z3D file information and create analysis-ready dataframe.

        Reads the header information of every Z3D file in the collection
        and assembles one dataframe row per accepted file.

        Parameters
        ----------
        sample_rates : list of int, optional
            Allowed sampling rates. When None (the default) ``[256, 4096]``
            is used. A file whose integer sample rate is not in this list
            is skipped with a logged warning; a file whose sample rate is
            None is not filtered.
        run_name_zeros : int, default 4
            Zero-padding width forwarded to ``_sort_df``
        calibration_path : str or Path, optional
            Path to antenna calibration file. If None, no calibration
            object is loaded and 'calibration_fn' is None for every row,
            by default None

        Returns
        -------
        pd.DataFrame
            Dataframe with one row per accepted Z3D file. Fields filled
            here:

            - survey: Job name from the Z3D metadata
            - station: Station identifier
            - run: None here; filled in by later processing
            - start/end: ISO format timestamps (end falls back to ``str``)
            - channel_id: Channel number from the Z3D file
            - component: Measurement component
            - fn: Path to the Z3D file
            - sample_rate: Sample rate reported by the file
            - file_size: File size reported by the Z3D object
            - n_samples: Number of samples reported by the Z3D object
            - sequence_number: 0 here; filled in by later processing
            - dipole: Dipole length reported by the file
            - coil_number: Coil number reported by the file
            - latitude/longitude/elevation: Coordinates reported by the file
            - instrument_id: ``ZEN_`` plus the zero-padded box number
            - calibration_fn: Calibration file when the coil is known to it

            When no file is accepted, an empty dataframe with the
            collection's standard columns is returned.

        Raises
        ------
        Exception
            Nothing is caught here: errors raised while reading a Z3D file,
            loading the calibration file, or converting the box number to
            ``int`` propagate to the caller.

        Examples
        --------
        >>> zc = Z3DCollection("/path/to/z3d/files")
        >>> df = zc.to_dataframe(sample_rates=[256, 4096],
        ...                      calibration_path="/path/to/antenna.cal")
        >>> print(df[['station', 'component', 'sample_rate']].head())
        >>> df.to_csv("/path/output/z3d_inventory.csv")

        Notes
        -----
        This method also sets the `station_metadata_dict` attribute.
        Station metadata is gathered from every file that is read,
        including files later skipped by the sample-rate filter. If no file
        is accepted at all, `station_metadata_dict` is reset to an empty
        dict.
        """
        # Resolve the default here instead of using a mutable default
        # argument, which would be shared between calls.
        if sample_rates is None:
            sample_rates = [256, 4096]

        # Handle optional calibration path
        cal_obj: CoilResponse | None = None
        if calibration_path is not None:
            cal_obj = self.get_calibrations(calibration_path)

        station_metadata: list[dict[str, Any]] = []
        entries: list[dict[str, Any]] = []

        # The extension is requested in several casings, so the same file can
        # be returned more than once. dict.fromkeys de-duplicates while
        # keeping the order returned by get_files, which keeps processing and
        # log order reproducible (a plain set would not).
        z3d_files = dict.fromkeys(
            self.get_files(
                [self.file_ext, self.file_ext.lower(), self.file_ext.upper()]
            )
        )

        for z3d_fn in z3d_files:
            z3d_obj = Z3D(z3d_fn)
            z3d_obj.read_all_info()
            # Collected before the sample-rate filter on purpose of keeping
            # existing behavior: skipped files still contribute coordinates.
            station_metadata.append(z3d_obj.station_metadata.to_dict(single=True))

            # Validate sample rate: skip files with unsupported sample rates
            if (
                z3d_obj.sample_rate is not None
                and int(z3d_obj.sample_rate) not in sample_rates
            ):
                self.logger.warning(
                    f"Skipping {z3d_fn}: {z3d_obj.sample_rate} not in {sample_rates}"
                )
                continue

            entries.append(self._build_entry(z3d_obj, z3d_fn, cal_obj))

        # If no entries were collected, return an empty DataFrame with the
        # expected columns so downstream dtype/sorting code can operate
        # without raising attribute errors.
        if not entries:
            empty_df = pd.DataFrame(columns=self._columns)
            empty_df = self._sort_df(self._set_df_dtypes(empty_df), run_name_zeros)
            self.station_metadata_dict = {}
            return empty_df

        # Create and process dataframe
        df = self._sort_df(self._set_df_dtypes(pd.DataFrame(entries)), run_name_zeros)

        # Store consolidated station metadata
        self.station_metadata_dict = self._sort_station_metadata(station_metadata)

        return df

    def assign_run_names(self, df: pd.DataFrame, zeros: int = 3) -> pd.DataFrame:
        """
        Assign standardized run names to dataframe based on start times.

        Creates run names using the pattern 'sr{sample_rate}_{block_number}'
        where block_number is assigned sequentially based on unique start
        times within each station.

        Parameters
        ----------
        df : pd.DataFrame
            Input dataframe containing Z3D file information with at least
            'station', 'start', and 'sample_rate' columns
        zeros : int, default 3
            Number of zero-padding digits for block numbers in run names

        Returns
        -------
        pd.DataFrame
            The same dataframe object, with 'run' and 'sequence_number'
            set for every row

        Examples
        --------
        >>> zc = Z3DCollection()
        >>> df = pd.DataFrame({
        ...     'station': ['001', '001', '002'],
        ...     'start': ['2022-01-01T10:00:00', '2022-01-01T12:00:00',
        ...               '2022-01-01T10:00:00'],
        ...     'sample_rate': [256, 256, 4096]
        ... })
        >>> df_with_runs = zc.assign_run_names(df, zeros=3)
        >>> print(df_with_runs['run'].tolist())
        ['sr256_001', 'sr256_002', 'sr4096_001']

        Notes
        -----
        This method modifies the input dataframe in-place by updating the
        'run' and 'sequence_number' columns. Block numbers restart at 1 for
        every station and count unique start times across all sample
        rates. If rows sharing a station and start time carry different
        sample rates, the first one encountered is used in the run name.
        """
        for station in df["station"].unique():
            # Computed once per station and reused for every block below.
            station_mask = df["station"] == station
            starts = sorted(df.loc[station_mask, "start"].unique())

            for block_num, start in enumerate(starts, 1):
                block_mask = station_mask & (df["start"] == start)
                sample_rate = df.loc[block_mask, "sample_rate"].unique()[0]

                df.loc[block_mask, "run"] = (
                    f"sr{sample_rate:.0f}_{block_num:0{zeros}}"
                )
                df.loc[block_mask, "sequence_number"] = block_num

        return df