Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mt_metadata \ mt_metadata \ transfer_functions \ io \ emtfxml \ emtfxml.py: 66%

838 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-10 00:11 -0800

1# -*- coding: utf-8 -*- 

2""" 

3EMTFXML 

4========== 

5 

6This is meant to follow Anna's XML schema for transfer functions 

7 

8Created on Sat Sep 4 17:59:53 2021 

9 

10@author: jpeacock 

11""" 

12# ============================================================================= 

13# Imports 

14# ============================================================================= 

15import inspect 

16from enum import Enum 

17from pathlib import Path 

18from xml.etree import cElementTree as et 

19 

20import numpy as np 

21from loguru import logger 

22 

23from mt_metadata import NULL_VALUES 

24from mt_metadata.base import helpers 

25from mt_metadata.common import Instrument 

26from mt_metadata.common.enumerations import DataTypeEnum 

27from mt_metadata.timeseries import Electric, Magnetic, Run, Survey 

28from mt_metadata.transfer_functions.io.emtfxml.metadata import helpers as emtf_helpers 

29from mt_metadata.transfer_functions.io.tools import get_nm_elev 

30from mt_metadata.transfer_functions.tf import Station 

31from mt_metadata.utils.validators import validate_attribute 

32 

33from . import metadata as emtf_xml 

34 

35 

# Lookup of validated attribute name -> class for every class exposed by the
# local ``metadata`` package (imported here as ``emtf_xml``).
meta_classes = {
    validate_attribute(class_name): class_obj
    for class_name, class_obj in inspect.getmembers(emtf_xml, inspect.isclass)
}
# ``Instrument`` is imported from ``mt_metadata.common``; register it
# explicitly under the "instrument" key.
meta_classes["instrument"] = Instrument

43# ============================================================================= 

44# EMTFXML 

45# ============================================================================= 

46 

# One row per statistical estimate:
# (lookup key, name, type, description, external_url, intention, tag)
# NOTE: the lookup key "signal_amplidude" is misspelled in the original and is
# kept unchanged here because other code may look it up by that key.
_ESTIMATE_ROWS = (
    (
        "variance",
        "VAR",
        "real",
        "Variance",
        "http://www.iris.edu/dms/products/emtf/variance.html",
        "error estimate",
        "variance",
    ),
    (
        "covariance",
        "COV",
        "complex",
        "Covariance",
        "http://www.iris.edu/dms/products/emtf/covariance.html",
        "error estimate",
        "covariance",
    ),
    (
        "residual_covariance",
        "RESIDCOV",
        "complex",
        "Residual Covariance (N)",
        "http://www.iris.edu/dms/products/emtf/residual_covariance.html",
        "error estimate",
        "residual_covariance",
    ),
    (
        "inverse_signal_power",
        "INVSIGCOV",
        "complex",
        "Inverse Coherent Signal Power Matrix (S)",
        "http://www.iris.edu/dms/products/emtf/inverse_signal_covariance.html",
        "signal power estimate",
        "inverse_signal_covariance",
    ),
    (
        "coherence",
        "COH",
        "complex",
        "Coherence",
        "http://www.iris.edu/dms/products/emtf/coherence.html",
        "signal coherence",
        "coherence",
    ),
    (
        "predicted_coherence",
        "PREDCOH",
        "complex",
        "Multiple Coherence",
        "http://www.iris.edu/dms/products/emtf/multiple_coherence.html",
        "signal coherence",
        "multiple_coherence",
    ),
    (
        "signal_amplidude",
        "SIGAMP",
        "complex",
        "Signal Amplitude",
        "http://www.iris.edu/dms/products/emtf/signal_amplitude.html",
        "signal power estimate",
        "signal_power",
    ),
    (
        "signal_noise",
        "SIGNOISE",
        "complex",
        "Signal Noise",
        "http://www.iris.edu/dms/products/emtf/signal_noise.html",
        "error estimate",
        "signal_noise",
    ),
)

# Pre-built ``Estimate`` metadata objects keyed by estimate kind.
estimates_dict = {
    lookup_key: emtf_xml.Estimate(
        name=name,
        type=value_type,
        description=description,
        external_url=external_url,
        intention=intention,
        tag=tag,
    )
    for (
        lookup_key,
        name,
        value_type,
        description,
        external_url,
        intention,
        tag,
    ) in _ESTIMATE_ROWS
}

113 

# Keyword arguments shared by both transfer-function data types.
_COMMON_DATA_TYPE_KWARGS = {
    "type": "complex",
    "input": "H",
    "intention": "primary data type",
}

# Pre-built ``DataType`` metadata objects keyed by data type kind.
data_types_dict = {
    "impedance": emtf_xml.DataType(
        name="Z",
        output="E",
        units="milliVolt per kilometer per nanoTesla",
        description="MT impedance",
        external_url="http://www.iris.edu/dms/products/emtf/impedance.html",
        tag="impedance",
        **_COMMON_DATA_TYPE_KWARGS,
    ),
    "tipper": emtf_xml.DataType(
        name="T",
        output="H",
        units="",
        description="Vertical Field Transfer Functions (Tipper)",
        external_url="http://www.iris.edu/dms/products/emtf/tipper.html",
        tag="tipper",
        **_COMMON_DATA_TYPE_KWARGS,
    ),
}

138 

139 

140class EMTFXML: 

141 """ 

142 This is meant to follow Anna's XML schema for transfer functions 

143 

144 [Kelbert2019](https://doi.org/10.1190/geo2018-0679.1). 

145 

146 making this a MetadataBase object is complicated because of station 

147 and survey metadata, so we are going to leave this as just an object. 

148 """ 

149 

def __init__(self, fn=None, **kwargs):
    """
    Create empty EMTF XML containers and optionally read a file.

    :param fn: path of an EMTF XML file; when the resulting ``self.fn`` is
     not None the file is read at the end of construction, defaults to None
    :type fn: str | Path | None
    :param kwargs: attribute names and values applied with ``setattr``
     after the containers are created and before any file is read

    """
    self._root_dict = None
    self.emtf = emtf_xml.EMTF()  # type: ignore
    self.external_url = emtf_xml.ExternalUrl()  # type: ignore
    self.primary_data = emtf_xml.PrimaryData()  # type: ignore
    self.attachment = emtf_xml.Attachment()  # type: ignore
    self.provenance = emtf_xml.Provenance()  # type: ignore
    self.copyright = emtf_xml.Copyright()  # type: ignore
    self.site = emtf_xml.Site()  # type: ignore

    # not sure why we need to do this, but if you don't FieldNotes end
    # as a string.
    self.field_notes = emtf_xml.FieldNotes()  # type: ignore
    self.processing_info = emtf_xml.ProcessingInfo()  # type: ignore
    self.statistical_estimates = emtf_xml.StatisticalEstimates()  # type: ignore
    self.data_types = emtf_xml.DataTypes()  # type: ignore
    self.site_layout = emtf_xml.SiteLayout()  # type: ignore
    self.data = emtf_xml.TransferFunction()  # type: ignore
    self.period_range = emtf_xml.PeriodRange()  # type: ignore

    self.fn = fn

    # Attribute names visited, in this order, by ``read`` and ``write``.
    self.element_keys = [
        "description",
        "product_id",
        "sub_type",
        "notes",
        "tags",
        "external_url",
        "primary_data",
        "attachment",
        "provenance",
        "copyright",
        "site",
        "field_notes",
        "processing_info",
        "statistical_estimates",
        "data_types",
        "site_layout",
        "data",
        "period_range",
    ]

    for key, value in kwargs.items():
        setattr(self, key, value)

    # identity comparison is the correct test for None
    if self.fn is not None:
        self.read()

198 

199 def __str__(self): 

200 lines = [f"Station: {self.station_metadata.id}", "-" * 50] 

201 lines.append(f"\tSurvey: {self.survey_metadata.id}") 

202 lines.append(f"\tProject: {self.survey_metadata.project}") 

203 lines.append(f"\tAcquired by: {self.station_metadata.acquired_by.author}") 

204 lines.append(f"\tAcquired date: {self.station_metadata.time_period.start}") 

205 lines.append(f"\tLatitude: {self.station_metadata.location.latitude:.3f}") 

206 lines.append(f"\tLongitude: {self.station_metadata.location.longitude:.3f}") 

207 lines.append(f"\tElevation: {self.station_metadata.location.elevation:.3f}") 

208 lines.append("\tDeclination: ") 

209 lines.append( 

210 f"\t\tValue: {self.station_metadata.location.declination.value}" 

211 ) 

212 lines.append( 

213 f"\t\tModel: {self.station_metadata.location.declination.model}" 

214 ) 

215 

216 if self.data.z is not None: 

217 lines.append("\tImpedance: True") 

218 else: 

219 lines.append("\tImpedance: False") 

220 

221 if self.data.t is not None: 

222 lines.append("\ttipper: True") 

223 else: 

224 lines.append("\tTipper: False") 

225 

226 if self.data.period is not None: 

227 lines.append(f"\tN Periods: {len(self.data.period)}") 

228 

229 lines.append("\tPeriod Range:") 

230 lines.append(f"\t\tMin: {self.data.period.min():.5E} s") 

231 lines.append(f"\t\tMax: {self.data.period.max():.5E} s") 

232 

233 lines.append("\tFrequency Range:") 

234 lines.append(f"\t\tMin: {1./self.data.period.max():.5E} Hz") 

235 lines.append(f"\t\tMax: {1./self.data.period.min():.5E} Hz") 

236 

237 return "\n".join(lines) 

238 

239 def __repr__(self): 

240 lines = [] 

241 lines.append(f"station='{self.station_metadata.id}'") 

242 lines.append(f"latitude={self.station_metadata.location.latitude:.2f}") 

243 lines.append(f"longitude={self.station_metadata.location.longitude:.2f}") 

244 lines.append(f"elevation={self.station_metadata.location.elevation:.2f}") 

245 

246 return f"EMTFXML({(', ').join(lines)})" 

247 

@property
def fn(self):
    """File path as a ``Path``, or None when no file is set."""
    return self._fn

@fn.setter
def fn(self, value):
    # None is stored unchanged; anything else is wrapped in ``Path``.
    self._fn = None if value is None else Path(value)

258 

@property
def save_dir(self):
    """Parent directory of ``self.fn``, or None when ``self.fn`` is None."""
    return None if self.fn is None else self.fn.parent

264 

@property
def description(self) -> str:
    """Description stored on the ``emtf`` container."""
    return self.emtf.description

@description.setter
def description(self, value: str):
    """Store ``value`` as the description on the ``emtf`` container."""
    self.emtf.description = value

272 

@property
def product_id(self) -> str:
    """Product id stored on the ``emtf`` container."""
    return self.emtf.product_id

@product_id.setter
def product_id(self, value: str):
    """Store ``value`` as the product id on the ``emtf`` container."""
    self.emtf.product_id = value

280 

@property
def tags(self) -> str:
    """Tags stored on the ``emtf`` container."""
    return self.emtf.tags

@tags.setter
def tags(self, value: str):
    """Store ``value`` as the tags on the ``emtf`` container."""
    self.emtf.tags = value

288 

@property
def sub_type(self) -> str:
    """Sub type stored on the ``emtf`` container."""
    return self.emtf.sub_type

@sub_type.setter
def sub_type(self, value: str):
    """Store ``value`` as the sub type on the ``emtf`` container."""
    self.emtf.sub_type = value

296 

@property
def notes(self) -> str:
    """Notes stored on the ``emtf`` container."""
    return self.emtf.notes

@notes.setter
def notes(self, value: str):
    """Store ``value`` as the notes on the ``emtf`` container."""
    self.emtf.notes = value

304 

305 def read(self, fn: str | Path = None, get_elevation: bool = False) -> None: 

306 """ 

307 Read xml file 

308 

309 :param fn: XML file path to read, if None, use self.fn 

310 :type fn: str | Path 

311 :return: None 

312 :rtype: None 

313 

314 """ 

315 if fn is not None: 

316 self.fn = fn 

317 if self.fn is not None: 

318 if not self.fn.exists(): 

319 raise IOError(f"Cannot find: {fn}") 

320 else: 

321 raise IOError("Input file name is None, that is bad.") 

322 

323 with open(file=self.fn, mode="r", encoding="utf-8") as xml_fid: 

324 xml_string = xml_fid.read() 

325 xml_string = xml_string.replace("&", "and") 

326 root = et.fromstring( 

327 xml_string, 

328 et.XMLParser(encoding="utf-8"), 

329 ) 

330 

331 root_dict = helpers.element_to_dict(root) 

332 root_dict = root_dict[list(root_dict.keys())[0]] 

333 root_dict = emtf_helpers._convert_keys_to_lower_case(root_dict) 

334 self._root_dict = root_dict 

335 

336 for element in self.element_keys: 

337 attr = getattr(self, element) 

338 if hasattr(attr, "read_dict"): 

339 attr.read_dict(root_dict) 

340 else: 

341 emtf_helpers._read_single(self, root_dict, element) 

342 

343 self.period_range.min = self.data.period.min() 

344 self.period_range.max = self.data.period.max() 

345 

346 # apparently sometimes the run list will come out as None from an 

347 # empty emtfxml. 

348 if self.site.run_list is None: 

349 self.site.run_list = [] 

350 

351 self._get_statistical_estimates() 

352 self._get_data_types() 

353 self._update_site_layout() 

354 

355 if self.site.location.elevation == 0 and get_elevation: 

356 if self.site.location.latitude != 0 and self.site.location.longitude != 0: 

357 self.site.location.elevation = get_nm_elev( 

358 self.site.location.latitude, self.site.location.longitude 

359 ) 

360 

def write(self, fn: str | Path, skip_field_notes: bool = False) -> None:
    """
    Write an xml

    :param fn: XML file path to write
    :type fn: str | Path
    :param skip_field_notes: when True the ``field_notes`` element is not
     written and "field_notes" is removed from the remote info order of
     ``processing_info``, defaults to False
    :type skip_field_notes: bool
    :return: None
    :rtype: None

    """
    null_values = [None, "None", "", "none"]

    emtf_element = et.Element("EM_TF")

    self._get_statistical_estimates()
    self._get_data_types()

    for key in self.element_keys:
        # an external url without a usable value is not written
        if key == "external_url" and self.external_url.url in null_values:
            continue
        if skip_field_notes and key == "field_notes":
            continue

        value = getattr(self, key)
        if not callable(getattr(value, "to_xml", None)):
            emtf_helpers._write_single(emtf_element, key, value)
            continue

        if key == "processing_info":
            # NOTE(review): ``_order`` is modified in place, so these
            # removals presumably persist on the object after writing.
            if skip_field_notes:
                try:
                    value.remote_info._order.remove("field_notes")
                except ValueError:
                    logger.debug("No field notes to skip.")
            if value.remote_info.site.id in null_values:
                try:
                    value.remote_info._order.remove("site")
                except ValueError:
                    logger.debug("No remote field notes to skip.")

        element = value.to_xml()
        items = element if isinstance(element, list) else [element]
        for item in items:
            emtf_element.append(emtf_helpers._convert_tag_to_capwords(item))

    emtf_element = emtf_helpers._remove_null_values(emtf_element)

    # ``read`` opens the file as utf-8, so write with the same encoding
    # instead of the platform default.
    with open(fn, "w", encoding="utf-8") as fid:
        fid.write(helpers.element_to_string(emtf_element))

    self.fn = fn

417 

418 def _get_statistical_estimates(self): 

419 """ 

420 Get the appropriate statistical estimates in the file. 

421 

422 """ 

423 self.statistical_estimates.estimates_list = [] 

424 if self.data.z_var is not None: 

425 if not np.all(self.data.z_var == 0.0): 

426 self.statistical_estimates.estimates_list.append( 

427 estimates_dict["variance"] 

428 ) 

429 elif self.data.t_var is not None: 

430 if not np.all(self.data.t_var == 0.0): 

431 self.statistical_estimates.estimates_list.append( 

432 estimates_dict["variance"] 

433 ) 

434 

435 if self.data.z_invsigcov is not None: 

436 if not np.all(self.data.z_invsigcov == 0.0): 

437 self.statistical_estimates.estimates_list.append( 

438 estimates_dict["inverse_signal_power"] 

439 ) 

440 elif self.data.t_invsigcov is not None: 

441 if not np.all(self.data.t_invsigcov == 0.0): 

442 self.statistical_estimates.estimates_list.append( 

443 estimates_dict["inverse_signal_power"] 

444 ) 

445 

446 if self.data.z_residcov is not None: 

447 if not np.all(self.data.z_residcov == 0.0): 

448 self.statistical_estimates.estimates_list.append( 

449 estimates_dict["residual_covariance"] 

450 ) 

451 elif self.data.t_residcov is not None: 

452 if not np.all(self.data.t_residcov == 0.0): 

453 self.statistical_estimates.estimates_list.append( 

454 estimates_dict["residual_covariance"] 

455 ) 

456 

457 def _get_data_types(self): 

458 """ 

459 get the appropriate data types for the file 

460 

461 :return: DESCRIPTION 

462 :rtype: TYPE 

463 

464 """ 

465 self.data_types.data_types_list = [] 

466 if self.data.z is not None: 

467 if not np.all(self.data.z == 0.0): 

468 self.data_types.data_types_list.append(data_types_dict["impedance"]) 

469 

470 if self.data.t is not None: 

471 if not np.all(self.data.t == 0.0): 

472 self.data_types.data_types_list.append(data_types_dict["tipper"]) 

473 

474 def _update_site_layout(self): 

475 """ 

476 Need to update site layout from statistical estimates. 

477 

478 :return: DESCRIPTION 

479 :rtype: TYPE 

480 

481 """ 

482 input_channels = [] 

483 output_channels = [] 

484 if (self.data.z != 0).any(): 

485 input_channels += ["hx", "hy"] 

486 output_channels += ["ex", "ey"] 

487 

488 if (self.data.t != 0).any(): 

489 output_channels += ["hz"] 

490 

491 if input_channels == []: 

492 input_channels = ["hx", "hy"] 

493 

494 # Case-insensitive comparison for channel names 

495 current_input_names = [ 

496 name.lower() for name in self.site_layout.input_channel_names 

497 ] 

498 if list(sorted(input_channels)) != list(sorted(current_input_names)): 

499 new_input_channels = [] 

500 for ach in input_channels: 

501 find = False 

502 for ch in self.site_layout.input_channels: 

503 if ch.name.lower() == ach.lower(): 

504 new_input_channels.append(ch) 

505 find = True 

506 if not find: 

507 new_input_channels.append(ach) 

508 

509 self.site_layout.input_channels = new_input_channels 

510 

511 # Case-insensitive comparison for output channels 

512 current_output_names = [ 

513 name.lower() for name in self.site_layout.output_channel_names 

514 ] 

515 if list(sorted(output_channels)) != list(sorted(current_output_names)): 

516 new_output_channels = [] 

517 for ach in output_channels: 

518 find = False 

519 for ch in self.site_layout.output_channels: 

520 if ch.name.lower() == ach.lower(): 

521 new_output_channels.append(ch) 

522 find = True 

523 if not find: 

524 new_output_channels.append(ach) 

525 self.site_layout.output_channels = new_output_channels 

526 

527 def _parse_comments_data_logger(self, key: str, value: str) -> tuple[str, str]: 

528 """ 

529 parse comments for data logger information. 

530 

531 Parameters 

532 ---------- 

533 key : str 

534 key value from the comments dictionary 

535 value : str 

536 value from the comments dictionary 

537 

538 Returns 

539 ------- 

540 tuple[str, str] 

541 key, value 

542 """ 

543 

544 if "datalogger" in key: 

545 key = key.replace("datalogger", "instrument") 

546 key = key.split(".", 1)[1] 

547 

548 return key, value 

549 

550 def _parse_comments_data_quality( 

551 self, key: str, value: str 

552 ) -> tuple[str, str | float]: 

553 """ 

554 parse comments for data quality information. 

555 

556 Parameters 

557 ---------- 

558 key : str 

559 key value from the comments dictionary 

560 value : str 

561 value from the comments dictionary 

562 

563 Returns 

564 ------- 

565 tuple[str, str | float] 

566 key, value 

567 """ 

568 key = f"site.{key.split('.', 1)[1]}" 

569 key = key.replace("dataquality", "data_quality_notes") 

570 if "comments" in key: 

571 key = key.replace("comments", "comments.value") 

572 if "author" in key: 

573 key = key.replace("author", "comments.author") 

574 

575 if "rating" in key: 

576 value = float(value) 

577 

578 return key, value 

579 

580 def _parse_comments_electric(self, key: str, value: str) -> tuple[None, None]: 

581 """ 

582 parse comments for electric channel information. 

583 

584 Parameters 

585 ---------- 

586 key : str 

587 key value from the comments dictionary 

588 value : str 

589 value from the comments dictionary 

590 

591 Returns 

592 ------- 

593 tuple[None, None] 

594 None, None 

595 """ 

596 key = key.split(".", 1)[1] 

597 key = key.replace("electrode_", "") 

598 klist = key.split(".") 

599 if len(klist) > 1: 

600 comp = klist[0] 

601 fkey = klist[1] 

602 else: 

603 comp = "ex" 

604 fkey = klist[0] 

605 

606 if fkey in ["chtype", "manufacturer", "azm"]: 

607 e_dict = { 

608 "chtype": "name", 

609 "manufacturer": "manufacturer", 

610 "azm": "azimuth", 

611 } 

612 

613 dipole_names = [] 

614 for d in self.field_notes.run_list[0].dipole: 

615 if isinstance(d.name, str): 

616 dipole_names.append(d.name.lower()) 

617 else: 

618 dipole_names.append(d.name) 

619 if comp.lower() in dipole_names: 

620 index = dipole_names.index(comp) 

621 elif None in dipole_names: 

622 index = dipole_names.index(None) 

623 else: 

624 self.field_notes.run_list[0].magnetometer.append( 

625 emtf_xml.Dipole(name=comp) 

626 ) 

627 index = -1 

628 

629 setattr( 

630 self.field_notes.run_list[0].dipole[index], 

631 e_dict[fkey], 

632 value, 

633 ) 

634 elif fkey in ["x", "x2", "y", "y2", "z", "z2"]: 

635 if len(self.site_layout.output_channels) == 0: 

636 self.site_layout.output_channels.append(emtf_xml.Electric(name=comp)) 

637 ch_names = [c.name for c in self.site_layout.output_channels] 

638 if comp in ch_names: 

639 index = ch_names.index(comp) 

640 else: 

641 index = 0 

642 self.site_layout.output_channels[index].update_attribute(fkey, value) 

643 return None, None 

644 

645 def _parse_comments_magnetic(self, key: str, value: str) -> tuple[None, None]: 

646 """ 

647 parse comments for magnetic channel information. 

648 

649 Parameters 

650 ---------- 

651 key : str 

652 key value from the comments dictionary 

653 value : str 

654 value from the comments dictionary 

655 

656 Returns 

657 ------- 

658 tuple[None, None] 

659 None, None 

660 """ 

661 

662 key = key.split(".", 1)[1] 

663 key = key.replace("magnetometer_", "") 

664 klist = key.split(".") 

665 if len(klist) > 1: 

666 comp = klist[0] 

667 fkey = klist[1] 

668 else: 

669 comp = "hx" 

670 fkey = klist[0] 

671 

672 if fkey in ["chtype", "manufacturer", "azm", "type", "acqchan"]: 

673 m_dict = { 

674 "chtype": "name", 

675 "manufacturer": "manufacturer", 

676 "azm": "azimuth", 

677 "type": "type", 

678 "acqchan": "id", 

679 } 

680 

681 mag_names = [] 

682 for d in self.field_notes.run_list[0].magnetometer: 

683 if isinstance(d.name, str): 

684 mag_names.append(d.name.lower()) 

685 else: 

686 mag_names.append(d.name) 

687 if comp.lower() in mag_names: 

688 index = mag_names.index(comp) 

689 

690 elif None in mag_names: 

691 index = mag_names.index(None) 

692 else: 

693 self.field_notes.run_list[0].magnetometer.append( 

694 emtf_xml.Magnetometer(name=comp) 

695 ) 

696 index = -1 

697 

698 setattr( 

699 self.field_notes.run_list[0].magnetometer[index], 

700 m_dict[fkey], 

701 value, 

702 ) 

703 elif fkey in ["x", "y", "z"]: 

704 if comp in ["hx", "hy"]: 

705 if len(self.site_layout.output_channels) == 0: 

706 self.site_layout.input_channels.append(emtf_xml.Magnetic(name=comp)) 

707 ch_names = [c.name for c in self.site_layout.output_channels] 

708 if comp in ch_names: 

709 index = ch_names.index(comp) 

710 else: 

711 index = 0 

712 self.site_layout.output_channels[index].update_attribute(fkey, value) 

713 elif comp in ["hz"]: 

714 if len(self.site_layout.output_channels) == 0: 

715 self.site_layout.output_channels.append( 

716 emtf_xml.Magnetic(name=comp) 

717 ) 

718 ch_names = [c.name for c in self.site_layout.output_channels] 

719 if comp in ch_names: 

720 index = ch_names.index(comp) 

721 else: 

722 index = 0 

723 

724 self.site_layout.output_channels[index].update_attribute(fkey, value) 

725 return None, None 

726 

727 def _parse_comments_processing( 

728 self, key: str, value: str 

729 ) -> tuple[str | None, str | None]: 

730 """ 

731 parse comments for processing information. 

732 

733 Parameters 

734 ---------- 

735 key : str 

736 key value from the comments dictionary 

737 value : str 

738 value from the comments dictionary 

739 

740 Returns 

741 ------- 

742 tuple[str | None, str | None] 

743 _description_ 

744 """ 

745 

746 key = key.replace("processing", "processing_info").replace( 

747 "software", "processing_software" 

748 ) 

749 if "author.name" in key: 

750 key = key.replace("author.name", "author") 

751 for item in [ 

752 "author.email", 

753 "author.organization", 

754 "author.organization_url", 

755 "processing_software.version", 

756 "author.organization", 

757 "author.organization_url", 

758 ]: 

759 if item in key: 

760 return None, None 

761 

762 return key, value 

763 

764 def _parse_comments(self, comments: str | None) -> None: 

765 """ 

766 Parse comments for processing information. 

767 

768 Parameters 

769 ---------- 

770 comments : str | None 

771 Comments to parse. 

772 """ 

773 if comments is None: 

774 return 

775 other = [] 

776 if comments.count("\n") > 0 and comments.count("=") > 0: 

777 comments = comments.replace("\n", ";").replace("=", ":") 

778 for comment in comments.split(";"): 

779 if comment.count(":") >= 1: 

780 key, value = [c.strip() for c in comment.split(":", 1)] 

781 if "fieldnotes" in key: 

782 if len(self.field_notes.run_list) == 0: 

783 self.field_notes.run_list.append(emtf_xml.Run()) 

784 

785 if "datalogger" in key: 

786 key, value = self._parse_comments_data_logger(key, value) 

787 try: 

788 self.field_notes.run_list[0].update_attribute(key, value) 

789 key = None 

790 value = None 

791 except: 

792 pass 

793 

794 elif "fieldnotes" in key and "dataquality" in key: 

795 key, value = self._parse_comments_data_quality(key, value) 

796 

797 elif "fieldnotes" in key and "electrode_" in key: 

798 key, value = self._parse_comments_electric(key, value) 

799 elif "fieldnotes" in key and "magnetometer_" in key: 

800 key, value = self._parse_comments_magnetic(key, value) 

801 elif "processing" in key: 

802 key, value = self._parse_comments_processing(key, value) 

803 

804 if key is not None and value is not None: 

805 if "." in key: 

806 obj, attr_key = key.split(".", 1) 

807 try: 

808 getattr(self, obj).update_attribute(attr_key, value) 

809 except: 

810 logger.warning(f"Cannot set attribute {key}.") 

811 elif key == "description": 

812 # Handle description as direct attribute on the EMTFXML object 

813 self.description = value 

814 else: 

815 # Handle other keys without dots 

816 other.append(f"{key}:{value}") 

817 else: 

818 other.append(comment) 

819 try: 

820 self.site.comments.value = "; ".join(other) 

821 except AttributeError: 

822 pass 

823 

@property
def survey_metadata(self):
    """
    Build a ``Survey`` object from the site, copyright and description
    information and add the current station metadata to it.

    :return: survey metadata
    :rtype: Survey

    """
    survey_obj = Survey()
    survey_obj.acquired_by.author = self.site.acquired_by

    citation = self.copyright.citation
    survey_obj.citation_dataset.authors = citation.authors
    survey_obj.citation_dataset.title = citation.title
    survey_obj.citation_dataset.year = citation.year
    survey_obj.citation_dataset.doi = citation.survey_d_o_i

    survey_obj.country = self.site.country
    survey_obj.geographic_name = self.site.survey
    if self.site.survey not in NULL_VALUES:
        survey_obj.id = self.site.survey
    survey_obj.project = self.site.project
    survey_obj.time_period.start_date = self.site.start
    survey_obj.time_period.end_date = self.site.end
    survey_obj.summary = self.description

    # copyright fields that have a value are carried as "key:value" comments
    copyright_notes = []
    for field in (
        "acknowledgement",
        "conditions_of_use",
        "release_status",
        "selected_publications",
        "additional_info",
    ):
        field_value = getattr(self.copyright, field)
        if field_value not in [None, ""]:
            copyright_notes.append(f"copyright.{field}:{field_value}")
    survey_obj.comments.value = "; ".join(copyright_notes)

    survey_obj.add_station(self.station_metadata)

    return survey_obj

@survey_metadata.setter
def survey_metadata(self, sm: Survey) -> None:
    """
    Set metadata and other values in metadata

    :param sm: survey metadata object
    :type sm: Survey
    :return: None
    :rtype: None

    """
    self.description = sm.summary
    self.site.project = sm.project
    # prefer the geographic name and fall back to the survey id
    if sm.geographic_name is None:
        self.site.survey = sm.id
    else:
        self.site.survey = sm.geographic_name
    if sm.country is not None:
        if isinstance(sm.country, str):
            # joining a plain string would separate its characters
            self.site.country = sm.country
        else:
            self.site.country = ",".join(sm.country)
    self.copyright.citation.survey_d_o_i = sm.citation_dataset.doi

    self.copyright.citation.authors = sm.citation_dataset.authors
    self.copyright.citation.title = sm.citation_dataset.title
    self.copyright.citation.year = sm.citation_dataset.year

    self._parse_comments(sm.comments.value)

884 

885 @property 

886 def station_metadata(self): 

887 s = Station() 

888 # if self._root_dict is not None: 

889 s.acquired_by.author = self.site.acquired_by 

890 s.channels_recorded = [d.name for d in self.site_layout.input_channels] + [ 

891 d.name for d in self.site_layout.output_channels 

892 ] 

893 s.data_type = self.sub_type.lower().split("_")[0] 

894 s.geographic_name = self.site.name 

895 s.id = self.site.id 

896 s.fdsn.id = self.product_id 

897 s.location.from_dict(self.site.location.to_dict()) 

898 s.orientation.angle_to_geographic_north = ( 

899 self.site.orientation.angle_to_geographic_north 

900 ) 

901 s.provenance.software.name = self.provenance.creating_application 

902 s.provenance.creation_time = self.provenance.create_time 

903 s.provenance.creator.author = self.provenance.creator.name 

904 s.provenance.creator.email = self.provenance.creator.email 

905 s.provenance.creator.organization = self.provenance.creator.organization 

906 s.provenance.creator.url = self.provenance.creator.url 

907 s.provenance.submitter.author = self.provenance.submitter.name 

908 s.provenance.submitter.email = self.provenance.submitter.email 

909 s.provenance.submitter.organization = self.provenance.submitter.organization 

910 s.provenance.submitter.url = self.provenance.submitter.url 

911 

912 s.provenance.archive.url = self.external_url.url 

913 s.provenance.archive.comments = self.external_url.description 

914 

915 s.time_period.start = self.site.start 

916 s.time_period.end = self.site.end 

917 

918 comments = {"description": self.description} 

919 for key in [ 

920 "primary_data.filename", 

921 "attachment.description", 

922 "attachment.filename", 

923 "site.data_quality_notes.comments.author", 

924 "site.data_quality_notes.comments.value", 

925 "site.data_quality_warnings.flag", 

926 "site.data_quality_warnings.comments.author", 

927 "site.data_quality_warnings.comments.value", 

928 ]: 

929 obj, attr_key = key.split(".", 1) 

930 comments[key] = getattr(self, obj).get_attr_from_name(attr_key) 

931 s.comments.value = "; ".join( 

932 [f"{k}:{v}" for k, v in comments.items() if v not in [None, ""]] 

933 ) 

934 

935 s.transfer_function.id = self.site.id 

936 s.transfer_function.sign_convention = self.processing_info.sign_convention 

937 s.transfer_function.processed_by.author = self.processing_info.processed_by 

938 s.transfer_function.software.author = ( 

939 self.processing_info.processing_software.author 

940 ) 

941 s.transfer_function.software.name = ( 

942 self.processing_info.processing_software.name 

943 ) 

944 s.transfer_function.software.last_updated = ( 

945 self.processing_info.processing_software.last_mod 

946 ) 

947 # need to use remote reference info if it has it. 

948 if self.processing_info.remote_info.site.id is not None: 

949 s.transfer_function.remote_references = [ 

950 self.processing_info.remote_info.site.id 

951 ] 

952 

953 elif self.processing_info.processing_tag is not None: 

954 if "_" in self.processing_info.processing_tag: 

955 remotes = [ 

956 rr 

957 for rr in self.processing_info.processing_tag.split("_") 

958 if self.site.id not in rr 

959 ] 

960 s.transfer_function.remote_references = remotes 

961 s.transfer_function.runs_processed = self.site.run_list 

962 s.transfer_function.processing_type = self.processing_info.remote_ref.type 

963 

964 if self.processing_info.remote_info.site.id is not None: 

965 for key in self.processing_info.remote_info.site.get_attribute_list(): 

966 value = self.processing_info.remote_info.site.get_attr_from_name(key) 

967 

968 if "location" in key: 

969 if value == 0.0: 

970 continue 

971 elif value not in NULL_VALUES: 

972 # need to add remote site information 

973 # Handle Enum types by using .value property 

974 str_value = value.value if isinstance(value, Enum) else value 

975 s.transfer_function.processing_parameters.append( 

976 f"remote_info.site.{key} = {str_value}" 

977 ) 

978 

979 # need to add remote site field notes information 

980 for rfn in self.processing_info.remote_info.field_notes._run_list: 

981 rr_dict = rfn.to_dict(single=True) 

982 for rr_key, rr_value in rr_dict.items(): 

983 if rr_value not in [ 

984 None, 

985 "1980", 

986 1980, 

987 "1980-01-01T00:00:00+00:00", 

988 [], 

989 "", 

990 ]: 

991 # Handle Enum types by using .value property 

992 str_value = ( 

993 rr_value.value if isinstance(rr_value, Enum) else rr_value 

994 ) 

995 s.transfer_function.processing_parameters.append( 

996 f"remote_info.field_notes.{rr_key} = {str_value}" 

997 ) 

998 for ii, dp in enumerate(rfn.dipole): 

999 dp_dict = dp.to_dict(single=True) 

1000 for dp_key, dp_value in dp_dict.items(): 

1001 if dp_value not in [None, ""]: 

1002 # Handle Enum types by using .value property 

1003 str_value = ( 

1004 dp_value.value 

1005 if isinstance(dp_value, Enum) 

1006 else dp_value 

1007 ) 

1008 s.transfer_function.processing_parameters.append( 

1009 f"remote_info.field_notes.dipole_{ii}.{dp_key} = {str_value}" 

1010 ) 

1011 for ii, mag in enumerate(rfn.magnetometer): 

1012 mag_dict = mag.to_dict(single=True) 

1013 for mag_key, mag_value in mag_dict.items(): 

1014 if mag_value not in [None, ""]: 

1015 # Handle Enum types by using .value property 

1016 str_value = ( 

1017 mag_value.value 

1018 if isinstance(mag_value, Enum) 

1019 else mag_value 

1020 ) 

1021 s.transfer_function.processing_parameters.append( 

1022 f"remote_info.field_notes.magnetometer_{ii}.{mag_key} = {str_value}" 

1023 ) 

1024 

1025 s.transfer_function.data_quality.good_from_period = ( 

1026 self.site.data_quality_notes.good_from_period 

1027 ) 

1028 s.transfer_function.data_quality.good_to_period = ( 

1029 self.site.data_quality_notes.good_to_period 

1030 ) 

1031 s.transfer_function.data_quality.rating.value = ( 

1032 self.site.data_quality_notes.rating 

1033 ) 

1034 

1035 for fn in self.field_notes._run_list: 

1036 if fn.sampling_rate in [0, None]: 

1037 continue 

1038 r = Run() 

1039 r.id = fn.run 

1040 r.data_logger.id = fn.instrument.id 

1041 r.data_logger.type = fn.instrument.name 

1042 r.data_logger.manufacturer = fn.instrument.manufacturer 

1043 r.sample_rate = fn.sampling_rate 

1044 r.time_period.start = fn.start 

1045 r.time_period.end = fn.end 

1046 comments = [] 

1047 if fn.comments.author not in [None, ""]: 

1048 comments.append(f"comments.author:{fn.comments.author}") 

1049 if fn.comments.value not in [None, ""]: 

1050 comments.append(f"comments.value:{fn.comments.value}") 

1051 if fn.errors not in [None, ""]: 

1052 comments.append(f"errors:{fn.errors}") 

1053 r.comments = "; ".join(comments) 

1054 

1055 # need to set azimuths from site layout with the x, y, z positions.

1056 if len(fn.magnetometer) == 1: 

1057 for comp in ["hx", "hy", "hz"]: 

1058 c = Magnetic() 

1059 c.component = comp 

1060 c.sensor.id = fn.magnetometer[0].id 

1061 c.sensor.name = fn.magnetometer[0].name 

1062 c.sensor.manufacturer = fn.magnetometer[0].manufacturer 

1063 c.sensor.type = fn.magnetometer[0].type 

1064 c.time_period.start = fn.start 

1065 c.time_period.end = fn.end 

1066 r.add_channel(c) 

1067 

1068 else: 

1069 for mag in fn.magnetometer: 

1070 comp = mag.name 

1071 if comp is None: 

1072 continue 

1073 c = Magnetic() 

1074 c.component = comp.lower() 

1075 c.sensor.id = mag.id 

1076 c.sensor.name = mag.name 

1077 c.sensor.manufacturer = mag.manufacturer 

1078 c.sensor.type = mag.type 

1079 c.time_period.start = fn.start 

1080 c.time_period.end = fn.end 

1081 r.add_channel(c) 

1082 

1083 for dp in fn.dipole: 

1084 comp = dp.name 

1085 if comp is None: 

1086 continue 

1087 c = Electric() 

1088 c.component = comp.lower() 

1089 c.translated_azimuth = dp.azimuth 

1090 c.dipole_length = dp.length 

1091 for pot in dp.electrode: 

1092 if pot.location.lower() in ["n", "e"]: 

1093 c.positive.id = pot.number 

1094 c.positive.type = pot.value 

1095 c.positive.manufacturer = dp.manufacturer 

1096 c.positive.type = pot.comments.as_string() 

1097 

1098 elif pot.location.lower() in ["s", "w"]: 

1099 c.negative.id = pot.number 

1100 c.negative.type = pot.value 

1101 c.negative.manufacturer = dp.manufacturer 

1102 c.negative.type = pot.comments.as_string() 

1103 c.time_period.start = fn.start 

1104 c.time_period.end = fn.end 

1105 r.add_channel(c) 

1106 

1107 for ch in ( 

1108 self.site_layout.input_channels + self.site_layout.output_channels 

1109 ): 

1110 try: 

1111 c = r.get_channel(ch.name.lower()) 

1112 except AttributeError: 

1113 # if the channel does not exist, create it. 

1114 if ch.name.lower() in ["ex", "ey"]: # electric channels 

1115 c = Electric() 

1116 elif ch.name.lower() in ["hx", "hy", "hz"]: # magnetic channels 

1117 c = Magnetic() 

1118 c.from_dict(ch.to_dict(single=True)) 

1119 r.add_channel(c) 

1120 

1121 if c.component in ["hx", "hy", "hz"]: 

1122 c.location.x = ch.x 

1123 c.location.y = ch.y 

1124 c.location.z = ch.z 

1125 

1126 elif c.component in ["ex", "ey"]: 

1127 c.negative.x = ch.x 

1128 c.negative.y = ch.y 

1129 c.negative.z = ch.z 

1130 c.positive.x2 = ch.x2 

1131 c.positive.y2 = ch.y2 

1132 c.positive.z2 = ch.z2 

1133 c.measurement_azimuth = ch.orientation 

1134 c.translated_azimuth = ch.orientation 

1135 c.time_period.start = fn.start 

1136 c.time_period.end = fn.end 

1137 s.add_run(r) 

1138 

1139 if self.field_notes._run_list == []: 

1140 r = Run(id=f"{s.id}a") 

1141 r.channels_recorded_electric = ["ex", "ey"] 

1142 if (self.data.t == 0).all(): 

1143 r.channels_recorded_magnetic = ["hx", "hy"] 

1144 else: 

1145 r.channels_recorded_magnetic = ["hx", "hy", "hz"] 

1146 

1147 for ch in ( 

1148 self.site_layout.input_channels + self.site_layout.output_channels 

1149 ): 

1150 c = r.get_channel(ch.name.lower()) 

1151 if c.component in r.channels_recorded_magnetic: 

1152 c.location.x = ch.x 

1153 c.location.y = ch.y 

1154 c.location.z = ch.z 

1155 

1156 elif c.component in r.channels_recorded_electric: 

1157 c.negative.x = ch.x 

1158 c.negative.y = ch.y 

1159 c.negative.z = ch.z 

1160 c.positive.x2 = ch.x2 

1161 c.positive.y2 = ch.y2 

1162 c.positive.z2 = ch.z2 

1163 c.measurement_azimuth = ch.orientation 

1164 c.translated_azimuth = ch.orientation 

1165 c.time_period.start = s.time_period.start 

1166 c.time_period.end = s.time_period.end 

1167 

1168 s.add_run(r) 

1169 

1170 return s 

1171 

@station_metadata.setter
def station_metadata(self, station_metadata: Station) -> None:
    """
    Fill the EMTF XML metadata from a station metadata object.

    Copies site, provenance, processing, data-quality, field-note and
    site-layout information from ``station_metadata`` onto this object.

    Entries of ``transfer_function.processing_parameters`` that are
    strings of the form ``key = value`` (or ``key: value``) are parsed
    back into ``processing_info``.  Keys containing
    ``remote_info.field_notes`` are applied to the first run of the
    remote-reference field notes, which is created when it is missing.

    ``field_notes`` and the site-layout input/output channels are rebuilt
    from ``station_metadata.runs``.

    :param station_metadata: station metadata object to read from
    :type station_metadata: Station
    :raises AttributeError: if a ``key:value`` entry found in a run's
        comments cannot be set on the corresponding field-notes run
    :return: None
    :rtype: None

    """
    # Local import: only needed to parse serialized dipole lists below.
    import ast

    sm = station_metadata

    # ---- site -----------------------------------------------------------
    self.site.acquired_by = sm.acquired_by.author
    if sm.data_type is not None:
        self.sub_type = f"{sm.data_type.upper()}_TF"
    else:
        self.sub_type = DataTypeEnum.MT_TF
    self.site.name = sm.geographic_name
    self.site.id = sm.id
    self.product_id = sm.fdsn.id
    self.site.location.latitude = sm.location.latitude
    self.site.location.longitude = sm.location.longitude
    self.site.location.elevation = sm.location.elevation
    self.site.orientation.angle_to_geographic_north = (
        sm.orientation.angle_to_geographic_north
        if sm.orientation.angle_to_geographic_north is not None
        else 0.0
    )

    # ---- provenance -----------------------------------------------------
    self.provenance.creating_application = sm.provenance.software.name
    self.provenance.create_time = sm.provenance.creation_time
    self.provenance.creator.name = sm.provenance.creator.author
    self.provenance.creator.email = sm.provenance.creator.email
    self.provenance.creator.organization = sm.provenance.creator.organization
    self.provenance.creator.url = sm.provenance.creator.url
    self.provenance.submitter.name = sm.provenance.submitter.author
    self.provenance.submitter.email = sm.provenance.submitter.email
    self.provenance.submitter.organization = sm.provenance.submitter.organization
    self.provenance.submitter.url = sm.provenance.submitter.url

    self.external_url.url = (
        sm.provenance.archive.url if sm.provenance.archive.url is not None else ""
    )
    self.external_url.description = (
        sm.provenance.archive.comments.value
        if sm.provenance.archive.comments.value is not None
        else ""
    )

    # ---- time period ----------------------------------------------------
    self.site.start = sm.time_period.start
    self.site.end = sm.time_period.end
    # Extract year from start date for year_collected
    if sm.time_period.start:
        try:
            # Handle different types of start time (datetime, string, etc.)
            if hasattr(sm.time_period.start, "year"):
                self.site.year_collected = sm.time_period.start.year
            elif isinstance(sm.time_period.start, str):
                # Extract year from ISO date string (YYYY-MM-DD...)
                self.site.year_collected = int(sm.time_period.start[:4])
        except (ValueError, AttributeError, TypeError):
            # Best effort: if extraction fails, year_collected is left as is.
            pass

    # ---- processing info ------------------------------------------------
    self.processing_info.sign_convention = sm.transfer_function.sign_convention
    self.processing_info.processed_by = sm.transfer_function.processed_by.author
    self.processing_info.process_date = sm.transfer_function.processed_date
    self.processing_info.processing_software.author = (
        sm.transfer_function.software.author
    )
    self.processing_info.processing_software.name = (
        sm.transfer_function.software.name
    )
    self.processing_info.processing_software.last_mod = (
        sm.transfer_function.software.last_updated
    )

    # The processing tag is the processed runs followed by the remote
    # references, joined with underscores.
    tag = []
    if sm.transfer_function.runs_processed is not None:
        tag += sm.transfer_function.runs_processed
    if sm.transfer_function.remote_references is not None:
        tag += sm.transfer_function.remote_references
    self.processing_info.processing_tag = "_".join(tag)
    self.processing_info.remote_ref.type = sm.transfer_function.processing_type

    for param in sm.transfer_function.processing_parameters:
        # Only "key = value" / "key: value" strings can be parsed.
        if not isinstance(param, str):
            continue
        if "=" in param:
            sep = "="
        elif ":" in param:
            sep = ":"
        else:
            continue

        key, value = [k.strip() for k in param.split(sep, 1)]

        if "remote_info.field_notes" not in key:
            try:
                self.processing_info.update_attribute(key, value)
            except Exception:
                logger.warning(f"Cannot set processing info attribute {param}")
            continue

        # Remote-reference field notes are stored on the first run,
        # which is created on demand.
        key = key.replace("remote_info.field_notes.", "")
        remote_runs = self.processing_info.remote_info.field_notes._run_list
        if not remote_runs:
            remote_runs.append(meta_classes["run"]())
        run = remote_runs[0]

        if "dipole" in key:
            try:
                # Handle different dipole key formats
                if "_" in key and "." in key:
                    # Format: dipole_0.name, dipole_1.length, etc.
                    index = int(key.split("_")[1].split(".")[0])
                    attr_key = key.split(".", 1)[1]
                    if len(run.dipole) < (index + 1):
                        run.dipole.append(meta_classes["dipole"]())
                    run.dipole[index].update_attribute(attr_key, value)
                elif key == "dipole" and isinstance(value, (list, str)):
                    # Handle complex dipole value formats: a list of
                    # dictionaries or a serialized structure.
                    dipole_list = value
                    if isinstance(value, str):
                        # Try to evaluate a string representation of a
                        # structure; literal_eval only accepts literals.
                        try:
                            dipole_list = ast.literal_eval(value)
                        except (ValueError, SyntaxError):
                            # If not evaluable, treat it as a dipole name.
                            logger.debug(f"Using dipole value as string: {value}")
                            dipole_list = [{"dipole": {"name": str(value)}}]

                    # Process list of dipole dictionaries
                    if isinstance(dipole_list, list):
                        for idx, dipole_data in enumerate(dipole_list):
                            if not (
                                isinstance(dipole_data, dict)
                                and "dipole" in dipole_data
                            ):
                                continue
                            dipole_info = dipole_data["dipole"]
                            if len(run.dipole) < (idx + 1):
                                run.dipole.append(meta_classes["dipole"]())

                            # Set dipole attributes from the dictionary
                            for attr_name, attr_value in dipole_info.items():
                                try:
                                    run.dipole[idx].update_attribute(
                                        attr_name, attr_value
                                    )
                                except Exception as attr_error:
                                    logger.warning(
                                        f"Cannot set dipole attribute {attr_name}: {attr_error}"
                                    )
                    else:
                        logger.warning(
                            f"Dipole value is not a list: {type(dipole_list)}"
                        )
                else:
                    # Unknown dipole key format
                    logger.warning(f"Unknown dipole key format: {key}")
            except (IndexError, ValueError) as error:
                logger.warning(
                    f"Cannot parse dipole processing info attribute {param}: {error}"
                )
            except Exception as error:
                logger.warning(
                    f"Cannot set processing info attribute {param}: {error}"
                )
        elif "magnetometer" in key:
            try:
                # Format: magnetometer_0.name, magnetometer_1.type, etc.
                index = int(key.split("_")[1].split(".")[0])
                key_parts = key.split(".", 1)
                if len(key_parts) > 1:
                    key_attr = key_parts[1]
                    if len(run.magnetometer) < (index + 1):
                        run.magnetometer.append(meta_classes["magnetometer"]())
                    run.magnetometer[index].update_attribute(key_attr, value)
            except (IndexError, ValueError) as error:
                logger.warning(
                    f"Cannot parse magnetometer processing info attribute {param}: {error}"
                )
            except Exception:
                logger.warning(f"Cannot set processing info attribute {param}")
        else:
            try:
                run.update_attribute(key, value)
            except Exception:
                logger.warning(f"Cannot set processing info attribute {param}")

    self.site.run_list = sm.transfer_function.runs_processed

    # ---- data quality ---------------------------------------------------
    self.site.data_quality_notes.good_from_period = (
        sm.transfer_function.data_quality.good_from_period
    )
    self.site.data_quality_notes.good_to_period = (
        sm.transfer_function.data_quality.good_to_period
    )
    self.site.data_quality_notes.rating = (
        sm.transfer_function.data_quality.rating.value
    )
    self.site.data_quality_notes.comments.value = (
        sm.transfer_function.data_quality.comments.value
    )

    # ---- field notes and site layout, rebuilt from the runs -------------
    self.field_notes._run_list = []
    ch_in_dict = {}
    ch_out_dict = {}
    for r in sm.runs:
        fn = emtf_xml.Run()
        fn.dipole = []
        fn.magnetometer = []
        fn.instrument.id = r.data_logger.id
        fn.instrument.name = r.data_logger.type
        fn.instrument.manufacturer = r.data_logger.manufacturer
        fn.sampling_rate = r.sample_rate
        fn.start = r.time_period.start
        fn.end = r.time_period.end
        fn.run = r.id
        if r.comments is not None:
            # Handle both string and Comment object types
            comments_str = (
                r.comments.value if hasattr(r.comments, "value") else str(r.comments)
            )
            if comments_str:
                # Comments are ";"-separated "key:value" entries.
                for comment in comments_str.split(";"):
                    if ":" not in comment:
                        continue
                    key, value = comment.split(":", 1)
                    try:
                        fn.update_attribute(key.strip(), value.strip())
                    except Exception as error:
                        raise AttributeError(
                            f"Cannot set attribute {key}."
                        ) from error

        for comp in ["hx", "hy", "hz"]:
            try:
                rch = r.get_channel(comp)
                mag = emtf_xml.Magnetometer()  # type: ignore
                mag.id = rch.sensor.id
                mag.name = comp
                mag.manufacturer = rch.sensor.manufacturer
                mag.type = rch.sensor.type
                fn.magnetometer.append(mag)

                # long period magnetometer: a single entry is recorded
                # and the remaining components are skipped.
                if rch.sensor.name in ["NIMS", "LEMI"] and rch.sensor.type in [
                    "fluxgate"
                ]:
                    break

            except AttributeError:
                logger.debug(
                    f"Did not find {comp} in run",
                )

        for comp in ["ex", "ey"]:
            try:
                c = r.get_channel(comp)
                dp = emtf_xml.Dipole()  # type: ignore
                dp.name = comp.capitalize()
                dp.azimuth = c.translated_azimuth
                dp.length = c.dipole_length
                dp.manufacturer = c.positive.manufacturer
                dp.type = "wire"

                # fill electrodes: positive is north/east, negative is
                # south/west.
                pot_p = emtf_xml.Electrode()  # type: ignore
                pot_p.number = c.positive.id
                pot_p.location = "n" if comp == "ex" else "e"
                pot_p.comments = c.positive.type
                dp.electrode.append(pot_p)

                pot_n = emtf_xml.Electrode()  # type: ignore
                pot_n.number = c.negative.id
                # The negative electrode carries its own type, not the
                # positive electrode's.
                pot_n.comments = c.negative.type
                pot_n.location = "s" if comp == "ex" else "w"
                dp.electrode.append(pot_n)
                fn.dipole.append(dp)

            except AttributeError:
                logger.debug(f"Did not find {comp} in run")

        self.field_notes._run_list.append(fn)

        # Site layout: hx/hy are input channels, hz and the electric
        # channels are output channels.  Later runs overwrite earlier
        # entries for the same component.
        for comp in ["hx", "hy", "hz"]:
            try:
                ch = r.get_channel(comp)
                m_ch = emtf_xml.Magnetic()  # type: ignore

                # Missing coordinates default to 0.0.
                for item in ["x", "y", "z"]:
                    value = getattr(ch.location, item)
                    setattr(m_ch, item, 0.0 if value is None else value)

                m_ch.name = comp.capitalize()
                if ch.translated_azimuth is not None:
                    m_ch.orientation = ch.translated_azimuth
                else:
                    m_ch.orientation = ch.measurement_azimuth

                if comp in ["hx", "hy"]:
                    ch_in_dict[comp] = m_ch
                else:
                    ch_out_dict[comp] = m_ch
            except AttributeError:
                logger.debug(f"Did not find {comp} in run")

        for comp in ["ex", "ey"]:
            try:
                ch = r.get_channel(comp)
                ch_out = emtf_xml.Electric()  # type: ignore

                # Missing coordinates default to 0.0.
                for item in ["x", "y", "z"]:
                    value = getattr(ch.negative, item)
                    setattr(ch_out, item, 0.0 if value is None else value)

                for item in ["x2", "y2", "z2"]:
                    value = getattr(ch.positive, item)
                    setattr(ch_out, item, 0.0 if value is None else value)

                ch_out.name = comp.capitalize()
                if ch.translated_azimuth is not None:
                    ch_out.orientation = ch.translated_azimuth
                else:
                    ch_out.orientation = ch.measurement_azimuth

                # With no electrode positions at all, fall back to the
                # dipole length along the component's own axis.
                if (
                    ch_out.x == 0
                    and ch_out.y == 0
                    and ch_out.x2 == 0
                    and ch_out.y2 == 0
                ):
                    if comp in ["ex"]:
                        ch_out.x2 = ch.dipole_length
                    elif comp in ["ey"]:
                        ch_out.y2 = ch.dipole_length

                ch_out_dict[comp] = ch_out
            except AttributeError:
                logger.debug(f"Did not find {comp} in run")

    self.site_layout.input_channels = list(ch_in_dict.values())
    self.site_layout.output_channels = list(ch_out_dict.values())

    self._parse_comments(sm.comments.value)