Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mt_metadata \ mt_metadata \ transfer_functions \ io \ emtfxml \ emtfxml.py: 66%
838 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:11 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:11 -0800
1# -*- coding: utf-8 -*-
2"""
3EMTFXML
4==========
6This is meant to follow Anna's XML schema for transfer functions
8Created on Sat Sep 4 17:59:53 2021
10@author: jpeacock
11"""
12# =============================================================================
13# Imports
14# =============================================================================
15import inspect
16from enum import Enum
17from pathlib import Path
18from xml.etree import cElementTree as et
20import numpy as np
21from loguru import logger
23from mt_metadata import NULL_VALUES
24from mt_metadata.base import helpers
25from mt_metadata.common import Instrument
26from mt_metadata.common.enumerations import DataTypeEnum
27from mt_metadata.timeseries import Electric, Magnetic, Run, Survey
28from mt_metadata.transfer_functions.io.emtfxml.metadata import helpers as emtf_helpers
29from mt_metadata.transfer_functions.io.tools import get_nm_elev
30from mt_metadata.transfer_functions.tf import Station
31from mt_metadata.utils.validators import validate_attribute
33from . import metadata as emtf_xml
# Registry of metadata classes: every class found in the ``emtf_xml``
# metadata package, keyed by the validated form of its class name.
meta_classes = {
    validate_attribute(class_name): class_object
    for class_name, class_object in inspect.getmembers(emtf_xml, inspect.isclass)
}
# ``Instrument`` is imported from ``mt_metadata.common`` rather than from the
# emtf_xml metadata package, so it is registered explicitly.
meta_classes["instrument"] = Instrument
43# =============================================================================
44# EMTFXML
45# =============================================================================
# Statistical estimates that may accompany a transfer function.
# Each row is: key, name, type, description, external_url, intention, tag.
# NOTE: the key "signal_amplidude" is misspelled, but it is a lookup key and
# is kept as is so existing look-ups keep working.
estimates_dict = {
    key: emtf_xml.Estimate(
        name=name,
        type=value_type,
        description=description,
        external_url=external_url,
        intention=intention,
        tag=tag,
    )
    for key, name, value_type, description, external_url, intention, tag in (
        (
            "variance",
            "VAR",
            "real",
            "Variance",
            "http://www.iris.edu/dms/products/emtf/variance.html",
            "error estimate",
            "variance",
        ),
        (
            "covariance",
            "COV",
            "complex",
            "Covariance",
            "http://www.iris.edu/dms/products/emtf/covariance.html",
            "error estimate",
            "covariance",
        ),
        (
            "residual_covariance",
            "RESIDCOV",
            "complex",
            "Residual Covariance (N)",
            "http://www.iris.edu/dms/products/emtf/residual_covariance.html",
            "error estimate",
            "residual_covariance",
        ),
        (
            "inverse_signal_power",
            "INVSIGCOV",
            "complex",
            "Inverse Coherent Signal Power Matrix (S)",
            "http://www.iris.edu/dms/products/emtf/inverse_signal_covariance.html",
            "signal power estimate",
            "inverse_signal_covariance",
        ),
        (
            "coherence",
            "COH",
            "complex",
            "Coherence",
            "http://www.iris.edu/dms/products/emtf/coherence.html",
            "signal coherence",
            "coherence",
        ),
        (
            "predicted_coherence",
            "PREDCOH",
            "complex",
            "Multiple Coherence",
            "http://www.iris.edu/dms/products/emtf/multiple_coherence.html",
            "signal coherence",
            "multiple_coherence",
        ),
        (
            "signal_amplidude",
            "SIGAMP",
            "complex",
            "Signal Amplitude",
            "http://www.iris.edu/dms/products/emtf/signal_amplitude.html",
            "signal power estimate",
            "signal_power",
        ),
        (
            "signal_noise",
            "SIGNOISE",
            "complex",
            "Signal Noise",
            "http://www.iris.edu/dms/products/emtf/signal_noise.html",
            "error estimate",
            "signal_noise",
        ),
    )
}
# Primary data types that can be stored in the file, keyed by the name used
# in this module ("impedance" and "tipper").
data_types_dict = dict(
    impedance=emtf_xml.DataType(
        name="Z",
        type="complex",
        output="E",
        input="H",
        units="milliVolt per kilometer per nanoTesla",
        description="MT impedance",
        external_url="http://www.iris.edu/dms/products/emtf/impedance.html",
        intention="primary data type",
        tag="impedance",
    ),
    tipper=emtf_xml.DataType(
        name="T",
        type="complex",
        output="H",
        input="H",
        units="",
        description="Vertical Field Transfer Functions (Tipper)",
        external_url="http://www.iris.edu/dms/products/emtf/tipper.html",
        intention="primary data type",
        tag="tipper",
    ),
)
140class EMTFXML:
141 """
142 This is meant to follow Anna's XML schema for transfer functions
144 [Kelbert2019](https://doi.org/10.1190/geo2018-0679.1).
146 making this a MetadataBase object is complicated because of station
147 and survey metadata, so we are going to leave this as just an object.
148 """
def __init__(self, fn=None, **kwargs):
    """
    Create the metadata containers and optionally read a file.

    :param fn: XML file path; when it is not None the file is read at the
     end of initialization
    :type fn: str | Path | None
    :param kwargs: extra attributes, each one is applied with ``setattr``
     after the containers exist and before the file is read

    """
    self._root_dict = None
    self.emtf = emtf_xml.EMTF()  # type: ignore
    self.external_url = emtf_xml.ExternalUrl()  # type: ignore
    self.primary_data = emtf_xml.PrimaryData()  # type: ignore
    self.attachment = emtf_xml.Attachment()  # type: ignore
    self.provenance = emtf_xml.Provenance()  # type: ignore
    self.copyright = emtf_xml.Copyright()  # type: ignore
    self.site = emtf_xml.Site()  # type: ignore

    # not sure why we need to do this, but if you don't FieldNotes end
    # as a string.
    self.field_notes = emtf_xml.FieldNotes()  # type: ignore
    self.processing_info = emtf_xml.ProcessingInfo()  # type: ignore
    self.statistical_estimates = emtf_xml.StatisticalEstimates()  # type: ignore
    self.data_types = emtf_xml.DataTypes()  # type: ignore
    self.site_layout = emtf_xml.SiteLayout()  # type: ignore
    self.data = emtf_xml.TransferFunction()  # type: ignore
    self.period_range = emtf_xml.PeriodRange()  # type: ignore

    self.fn = fn

    # names of the attributes that ``read`` and ``write`` iterate over,
    # in this order.
    self.element_keys = [
        "description",
        "product_id",
        "sub_type",
        "notes",
        "tags",
        "external_url",
        "primary_data",
        "attachment",
        "provenance",
        "copyright",
        "site",
        "field_notes",
        "processing_info",
        "statistical_estimates",
        "data_types",
        "site_layout",
        "data",
        "period_range",
    ]

    for key, value in kwargs.items():
        setattr(self, key, value)

    # identity check against None instead of ``!= None``
    if self.fn is not None:
        self.read()
def __str__(self):
    """
    Multi-line, human readable summary of the station.

    Lists station and survey identifiers, location, declination, whether
    impedance and tipper arrays are present (not None) and, when periods
    are present, the period and frequency ranges.
    """
    station = self.station_metadata
    survey = self.survey_metadata
    location = station.location

    lines = [f"Station: {station.id}", "-" * 50]
    lines.append(f"\tSurvey: {survey.id}")
    lines.append(f"\tProject: {survey.project}")
    lines.append(f"\tAcquired by: {station.acquired_by.author}")
    lines.append(f"\tAcquired date: {station.time_period.start}")
    lines.append(f"\tLatitude: {location.latitude:.3f}")
    lines.append(f"\tLongitude: {location.longitude:.3f}")
    lines.append(f"\tElevation: {location.elevation:.3f}")
    lines.append("\tDeclination: ")
    lines.append(f"\t\tValue: {location.declination.value}")
    lines.append(f"\t\tModel: {location.declination.model}")

    lines.append(f"\tImpedance: {self.data.z is not None}")
    # same capitalization for both outcomes (the True case used to be
    # written as "tipper").
    lines.append(f"\tTipper: {self.data.t is not None}")

    period = self.data.period
    if period is not None:
        lines.append(f"\tN Periods: {len(period)}")

        lines.append("\tPeriod Range:")
        lines.append(f"\t\tMin: {period.min():.5E} s")
        lines.append(f"\t\tMax: {period.max():.5E} s")

        # frequency is the reciprocal of period, so min and max swap.
        lines.append("\tFrequency Range:")
        lines.append(f"\t\tMin: {1./period.max():.5E} Hz")
        lines.append(f"\t\tMax: {1./period.min():.5E} Hz")

    return "\n".join(lines)
def __repr__(self):
    """Short one-line representation: station id and location."""
    location = self.station_metadata.location
    parts = (
        f"station='{self.station_metadata.id}'",
        f"latitude={location.latitude:.2f}",
        f"longitude={location.longitude:.2f}",
        f"elevation={location.elevation:.2f}",
    )
    return "EMTFXML(" + ", ".join(parts) + ")"
@property
def fn(self):
    """File name as stored by the setter (a ``Path`` or None)."""
    return self._fn

@fn.setter
def fn(self, value):
    """Store ``value`` as a ``pathlib.Path``; None is stored unchanged."""
    self._fn = None if value is None else Path(value)
@property
def save_dir(self):
    """Parent directory of ``fn``, or None when ``fn`` is None."""
    path = self.fn
    return None if path is None else path.parent
@property
def description(self) -> str:
    """``description`` of the ``emtf`` metadata object."""
    emtf = self.emtf
    return emtf.description

@description.setter
def description(self, value: str):
    """Set ``description`` on the ``emtf`` metadata object."""
    emtf = self.emtf
    emtf.description = value
@property
def product_id(self) -> str:
    """``product_id`` of the ``emtf`` metadata object."""
    emtf = self.emtf
    return emtf.product_id

@product_id.setter
def product_id(self, value: str):
    """Set ``product_id`` on the ``emtf`` metadata object."""
    emtf = self.emtf
    emtf.product_id = value
@property
def tags(self) -> str:
    """``tags`` of the ``emtf`` metadata object."""
    emtf = self.emtf
    return emtf.tags

@tags.setter
def tags(self, value: str):
    """Set ``tags`` on the ``emtf`` metadata object."""
    emtf = self.emtf
    emtf.tags = value
@property
def sub_type(self) -> str:
    """``sub_type`` of the ``emtf`` metadata object."""
    emtf = self.emtf
    return emtf.sub_type

@sub_type.setter
def sub_type(self, value: str):
    """Set ``sub_type`` on the ``emtf`` metadata object."""
    emtf = self.emtf
    emtf.sub_type = value
@property
def notes(self) -> str:
    """``notes`` of the ``emtf`` metadata object."""
    emtf = self.emtf
    return emtf.notes

@notes.setter
def notes(self, value: str):
    """Set ``notes`` on the ``emtf`` metadata object."""
    emtf = self.emtf
    emtf.notes = value
305 def read(self, fn: str | Path = None, get_elevation: bool = False) -> None:
306 """
307 Read xml file
309 :param fn: XML file path to read, if None, use self.fn
310 :type fn: str | Path
311 :return: None
312 :rtype: None
314 """
315 if fn is not None:
316 self.fn = fn
317 if self.fn is not None:
318 if not self.fn.exists():
319 raise IOError(f"Cannot find: {fn}")
320 else:
321 raise IOError("Input file name is None, that is bad.")
323 with open(file=self.fn, mode="r", encoding="utf-8") as xml_fid:
324 xml_string = xml_fid.read()
325 xml_string = xml_string.replace("&", "and")
326 root = et.fromstring(
327 xml_string,
328 et.XMLParser(encoding="utf-8"),
329 )
331 root_dict = helpers.element_to_dict(root)
332 root_dict = root_dict[list(root_dict.keys())[0]]
333 root_dict = emtf_helpers._convert_keys_to_lower_case(root_dict)
334 self._root_dict = root_dict
336 for element in self.element_keys:
337 attr = getattr(self, element)
338 if hasattr(attr, "read_dict"):
339 attr.read_dict(root_dict)
340 else:
341 emtf_helpers._read_single(self, root_dict, element)
343 self.period_range.min = self.data.period.min()
344 self.period_range.max = self.data.period.max()
346 # apparently sometimes the run list will come out as None from an
347 # empty emtfxml.
348 if self.site.run_list is None:
349 self.site.run_list = []
351 self._get_statistical_estimates()
352 self._get_data_types()
353 self._update_site_layout()
355 if self.site.location.elevation == 0 and get_elevation:
356 if self.site.location.latitude != 0 and self.site.location.longitude != 0:
357 self.site.location.elevation = get_nm_elev(
358 self.site.location.latitude, self.site.location.longitude
359 )
def write(self, fn: str | Path, skip_field_notes: bool = False) -> None:
    """
    Write an xml

    :param fn: XML file path to write
    :type fn: str | Path
    :param skip_field_notes: when True the ``field_notes`` element is not
     written and "field_notes" is removed from the remote info order
    :type skip_field_notes: bool
    :return: None
    :rtype: None

    """
    null_values = [None, "None", "", "none"]

    emtf_element = et.Element("EM_TF")

    self._get_statistical_estimates()
    self._get_data_types()

    for key in self.element_keys:
        # an external url without an actual url is not written.
        if key == "external_url" and self.external_url.url in null_values:
            continue
        if skip_field_notes and key == "field_notes":
            continue

        value = getattr(self, key)
        if not (hasattr(value, "to_xml") and callable(getattr(value, "to_xml"))):
            emtf_helpers._write_single(emtf_element, key, getattr(self, key))
            continue

        if key == "processing_info":
            if skip_field_notes:
                try:
                    value.remote_info._order.remove("field_notes")
                except ValueError:
                    logger.debug("No field notes to skip.")
            # a remote site without an id is dropped from the output.
            if value.remote_info.site.id in null_values:
                try:
                    value.remote_info._order.remove("site")
                except ValueError:
                    logger.debug("No remote field notes to skip.")

        element = value.to_xml()
        # ``to_xml`` may return one element or a list of elements.
        if isinstance(element, list):
            for item in element:
                emtf_element.append(emtf_helpers._convert_tag_to_capwords(item))
        else:
            emtf_element.append(emtf_helpers._convert_tag_to_capwords(element))

    emtf_element = emtf_helpers._remove_null_values(emtf_element)

    # ``read`` opens the file as utf-8, so write with the same encoding
    # instead of the platform default.
    with open(fn, "w", encoding="utf-8") as fid:
        fid.write(helpers.element_to_string(emtf_element))

    self.fn = fn
def _get_statistical_estimates(self):
    """
    Get the appropriate statistical estimates in the file.

    An estimate is listed when the impedance array or the tipper array for
    it is present (not None) and not entirely zero.  Both arrays are
    checked, so an impedance array that exists but is all zeros no longer
    hides a populated tipper array.
    """

    def has_values(array):
        # True when the array is set and holds at least one non-zero entry.
        return array is not None and not np.all(array == 0.0)

    candidates = (
        ("variance", self.data.z_var, self.data.t_var),
        ("inverse_signal_power", self.data.z_invsigcov, self.data.t_invsigcov),
        ("residual_covariance", self.data.z_residcov, self.data.t_residcov),
    )

    self.statistical_estimates.estimates_list = []
    for key, z_array, t_array in candidates:
        if has_values(z_array) or has_values(t_array):
            self.statistical_estimates.estimates_list.append(estimates_dict[key])
def _get_data_types(self):
    """
    Rebuild the list of data types for the file.

    "impedance" is listed when ``data.z`` is set and not all zeros,
    "tipper" when ``data.t`` is set and not all zeros.
    """
    self.data_types.data_types_list = []
    for name, array in (("impedance", self.data.z), ("tipper", self.data.t)):
        if array is None or np.all(array == 0.0):
            continue
        self.data_types.data_types_list.append(data_types_dict[name])
def _update_site_layout(self):
    """
    Bring the site layout channels in line with the data arrays.

    Input channels are always hx and hy; output channels are ex and ey
    when the impedance has non-zero entries plus hz when the tipper has
    non-zero entries.  A channel list is only replaced when its names
    (compared case-insensitively) differ from the expected ones; existing
    channel objects are kept and missing ones are added by name.
    """

    def merge(expected, existing):
        # keep every existing channel whose name matches, otherwise fall
        # back to the bare channel name.
        merged = []
        for wanted in expected:
            matches = [ch for ch in existing if ch.name.lower() == wanted.lower()]
            merged.extend(matches if matches else [wanted])
        return merged

    has_impedance = (self.data.z != 0).any()
    has_tipper = (self.data.t != 0).any()

    # the inputs end up as hx, hy with or without impedance.
    expected_inputs = ["hx", "hy"]
    expected_outputs = []
    if has_impedance:
        expected_outputs += ["ex", "ey"]
    if has_tipper:
        expected_outputs += ["hz"]

    layout = self.site_layout

    # Case-insensitive comparison for channel names
    present_inputs = [name.lower() for name in layout.input_channel_names]
    if sorted(expected_inputs) != sorted(present_inputs):
        layout.input_channels = merge(expected_inputs, layout.input_channels)

    # Case-insensitive comparison for output channels
    present_outputs = [name.lower() for name in layout.output_channel_names]
    if sorted(expected_outputs) != sorted(present_outputs):
        layout.output_channels = merge(expected_outputs, layout.output_channels)
def _parse_comments_data_logger(self, key: str, value: str) -> tuple[str, str]:
    """
    Translate a data logger comment key into an instrument key.

    Parameters
    ----------
    key : str
        key value from the comments dictionary
    value : str
        value from the comments dictionary

    Returns
    -------
    tuple[str, str]
        key with "datalogger" replaced by "instrument" and everything up
        to the first "." removed, and the unchanged value
    """
    if "datalogger" not in key:
        return key, value

    renamed = key.replace("datalogger", "instrument")
    return renamed.split(".", 1)[1], value
550 def _parse_comments_data_quality(
551 self, key: str, value: str
552 ) -> tuple[str, str | float]:
553 """
554 parse comments for data quality information.
556 Parameters
557 ----------
558 key : str
559 key value from the comments dictionary
560 value : str
561 value from the comments dictionary
563 Returns
564 -------
565 tuple[str, str | float]
566 key, value
567 """
568 key = f"site.{key.split('.', 1)[1]}"
569 key = key.replace("dataquality", "data_quality_notes")
570 if "comments" in key:
571 key = key.replace("comments", "comments.value")
572 if "author" in key:
573 key = key.replace("author", "comments.author")
575 if "rating" in key:
576 value = float(value)
578 return key, value
def _parse_comments_electric(self, key: str, value: str) -> tuple[None, None]:
    """
    parse comments for electric channel information.

    Dipole attributes (chtype, manufacturer, azm) are set on the matching
    dipole of the first run in the field notes; positions (x, x2, y, y2,
    z, z2) are set on the matching output channel of the site layout.

    Parameters
    ----------
    key : str
        key value from the comments dictionary
    value : str
        value from the comments dictionary

    Returns
    -------
    tuple[None, None]
        None, None
    """
    key = key.split(".", 1)[1]
    key = key.replace("electrode_", "")
    klist = key.split(".")
    if len(klist) > 1:
        comp, fkey = klist[0], klist[1]
    else:
        # no component in the key, fall back to ex.
        comp, fkey = "ex", klist[0]

    dipole_attributes = {
        "chtype": "name",
        "manufacturer": "manufacturer",
        "azm": "azimuth",
    }

    if fkey in dipole_attributes:
        run = self.field_notes.run_list[0]
        dipole_names = [
            d.name.lower() if isinstance(d.name, str) else d.name
            for d in run.dipole
        ]
        if comp.lower() in dipole_names:
            # names were lower-cased above, so look up the lower-cased
            # component as well.
            index = dipole_names.index(comp.lower())
        elif None in dipole_names:
            # reuse the first dipole that has no name yet.
            index = dipole_names.index(None)
        else:
            # a new dipole belongs in the dipole list (it used to be
            # appended to the magnetometer list).
            run.dipole.append(emtf_xml.Dipole(name=comp))
            index = -1

        setattr(run.dipole[index], dipole_attributes[fkey], value)
    elif fkey in ["x", "x2", "y", "y2", "z", "z2"]:
        if len(self.site_layout.output_channels) == 0:
            self.site_layout.output_channels.append(emtf_xml.Electric(name=comp))
        ch_names = [c.name for c in self.site_layout.output_channels]
        # unknown components fall back to the first output channel.
        index = ch_names.index(comp) if comp in ch_names else 0
        self.site_layout.output_channels[index].update_attribute(fkey, value)
    return None, None
def _parse_comments_magnetic(self, key: str, value: str) -> tuple[None, None]:
    """
    parse comments for magnetic channel information.

    Magnetometer attributes (chtype, manufacturer, azm, type, acqchan) are
    set on the matching magnetometer of the first run in the field notes.
    Positions (x, y, z) are set on the site layout: hx and hy on the input
    channels, hz on the output channels.

    Parameters
    ----------
    key : str
        key value from the comments dictionary
    value : str
        value from the comments dictionary

    Returns
    -------
    tuple[None, None]
        None, None
    """

    key = key.split(".", 1)[1]
    key = key.replace("magnetometer_", "")
    klist = key.split(".")
    if len(klist) > 1:
        comp, fkey = klist[0], klist[1]
    else:
        # no component in the key, fall back to hx.
        comp, fkey = "hx", klist[0]

    magnetometer_attributes = {
        "chtype": "name",
        "manufacturer": "manufacturer",
        "azm": "azimuth",
        "type": "type",
        "acqchan": "id",
    }

    if fkey in magnetometer_attributes:
        run = self.field_notes.run_list[0]
        mag_names = [
            m.name.lower() if isinstance(m.name, str) else m.name
            for m in run.magnetometer
        ]
        if comp.lower() in mag_names:
            # names were lower-cased above, so look up the lower-cased
            # component as well.
            index = mag_names.index(comp.lower())
        elif None in mag_names:
            # reuse the first magnetometer that has no name yet.
            index = mag_names.index(None)
        else:
            run.magnetometer.append(emtf_xml.Magnetometer(name=comp))
            index = -1

        setattr(run.magnetometer[index], magnetometer_attributes[fkey], value)
    elif fkey in ["x", "y", "z"]:
        # hx and hy are input channels, hz is an output channel.  The
        # hx/hy case used to check and update the output channels.
        if comp in ["hx", "hy"]:
            channels = self.site_layout.input_channels
        elif comp in ["hz"]:
            channels = self.site_layout.output_channels
        else:
            channels = None

        if channels is not None:
            if len(channels) == 0:
                channels.append(emtf_xml.Magnetic(name=comp))
            ch_names = [c.name for c in channels]
            # unknown components fall back to the first channel.
            index = ch_names.index(comp) if comp in ch_names else 0
            channels[index].update_attribute(fkey, value)
    return None, None
727 def _parse_comments_processing(
728 self, key: str, value: str
729 ) -> tuple[str | None, str | None]:
730 """
731 parse comments for processing information.
733 Parameters
734 ----------
735 key : str
736 key value from the comments dictionary
737 value : str
738 value from the comments dictionary
740 Returns
741 -------
742 tuple[str | None, str | None]
743 _description_
744 """
746 key = key.replace("processing", "processing_info").replace(
747 "software", "processing_software"
748 )
749 if "author.name" in key:
750 key = key.replace("author.name", "author")
751 for item in [
752 "author.email",
753 "author.organization",
754 "author.organization_url",
755 "processing_software.version",
756 "author.organization",
757 "author.organization_url",
758 ]:
759 if item in key:
760 return None, None
762 return key, value
764 def _parse_comments(self, comments: str | None) -> None:
765 """
766 Parse comments for processing information.
768 Parameters
769 ----------
770 comments : str | None
771 Comments to parse.
772 """
773 if comments is None:
774 return
775 other = []
776 if comments.count("\n") > 0 and comments.count("=") > 0:
777 comments = comments.replace("\n", ";").replace("=", ":")
778 for comment in comments.split(";"):
779 if comment.count(":") >= 1:
780 key, value = [c.strip() for c in comment.split(":", 1)]
781 if "fieldnotes" in key:
782 if len(self.field_notes.run_list) == 0:
783 self.field_notes.run_list.append(emtf_xml.Run())
785 if "datalogger" in key:
786 key, value = self._parse_comments_data_logger(key, value)
787 try:
788 self.field_notes.run_list[0].update_attribute(key, value)
789 key = None
790 value = None
791 except:
792 pass
794 elif "fieldnotes" in key and "dataquality" in key:
795 key, value = self._parse_comments_data_quality(key, value)
797 elif "fieldnotes" in key and "electrode_" in key:
798 key, value = self._parse_comments_electric(key, value)
799 elif "fieldnotes" in key and "magnetometer_" in key:
800 key, value = self._parse_comments_magnetic(key, value)
801 elif "processing" in key:
802 key, value = self._parse_comments_processing(key, value)
804 if key is not None and value is not None:
805 if "." in key:
806 obj, attr_key = key.split(".", 1)
807 try:
808 getattr(self, obj).update_attribute(attr_key, value)
809 except:
810 logger.warning(f"Cannot set attribute {key}.")
811 elif key == "description":
812 # Handle description as direct attribute on the EMTFXML object
813 self.description = value
814 else:
815 # Handle other keys without dots
816 other.append(f"{key}:{value}")
817 else:
818 other.append(comment)
819 try:
820 self.site.comments.value = "; ".join(other)
821 except AttributeError:
822 pass
824 @property
825 def survey_metadata(self):
826 survey_obj = Survey()
827 survey_obj.acquired_by.author = self.site.acquired_by
828 survey_obj.citation_dataset.authors = self.copyright.citation.authors
829 survey_obj.citation_dataset.title = self.copyright.citation.title
830 survey_obj.citation_dataset.year = self.copyright.citation.year
831 survey_obj.citation_dataset.doi = self.copyright.citation.survey_d_o_i
832 survey_obj.country = self.site.country
833 survey_obj.geographic_name = self.site.survey
834 if self.site.survey not in NULL_VALUES:
835 survey_obj.id = self.site.survey
836 survey_obj.project = self.site.project
837 survey_obj.time_period.start_date = self.site.start
838 survey_obj.time_period.end_date = self.site.end
839 survey_obj.summary = self.description
840 survey_obj.comments.value = "; ".join(
841 [
842 f"{k}:{v}"
843 for k, v in {
844 "copyright.acknowledgement": self.copyright.acknowledgement,
845 "copyright.conditions_of_use": self.copyright.conditions_of_use,
846 "copyright.release_status": self.copyright.release_status,
847 "copyright.selected_publications": self.copyright.selected_publications,
848 "copyright.additional_info": self.copyright.additional_info,
849 }.items()
850 if v not in [None, ""]
851 ]
852 )
854 survey_obj.add_station(self.station_metadata)
856 return survey_obj
858 @survey_metadata.setter
859 def survey_metadata(self, sm: Survey) -> None:
860 """
861 Set metadata and other values in metadata
863 :param sm: survey metadata object
864 :type sm: Survey
865 :return: None
866 :rtype: None
868 """
869 self.description = sm.summary
870 self.site.project = sm.project
871 if sm.geographic_name is None:
872 self.site.survey = sm.id
873 else:
874 self.site.survey = sm.geographic_name
875 if sm.country is not None:
876 self.site.country = ",".join(sm.country)
877 self.copyright.citation.survey_d_o_i = sm.citation_dataset.doi
879 self.copyright.citation.authors = sm.citation_dataset.authors
880 self.copyright.citation.title = sm.citation_dataset.title
881 self.copyright.citation.year = sm.citation_dataset.year
883 self._parse_comments(sm.comments.value)
885 @property
886 def station_metadata(self):
887 s = Station()
888 # if self._root_dict is not None:
889 s.acquired_by.author = self.site.acquired_by
890 s.channels_recorded = [d.name for d in self.site_layout.input_channels] + [
891 d.name for d in self.site_layout.output_channels
892 ]
893 s.data_type = self.sub_type.lower().split("_")[0]
894 s.geographic_name = self.site.name
895 s.id = self.site.id
896 s.fdsn.id = self.product_id
897 s.location.from_dict(self.site.location.to_dict())
898 s.orientation.angle_to_geographic_north = (
899 self.site.orientation.angle_to_geographic_north
900 )
901 s.provenance.software.name = self.provenance.creating_application
902 s.provenance.creation_time = self.provenance.create_time
903 s.provenance.creator.author = self.provenance.creator.name
904 s.provenance.creator.email = self.provenance.creator.email
905 s.provenance.creator.organization = self.provenance.creator.organization
906 s.provenance.creator.url = self.provenance.creator.url
907 s.provenance.submitter.author = self.provenance.submitter.name
908 s.provenance.submitter.email = self.provenance.submitter.email
909 s.provenance.submitter.organization = self.provenance.submitter.organization
910 s.provenance.submitter.url = self.provenance.submitter.url
912 s.provenance.archive.url = self.external_url.url
913 s.provenance.archive.comments = self.external_url.description
915 s.time_period.start = self.site.start
916 s.time_period.end = self.site.end
918 comments = {"description": self.description}
919 for key in [
920 "primary_data.filename",
921 "attachment.description",
922 "attachment.filename",
923 "site.data_quality_notes.comments.author",
924 "site.data_quality_notes.comments.value",
925 "site.data_quality_warnings.flag",
926 "site.data_quality_warnings.comments.author",
927 "site.data_quality_warnings.comments.value",
928 ]:
929 obj, attr_key = key.split(".", 1)
930 comments[key] = getattr(self, obj).get_attr_from_name(attr_key)
931 s.comments.value = "; ".join(
932 [f"{k}:{v}" for k, v in comments.items() if v not in [None, ""]]
933 )
935 s.transfer_function.id = self.site.id
936 s.transfer_function.sign_convention = self.processing_info.sign_convention
937 s.transfer_function.processed_by.author = self.processing_info.processed_by
938 s.transfer_function.software.author = (
939 self.processing_info.processing_software.author
940 )
941 s.transfer_function.software.name = (
942 self.processing_info.processing_software.name
943 )
944 s.transfer_function.software.last_updated = (
945 self.processing_info.processing_software.last_mod
946 )
947 # need to use remote reference info if it has it.
948 if self.processing_info.remote_info.site.id is not None:
949 s.transfer_function.remote_references = [
950 self.processing_info.remote_info.site.id
951 ]
953 elif self.processing_info.processing_tag is not None:
954 if "_" in self.processing_info.processing_tag:
955 remotes = [
956 rr
957 for rr in self.processing_info.processing_tag.split("_")
958 if self.site.id not in rr
959 ]
960 s.transfer_function.remote_references = remotes
961 s.transfer_function.runs_processed = self.site.run_list
962 s.transfer_function.processing_type = self.processing_info.remote_ref.type
964 if self.processing_info.remote_info.site.id is not None:
965 for key in self.processing_info.remote_info.site.get_attribute_list():
966 value = self.processing_info.remote_info.site.get_attr_from_name(key)
968 if "location" in key:
969 if value == 0.0:
970 continue
971 elif value not in NULL_VALUES:
972 # need to add remote site information
973 # Handle Enum types by using .value property
974 str_value = value.value if isinstance(value, Enum) else value
975 s.transfer_function.processing_parameters.append(
976 f"remote_info.site.{key} = {str_value}"
977 )
979 # need to add remote site field notes information
980 for rfn in self.processing_info.remote_info.field_notes._run_list:
981 rr_dict = rfn.to_dict(single=True)
982 for rr_key, rr_value in rr_dict.items():
983 if rr_value not in [
984 None,
985 "1980",
986 1980,
987 "1980-01-01T00:00:00+00:00",
988 [],
989 "",
990 ]:
991 # Handle Enum types by using .value property
992 str_value = (
993 rr_value.value if isinstance(rr_value, Enum) else rr_value
994 )
995 s.transfer_function.processing_parameters.append(
996 f"remote_info.field_notes.{rr_key} = {str_value}"
997 )
998 for ii, dp in enumerate(rfn.dipole):
999 dp_dict = dp.to_dict(single=True)
1000 for dp_key, dp_value in dp_dict.items():
1001 if dp_value not in [None, ""]:
1002 # Handle Enum types by using .value property
1003 str_value = (
1004 dp_value.value
1005 if isinstance(dp_value, Enum)
1006 else dp_value
1007 )
1008 s.transfer_function.processing_parameters.append(
1009 f"remote_info.field_notes.dipole_{ii}.{dp_key} = {str_value}"
1010 )
1011 for ii, mag in enumerate(rfn.magnetometer):
1012 mag_dict = mag.to_dict(single=True)
1013 for mag_key, mag_value in mag_dict.items():
1014 if mag_value not in [None, ""]:
1015 # Handle Enum types by using .value property
1016 str_value = (
1017 mag_value.value
1018 if isinstance(mag_value, Enum)
1019 else mag_value
1020 )
1021 s.transfer_function.processing_parameters.append(
1022 f"remote_info.field_notes.magnetometer_{ii}.{mag_key} = {str_value}"
1023 )
1025 s.transfer_function.data_quality.good_from_period = (
1026 self.site.data_quality_notes.good_from_period
1027 )
1028 s.transfer_function.data_quality.good_to_period = (
1029 self.site.data_quality_notes.good_to_period
1030 )
1031 s.transfer_function.data_quality.rating.value = (
1032 self.site.data_quality_notes.rating
1033 )
1035 for fn in self.field_notes._run_list:
1036 if fn.sampling_rate in [0, None]:
1037 continue
1038 r = Run()
1039 r.id = fn.run
1040 r.data_logger.id = fn.instrument.id
1041 r.data_logger.type = fn.instrument.name
1042 r.data_logger.manufacturer = fn.instrument.manufacturer
1043 r.sample_rate = fn.sampling_rate
1044 r.time_period.start = fn.start
1045 r.time_period.end = fn.end
1046 comments = []
1047 if fn.comments.author not in [None, ""]:
1048 comments.append(f"comments.author:{fn.comments.author}")
1049 if fn.comments.value not in [None, ""]:
1050 comments.append(f"comments.value:{fn.comments.value}")
1051 if fn.errors not in [None, ""]:
1052 comments.append(f"errors:{fn.errors}")
1053 r.comments = "; ".join(comments)
1055 # need to set azimuths from site layout with the x, y, z postions.
1056 if len(fn.magnetometer) == 1:
1057 for comp in ["hx", "hy", "hz"]:
1058 c = Magnetic()
1059 c.component = comp
1060 c.sensor.id = fn.magnetometer[0].id
1061 c.sensor.name = fn.magnetometer[0].name
1062 c.sensor.manufacturer = fn.magnetometer[0].manufacturer
1063 c.sensor.type = fn.magnetometer[0].type
1064 c.time_period.start = fn.start
1065 c.time_period.end = fn.end
1066 r.add_channel(c)
1068 else:
1069 for mag in fn.magnetometer:
1070 comp = mag.name
1071 if comp is None:
1072 continue
1073 c = Magnetic()
1074 c.component = comp.lower()
1075 c.sensor.id = mag.id
1076 c.sensor.name = mag.name
1077 c.sensor.manufacturer = mag.manufacturer
1078 c.sensor.type = mag.type
1079 c.time_period.start = fn.start
1080 c.time_period.end = fn.end
1081 r.add_channel(c)
1083 for dp in fn.dipole:
1084 comp = dp.name
1085 if comp is None:
1086 continue
1087 c = Electric()
1088 c.component = comp.lower()
1089 c.translated_azimuth = dp.azimuth
1090 c.dipole_length = dp.length
1091 for pot in dp.electrode:
1092 if pot.location.lower() in ["n", "e"]:
1093 c.positive.id = pot.number
1094 c.positive.type = pot.value
1095 c.positive.manufacturer = dp.manufacturer
1096 c.positive.type = pot.comments.as_string()
1098 elif pot.location.lower() in ["s", "w"]:
1099 c.negative.id = pot.number
1100 c.negative.type = pot.value
1101 c.negative.manufacturer = dp.manufacturer
1102 c.negative.type = pot.comments.as_string()
1103 c.time_period.start = fn.start
1104 c.time_period.end = fn.end
1105 r.add_channel(c)
1107 for ch in (
1108 self.site_layout.input_channels + self.site_layout.output_channels
1109 ):
1110 try:
1111 c = r.get_channel(ch.name.lower())
1112 except AttributeError:
1113 # if the channel does not exist, create it.
1114 if ch.name.lower() in ["ex", "ey"]: # electric channels
1115 c = Electric()
1116 elif ch.name.lower() in ["hx", "hy", "hz"]: # magnetic channels
1117 c = Magnetic()
1118 c.from_dict(ch.to_dict(single=True))
1119 r.add_channel(c)
1121 if c.component in ["hx", "hy", "hz"]:
1122 c.location.x = ch.x
1123 c.location.y = ch.y
1124 c.location.z = ch.z
1126 elif c.component in ["ex", "ey"]:
1127 c.negative.x = ch.x
1128 c.negative.y = ch.y
1129 c.negative.z = ch.z
1130 c.positive.x2 = ch.x2
1131 c.positive.y2 = ch.y2
1132 c.positive.z2 = ch.z2
1133 c.measurement_azimuth = ch.orientation
1134 c.translated_azimuth = ch.orientation
1135 c.time_period.start = fn.start
1136 c.time_period.end = fn.end
1137 s.add_run(r)
1139 if self.field_notes._run_list == []:
1140 r = Run(id=f"{s.id}a")
1141 r.channels_recorded_electric = ["ex", "ey"]
1142 if (self.data.t == 0).all():
1143 r.channels_recorded_magnetic = ["hx", "hy"]
1144 else:
1145 r.channels_recorded_magnetic = ["hx", "hy", "hz"]
1147 for ch in (
1148 self.site_layout.input_channels + self.site_layout.output_channels
1149 ):
1150 c = r.get_channel(ch.name.lower())
1151 if c.component in r.channels_recorded_magnetic:
1152 c.location.x = ch.x
1153 c.location.y = ch.y
1154 c.location.z = ch.z
1156 elif c.component in r.channels_recorded_electric:
1157 c.negative.x = ch.x
1158 c.negative.y = ch.y
1159 c.negative.z = ch.z
1160 c.positive.x2 = ch.x2
1161 c.positive.y2 = ch.y2
1162 c.positive.z2 = ch.z2
1163 c.measurement_azimuth = ch.orientation
1164 c.translated_azimuth = ch.orientation
1165 c.time_period.start = s.time_period.start
1166 c.time_period.end = s.time_period.end
1168 s.add_run(r)
1170 return s
@station_metadata.setter
def station_metadata(self, station_metadata: Station) -> None:
    """
    Fill the EMTF XML metadata from a station metadata object.

    Copies site, provenance, external url, processing and data quality
    values from `station_metadata`, applies each entry of
    ``transfer_function.processing_parameters``, rebuilds the field notes
    run list and the site layout channels from ``station_metadata.runs``
    and finally passes the station comment text to ``_parse_comments``.

    :param station_metadata: station metadata object to read from
    :type station_metadata: Station
    :raises AttributeError: if a ``key:value`` entry of a run comment
        cannot be set on the field notes run
    :return: None
    :rtype: None

    """
    sm = station_metadata

    self.site.acquired_by = sm.acquired_by.author
    if sm.data_type is not None:
        self.sub_type = f"{sm.data_type.upper()}_TF"
    else:
        self.sub_type = DataTypeEnum.MT_TF
    self.site.name = sm.geographic_name
    self.site.id = sm.id
    self.product_id = sm.fdsn.id
    self.site.location.latitude = sm.location.latitude
    self.site.location.longitude = sm.location.longitude
    self.site.location.elevation = sm.location.elevation
    self.site.orientation.angle_to_geographic_north = (
        sm.orientation.angle_to_geographic_north
        if sm.orientation.angle_to_geographic_north is not None
        else 0.0
    )

    self.provenance.creating_application = sm.provenance.software.name
    self.provenance.create_time = sm.provenance.creation_time
    self.provenance.creator.name = sm.provenance.creator.author
    self.provenance.creator.email = sm.provenance.creator.email
    self.provenance.creator.organization = sm.provenance.creator.organization
    self.provenance.creator.url = sm.provenance.creator.url
    self.provenance.submitter.name = sm.provenance.submitter.author
    self.provenance.submitter.email = sm.provenance.submitter.email
    self.provenance.submitter.organization = sm.provenance.submitter.organization
    self.provenance.submitter.url = sm.provenance.submitter.url

    # the external url entries are written as empty strings rather than None
    self.external_url.url = (
        sm.provenance.archive.url if sm.provenance.archive.url is not None else ""
    )
    self.external_url.description = (
        sm.provenance.archive.comments.value
        if sm.provenance.archive.comments.value is not None
        else ""
    )

    self.site.start = sm.time_period.start
    self.site.end = sm.time_period.end
    # Extract year from start date for year_collected
    start = sm.time_period.start
    if start:
        try:
            # Handle different types of start time (datetime, string, etc.)
            if hasattr(start, "year"):
                self.site.year_collected = start.year
            elif isinstance(start, str):
                # Extract year from ISO date string (YYYY-MM-DD...)
                self.site.year_collected = int(start[:4])
        except (ValueError, AttributeError, TypeError):
            # Best effort: if extraction fails, year_collected is left as is
            pass

    self.processing_info.sign_convention = sm.transfer_function.sign_convention
    self.processing_info.processed_by = sm.transfer_function.processed_by.author
    self.processing_info.process_date = sm.transfer_function.processed_date
    self.processing_info.processing_software.author = (
        sm.transfer_function.software.author
    )
    self.processing_info.processing_software.name = (
        sm.transfer_function.software.name
    )
    self.processing_info.processing_software.last_mod = (
        sm.transfer_function.software.last_updated
    )

    # processing tag is the processed runs followed by the remote references
    tag = []
    if sm.transfer_function.runs_processed is not None:
        tag += sm.transfer_function.runs_processed
    if sm.transfer_function.remote_references is not None:
        tag += sm.transfer_function.remote_references
    self.processing_info.processing_tag = "_".join(tag)
    self.processing_info.remote_ref.type = sm.transfer_function.processing_type

    for param in sm.transfer_function.processing_parameters:
        self._apply_processing_parameter(param)

    self.site.run_list = sm.transfer_function.runs_processed

    self.site.data_quality_notes.good_from_period = (
        sm.transfer_function.data_quality.good_from_period
    )
    self.site.data_quality_notes.good_to_period = (
        sm.transfer_function.data_quality.good_to_period
    )
    self.site.data_quality_notes.rating = (
        sm.transfer_function.data_quality.rating.value
    )
    self.site.data_quality_notes.comments.value = (
        sm.transfer_function.data_quality.comments.value
    )

    # Field notes are rebuilt from scratch.  The channel dictionaries are
    # keyed by component, so a later run overwrites the layout entry of an
    # earlier run for the same component.
    self.field_notes._run_list = []
    ch_in_dict = {}
    ch_out_dict = {}
    for r in sm.runs:
        self.field_notes._run_list.append(self._run_to_field_notes(r))
        self._collect_site_layout_channels(r, ch_in_dict, ch_out_dict)

    self.site_layout.input_channels = list(ch_in_dict.values())
    self.site_layout.output_channels = list(ch_out_dict.values())

    self._parse_comments(sm.comments.value)

def _apply_processing_parameter(self, param) -> None:
    """
    Parse one ``key = value`` or ``key: value`` processing parameter and
    store it on the processing info.

    Entries that are not strings, or that contain neither ``=`` nor ``:``,
    are ignored.  Keys containing ``remote_info.field_notes`` are applied
    to the first run of the remote field notes (created when missing);
    every other key is applied to ``self.processing_info``.  Failures are
    logged as warnings and never raised.

    :param param: processing parameter entry
    :return: None
    :rtype: None

    """
    if not isinstance(param, str):
        return

    # "=" takes precedence over ":" when both are present
    if "=" in param:
        sep = "="
    elif ":" in param:
        sep = ":"
    else:
        return

    key, value = [part.strip() for part in param.split(sep, 1)]

    if "remote_info.field_notes" not in key:
        try:
            self.processing_info.update_attribute(key, value)
        except Exception:
            logger.warning(f"Cannot set processing info attribute {param}")
        return

    key = key.replace("remote_info.field_notes.", "")
    field_notes = self.processing_info.remote_info.field_notes
    if len(field_notes._run_list) == 0:
        field_notes._run_list.append(meta_classes["run"]())
    run = field_notes._run_list[0]

    if "dipole" in key:
        self._set_remote_dipole_parameter(run, key, value, param)
    elif "magnetometer" in key:
        self._set_remote_magnetometer_parameter(run, key, value, param)
    else:
        try:
            run.update_attribute(key, value)
        except Exception:
            logger.warning(f"Cannot set processing info attribute {param}")

def _set_remote_dipole_parameter(self, run, key, value, param) -> None:
    """
    Apply a remote field notes dipole parameter to `run`.

    Two key formats are handled: ``dipole_<index>.<attribute>`` sets one
    attribute on the indexed dipole, and a bare ``dipole`` key carries a
    list of ``{"dipole": {...}}`` dictionaries, or the string
    representation of such a list.  Failures are logged as warnings.

    :param run: remote field notes run that owns the dipoles
    :param key: parameter key with the ``remote_info.field_notes.`` prefix
        already removed
    :param value: parameter value
    :param param: original parameter text, used in log messages only
    :return: None
    :rtype: None

    """
    # local import: the head of the file does not import ast
    import ast

    try:
        if "_" in key and "." in key:
            # Format: dipole_0.name, dipole_1.length, etc.
            index = int(key.split("_")[1].split(".")[0])
            attr_key = key.split(".", 1)[1]
            if len(run.dipole) < (index + 1):
                run.dipole.append(meta_classes["dipole"]())
            run.dipole[index].update_attribute(attr_key, value)
        elif key == "dipole" and isinstance(value, (list, str)):
            dipole_list = value
            if isinstance(value, str):
                # The string may be the repr of a list of dictionaries;
                # literal_eval only accepts Python literals.
                try:
                    dipole_list = ast.literal_eval(value)
                except (ValueError, SyntaxError):
                    # Not a literal, fall back to using it as the name
                    logger.debug(f"Using dipole value as string: {value}")
                    dipole_list = [{"dipole": {"name": str(value)}}]

            if isinstance(dipole_list, list):
                for idx, dipole_data in enumerate(dipole_list):
                    if isinstance(dipole_data, dict) and "dipole" in dipole_data:
                        dipole_info = dipole_data["dipole"]
                        if len(run.dipole) < (idx + 1):
                            run.dipole.append(meta_classes["dipole"]())

                        # one bad attribute should not stop the others
                        for attr_name, attr_value in dipole_info.items():
                            try:
                                run.dipole[idx].update_attribute(
                                    attr_name, attr_value
                                )
                            except Exception as attr_error:
                                logger.warning(
                                    f"Cannot set dipole attribute {attr_name}: {attr_error}"
                                )
            else:
                logger.warning(
                    f"Dipole value is not a list: {type(dipole_list)}"
                )
        else:
            logger.warning(f"Unknown dipole key format: {key}")
    except (IndexError, ValueError) as error:
        logger.warning(
            f"Cannot parse dipole processing info attribute {param}: {error}"
        )
    except Exception as error:
        logger.warning(f"Cannot set processing info attribute {param}: {error}")

def _set_remote_magnetometer_parameter(self, run, key, value, param) -> None:
    """
    Apply a ``magnetometer_<index>.<attribute>`` remote field notes
    parameter to `run`.  Failures are logged as warnings.

    :param run: remote field notes run that owns the magnetometers
    :param key: parameter key with the ``remote_info.field_notes.`` prefix
        already removed
    :param value: parameter value
    :param param: original parameter text, used in log messages only
    :return: None
    :rtype: None

    """
    try:
        index = int(key.split("_")[1].split(".")[0])
        key_parts = key.split(".", 1)
        if len(key_parts) > 1:
            key_attr = key_parts[1]
            if len(run.magnetometer) < (index + 1):
                run.magnetometer.append(meta_classes["magnetometer"]())
            run.magnetometer[index].update_attribute(key_attr, value)
    except (IndexError, ValueError) as error:
        logger.warning(
            f"Cannot parse magnetometer processing info attribute {param}: {error}"
        )
    except Exception:
        logger.warning(f"Cannot set processing info attribute {param}")

def _run_to_field_notes(self, r):
    """
    Build an EMTF XML field notes run from a run of the station metadata.

    :param r: run to convert, an element of ``station_metadata.runs``
    :raises AttributeError: if a ``key:value`` entry of the run comments
        cannot be set on the field notes run
    :return: field notes run with instrument, timing, magnetometers and
        dipoles filled
    :rtype: emtf_xml.Run

    """
    fn = emtf_xml.Run()
    fn.dipole = []
    fn.magnetometer = []
    fn.instrument.id = r.data_logger.id
    fn.instrument.name = r.data_logger.type
    fn.instrument.manufacturer = r.data_logger.manufacturer
    fn.sampling_rate = r.sample_rate
    fn.start = r.time_period.start
    fn.end = r.time_period.end
    fn.run = r.id

    if r.comments is not None:
        # Handle both string and Comment object types
        comments_str = (
            r.comments.value if hasattr(r.comments, "value") else str(r.comments)
        )
        if comments_str:
            # run comments are ";" separated "key:value" entries
            for comment in comments_str.split(";"):
                if comment.count(":") >= 1:
                    key, value = comment.split(":", 1)
                    try:
                        fn.update_attribute(key.strip(), value.strip())
                    except Exception as error:
                        # keep the AttributeError contract but preserve the
                        # cause and let KeyboardInterrupt/SystemExit through
                        raise AttributeError(
                            f"Cannot set attribute {key}."
                        ) from error

    for comp in ["hx", "hy", "hz"]:
        try:
            rch = r.get_channel(comp)
            mag = emtf_xml.Magnetometer()  # type: ignore
            mag.id = rch.sensor.id
            mag.name = comp
            mag.manufacturer = rch.sensor.manufacturer
            mag.type = rch.sensor.type
            fn.magnetometer.append(mag)

            # long period magnetometer: a single entry stands for all
            # components, so stop after the first one
            if rch.sensor.name in ["NIMS", "LEMI"] and rch.sensor.type in [
                "fluxgate"
            ]:
                break

        except AttributeError:
            logger.debug(
                f"Did not find {comp} in run",
            )

    for comp in ["ex", "ey"]:
        try:
            c = r.get_channel(comp)
            dp = emtf_xml.Dipole()  # type: ignore
            dp.name = comp.capitalize()
            dp.azimuth = c.translated_azimuth
            dp.length = c.dipole_length
            dp.manufacturer = c.positive.manufacturer
            dp.type = "wire"

            # positive electrode sits at the north (ex) or east (ey) end
            pot_p = emtf_xml.Electrode()  # type: ignore
            pot_p.number = c.positive.id
            pot_p.location = "n" if comp == "ex" else "e"
            pot_p.comments = c.positive.type
            dp.electrode.append(pot_p)

            # negative electrode sits at the south (ex) or west (ey) end and
            # takes its comments from the negative electrode, which is where
            # the getter stores them when reading the field notes back
            pot_n = emtf_xml.Electrode()  # type: ignore
            pot_n.number = c.negative.id
            pot_n.comments = c.negative.type
            pot_n.location = "s" if comp == "ex" else "w"
            dp.electrode.append(pot_n)
            fn.dipole.append(dp)

        except AttributeError:
            logger.debug(f"Did not find {comp} in run")

    return fn

def _collect_site_layout_channels(self, r, ch_in_dict, ch_out_dict) -> None:
    """
    Add the site layout channels of one run to the channel dictionaries.

    ``hx`` and ``hy`` are stored in `ch_in_dict`; ``hz``, ``ex`` and ``ey``
    are stored in `ch_out_dict`.  Both dictionaries are keyed by component
    and modified in place.

    :param r: run to read channels from, an element of
        ``station_metadata.runs``
    :param ch_in_dict: input channels keyed by component
    :type ch_in_dict: dict
    :param ch_out_dict: output channels keyed by component
    :type ch_out_dict: dict
    :return: None
    :rtype: None

    """
    for comp in ["hx", "hy", "hz"]:
        try:
            ch = r.get_channel(comp)
            m_ch = emtf_xml.Magnetic()  # type: ignore

            # missing coordinates are written as 0.0
            for item in ["x", "y", "z"]:
                value = getattr(ch.location, item)
                setattr(m_ch, item, 0.0 if value is None else value)

            m_ch.name = comp.capitalize()
            if ch.translated_azimuth is not None:
                m_ch.orientation = ch.translated_azimuth
            else:
                m_ch.orientation = ch.measurement_azimuth

            if comp in ["hx", "hy"]:
                ch_in_dict[comp] = m_ch
            else:
                ch_out_dict[comp] = m_ch
        except AttributeError:
            logger.debug(f"Did not find {comp} in run")

    for comp in ["ex", "ey"]:
        try:
            ch = r.get_channel(comp)
            ch_out = emtf_xml.Electric()  # type: ignore

            # x, y, z come from the negative electrode and x2, y2, z2 from
            # the positive electrode; missing coordinates are written as 0.0
            for item in ["x", "y", "z"]:
                value = getattr(ch.negative, item)
                setattr(ch_out, item, 0.0 if value is None else value)

            for item in ["x2", "y2", "z2"]:
                value = getattr(ch.positive, item)
                setattr(ch_out, item, 0.0 if value is None else value)

            ch_out.name = comp.capitalize()
            if ch.translated_azimuth is not None:
                ch_out.orientation = ch.translated_azimuth
            else:
                ch_out.orientation = ch.measurement_azimuth

            # without any horizontal coordinates, lay the dipole along its
            # own axis using the dipole length
            if (
                ch_out.x == 0
                and ch_out.y == 0
                and ch_out.x2 == 0
                and ch_out.y2 == 0
            ):
                if comp in ["ex"]:
                    ch_out.x2 = ch.dipole_length
                elif comp in ["ey"]:
                    ch_out.y2 = ch.dipole_length

            ch_out_dict[comp] = ch_out
        except AttributeError:
            logger.debug(f"Did not find {comp} in run")