Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mt_metadata \ mt_metadata \ transfer_functions \ io \ zfiles \ zmm.py: 73%
536 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:11 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:11 -0800
1# -*- coding: utf-8 -*-
2"""
3Created on Thu Sep 28 12:34:23 2017
4@author: jrpeacock
6Translated from code by B. Murphy.
7"""
9# ==============================================================================
10# Imports
11# ==============================================================================
12from pathlib import Path
14import numpy as np
15import xarray as xr
16from loguru import logger
18from mt_metadata import DEFAULT_CHANNEL_NOMENCLATURE
19from mt_metadata.common.list_dict import ListDict
20from mt_metadata.timeseries import Electric, Magnetic, Run, Survey
21from mt_metadata.transfer_functions.io.tools import get_nm_elev
22from mt_metadata.transfer_functions.tf import Station
24from .metadata import Channel
27# ==============================================================================
28PERIOD_FORMAT = ".10g"
31class ZMMError(Exception):
32 pass
35class ZMMHeader(object):
36 """
37 Container for Header of an Egbert file
38 """
40 def __init__(self, fn=None, **kwargs):
41 self.processing_type = None
42 self.num_channels = None
43 self.num_freq = None
44 self._header_count = 0
45 self._component_dict = None
46 self.ex = None
47 self.ey = None
48 self.hx = None
49 self.hy = None
50 self.hz = None
51 self._zfn = None
52 self.fn = fn
53 self.station_metadata = Station()
54 self._channel_order = ["hx", "hy", "hz", "ex", "ey"]
55 self._header_lines = [
56 "TRANSFER FUNCTIONS IN MEASUREMENT COORDINATES",
57 "********* WITH FULL ERROR COVARIANCE ********",
58 ]
60 @property
61 def fn(self):
62 return self._zfn
64 @fn.setter
65 def fn(self, value):
66 if value is None:
67 return
68 value = Path(value)
69 if value.suffix.lower() in [".zmm", ".zrr", ".zss"]:
70 self._zfn = value
71 else:
72 msg = f"Input file must be a *.zmm or *.zrr file not {value.suffix}"
73 logger.error(msg)
74 raise ValueError(msg)
76 @property
77 def latitude(self):
78 return self.station_metadata.location.latitude
80 @latitude.setter
81 def latitude(self, lat):
82 self.station_metadata.location.latitude = lat
84 @property
85 def longitude(self):
86 return self.station_metadata.location.longitude
88 @longitude.setter
89 def longitude(self, lon):
90 self.station_metadata.location.longitude = lon
92 @property
93 def elevation(self):
94 return self.station_metadata.location.elevation
96 @elevation.setter
97 def elevation(self, value):
98 self.station_metadata.location.elevation = value
100 @property
101 def declination(self):
102 return self.station_metadata.location.declination.value
104 @declination.setter
105 def declination(self, value):
106 self.station_metadata.location.declination.value = value
108 @property
109 def station(self):
110 return self.station_metadata.id
112 @station.setter
113 def station(self, value):
114 self.station_metadata.id = value
116 def read_header(self, fn: str | Path | None = None) -> None:
117 """
118 Read the header information from a ZMM file.
120 Parameters
121 ----------
122 fn : str | Path | None, optional
123 The file name to read, by default None
124 """
126 if fn is not None:
127 self.fn = fn
128 with open(self.fn, "r") as fid:
129 line = fid.readline()
131 self._header_count = 0
132 header_list = []
133 while "period" not in line:
134 header_list.append(line)
135 self._header_count += 1
137 line = fid.readline()
138 self.station_metadata.comments.value = ""
139 self.station_metadata.transfer_function.processing_type = header_list[2].strip()
140 station = header_list[3].lower().strip()
141 if station.count(":") > 0:
142 station = station.split(":")[1]
143 station = station.strip()
144 self.station = station
145 self.station_metadata.runs = ListDict()
146 self.station_metadata.add_run(Run(id=f"{self.station}a"))
147 self.station_metadata.transfer_function.id = self.station
149 for ii, line in enumerate(header_list):
150 if line.find("**") >= 0:
151 if self.station_metadata.comments.value:
152 self.station_metadata.comments.value += (
153 "\n" + line.replace("*", "").strip()
154 )
155 else:
156 self.station_metadata.comments.value = line.replace("*", "").strip()
157 elif ii == 2:
158 self.processing_type = line.lower().strip()
159 elif "station" in line:
160 self.station_metadata.id = line.split(":")[1].strip()
161 elif "coordinate" in line:
162 line_list = line.strip().split()
163 self.latitude = line_list[1]
164 lon = float(line_list[2])
165 if lon > 180:
166 lon -= 360
167 self.longitude = lon
169 self.station_metadata.location.declination.value = float(line_list[-1])
170 elif "number" in line:
171 line_list = line.strip().split()
172 self.num_channels = int(line_list[3])
173 self.num_freq = int(line_list[-1])
174 elif "orientations" in line:
175 pass
176 elif line.strip()[-2:].lower() in ["ex", "ey", "hx", "hy", "hz"]:
177 line_list = line.strip().split()
178 comp = line_list[-1].lower()
179 channel_dict = {"channel": comp}
180 channel_dict["number"] = int(
181 line_list[0]
182 ) # Changed from "chn_num" to "number"
183 channel_dict["azimuth"] = float(
184 line_list[1]
185 ) # Changed from "azm" to "azimuth"
186 channel_dict["tilt"] = float(line_list[2])
187 channel_dict["dl"] = line_list[3]
188 if channel_dict["number"] == 0: # Changed from "chn_num" to "number"
189 channel_dict["number"] = self.num_channels
190 setattr(self, comp, Channel(**channel_dict))
192 if comp in ["ex", "ey"]:
193 ch = Electric()
194 elif comp in ["hx", "hy", "hz"]:
195 ch = Magnetic()
196 ch.component = comp
197 ch.measurement_azimuth = channel_dict[
198 "azimuth"
199 ] # Changed from "azm" to "azimuth"
200 ch.measurement_tilt = channel_dict["tilt"]
201 ch.translated_azimuth = channel_dict[
202 "azimuth"
203 ] # Changed from "azm" to "azimuth"
204 ch.translated_tilt = channel_dict["tilt"]
205 ch.channel_number = channel_dict[
206 "number"
207 ] # Changed from "chn_num" to "number"
209 self.station_metadata.runs[0].add_channel(ch)
211 def write_header(self) -> list[str]:
212 """
213 write a zmm header
215 TRANSFER FUNCTIONS IN MEASUREMENT COORDINATES
216 ********** WITH FULL ERROR COVARINCE*********
218 300
219 coordinate 34.727 -115.735 declination 13.10
220 number of channels 5 number of frequencies 38
221 orientations and tilts of each channel
222 1 0.00 0.00 300 Hx
223 2 90.00 0.00 300 Hy
224 3 0.00 0.00 300 Hz
225 4 0.00 0.00 300 Ex
226 5 90.00 0.00 300 Ey
228 returns
229 -------
230 list[str]
231 The formatted header lines.
233 """
234 lines = [
235 self._header_lines[0],
236 self._header_lines[1],
237 ]
238 lines += [f"{self.station_metadata.transfer_function.processing_type}"]
239 lines += [f"station: {self.station}"]
240 lines += [
241 f"coordinate {self.latitude:>9.3f} {self.longitude:>9.3f} declination {self.declination:>8.2f}"
242 ]
243 lines += [
244 f"number of channels {self.num_channels:>3d} number of frequencies {self.num_freq:>3d}"
245 ]
246 lines += [" orientations and tilts of each channel"]
247 for ii, ch in enumerate(self._channel_order):
248 try:
249 channel = getattr(self, ch)
250 if channel.number == None:
251 channel.number = int(ii)
252 if channel.tilt is None:
253 channel.tilt = 0.0
254 if channel.azimuth is None:
255 channel.azimuth = 0.0
256 lines += [
257 (
258 f"{channel.number:>5d} "
259 f"{channel.azimuth:>8.2f} "
260 f"{channel.tilt:>8.2f} "
261 f"{self.station:>3} "
262 f"{channel.channel.capitalize():>3}"
263 )
264 ]
265 except (AttributeError, TypeError):
266 logger.warning(f"Could not find {ch}")
267 continue
268 return lines
270 @property
271 def channel_dict(self) -> dict[str, str]:
272 channels = {}
273 for cc in ["ex", "ey", "hx", "hy", "hz"]:
274 ch = getattr(self, cc)
275 if ch is not None:
276 # Ensure channel value is extracted as string from ChannelEnum
277 channel_value = ch.channel
278 if hasattr(channel_value, "value"):
279 channels[cc] = channel_value.value
280 else:
281 channels[cc] = str(channel_value)
282 return channels
284 @property
285 def channels_recorded(self) -> list[str]:
286 channels = {}
287 for cc in ["ex", "ey", "hx", "hy", "hz"]:
288 ch = getattr(self, cc)
289 if ch is not None:
290 # Ensure channel value is extracted as string from ChannelEnum
291 channel_value = ch.channel
292 if hasattr(channel_value, "value"):
293 channels[ch.index] = channel_value.value
294 else:
295 channels[ch.index] = str(channel_value)
296 ordered_channels = [channels[k] for k in sorted(channels.keys())]
297 return ordered_channels
299 @property
300 def input_channels(self) -> list[str]:
301 return self.channels_recorded[0:2]
303 @property
304 def output_channels(self) -> list[str]:
305 return self.channels_recorded[2:]
307 @property
308 def has_tipper(self) -> bool:
309 if "hz" in self.channels_recorded:
310 return True
311 return False
313 @property
314 def has_impedance(self) -> bool:
315 if "ex" in self.channels_recorded and "ey" in self.channels_recorded:
316 return True
317 return False
320class ZMM(ZMMHeader):
321 """
322 Container for Egberts zrr format.
324 """
326 def __init__(self, fn: str | Path | None = None, **kwargs):
327 super().__init__()
329 self.fn = fn
330 self._header_count = 0
331 self.transfer_functions = None
332 self.sigma_e = None
333 self.sigma_s = None
334 self.periods = None
335 self.dataset = None
336 self.decimation_dict = {}
337 self.channel_nomenclature = DEFAULT_CHANNEL_NOMENCLATURE
339 self._transfer_function = self._initialize_transfer_function()
341 for key in list(kwargs.keys()):
342 setattr(self, key, kwargs[key])
343 if self.fn is not None:
344 self.read()
346 def __str__(self) -> str:
347 lines = [f"Station: {self.station}", "-" * 50]
348 lines.append(f"\tSurvey: {self.survey_metadata.id}")
349 lines.append(f"\tProject: {self.survey_metadata.project}")
350 lines.append(f"\tAcquired by: {self.station_metadata.acquired_by.author}")
351 lines.append(f"\tAcquired date: {self.station_metadata.time_period.start}")
352 lines.append(f"\tLatitude: {self.latitude:.3f}")
353 lines.append(f"\tLongitude: {self.longitude:.3f}")
354 lines.append(f"\tElevation: {self.elevation:.3f}")
355 if "ex" in self.output_channels:
356 lines.append("\tImpedance: True")
357 else:
358 lines.append("\tImpedance: False")
359 if "hz" in self.output_channels:
360 lines.append("\tTipper: True")
361 else:
362 lines.append("\tTipper: False")
363 if self.periods is not None:
364 lines.append(f"\tNumber of periods: {self.periods.size}")
365 lines.append(
366 f"\t\tPeriod Range: {self.periods.min():.5E} -- {self.periods.max():.5E} s"
367 )
368 lines.append(
369 f"\t\tFrequency Range {1./self.periods.max():.5E} -- {1./self.periods.min():.5E} s"
370 )
371 return "\n".join(lines)
373 def __repr__(self):
374 lines = []
375 lines.append(f"station='{self.station}'")
376 lines.append(f"latitude={self.latitude:.2f}")
377 lines.append(f"longitude={self.longitude:.2f}")
378 lines.append(f"elevation={self.elevation:.2f}")
380 return f"MT( {(', ').join(lines)} )"
382 def __eq__(self, other: object) -> bool:
383 """
384 compare equals
386 :param other: DESCRIPTION
387 :type other: TYPE
388 :return: DESCRIPTION
389 :rtype: TYPE
391 """
393 if not isinstance(other, ZMM):
394 raise TypeError(f"Cannot compare type {type(other)} with ZMM.")
396 is_equal = True
397 if self.station_metadata != other.station_metadata:
398 is_equal = False
399 if not self.dataset.equals(other.dataset):
400 is_equal = False
401 logger.info("Datasets are not equal")
402 print(self.dataset.fillna(0) != other.dataset.fillna(0).all())
403 return is_equal
405 @property
406 def channel_nomenclature(self) -> dict[str, str]:
407 return self._channel_nomenclature
409 @channel_nomenclature.setter
410 def channel_nomenclature(self, ch_dict: dict[str, str]) -> None:
411 """
412 channel dictionary
413 """
415 if not isinstance(ch_dict, dict):
416 raise TypeError(
417 "Channel_nomenclature must be a dictionary with keys "
418 "['ex', 'ey', 'hx', 'hy', 'hz']."
419 )
421 self._channel_nomenclature = ch_dict
422 # unpack channel nomenclature dict
423 for comp in DEFAULT_CHANNEL_NOMENCLATURE.keys():
424 try:
425 setattr(self, f"_{comp}", ch_dict[comp])
426 except KeyError:
427 setattr(self, f"_{comp}", comp)
428 ch_dict[comp] = comp
430 self._ex_ey = [self._ex, self._ey]
431 self._hx_hy = [self._hx, self._hy]
432 self._ex_ey_hz = [self._ex, self._ey, self._hz]
434 @property
435 def _ch_input_dict(self) -> dict[str, list[str]]:
436 return {
437 "isp": self._hx_hy,
438 "res": self._ex_ey_hz,
439 "tf": self._hx_hy,
440 "tf_error": self._hx_hy,
441 "all": [self._ex, self._ey, self._hz, self._hx, self._hy],
442 }
444 @property
445 def _ch_output_dict(self) -> dict[str, list[str]]:
446 return {
447 "isp": self._hx_hy,
448 "res": self._ex_ey_hz,
449 "tf": self._ex_ey_hz,
450 "tf_error": self._ex_ey_hz,
451 "all": [self._ex, self._ey, self._hz, self._hx, self._hy],
452 }
454 def _initialize_transfer_function(self, periods: list[float] = [1]) -> None:
455 """
456 create an empty x array for the data. For now this accommodates
457 a single processed station.
460 Parameters
461 ----------
462 periods : list[float], optional
463 List of periods to create the transfer function for, defaults to [1].
465 Returns
466 -------
467 None
469 """
470 # create an empty array for the transfer function
471 tf = xr.DataArray(
472 data=0.0 + 0j,
473 dims=["period", "output", "input"],
474 coords={
475 "period": periods,
476 "output": self._ch_output_dict["all"],
477 "input": self._ch_input_dict["all"],
478 },
479 name="transfer_function",
480 )
482 tf_err = xr.DataArray(
483 data=0.0,
484 dims=["period", "output", "input"],
485 coords={
486 "period": periods,
487 "output": self._ch_output_dict["all"],
488 "input": self._ch_input_dict["all"],
489 },
490 name="error",
491 )
493 inv_signal_power = xr.DataArray(
494 data=0.0 + 0j,
495 dims=["period", "output", "input"],
496 coords={
497 "period": periods,
498 "output": self._ch_output_dict["all"],
499 "input": self._ch_input_dict["all"],
500 },
501 name="inverse_signal_power",
502 )
504 residual_covariance = xr.DataArray(
505 data=0.0 + 0j,
506 dims=["period", "output", "input"],
507 coords={
508 "period": periods,
509 "output": self._ch_output_dict["all"],
510 "input": self._ch_input_dict["all"],
511 },
512 name="residual_covariance",
513 )
515 # will need to add in covariance in some fashion
516 return xr.Dataset(
517 {
518 tf.name: tf,
519 tf_err.name: tf_err,
520 inv_signal_power.name: inv_signal_power,
521 residual_covariance.name: residual_covariance,
522 },
523 coords={
524 "period": periods,
525 "output": self._ch_output_dict["all"],
526 "input": self._ch_input_dict["all"],
527 },
528 )
530 @property
531 def frequencies(self) -> np.typing.NDArray[np.float64] | None:
532 if self.periods is None:
533 return None
534 return 1.0 / self.periods
536 def initialize_arrays(self) -> None:
537 """
538 make initial arrays based on number of frequencies and channels
539 """
540 if self.num_freq is None:
541 return
542 self.periods = np.zeros(self.num_freq)
543 self.transfer_functions = np.zeros(
544 (self.num_freq, self.num_channels - 2, 2), dtype=np.complex64
545 )
547 # residual covariance -- square matrix with dimension as number of
548 # predicted channels
549 self.sigma_e = np.zeros(
550 (self.num_freq, self.num_channels - 2, self.num_channels - 2),
551 dtype=np.complex64,
552 )
554 # inverse coherent signal power -- square matrix, with dimension as the
555 # number of predictor channels
556 # since EMTF and this code assume N predictors is 2,
557 # this dimension is hard-coded
558 self.sigma_s = np.zeros((self.num_freq, 2, 2), dtype=np.complex64)
560 def read(self, fn: str | Path | None = None, get_elevation: bool = False) -> None:
561 """
562 Read in Egbert zrr/zmm file
564 Parameters
565 ----------
566 fn : str | Path | None, optional
567 The file name to read, by default None
569 get_elevation : bool, optional
570 If True, fetch elevation from the National Map, by default False
572 Raises
573 ------
574 ZMMError
575 If the file cannot be read or is not in the expected format.
576 """
577 if fn is not None:
578 self.fn = fn
579 self.read_header()
580 self.channel_nomenclature = self.channel_dict
581 self.initialize_arrays()
583 self._transfer_function = self._initialize_transfer_function()
584 self.dataset = self._initialize_transfer_function()
586 ### read each data block and fill the appropriate array
587 for ii, period_block in enumerate(self._get_period_blocks()):
588 data_block = self._read_period_block(period_block)
589 self.periods[ii] = data_block["period"]
591 self._fill_tf_array_from_block(data_block["tf"], ii)
592 self._fill_sig_array_from_block(data_block["sig"], ii)
593 self._fill_res_array_from_block(data_block["res"], ii)
594 self._fill_dataset()
596 self.station_metadata.id = self.station
597 self.station_metadata.data_type = "MT"
598 self.station_metadata.channels_recorded = self.channels_recorded
599 # provenance
600 self.station_metadata.provenance.software.name = "EMTF"
601 self.station_metadata.provenance.software.version = "1"
602 self.station_metadata.transfer_function.runs_processed = (
603 self.station_metadata.run_list
604 )
605 self.station_metadata.transfer_function.software.name = "EMTF"
606 self.station_metadata.transfer_function.software.version = "1"
607 self.station_metadata.runs[0].sample_rate = np.median(
608 np.array([d["sample_rate"] for k, d in self.decimation_dict.items()])
609 )
611 # add information to runs
612 for rr in self.station_metadata.runs:
613 if self.transfer_functions.shape[1] >= 2:
614 rr.ex = self.ex_metadata
615 rr.ey = self.ey_metadata
616 rr.hx = self.hx_metadata
617 rr.hy = self.hy_metadata
618 if self.hz is not None:
619 rr.hz = self.hz_metadata
621 if self.elevation in [0, None] and get_elevation:
622 if self.latitude != 0 and self.longitude != 0:
623 self.elevation = get_nm_elev(
624 self.latitude,
625 self.longitude,
626 )
628 def write(
629 self, fn: str | Path | None = None, decimation_levels: dict | None = None
630 ) -> None:
631 """
632 write a zmm file
634 decimation_levels should be a dictionary with keys
636 * decimation_level
638 values will be a dictionary with keys
640 * frequency_band, value = (min, max)
641 * n_points, value = int
642 * sampling_freq, value = float
644 Parameters
645 ----------
646 fn : str | Path | None, optional
647 The file name to write, by default None
649 decimation_levels : dict, optional
650 A dictionary containing decimation levels and their properties, by default None.
652 Raises
653 ------
654 ZMMError
655 If the file cannot be written or is not in the expected format.
656 """
657 if fn is not None:
658 self.fn = fn
659 lines = self.write_header()
660 lines += [
661 "",
662 ] # add 1 space separating header from data
663 for p in self.dataset.period.data:
664 a = self.dataset.sel(period=p)
665 try:
666 dec_dict = self.decimation_dict[f"{p:{PERIOD_FORMAT}}"]
667 except KeyError:
668 dec_dict = {
669 "level": 0,
670 "bands": (0, 0),
671 "npts": 0,
672 "sample_rate": self.station_metadata.runs[0].sample_rate,
673 }
674 lines += [
675 (
676 f"period : {p:^18.5f} "
677 f"decimation level {dec_dict['level']:^8d}"
678 f"freq. band from {dec_dict['bands'][0]:>5d} to {dec_dict['bands'][1]:>5d}"
679 )
680 ]
681 lines += [
682 f"number of data point {dec_dict['npts']} sampling freq. {dec_dict['sample_rate']} Hz"
683 ]
684 # write tf
685 lines += [" Transfer Functions"]
686 for c_out in self.output_channels:
687 line = ""
688 for c_in in self.input_channels:
689 c_in_name = self.channel_nomenclature[c_in]
690 c_out_name = self.channel_nomenclature[c_out]
691 tf_element = a.transfer_function.loc[
692 dict(output=c_out_name, input=c_in_name)
693 ].data
694 line += f"{tf_element.real:>12.4E}{tf_element.imag:>12.4E}"
695 lines += [line]
696 # write signal power
697 lines += [" Inverse Coherent Signal Power Matrix"]
698 for ii, c_out in enumerate(self.input_channels):
699 line = ""
700 for c_in in self.input_channels[: ii + 1]:
701 c_in_name = self.channel_nomenclature[c_in]
702 c_out_name = self.channel_nomenclature[c_out]
703 tf_element = a.inverse_signal_power.loc[
704 dict(output=c_out_name, input=c_in_name)
705 ].data
706 line += f"{tf_element.real:>12.4E}{tf_element.imag:>12.4E}"
707 lines += [line]
708 # write residual covariance
709 lines += [" Residual Covariance"]
710 for ii, c_out in enumerate(self.output_channels):
711 line = ""
712 for c_in in self.output_channels[: ii + 1]:
713 c_in_name = self.channel_nomenclature[c_in]
714 c_out_name = self.channel_nomenclature[c_out]
715 tf_element = a.residual_covariance.loc[
716 dict(output=c_out_name, input=c_in_name)
717 ].data
718 line += f"{tf_element.real:>12.4E}{tf_element.imag:>12.4E}"
719 lines += [line]
720 with open(self.fn, "w") as fid:
721 fid.write("\n".join(lines))
722 return self.fn
724 def _get_period_blocks(self) -> list[list[str]]:
725 """
726 split file into period blocks
727 """
729 with open(self.fn, "r") as fid:
730 fn_str = fid.read()
731 period_strings = fn_str.lower().split("period")
732 period_blocks = []
733 for per in period_strings:
734 period_blocks.append(per.split("\n"))
735 return period_blocks[1:]
737 def _read_period_block(self, period_block: list[str]) -> dict:
738 """
739 read block:
740 period : 0.01587 decimation level 1 freq. band from 46 to 80
741 number of data point 951173 sampling freq. 0.004 Hz
742 Transfer Functions
743 0.1474E+00 -0.2049E-01 0.1618E+02 0.1107E+02
744 -0.1639E+02 -0.1100E+02 0.5559E-01 0.1249E-01
745 Inverse Coherent Signal Power Matrix
746 0.2426E+03 -0.2980E-06
747 0.9004E+02 -0.2567E+01 0.1114E+03 0.1192E-06
748 Residual Covaraince
749 0.8051E-05 0.0000E+00
750 -0.2231E-05 -0.2863E-06 0.8866E-05 0.0000E+00
751 """
753 period = float(period_block[0].strip().split(":")[1].split()[0].strip())
754 level = int(period_block[0].strip().split("level")[1].split()[0].strip())
755 bands = (
756 int(period_block[0].strip().split("from")[1].split()[0].strip()),
757 int(period_block[0].strip().split("to")[1].split()[0].strip()),
758 )
760 npts = int(period_block[1].strip().split("point")[1].split()[0].strip())
761 sr = float(period_block[1].strip().split("freq.")[1].split()[0].strip())
762 self.decimation_dict[f"{period:{PERIOD_FORMAT}}"] = {
763 "level": level,
764 "bands": bands,
765 "npts": npts,
766 "sample_rate": sr,
767 }
768 data_dict = {"period": period, "tf": [], "sig": [], "res": []}
769 key = "tf"
770 for line in period_block[2:]:
771 if "transfer" in line.lower():
772 key = "tf"
773 continue
774 elif "signal" in line.lower():
775 key = "sig"
776 continue
777 elif "residual" in line.lower():
778 key = "res"
779 continue
780 line_list = [float(xx) for xx in line.strip().split()]
781 values = [
782 complex(line_list[ii], line_list[ii + 1])
783 for ii in range(0, len(line_list), 2)
784 ]
785 data_dict[key].append(values)
786 return data_dict
788 def _flatten_list(self, x_list: list[list]) -> list:
789 """
790 flatten = lambda l: [item for sublist in l for item in sublist]
792 Returns
793 -------
794 None.
796 """
798 flat_list = [item for sublist in x_list for item in sublist]
800 return flat_list
802 def _fill_tf_array_from_block(self, tf_block: list[complex], index: int) -> None:
803 """
804 fill tf arrays from data blocks
805 """
806 tf_block = self._flatten_list(tf_block)
807 for kk, jj in enumerate(range(0, len(tf_block), 2)):
808 self.transfer_functions[index, kk, 0] = tf_block[jj]
809 self.transfer_functions[index, kk, 1] = tf_block[jj + 1]
811 def _fill_sig_array_from_block(self, sig_block: list[complex], index: int) -> None:
812 """
813 fill signal array
814 """
815 sig_block = self._flatten_list(sig_block)
816 self.sigma_s[index, 0, 0] = sig_block[0]
817 self.sigma_s[index, 1, 0] = sig_block[1]
818 self.sigma_s[index, 0, 1] = sig_block[1].conjugate()
819 self.sigma_s[index, 1, 1] = sig_block[2]
821 def _fill_res_array_from_block(self, res_block: list[complex], index: int) -> None:
822 """
823 fill residual covariance array
824 """
825 for jj in range(self.num_channels - 2):
826 values = res_block[jj]
827 for kk in range(jj + 1):
828 if jj == kk:
829 self.sigma_e[index, jj, kk] = values[kk]
830 else:
831 self.sigma_e[index, jj, kk] = values[kk]
832 self.sigma_e[index, kk, jj] = values[kk].conjugate()
834 def _fill_dataset(self) -> None:
835 """
836 fill the dataset
838 :return: DESCRIPTION
839 :rtype: TYPE
841 """
843 self.dataset = self._initialize_transfer_function(periods=self.periods)
845 self.dataset.transfer_function.loc[
846 dict(input=self.input_channels, output=self.output_channels)
847 ] = self.transfer_functions
848 self.dataset.inverse_signal_power.loc[
849 dict(input=self.input_channels, output=self.input_channels)
850 ] = self.sigma_s
851 self.dataset.residual_covariance.loc[
852 dict(input=self.output_channels, output=self.output_channels)
853 ] = self.sigma_e
855 def calculate_impedance(
856 self, angle: float = 0.0
857 ) -> tuple[np.typing.NDArray[np.complex64], np.typing.NDArray[np.float64]]:
858 """
859 calculate the impedances from the transfer functions
861 Parameters
862 ----------
863 angle : float, optional
864 The angle to rotate the impedance tensor.
866 Returns
867 -------
868 None
869 """
871 # check to see if there are actually electric fields in the TFs
872 if not hasattr(self, "ex") or not hasattr(self, "ey"):
873 msg = (
874 "Cannot return apparent resistivity and phase "
875 "data because these TFs do not contain electric "
876 "fields as a predicted channel."
877 )
878 logger.error(msg)
879 raise ZMMError(msg)
880 # transform the TFs first...
881 # build transformation matrix for predictor channels
882 # (horizontal magnetic fields)
883 hx_index = self.hx.index
884 hy_index = self.hy.index
885 u = np.eye(2, 2)
886 u[hx_index, hx_index] = np.cos(np.deg2rad(self.hx.azimuth - angle))
887 u[hx_index, hy_index] = np.sin(np.deg2rad(self.hx.azimuth - angle))
888 u[hy_index, hx_index] = np.cos(np.deg2rad(self.hy.azimuth - angle))
889 u[hy_index, hy_index] = np.sin(np.deg2rad(self.hy.azimuth - angle))
890 u = np.linalg.inv(u)
892 # build transformation matrix for predicted channels (electric fields)
893 ex_index = self.ex.index
894 ey_index = self.ey.index
895 v = np.eye(self.transfer_functions.shape[1], self.transfer_functions.shape[1])
896 v[ex_index - 2, ex_index - 2] = np.cos(np.deg2rad(self.ex.azimuth - angle))
897 v[ey_index - 2, ex_index - 2] = np.sin(np.deg2rad(self.ex.azimuth - angle))
898 v[ex_index - 2, ey_index - 2] = np.cos(np.deg2rad(self.ey.azimuth - angle))
899 v[ey_index - 2, ey_index - 2] = np.sin(np.deg2rad(self.ey.azimuth - angle))
901 # matrix multiplication...
902 rotated_transfer_functions = np.matmul(
903 v, np.matmul(self.transfer_functions, u.T)
904 )
905 rotated_sigma_s = np.matmul(u, np.matmul(self.sigma_s, u.T))
906 rotated_sigma_e = np.matmul(v, np.matmul(self.sigma_e, v.T))
908 # now pull out the impedance tensor
909 z = np.zeros((self.num_freq, 2, 2), dtype=np.complex64)
910 z[:, 0, 0] = rotated_transfer_functions[:, ex_index - 2, hx_index] # Zxx
911 z[:, 0, 1] = rotated_transfer_functions[:, ex_index - 2, hy_index] # Zxy
912 z[:, 1, 0] = rotated_transfer_functions[:, ey_index - 2, hx_index] # Zyx
913 z[:, 1, 1] = rotated_transfer_functions[:, ey_index - 2, hy_index] # Zyy
915 # and the variance information
916 var = np.zeros((self.num_freq, 2, 2))
917 var[:, 0, 0] = np.real(
918 rotated_sigma_e[:, ex_index - 2, ex_index - 2]
919 * rotated_sigma_s[:, hx_index, hx_index]
920 )
921 var[:, 0, 1] = np.real(
922 rotated_sigma_e[:, ex_index - 2, ex_index - 2]
923 * rotated_sigma_s[:, hy_index, hy_index]
924 )
925 var[:, 1, 0] = np.real(
926 rotated_sigma_e[:, ey_index - 2, ey_index - 2]
927 * rotated_sigma_s[:, hx_index, hx_index]
928 )
929 var[:, 1, 1] = np.real(
930 rotated_sigma_e[:, ey_index - 2, ey_index - 2]
931 * rotated_sigma_s[:, hy_index, hy_index]
932 )
934 error = np.sqrt(var)
936 return z, error
938 def calculate_tippers(
939 self, angle: float = 0.0
940 ) -> tuple[np.typing.NDArray[np.complex64], np.typing.NDArray[np.float64]]:
941 """
942 calculate induction vectors
944 Parameters
945 ----------
946 angle : float, optional
947 The angle to rotate the tipper tensor.
949 Returns
950 -------
951 tipper : np.ndarray
952 The tipper tensor.
953 """
955 # check to see if there is a vertical magnetic field in the TFs
956 if self.hz is None:
957 raise ZMMError(
958 "Cannot return tipper data because the TFs do not "
959 "contain the vertical magnetic field as a "
960 "predicted channel."
961 )
962 # transform the TFs first...
963 # build transformation matrix for predictor channels
964 # (horizontal magnetic fields)
965 hx_index = self.hx.index
966 hy_index = self.hy.index
967 u = np.eye(2, 2)
968 u[hx_index, hx_index] = np.cos(np.deg2rad(self.hx.azimuth - angle))
969 u[hx_index, hy_index] = np.sin(np.deg2rad(self.hx.azimuth - angle))
970 u[hy_index, hx_index] = np.cos(np.deg2rad(self.hy.azimuth - angle))
971 u[hy_index, hy_index] = np.sin(np.deg2rad(self.hy.azimuth - angle))
972 u = np.linalg.inv(u)
974 # don't need to transform predicated channels (assuming no tilt in Hz)
975 hz_index = self.hz.index
976 v = np.eye(self.transfer_functions.shape[1], self.transfer_functions.shape[1])
978 # matrix multiplication...
979 rotated_transfer_functions = np.matmul(
980 v, np.matmul(self.transfer_functions, u.T)
981 )
982 rotated_sigma_s = np.matmul(u, np.matmul(self.sigma_s, u.T))
983 rotated_sigma_e = np.matmul(v, np.matmul(self.sigma_e, v.T))
985 # now pull out tipper information
986 tipper = np.zeros((self.num_freq, 2), dtype=np.complex64)
987 tipper[:, 0] = rotated_transfer_functions[:, hz_index - 2, hx_index] # Tx
988 tipper[:, 1] = rotated_transfer_functions[:, hz_index - 2, hy_index] # Ty
990 # and the variance/error information
991 var = np.zeros((self.num_freq, 2))
992 var[:, 0] = np.real(
993 rotated_sigma_e[:, hz_index - 2, hz_index - 2]
994 * rotated_sigma_s[:, hx_index, hx_index]
995 ) # Tx
996 var[:, 1] = np.real(
997 rotated_sigma_e[:, hz_index - 2, hz_index - 2]
998 * rotated_sigma_s[:, hy_index, hy_index]
999 ) # Ty
1000 error = np.sqrt(var)
1002 tipper = tipper.reshape((self.num_freq, 1, 2))
1003 error = error.reshape((self.num_freq, 1, 2))
1005 return tipper, error
1007 @property
1008 def survey_metadata(self):
1009 sm = Survey()
1010 sm.add_station(self.station_metadata)
1012 return sm
1014 def _get_electric_metadata(self, comp: str) -> Electric:
1015 """
1016 get electric information from the various metadata
1018 Parameters
1019 ----------
1020 comp : str
1021 The component name (e.g., "ex", "ey").
1023 Returns
1024 -------
1025 Electric
1026 The electric metadata for the specified component.
1027 """
1028 comp = comp.lower()
1029 electric = Electric()
1030 electric.positive.type = "electric"
1031 electric.negative.type = "electric"
1032 if hasattr(self, comp):
1033 meas = getattr(self, comp)
1034 electric.measurement_azimuth = meas.azimuth
1035 electric.measurement_tilt = meas.tilt
1036 electric.component = comp
1037 electric.channel_number = meas.number
1038 electric.channel_id = meas.number
1039 return electric
1041 @property
1042 def ex_metadata(self) -> Electric:
1043 return self._get_electric_metadata("ex")
1045 @property
1046 def ey_metadata(self) -> Electric:
1047 return self._get_electric_metadata("ey")
1049 def _get_magnetic_metadata(self, comp: str) -> Magnetic:
1050 """
1052 get magnetic metadata from the various sources
1054 Parameters
1055 ----------
1056 comp : str
1057 The component name (e.g., "hx", "hy", "hz").
1059 Returns
1060 -------
1061 Magnetic
1062 The magnetic metadata for the specified component.
1064 """
1066 comp = comp.lower()
1067 magnetic = Magnetic()
1068 if hasattr(self, comp):
1069 meas = getattr(self, comp)
1070 magnetic.measurement_azimuth = meas.azimuth
1071 magnetic.measurement_tilt = meas.tilt
1072 magnetic.component = comp
1073 magnetic.channel_number = meas.number
1074 magnetic.channel_id = meas.number
1075 return magnetic
1077 @property
1078 def hx_metadata(self) -> Magnetic:
1079 return self._get_magnetic_metadata("hx")
1081 @property
1082 def hy_metadata(self) -> Magnetic:
1083 return self._get_magnetic_metadata("hy")
1085 @property
1086 def hz_metadata(self) -> Magnetic:
1087 return self._get_magnetic_metadata("hz")