Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mt_metadata \ mt_metadata \ timeseries \ stationxml \ xml_channel_mt_channel.py: 79%

307 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-10 00:11 -0800

1# -*- coding: utf-8 -*- 

2""" 

3Created on Fri Feb 19 16:14:41 2021 

4 

5:copyright: 

6 Jared Peacock (jpeacock@usgs.gov) 

7 

8:license: MIT 

9 

10""" 

11 

12 

13# ============================================================================= 

14# Imports 

15# ============================================================================= 

16import copy 

17from collections import OrderedDict 

18 

19from loguru import logger 

20 

21from mt_metadata.base.helpers import requires 

22from mt_metadata.common.units import get_unit_object 

23from mt_metadata.timeseries import AppliedFilter, Auxiliary, Electric, Magnetic 

24from mt_metadata.timeseries.filters.obspy_stages import create_filter_from_stage 

25from mt_metadata.timeseries.stationxml.fdsn_tools import ( 

26 create_mt_component, 

27 make_channel_code, 

28 read_channel_code, 

29 release_dict, 

30) 

31from mt_metadata.timeseries.stationxml.utils import BaseTranslator 

32 

33 

34try: 

35 from obspy import UTCDateTime 

36 from obspy.core import inventory 

37except ImportError as error: 

38 inventory = None 

39 UTCDateTime = None 

40 

41# ============================================================================= 

42 

43 

44@requires(obspy=inventory) 

45class XMLChannelMTChannel(BaseTranslator): 

46 """ 

47 translate back and forth between StationXML Channel and MT Channel 

48 """ 

49 

50 understood_sensor_types = [ 

51 "logger", 

52 "magnetometer", 

53 "induction coil", 

54 "coil", 

55 "dipole", 

56 "electrode", 

57 ] 

58 

59 def __init__(self): 

60 super().__init__() 

61 

62 self.xml_translator.update( 

63 { 

64 "azimuth": "measurement_azimuth", 

65 "calibration_units": None, 

66 "clock_drift": None, 

67 "comments": None, 

68 "description": None, 

69 "dip": "measurement_tilt", 

70 "end_date": "time_period.end", 

71 "equipments": None, 

72 "pre_amplifier": None, 

73 "response": None, 

74 "sample_rate": "sample_rate", 

75 "sensor": None, 

76 "start_date": "time_period.start", 

77 "types": "special", 

78 "water_level": None, 

79 "alternate_code": "component", 

80 "latitude": None, 

81 "longitude": None, 

82 "elevation": None, 

83 } 

84 ) 

85 

86 # StationXML to MT Survey 

87 self.mt_translator = self.flip_dict(self.xml_translator) 

88 

89 self.mt_comments_list = ["run.id"] 

90 self.run_list = None 

91 

92 def xml_to_mt(self, xml_channel, existing_filters={}): 

93 """ 

94 Translate :class:`obspy.core.inventory.Channel` to 

95 :class:`mt_metadata.timeseries.Channel` 

96 

97 :param xml_channel: Obspy Channel object 

98 :type xml_channel: :class:`obspy.core.inventory.Channel` 

99 :returns: MT Channel 

100 :rtype: :class:`mt_metadata.timeseries.Channel` 

101 

102 """ 

103 

104 if not isinstance(xml_channel, inventory.Channel): 

105 msg = f"Input must be obspy.core.inventory.Channel object not {type(xml_channel)}" 

106 logger.error(msg) 

107 raise TypeError(msg) 

108 

109 ch_dict = read_channel_code(xml_channel.code) 

110 if ch_dict["measurement"] in ["electric"]: 

111 mt_channel = Electric() 

112 elif ch_dict["measurement"] in ["magnetic"]: 

113 mt_channel = Magnetic() 

114 else: 

115 mt_channel = Auxiliary(type=ch_dict["measurement"]) 

116 

117 # Always set component from XML channel code, overriding any defaults 

118 mt_channel.component = create_mt_component(xml_channel.code) 

119 

120 mt_channel = self._get_mt_position(xml_channel, mt_channel) 

121 mt_channel = self._parse_xml_comments(xml_channel.comments, mt_channel) 

122 mt_channel = self._sensor_to_mt(xml_channel.sensor, mt_channel) 

123 mt_channel = self._get_mt_units(xml_channel, mt_channel) 

124 mt_filters = self._xml_response_to_mt(xml_channel, existing_filters) 

125 

126 for xml_key, mt_key in self.xml_translator.items(): 

127 if mt_key: 

128 value = getattr(xml_channel, xml_key) 

129 if value: 

130 mt_channel.update_attribute(mt_key, value) 

131 

132 # fill channel filters 

133 for filter_name, mt_filter in mt_filters.items(): 

134 mt_channel.add_filter( 

135 AppliedFilter( 

136 name=filter_name, 

137 applied=True, 

138 comments=mt_filter.comments, 

139 stage=mt_filter.sequence_number, 

140 ) 

141 ) 

142 

143 if UTCDateTime(mt_channel.time_period.end.time_stamp) < UTCDateTime( 

144 mt_channel.time_period.start.time_stamp 

145 ): 

146 mt_channel.time_period.end = "2200-01-01T00:00:00+00:00" 

147 return mt_channel, mt_filters 

148 

149 def mt_to_xml(self, mt_channel, filters_dict, hard_code=True): 

150 """ 

151 Translate :class:`mt_metadata.timeseries.Channel` to 

152 :class:`obspy.core.inventory.Channel` 

153 

154 

155 :param xml_channel: MT Channel object 

156 :type xml_channel: :class:`mt_metadata.timeseries.Channel` 

157 :returns: MT Channel 

158 :rtype: :class:`obspy.core.inventory.Channel` 

159 

160 """ 

161 

162 if not isinstance( 

163 mt_channel, 

164 (Electric, Magnetic, Auxiliary), 

165 ): 

166 msg = f"Input must be mt_metadata.timeseries.Channel object not {type(mt_channel)}" 

167 logger.error(msg) 

168 raise TypeError(msg) 

169 

170 # location_code = get_location_code(mt_channel) 

171 if not hard_code: 

172 alignement = "horizontal" 

173 if "z" in mt_channel.component.lower(): 

174 alignement = "vertical" 

175 

176 channel_code = make_channel_code( 

177 mt_channel.sample_rate, 

178 mt_channel.type, 

179 mt_channel.measurement_azimuth, 

180 orientation=alignement, 

181 ) 

182 # this assumes the last character of the component is the orientation 

183 # direction 

184 elif hard_code: 

185 channel_code = make_channel_code( 

186 mt_channel.sample_rate, 

187 mt_channel.type, 

188 mt_channel.component[-1].lower(), 

189 ) 

190 

191 is_electric = mt_channel.type in ["electric"] 

192 if is_electric: 

193 xml_channel = inventory.Channel( 

194 channel_code, 

195 "", 

196 mt_channel.positive.latitude, 

197 mt_channel.positive.longitude, 

198 mt_channel.positive.elevation, 

199 0, 

200 ) 

201 else: 

202 xml_channel = inventory.Channel( 

203 channel_code, 

204 "", 

205 mt_channel.location.latitude, 

206 mt_channel.location.longitude, 

207 mt_channel.location.elevation, 

208 0, 

209 ) 

210 

211 xml_channel.types = ["geophysical".upper()] 

212 xml_channel.sensor = self._mt_to_sensor(mt_channel) 

213 xml_channel.comments = self._make_xml_comments(mt_channel.comments) 

214 xml_channel.restricted_status = release_dict[xml_channel.restricted_status] 

215 xml_channel = self._mt_to_xml_response(mt_channel, filters_dict, xml_channel) 

216 xml_channel.restricted_status = release_dict[xml_channel.restricted_status] 

217 xml_channel = self._mt_to_xml_response(mt_channel, filters_dict, xml_channel) 

218 

219 for mt_key, xml_key in self.mt_translator.items(): 

220 if xml_key is None: 

221 msg = f"Cannot currently map {mt_key} to inventory.station.{xml_key}" 

222 logger.debug(msg) 

223 continue 

224 

225 # obspy only allows angles (0, 360) 

226 if xml_key in ["azimuth"]: 

227 xml_channel.azimuth = mt_channel.measurement_azimuth % 360 

228 

229 elif xml_key in ["dip"]: 

230 xml_channel.dip = mt_channel.measurement_tilt % 360 

231 

232 elif "time_period" in mt_key: 

233 value = mt_channel.get_attr_from_name(mt_key).time_stamp 

234 setattr(xml_channel, xml_key, value) 

235 

236 else: 

237 setattr(xml_channel, xml_key, mt_channel.get_attr_from_name(mt_key)) 

238 

239 return xml_channel 

240 

241 def _sensor_to_mt(self, sensor, mt_channel): 

242 """ 

243 Fill an MT channel with sensor information. It is slightly different 

244 depending on electric or magnetic. 

245 

246 :param sensor: DESCRIPTION 

247 :type sensor: TYPE 

248 :param mt_channel: DESCRIPTION 

249 :type mt_channel: TYPE 

250 :return: DESCRIPTION 

251 :rtype: TYPE 

252 

253 """ 

254 sensor.type = self._deduce_sensor_type(sensor) 

255 

256 if not sensor.type: 

257 return mt_channel 

258 

259 if sensor.type.lower() in ["magnetometer", "induction coil", "coil"]: 

260 if not isinstance(mt_channel, Magnetic): 

261 msg = ( 

262 f"Cannot put sensor of type {sensor.type} into an " 

263 f"MT Channel of {type(mt_channel)}." 

264 ) 

265 logger.error(msg) 

266 raise ValueError(msg) 

267 mt_channel.sensor.id = sensor.serial_number 

268 mt_channel.sensor.manufacturer = sensor.manufacturer 

269 mt_channel.sensor.model = f"{sensor.model} {sensor.description}" 

270 mt_channel.sensor.type = sensor.type 

271 mt_channel.sensor.name = sensor.description 

272 

273 return mt_channel 

274 

275 elif sensor.type.lower() in ["dipole", "electrode"]: 

276 if not isinstance(mt_channel, Electric): 

277 msg = ( 

278 f"Cannot put sensor of type {sensor.type} into an " 

279 f"MT Channel of {type(mt_channel)}." 

280 ) 

281 logger.error(msg) 

282 raise ValueError(msg) 

283 if sensor.serial_number: 

284 pid, nid = self._parse_electrode_ids(sensor.serial_number) 

285 mt_channel.positive.id = pid 

286 mt_channel.negative.id = nid 

287 

288 mt_channel.positive.manufacturer = sensor.manufacturer 

289 mt_channel.positive.model = sensor.model 

290 mt_channel.positive.type = "electrode" 

291 

292 mt_channel.negative.manufacturer = sensor.manufacturer 

293 mt_channel.negative.model = sensor.model 

294 mt_channel.negative.type = "electrode" 

295 

296 mt_channel.dipole_length = self._parse_dipole_length(sensor.description) 

297 

298 return mt_channel 

299 

300 else: 

301 if not isinstance(mt_channel, Auxiliary): 

302 msg = ( 

303 f"Cannot put sensor of type {sensor.type} into an " 

304 f"MT Channel of {type(mt_channel)}." 

305 ) 

306 logger.error(msg) 

307 raise ValueError(msg) 

308 

309 mt_channel.sensor.type = sensor.type 

310 mt_channel.sensor.manufacturer = sensor.manufacturer 

311 mt_channel.sensor.model = f"{sensor.model} {sensor.description}" 

312 mt_channel.sensor.id = sensor.serial_number 

313 

314 return mt_channel 

315 

316 def _mt_to_sensor(self, mt_channel): 

317 """ 

318 Create an xml sensor from an MT channel 

319 """ 

320 s = inventory.Equipment() 

321 if mt_channel.type in ["electric"]: 

322 s.type = "dipole" 

323 s.description = f"{mt_channel.dipole_length} meters" 

324 if mt_channel.positive.manufacturer: 

325 s.manufacturer = mt_channel.positive.manufacturer 

326 elif mt_channel.positive.manufacturer: 

327 s.manufacturer = mt_channel.negative.manufacturer 

328 

329 if mt_channel.positive.model: 

330 s.model = mt_channel.positive.model 

331 elif mt_channel.positive.model: 

332 s.model = mt_channel.negative.model 

333 

334 s.serial_number = ( 

335 f"positive: {mt_channel.positive.id}, " 

336 f"negative: {mt_channel.negative.id}" 

337 ) 

338 

339 elif mt_channel.type in ["magnetic"]: 

340 s.type = mt_channel.sensor.type 

341 if mt_channel.sensor.model: 

342 s.model = mt_channel.sensor.model.split()[0] 

343 try: 

344 s.description = mt_channel.sensor.model.split()[1] 

345 except IndexError: 

346 pass 

347 s.serial_number = mt_channel.sensor.id 

348 s.manufacturer = mt_channel.sensor.manufacturer 

349 s.description = mt_channel.sensor.name 

350 

351 else: 

352 s.type = mt_channel.sensor.type 

353 s.model = mt_channel.sensor.model 

354 s.serial_number = mt_channel.sensor.id 

355 s.manufacturer = mt_channel.sensor.manufacturer 

356 s.description = mt_channel.sensor.name 

357 

358 return s 

359 

360 def _parse_electrode_ids(self, serial_numbers): 

361 """ 

362 parse electrode ids from a string formated 'positive: pid, negative: nid' 

363 """ 

364 

365 if ":" in serial_numbers and "," in serial_numbers: 

366 serial_list = serial_numbers.split(",") 

367 if len(serial_list) != 2: 

368 msg = ( 

369 f"Cannot parse electrode ids from {serial_numbers}. Must " 

370 "have format 'positive: pid, negative: nid'" 

371 ) 

372 logger.error(msg) 

373 raise ValueError(msg) 

374 

375 pid, nid = [ss.split(":")[1].strip() for ss in serial_list] 

376 return pid, nid 

377 

378 elif ":" not in serial_numbers and "," in serial_numbers: 

379 serial_list = serial_numbers.split(",") 

380 if len(serial_list) != 2: 

381 msg = ( 

382 f"Cannot parse electrode ids from {serial_numbers}. Must " 

383 "have format 'positive: pid, negative: nid'" 

384 ) 

385 logger.error(msg) 

386 raise ValueError(msg) 

387 

388 pid, nid = [ss.strip() for ss in serial_list] 

389 return pid, nid 

390 else: 

391 logger.warning( 

392 "Electrod IDs are not properly formatted assigning" 

393 f" {serial_numbers} to both positive and negative. " 

394 "Accepted format is 'positive: pid, negative: nid'" 

395 ) 

396 return serial_numbers, serial_numbers 

397 

398 def _parse_dipole_length(self, description): 

399 """ 

400 Parse the dipole length from the sensor description Assuming a format 

401 'lenth units' --> '100 meters' 

402 """ 

403 

404 dipole = description.split() 

405 try: 

406 return float(dipole[0]) 

407 

408 except ValueError as error: 

409 msg = f"Could not get dipole length from {description} got ValueError({error})" 

410 logger.warning(msg) 

411 return 0.0 

412 

413 def _parse_xml_comments(self, xml_comments, mt_channel): 

414 """ 

415 Read xml comments into an MT comment 

416 

417 :param xml_comments: DESCRIPTION 

418 :type xml_comments: TYPE 

419 :param mt_channel: DESCRIPTION 

420 :type mt_channel: TYPE 

421 :return: DESCRIPTION 

422 :rtype: TYPE 

423 

424 """ 

425 

426 runs = [] 

427 for comment in xml_comments: 

428 k, v = self.read_xml_comment(comment) 

429 if k == "mt.run.id": 

430 runs.append(v) 

431 else: 

432 if mt_channel.comments.value: 

433 mt_channel.comments.value += f", {k}: {v}" 

434 else: 

435 mt_channel.comments.value = f", {k}: {v}" 

436 if mt_channel.comments.value: 

437 mt_channel.comments.value += f", run_ids: [{','.join(runs)}]" 

438 else: 

439 mt_channel.comments.value = f"run_ids: [{','.join(runs)}]" 

440 

441 self.run_list = runs 

442 

443 return mt_channel 

444 

445 def _make_xml_comments(self, mt_comment): 

446 """ 

447 make xml comments from an mt comment, namely run ids. 

448 

449 :param mt_comment: DESCRIPTION 

450 :type mt_comment: TYPE 

451 :return: DESCRIPTION 

452 :rtype: TYPE 

453 

454 """ 

455 

456 comments = [] 

457 if mt_comment.value is None: 

458 return comments 

459 clist = mt_comment.value.split("run_ids:") 

460 for item in clist: 

461 if ":" in item: 

462 k, v = item.split(":") 

463 comments.append(inventory.Comment(v, subject=k)) 

464 elif "[" in item and "]" in item: 

465 for run in item.replace("[", "").replace("]", "").split(","): 

466 run = run.strip() 

467 if run: 

468 comments.append( 

469 inventory.Comment(run.strip(), subject="mt.run.id") 

470 ) 

471 return comments 

472 

473 def _get_mt_position(self, xml_channel, mt_channel): 

474 """ 

475 Get the correct locations given the channel type 

476 

477 :param xml_channel: DESCRIPTION 

478 :type xml_channel: TYPE 

479 :param mt_channel: DESCRIPTION 

480 :type mt_channel: TYPE 

481 :return: DESCRIPTION 

482 :rtype: TYPE 

483 

484 """ 

485 

486 if mt_channel.type in ["electric"]: 

487 for direction in ["positive", "negative"]: 

488 for pos in ["latitude", "longitude", "elevation"]: 

489 key = f"{direction}.{pos}" 

490 value = getattr(xml_channel, pos) 

491 mt_channel.update_attribute(key, value) 

492 else: 

493 for pos in ["latitude", "longitude", "elevation"]: 

494 key = f"location.{pos}" 

495 value = getattr(xml_channel, pos) 

496 mt_channel.update_attribute(key, value) 

497 

498 return mt_channel 

499 

500 def _get_mt_units(self, xml_channel, mt_channel): 

501 """ """ 

502 if len(xml_channel.response.response_stages) == 0: 

503 return mt_channel 

504 name = xml_channel.response.response_stages[-1].output_units 

505 description = xml_channel.response.response_stages[-1].output_units_description 

506 if description and name: 

507 if len(description) > len(name): 

508 mt_channel.units = description 

509 else: 

510 mt_channel.units = name 

511 elif description: 

512 mt_channel.units = description 

513 elif name: 

514 mt_channel.units = name 

515 else: 

516 logger.debug("Did not find any units descriptions in XML") 

517 

518 return mt_channel 

519 

520 def _xml_response_to_mt(self, xml_channel, existing_filters={}): 

521 """ 

522 parse the filters from obspy into mt filters 

523 """ 

524 ch_filter_dict = OrderedDict() 

525 for i_stage, stage in enumerate(xml_channel.response.response_stages): 

526 new_and_unnamed = False 

527 mt_filter = create_filter_from_stage(stage) 

528 if not mt_filter.name: 

529 filter_name, new_and_unnamed = self._add_filter_number( 

530 existing_filters, mt_filter 

531 ) 

532 mt_filter.name = filter_name 

533 

534 if mt_filter.decimation_active: 

535 # keep filter names unique if same one used more than once 

536 mt_filter.name += f"_{mt_filter.decimation_input_sample_rate}" 

537 

538 if new_and_unnamed: 

539 logger.info(f"Found an unnamed filter, named it: '{mt_filter.name}'") 

540 existing_filters[filter_name] = mt_filter 

541 

542 ch_filter_dict[mt_filter.name.replace("/", " per ").lower()] = mt_filter 

543 

544 return ch_filter_dict 

545 

546 def _add_filter_number(self, existing_filters, mt_filter): 

547 """ 

548 return the next number the number of filters 

549 

550 :param keys: DESCRIPTION 

551 :type keys: TYPE 

552 :return: DESCRIPTION 

553 :rtype: TYPE 

554 

555 """ 

556 

557 # check for existing filters 

558 for f_obj in existing_filters.values(): 

559 if f_obj.type == mt_filter.type: 

560 if round(abs(f_obj.complex_response([1])[0])) == round( 

561 abs(mt_filter.complex_response([1])[0]) 

562 ): 

563 return f_obj.name, False 

564 

565 try: 

566 last = sorted([k for k in existing_filters.keys() if mt_filter.type in k])[ 

567 -1 

568 ] 

569 except IndexError: 

570 return f"{mt_filter.type}_{0:02}", True 

571 try: 

572 return f"{mt_filter.type}_{int(last[-2:]) + 1:02}", True 

573 except ValueError: 

574 return f"{mt_filter.type}_{0:02}", True 

575 

576 def _mt_to_xml_response(self, mt_channel, filters_dict, xml_channel): 

577 """ 

578 Translate MT filters into Obspy Response 

579 

580 :param mt_channel: DESCRIPTION 

581 :type mt_channel: TYPE 

582 :param filters_dict: DESCRIPTION 

583 :type filters_dict: TYPE 

584 :param xml_channel: DESCRIPTION 

585 :type xml_channel: TYPE 

586 :return: DESCRIPTION 

587 :rtype: TYPE 

588 

589 """ 

590 

591 mt_channel_response = mt_channel.channel_response(filters_dict) 

592 xml_channel.response = mt_channel_response.to_obspy( 

593 sample_rate=mt_channel.sample_rate 

594 ) 

595 

596 unit_obj = get_unit_object(mt_channel_response.units_in) 

597 

598 xml_channel.calibration_units = unit_obj.symbol 

599 xml_channel.calibration_units_description = unit_obj.name 

600 

601 return xml_channel 

602 

603 def _deduce_sensor_type(self, sensor): 

604 """ 

605 

606 :param sensor: Information about a sensor, usually extractes from FDSN XML 

607 :type sensor: obspy.core.inventory.util.Equipment 

608 

609 :return: 

610 """ 

611 original_sensor_type = sensor.type 

612 original_sensor_description = sensor.description 

613 # set sensor_type to be a string if it is None 

614 if original_sensor_type is None: 

615 sensor_type = "" # make a string 

616 msg = f"Sensor {sensor} does not have field type attr" 

617 logger.debug(msg) 

618 else: 

619 sensor_type = copy.deepcopy(original_sensor_type) 

620 

621 if original_sensor_description is None: 

622 sensor_description = "" # make a string 

623 else: 

624 sensor_description = copy.deepcopy(original_sensor_description) 

625 

626 if sensor_type.lower() in self.understood_sensor_types: 

627 return sensor_type 

628 else: 

629 logger.warning( 

630 f" sensor {sensor} type {sensor.type} not in {self.understood_sensor_types}" 

631 ) 

632 logger.warning( 

633 f" sensor {sensor} type {sensor.type} not in {self.understood_sensor_types}" 

634 ) 

635 

636 # Try handling Bartington FGM at Earthscope ... this is a place holder for handling non-standard cases 

637 if sensor_type.lower() == "bartington": 

638 sensor_type = "magnetometer" 

639 if not sensor_type: 

640 if sensor_description == "Bartington 3-Axis Fluxgate Sensor": 

641 sensor_type = "magnetometer" 

642 if sensor_description: 

643 if ("bf-4" in sensor_description.lower()) & ( 

644 "schlumberger" in sensor_description.lower() 

645 ): # BSL-NCEDC 

646 sensor_type = "magnetometer" 

647 elif ("electric" in sensor_description.lower()) & ( 

648 "dipole" in sensor_description.lower() 

649 ): # BSL-NCEDC 

650 sensor_type = "dipole" 

651 

652 # reset sensor_type to None it it was not handled 

653 if not sensor_type: 

654 sensor_type = original_sensor_type 

655 logger.error("sensor type could not be resolved") 

656 

657 return sensor_type