Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mt_metadata \ mt_metadata \ timeseries \ tools \ from_many_mt_files.py: 0%

202 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-10 00:11 -0800

1# -*- coding: utf-8 -*- 

2""" 

3Created on Thu Oct 7 16:31:55 2021 

4 

5@author: jpeacock 

6""" 

7 

8# ============================================================================= 

9# Imports 

10# ============================================================================= 

11from pathlib import Path 

12from xml.etree import cElementTree as et 

13 

14import pandas as pd 

15 

16from mt_metadata.timeseries import ( 

17 Electric, 

18 Experiment, 

19 Magnetic, 

20 Run, 

21 Station, 

22 Survey, 

23) 

24from mt_metadata.timeseries.filters import ( 

25 CoefficientFilter, 

26 FIRFilter, 

27 PoleZeroFilter, 

28 TimeDelayFilter, 

29) 

30from mt_metadata.timeseries.stationxml import XMLInventoryMTExperiment 

31 

32 

33# ============================================================================= 

34# Useful Class 

35# ============================================================================= 

class MT2StationXML(XMLInventoryMTExperiment):
    """
    Collect per-level MT metadata XML files and assemble them into a single
    :class:`mt_metadata.timeseries.Experiment`.

    This is for a use case of A. Kelbert who places each level of metadata
    into a single XML file.  This class collects all those files and puts
    them into the proper order.

    The files are expected to be named as follows::

        survey.xml              --> Survey metadata `mt_metadata.timeseries.Survey`
        filters.xml             --> All filters
        station.xml             --> Station metadata `mt_metadata.timeseries.Station`
        station.run.xml         --> Run metadata `mt_metadata.timeseries.Run`
        station.run.channel.xml --> Channel metadata `mt_metadata.timeseries.Channel`

    Files are classified only by their stem (file name without the final
    suffix): ``survey``/``_survey`` and ``filters``/``_filters`` are reserved
    names, and every other file is a station, run or channel file depending
    on how many ``.`` separators its stem contains.

    :param xml_path: directory searched recursively for ``*.xml`` files, or
        ``None`` to create the object without a file index
    """

    # Stems reserved for the survey-level and filter-level files.
    _SURVEY_STEMS = ("survey", "_survey")
    _FILTER_STEMS = ("filters", "_filters")

    # Column order of ``self.df``; one row per XML file found.
    _DF_COLUMNS = (
        "fn",
        "station",
        "run",
        "is_station",
        "is_run",
        "is_channel",
        "is_filters",
        "is_survey",
    )

    def __init__(self, xml_path=None):
        # The setter builds ``self.df`` before the parent is initialised.
        self.xml_path = xml_path

        super().__init__()

    @property
    def xml_path(self):
        """Directory holding the XML files as a :class:`pathlib.Path`, or ``None``."""
        return self._xml_path

    @xml_path.setter
    def xml_path(self, value):
        """
        Store the path and rebuild the file index.

        :param value: directory path (anything :class:`pathlib.Path` accepts)
            or ``None``
        :raises ValueError: if ``value`` is given but does not exist on disk
            (raised by :meth:`get_xml_files` through :meth:`make_df`)
        """
        if value is None:
            self._xml_path = None
            # Keep ``df`` defined (and empty) so that later look-ups do not
            # fail with an AttributeError or use a stale index.
            self.df = pd.DataFrame({column: [] for column in self._DF_COLUMNS})
        else:
            self._xml_path = Path(value)
            self.make_df()

    def has_xml_path(self):
        """
        Tell whether ``xml_path`` is set and exists on disk.

        :return: ``True`` if the path is set and exists, ``False`` otherwise
        :rtype: bool
        """
        return self.xml_path is not None and self.xml_path.exists()

    @staticmethod
    def is_a_filter_xml(fn):
        """Return ``True`` if the stem of path ``fn`` is ``filters`` or ``_filters``."""
        return fn.stem in MT2StationXML._FILTER_STEMS

    @staticmethod
    def is_a_survey_xml(fn):
        """Return ``True`` if the stem of path ``fn`` is ``survey`` or ``_survey``."""
        return fn.stem in MT2StationXML._SURVEY_STEMS

    @staticmethod
    def is_a_station_xml(fn):
        """
        Return ``True`` if path ``fn`` is a station file: its stem contains
        no ``.`` and is not one of the reserved survey/filter stems.
        """
        reserved = MT2StationXML._FILTER_STEMS + MT2StationXML._SURVEY_STEMS
        if fn.stem in reserved:
            return False
        return fn.stem.count(".") == 0

    @staticmethod
    def is_a_run_xml(fn):
        """Return ``True`` if the stem of path ``fn`` contains exactly one ``.``."""
        return fn.stem.count(".") == 1

    @staticmethod
    def is_a_channel_xml(fn):
        """Return ``True`` if the stem of path ``fn`` contains more than one ``.``."""
        return fn.stem.count(".") > 1

    def get_xml_files(self) -> list:
        """
        Get all ``*.xml`` files found recursively below ``xml_path``.

        :return: list of :class:`pathlib.Path` objects
        :rtype: list
        :raises ValueError: if ``xml_path`` is not set or does not exist
        """
        if self.has_xml_path():
            return list(self.xml_path.rglob("*.xml"))
        raise ValueError("self.xml_path must be set")

    def make_df(self):
        """
        Build ``self.df``, a :class:`pandas.DataFrame` with one row per XML
        file, for easier querying.

        Columns are the file path (``fn``), the first ``.``-separated token
        of the stem (``station``), the second token for run and channel
        files (``run``, otherwise ``None``) and one boolean flag per file
        kind.

        :raises ValueError: if ``xml_path`` is not set or does not exist
        """
        records = {column: [] for column in self._DF_COLUMNS}
        for fn in self.get_xml_files():
            tokens = fn.stem.split(".")
            is_run = self.is_a_run_xml(fn)
            is_channel = self.is_a_channel_xml(fn)

            records["fn"].append(fn)
            records["station"].append(tokens[0])
            # Only run and channel stems carry a second token.
            records["run"].append(tokens[1] if (is_run or is_channel) else None)
            records["is_station"].append(self.is_a_station_xml(fn))
            records["is_run"].append(is_run)
            records["is_channel"].append(is_channel)
            records["is_filters"].append(self.is_a_filter_xml(fn))
            records["is_survey"].append(self.is_a_survey_xml(fn))

        self.df = pd.DataFrame(records)

    # NOTE: the ``== True`` comparisons below are deliberate; they build an
    # element-wise boolean mask that also works on an empty (object dtype)
    # column.

    @property
    def stations(self):
        """
        Station names that have a station-level XML file.

        :return: list of station names, or ``None`` if ``xml_path`` is unusable
        :rtype: list or None
        """
        if self.has_xml_path():
            return list(self.df[self.df.is_station == True].station)
        return None

    @property
    def survey(self):
        """
        Path of the first survey XML file in the index.

        :return: file path, or ``None`` if ``xml_path`` is unusable
        :raises IndexError: if the index contains no survey file
        """
        if self.has_xml_path():
            return self.df[self.df.is_survey == True].fn.values[0]
        return None

    @property
    def filters(self):
        """
        Path of the first filters XML file in the index.

        :return: file path, or ``None`` if ``xml_path`` is unusable
        :raises IndexError: if the index contains no filters file
        """
        if self.has_xml_path():
            return self.df[self.df.is_filters == True].fn.values[0]
        return None

    def _get_runs(self, station):
        """
        Get the run rows of the file index for a given station.

        :param station: station name as stored in the ``station`` column
        :type station: str
        :return: rows flagged ``is_run`` for that station, sorted by ``run``
        :rtype: pandas.DataFrame
        """
        return self.df[
            (self.df.station == station) & (self.df.is_run == True)
        ].sort_values("run")

    def _get_channels(self, station, run, order=("hx", "hy", "hz", "ex", "ey")):
        """
        Get the channel files of one run, arranged in component order.

        For every component in ``order`` the first channel file whose name
        (with the leading station name removed, lower-cased) contains that
        component is taken.  Components without a matching file are skipped.

        :param station: station name as stored in the ``station`` column
        :type station: str
        :param run: run name as stored in the ``run`` column
        :type run: str
        :param order: component codes in the order they should be returned
        :type order: sequence of str
        :return: channel file paths, at most one per entry of ``order``
        :rtype: list
        """
        candidates = list(
            self.df[
                (self.df.station == station)
                & (self.df.run == run)
                & (self.df.is_channel == True)
            ].fn
        )

        channels_list = []
        for component in order:
            for fn in candidates:
                # Drop the station prefix so a station name that happens to
                # contain a component code does not produce a false match.
                if component in fn.name[len(station) :].lower():
                    channels_list.append(fn)
                    break

        return channels_list

    def sort_by_station(self, stations=None):
        """
        Sort the indexed files into a nested survey/station/run/channel
        dictionary::

            {
                "survey": survey_xml_file,
                "filters": filter_xml_file,
                "stations": [
                    {
                        "fn": station_xml_file,
                        "runs": [
                            {"fn": run_xml_file, "channels": [channel files]}
                        ],
                    }
                ],
            }

        :param stations: station name or list of station names to include;
            ``None`` or ``[]`` selects every station in the index
        :type stations: str, list or None
        :return: nested dictionary of file paths
        :rtype: dict
        :raises ValueError: if ``xml_path`` is not set or does not exist, or
            if ``stations`` is neither ``None``, a string nor a list
        :raises IndexError: if there is no survey or filters file, or a
            requested station has no station-level file
        """
        # Fail early with the same error get_xml_files() uses instead of an
        # opaque TypeError/AttributeError further down.
        if not self.has_xml_path():
            raise ValueError("self.xml_path must be set")

        fn_dict = {
            "survey": self.survey,
            "filters": self.filters,
            "stations": [],
        }

        if stations in [None, []]:
            station_iterator = self.stations
        else:
            if isinstance(stations, str):
                stations = [stations]
            if not isinstance(stations, list):
                raise ValueError("stations must be a list of stations")
            station_iterator = stations

        for station in station_iterator:
            station_rows = self.df[
                (self.df.station == station) & (self.df.is_station == True)
            ]
            station_dict = {"fn": station_rows.fn.values[0], "runs": []}
            for run in self._get_runs(station).itertuples():
                station_dict["runs"].append(
                    {
                        "fn": run.fn,
                        "channels": self._get_channels(station, run.run),
                    }
                )
            fn_dict["stations"].append(station_dict)

        return fn_dict

    @staticmethod
    def read_xml_file(xml_file):
        """
        Read an XML file and return its root element.

        :param xml_file: path of the XML file to parse
        :type xml_file: str or pathlib.Path
        :return: root element of the parsed document
        """
        # NOTE(review): ``et`` is this module's import alias for
        # ``xml.etree.cElementTree``, which was removed in Python 3.9; the
        # module-level import needs to use ``xml.etree.ElementTree`` there.
        return et.parse(xml_file).getroot()

    def _make_channel(self, channel_fn):
        """
        Make a channel metadata object from a channel XML file.

        The third ``.``-separated token of the file stem selects the class:
        ``electric`` gives :class:`Electric`, ``magnetic`` gives
        :class:`Magnetic` (case-insensitive).

        If one of the channel's filter names contains ``"dipole"``, a
        :class:`PoleZeroFilter` with the dipole length as gain is created and
        the first such filter name on the channel is replaced by the name of
        the new filter.

        :param channel_fn: path of the channel XML file
        :type channel_fn: pathlib.Path
        :return: the channel object and the dipole filter (``None`` if no
            filter name contains ``"dipole"``)
        :rtype: tuple
        :raises ValueError: if the channel type token is not supported
        :raises IndexError: if the stem has fewer than three tokens
        """
        ch_type = channel_fn.stem.split(".")[2].lower()
        if ch_type == "electric":
            ch = Electric()
        elif ch_type == "magnetic":
            ch = Magnetic()
        else:
            # Previously ``ch`` was simply left unbound here.
            raise ValueError(
                f"Unsupported channel type '{ch_type}' in {channel_fn.name}; "
                "expected 'electric' or 'magnetic'."
            )

        ch.from_xml(self.read_xml_file(channel_fn))

        dp_filter = None
        if ch.filter.name is not None:
            for index, filter_name in enumerate(ch.filter.name):
                # create a dipole pole zero filter
                if "dipole" in filter_name:
                    dp_filter = PoleZeroFilter()
                    dp_filter.units_in = "V/m"
                    dp_filter.units_out = "V"
                    dp_filter.gain = ch.dipole_length
                    dp_filter.name = f"electric_dipole_{ch.dipole_length:.3f}"
                    dp_filter.comments = "electric dipole for electric field"
                    # Point the channel at the generated filter; only the
                    # first matching name is handled.
                    ch.filter.name[index] = dp_filter.name
                    break

        return ch, dp_filter

    def _make_run(self, run_dict):
        """
        Make a :class:`mt_metadata.timeseries.Run` object from information
        in a run dictionary

        run_dict = {'fn': xml_file_name, 'channels': [list of xml file names]}

        :param run_dict: run file and its channel files
        :type run_dict: dict
        :return: the run object and a dictionary of the dipole filters
            created for its channels, keyed by filter name
        :rtype: tuple
        """
        r = Run()
        r.from_xml(self.read_xml_file(run_dict["fn"]))

        dp_filters = {}
        for ch_fn in run_dict["channels"]:
            ch, dp_filter = self._make_channel(ch_fn)
            r.channels.append(ch)
            if dp_filter is not None:
                dp_filters[dp_filter.name] = dp_filter

        return r, dp_filters

    @staticmethod
    def _inherit_location(target, source):
        """
        Copy latitude, longitude and elevation from ``source`` to ``target``
        when all three values of ``target`` are zero.
        """
        if target.latitude == 0 and target.longitude == 0 and target.elevation == 0:
            target.latitude = source.latitude
            target.longitude = source.longitude
            target.elevation = source.elevation

    def _make_station(self, station_dict):
        """
        Make a station object from a station dictionary

        station_dict = {
            'fn': xml_file_name,
            'runs': [{'fn': run_xml_file_name,
                      'channels': [list of xml file names]}]
            }

        Channels whose location is all zeros are given the station
        location; for electric channels this is the ``positive`` location,
        for all other channels it is ``location``.

        :param station_dict: station file and its run dictionaries
        :type station_dict: dict
        :return: the station object and a dictionary of dipole filters
            keyed by filter name
        :rtype: tuple
        """
        station = Station()
        station.from_xml(self.read_xml_file(station_dict["fn"]))
        # < need to reset the runs, otherwise there are empty runs and double
        # the amount of runs because the run_list is input. >
        station.runs = []

        dp_filters = {}
        for run_dict in station_dict["runs"]:
            r, dp = self._make_run(run_dict)
            for channel in r.channels:
                if channel.type == "electric":
                    self._inherit_location(channel.positive, station.location)
                else:
                    self._inherit_location(channel.location, station.location)
            station.runs.append(r)
            dp_filters.update(dp)

        station.update_time_period()

        return station, dp_filters

    def _make_survey(self, survey_dict):
        """
        Make a :class:`mt_metadata.timeseries.Survey` object

        survey_dict = {
            'survey': survey_xml_file,
            'filters': filter_xml_file,
            'stations': [
                {
                    'fn': xml_file_name,
                    'runs': [
                        {'fn': run_xml_file_name,
                         'channels': [list of xml file names]}]
                }
            ]
        }

        :param survey_dict: nested file dictionary as returned by
            :meth:`sort_by_station`
        :type survey_dict: dict
        :return: the survey object and a dictionary of dipole filters keyed
            by filter name
        :rtype: tuple
        """
        s = Survey()
        s.from_xml(self.read_xml_file(survey_dict["survey"]))
        # Start from an empty station list; stations come from the files.
        s.stations = []

        dp_filters = {}
        for station_dict in survey_dict["stations"]:
            station, dp = self._make_station(station_dict)
            s.stations.append(station)
            dp_filters.update(dp)

        s.update_bounding_box()
        s.update_time_period()

        return s, dp_filters

    def _make_filters_dict(self, filters_xml_file):
        """
        Make a filter dictionary from a filter file with all the filters in it

        Every ``filter`` element is turned into a filter object chosen by the
        text of its first ``type`` child: ``zpk``, ``coefficient``,
        ``time delay`` or ``fir``.

        :param filters_xml_file: path of the filters XML file
        :type filters_xml_file: str or pathlib.Path
        :return: filter objects keyed by filter name
        :rtype: dict
        :raises ValueError: if a filter has no ``type`` child or its type is
            not supported
        """
        filter_classes = {
            "zpk": PoleZeroFilter,
            "coefficient": CoefficientFilter,
            "time delay": TimeDelayFilter,
            "fir": FIRFilter,
        }

        element = self.read_xml_file(filters_xml_file)

        f_dict = {}
        for f in element.iter(tag="filter"):
            type_element = f.find("type")
            if type_element is None:
                raise ValueError("filter element has no 'type' child element.")
            f_type = type_element.text

            filter_class = filter_classes.get(f_type)
            if filter_class is None:
                raise ValueError(f"No support for {f_type} currently.")

            mt_filter = filter_class()
            mt_filter.from_xml(f)
            f_dict[mt_filter.name] = mt_filter

        return f_dict

    def make_experiment(self, stations=None):
        """
        Create an MTML experiment from the directory of xml files

        :param stations: station name or list of station names to include;
            ``None`` or ``[]`` selects every station in the index
        :type stations: str, list or None
        :return: experiment holding one survey with its stations and filters
        :rtype: :class:`mt_metadata.timeseries.Experiment`
        :raises ValueError: if ``xml_path`` is not set or does not exist
        """
        mtex = Experiment()

        survey, dp_filters = self._make_survey(self.sort_by_station(stations))
        mtex.surveys.append(survey)
        # Filters from the filters file first, then the generated dipole
        # filters, which replace any entry with the same name.
        mtex.surveys[0].filters = self._make_filters_dict(self.filters)
        mtex.surveys[0].filters.update(dp_filters)

        return mtex

    def get_mt_channel(self, ch_fn, filters_fn):
        """
        Have a look at an mt channel

        :param ch_fn: path of the channel XML file
        :type ch_fn: pathlib.Path
        :param filters_fn: path of the filters XML file
        :type filters_fn: str or pathlib.Path
        :return: the channel object and the result of its
            ``channel_response`` call with the assembled filter dictionary
        :rtype: tuple
        """

        mt_channel, dp_filter = self._make_channel(ch_fn)

        filter_dict = self._make_filters_dict(filters_fn)
        if dp_filter is not None:
            # Must be a mapping; the former ``{name, filter}`` was a set
            # literal and made ``dict.update`` raise.
            filter_dict.update({dp_filter.name: dp_filter})

        channel_response = mt_channel.channel_response(filter_dict)

        return mt_channel, channel_response

458 channel_response = mt_channel.channel_response(filter_dict) 

459 

460 return mt_channel, channel_response