Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mt_metadata \ mt_metadata \ timeseries \ stationxml \ xml_inventory_mt_experiment.py: 90%

143 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-10 00:11 -0800

1# -*- coding: utf-8 -*- 

2""" 

3Created on Mon Feb 22 09:27:10 2021 

4 

5:copyright: 

6 Jared Peacock (jpeacock@usgs.gov) 

7 

8:license: MIT 

9 

10""" 

11from copy import deepcopy 

12 

13# ============================================================================= 

14# Imports 

15# ============================================================================= 

16from pathlib import Path 

17 

18from loguru import logger 

19from obspy import read_inventory 

20from obspy.core import inventory 

21 

22from mt_metadata import timeseries as metadata 

23from mt_metadata.timeseries.stationxml import ( 

24 XMLChannelMTChannel, 

25 XMLNetworkMTSurvey, 

26 XMLStationMTStation, 

27) 

28 

29 

30# ============================================================================= 

31 

32 

33class XMLInventoryMTExperiment: 

34 """ 

35 Read the full files and put the elements in the appropriate locations. 

36 

37 """ 

38 

39 def __init__(self): 

40 self.logger = logger 

41 self.network_translator = XMLNetworkMTSurvey() 

42 self.station_translator = XMLStationMTStation() 

43 self.channel_translator = XMLChannelMTChannel() 

44 

45 def xml_to_mt(self, inventory_object=None, stationxml_fn=None, mt_fn=None): 

46 """ 

47 Read in a StationXML using Obspy :class:`obspy.core.inventory.Inventory` 

48 and convert to an MT :class:`mt_metadata.timeseries.Experiment` 

49 

50 

51 :param inventory_object: inventory object or StationXML file name 

52 :type inventory_object: :class:`obspy.core.inventory.Inventory` 

53 :param stationxml_fn: full path to StationXML file 

54 :type stationxml_fn: Path or string 

55 :param mt_fn: full path to MT file 

56 :type mt_fn: Path or string 

57 

58 :return: DESCRIPTION 

59 :rtype: TYPE 

60 

61 """ 

62 

63 if stationxml_fn: 

64 if isinstance(stationxml_fn, Path): 

65 stationxml_fn = stationxml_fn.as_posix() 

66 inventory_object = read_inventory(stationxml_fn) 

67 if not inventory_object: 

68 msg = "Must provide either an inventory object or StationXML file path" 

69 self.logger.error(msg) 

70 raise ValueError(msg) 

71 mt_experiment = metadata.Experiment() 

72 for xml_network in inventory_object.networks: 

73 mt_survey = self.network_translator.xml_to_mt(xml_network) 

74 for xml_station in xml_network.stations: 

75 mt_station = self.station_translator.xml_to_mt(xml_station) 

76 for xml_channel in xml_station: 

77 mt_channel, mt_filters = self.channel_translator.xml_to_mt( 

78 xml_channel, mt_survey.filters 

79 ) 

80 mt_survey.filters.update(mt_filters) 

81 # if there is a run list match channel to runs 

82 if self.channel_translator.run_list: 

83 for run_id in sorted(self.channel_translator.run_list): 

84 # need to make a copy of the channel otherwise 

85 # the properties are constantly overwritten as we loop 

86 # through the runs 

87 run_channel = deepcopy(mt_channel) 

88 mt_run = mt_station.get_run(run_id) 

89 # need to set the start and end time to the run 

90 run_channel.time_period.start = mt_run.time_period.start 

91 run_channel.time_period.end = mt_run.time_period.end 

92 mt_run.add_channel(run_channel) 

93 # if there are runs already try to match by start, end, sample_rate 

94 # initialized runs have a sample rate of 0. This could be an 

95 # issue in the future. 

96 elif mt_station.runs: 

97 for mt_run in mt_station.runs: 

98 if ( 

99 mt_run.sample_rate == mt_channel.sample_rate 

100 or mt_run.sample_rate == 0 

101 ): 

102 # match assuming the runs have the correct start 

103 # and end times. 

104 if ( 

105 mt_run.time_period.start 

106 >= mt_channel.time_period.start 

107 ) and ( 

108 mt_run.time_period.end <= mt_channel.time_period.end 

109 ): 

110 mt_run.channels.append(mt_channel) 

111 mt_run.sample_rate = mt_channel.sample_rate 

112 # make a new run with generic information 

113 else: 

114 mt_run = metadata.Run(id=f"{len(mt_station.runs)+1:03d}") 

115 mt_run.time_period.start = mt_channel.time_period.start 

116 mt_run.time_period.end = mt_channel.time_period.end 

117 mt_run.sample_rate = mt_channel.sample_rate 

118 mt_run.channels.append(mt_channel) 

119 mt_station.runs.append(mt_run) 

120 mt_station.update_time_period() 

121 mt_survey.stations.append(mt_station) 

122 if xml_network.stations: 

123 mt_survey.update_bounding_box() 

124 mt_survey.update_time_period() 

125 # need to check if the network/survey already exists, the files 

126 # from make_mth5_from_iris have multiples of the same network 

127 if mt_survey.id in mt_experiment.surveys.keys(): 

128 mt_experiment.surveys[mt_survey.id].stations.update(mt_survey.stations) 

129 else: 

130 mt_experiment.surveys.append(mt_survey) 

131 if mt_fn: 

132 mt_experiment.to_xml(fn=mt_fn) 

133 return mt_experiment 

134 

135 def mt_to_xml(self, mt_experiment, mt_fn=None, stationxml_fn=None, ns_dict=None): 

136 """ 

137 Convert from MT :class:`mt_metadata.timeseries.Experiment` to 

138 :class:`obspy.core.inventory.Inventory` 

139 

140 :param mt_experiment: DESCRIPTION 

141 :type mt_experiment: TYPE 

142 :param mt_fn: DESCRIPTION, defaults to None 

143 :type mt_fn: TYPE, optional 

144 :param stationxml_fn: DESCRIPTION, defaults to None 

145 :type stationxml_fn: TYPE, optional 

146 :param ns_dict: DESCRIPTION, defaults to None 

147 :type ns_dict: TYPE, optional 

148 :raises ValueError: DESCRIPTION 

149 :return: DESCRIPTION 

150 :rtype: TYPE 

151 

152 """ 

153 

154 if mt_fn: 

155 mt_experiment = metadata.Experiment() 

156 mt_experiment.from_xml(mt_fn) 

157 if not mt_experiment: 

158 msg = "Must provide either an experiment object or file path" 

159 self.logger.error(msg) 

160 raise ValueError(msg) 

161 xml_inventory = inventory.Inventory() 

162 for mt_survey in mt_experiment.surveys: 

163 xml_network = self.network_translator.mt_to_xml(mt_survey) 

164 for mt_station in mt_survey.stations: 

165 xml_station = self.station_translator.mt_to_xml(mt_station) 

166 if mt_survey.country is not None: 

167 xml_station.site.country = ",".join( 

168 [str(country) for country in mt_survey.country] 

169 ) 

170 # need to sort the runs by time 

171 for mt_run in mt_station.runs: 

172 xml_station = self.add_run(xml_station, mt_run, mt_survey.filters) 

173 xml_network.stations.append(xml_station) 

174 xml_inventory.networks.append(xml_network) 

175 if stationxml_fn: 

176 if isinstance(stationxml_fn, Path): 

177 stationxml_fn = stationxml_fn.as_posix() 

178 xml_inventory.write(stationxml_fn, "stationxml", nsmap=ns_dict) 

179 return xml_inventory 

180 

181 def add_run(self, xml_station, mt_run, filters_dict): 

182 """ 

183 Check to see if channel information already exists in the channel list of 

184 an xml station. 

185 

186 .. todo:: Need to make sure the times are updated 

187 

188 :param xml_station: DESCRIPTION 

189 :type xml_station: TYPE 

190 :param xml_channel: DESCRIPTION 

191 :type xml_channel: TYPE 

192 :return: DESCRIPTION 

193 :rtype: TYPE 

194 

195 """ 

196 

197 for mt_channel in mt_run.channels: 

198 xml_channel = self.channel_translator.mt_to_xml(mt_channel, filters_dict) 

199 existing_channels = xml_station.select(channel=xml_channel.code).channels 

200 

201 if existing_channels: 

202 find = False 

203 start_list = [c.start_date for c in existing_channels] 

204 existing_channel = existing_channels[start_list.index(max(start_list))] 

205 # should only compare the last channel 

206 # for existing_channel in existing_channels: 

207 run_list = [c.value for c in existing_channel.comments] 

208 # Compare channel metadata if matches just add run.id if its 

209 # not already there. 

210 self.logger.debug( 

211 f"Comparing {xml_channel.code} to {existing_channel.code}" 

212 ) 

213 if self.compare_xml_channel(xml_channel, existing_channel): 

214 find = True 

215 self.logger.debug( 

216 f"Matched {xml_channel.code}={existing_channel.code}" 

217 ) 

218 if not mt_run.id in run_list: 

219 self.logger.debug(f"adding run id {mt_run.id} to {run_list}") 

220 existing_channel.comments.append( 

221 inventory.Comment(mt_run.id, subject="mt.run.id") 

222 ) 

223 if xml_channel.start_date < existing_channel.start_date: 

224 self.logger.debug("Changed starting time") 

225 existing_channel.start_date = xml_channel.start_date 

226 if xml_channel.end_date > existing_channel.end_date: 

227 self.logger.debug("Changed ending time") 

228 existing_channel.end_date = xml_channel.end_date 

229 # continue 

230 if not find: 

231 self.logger.debug( 

232 f"xxx Unmatched {xml_channel.code}!={existing_channel.code}" 

233 ) 

234 run_list = [c.value for c in xml_channel.comments] 

235 if not mt_run.id in run_list: 

236 self.logger.debug(f"adding run id {mt_run.id} to {run_list}") 

237 xml_channel.comments.append( 

238 inventory.Comment(mt_run.id, subject="mt.run.id") 

239 ) 

240 xml_station.channels.append(xml_channel) 

241 else: 

242 self.logger.debug(f"no existing channels for {xml_channel.code}") 

243 run_list = [c.value for c in xml_channel.comments] 

244 if not mt_run.id in run_list: 

245 self.logger.debug(f"adding run id {mt_run.id} to {run_list}") 

246 xml_channel.comments.append( 

247 inventory.Comment(mt_run.id, subject="mt.run.id") 

248 ) 

249 xml_station.channels.append(xml_channel) 

250 return xml_station 

251 

252 def compare_xml_channel(self, xml_channel_01, xml_channel_02): 

253 """ 

254 Compare xml channels to see if a new epoch needs to be made or not. 

255 

256 :param xml_channel_01: DESCRIPTION 

257 :type xml_channel_01: TYPE 

258 :param xml_channel_02: DESCRIPTION 

259 :type xml_channel_02: TYPE 

260 :return: DESCRIPTION 

261 :rtype: TYPE 

262 

263 """ 

264 

265 if xml_channel_01.code != xml_channel_02.code: 

266 self.logger.debug(f"{xml_channel_01.code} != {xml_channel_02.code}") 

267 return False 

268 if xml_channel_01.sample_rate != xml_channel_02.sample_rate: 

269 self.logger.debug( 

270 f"{xml_channel_01.sample_rate} != {xml_channel_02.sample_rate}" 

271 ) 

272 return False 

273 if xml_channel_01.sensor != xml_channel_02.sensor: 

274 self.logger.debug(f"{xml_channel_01.sensor} != {xml_channel_02.sensor}") 

275 return False 

276 if round(xml_channel_01.latitude, 3) != round(xml_channel_02.latitude, 3): 

277 self.logger.debug( 

278 f"{round(xml_channel_01.latitude, 3)} != {round(xml_channel_02.latitude, 3)}" 

279 ) 

280 return False 

281 if round(xml_channel_01.longitude, 3) != round(xml_channel_02.longitude, 3): 

282 self.logger.debug( 

283 f"{round(xml_channel_01.longitude, 3)} != {round(xml_channel_02.longitude, 3)}" 

284 ) 

285 return False 

286 if round(xml_channel_01.azimuth, 2) != round(xml_channel_02.azimuth, 2): 

287 self.logger.debug( 

288 f"{round(xml_channel_01.azimuth, 2)} != {round(xml_channel_02.azimuth, 2)}" 

289 ) 

290 return False 

291 if round(xml_channel_01.dip, 2) != round(xml_channel_02.dip, 2): 

292 self.logger.debug( 

293 f"{round(xml_channel_01.dip, 2)} != {round(xml_channel_02.dip, 2)}" 

294 ) 

295 return False 

296 return True