Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mt_metadata \ mt_metadata \ timeseries \ tools \ from_many_mt_files.py: 0%
202 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:11 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:11 -0800
1# -*- coding: utf-8 -*-
2"""
3Created on Thu Oct 7 16:31:55 2021
5@author: jpeacock
6"""
8# =============================================================================
9# Imports
10# =============================================================================
11from pathlib import Path
12from xml.etree import cElementTree as et
14import pandas as pd
16from mt_metadata.timeseries import (
17 Electric,
18 Experiment,
19 Magnetic,
20 Run,
21 Station,
22 Survey,
23)
24from mt_metadata.timeseries.filters import (
25 CoefficientFilter,
26 FIRFilter,
27 PoleZeroFilter,
28 TimeDelayFilter,
29)
30from mt_metadata.timeseries.stationxml import XMLInventoryMTExperiment
33# =============================================================================
34# Useful Class
35# =============================================================================
class MT2StationXML(XMLInventoryMTExperiment):
    """
    Convert a directory of per-level MT XML files into one StationXML (MTML).

    This is for a use case of A. Kelbert who places each level of metadata
    into a single XML file.  This class collects all those files and puts
    them into the proper order.

    The files are expected to be named as follows::

        survey.xml              --> Survey metadata `mt_metadata.timeseries.Survey`
        filters.xml             --> All filters
        station.xml             --> Station metadata `mt_metadata.timeseries.Station`
        station.run.xml         --> Run metadata `mt_metadata.timeseries.Run`
        station.run.channel.xml --> Channel metadata `mt_metadata.timeseries.Channel`

    The level of a file is decided purely from its stem: the reserved stems
    ``survey``/``_survey`` and ``filters``/``_filters`` first, then the number
    of ``.`` characters in the stem (0 = station, 1 = run, more = channel).
    """

    def __init__(self, xml_path=None):
        # Assigning the path goes through the property setter, which builds
        # ``self.df`` straight away when a path is given.
        self.xml_path = xml_path

        super().__init__()

    @property
    def xml_path(self):
        """Directory searched for XML files, as a ``Path`` (or ``None``)."""
        return self._xml_path

    @xml_path.setter
    def xml_path(self, value):
        if value is None:
            self._xml_path = None
        else:
            self._xml_path = Path(value)
            # Re-index the directory every time the path changes.
            self.make_df()

    def has_xml_path(self):
        """
        Check that a path has been set and that it exists on disk.

        :return: True if ``xml_path`` is set and exists, False otherwise
        :rtype: bool
        """
        return self.xml_path is not None and self.xml_path.exists()

    @staticmethod
    def is_a_filter_xml(fn):
        """Return True if the file stem is ``filters`` or ``_filters``."""
        return fn.stem in ["filters", "_filters"]

    @staticmethod
    def is_a_survey_xml(fn):
        """Return True if the file stem is ``survey`` or ``_survey``."""
        return fn.stem in ["survey", "_survey"]

    @staticmethod
    def is_a_station_xml(fn):
        """
        Return True if the stem has no ``.`` and is not a reserved
        survey/filters stem.
        """
        if fn.stem in ["filters", "_filters", "_survey", "survey"]:
            return False
        return fn.stem.count(".") == 0

    @staticmethod
    def is_a_run_xml(fn):
        """Return True if the stem contains exactly one ``.`` (station.run)."""
        return fn.stem.count(".") == 1

    @staticmethod
    def is_a_channel_xml(fn):
        """Return True if the stem contains more than one ``.``."""
        return fn.stem.count(".") > 1

    def get_xml_files(self) -> list:
        """
        Collect every ``*.xml`` file found recursively under ``xml_path``.

        :return: list of paths to the XML files
        :rtype: list
        :raises ValueError: if ``xml_path`` is not set or does not exist
        """
        if not self.has_xml_path():
            raise ValueError("self.xml_path must be set")
        return list(self.xml_path.rglob("*.xml"))

    def make_df(self):
        """
        Build ``self.df``, a pandas data frame with one row per XML file, for
        easier querying.

        Columns are the file path (``fn``), the first stem component
        (``station``), the second stem component for run and channel files
        (``run``, otherwise None) and one boolean flag per file level.

        :raises ValueError: propagated from :meth:`get_xml_files` when
            ``xml_path`` is not usable
        """
        df_dict = {
            "fn": [],
            "station": [],
            "run": [],
            "is_station": [],
            "is_run": [],
            "is_channel": [],
            "is_filters": [],
            "is_survey": [],
        }
        for fn in self.get_xml_files():
            stem_parts = fn.stem.split(".")
            is_run = self.is_a_run_xml(fn)
            is_channel = self.is_a_channel_xml(fn)

            df_dict["fn"].append(fn)
            df_dict["station"].append(stem_parts[0])
            # Only run and channel files carry a run id in their name.
            df_dict["run"].append(stem_parts[1] if is_run or is_channel else None)
            df_dict["is_station"].append(self.is_a_station_xml(fn))
            df_dict["is_run"].append(is_run)
            df_dict["is_channel"].append(is_channel)
            df_dict["is_filters"].append(self.is_a_filter_xml(fn))
            df_dict["is_survey"].append(self.is_a_survey_xml(fn))

        self.df = pd.DataFrame(df_dict)

    def _select(self, flag, station=None, run=None):
        """
        Return the rows of ``self.df`` whose boolean column *flag* is set,
        optionally restricted to a station and a run.

        :param flag: name of a boolean column, e.g. ``"is_run"``
        :type flag: str
        :param station: station name to match, or None for any station
        :param run: run id to match, or None for any run
        :return: the matching rows
        :rtype: pandas.DataFrame
        """
        # ``astype(bool)`` keeps the mask boolean even when the frame is
        # empty and the flag column therefore has object dtype.
        mask = self.df[flag].astype(bool)
        if station is not None:
            mask = mask & (self.df.station == station)
        if run is not None:
            mask = mask & (self.df.run == run)
        return self.df[mask]

    @property
    def stations(self):
        """List of station names that have a station-level file, or None
        when no usable ``xml_path`` is set."""
        if self.has_xml_path():
            return list(self._select("is_station").station)
        return None

    @property
    def survey(self):
        """Path of the first survey file found, or None when no usable
        ``xml_path`` is set."""
        if self.has_xml_path():
            return self._select("is_survey").fn.values[0]
        return None

    @property
    def filters(self):
        """Path of the first filters file found, or None when no usable
        ``xml_path`` is set."""
        if self.has_xml_path():
            return self._select("is_filters").fn.values[0]
        return None

    def _get_runs(self, station):
        """
        Get the run-level rows of the data frame for a given station.

        :param station: station name (first component of the file stem)
        :type station: str
        :return: run rows for the station, sorted by run id
        :rtype: pandas.DataFrame
        """
        return self._select("is_run", station=station).sort_values("run")

    def _get_channels(self, station, run, order=("hx", "hy", "hz", "ex", "ey")):
        """
        Get the channel files for a given station and run, in component order.

        For each component in *order*, the first channel file whose name
        (after the leading station characters, lower-cased) contains that
        component is kept.  Components without a matching file are skipped.

        :param station: station name
        :type station: str
        :param run: run id
        :type run: str
        :param order: component codes in the order they should be returned
        :type order: sequence of str
        :return: channel file paths ordered by component
        :rtype: list
        """
        channel_files = list(
            self._select("is_channel", station=station, run=run).fn
        )

        channels_list = []
        for component in order:
            for fn in channel_files:
                # Skip the station prefix so a component code that happens to
                # be part of the station name is not matched.
                if component in fn.name[len(station) :].lower():
                    channels_list.append(fn)
                    break

        return channels_list

    def sort_by_station(self, stations=None):
        """
        Sort the files into survey, filters, stations, runs and channels.

        :param stations: station name or list of station names to include;
            None or an empty list means every station found
        :type stations: str or list, optional
        :return: dictionary with keys ``survey``, ``filters`` and
            ``stations``; each station entry has ``fn`` and ``runs``, and
            each run entry has ``fn`` and ``channels``
        :rtype: dict
        :raises ValueError: if *stations* is neither a string nor a list
        """
        fn_dict = {
            "survey": self.survey,
            "filters": self.filters,
            "stations": [],
        }
        if stations is None or stations == []:
            station_iterator = self.stations
        else:
            if isinstance(stations, str):
                stations = [stations]
            if not isinstance(stations, list):
                raise ValueError("stations must be a list of stations")
            station_iterator = stations

        for station in station_iterator:
            station_dict = {
                "fn": self._select("is_station", station=station).fn.values[0],
                "runs": [
                    {
                        "fn": run.fn,
                        "channels": self._get_channels(station, run.run),
                    }
                    for run in self._get_runs(station).itertuples()
                ],
            }
            fn_dict["stations"].append(station_dict)

        return fn_dict

    @staticmethod
    def read_xml_file(xml_file):
        """
        Read an XML file and return its root element.

        :param xml_file: path to the XML file
        :type xml_file: str or pathlib.Path
        :return: root element of the parsed document
        :rtype: xml.etree.ElementTree.Element
        """
        return et.parse(xml_file).getroot()

    def _make_channel(self, channel_fn):
        """
        Make a :class:`mt_metadata.timeseries.Channel` object from an
        xml file.

        The channel type is the third ``.``-separated component of the file
        stem.  If one of the channel's filter names contains ``dipole``, a
        pole-zero filter carrying the dipole length as gain is created and
        that filter name is replaced by the new filter's name.

        :param channel_fn: path to the channel XML file
        :type channel_fn: pathlib.Path
        :return: the channel object and the dipole filter (None if the
            channel has no dipole filter)
        :rtype: tuple
        :raises ValueError: if the channel type is not ``electric`` or
            ``magnetic``
        """
        ch_type = channel_fn.stem.split(".")[2].lower()
        if ch_type == "electric":
            ch = Electric()
        elif ch_type == "magnetic":
            ch = Magnetic()
        else:
            # Previously this fell through and failed later with an
            # UnboundLocalError on ``ch``.
            raise ValueError(
                f"Unsupported channel type '{ch_type}' in file "
                f"{channel_fn.name}; expected 'electric' or 'magnetic'."
            )

        ch.from_xml(self.read_xml_file(channel_fn))

        dp_filter = None
        if ch.filter.name is not None:
            for index, filter_name in enumerate(ch.filter.name):
                if "dipole" in filter_name:
                    # create a dipole pole zero filter
                    dp_filter = PoleZeroFilter()
                    dp_filter.units_in = "V/m"
                    dp_filter.units_out = "V"
                    dp_filter.gain = ch.dipole_length
                    dp_filter.name = f"electric_dipole_{ch.dipole_length:.3f}"
                    dp_filter.comments = "electric dipole for electric field"
                    # Point the channel at the generated filter; only the
                    # first dipole entry is handled.
                    ch.filter.name[index] = dp_filter.name
                    break

        return ch, dp_filter

    def _make_run(self, run_dict):
        """
        Make a :class:`mt_metadata.timeseries.Run` object from information
        in a run dictionary::

            run_dict = {'fn': xml_file_name, 'channels': [list of xml file names]}

        :param run_dict: run file name and its channel file names
        :type run_dict: dict
        :return: the run object and a dictionary of dipole filters keyed by
            filter name
        :rtype: tuple
        """
        r = Run()
        r.from_xml(self.read_xml_file(run_dict["fn"]))

        dp_filters = {}
        for ch_fn in run_dict["channels"]:
            ch, dp_filter = self._make_channel(ch_fn)
            r.channels.append(ch)
            if dp_filter is not None:
                dp_filters[dp_filter.name] = dp_filter

        return r, dp_filters

    def _make_station(self, station_dict):
        """
        Make a station object from a station dictionary::

            station_dict = {
                'fn': xml_file_name,
                'runs': [{'fn': run_xml_file_name,
                          'channels': [list of xml file names]}]
                }

        Channels whose latitude, longitude and elevation are all zero are
        given the station location (``positive`` for electric channels,
        ``location`` for all others).

        :param station_dict: station file name and its run dictionaries
        :type station_dict: dict
        :return: the station object and a dictionary of dipole filters keyed
            by filter name
        :rtype: tuple
        """
        station = Station()
        station.from_xml(self.read_xml_file(station_dict["fn"]))
        # Need to reset the runs, otherwise there are empty runs and double
        # the amount of runs because the run_list is input.
        station.runs = []

        dp_filters = {}
        for run_dict in station_dict["runs"]:
            r, dp = self._make_run(run_dict)
            for channel in r.channels:
                if channel.type == "electric":
                    position = channel.positive
                else:
                    position = channel.location
                # An all-zero position is treated as "not set".
                if (
                    position.latitude == 0
                    and position.longitude == 0
                    and position.elevation == 0
                ):
                    position.latitude = station.location.latitude
                    position.longitude = station.location.longitude
                    position.elevation = station.location.elevation
            station.runs.append(r)
            dp_filters.update(dp)

        station.update_time_period()

        return station, dp_filters

    def _make_survey(self, survey_dict):
        """
        Make a :class:`mt_metadata.timeseries.Survey` object::

            survey_dict = {
                'survey': survey_xml_file,
                'filters': filter_xml_file,
                'stations': [
                    {
                     'fn': xml_file_name,
                     'runs': [
                         {'fn': run_xml_file_name,
                          'channels': [list of xml file names]}]
                     }
                    ]
                }

        :param survey_dict: dictionary as returned by :meth:`sort_by_station`
        :type survey_dict: dict
        :return: the survey object and a dictionary of dipole filters keyed
            by filter name
        :rtype: tuple
        """
        s = Survey()
        s.from_xml(self.read_xml_file(survey_dict["survey"]))
        # Stations are rebuilt from the individual station files.
        s.stations = []

        dp_filters = {}
        for station_dict in survey_dict["stations"]:
            station, dp = self._make_station(station_dict)
            s.stations.append(station)
            dp_filters.update(dp)

        s.update_bounding_box()
        s.update_time_period()

        return s, dp_filters

    def _make_filters_dict(self, filters_xml_file):
        """
        Make a filter dictionary from a filter file with all the filters in it.

        :param filters_xml_file: path to the filters XML file
        :type filters_xml_file: str or pathlib.Path
        :return: filter objects keyed by filter name
        :rtype: dict
        :raises ValueError: if a ``filter`` element has a type other than
            ``zpk``, ``coefficient``, ``time delay`` or ``fir``
        """
        filter_classes = {
            "zpk": PoleZeroFilter,
            "coefficient": CoefficientFilter,
            "time delay": TimeDelayFilter,
            "fir": FIRFilter,
        }

        element = self.read_xml_file(filters_xml_file)

        f_dict = {}
        for f in element.iter(tag="filter"):
            # Text of the first <type> child; None when the child is missing,
            # which then falls through to the ValueError below.
            f_type = f.findtext("type")
            if f_type not in filter_classes:
                raise ValueError(f"No support for {f_type} currently.")

            mt_filter = filter_classes[f_type]()
            mt_filter.from_xml(f)
            f_dict[mt_filter.name] = mt_filter

        return f_dict

    def make_experiment(self, stations=None):
        """
        Create an MTML experiment from a directory of xml files.

        :param stations: station name or list of station names to include;
            None means every station found
        :type stations: str or list, optional
        :return: experiment holding one survey with its filters
        :rtype: :class:`mt_metadata.timeseries.Experiment`
        """
        mtex = Experiment()

        survey, dp_filters = self._make_survey(self.sort_by_station(stations))
        mtex.surveys.append(survey)
        mtex.surveys[0].filters = self._make_filters_dict(self.filters)
        # Dipole filters are generated here, not read from the filters file.
        mtex.surveys[0].filters.update(dp_filters)

        return mtex

    def get_mt_channel(self, ch_fn, filters_fn):
        """
        Have a look at an MT channel and its response.

        :param ch_fn: path to the channel XML file
        :type ch_fn: pathlib.Path
        :param filters_fn: path to the filters XML file
        :type filters_fn: str or pathlib.Path
        :return: the channel object and the result of its
            ``channel_response`` call with all available filters
        :rtype: tuple
        """
        mt_channel, dp_filter = self._make_channel(ch_fn)

        filter_dict = self._make_filters_dict(filters_fn)
        if dp_filter is not None:
            # Key the dipole filter by name (the original passed a set
            # literal to ``dict.update``, which fails).
            filter_dict[dp_filter.name] = dp_filter

        channel_response = mt_channel.channel_response(filter_dict)

        return mt_channel, channel_response