Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mt_metadata \ mt_metadata \ transfer_functions \ io \ edi \ edi.py: 72%
798 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:11 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:11 -0800
1# -*- coding: utf-8 -*-
2"""
3.. module:: EDI
4 :synopsis: Deal with EDI files. The Edi class can read and write an .edi
5 file, the 'standard format' of magnetotellurics. Each section
6 of the .edi file is given its own class, so the elements of each
7 section are attributes for easy access.
9.. moduleauthor:: Jared Peacock <jpeacock@usgs.gov>
11Updated 2021 to used mt_metadata type metadata and how spectra are read.
13"""
16# ==============================================================================
17# Imports
18# ==============================================================================
19from pathlib import Path
20from typing import Literal
22import numpy as np
23from loguru import logger
25from mt_metadata import __version__, NULL_VALUES
26from mt_metadata.timeseries import Electric, Magnetic, Run, Survey
27from mt_metadata.transfer_functions import tf
28from mt_metadata.transfer_functions.io.edi.metadata import (
29 DataSection,
30 DefineMeasurement,
31 Header,
32 Information,
33)
34from mt_metadata.transfer_functions.io.tools import (
35 _validate_edi_lines,
36 _validate_str_with_equals,
37 get_nm_elev,
38 index_locator,
39)
42# ==============================================================================
43# EDI Class
44# ==============================================================================
45class EDI:
46 """
47 This class is for .edi files, mainly reading and writing. Has been tested
48 on Winglink and Phoenix output .edi's, which are meant to follow the
49 archaic EDI format put forward by SEG. Can read impedance, Tipper and/or
50 spectra data.
52 The Edi class contains a class for each major section of the .edi file.
54 Frequency and components are ordered from highest to lowest frequency.
56 :param fn: full path to .edi file to be read in.
57 *default* is None. If an .edi file is input, it is
58 automatically read in and attributes of Edi are filled
59 :type fn: string or :class:`pathlib.Path`
61 :Change Latitude: ::
63 >>> from mt_metadata.transfer_functions.io.edi import EDI
64 >>> edi_obj = EDI(fn=r"/home/mt/mt01.edi")
65 >>> # change the latitude
66 >>> edi_obj.lat = 45.7869
67 >>> new_edi_fn = edi_obj.write()
68 """
70 def __init__(self, fn=None, **kwargs):
71 self.logger = logger
72 self._fn = None
73 self._edi_lines = []
75 self.Header = Header()
76 self.Info = Information()
77 self.Measurement = DefineMeasurement()
78 self.Data = DataSection()
80 self.z = None
81 self.z_err = None
82 self.t = None
83 self.t_err = None
84 self.frequency = None
85 self.rotation_angle = None
86 self.residual_covariance = None
87 self.signal_inverse_power = None
88 self.tf = None
89 self.tf_err = None
91 self._z_labels = [
92 ["zxxr", "zxxi", "zxx.var"],
93 ["zxyr", "zxyi", "zxy.var"],
94 ["zyxr", "zyxi", "zyx.var"],
95 ["zyyr", "zyyi", "zyy.var"],
96 ]
98 self._t_labels = [
99 ["txr.exp", "txi.exp", "txvar.exp"],
100 ["tyr.exp", "tyi.exp", "tyvar.exp"],
101 ]
103 self._accepted_keys = [
104 "freq",
105 "zxxr",
106 "zxxi",
107 "zxyr",
108 "zxyi",
109 "zyxr",
110 "zyxi",
111 "zyyr",
112 "zyyi",
113 "zxx.var",
114 "zxy.var",
115 "zyx.var",
116 "zyy.var",
117 "txr.exp",
118 "txi.exp",
119 "tyr.exp",
120 "tyi.exp",
121 "txvar.exp",
122 "tyvar.exp",
123 "rhoxx",
124 "rhoxy",
125 "rhoyx",
126 "rhoyy",
127 "rhoxx.err",
128 "rhoxy.err",
129 "rhoyx.err",
130 "rhoyy.err",
131 "phsxx",
132 "phsxy",
133 "phsyx",
134 "phsyy",
135 "phsxx.err",
136 "phsxy.err",
137 "phsyx.err",
138 "phsyy.err",
139 "zrot",
140 "rhorot",
141 "trot",
142 ]
144 self._channel_skip_list = [
145 "filter.name",
146 "filter.applied",
147 "time_period.start",
148 "time_period.end",
149 "location.elevation",
150 "location.latitude",
151 "location.longitude",
152 "location.x",
153 "location.y",
154 "location.z",
155 "positive.latitude",
156 "positive.longitude",
157 "positive.elevation",
158 "positive.x",
159 "positive.x2",
160 "positive.y",
161 "positive.y2",
162 "positive.z",
163 "positive.z2",
164 "negative.latitude",
165 "negative.longitude",
166 "negative.elevation",
167 "negative.x",
168 "negative.x2",
169 "negative.y",
170 "negative.y2",
171 "negative.z",
172 "negative.z2",
173 "sample_rate",
174 "data_quality.rating.value",
175 "data_quality.flag",
176 ]
178 self._index_dict = {
179 "zxx": {"ii": 0, "jj": 0, "obj": "z", "err_obj": "z_err"},
180 "zxy": {"ii": 0, "jj": 1, "obj": "z", "err_obj": "z_err"},
181 "zyx": {"ii": 1, "jj": 0, "obj": "z", "err_obj": "z_err"},
182 "zyy": {"ii": 1, "jj": 1, "obj": "z", "err_obj": "z_err"},
183 "tx": {"ii": 0, "jj": 0, "obj": "t", "err_obj": "t_err"},
184 "ty": {"ii": 0, "jj": 1, "obj": "t", "err_obj": "t_err"},
185 "rhoxx": {"ii": 0, "jj": 0, "obj": "z", "err_obj": "z_err"},
186 "rhoxy": {"ii": 0, "jj": 1, "obj": "z", "err_obj": "z_err"},
187 "rhoyx": {"ii": 1, "jj": 0, "obj": "z", "err_obj": "z_err"},
188 "rhoyy": {"ii": 1, "jj": 1, "obj": "z", "err_obj": "z_err"},
189 }
191 self._data_header_str = ">!****{0}****!\n"
193 self._num_format = " 15.6e"
194 self._block_len = 6
196 self.fn = fn
198 for key, value in kwargs.items():
199 setattr(self, key, value)
201 def __str__(self) -> str:
202 lines = [f"Station: {self.station}", "-" * 50]
203 lines.append(f"\tSurvey: {self.survey_metadata.id}")
204 lines.append(f"\tProject: {self.survey_metadata.project}")
205 lines.append(f"\tAcquired by: {self.station_metadata.acquired_by.author}")
206 lines.append(f"\tAcquired date: {self.station_metadata.time_period.start}")
207 lines.append(f"\tLatitude: {self.station_metadata.location.latitude:.3f}")
208 lines.append(f"\tLongitude: {self.station_metadata.location.longitude:.3f}")
209 lines.append(f"\tElevation: {self.station_metadata.location.elevation:.3f}")
210 if self.z is not None:
211 lines.append("\tImpedance: True")
212 else:
213 lines.append("\tImpedance: False")
214 if self.t is not None:
215 lines.append("\tTipper: True")
216 else:
217 lines.append("\tTipper: False")
218 if self.frequency is not None:
219 lines.append(f"\tNumber of periods: {self.frequency.size}")
220 lines.append(
221 f"\t\tPeriod Range: {1./self.frequency.max():.5E} -- {1./self.frequency.min():.5E} s"
222 )
223 lines.append(
224 f"\t\tFrequency Range {self.frequency.min():.5E} -- {self.frequency.max():.5E} s"
225 )
226 return "\n".join(lines)
228 def __repr__(self) -> str:
229 return self.__str__()
231 @property
232 def fn(self):
233 return self._fn
235 @fn.setter
236 def fn(self, fn: str | Path | None):
237 if fn is not None:
238 self._fn = Path(fn)
239 if self._fn.exists():
240 self.read()
242 @property
243 def period(self) -> np.typing.NDArray | None:
244 if self.frequency is not None:
245 return 1.0 / self.frequency
246 return None
248 def _assert_descending_frequency(self) -> None:
249 """
250 Assert that the transfer function is ordered from high frequency to low
251 frequency.
253 """
254 if self.frequency is not None:
255 if self.frequency[0] < self.frequency[1]:
256 self.logger.debug(
257 "Ordered arrays to be arranged from high to low frequency"
258 )
259 self.frequency = self.frequency[::-1]
260 if self.z is not None:
261 self.z = self.z[::-1]
262 if self.z_err is not None:
263 self.z_err = self.z_err[::-1]
264 if self.t is not None:
265 self.t = self.t[::-1]
266 if self.t_err is not None:
267 self.t_err = self.t_err[::-1]
269 def read(self, fn: str | Path | None = None, get_elevation: bool = False) -> None:
270 """
271 Read in an edi file and fill attributes of each section's classes.
272 Including:
274 * Header
275 * Info
276 * Measurement
277 * Data
278 * z, z_err
279 * t, t_err
281 .. note:: Automatically detects if data is in spectra format. All
282 data read in is converted to impedance and Tipper.
285 :param fn: full path to .edi file to be read in
286 *default* is None
287 :type fn: string
289 :Example: ::
291 >>> from mt_metadata.transfer_functions.io.edi import EDI
292 >>> edi_obj = EDI
293 >>> edi_obj.read(fn=r"/home/mt/mt01.edi")
295 """
297 if fn is not None:
298 self._fn = Path(fn)
299 if self.fn is None:
300 msg = "Must input EDI file to read"
301 self.logger.error(msg)
302 raise IOError(msg)
303 if not self.fn.exists():
304 msg = f"Cannot find EDI file: {self.fn}"
305 self.logger.error(msg)
306 raise IOError(msg)
307 with open(self.fn, "r") as fid:
308 self._edi_lines = _validate_edi_lines(fid.readlines())
309 self.Header.read_header(self._edi_lines)
310 self.Info.read_info(self._edi_lines)
311 self.Measurement.read_measurement(self._edi_lines)
312 self.Data.read_data(self._edi_lines)
313 self.Data.match_channels(self.Measurement.channel_ids)
315 self._read_data()
317 if self.Header.latitude in [None, 0.0]:
318 self.Header.latitude = self.Measurement.reflat
319 self.logger.debug(f"Got latitude from reflat for {self.Header.dataid}")
320 if self.Header.longitude in [None, 0.0]:
321 self.Header.longitude = self.Measurement.reflon
322 self.logger.debug(f"Got longitude from reflon for {self.Header.dataid}")
323 if self.Header.elevation in [None, 0.0]:
324 self.Header.elevation = self.Measurement.refelev
325 self.logger.debug(f"Got elevation from refelev for {self.Header.dataid}")
327 if self.elev in [0, None] and get_elevation:
328 if self.lat != 0 and self.lon != 0:
329 self.elev = get_nm_elev(self.lat, self.lon)
331 def _read_data(self) -> None:
332 """
333 Read either impedance or spectra data depending on what the type is
334 in the data section.
335 """
337 lines = self._edi_lines[self.Data._line_num :]
339 if self.Data._data_type_in == "spectra":
340 self.logger.debug("Converting Spectra to Impedance and Tipper")
341 self.logger.debug(
342 "Check to make sure input channel list is correct if the data looks incorrect"
343 )
344 if self.Data.nchan == 5:
345 c_list = ["hx", "hy", "hz", "ex", "ey"]
346 elif self.Data.nchan == 4:
347 c_list = ["hx", "hy", "ex", "ey"]
348 elif self.Data.nchan == 6:
349 c_list = ["hx", "hy", "ex", "ey", "rhx", "rhy"]
350 elif self.Data.nchan == 7:
351 c_list = ["hx", "hy", "hz", "ex", "ey", "rhx", "rhy"]
352 self._read_spectra(lines, comp_list=c_list)
353 elif self.Data._data_type_in == "z":
354 self._read_mt(lines)
356 def _read_mt(self, data_lines: list[str]) -> None:
357 """
358 Read in impedance and tipper data
360 :param data_lines: list of data lines from the edi file
361 :type data_lines: list
362 """
364 data_dict = {}
365 data_find = False
366 for line in data_lines:
367 line = line.strip()
368 if ">" in line and "!" not in line:
369 line_list = line[1:].strip().split()
370 if len(line_list) == 0:
371 continue
372 key = line_list[0].lower()
373 if key in self._accepted_keys:
374 data_find = True
375 data_dict[key] = []
376 else:
377 data_find = False
378 elif data_find and ">" not in line and "!" not in line:
379 d_lines = line.strip().split()
380 for ii, dd in enumerate(d_lines):
381 # check for empty values and set them to 0, check for any
382 # other characters sometimes there are ****** for a null
383 # component
384 try:
385 d_lines[ii] = float(dd)
386 if d_lines[ii] == self.Header.empty:
387 d_lines[ii] = 0.0
388 except ValueError:
389 d_lines[ii] = 0.0
390 data_dict[key] += d_lines
391 # put everything into arrays
392 for key, k_list in data_dict.items():
393 data_dict[key] = np.array(k_list)
394 # fill useful arrays
395 self.frequency = data_dict["freq"]
396 self.z = np.zeros((self.frequency.size, 2, 2), dtype=complex)
397 self.z_err = np.zeros((self.frequency.size, 2, 2), dtype=float)
398 # fill tipper data if there it exists
399 self.t = np.zeros((self.frequency.size, 1, 2), dtype=complex)
400 self.t_err = np.zeros((self.frequency.size, 1, 2), dtype=float)
401 self.data_dict = data_dict
403 # fill tensors
404 for key in sorted(self._index_dict.keys(), reverse=True):
405 index = self._index_dict[key]
406 ii = index["ii"]
407 jj = index["jj"]
408 obj = getattr(self, index["obj"])
409 error_obj = getattr(self, index["err_obj"])
410 try:
411 if key.startswith("z"):
412 obj[:, ii, jj] = data_dict[f"{key}r"] + data_dict[f"{key}i"] * 1j
413 try:
414 error_key = [
415 k for k in data_dict.keys() if key in k and "var" in k
416 ][0]
417 error_obj[:, ii, jj] = np.abs(data_dict[error_key]) ** 0.5
418 except IndexError:
419 self.logger.debug(f"Could not find error information for {key}")
420 elif key.startswith("t"):
421 obj[:, ii, jj] = (
422 data_dict[f"{key}r.exp"] + data_dict[f"{key}i.exp"] * 1j
423 )
424 try:
425 error_key = [
426 k for k in data_dict.keys() if key in k and "var" in k
427 ][0]
428 error_obj[:, ii, jj] = np.abs(data_dict[error_key]) ** 0.5
429 except IndexError:
430 self.logger.debug(f"Could not find error information for {key}")
431 elif key.startswith("r") or key.startswith("p"):
432 self.logger.debug("Reading RHO and PHS to compute impedance")
433 if (self.z[:, ii, jj] == 0).all():
434 phase = data_dict[f"phs{key[-2:]}"]
435 z_real = np.sqrt(
436 (5 * self.frequency * data_dict[key])
437 / (np.tan(np.deg2rad(phase)) ** 2 + 1)
438 )
439 z_imag = (np.tan(np.deg2rad(phase))) * z_real
440 if ii == 1 and jj == 0:
441 if phase.mean() < 90 and phase.mean() > 0:
442 obj[:, ii, jj] = -1 * (z_real + 1j * z_imag)
443 else:
444 obj[:, ii, jj] = z_real + 1j * z_imag
445 error_obj[:, ii, jj] = np.deg2rad(
446 data_dict[f"phs{key[-2:]}.err"]
447 ) * np.sqrt(data_dict[key] * (self.frequency * 5))
448 except KeyError as error:
449 self.logger.debug(error)
450 # check for order of frequency, we want high togit low
451 self._assert_descending_frequency()
453 try:
454 self.rotation_angle = np.array(data_dict["zrot"])
455 except KeyError:
456 try:
457 self.rotation_angle = np.array(data_dict["rhorot"])
458 except KeyError:
459 self.rotation_angle = np.zeros_like(self.frequency)
461 def _read_spectra(
462 self,
463 data_lines: list[str],
464 comp_list: list[str] = ["hx", "hy", "hz", "ex", "ey", "rhx", "rhy"],
465 ) -> None:
466 """
467 Read in spectra data and convert to impedance and Tipper.
469 Translated from A. Kelbert's EMTF fortran module
471 :param data_lines: list of lines from edi file
472 :type data_lines: list
474 :param comp_list: list of components that correspond to the columns
475 of the spectra data.
476 :type comp_list: list
477 """
479 data_dict = {}
480 avgt_dict = {}
481 data_find = False
482 for line in data_lines:
483 if line.lower().find(">spectra") == 0 and line.find("!") == -1:
484 line_list = _validate_str_with_equals(line)
485 data_find = True
487 # frequency will be the key
488 try:
489 key = float(
490 [
491 ss.split("=")[1]
492 for ss in line_list
493 if ss.lower().find("freq") == 0
494 ][0]
495 )
496 data_dict[key] = []
497 avgt = float(
498 [
499 ss.split("=")[1]
500 for ss in line_list
501 if ss.lower().find("avgt") == 0
502 ][0]
503 )
504 avgt_dict[key] = avgt
505 except ValueError:
506 self.logger.debug("did not find frequency key")
507 elif data_find and line.find(">") == -1 and line.find("!") == -1:
508 data_dict[key] += [float(ll) for ll in line.strip().split()]
509 elif line.find(">spectra") == -1:
510 data_find = False
511 # get an object that contains the indices for each component
512 cc = index_locator(comp_list)
514 self.frequency = np.array(sorted(list(data_dict.keys()), reverse=True))
516 self.z = np.zeros((self.frequency.size, 2, 2), dtype=complex)
517 self.t = np.zeros((self.frequency.size, 1, 2), dtype=complex)
519 self.z_err = np.zeros_like(self.z, dtype=float)
520 self.t_err = np.zeros_like(self.t, dtype=float)
522 self.residual_covariance = np.zeros(
523 (self.frequency.size, cc.n_outputs, cc.n_outputs), dtype=complex
524 )
525 self.signal_inverse_power = np.zeros(
526 (self.frequency.size, cc.n_inputs, cc.n_inputs), dtype=complex
527 )
529 self.tf = np.zeros(
530 (self.frequency.size, cc.n_outputs, cc.n_inputs), dtype=complex
531 )
532 self.tf_err = np.zeros_like(self.tf, dtype=float)
534 for kk, key in enumerate(self.frequency):
535 # read in spectra as an (n_channel x n_channel) array
536 spectra_arr = np.reshape(
537 np.array(data_dict[key]), (len(comp_list), len(comp_list))
538 )
540 # compute cross powers
541 s_arr = np.zeros_like(spectra_arr, dtype=complex)
542 for ii in range(s_arr.shape[0]):
543 for jj in range(ii, s_arr.shape[0]):
544 if ii == jj:
545 s_arr[ii, jj] = spectra_arr[ii, jj]
546 else:
547 # minus sign for complex conjugation
548 # original spectra data are of form <A,B*>, but we need
549 # the order <B,A*>...
550 # this is achieved by complex conjugation of the
551 # original entries
552 s_arr[ii, jj] = complex(
553 spectra_arr[jj, ii], -spectra_arr[ii, jj]
554 )
555 # keep complex conjugated entries in the lower
556 # triangular matrix:
557 s_arr[jj, ii] = complex(
558 spectra_arr[jj, ii], spectra_arr[ii, jj]
559 )
560 # check for empty values
561 s_arr[s_arr == 0] = np.nan
562 s_arr[s_arr == self.Header.empty] = np.nan
564 # from A. Kelbert's EMTF
565 # cross spectra matrices
567 # Note we changed the indices to [ex, ey, hz] from [hz, ex, ey]
568 # input channels
569 rh = np.zeros((cc.n_inputs, cc.n_inputs), dtype=complex)
570 rr = np.zeros((cc.n_inputs, cc.n_inputs), dtype=complex)
571 hh = np.zeros((cc.n_inputs, cc.n_inputs), dtype=complex)
573 # output channels
574 re = np.zeros((cc.n_inputs, cc.n_outputs), dtype=complex)
575 he = np.zeros((cc.n_inputs, cc.n_outputs), dtype=complex)
576 ee = np.zeros((cc.n_outputs, cc.n_outputs), dtype=complex)
578 # fill in cross powers for input channels
579 rh[0, 0] = s_arr[cc.rhx, cc.hx]
580 rh[0, 1] = s_arr[cc.rhx, cc.hy]
581 rh[1, 0] = s_arr[cc.rhy, cc.hx]
582 rh[1, 1] = s_arr[cc.rhy, cc.hy]
584 rr[0, 0] = s_arr[cc.rhx, cc.rhx]
585 rr[0, 1] = s_arr[cc.rhx, cc.rhy]
586 rr[1, 0] = s_arr[cc.rhy, cc.rhx]
587 rr[1, 1] = s_arr[cc.rhy, cc.rhy]
589 hh[0, 0] = s_arr[cc.hx, cc.hx]
590 hh[0, 1] = s_arr[cc.hx, cc.hy]
591 hh[1, 0] = s_arr[cc.hy, cc.hx]
592 hh[1, 1] = s_arr[cc.hy, cc.hy]
594 # fill in cross powers for output channels
595 if cc.has_tipper and cc.has_electric:
596 re[0, 2] = s_arr[cc.rhx, cc.hz]
597 re[0, 0] = s_arr[cc.rhx, cc.ex]
598 re[0, 1] = s_arr[cc.rhx, cc.ey]
599 re[1, 2] = s_arr[cc.rhy, cc.hz]
600 re[1, 0] = s_arr[cc.rhy, cc.ex]
601 re[1, 1] = s_arr[cc.rhy, cc.ey]
603 he[0, 2] = s_arr[cc.hx, cc.hz]
604 he[0, 0] = s_arr[cc.hx, cc.ex]
605 he[0, 1] = s_arr[cc.hx, cc.ey]
606 he[1, 2] = s_arr[cc.hy, cc.hz]
607 he[1, 0] = s_arr[cc.hy, cc.ex]
608 he[1, 1] = s_arr[cc.hy, cc.ey]
610 ee[2, 2] = s_arr[cc.hz, cc.hz]
611 ee[2, 0] = s_arr[cc.hz, cc.ex]
612 ee[2, 1] = s_arr[cc.hz, cc.ey]
613 ee[0, 2] = s_arr[cc.ex, cc.hz]
614 ee[0, 0] = s_arr[cc.ex, cc.ex]
615 ee[0, 1] = s_arr[cc.ex, cc.ey]
616 ee[1, 2] = s_arr[cc.ey, cc.hz]
617 ee[1, 0] = s_arr[cc.ey, cc.ex]
618 ee[1, 1] = s_arr[cc.ey, cc.ey]
619 elif not cc.has_tipper and cc.has_electric:
620 re[0, 0] = s_arr[cc.rhx, cc.ex]
621 re[0, 1] = s_arr[cc.rhx, cc.ey]
622 re[1, 0] = s_arr[cc.rhy, cc.ex]
623 re[1, 0] = s_arr[cc.rhy, cc.ey]
625 he[0, 0] = s_arr[cc.hx, cc.ex]
626 he[0, 1] = s_arr[cc.hx, cc.ey]
627 he[0, 1] = s_arr[cc.hy, cc.ex]
628 he[1, 1] = s_arr[cc.hy, cc.ey]
630 ee[0, 0] = s_arr[cc.ex, cc.ex]
631 ee[0, 1] = s_arr[cc.ex, cc.ey]
632 ee[1, 0] = s_arr[cc.ey, cc.ex]
633 ee[1, 1] = s_arr[cc.ey, cc.ey]
634 elif cc.has_tipper and not cc.has_electric:
635 re[0, 0] = s_arr[cc.rhx, cc.hz]
636 re[1, 0] = s_arr[cc.rhy, cc.hz]
638 he[0, 0] = s_arr[cc.hx, cc.hz]
639 he[1, 0] = s_arr[cc.hy, cc.hz]
641 ee[0, 0] = s_arr[cc.hz, cc.hz]
642 # check to make sure the values are legit for accurate results
643 if abs(np.linalg.det(rh)) < np.finfo(float).eps:
644 self.logger.warning(
645 "spectral matrix determinant is too small "
646 f"{abs(np.linalg.det(rh))} for period {key}. "
647 "Results may be inaccurate"
648 )
649 tfh = np.matmul(np.linalg.inv(rh), re)
650 tf = tfh.conj().T
652 sig = np.matmul(
653 np.linalg.inv(rh), np.matmul(rr, np.linalg.inv(rh.conj().T))
654 )
655 res = (
656 ee
657 - np.matmul(tf, he)
658 - np.matmul(he.conj().T, tfh)
659 + np.matmul(tf, np.matmul(hh, tfh))
660 ) / avgt_dict[key]
662 # variance = abs(np.dot(res[0 : cc.n_inputs, :].T, sig))
663 variance = np.zeros((cc.n_outputs, cc.n_inputs), dtype=complex)
664 for nn in range(cc.n_outputs):
665 for mm in range(cc.n_inputs):
666 variance[nn, mm] = res[nn, nn] * sig[mm, mm]
668 tf_err = np.sqrt(np.abs(variance))
669 self.tf[kk, :, :] = tf
670 self.tf_err[kk, :, :] = np.sqrt(np.abs(variance))
671 self.signal_inverse_power[kk, :, :] = sig
672 self.residual_covariance[kk, :, :] = res
674 if cc.has_tipper and cc.has_electric:
675 self.z[kk, :, :] = tf[0:2, :]
676 self.z_err[kk, :, :] = tf_err[0:2, :]
677 self.t[kk, :, :] = tf[2, :]
678 self.t_err[kk, :, :] = tf_err[2, :]
679 self.z_err[np.where(np.nan_to_num(self.z_err) == 0.0)] = 1.0
680 self.t_err[np.nan_to_num(self.t_err) == 0.0] = 1.0
681 elif not cc.has_tipper and cc.has_electric:
682 self.z[kk, :, :] = tf[:, :]
683 self.z_err[kk, :, :] = tf_err[:, :]
684 self.z_err[np.where(np.nan_to_num(self.z_err) == 0.0)] = 1.0
685 elif cc.has_tipper and not cc.has_electric:
686 self.t[kk, :, :] = tf[:, :]
687 self.t_err[kk, :, :] = tf_err[:, :]
688 self.t_err[np.nan_to_num(self.t_err) == 0.0] = 1.0
690 def write(
691 self,
692 new_edi_fn: str | Path | None = None,
693 longitude_format: Literal["LON", "LONG"] = "LON",
694 latlon_format: Literal["dms", "dd"] = "dms",
695 ) -> Path:
696 """
697 Write a new edi file from either an existing .edi file or from data
698 input by the user into the attributes of Edi.
701 :param new_edi_fn: full path to new edi file.
702 *default* is None, which will write to the same
703 file as the input .edi with as:
704 r"/home/mt/mt01_1.edi"
705 :type new_edi_fn: string
706 :param longitude_format: whether to write longitude as LON or LONG.
707 options are 'LON' or 'LONG', default 'LON'
708 :type longitude_format: string
709 :param latlon_format: format of latitude and longitude in output edi,
710 degrees minutes seconds ('dms') or decimal
711 degrees ('dd')
712 :type latlon_format: string
714 :returns: full path to new edi file
715 :rtype: string
717 :Example: ::
719 >>> import mtpy.core.edi as mtedi
720 >>> edi_obj = mtedi.Edi(fn=r"/home/mt/mt01/edi")
721 >>> edi_obj.Header.dataid = 'mt01_rr'
722 >>> n_edi_fn = edi_obj.write_edi_file()
723 """
725 if new_edi_fn is None:
726 if self.fn is not None:
727 new_edi_fn = Path(self.fn)
728 else:
729 new_edi_fn = Path().cwd().joinpath(f"{self.Header.dataid}.edi")
730 else:
731 new_edi_fn = Path(new_edi_fn)
732 # write lines
733 extra_lines = []
734 if self.survey_metadata.summary != None:
735 extra_lines.append(f"\tsurvey.summary = {self.survey_metadata.summary}\n")
736 if self.Header.progname != "mt_metadata":
737 extra_lines.append(f"\toriginal_program.name={self.Header.progname}\n")
738 if self.Header.progvers != __version__:
739 extra_lines.append(f"\toriginal_program.version={self.Header.progvers}\n")
740 if self.Header.progdate != "1980-01-01":
741 extra_lines.append(f"\toriginal_program.date={self.Header.progdate}\n")
742 if self.Header.filedate != "1980-01-01":
743 extra_lines.append(f"\toriginal_file.date={self.Header.filedate}\n")
744 header_lines = self.Header.write_header(
745 longitude_format=longitude_format, latlon_format=latlon_format
746 )
748 info_lines = self.Info.write_info()
749 info_lines.insert(1, "".join(extra_lines))
751 define_lines = self.Measurement.write_measurement(
752 longitude_format=longitude_format, latlon_format=latlon_format
753 )
755 self.Data.nfreq = len(self.frequency)
756 dsect_lines = self.Data.write_data()
758 # write out frequencies
759 freq_lines = [self._data_header_str.format("frequencies".upper())]
760 freq_lines += self._write_data_block(self.frequency, "freq")
762 # write out rotation angles
763 zrot_lines = [self._data_header_str.format("impedance rotation angles".upper())]
764 if self.rotation_angle is None:
765 self.rotation_angle = np.zeros(self.frequency.size)
766 elif isinstance(self.rotation_angle, (float, int)):
767 self.rotation_angle = np.repeat(self.rotation_angle, self.frequency.size)
768 elif len(self.rotation_angle) != self.frequency.size:
769 raise ValueError(
770 "rotation angle must be the same length and the number of "
771 f"frequencies {len(self.rotation_angle)} != {self.frequency.size}"
772 )
773 zrot_lines += self._write_data_block(self.rotation_angle, "zrot")
775 # write out data only impedance and tipper
776 z_data_lines = [self._data_header_str.format("impedances".upper())]
777 self.z = np.nan_to_num(self.z)
778 self.z_err = np.nan_to_num(self.z_err)
779 self.t = np.nan_to_num(self.t)
780 self.t_err = np.nan_to_num(self.t_err)
781 if self.z is not None:
782 for ii in range(2):
783 for jj in range(2):
784 z_lines_real = self._write_data_block(
785 self.z[:, ii, jj].real, self._z_labels[2 * ii + jj][0]
786 )
787 z_lines_imag = self._write_data_block(
788 self.z[:, ii, jj].imag, self._z_labels[2 * ii + jj][1]
789 )
790 z_lines_var = self._write_data_block(
791 self.z_err[:, ii, jj] ** 2.0,
792 self._z_labels[2 * ii + jj][2],
793 )
795 z_data_lines += z_lines_real
796 z_data_lines += z_lines_imag
797 z_data_lines += z_lines_var
798 if self.t is None:
799 trot_lines = [""]
800 t_data_lines = [""]
801 elif np.all(self.t == 0):
802 trot_lines = [""]
803 t_data_lines = [""]
804 else:
805 try:
806 # write out rotation angles
807 trot_lines = [
808 self._data_header_str.format("tipper rotation angles".upper())
809 ]
810 if isinstance(self.rotation_angle, float):
811 trot = np.repeat(self.rotation_angle, self.frequency.size)
812 else:
813 trot = self.rotation_angle
814 trot_lines += self._write_data_block(np.array(trot), "trot")
816 # write out tipper lines
817 t_data_lines = [self._data_header_str.format("tipper".upper())]
818 for jj in range(2):
819 t_lines_real = self._write_data_block(
820 self.t[:, 0, jj].real, self._t_labels[jj][0]
821 )
822 t_lines_imag = self._write_data_block(
823 self.t[:, 0, jj].imag, self._t_labels[jj][1]
824 )
825 t_lines_var = self._write_data_block(
826 self.t_err[:, 0, jj] ** 2.0, self._t_labels[jj][2]
827 )
829 t_data_lines += t_lines_real
830 t_data_lines += t_lines_imag
831 t_data_lines += t_lines_var
832 except AttributeError:
833 trot_lines = [""]
834 t_data_lines = [""]
835 edi_lines = (
836 header_lines
837 + info_lines
838 + define_lines
839 + dsect_lines
840 + freq_lines
841 + zrot_lines
842 + z_data_lines
843 + trot_lines
844 + t_data_lines
845 + [">END"]
846 )
848 with open(new_edi_fn, "w") as fid:
849 fid.write("".join(edi_lines))
850 self.fn = new_edi_fn
851 return new_edi_fn
853 def _write_data_block(
854 self, data_comp_arr: np.typing.NDArray, data_key: str
855 ) -> list[str]:
856 """
857 Write a data block
859 :param data_comp_arr: array of data components
860 :type data_comp_arr: np.ndarray
862 :param data_key: the component to write out
863 :type data_key: string
865 :returns: list of lines to write to edi file
866 :rtype: list
867 """
868 if data_key.lower().find("z") >= 0 and data_key.lower() not in [
869 "zrot",
870 "trot",
871 ]:
872 block_lines = [
873 ">{0} ROT=ZROT // {1:.0f}\n".format(
874 data_key.upper(), data_comp_arr.size
875 )
876 ]
877 elif data_key.lower().find("t") >= 0 and data_key.lower() not in [
878 "zrot",
879 "trot",
880 ]:
881 block_lines = [
882 ">{0} ROT=TROT // {1:.0f}\n".format(
883 data_key.upper(), data_comp_arr.size
884 )
885 ]
886 elif data_key.lower() == "freq":
887 block_lines = [
888 ">{0} // {1:.0f}\n".format(data_key.upper(), data_comp_arr.size)
889 ]
890 elif data_key.lower() in ["zrot", "trot"]:
891 block_lines = [
892 ">{0} // {1:.0f}\n".format(data_key.upper(), data_comp_arr.size)
893 ]
894 else:
895 raise ValueError("Cannot write block for {0}".format(data_key))
896 for d_index, d_comp in enumerate(data_comp_arr, 1):
897 if d_comp == 0.0 and data_key.lower() not in ["zrot", "trot"]:
898 d_comp = self.Header.empty
899 # write the string in the specified format
900 num_str = "{0:{1}}".format(d_comp, self._num_format)
902 # check to see if a new line is needed
903 if d_index % self._block_len == 0:
904 num_str += "\n"
905 # at the end of the block add a return
906 if d_index == data_comp_arr.size:
907 num_str += "\n"
908 block_lines.append(num_str)
909 return block_lines
911 # -----------------------------------------------------------------------
912 # set a few important properties
913 # --> Latitude
914 @property
915 def lat(self) -> float | None:
916 """latitude in decimal degrees"""
917 return self.Header.latitude
919 @lat.setter
920 def lat(self, input_lat) -> None:
921 """set latitude and make sure it is converted to a float"""
922 self.Header.latitude = input_lat
924 # --> Longitude
925 @property
926 def lon(self) -> float | None:
927 """longitude in decimal degrees"""
928 return self.Header.longitude
930 @lon.setter
931 def lon(self, input_lon: float | None):
932 """set latitude and make sure it is converted to a float"""
933 self.Header.longitude = input_lon
935 # --> Elevation
936 @property
937 def elev(self) -> float:
938 """Elevation in elevation units"""
939 return self.Header.elevation
941 @elev.setter
942 def elev(self, input_elev: float) -> None:
943 """set elevation and make sure it is converted to a float"""
944 self.Header.elevation = input_elev
946 # --> station
947 @property
948 def station(self) -> str | None:
949 """station name"""
950 if self.Header.dataid is not None:
951 return self.Header.dataid.replace(r"/", "_")
952 elif self.Measurement.refloc is not None:
953 return self.Measurement.refloc.replace('"', "")
954 elif self.Data.sectid is not None:
955 return self.Data.sectid
957 @station.setter
958 def station(self, new_station: str | int):
959 """station name"""
960 if not isinstance(new_station, str):
961 new_station = f"{new_station}".replace(r"/", "_")
962 self.Header.dataid = new_station
963 self.Data.sectid = new_station
965 @property
966 def survey_metadata(self) -> Survey:
967 sm = Survey()
969 if self.Header.project is None:
970 if self.Header.prospect is not None:
971 sm.project = self.Header.prospect
972 else:
973 sm.project = self.Header.project
975 if self.Header.survey is None:
976 sm.id = "0"
977 else:
978 sm.id = self.Header.survey
979 if self.Header.acqby is not None:
980 sm.acquired_by.author = self.Header.acqby
981 if self.Header.loc is not None:
982 sm.geographic_name = self.Header.loc
983 sm.country = self.Header.country
985 for key, value in self.Info.info_dict.items():
986 if key is None:
987 key = "extra"
988 key = key.lower()
989 if key.startswith("survey."):
990 sm.update_attribute(key.split("survey.")[1], value)
992 sm.add_station(self.station_metadata)
994 return sm
996 @survey_metadata.setter
997 def survey_metadata(self, survey: Survey) -> None:
998 """
999 Update metadata from a survey metadata object
1001 :param value: DESCRIPTION
1002 :type value: TYPE
1003 :return: DESCRIPTION
1004 :rtype: TYPE
1006 """
1008 if not isinstance(survey, Survey):
1009 raise TypeError(
1010 "Input must be a mt_metadata.transfer_function.Survey object"
1011 f" not {type(survey)}"
1012 )
1013 self.Header.survey = survey.id
1014 self.Header.project = survey.project
1015 self.Header.loc = survey.geographic_name
1016 self.Header.country = survey.country
1017 if survey.summary not in NULL_VALUES:
1018 self.Info.info_dict[f"survey.summary"] = survey.summary
1020 for key in survey.to_dict(single=True).keys():
1021 if "northwest" in key or "southeast" in key or "time_period" in key:
1022 continue
1023 value = survey.get_attr_from_name(key)
1024 if value not in NULL_VALUES:
1025 self.Info.info_dict[f"survey.{key}"] = value
1027 @property
1028 def station_metadata(self) -> tf.Station:
1029 sm = tf.Station()
1030 sm.add_run(Run(id=f"{self.station}a"))
1031 if self.station is not None:
1032 sm.id = self.station
1033 sm.data_type = "MT"
1034 sm.channels_recorded = self.Measurement.channels_recorded
1035 # location
1036 sm.location.latitude = self.lat
1037 sm.location.longitude = self.lon
1038 sm.location.elevation = self.elev
1039 sm.location.datum = self.Header.datum
1040 sm.location.declination.value = self.Header.declination.value
1041 sm.orientation.reference_frame = self.Header.coordinate_system.split()[0]
1042 if self.Header.loc is not None:
1043 sm.geographic_name = self.Header.loc
1045 # provenance
1046 if self.Header.acqby is not None:
1047 sm.acquired_by.author = self.Header.acqby
1048 sm.provenance.creation_time = self.Header.filedate
1049 sm.provenance.submitter.author = self.Header.fileby
1050 sm.provenance.software.name = self.Header.fileby
1051 sm.provenance.software.version = self.Header.progvers
1052 sm.transfer_function.processed_date = self.Header.filedate
1053 sm.transfer_function.runs_processed = sm.run_list
1054 if self.station is not None:
1055 sm.transfer_function.id = self.station
1056 # dates
1057 if self.Header.acqdate is not None:
1058 sm.time_period.start = self.Header.acqdate
1059 if self.Header.enddate is not None:
1060 sm.time_period.end = self.Header.enddate
1062 # processing information
1063 for key, value in self.Info.info_dict.items():
1064 if key is None:
1065 continue
1066 if value in NULL_VALUES:
1067 continue
1068 key = key.lower()
1070 if "provenance" in key:
1071 sm.update_attribute(key, value)
1072 elif "transfer_function" in key:
1073 key = key.split("transfer_function.")[1]
1074 if "processing_parameters" in key:
1075 if isinstance(value, list):
1076 for item in value:
1077 if "=" in item:
1078 param, val = item.split("=")
1079 sm.transfer_function.processing_parameters.append(
1080 f"{param}={val}"
1081 )
1082 else:
1083 sm.transfer_function.processing_parameters.append(item)
1084 elif value not in NULL_VALUES:
1085 param = key.split(".")[-1]
1086 sm.transfer_function.processing_parameters.append(
1087 f"{param}={value}"
1088 )
1089 elif "remote_references" in key:
1090 if key.count(".") > 1:
1091 if key in ["remote_references.id"]:
1092 sm.transfer_function.remote_references.append(value)
1093 elif key in ["remote_references.geographic_name"]:
1094 try:
1095 sm.transfer_function.remote_references[
1096 -1
1097 ] = f"{value}.{sm.transfer_function.remote_references[-1]}"
1098 except IndexError:
1099 sm.transfer_function.remote_references.append(value)
1100 else:
1101 # skip any keys that have remote_references.something_else
1102 continue
1103 else:
1104 if isinstance(value, list):
1105 sm.transfer_function.remote_references.extend(value)
1106 elif isinstance(value, str):
1107 sm.transfer_function.remote_references.append(value)
1108 elif isinstance(value, (int, float)):
1109 sm.transfer_function.remote_references.append(value)
1110 else:
1111 self.logger.warning(
1112 f"Do not know how to handle remote_references {value}"
1113 )
1114 continue
1115 elif "runs_processed" in key:
1116 sm.run_list = sm.transfer_function.runs_processed
1117 else:
1118 sm.transfer_function.update_attribute(key, value)
1120 elif key.startswith("run."):
1121 comp = None
1122 key = key.split("run.")[1]
1123 if "." in key:
1124 # split the key into component and attribute
1125 # e.g. run.mt01a.hx.comments
1126 try:
1127 comp, key = key.split(".", 1)
1128 except (IndexError, ValueError):
1129 continue
1130 if comp is not None:
1131 try:
1132 ch = getattr(sm.runs[0], comp)
1133 except AttributeError:
1134 ch = None
1135 if ch is None:
1136 if comp in ["ex", "ey"]:
1137 ch = Electric(component=comp) # type: ignore
1138 sm.runs[0].add_channel(ch)
1139 elif comp in ["hx", "hy", "hz", "rrhx", "rrhy"]:
1140 ch = Magnetic(component=comp) # type: ignore
1141 sm.runs[0].add_channel(ch)
1142 else:
1143 self.logger.warning(
1144 f"Do not recognize channel {comp}, skipping..."
1145 )
1146 try:
1147 sm.runs[0].update_attribute(f"{key}.{comp}", value)
1149 except Exception as e:
1150 self.logger.warning(
1151 f"Failed to update attribute {key}.{comp}: {e}"
1152 )
1153 if key in ["comments"]:
1154 if isinstance(value, list):
1155 value = ",".join(value)
1156 try:
1157 ch.update_attribute(key, value)
1158 except Exception as e:
1159 self.logger.warning(f"Failed to update attribute {key}: {e}")
1160 elif key.startswith("data_logger"):
1161 sm.runs[0].update_attribute(key, value)
1162 elif key.startswith("station."):
1163 sm.update_attribute(key.split("station.")[1], value)
1164 elif "processing." in key:
1165 key = key.split("processing.")[1]
1166 if key in ["software"]:
1167 sm.transfer_function.software.name = value
1168 elif key in ["tag"]:
1169 if value.count(",") > 0:
1170 sm.transfer_function.remote_references = value.split(",")
1171 else:
1172 sm.transfer_function.remote_references = value.split()
1173 elif key in ["processedby", "processed_by"]:
1174 sm.transfer_function.processed_by.name = value
1175 elif key in ["runlist", "run_list"]:
1176 if value.count(",") > 0:
1177 runs = value.split(",")
1178 else:
1179 runs = value.split()
1180 sm.run_list = []
1181 for rr in runs:
1182 if rr not in sm.runs.keys():
1183 sm.add_run(Run(id=rr))
1184 sm.transfer_function.runs_processed = runs
1185 elif key == "sitename":
1186 sm.geographic_name = value
1187 elif key == "signconvention":
1188 sm.transfer_function.sign_convention = value
1189 elif "mtft" in key or "emtf" in key or "mtedit" in key:
1190 sm.transfer_function.processing_parameters.append(f"{key}={value}")
1192 if self.Header.filedate is not None:
1193 sm.transfer_function.processed_date = self.Header.filedate
1194 # make any extra information in info list into a comment
1195 sm.comments = "\n".join(self.Info.write_info()[1:])
1197 # add information to runs
1198 for rr in sm.runs:
1199 if rr.time_period.start in NULL_VALUES:
1200 rr.time_period.start = sm.time_period.start
1201 if rr.time_period.end in NULL_VALUES:
1202 rr.time_period.end = sm.time_period.end
1204 for ch in self.Measurement.channels_recorded:
1205 try:
1206 ch_obj = rr.channels[ch]
1207 ch_obj.update(getattr(self, f"{ch}_metadata"))
1208 except AttributeError:
1209 pass
1210 return sm
1212 @station_metadata.setter
1213 def station_metadata(self, sm: tf.Station) -> None:
1214 """
1215 Set EDI metadata from station metadata object
1217 :param sm: Station object to pull metadata from
1218 :type sm: :class:`mt_metadata.transfer_functions.tf.Station`
1220 """
1222 ### fill header information from station
1223 self.Header.acqby = sm.acquired_by.author
1224 self.Header.acqdate = sm.time_period.start
1225 self.Header.coordinate_system = sm.orientation.reference_frame
1226 self.Header.dataid = sm.id
1227 self.Header.declination = sm.location.declination
1228 self.Header.elevation = sm.location.elevation
1229 self.Header.fileby = sm.provenance.submitter.author
1230 self.Header.filedate = sm.provenance.creation_time
1231 self.Header.latitude = sm.location.latitude
1232 self.Header.longitude = sm.location.longitude
1233 self.Header.datum = sm.location.datum
1234 self.Header.units = sm.transfer_function.units
1235 self.Header.enddate = sm.time_period.end
1236 if sm.geographic_name is not None:
1237 self.Header.loc = sm.geographic_name
1239 ### write notes
1240 # write comments, which would be anything in the info section from an edi
1241 if isinstance(sm.comments, str):
1242 self.Info.read_info(sm.comments.split("\n"))
1243 # write transfer function info first
1244 for k, v in sm.transfer_function.to_dict(single=True).items():
1245 if not v in NULL_VALUES:
1246 if k in ["processing_parameters"]:
1247 for item in v:
1248 param, value = item.split("=", 1)
1249 self.Info.info_dict[
1250 f"transfer_function.processing_parameters.{param}"
1251 ] = value
1253 else:
1254 self.Info.info_dict[f"transfer_function.{k}"] = v
1255 # write provenance
1256 for k, v in sm.provenance.to_dict(single=True).items():
1257 if not v in NULL_VALUES:
1258 self.Info.info_dict[f"provenance.{k}"] = v
1259 # write field notes
1260 for run in sm.runs:
1261 r_dict = run.to_dict(single=True)
1262 for r_key, r_value in r_dict.items():
1263 if r_value in NULL_VALUES:
1264 continue
1265 self.Info.info_dict[f"{run.id}.{r_key}"] = r_value
1266 for ch in run.channels:
1267 ch_dict = ch.to_dict(single=True)
1268 for ch_key, ch_value in ch_dict.items():
1269 if ch_key not in self._channel_skip_list:
1270 if ch_value in NULL_VALUES:
1271 continue
1272 self.Info.info_dict[
1273 f"{run.id}.{ch.component}.{ch_key}"
1274 ] = ch_value
1275 # write station information
1276 self.Measurement.from_metadata(ch)
1277 # add channel id to data section
1278 try:
1279 setattr(self.Data, f"{ch.component}", ch.channel_id)
1280 except AttributeError:
1281 self.logger.warning(
1282 f"Cannot set {ch.component} channel id, skipping..."
1283 )
1285 ### fill measurement
1286 self.Measurement.refelev = sm.location.elevation
1287 self.Measurement.reflat = sm.location.latitude
1288 self.Measurement.reflon = sm.location.longitude
1289 self.Measurement.refloc = sm.id
1290 self.Measurement.maxchan = len(sm.channels_recorded)
1292 def _get_electric_metadata(self, comp):
1293 """
1294 get electric information from the various metadata
1295 """
1296 comp = comp.lower()
1297 electric = Electric(component=comp)
1298 electric.positive.type = "electric"
1299 electric.negative.type = "electric"
1300 if comp in self.Measurement.measurements.keys():
1301 meas = self.Measurement.measurements[comp]
1302 for attr in [
1303 "negative.x",
1304 "negative.y",
1305 "positive.x2",
1306 "positive.y2",
1307 "measurement_azimuth",
1308 "translated_azimuth",
1309 ]:
1310 if electric.get_attr_from_name(attr) is None:
1311 electric.update_attribute(attr, 0)
1312 electric.dipole_length = meas.dipole_length
1313 electric.channel_id = meas.id
1314 electric.measurement_azimuth = meas.azimuth
1315 electric.translated_azimuth = meas.azimuth
1316 electric.component = meas.chtype
1317 electric.channel_number = meas.channel_number
1318 electric.negative.x = meas.x
1319 electric.positive.x2 = meas.x2
1320 electric.negative.y = meas.y
1321 electric.positive.y2 = meas.y2
1322 for k, v in self.Info.info_dict.items():
1323 if k is None:
1324 continue
1325 if f"{comp}." in k:
1326 key = k.split(f"{comp}.")[1].strip()
1327 if key == "manufacturer":
1328 electric.negative.manufacturer = v
1329 electric.positive.manufacturer = v
1330 if key == "type":
1331 electric.negative.type = v
1332 electric.positive.type = v
1333 if (
1334 electric.positive.x2 == 0
1335 and electric.positive.y2 == 0.0
1336 and electric.negative.x == 0
1337 and electric.negative.y == 0.0
1338 ):
1339 electric.positive.x2 = electric.dipole_length * np.cos(
1340 np.deg2rad(meas.azimuth)
1341 )
1342 electric.positive.y2 = electric.dipole_length * np.sin(
1343 np.deg2rad(meas.azimuth)
1344 )
1345 for key, value in self.Info.info_dict.items():
1346 if key is None:
1347 continue
1348 elif ".type" in key:
1349 continue
1350 if f".{comp}." in key:
1351 key = key.split(f".{comp}.", 1)[-1]
1352 electric.update_attribute(key, value)
1353 return electric
1355 @property
1356 def ex_metadata(self):
1357 return self._get_electric_metadata("ex")
1359 @property
1360 def ey_metadata(self):
1361 return self._get_electric_metadata("ey")
1363 def _get_magnetic_metadata(self, comp):
1364 """
1366 get magnetic metadata from the various sources
1368 :param comp: DESCRIPTION
1369 :type comp: TYPE
1370 :return: DESCRIPTION
1371 :rtype: TYPE
1373 """
1375 magnetic = Magnetic(component=comp)
1376 magnetic.sensor.type = "magnetic"
1377 if comp in self.Measurement.measurements.keys():
1378 meas = self.Measurement.measurements[comp]
1379 for attr in ["location.x", "location.y", "location.z"]:
1380 if magnetic.get_attr_from_name(attr) is None:
1381 magnetic.update_attribute(attr, 0)
1382 magnetic.measurement_azimuth = meas.azm
1383 magnetic.translated_azimuth = meas.azm
1384 magnetic.component = meas.chtype
1385 magnetic.channel_number = meas.channel_number
1386 magnetic.channel_id = meas.id
1387 magnetic.location.x = meas.x
1388 magnetic.location.y = meas.y
1389 try:
1390 magnetic.sensor.id = meas.meas_magnetic.sensor
1391 except AttributeError:
1392 pass
1393 for k, v in self.Info.info_dict.items():
1394 if k is None:
1395 continue
1396 if f"{comp}." in k:
1397 key = k.split(f"{comp}.")[1].strip()
1398 if key == "manufacturer":
1399 magnetic.sensor.manufacturer = v
1400 if key == "type":
1401 magnetic.sensor.type = v
1402 if key.startswith("sensor."):
1403 magnetic.update_attribute(key, v)
1404 return magnetic
1406 @property
1407 def hx_metadata(self):
1408 return self._get_magnetic_metadata("hx")
1410 @property
1411 def hy_metadata(self):
1412 return self._get_magnetic_metadata("hy")
1414 @property
1415 def hz_metadata(self):
1416 return self._get_magnetic_metadata("hz")
1418 @property
1419 def rrhx_metadata(self):
1420 return self._get_magnetic_metadata("rrhx")
1422 @property
1423 def rrhy_metadata(self):
1424 return self._get_magnetic_metadata("rrhy")