Coverage for C:\Users\peaco\OneDrive\Documents\GitHub\mth5\mth5\io\zen\z3d_collection.py: 97%

79 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-10 00:01 -0800

1#!/usr/bin/env python3 

2# -*- coding: utf-8 -*- 

3""" 

4Z3DCollection 

5================= 

6 

7An object to hold Z3D file information to make processing easier. 

8 

9 

10Created on Sat Apr 4 12:40:40 2020 

11 

12@author: peacock 

13""" 

14# ============================================================================= 

15# Imports 

16# ============================================================================= 

17from __future__ import annotations 

18 

19from pathlib import Path 

20from typing import Any 

21 

22import pandas as pd 

23from mt_metadata.timeseries import Station 

24 

25from mth5.io.collection import Collection 

26from mth5.io.zen import Z3D 

27from mth5.io.zen.coil_response import CoilResponse 

28 

29 

30# ============================================================================= 

31# Collection of Z3D Files 

32# ============================================================================= 

33 

34 

class Z3DCollection(Collection):
    """
    Collection manager for Z3D files.

    Reads every Z3D file found by the parent ``Collection`` machinery,
    gathers one inventory row per file, and consolidates the station
    location reported by the files into one ``Station`` object per
    station id.

    Parameters
    ----------
    file_path : str or Path, optional
        Forwarded to ``Collection.__init__``, by default None
    **kwargs : dict
        Additional keyword arguments forwarded to ``Collection.__init__``

    Attributes
    ----------
    station_metadata_dict : dict[str, Station]
        Station id -> ``Station`` metadata. Empty until
        :meth:`to_dataframe` has been called.
    file_ext : str
        File extension searched for ("z3d"); the lower- and upper-case
        variants are searched as well.

    Examples
    --------
    >>> zc = Z3DCollection("/path/to/z3d/files")
    >>> df = zc.to_dataframe(sample_rates=[256, 4096])
    >>> print(df.head())
    """

    def __init__(self, file_path: str | Path | None = None, **kwargs: Any) -> None:
        """
        Set up the parent collection and the Z3D specific attributes.

        Parameters
        ----------
        file_path : str or Path, optional
            Forwarded to ``Collection.__init__``, by default None
        **kwargs : dict
            Additional keyword arguments forwarded to ``Collection.__init__``
        """
        super().__init__(file_path=file_path, **kwargs)
        self.station_metadata_dict: dict[str, Station] = {}
        self.file_ext: str = "z3d"

    def get_calibrations(self, antenna_calibration_file: str | Path) -> CoilResponse:
        """
        Wrap an antenna calibration file in a ``CoilResponse`` object.

        Parameters
        ----------
        antenna_calibration_file : str or Path
            Path handed to the ``CoilResponse`` constructor

        Returns
        -------
        CoilResponse
            Object that can be queried with ``has_coil_number``

        Examples
        --------
        >>> zc = Z3DCollection("/path/to/z3d/files")
        >>> cal_obj = zc.get_calibrations("/path/to/antenna.cal")
        >>> print(cal_obj.has_coil_number("2324"))
        """
        return CoilResponse(antenna_calibration_file)

    def _sort_station_metadata(
        self, station_list: list[dict[str, Any]]
    ) -> dict[str, Station]:
        """
        Consolidate per-file station metadata into one ``Station`` per id.

        The dictionaries are grouped by their ``id`` value. For each id the
        median of ``location.latitude``, ``location.longitude`` and
        ``location.elevation`` over all dictionaries of that id is stored
        on a new ``Station`` object.

        Parameters
        ----------
        station_list : list of dict
            Flat station metadata dictionaries. Each must contain ``id``;
            the ``location.*`` keys are optional.

        Returns
        -------
        dict[str, Station]
            Station id -> ``Station`` carrying the median location.
            Empty when `station_list` is empty.

        Notes
        -----
        A location key that is absent from every dictionary is skipped, so
        the corresponding ``Station`` attribute keeps its default value.

        Examples
        --------
        >>> station_data = [
        ...     {'id': '001', 'location.latitude': 40.5, 'location.longitude': -116.8},
        ...     {'id': '001', 'location.latitude': 40.6, 'location.longitude': -116.9}
        ... ]
        >>> zc = Z3DCollection()
        >>> stations = zc._sort_station_metadata(station_data)
        >>> print(stations['001'].location.latitude)  # Median value
        """
        # An empty list yields a DataFrame without an ``id`` column, which
        # would fail on the attribute access below.
        if not station_list:
            return {}

        sdf = pd.DataFrame(station_list)
        info: dict[str, Station] = {}
        for station in sdf.id.unique():
            station_df = sdf[sdf.id == station]
            station_metadata = Station()
            station_metadata.id = station
            for attribute in ("latitude", "longitude", "elevation"):
                column = f"location.{attribute}"
                # Only overwrite the default when the files supplied a value.
                if column in station_df.columns:
                    setattr(
                        station_metadata.location,
                        attribute,
                        station_df[column].median(),
                    )

            info[station] = station_metadata

        return info

    def _entry_from_z3d(
        self,
        z3d_obj: Z3D,
        z3d_fn: Any,
        cal_obj: CoilResponse | None,
    ) -> dict[str, Any]:
        """
        Build one inventory row from a Z3D object whose info has been read.

        Parameters
        ----------
        z3d_obj : Z3D
            Z3D object on which ``read_all_info`` has already been called
        z3d_fn : path-like
            File the object was created from; stored under ``fn``
        cal_obj : CoilResponse or None
            Calibration lookup, or None when no calibration was requested

        Returns
        -------
        dict
            Copy of the empty entry template filled with the file's values
        """
        entry = self.get_empty_entry_dict()
        entry["survey"] = z3d_obj.metadata.job_name
        entry["station"] = z3d_obj.station
        # Left empty here; presumably filled in by ``_sort_df`` (not visible
        # in this file) -- TODO confirm.
        entry["run"] = None
        entry["start"] = z3d_obj.start.isoformat()
        # ``end`` is not guaranteed to be a time object, fall back to str().
        entry["end"] = (
            z3d_obj.end.isoformat()
            if hasattr(z3d_obj.end, "isoformat")
            else str(z3d_obj.end)
        )
        entry["channel_id"] = z3d_obj.channel_number
        entry["component"] = z3d_obj.component
        entry["fn"] = z3d_fn
        entry["sample_rate"] = z3d_obj.sample_rate
        entry["file_size"] = z3d_obj.file_size
        entry["n_samples"] = z3d_obj.n_samples
        entry["sequence_number"] = 0
        entry["dipole"] = z3d_obj.dipole_length
        entry["coil_number"] = z3d_obj.coil_number
        entry["latitude"] = z3d_obj.latitude
        entry["longitude"] = z3d_obj.longitude
        entry["elevation"] = z3d_obj.elevation
        entry["instrument_id"] = f"ZEN_{int(z3d_obj.header.box_number):03}"

        # The calibration file is recorded only when the file names a coil
        # and that coil is present in the calibration object.
        has_calibration = (
            cal_obj is not None
            and z3d_obj.coil_number
            and cal_obj.has_coil_number(z3d_obj.coil_number)
        )
        entry["calibration_fn"] = cal_obj.calibration_file if has_calibration else None

        return entry

    def to_dataframe(
        self,
        sample_rates: list[int] | None = None,
        run_name_zeros: int = 4,
        calibration_path: str | Path | None = None,
    ) -> pd.DataFrame:
        """
        Read the info of every Z3D file and return it as a dataframe.

        Parameters
        ----------
        sample_rates : list of int, optional
            Allowed sample rates. A file whose integer sample rate is not
            in this list is logged with a warning and skipped. Files that
            report no sample rate are kept. None (the default) means
            ``[256, 4096]``.
        run_name_zeros : int, default 4
            Forwarded to ``self._sort_df``
        calibration_path : str or Path, optional
            Path to the antenna calibration file. If None, ``calibration_fn``
            is None for every row, by default None

        Returns
        -------
        pd.DataFrame
            One row per accepted file, passed through ``_set_df_dtypes``
            and ``_sort_df``. The values filled in here are:

            - survey: ``metadata.job_name`` of the Z3D file
            - station: station identifier
            - run: None at this stage
            - start/end: ISO formatted start and end of the file
            - channel_id: channel number
            - component: measurement component
            - fn: path to the Z3D file
            - sample_rate: sample rate of the file
            - file_size: file size reported by the Z3D object
            - n_samples: number of samples reported by the Z3D object
            - sequence_number: 0 at this stage
            - dipole: dipole length
            - coil_number: coil number
            - latitude/longitude/elevation: location stored in the file
            - instrument_id: ``ZEN_`` + zero padded box number
            - calibration_fn: calibration file if the coil is in it

            When no file is accepted an empty dataframe with the columns
            ``self._columns`` is returned.

        Raises
        ------
        Exception
            Nothing is caught here: errors raised while reading a Z3D file
            or the calibration file propagate to the caller.

        Examples
        --------
        >>> zc = Z3DCollection("/path/to/z3d/files")
        >>> df = zc.to_dataframe(sample_rates=[256, 4096],
        ...                      calibration_path="/path/to/antenna.cal")
        >>> print(df[['station', 'component', 'sample_rate']].head())
        >>> df.to_csv("/path/output/z3d_inventory.csv")

        Notes
        -----
        This method also sets ``station_metadata_dict``. Station metadata is
        collected from every file that is read, including files that are
        later skipped because of their sample rate. If no file is accepted
        the attribute is reset to an empty dict.
        """
        # None sentinel instead of a shared mutable default list.
        if sample_rates is None:
            sample_rates = [256, 4096]

        # Handle optional calibration path
        cal_obj: CoilResponse | None = None
        if calibration_path is not None:
            cal_obj = self.get_calibrations(calibration_path)

        station_metadata: list[dict[str, Any]] = []
        entries: list[dict[str, Any]] = []

        # The same file can match more than one spelling of the extension.
        # dict.fromkeys drops the duplicates while keeping the order given
        # by get_files, so runs are reproducible (set order is arbitrary).
        extensions = [self.file_ext, self.file_ext.lower(), self.file_ext.upper()]
        for z3d_fn in dict.fromkeys(self.get_files(extensions)):
            z3d_obj = Z3D(z3d_fn)
            z3d_obj.read_all_info()
            station_metadata.append(z3d_obj.station_metadata.to_dict(single=True))

            # Validate sample rate: skip files with unsupported sample rates
            if (
                z3d_obj.sample_rate is not None
                and int(z3d_obj.sample_rate) not in sample_rates
            ):
                self.logger.warning(
                    f"Skipping {z3d_fn}: {z3d_obj.sample_rate} not in {sample_rates}"
                )
                continue

            entries.append(self._entry_from_z3d(z3d_obj, z3d_fn, cal_obj))

        # If no entries were collected, return an empty DataFrame with the
        # expected columns so the dtype/sorting code can still operate.
        if not entries:
            df = pd.DataFrame(columns=self._columns)
            df = self._sort_df(self._set_df_dtypes(df), run_name_zeros)
            self.station_metadata_dict = {}
            return df

        # Create and process dataframe
        df = self._sort_df(self._set_df_dtypes(pd.DataFrame(entries)), run_name_zeros)

        # Store consolidated station metadata
        self.station_metadata_dict = self._sort_station_metadata(station_metadata)

        return df

    def assign_run_names(self, df: pd.DataFrame, zeros: int = 3) -> pd.DataFrame:
        """
        Assign run names and sequence numbers from the start times.

        Within each station the unique start times are sorted and numbered
        from 1. All rows sharing a station and start time get the run name
        ``sr{sample_rate}_{block_number}`` and the block number as
        ``sequence_number``.

        Parameters
        ----------
        df : pd.DataFrame
            Dataframe with at least the columns 'station', 'start' and
            'sample_rate'
        zeros : int, default 3
            Width the block number is zero padded to in the run name

        Returns
        -------
        pd.DataFrame
            The same dataframe object, with 'run' and 'sequence_number'
            updated

        Examples
        --------
        >>> zc = Z3DCollection()
        >>> df = pd.DataFrame({
        ...     'station': ['001', '001', '002'],
        ...     'start': ['2022-01-01T10:00:00', '2022-01-01T12:00:00', '2022-01-01T10:00:00'],
        ...     'sample_rate': [256, 256, 4096]
        ... })
        >>> df_with_runs = zc.assign_run_names(df, zeros=3)
        >>> print(df_with_runs['run'].tolist())
        ['sr256_001', 'sr256_002', 'sr4096_001']

        Notes
        -----
        The input dataframe is modified in place. If rows of one block carry
        different sample rates, the sample rate of the first row is used
        for the run name.
        """
        for station in df.station.unique():
            in_station = df.station == station
            starts = sorted(df[in_station].start.unique())
            for block_num, start in enumerate(starts, 1):
                # Build the row selector once and reuse it for the lookup
                # and both assignments.
                in_block = in_station & (df.start == start)
                sample_rate = df[in_block].sample_rate.iloc[0]

                df.loc[in_block, "run"] = f"sr{sample_rate:.0f}_{block_num:0{zeros}}"
                df.loc[in_block, "sequence_number"] = block_num
        return df

362 return df