Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mt_metadata \ mt_metadata \ timeseries \ stationxml \ xml_channel_mt_channel.py: 79%
307 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:11 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:11 -0800
1# -*- coding: utf-8 -*-
2"""
3Created on Fri Feb 19 16:14:41 2021
5:copyright:
6 Jared Peacock (jpeacock@usgs.gov)
8:license: MIT
10"""
13# =============================================================================
14# Imports
15# =============================================================================
16import copy
17from collections import OrderedDict
19from loguru import logger
21from mt_metadata.base.helpers import requires
22from mt_metadata.common.units import get_unit_object
23from mt_metadata.timeseries import AppliedFilter, Auxiliary, Electric, Magnetic
24from mt_metadata.timeseries.filters.obspy_stages import create_filter_from_stage
25from mt_metadata.timeseries.stationxml.fdsn_tools import (
26 create_mt_component,
27 make_channel_code,
28 read_channel_code,
29 release_dict,
30)
31from mt_metadata.timeseries.stationxml.utils import BaseTranslator
34try:
35 from obspy import UTCDateTime
36 from obspy.core import inventory
37except ImportError as error:
38 inventory = None
39 UTCDateTime = None
41# =============================================================================
44@requires(obspy=inventory)
45class XMLChannelMTChannel(BaseTranslator):
46 """
47 translate back and forth between StationXML Channel and MT Channel
48 """
50 understood_sensor_types = [
51 "logger",
52 "magnetometer",
53 "induction coil",
54 "coil",
55 "dipole",
56 "electrode",
57 ]
59 def __init__(self):
60 super().__init__()
62 self.xml_translator.update(
63 {
64 "azimuth": "measurement_azimuth",
65 "calibration_units": None,
66 "clock_drift": None,
67 "comments": None,
68 "description": None,
69 "dip": "measurement_tilt",
70 "end_date": "time_period.end",
71 "equipments": None,
72 "pre_amplifier": None,
73 "response": None,
74 "sample_rate": "sample_rate",
75 "sensor": None,
76 "start_date": "time_period.start",
77 "types": "special",
78 "water_level": None,
79 "alternate_code": "component",
80 "latitude": None,
81 "longitude": None,
82 "elevation": None,
83 }
84 )
86 # StationXML to MT Survey
87 self.mt_translator = self.flip_dict(self.xml_translator)
89 self.mt_comments_list = ["run.id"]
90 self.run_list = None
92 def xml_to_mt(self, xml_channel, existing_filters={}):
93 """
94 Translate :class:`obspy.core.inventory.Channel` to
95 :class:`mt_metadata.timeseries.Channel`
97 :param xml_channel: Obspy Channel object
98 :type xml_channel: :class:`obspy.core.inventory.Channel`
99 :returns: MT Channel
100 :rtype: :class:`mt_metadata.timeseries.Channel`
102 """
104 if not isinstance(xml_channel, inventory.Channel):
105 msg = f"Input must be obspy.core.inventory.Channel object not {type(xml_channel)}"
106 logger.error(msg)
107 raise TypeError(msg)
109 ch_dict = read_channel_code(xml_channel.code)
110 if ch_dict["measurement"] in ["electric"]:
111 mt_channel = Electric()
112 elif ch_dict["measurement"] in ["magnetic"]:
113 mt_channel = Magnetic()
114 else:
115 mt_channel = Auxiliary(type=ch_dict["measurement"])
117 # Always set component from XML channel code, overriding any defaults
118 mt_channel.component = create_mt_component(xml_channel.code)
120 mt_channel = self._get_mt_position(xml_channel, mt_channel)
121 mt_channel = self._parse_xml_comments(xml_channel.comments, mt_channel)
122 mt_channel = self._sensor_to_mt(xml_channel.sensor, mt_channel)
123 mt_channel = self._get_mt_units(xml_channel, mt_channel)
124 mt_filters = self._xml_response_to_mt(xml_channel, existing_filters)
126 for xml_key, mt_key in self.xml_translator.items():
127 if mt_key:
128 value = getattr(xml_channel, xml_key)
129 if value:
130 mt_channel.update_attribute(mt_key, value)
132 # fill channel filters
133 for filter_name, mt_filter in mt_filters.items():
134 mt_channel.add_filter(
135 AppliedFilter(
136 name=filter_name,
137 applied=True,
138 comments=mt_filter.comments,
139 stage=mt_filter.sequence_number,
140 )
141 )
143 if UTCDateTime(mt_channel.time_period.end.time_stamp) < UTCDateTime(
144 mt_channel.time_period.start.time_stamp
145 ):
146 mt_channel.time_period.end = "2200-01-01T00:00:00+00:00"
147 return mt_channel, mt_filters
149 def mt_to_xml(self, mt_channel, filters_dict, hard_code=True):
150 """
151 Translate :class:`mt_metadata.timeseries.Channel` to
152 :class:`obspy.core.inventory.Channel`
155 :param xml_channel: MT Channel object
156 :type xml_channel: :class:`mt_metadata.timeseries.Channel`
157 :returns: MT Channel
158 :rtype: :class:`obspy.core.inventory.Channel`
160 """
162 if not isinstance(
163 mt_channel,
164 (Electric, Magnetic, Auxiliary),
165 ):
166 msg = f"Input must be mt_metadata.timeseries.Channel object not {type(mt_channel)}"
167 logger.error(msg)
168 raise TypeError(msg)
170 # location_code = get_location_code(mt_channel)
171 if not hard_code:
172 alignement = "horizontal"
173 if "z" in mt_channel.component.lower():
174 alignement = "vertical"
176 channel_code = make_channel_code(
177 mt_channel.sample_rate,
178 mt_channel.type,
179 mt_channel.measurement_azimuth,
180 orientation=alignement,
181 )
182 # this assumes the last character of the component is the orientation
183 # direction
184 elif hard_code:
185 channel_code = make_channel_code(
186 mt_channel.sample_rate,
187 mt_channel.type,
188 mt_channel.component[-1].lower(),
189 )
191 is_electric = mt_channel.type in ["electric"]
192 if is_electric:
193 xml_channel = inventory.Channel(
194 channel_code,
195 "",
196 mt_channel.positive.latitude,
197 mt_channel.positive.longitude,
198 mt_channel.positive.elevation,
199 0,
200 )
201 else:
202 xml_channel = inventory.Channel(
203 channel_code,
204 "",
205 mt_channel.location.latitude,
206 mt_channel.location.longitude,
207 mt_channel.location.elevation,
208 0,
209 )
211 xml_channel.types = ["geophysical".upper()]
212 xml_channel.sensor = self._mt_to_sensor(mt_channel)
213 xml_channel.comments = self._make_xml_comments(mt_channel.comments)
214 xml_channel.restricted_status = release_dict[xml_channel.restricted_status]
215 xml_channel = self._mt_to_xml_response(mt_channel, filters_dict, xml_channel)
216 xml_channel.restricted_status = release_dict[xml_channel.restricted_status]
217 xml_channel = self._mt_to_xml_response(mt_channel, filters_dict, xml_channel)
219 for mt_key, xml_key in self.mt_translator.items():
220 if xml_key is None:
221 msg = f"Cannot currently map {mt_key} to inventory.station.{xml_key}"
222 logger.debug(msg)
223 continue
225 # obspy only allows angles (0, 360)
226 if xml_key in ["azimuth"]:
227 xml_channel.azimuth = mt_channel.measurement_azimuth % 360
229 elif xml_key in ["dip"]:
230 xml_channel.dip = mt_channel.measurement_tilt % 360
232 elif "time_period" in mt_key:
233 value = mt_channel.get_attr_from_name(mt_key).time_stamp
234 setattr(xml_channel, xml_key, value)
236 else:
237 setattr(xml_channel, xml_key, mt_channel.get_attr_from_name(mt_key))
239 return xml_channel
241 def _sensor_to_mt(self, sensor, mt_channel):
242 """
243 Fill an MT channel with sensor information. It is slightly different
244 depending on electric or magnetic.
246 :param sensor: DESCRIPTION
247 :type sensor: TYPE
248 :param mt_channel: DESCRIPTION
249 :type mt_channel: TYPE
250 :return: DESCRIPTION
251 :rtype: TYPE
253 """
254 sensor.type = self._deduce_sensor_type(sensor)
256 if not sensor.type:
257 return mt_channel
259 if sensor.type.lower() in ["magnetometer", "induction coil", "coil"]:
260 if not isinstance(mt_channel, Magnetic):
261 msg = (
262 f"Cannot put sensor of type {sensor.type} into an "
263 f"MT Channel of {type(mt_channel)}."
264 )
265 logger.error(msg)
266 raise ValueError(msg)
267 mt_channel.sensor.id = sensor.serial_number
268 mt_channel.sensor.manufacturer = sensor.manufacturer
269 mt_channel.sensor.model = f"{sensor.model} {sensor.description}"
270 mt_channel.sensor.type = sensor.type
271 mt_channel.sensor.name = sensor.description
273 return mt_channel
275 elif sensor.type.lower() in ["dipole", "electrode"]:
276 if not isinstance(mt_channel, Electric):
277 msg = (
278 f"Cannot put sensor of type {sensor.type} into an "
279 f"MT Channel of {type(mt_channel)}."
280 )
281 logger.error(msg)
282 raise ValueError(msg)
283 if sensor.serial_number:
284 pid, nid = self._parse_electrode_ids(sensor.serial_number)
285 mt_channel.positive.id = pid
286 mt_channel.negative.id = nid
288 mt_channel.positive.manufacturer = sensor.manufacturer
289 mt_channel.positive.model = sensor.model
290 mt_channel.positive.type = "electrode"
292 mt_channel.negative.manufacturer = sensor.manufacturer
293 mt_channel.negative.model = sensor.model
294 mt_channel.negative.type = "electrode"
296 mt_channel.dipole_length = self._parse_dipole_length(sensor.description)
298 return mt_channel
300 else:
301 if not isinstance(mt_channel, Auxiliary):
302 msg = (
303 f"Cannot put sensor of type {sensor.type} into an "
304 f"MT Channel of {type(mt_channel)}."
305 )
306 logger.error(msg)
307 raise ValueError(msg)
309 mt_channel.sensor.type = sensor.type
310 mt_channel.sensor.manufacturer = sensor.manufacturer
311 mt_channel.sensor.model = f"{sensor.model} {sensor.description}"
312 mt_channel.sensor.id = sensor.serial_number
314 return mt_channel
316 def _mt_to_sensor(self, mt_channel):
317 """
318 Create an xml sensor from an MT channel
319 """
320 s = inventory.Equipment()
321 if mt_channel.type in ["electric"]:
322 s.type = "dipole"
323 s.description = f"{mt_channel.dipole_length} meters"
324 if mt_channel.positive.manufacturer:
325 s.manufacturer = mt_channel.positive.manufacturer
326 elif mt_channel.positive.manufacturer:
327 s.manufacturer = mt_channel.negative.manufacturer
329 if mt_channel.positive.model:
330 s.model = mt_channel.positive.model
331 elif mt_channel.positive.model:
332 s.model = mt_channel.negative.model
334 s.serial_number = (
335 f"positive: {mt_channel.positive.id}, "
336 f"negative: {mt_channel.negative.id}"
337 )
339 elif mt_channel.type in ["magnetic"]:
340 s.type = mt_channel.sensor.type
341 if mt_channel.sensor.model:
342 s.model = mt_channel.sensor.model.split()[0]
343 try:
344 s.description = mt_channel.sensor.model.split()[1]
345 except IndexError:
346 pass
347 s.serial_number = mt_channel.sensor.id
348 s.manufacturer = mt_channel.sensor.manufacturer
349 s.description = mt_channel.sensor.name
351 else:
352 s.type = mt_channel.sensor.type
353 s.model = mt_channel.sensor.model
354 s.serial_number = mt_channel.sensor.id
355 s.manufacturer = mt_channel.sensor.manufacturer
356 s.description = mt_channel.sensor.name
358 return s
360 def _parse_electrode_ids(self, serial_numbers):
361 """
362 parse electrode ids from a string formated 'positive: pid, negative: nid'
363 """
365 if ":" in serial_numbers and "," in serial_numbers:
366 serial_list = serial_numbers.split(",")
367 if len(serial_list) != 2:
368 msg = (
369 f"Cannot parse electrode ids from {serial_numbers}. Must "
370 "have format 'positive: pid, negative: nid'"
371 )
372 logger.error(msg)
373 raise ValueError(msg)
375 pid, nid = [ss.split(":")[1].strip() for ss in serial_list]
376 return pid, nid
378 elif ":" not in serial_numbers and "," in serial_numbers:
379 serial_list = serial_numbers.split(",")
380 if len(serial_list) != 2:
381 msg = (
382 f"Cannot parse electrode ids from {serial_numbers}. Must "
383 "have format 'positive: pid, negative: nid'"
384 )
385 logger.error(msg)
386 raise ValueError(msg)
388 pid, nid = [ss.strip() for ss in serial_list]
389 return pid, nid
390 else:
391 logger.warning(
392 "Electrod IDs are not properly formatted assigning"
393 f" {serial_numbers} to both positive and negative. "
394 "Accepted format is 'positive: pid, negative: nid'"
395 )
396 return serial_numbers, serial_numbers
398 def _parse_dipole_length(self, description):
399 """
400 Parse the dipole length from the sensor description Assuming a format
401 'lenth units' --> '100 meters'
402 """
404 dipole = description.split()
405 try:
406 return float(dipole[0])
408 except ValueError as error:
409 msg = f"Could not get dipole length from {description} got ValueError({error})"
410 logger.warning(msg)
411 return 0.0
413 def _parse_xml_comments(self, xml_comments, mt_channel):
414 """
415 Read xml comments into an MT comment
417 :param xml_comments: DESCRIPTION
418 :type xml_comments: TYPE
419 :param mt_channel: DESCRIPTION
420 :type mt_channel: TYPE
421 :return: DESCRIPTION
422 :rtype: TYPE
424 """
426 runs = []
427 for comment in xml_comments:
428 k, v = self.read_xml_comment(comment)
429 if k == "mt.run.id":
430 runs.append(v)
431 else:
432 if mt_channel.comments.value:
433 mt_channel.comments.value += f", {k}: {v}"
434 else:
435 mt_channel.comments.value = f", {k}: {v}"
436 if mt_channel.comments.value:
437 mt_channel.comments.value += f", run_ids: [{','.join(runs)}]"
438 else:
439 mt_channel.comments.value = f"run_ids: [{','.join(runs)}]"
441 self.run_list = runs
443 return mt_channel
445 def _make_xml_comments(self, mt_comment):
446 """
447 make xml comments from an mt comment, namely run ids.
449 :param mt_comment: DESCRIPTION
450 :type mt_comment: TYPE
451 :return: DESCRIPTION
452 :rtype: TYPE
454 """
456 comments = []
457 if mt_comment.value is None:
458 return comments
459 clist = mt_comment.value.split("run_ids:")
460 for item in clist:
461 if ":" in item:
462 k, v = item.split(":")
463 comments.append(inventory.Comment(v, subject=k))
464 elif "[" in item and "]" in item:
465 for run in item.replace("[", "").replace("]", "").split(","):
466 run = run.strip()
467 if run:
468 comments.append(
469 inventory.Comment(run.strip(), subject="mt.run.id")
470 )
471 return comments
473 def _get_mt_position(self, xml_channel, mt_channel):
474 """
475 Get the correct locations given the channel type
477 :param xml_channel: DESCRIPTION
478 :type xml_channel: TYPE
479 :param mt_channel: DESCRIPTION
480 :type mt_channel: TYPE
481 :return: DESCRIPTION
482 :rtype: TYPE
484 """
486 if mt_channel.type in ["electric"]:
487 for direction in ["positive", "negative"]:
488 for pos in ["latitude", "longitude", "elevation"]:
489 key = f"{direction}.{pos}"
490 value = getattr(xml_channel, pos)
491 mt_channel.update_attribute(key, value)
492 else:
493 for pos in ["latitude", "longitude", "elevation"]:
494 key = f"location.{pos}"
495 value = getattr(xml_channel, pos)
496 mt_channel.update_attribute(key, value)
498 return mt_channel
500 def _get_mt_units(self, xml_channel, mt_channel):
501 """ """
502 if len(xml_channel.response.response_stages) == 0:
503 return mt_channel
504 name = xml_channel.response.response_stages[-1].output_units
505 description = xml_channel.response.response_stages[-1].output_units_description
506 if description and name:
507 if len(description) > len(name):
508 mt_channel.units = description
509 else:
510 mt_channel.units = name
511 elif description:
512 mt_channel.units = description
513 elif name:
514 mt_channel.units = name
515 else:
516 logger.debug("Did not find any units descriptions in XML")
518 return mt_channel
520 def _xml_response_to_mt(self, xml_channel, existing_filters={}):
521 """
522 parse the filters from obspy into mt filters
523 """
524 ch_filter_dict = OrderedDict()
525 for i_stage, stage in enumerate(xml_channel.response.response_stages):
526 new_and_unnamed = False
527 mt_filter = create_filter_from_stage(stage)
528 if not mt_filter.name:
529 filter_name, new_and_unnamed = self._add_filter_number(
530 existing_filters, mt_filter
531 )
532 mt_filter.name = filter_name
534 if mt_filter.decimation_active:
535 # keep filter names unique if same one used more than once
536 mt_filter.name += f"_{mt_filter.decimation_input_sample_rate}"
538 if new_and_unnamed:
539 logger.info(f"Found an unnamed filter, named it: '{mt_filter.name}'")
540 existing_filters[filter_name] = mt_filter
542 ch_filter_dict[mt_filter.name.replace("/", " per ").lower()] = mt_filter
544 return ch_filter_dict
546 def _add_filter_number(self, existing_filters, mt_filter):
547 """
548 return the next number the number of filters
550 :param keys: DESCRIPTION
551 :type keys: TYPE
552 :return: DESCRIPTION
553 :rtype: TYPE
555 """
557 # check for existing filters
558 for f_obj in existing_filters.values():
559 if f_obj.type == mt_filter.type:
560 if round(abs(f_obj.complex_response([1])[0])) == round(
561 abs(mt_filter.complex_response([1])[0])
562 ):
563 return f_obj.name, False
565 try:
566 last = sorted([k for k in existing_filters.keys() if mt_filter.type in k])[
567 -1
568 ]
569 except IndexError:
570 return f"{mt_filter.type}_{0:02}", True
571 try:
572 return f"{mt_filter.type}_{int(last[-2:]) + 1:02}", True
573 except ValueError:
574 return f"{mt_filter.type}_{0:02}", True
576 def _mt_to_xml_response(self, mt_channel, filters_dict, xml_channel):
577 """
578 Translate MT filters into Obspy Response
580 :param mt_channel: DESCRIPTION
581 :type mt_channel: TYPE
582 :param filters_dict: DESCRIPTION
583 :type filters_dict: TYPE
584 :param xml_channel: DESCRIPTION
585 :type xml_channel: TYPE
586 :return: DESCRIPTION
587 :rtype: TYPE
589 """
591 mt_channel_response = mt_channel.channel_response(filters_dict)
592 xml_channel.response = mt_channel_response.to_obspy(
593 sample_rate=mt_channel.sample_rate
594 )
596 unit_obj = get_unit_object(mt_channel_response.units_in)
598 xml_channel.calibration_units = unit_obj.symbol
599 xml_channel.calibration_units_description = unit_obj.name
601 return xml_channel
603 def _deduce_sensor_type(self, sensor):
604 """
606 :param sensor: Information about a sensor, usually extractes from FDSN XML
607 :type sensor: obspy.core.inventory.util.Equipment
609 :return:
610 """
611 original_sensor_type = sensor.type
612 original_sensor_description = sensor.description
613 # set sensor_type to be a string if it is None
614 if original_sensor_type is None:
615 sensor_type = "" # make a string
616 msg = f"Sensor {sensor} does not have field type attr"
617 logger.debug(msg)
618 else:
619 sensor_type = copy.deepcopy(original_sensor_type)
621 if original_sensor_description is None:
622 sensor_description = "" # make a string
623 else:
624 sensor_description = copy.deepcopy(original_sensor_description)
626 if sensor_type.lower() in self.understood_sensor_types:
627 return sensor_type
628 else:
629 logger.warning(
630 f" sensor {sensor} type {sensor.type} not in {self.understood_sensor_types}"
631 )
632 logger.warning(
633 f" sensor {sensor} type {sensor.type} not in {self.understood_sensor_types}"
634 )
636 # Try handling Bartington FGM at Earthscope ... this is a place holder for handling non-standard cases
637 if sensor_type.lower() == "bartington":
638 sensor_type = "magnetometer"
639 if not sensor_type:
640 if sensor_description == "Bartington 3-Axis Fluxgate Sensor":
641 sensor_type = "magnetometer"
642 if sensor_description:
643 if ("bf-4" in sensor_description.lower()) & (
644 "schlumberger" in sensor_description.lower()
645 ): # BSL-NCEDC
646 sensor_type = "magnetometer"
647 elif ("electric" in sensor_description.lower()) & (
648 "dipole" in sensor_description.lower()
649 ): # BSL-NCEDC
650 sensor_type = "dipole"
652 # reset sensor_type to None it it was not handled
653 if not sensor_type:
654 sensor_type = original_sensor_type
655 logger.error("sensor type could not be resolved")
657 return sensor_type