Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mt_metadata \ mt_metadata \ transfer_functions \ io \ zonge \ zonge.py: 79%
312 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:11 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:11 -0800
1# -*- coding: utf-8 -*-
2"""
3====================
4zonge
5====================
6 * Tools for interfacing with MTFT24
7 * Tools for interfacing with MTEdit
10Created on Tue Jul 11 10:53:23 2013
11@author: jpeacock-pr
12"""
14# ==============================================================================
15from pathlib import Path
17import numpy as np
18import pandas as pd
19from loguru import logger
21from mt_metadata.timeseries import Electric, Magnetic, Run, Survey
22from mt_metadata.transfer_functions.io.tools import get_nm_elev
23from mt_metadata.transfer_functions.tf import Station
25from .metadata import Header
28# ==============================================================================
29# deal with avg files output from mtedit
30# ==============================================================================
31class ZongeMTAvg:
32 """
33 deal with avg files output from mtedit
34 """
36 def __init__(self, fn=None, **kwargs):
37 self.header = Header()
39 self.info_keys = [
40 "skip",
41 "frequency",
42 "e_magnitude",
43 "b_magnitude",
44 "z_magnitude",
45 "z_phase",
46 "apparent_resistivity",
47 "apparent_resistivity_err",
48 "z_phase_err",
49 "coherency",
50 "fc_use",
51 "fc_try",
52 ]
53 self.info_fmt = []
55 self.z = None
56 self.z_err = None
57 self.t = None
58 self.t_err = None
59 self.components = []
61 self._comp_index_down = {
62 "zxx": (0, 0),
63 "zxy": (0, 1),
64 "zyx": (1, 0),
65 "zyy": (1, 1),
66 "tzx": (0, 0),
67 "tzy": (0, 1),
68 "zxxr": (0, 0),
69 "zxyr": (0, 1),
70 "zyxr": (1, 0),
71 "zyyr": (1, 1),
72 }
74 self._comp_index_up = {
75 "zxx": (1, 1),
76 "zxy": (1, 0),
77 "zyx": (0, 1),
78 "zyy": (0, 0),
79 "tzx": (0, 1),
80 "tzy": (0, 0),
81 "zxxr": (1, 1),
82 "zxyr": (1, 0),
83 "zyxr": (0, 1),
84 "zyyr": (0, 0),
85 }
87 self.freq_index_dict = None
88 self.z_positive = "down"
90 self.fn = fn
92 for key, value in kwargs.items():
93 setattr(self, key, value)
95 def _get_comp_index(self) -> dict[str, tuple[int, int]]:
96 """
97 get the correct component index dictionary based on z_positive
99 Down assumes x is north, y is east
101 Up assumes x is east, y is north
102 """
103 if self.z_positive == "down":
104 return self._comp_index_down
105 elif self.z_positive == "up":
106 return self._comp_index_up
107 else:
108 raise ValueError("z_postiive must be either [ 'up' | 'down' ]")
110 @property
111 def fn(self) -> Path | None:
112 return self._fn
114 @fn.setter
115 def fn(self, value: str | Path | None):
116 if value is not None:
117 self._fn = Path(value)
118 else:
119 self._fn = None
121 def read(self, fn: str | Path | None = None, get_elevation: bool = False) -> None:
122 """
123 Read data from a file into the object as a pandas DataFrame
125 Parameters
126 ----------
127 fn : str | Path | None, optional
128 The file name to read from, by default None
129 get_elevation : bool, optional
130 Whether to get elevation data, by default False
131 """
133 if fn is not None:
134 self.fn = Path(fn)
136 if self.fn is None or not self.fn.exists():
137 raise FileNotFoundError(f"File not found: {self.fn}")
139 with self.fn.open("r") as fid:
140 lines = fid.readlines()
142 # read header
143 data_lines = self.header.read_header(lines)
145 data_list = []
146 for line in data_lines:
147 if "$" in line:
148 key, comp = [ss.strip() for ss in line.split("=")]
149 elif "skp" in line.lower() or len(line) < 2:
150 continue
151 else:
152 line = line.replace("*", "0.50")
153 values = [comp.lower()] + [float(ss.strip()) for ss in line.split(",")]
154 entry = dict(
155 [
156 (key.lower(), value)
157 for key, value in zip(["comp"] + self.info_keys, values)
158 ]
159 )
160 data_list.append(entry)
162 self.df = pd.DataFrame(data_list)
164 self.frequency = self.df.frequency.unique()
165 self.frequency.sort()
166 self.n_freq = self.frequency.size
167 self.components = self.df.comp.unique()
169 self.freq_index_dict = dict([(ff, ii) for ii, ff in enumerate(self.frequency)])
171 self.z, self.z_err = self._fill_z()
172 self.t, self.t_err = self._fill_t()
174 if self.header.elevation == 0 and get_elevation:
175 if self.header.latitude != 0 and self.header.longitude != 0:
176 self.header.elevation = get_nm_elev(
177 self.header.latitude, self.header.longitude
178 )
180 def to_complex(
181 self, zmag: np.typing.NDArray, zphase: np.typing.NDArray
182 ) -> tuple[np.typing.NDArray, np.typing.NDArray]:
183 """
184 Convert magnitude and phase to complex representation.
186 Outputs of mtedit are magnitude and phase of z, convert to real and
187 imaginary parts, phase is in milliradians.
189 Parameters
190 ----------
191 zmag : np.typing.NDArray
192 The magnitude array.
193 zphase : np.typing.NDArray
194 The phase array.
196 Returns
197 -------
198 tuple[np.typing.NDArray, np.typing.NDArray]
199 The real and imaginary parts of the complex representation.
201 """
203 if isinstance(zmag, np.ndarray):
204 assert len(zmag) == len(zphase)
205 zreal = zmag * np.cos((zphase / 1000))
206 zimag = zmag * np.sin((zphase / 1000))
207 return zreal, zimag
209 def to_amp_phase(
210 self, zreal: np.typing.NDArray, zimag: np.typing.NDArray
211 ) -> tuple[np.typing.NDArray, np.typing.NDArray]:
212 """
213 Convert to amplitude and phase from real and imaginary
215 Parameters
216 ----------
217 zreal : np.typing.NDArray
218 The real part of the complex representation.
219 zimag : np.typing.NDArray
220 The imaginary part of the complex representation.
222 Returns
223 -------
224 tuple[np.typing.NDArray, np.typing.NDArray]
225 The amplitude and phase representation.
227 """
229 if isinstance(zreal, np.ndarray):
230 assert len(zreal) == len(zimag)
231 zphase = np.arctan2(zimag, zreal) * 1000
232 zmag = np.sqrt(zreal**2 + zimag**2)
234 return zmag, zphase
236 def _fill_z(self) -> tuple[np.typing.NDArray, np.typing.NDArray]:
237 """
238 create Z array with data, need to take into account when the different
239 components have different frequencies, sometimes one might get skipped.
241 Parameters
242 ----------
243 None
245 Returns
246 -------
247 tuple[np.typing.NDArray, np.typing.NDArray]
248 The Z and Z error arrays.
249 """
251 z = np.zeros((self.n_freq, 2, 2), dtype=complex)
252 z_err = np.ones((self.n_freq, 2, 2), dtype=float)
254 comp_index = self._get_comp_index()
256 for row in self.df[self.df.comp.str.startswith("z")].itertuples():
257 ii, jj = comp_index[row.comp]
258 f_index = self.freq_index_dict[row.frequency]
259 z_real, z_imag = self.to_complex(row.z_magnitude, row.z_phase)
260 z_real_error, z_imag_error = self.to_complex(
261 (
262 np.sqrt(
263 (
264 (row.apparent_resistivity_err / 100)
265 * row.apparent_resistivity
266 )
267 * 5
268 * row.frequency
269 )
270 ),
271 row.z_phase_err,
272 )
274 z[f_index, ii, jj] = z_real + 1j * z_imag
276 z_err[f_index, ii, jj] = np.sqrt(z_real_error**2 + z_imag_error**2)
278 return z, z_err
280 def _fill_t(
281 self,
282 ) -> tuple[np.typing.NDArray, np.typing.NDArray] | tuple[None, None]:
283 """
284 fill tipper values
286 Returns
287 -------
288 tuple[np.typing.NDArray, np.typing.NDArray]
289 The T and T error arrays.
290 """
292 if "tzx" not in self.df.comp.to_list():
293 logger.debug(
294 "No Tipper found in {self.fn.name}",
295 )
296 return None, None
298 t = np.zeros((self.n_freq, 1, 2), dtype=complex)
299 t_err = np.ones((self.n_freq, 1, 2), dtype=float)
301 comp_index = self._get_comp_index()
303 for row in self.df[self.df.comp.str.startswith("t")].itertuples():
304 t_real, t_imag = self.to_complex(row.z_magnitude, row.z_phase)
305 ii, jj = comp_index[row.comp]
306 f_index = self.freq_index_dict[row.frequency]
308 if self.z_positive == "up":
309 t[f_index, ii, jj] = -1 * (t_real + t_imag * 1j)
310 else:
311 t[f_index, ii, jj] = t_real + t_imag * 1j
312 # error estimation
313 t_real_error, t_imag_error = self.to_complex(
314 (
315 np.sqrt(
316 (
317 (row.apparent_resistivity_err / 100)
318 * row.apparent_resistivity
319 )
320 )
321 ),
322 row.z_phase_err,
323 )
324 t_err[f_index, ii, jj] = np.sqrt(t_real**2 + t_imag**2)
326 return t, t_err
328 @property
329 def run_metadata(self) -> Run:
330 rm = Run(id="001")
331 rm.data_logger.id = self.header.instrument_id
332 rm.data_logger.type = self.header.instrument_type
333 rm.data_logger.manufacturer = "Zonge International"
334 if self.header.firmware is not None:
335 rm.data_logger.firmware.version = self.header.firmware
336 if self.header.start_time is not None:
337 rm.time_period.start = self.header.start_time
339 if "zxy" in self.components or "zxxr" in self.components:
340 rm.add_channel(self.ex_metadata)
341 rm.add_channel(self.ey_metadata)
342 rm.add_channel(self.hx_metadata)
343 rm.add_channel(self.hy_metadata)
345 if "tzx" in self.components or "rxxr" in self.components:
346 rm.add_channel(self.hz_metadata)
348 return rm
350 @property
351 def ex_metadata(self) -> Electric:
352 ch = Electric(component="ex")
353 if self.header._has_channel("zxy"):
354 ch.dipole_length = self.header._comp_dict["zxy"]["rx"].length
355 ch.measurement_azimuth = self.header._comp_dict["zxy"]["ch"].azimuth[0]
356 ch.translated_azimuth = self.header._comp_dict["zxy"]["ch"].azimuth[0]
357 ch.measurement_tilt = self.header._comp_dict["zxy"]["ch"].incl[0]
358 ch.translated_tilt = self.header._comp_dict["zxy"]["ch"].incl[0]
359 ch.channel_id = self.header._comp_dict["zxy"]["ch"].number[0]
360 ch.time_period.start = self.header.start_time
362 else:
363 ch.dipole_length = self.header.rx.length
364 ch.measurement_azimuth = self.header.rx.h_p_r[0]
365 ch.translated_azimuth = self.header.rx.h_p_r[0]
366 ch.channel_id = 4
368 return ch
370 @property
371 def ey_metadata(self) -> Electric:
372 ch = Electric(component="ey")
373 if self.header._has_channel("zyx"):
374 ch.dipole_length = self.header._comp_dict["zyx"]["rx"].length
375 ch.measurement_azimuth = self.header._comp_dict["zyx"]["ch"].azimuth[0]
376 ch.translated_azimuth = self.header._comp_dict["zyx"]["ch"].azimuth[0]
377 ch.measurement_tilt = self.header._comp_dict["zyx"]["ch"].incl[0]
378 ch.translated_tilt = self.header._comp_dict["zyx"]["ch"].incl[0]
379 ch.channel_id = self.header._comp_dict["zyx"]["ch"].number[0]
380 ch.time_period.start = self.header.start_time
382 else:
383 ch.dipole_length = self.header.rx.length
384 ch.measurement_azimuth = self.header.rx.h_p_r[0] + 90
385 ch.translated_azimuth = self.header.rx.h_p_r[0] + 90
386 ch.channel_id = 5
388 return ch
390 @property
391 def hx_metadata(self) -> Magnetic:
392 ch = Magnetic(component="hx")
393 if self.header._has_channel("zyx"):
394 ch.measurement_azimuth = self.header._comp_dict["zyx"]["ch"].azimuth[1]
395 ch.translated_azimuth = self.header._comp_dict["zyx"]["ch"].azimuth[1]
396 ch.measurement_tilt = self.header._comp_dict["zyx"]["ch"].incl[1]
397 ch.translated_tilt = self.header._comp_dict["zyx"]["ch"].incl[1]
398 ch.sensor.id = self.header._comp_dict["zyx"]["ch"].number[1]
399 ch.channel_id = 1
400 ch.time_period.start = self.header.start_time
402 else:
403 ch.measurement_azimuth = self.header.rx.h_p_r[0]
404 ch.translated_azimuth = self.header.rx.h_p_r[0]
405 ch.channel_id = 1
407 return ch
409 @property
410 def hy_metadata(self) -> Magnetic:
411 ch = Magnetic(component="hy")
412 if self.header._has_channel("zxy"):
413 ch.measurement_azimuth = self.header._comp_dict["zxy"]["ch"].azimuth[1]
414 ch.translated_azimuth = self.header._comp_dict["zxy"]["ch"].azimuth[1]
415 ch.measurement_tilt = self.header._comp_dict["zxy"]["ch"].incl[1]
416 ch.translated_tilt = self.header._comp_dict["zxy"]["ch"].incl[1]
417 ch.sensor.id = self.header._comp_dict["zxy"]["ch"].number[1]
418 ch.channel_id = 2
419 ch.time_period.start = self.header.start_time
421 else:
422 ch.measurement_azimuth = self.header.rx.h_p_r[0] + 90
423 ch.translated_azimuth = self.header.rx.h_p_r[0] + 90
424 ch.channel_id = 2
426 return ch
428 @property
429 def hz_metadata(self) -> Magnetic:
430 ch = Magnetic(component="hz")
431 if self.header._has_channel("tzx"):
432 # Parse comma-separated values safely
433 azimuth_values = self.header._comp_dict["tzx"]["ch"].azimuth
434 incl_values = self.header._comp_dict["tzx"]["ch"].incl
435 number_values = self.header._comp_dict["tzx"]["ch"].number
437 # Extract second value from comma-separated strings
438 if azimuth_values and "," in azimuth_values:
439 try:
440 azimuth_val = float(azimuth_values.split(",")[1].strip())
441 ch.measurement_azimuth = azimuth_val
442 ch.translated_azimuth = azimuth_val
443 except (IndexError, ValueError):
444 pass
446 if incl_values and "," in incl_values:
447 try:
448 incl_val = float(incl_values.split(",")[1].strip())
449 ch.measurement_tilt = incl_val
450 ch.translated_tilt = incl_val
451 except (IndexError, ValueError):
452 pass
454 if number_values and "," in number_values:
455 try:
456 ch.sensor.id = number_values.split(",")[1].strip()
457 except IndexError:
458 pass
460 ch.channel_id = "3"
461 if self.header.start_time:
462 ch.time_period.start = self.header.start_time
464 else:
465 ch.measurement_azimuth = self.header.rx.h_p_r[-1]
466 ch.translated_azimuth = self.header.rx.h_p_r[-1]
467 ch.channel_id = "3"
469 return ch
471 @property
472 def station_metadata(self) -> Station:
473 sm = Station()
475 sm.id = self.header.station
476 sm.location.latitude = self.header.latitude
477 sm.location.longitude = self.header.longitude
478 sm.location.elevation = self.header.elevation
479 sm.location.datum = self.header.datum.upper()
481 sm.transfer_function.id = self.header.station
482 sm.transfer_function.software.author = "Zonge International"
483 sm.transfer_function.software.name = "MTEdit"
484 sm.transfer_function.software.version = self.header.m_t_edit.version.split()[0]
485 sm.transfer_function.software.last_updated = (
486 self.header.m_t_edit.version.split()[-1]
487 )
489 for key, value in self.header.m_t_edit.to_dict(single=True).items():
490 if "version" in key:
491 continue
492 sm.transfer_function.processing_parameters.append(f"mtedit.{key}={value}")
494 sm.data_type = self.header.survey.type
495 sm.add_run(self.run_metadata)
496 sm.transfer_function.runs_processed = [self.run_metadata.id]
497 if self.header.start_time is not None:
498 sm.time_period.start = self.header.start_time
500 return sm
502 @station_metadata.setter
503 def station_metadata(self, sm):
504 self.header.station = sm.id
505 self.header.latitdude = sm.location.latitude
506 self.header.longitude = sm.location.longitude
508 if hasattr(sm.run[0].ex):
509 self.header.rx.length = sm.run[0].ex.dipole_length
511 @property
512 def survey_metadata(self) -> Survey:
513 sm = Survey()
514 sm.add_station(self.station_metadata)
515 sm.update_time_period()
516 return sm
518 def _create_dataframe_from_arrays(self) -> pd.DataFrame:
519 """
520 Create a DataFrame from Z and T arrays for writing.
521 This is used when data is available as arrays but not as a DataFrame.
522 """
523 if self.z is None or self.frequency is None:
524 raise ValueError("No impedance data or frequency array available")
526 data_list = []
527 comp_index = self._get_comp_index()
529 # Create data for each impedance component
530 for comp, (i, j) in comp_index.items():
531 if not comp.startswith(("z", "t")):
532 continue
534 for f_idx, freq in enumerate(self.frequency):
535 if comp.startswith("z") and self.z is not None:
536 z_val = self.z[f_idx, i, j]
537 z_err_val = (
538 self.z_err[f_idx, i, j] if self.z_err is not None else 0.0
539 )
541 # Convert complex impedance to magnitude and phase
542 z_mag, z_phase = self.to_amp_phase(z_val.real, z_val.imag)
544 # Calculate apparent resistivity
545 app_res = 0.2 * (1.0 / freq) * (z_mag**2)
546 app_res_err = 2.0 * z_err_val / z_mag * 100 if z_mag != 0 else 0.0
548 entry = {
549 "comp": comp,
550 "skip": 2, # Default skip value
551 "frequency": freq,
552 "e_magnitude": 1.0, # Default E magnitude
553 "b_magnitude": 1.0, # Default B magnitude
554 "z_magnitude": z_mag,
555 "z_phase": z_phase,
556 "apparent_resistivity": app_res,
557 "apparent_resistivity_err": app_res_err,
558 "z_phase_err": z_err_val * 1000, # Convert to milliradians
559 "coherency": 0.9, # Default coherency
560 "fc_use": 100, # Default values
561 "fc_try": 200,
562 }
563 data_list.append(entry)
565 elif comp.startswith("t") and self.t is not None:
566 t_val = self.t[f_idx, 0, j] # Tipper is 1x2
567 t_err_val = (
568 self.t_err[f_idx, 0, j] if self.t_err is not None else 0.0
569 )
571 # Convert complex tipper to magnitude and phase
572 t_mag, t_phase = self.to_amp_phase(t_val.real, t_val.imag)
574 entry = {
575 "comp": comp,
576 "skip": 2,
577 "frequency": freq,
578 "e_magnitude": 1.0,
579 "b_magnitude": 1.0,
580 "z_magnitude": t_mag,
581 "z_phase": t_phase,
582 "apparent_resistivity": 1.0, # Not applicable for tipper
583 "apparent_resistivity_err": t_err_val * 100,
584 "z_phase_err": t_err_val * 1000,
585 "coherency": 0.9,
586 "fc_use": 100,
587 "fc_try": 200,
588 }
589 data_list.append(entry)
591 self.df = pd.DataFrame(data_list)
592 self.components = self.df.comp.unique()
594 def write(self, fn: str | Path) -> None:
595 """
596 Write an .avg file
598 Parameters
599 ----------
600 fn : str or Path
601 Filename to write to
603 """
605 if fn is not None:
606 self.fn = Path(fn)
608 # Create DataFrame from arrays if it doesn't exist
609 if not hasattr(self, "df") or self.df is None or self.df.empty:
610 self._create_dataframe_from_arrays()
612 # Write header lines
613 header_lines = self.header.write_header()
615 # Group data by component and write each component section
616 for comp in self.components:
617 # Get component data
618 comp_data = self.df[self.df.comp == comp].copy()
619 if comp_data.empty:
620 continue
622 # Add component header line
623 header_lines.append(f"$Rx.Cmp = {comp.capitalize()}")
625 # Add component metadata lines if they exist in header
626 if hasattr(self.header, "_comp_dict") and comp in self.header._comp_dict:
627 comp_metadata = self.header._comp_dict[comp]
628 # Add component specific metadata lines based on the structure
629 # These would include lines like $Rx.Length, $Rx.Center, etc.
630 # The exact structure depends on the header metadata
631 for key, value in comp_metadata.items():
632 if key.startswith("rx_"):
633 header_key = key.replace("rx_", "$Rx.").replace("_", ".")
634 header_lines.append(f"{header_key}={value}")
635 elif key.startswith("ch_"):
636 header_key = key.replace("ch_", "$Ch.").replace("_", ".")
637 header_lines.append(f"{header_key}={value}")
639 # Add data column header
640 header_lines.append(
641 "Skp,Freq, E.mag, B.mag, Z.mag, Z.phz, "
642 "ARes.mag, ARes.%err,Z.perr, Coher, FC.NUse,FC.NTry"
643 )
645 # Sort data by frequency for this component
646 comp_data = comp_data.sort_values("frequency")
648 # Write data lines for this component
649 for _, row in comp_data.iterrows():
650 # Format each value according to the expected format to match the original file
651 line = (
652 f"{int(row.skip)}, "
653 f"{row.frequency:11.4E}, "
654 f"{row.e_magnitude:11.4E}, "
655 f"{row.b_magnitude:11.4E}, "
656 f"{row.z_magnitude:11.4E}, "
657 f"{row.z_phase:6.1f}, "
658 f" {row.apparent_resistivity:11.4E}, "
659 f"{row.apparent_resistivity_err:4.1f}, "
660 f" {row.z_phase_err:6.1f}, "
661 f" {row.coherency:5.3f}, "
662 f" {int(row.fc_use):2}, "
663 f" {int(row.fc_try):2}"
664 )
666 header_lines.append(line)
668 # Write to file
669 if self.fn is None:
670 raise ValueError("No filename specified for writing")
672 with self.fn.open("w") as fid:
673 fid.write("\n".join(header_lines))