Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mt_metadata \ mt_metadata \ transfer_functions \ io \ edi \ edi.py: 72%

798 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-10 00:11 -0800

1# -*- coding: utf-8 -*- 

2""" 

3.. module:: EDI 

4 :synopsis: Deal with EDI files. The Edi class can read and write an .edi 

5 file, the 'standard format' of magnetotellurics. Each section 

6 of the .edi file is given its own class, so the elements of each 

7 section are attributes for easy access. 

8 

9.. moduleauthor:: Jared Peacock <jpeacock@usgs.gov> 

10 

11Updated 2021 to used mt_metadata type metadata and how spectra are read. 

12 

13""" 

14 

15 

16# ============================================================================== 

17# Imports 

18# ============================================================================== 

19from pathlib import Path 

20from typing import Literal 

21 

22import numpy as np 

23from loguru import logger 

24 

25from mt_metadata import __version__, NULL_VALUES 

26from mt_metadata.timeseries import Electric, Magnetic, Run, Survey 

27from mt_metadata.transfer_functions import tf 

28from mt_metadata.transfer_functions.io.edi.metadata import ( 

29 DataSection, 

30 DefineMeasurement, 

31 Header, 

32 Information, 

33) 

34from mt_metadata.transfer_functions.io.tools import ( 

35 _validate_edi_lines, 

36 _validate_str_with_equals, 

37 get_nm_elev, 

38 index_locator, 

39) 

40 

41 

42# ============================================================================== 

43# EDI Class 

44# ============================================================================== 

45class EDI: 

46 """ 

47 This class is for .edi files, mainly reading and writing. Has been tested 

48 on Winglink and Phoenix output .edi's, which are meant to follow the 

49 archaic EDI format put forward by SEG. Can read impedance, Tipper and/or 

50 spectra data. 

51 

52 The Edi class contains a class for each major section of the .edi file. 

53 

54 Frequency and components are ordered from highest to lowest frequency. 

55 

56 :param fn: full path to .edi file to be read in. 

57 *default* is None. If an .edi file is input, it is 

58 automatically read in and attributes of Edi are filled 

59 :type fn: string or :class:`pathlib.Path` 

60 

61 :Change Latitude: :: 

62 

63 >>> from mt_metadata.transfer_functions.io.edi import EDI 

64 >>> edi_obj = EDI(fn=r"/home/mt/mt01.edi") 

65 >>> # change the latitude 

66 >>> edi_obj.lat = 45.7869 

67 >>> new_edi_fn = edi_obj.write() 

68 """ 

69 

70 def __init__(self, fn=None, **kwargs): 

71 self.logger = logger 

72 self._fn = None 

73 self._edi_lines = [] 

74 

75 self.Header = Header() 

76 self.Info = Information() 

77 self.Measurement = DefineMeasurement() 

78 self.Data = DataSection() 

79 

80 self.z = None 

81 self.z_err = None 

82 self.t = None 

83 self.t_err = None 

84 self.frequency = None 

85 self.rotation_angle = None 

86 self.residual_covariance = None 

87 self.signal_inverse_power = None 

88 self.tf = None 

89 self.tf_err = None 

90 

91 self._z_labels = [ 

92 ["zxxr", "zxxi", "zxx.var"], 

93 ["zxyr", "zxyi", "zxy.var"], 

94 ["zyxr", "zyxi", "zyx.var"], 

95 ["zyyr", "zyyi", "zyy.var"], 

96 ] 

97 

98 self._t_labels = [ 

99 ["txr.exp", "txi.exp", "txvar.exp"], 

100 ["tyr.exp", "tyi.exp", "tyvar.exp"], 

101 ] 

102 

103 self._accepted_keys = [ 

104 "freq", 

105 "zxxr", 

106 "zxxi", 

107 "zxyr", 

108 "zxyi", 

109 "zyxr", 

110 "zyxi", 

111 "zyyr", 

112 "zyyi", 

113 "zxx.var", 

114 "zxy.var", 

115 "zyx.var", 

116 "zyy.var", 

117 "txr.exp", 

118 "txi.exp", 

119 "tyr.exp", 

120 "tyi.exp", 

121 "txvar.exp", 

122 "tyvar.exp", 

123 "rhoxx", 

124 "rhoxy", 

125 "rhoyx", 

126 "rhoyy", 

127 "rhoxx.err", 

128 "rhoxy.err", 

129 "rhoyx.err", 

130 "rhoyy.err", 

131 "phsxx", 

132 "phsxy", 

133 "phsyx", 

134 "phsyy", 

135 "phsxx.err", 

136 "phsxy.err", 

137 "phsyx.err", 

138 "phsyy.err", 

139 "zrot", 

140 "rhorot", 

141 "trot", 

142 ] 

143 

144 self._channel_skip_list = [ 

145 "filter.name", 

146 "filter.applied", 

147 "time_period.start", 

148 "time_period.end", 

149 "location.elevation", 

150 "location.latitude", 

151 "location.longitude", 

152 "location.x", 

153 "location.y", 

154 "location.z", 

155 "positive.latitude", 

156 "positive.longitude", 

157 "positive.elevation", 

158 "positive.x", 

159 "positive.x2", 

160 "positive.y", 

161 "positive.y2", 

162 "positive.z", 

163 "positive.z2", 

164 "negative.latitude", 

165 "negative.longitude", 

166 "negative.elevation", 

167 "negative.x", 

168 "negative.x2", 

169 "negative.y", 

170 "negative.y2", 

171 "negative.z", 

172 "negative.z2", 

173 "sample_rate", 

174 "data_quality.rating.value", 

175 "data_quality.flag", 

176 ] 

177 

178 self._index_dict = { 

179 "zxx": {"ii": 0, "jj": 0, "obj": "z", "err_obj": "z_err"}, 

180 "zxy": {"ii": 0, "jj": 1, "obj": "z", "err_obj": "z_err"}, 

181 "zyx": {"ii": 1, "jj": 0, "obj": "z", "err_obj": "z_err"}, 

182 "zyy": {"ii": 1, "jj": 1, "obj": "z", "err_obj": "z_err"}, 

183 "tx": {"ii": 0, "jj": 0, "obj": "t", "err_obj": "t_err"}, 

184 "ty": {"ii": 0, "jj": 1, "obj": "t", "err_obj": "t_err"}, 

185 "rhoxx": {"ii": 0, "jj": 0, "obj": "z", "err_obj": "z_err"}, 

186 "rhoxy": {"ii": 0, "jj": 1, "obj": "z", "err_obj": "z_err"}, 

187 "rhoyx": {"ii": 1, "jj": 0, "obj": "z", "err_obj": "z_err"}, 

188 "rhoyy": {"ii": 1, "jj": 1, "obj": "z", "err_obj": "z_err"}, 

189 } 

190 

191 self._data_header_str = ">!****{0}****!\n" 

192 

193 self._num_format = " 15.6e" 

194 self._block_len = 6 

195 

196 self.fn = fn 

197 

198 for key, value in kwargs.items(): 

199 setattr(self, key, value) 

200 

201 def __str__(self) -> str: 

202 lines = [f"Station: {self.station}", "-" * 50] 

203 lines.append(f"\tSurvey: {self.survey_metadata.id}") 

204 lines.append(f"\tProject: {self.survey_metadata.project}") 

205 lines.append(f"\tAcquired by: {self.station_metadata.acquired_by.author}") 

206 lines.append(f"\tAcquired date: {self.station_metadata.time_period.start}") 

207 lines.append(f"\tLatitude: {self.station_metadata.location.latitude:.3f}") 

208 lines.append(f"\tLongitude: {self.station_metadata.location.longitude:.3f}") 

209 lines.append(f"\tElevation: {self.station_metadata.location.elevation:.3f}") 

210 if self.z is not None: 

211 lines.append("\tImpedance: True") 

212 else: 

213 lines.append("\tImpedance: False") 

214 if self.t is not None: 

215 lines.append("\tTipper: True") 

216 else: 

217 lines.append("\tTipper: False") 

218 if self.frequency is not None: 

219 lines.append(f"\tNumber of periods: {self.frequency.size}") 

220 lines.append( 

221 f"\t\tPeriod Range: {1./self.frequency.max():.5E} -- {1./self.frequency.min():.5E} s" 

222 ) 

223 lines.append( 

224 f"\t\tFrequency Range {self.frequency.min():.5E} -- {self.frequency.max():.5E} s" 

225 ) 

226 return "\n".join(lines) 

227 

228 def __repr__(self) -> str: 

229 return self.__str__() 

230 

231 @property 

232 def fn(self): 

233 return self._fn 

234 

235 @fn.setter 

236 def fn(self, fn: str | Path | None): 

237 if fn is not None: 

238 self._fn = Path(fn) 

239 if self._fn.exists(): 

240 self.read() 

241 

242 @property 

243 def period(self) -> np.typing.NDArray | None: 

244 if self.frequency is not None: 

245 return 1.0 / self.frequency 

246 return None 

247 

248 def _assert_descending_frequency(self) -> None: 

249 """ 

250 Assert that the transfer function is ordered from high frequency to low 

251 frequency. 

252 

253 """ 

254 if self.frequency is not None: 

255 if self.frequency[0] < self.frequency[1]: 

256 self.logger.debug( 

257 "Ordered arrays to be arranged from high to low frequency" 

258 ) 

259 self.frequency = self.frequency[::-1] 

260 if self.z is not None: 

261 self.z = self.z[::-1] 

262 if self.z_err is not None: 

263 self.z_err = self.z_err[::-1] 

264 if self.t is not None: 

265 self.t = self.t[::-1] 

266 if self.t_err is not None: 

267 self.t_err = self.t_err[::-1] 

268 

269 def read(self, fn: str | Path | None = None, get_elevation: bool = False) -> None: 

270 """ 

271 Read in an edi file and fill attributes of each section's classes. 

272 Including: 

273 

274 * Header 

275 * Info 

276 * Measurement 

277 * Data 

278 * z, z_err 

279 * t, t_err 

280 

281 .. note:: Automatically detects if data is in spectra format. All 

282 data read in is converted to impedance and Tipper. 

283 

284 

285 :param fn: full path to .edi file to be read in 

286 *default* is None 

287 :type fn: string 

288 

289 :Example: :: 

290 

291 >>> from mt_metadata.transfer_functions.io.edi import EDI 

292 >>> edi_obj = EDI 

293 >>> edi_obj.read(fn=r"/home/mt/mt01.edi") 

294 

295 """ 

296 

297 if fn is not None: 

298 self._fn = Path(fn) 

299 if self.fn is None: 

300 msg = "Must input EDI file to read" 

301 self.logger.error(msg) 

302 raise IOError(msg) 

303 if not self.fn.exists(): 

304 msg = f"Cannot find EDI file: {self.fn}" 

305 self.logger.error(msg) 

306 raise IOError(msg) 

307 with open(self.fn, "r") as fid: 

308 self._edi_lines = _validate_edi_lines(fid.readlines()) 

309 self.Header.read_header(self._edi_lines) 

310 self.Info.read_info(self._edi_lines) 

311 self.Measurement.read_measurement(self._edi_lines) 

312 self.Data.read_data(self._edi_lines) 

313 self.Data.match_channels(self.Measurement.channel_ids) 

314 

315 self._read_data() 

316 

317 if self.Header.latitude in [None, 0.0]: 

318 self.Header.latitude = self.Measurement.reflat 

319 self.logger.debug(f"Got latitude from reflat for {self.Header.dataid}") 

320 if self.Header.longitude in [None, 0.0]: 

321 self.Header.longitude = self.Measurement.reflon 

322 self.logger.debug(f"Got longitude from reflon for {self.Header.dataid}") 

323 if self.Header.elevation in [None, 0.0]: 

324 self.Header.elevation = self.Measurement.refelev 

325 self.logger.debug(f"Got elevation from refelev for {self.Header.dataid}") 

326 

327 if self.elev in [0, None] and get_elevation: 

328 if self.lat != 0 and self.lon != 0: 

329 self.elev = get_nm_elev(self.lat, self.lon) 

330 

331 def _read_data(self) -> None: 

332 """ 

333 Read either impedance or spectra data depending on what the type is 

334 in the data section. 

335 """ 

336 

337 lines = self._edi_lines[self.Data._line_num :] 

338 

339 if self.Data._data_type_in == "spectra": 

340 self.logger.debug("Converting Spectra to Impedance and Tipper") 

341 self.logger.debug( 

342 "Check to make sure input channel list is correct if the data looks incorrect" 

343 ) 

344 if self.Data.nchan == 5: 

345 c_list = ["hx", "hy", "hz", "ex", "ey"] 

346 elif self.Data.nchan == 4: 

347 c_list = ["hx", "hy", "ex", "ey"] 

348 elif self.Data.nchan == 6: 

349 c_list = ["hx", "hy", "ex", "ey", "rhx", "rhy"] 

350 elif self.Data.nchan == 7: 

351 c_list = ["hx", "hy", "hz", "ex", "ey", "rhx", "rhy"] 

352 self._read_spectra(lines, comp_list=c_list) 

353 elif self.Data._data_type_in == "z": 

354 self._read_mt(lines) 

355 

356 def _read_mt(self, data_lines: list[str]) -> None: 

357 """ 

358 Read in impedance and tipper data 

359 

360 :param data_lines: list of data lines from the edi file 

361 :type data_lines: list 

362 """ 

363 

364 data_dict = {} 

365 data_find = False 

366 for line in data_lines: 

367 line = line.strip() 

368 if ">" in line and "!" not in line: 

369 line_list = line[1:].strip().split() 

370 if len(line_list) == 0: 

371 continue 

372 key = line_list[0].lower() 

373 if key in self._accepted_keys: 

374 data_find = True 

375 data_dict[key] = [] 

376 else: 

377 data_find = False 

378 elif data_find and ">" not in line and "!" not in line: 

379 d_lines = line.strip().split() 

380 for ii, dd in enumerate(d_lines): 

381 # check for empty values and set them to 0, check for any 

382 # other characters sometimes there are ****** for a null 

383 # component 

384 try: 

385 d_lines[ii] = float(dd) 

386 if d_lines[ii] == self.Header.empty: 

387 d_lines[ii] = 0.0 

388 except ValueError: 

389 d_lines[ii] = 0.0 

390 data_dict[key] += d_lines 

391 # put everything into arrays 

392 for key, k_list in data_dict.items(): 

393 data_dict[key] = np.array(k_list) 

394 # fill useful arrays 

395 self.frequency = data_dict["freq"] 

396 self.z = np.zeros((self.frequency.size, 2, 2), dtype=complex) 

397 self.z_err = np.zeros((self.frequency.size, 2, 2), dtype=float) 

398 # fill tipper data if there it exists 

399 self.t = np.zeros((self.frequency.size, 1, 2), dtype=complex) 

400 self.t_err = np.zeros((self.frequency.size, 1, 2), dtype=float) 

401 self.data_dict = data_dict 

402 

403 # fill tensors 

404 for key in sorted(self._index_dict.keys(), reverse=True): 

405 index = self._index_dict[key] 

406 ii = index["ii"] 

407 jj = index["jj"] 

408 obj = getattr(self, index["obj"]) 

409 error_obj = getattr(self, index["err_obj"]) 

410 try: 

411 if key.startswith("z"): 

412 obj[:, ii, jj] = data_dict[f"{key}r"] + data_dict[f"{key}i"] * 1j 

413 try: 

414 error_key = [ 

415 k for k in data_dict.keys() if key in k and "var" in k 

416 ][0] 

417 error_obj[:, ii, jj] = np.abs(data_dict[error_key]) ** 0.5 

418 except IndexError: 

419 self.logger.debug(f"Could not find error information for {key}") 

420 elif key.startswith("t"): 

421 obj[:, ii, jj] = ( 

422 data_dict[f"{key}r.exp"] + data_dict[f"{key}i.exp"] * 1j 

423 ) 

424 try: 

425 error_key = [ 

426 k for k in data_dict.keys() if key in k and "var" in k 

427 ][0] 

428 error_obj[:, ii, jj] = np.abs(data_dict[error_key]) ** 0.5 

429 except IndexError: 

430 self.logger.debug(f"Could not find error information for {key}") 

431 elif key.startswith("r") or key.startswith("p"): 

432 self.logger.debug("Reading RHO and PHS to compute impedance") 

433 if (self.z[:, ii, jj] == 0).all(): 

434 phase = data_dict[f"phs{key[-2:]}"] 

435 z_real = np.sqrt( 

436 (5 * self.frequency * data_dict[key]) 

437 / (np.tan(np.deg2rad(phase)) ** 2 + 1) 

438 ) 

439 z_imag = (np.tan(np.deg2rad(phase))) * z_real 

440 if ii == 1 and jj == 0: 

441 if phase.mean() < 90 and phase.mean() > 0: 

442 obj[:, ii, jj] = -1 * (z_real + 1j * z_imag) 

443 else: 

444 obj[:, ii, jj] = z_real + 1j * z_imag 

445 error_obj[:, ii, jj] = np.deg2rad( 

446 data_dict[f"phs{key[-2:]}.err"] 

447 ) * np.sqrt(data_dict[key] * (self.frequency * 5)) 

448 except KeyError as error: 

449 self.logger.debug(error) 

450 # check for order of frequency, we want high togit low 

451 self._assert_descending_frequency() 

452 

453 try: 

454 self.rotation_angle = np.array(data_dict["zrot"]) 

455 except KeyError: 

456 try: 

457 self.rotation_angle = np.array(data_dict["rhorot"]) 

458 except KeyError: 

459 self.rotation_angle = np.zeros_like(self.frequency) 

460 

461 def _read_spectra( 

462 self, 

463 data_lines: list[str], 

464 comp_list: list[str] = ["hx", "hy", "hz", "ex", "ey", "rhx", "rhy"], 

465 ) -> None: 

466 """ 

467 Read in spectra data and convert to impedance and Tipper. 

468 

469 Translated from A. Kelbert's EMTF fortran module 

470 

471 :param data_lines: list of lines from edi file 

472 :type data_lines: list 

473 

474 :param comp_list: list of components that correspond to the columns 

475 of the spectra data. 

476 :type comp_list: list 

477 """ 

478 

479 data_dict = {} 

480 avgt_dict = {} 

481 data_find = False 

482 for line in data_lines: 

483 if line.lower().find(">spectra") == 0 and line.find("!") == -1: 

484 line_list = _validate_str_with_equals(line) 

485 data_find = True 

486 

487 # frequency will be the key 

488 try: 

489 key = float( 

490 [ 

491 ss.split("=")[1] 

492 for ss in line_list 

493 if ss.lower().find("freq") == 0 

494 ][0] 

495 ) 

496 data_dict[key] = [] 

497 avgt = float( 

498 [ 

499 ss.split("=")[1] 

500 for ss in line_list 

501 if ss.lower().find("avgt") == 0 

502 ][0] 

503 ) 

504 avgt_dict[key] = avgt 

505 except ValueError: 

506 self.logger.debug("did not find frequency key") 

507 elif data_find and line.find(">") == -1 and line.find("!") == -1: 

508 data_dict[key] += [float(ll) for ll in line.strip().split()] 

509 elif line.find(">spectra") == -1: 

510 data_find = False 

511 # get an object that contains the indices for each component 

512 cc = index_locator(comp_list) 

513 

514 self.frequency = np.array(sorted(list(data_dict.keys()), reverse=True)) 

515 

516 self.z = np.zeros((self.frequency.size, 2, 2), dtype=complex) 

517 self.t = np.zeros((self.frequency.size, 1, 2), dtype=complex) 

518 

519 self.z_err = np.zeros_like(self.z, dtype=float) 

520 self.t_err = np.zeros_like(self.t, dtype=float) 

521 

522 self.residual_covariance = np.zeros( 

523 (self.frequency.size, cc.n_outputs, cc.n_outputs), dtype=complex 

524 ) 

525 self.signal_inverse_power = np.zeros( 

526 (self.frequency.size, cc.n_inputs, cc.n_inputs), dtype=complex 

527 ) 

528 

529 self.tf = np.zeros( 

530 (self.frequency.size, cc.n_outputs, cc.n_inputs), dtype=complex 

531 ) 

532 self.tf_err = np.zeros_like(self.tf, dtype=float) 

533 

534 for kk, key in enumerate(self.frequency): 

535 # read in spectra as an (n_channel x n_channel) array 

536 spectra_arr = np.reshape( 

537 np.array(data_dict[key]), (len(comp_list), len(comp_list)) 

538 ) 

539 

540 # compute cross powers 

541 s_arr = np.zeros_like(spectra_arr, dtype=complex) 

542 for ii in range(s_arr.shape[0]): 

543 for jj in range(ii, s_arr.shape[0]): 

544 if ii == jj: 

545 s_arr[ii, jj] = spectra_arr[ii, jj] 

546 else: 

547 # minus sign for complex conjugation 

548 # original spectra data are of form <A,B*>, but we need 

549 # the order <B,A*>... 

550 # this is achieved by complex conjugation of the 

551 # original entries 

552 s_arr[ii, jj] = complex( 

553 spectra_arr[jj, ii], -spectra_arr[ii, jj] 

554 ) 

555 # keep complex conjugated entries in the lower 

556 # triangular matrix: 

557 s_arr[jj, ii] = complex( 

558 spectra_arr[jj, ii], spectra_arr[ii, jj] 

559 ) 

560 # check for empty values 

561 s_arr[s_arr == 0] = np.nan 

562 s_arr[s_arr == self.Header.empty] = np.nan 

563 

564 # from A. Kelbert's EMTF 

565 # cross spectra matrices 

566 

567 # Note we changed the indices to [ex, ey, hz] from [hz, ex, ey] 

568 # input channels 

569 rh = np.zeros((cc.n_inputs, cc.n_inputs), dtype=complex) 

570 rr = np.zeros((cc.n_inputs, cc.n_inputs), dtype=complex) 

571 hh = np.zeros((cc.n_inputs, cc.n_inputs), dtype=complex) 

572 

573 # output channels 

574 re = np.zeros((cc.n_inputs, cc.n_outputs), dtype=complex) 

575 he = np.zeros((cc.n_inputs, cc.n_outputs), dtype=complex) 

576 ee = np.zeros((cc.n_outputs, cc.n_outputs), dtype=complex) 

577 

578 # fill in cross powers for input channels 

579 rh[0, 0] = s_arr[cc.rhx, cc.hx] 

580 rh[0, 1] = s_arr[cc.rhx, cc.hy] 

581 rh[1, 0] = s_arr[cc.rhy, cc.hx] 

582 rh[1, 1] = s_arr[cc.rhy, cc.hy] 

583 

584 rr[0, 0] = s_arr[cc.rhx, cc.rhx] 

585 rr[0, 1] = s_arr[cc.rhx, cc.rhy] 

586 rr[1, 0] = s_arr[cc.rhy, cc.rhx] 

587 rr[1, 1] = s_arr[cc.rhy, cc.rhy] 

588 

589 hh[0, 0] = s_arr[cc.hx, cc.hx] 

590 hh[0, 1] = s_arr[cc.hx, cc.hy] 

591 hh[1, 0] = s_arr[cc.hy, cc.hx] 

592 hh[1, 1] = s_arr[cc.hy, cc.hy] 

593 

594 # fill in cross powers for output channels 

595 if cc.has_tipper and cc.has_electric: 

596 re[0, 2] = s_arr[cc.rhx, cc.hz] 

597 re[0, 0] = s_arr[cc.rhx, cc.ex] 

598 re[0, 1] = s_arr[cc.rhx, cc.ey] 

599 re[1, 2] = s_arr[cc.rhy, cc.hz] 

600 re[1, 0] = s_arr[cc.rhy, cc.ex] 

601 re[1, 1] = s_arr[cc.rhy, cc.ey] 

602 

603 he[0, 2] = s_arr[cc.hx, cc.hz] 

604 he[0, 0] = s_arr[cc.hx, cc.ex] 

605 he[0, 1] = s_arr[cc.hx, cc.ey] 

606 he[1, 2] = s_arr[cc.hy, cc.hz] 

607 he[1, 0] = s_arr[cc.hy, cc.ex] 

608 he[1, 1] = s_arr[cc.hy, cc.ey] 

609 

610 ee[2, 2] = s_arr[cc.hz, cc.hz] 

611 ee[2, 0] = s_arr[cc.hz, cc.ex] 

612 ee[2, 1] = s_arr[cc.hz, cc.ey] 

613 ee[0, 2] = s_arr[cc.ex, cc.hz] 

614 ee[0, 0] = s_arr[cc.ex, cc.ex] 

615 ee[0, 1] = s_arr[cc.ex, cc.ey] 

616 ee[1, 2] = s_arr[cc.ey, cc.hz] 

617 ee[1, 0] = s_arr[cc.ey, cc.ex] 

618 ee[1, 1] = s_arr[cc.ey, cc.ey] 

619 elif not cc.has_tipper and cc.has_electric: 

620 re[0, 0] = s_arr[cc.rhx, cc.ex] 

621 re[0, 1] = s_arr[cc.rhx, cc.ey] 

622 re[1, 0] = s_arr[cc.rhy, cc.ex] 

623 re[1, 0] = s_arr[cc.rhy, cc.ey] 

624 

625 he[0, 0] = s_arr[cc.hx, cc.ex] 

626 he[0, 1] = s_arr[cc.hx, cc.ey] 

627 he[0, 1] = s_arr[cc.hy, cc.ex] 

628 he[1, 1] = s_arr[cc.hy, cc.ey] 

629 

630 ee[0, 0] = s_arr[cc.ex, cc.ex] 

631 ee[0, 1] = s_arr[cc.ex, cc.ey] 

632 ee[1, 0] = s_arr[cc.ey, cc.ex] 

633 ee[1, 1] = s_arr[cc.ey, cc.ey] 

634 elif cc.has_tipper and not cc.has_electric: 

635 re[0, 0] = s_arr[cc.rhx, cc.hz] 

636 re[1, 0] = s_arr[cc.rhy, cc.hz] 

637 

638 he[0, 0] = s_arr[cc.hx, cc.hz] 

639 he[1, 0] = s_arr[cc.hy, cc.hz] 

640 

641 ee[0, 0] = s_arr[cc.hz, cc.hz] 

642 # check to make sure the values are legit for accurate results 

643 if abs(np.linalg.det(rh)) < np.finfo(float).eps: 

644 self.logger.warning( 

645 "spectral matrix determinant is too small " 

646 f"{abs(np.linalg.det(rh))} for period {key}. " 

647 "Results may be inaccurate" 

648 ) 

649 tfh = np.matmul(np.linalg.inv(rh), re) 

650 tf = tfh.conj().T 

651 

652 sig = np.matmul( 

653 np.linalg.inv(rh), np.matmul(rr, np.linalg.inv(rh.conj().T)) 

654 ) 

655 res = ( 

656 ee 

657 - np.matmul(tf, he) 

658 - np.matmul(he.conj().T, tfh) 

659 + np.matmul(tf, np.matmul(hh, tfh)) 

660 ) / avgt_dict[key] 

661 

662 # variance = abs(np.dot(res[0 : cc.n_inputs, :].T, sig)) 

663 variance = np.zeros((cc.n_outputs, cc.n_inputs), dtype=complex) 

664 for nn in range(cc.n_outputs): 

665 for mm in range(cc.n_inputs): 

666 variance[nn, mm] = res[nn, nn] * sig[mm, mm] 

667 

668 tf_err = np.sqrt(np.abs(variance)) 

669 self.tf[kk, :, :] = tf 

670 self.tf_err[kk, :, :] = np.sqrt(np.abs(variance)) 

671 self.signal_inverse_power[kk, :, :] = sig 

672 self.residual_covariance[kk, :, :] = res 

673 

674 if cc.has_tipper and cc.has_electric: 

675 self.z[kk, :, :] = tf[0:2, :] 

676 self.z_err[kk, :, :] = tf_err[0:2, :] 

677 self.t[kk, :, :] = tf[2, :] 

678 self.t_err[kk, :, :] = tf_err[2, :] 

679 self.z_err[np.where(np.nan_to_num(self.z_err) == 0.0)] = 1.0 

680 self.t_err[np.nan_to_num(self.t_err) == 0.0] = 1.0 

681 elif not cc.has_tipper and cc.has_electric: 

682 self.z[kk, :, :] = tf[:, :] 

683 self.z_err[kk, :, :] = tf_err[:, :] 

684 self.z_err[np.where(np.nan_to_num(self.z_err) == 0.0)] = 1.0 

685 elif cc.has_tipper and not cc.has_electric: 

686 self.t[kk, :, :] = tf[:, :] 

687 self.t_err[kk, :, :] = tf_err[:, :] 

688 self.t_err[np.nan_to_num(self.t_err) == 0.0] = 1.0 

689 

690 def write( 

691 self, 

692 new_edi_fn: str | Path | None = None, 

693 longitude_format: Literal["LON", "LONG"] = "LON", 

694 latlon_format: Literal["dms", "dd"] = "dms", 

695 ) -> Path: 

696 """ 

697 Write a new edi file from either an existing .edi file or from data 

698 input by the user into the attributes of Edi. 

699 

700 

701 :param new_edi_fn: full path to new edi file. 

702 *default* is None, which will write to the same 

703 file as the input .edi with as: 

704 r"/home/mt/mt01_1.edi" 

705 :type new_edi_fn: string 

706 :param longitude_format: whether to write longitude as LON or LONG. 

707 options are 'LON' or 'LONG', default 'LON' 

708 :type longitude_format: string 

709 :param latlon_format: format of latitude and longitude in output edi, 

710 degrees minutes seconds ('dms') or decimal 

711 degrees ('dd') 

712 :type latlon_format: string 

713 

714 :returns: full path to new edi file 

715 :rtype: string 

716 

717 :Example: :: 

718 

719 >>> import mtpy.core.edi as mtedi 

720 >>> edi_obj = mtedi.Edi(fn=r"/home/mt/mt01/edi") 

721 >>> edi_obj.Header.dataid = 'mt01_rr' 

722 >>> n_edi_fn = edi_obj.write_edi_file() 

723 """ 

724 

725 if new_edi_fn is None: 

726 if self.fn is not None: 

727 new_edi_fn = Path(self.fn) 

728 else: 

729 new_edi_fn = Path().cwd().joinpath(f"{self.Header.dataid}.edi") 

730 else: 

731 new_edi_fn = Path(new_edi_fn) 

732 # write lines 

733 extra_lines = [] 

734 if self.survey_metadata.summary != None: 

735 extra_lines.append(f"\tsurvey.summary = {self.survey_metadata.summary}\n") 

736 if self.Header.progname != "mt_metadata": 

737 extra_lines.append(f"\toriginal_program.name={self.Header.progname}\n") 

738 if self.Header.progvers != __version__: 

739 extra_lines.append(f"\toriginal_program.version={self.Header.progvers}\n") 

740 if self.Header.progdate != "1980-01-01": 

741 extra_lines.append(f"\toriginal_program.date={self.Header.progdate}\n") 

742 if self.Header.filedate != "1980-01-01": 

743 extra_lines.append(f"\toriginal_file.date={self.Header.filedate}\n") 

744 header_lines = self.Header.write_header( 

745 longitude_format=longitude_format, latlon_format=latlon_format 

746 ) 

747 

748 info_lines = self.Info.write_info() 

749 info_lines.insert(1, "".join(extra_lines)) 

750 

751 define_lines = self.Measurement.write_measurement( 

752 longitude_format=longitude_format, latlon_format=latlon_format 

753 ) 

754 

755 self.Data.nfreq = len(self.frequency) 

756 dsect_lines = self.Data.write_data() 

757 

758 # write out frequencies 

759 freq_lines = [self._data_header_str.format("frequencies".upper())] 

760 freq_lines += self._write_data_block(self.frequency, "freq") 

761 

762 # write out rotation angles 

763 zrot_lines = [self._data_header_str.format("impedance rotation angles".upper())] 

764 if self.rotation_angle is None: 

765 self.rotation_angle = np.zeros(self.frequency.size) 

766 elif isinstance(self.rotation_angle, (float, int)): 

767 self.rotation_angle = np.repeat(self.rotation_angle, self.frequency.size) 

768 elif len(self.rotation_angle) != self.frequency.size: 

769 raise ValueError( 

770 "rotation angle must be the same length and the number of " 

771 f"frequencies {len(self.rotation_angle)} != {self.frequency.size}" 

772 ) 

773 zrot_lines += self._write_data_block(self.rotation_angle, "zrot") 

774 

775 # write out data only impedance and tipper 

776 z_data_lines = [self._data_header_str.format("impedances".upper())] 

777 self.z = np.nan_to_num(self.z) 

778 self.z_err = np.nan_to_num(self.z_err) 

779 self.t = np.nan_to_num(self.t) 

780 self.t_err = np.nan_to_num(self.t_err) 

781 if self.z is not None: 

782 for ii in range(2): 

783 for jj in range(2): 

784 z_lines_real = self._write_data_block( 

785 self.z[:, ii, jj].real, self._z_labels[2 * ii + jj][0] 

786 ) 

787 z_lines_imag = self._write_data_block( 

788 self.z[:, ii, jj].imag, self._z_labels[2 * ii + jj][1] 

789 ) 

790 z_lines_var = self._write_data_block( 

791 self.z_err[:, ii, jj] ** 2.0, 

792 self._z_labels[2 * ii + jj][2], 

793 ) 

794 

795 z_data_lines += z_lines_real 

796 z_data_lines += z_lines_imag 

797 z_data_lines += z_lines_var 

798 if self.t is None: 

799 trot_lines = [""] 

800 t_data_lines = [""] 

801 elif np.all(self.t == 0): 

802 trot_lines = [""] 

803 t_data_lines = [""] 

804 else: 

805 try: 

806 # write out rotation angles 

807 trot_lines = [ 

808 self._data_header_str.format("tipper rotation angles".upper()) 

809 ] 

810 if isinstance(self.rotation_angle, float): 

811 trot = np.repeat(self.rotation_angle, self.frequency.size) 

812 else: 

813 trot = self.rotation_angle 

814 trot_lines += self._write_data_block(np.array(trot), "trot") 

815 

816 # write out tipper lines 

817 t_data_lines = [self._data_header_str.format("tipper".upper())] 

818 for jj in range(2): 

819 t_lines_real = self._write_data_block( 

820 self.t[:, 0, jj].real, self._t_labels[jj][0] 

821 ) 

822 t_lines_imag = self._write_data_block( 

823 self.t[:, 0, jj].imag, self._t_labels[jj][1] 

824 ) 

825 t_lines_var = self._write_data_block( 

826 self.t_err[:, 0, jj] ** 2.0, self._t_labels[jj][2] 

827 ) 

828 

829 t_data_lines += t_lines_real 

830 t_data_lines += t_lines_imag 

831 t_data_lines += t_lines_var 

832 except AttributeError: 

833 trot_lines = [""] 

834 t_data_lines = [""] 

835 edi_lines = ( 

836 header_lines 

837 + info_lines 

838 + define_lines 

839 + dsect_lines 

840 + freq_lines 

841 + zrot_lines 

842 + z_data_lines 

843 + trot_lines 

844 + t_data_lines 

845 + [">END"] 

846 ) 

847 

848 with open(new_edi_fn, "w") as fid: 

849 fid.write("".join(edi_lines)) 

850 self.fn = new_edi_fn 

851 return new_edi_fn 

852 

853 def _write_data_block( 

854 self, data_comp_arr: np.typing.NDArray, data_key: str 

855 ) -> list[str]: 

856 """ 

857 Write a data block 

858 

859 :param data_comp_arr: array of data components 

860 :type data_comp_arr: np.ndarray 

861 

862 :param data_key: the component to write out 

863 :type data_key: string 

864 

865 :returns: list of lines to write to edi file 

866 :rtype: list 

867 """ 

868 if data_key.lower().find("z") >= 0 and data_key.lower() not in [ 

869 "zrot", 

870 "trot", 

871 ]: 

872 block_lines = [ 

873 ">{0} ROT=ZROT // {1:.0f}\n".format( 

874 data_key.upper(), data_comp_arr.size 

875 ) 

876 ] 

877 elif data_key.lower().find("t") >= 0 and data_key.lower() not in [ 

878 "zrot", 

879 "trot", 

880 ]: 

881 block_lines = [ 

882 ">{0} ROT=TROT // {1:.0f}\n".format( 

883 data_key.upper(), data_comp_arr.size 

884 ) 

885 ] 

886 elif data_key.lower() == "freq": 

887 block_lines = [ 

888 ">{0} // {1:.0f}\n".format(data_key.upper(), data_comp_arr.size) 

889 ] 

890 elif data_key.lower() in ["zrot", "trot"]: 

891 block_lines = [ 

892 ">{0} // {1:.0f}\n".format(data_key.upper(), data_comp_arr.size) 

893 ] 

894 else: 

895 raise ValueError("Cannot write block for {0}".format(data_key)) 

896 for d_index, d_comp in enumerate(data_comp_arr, 1): 

897 if d_comp == 0.0 and data_key.lower() not in ["zrot", "trot"]: 

898 d_comp = self.Header.empty 

899 # write the string in the specified format 

900 num_str = "{0:{1}}".format(d_comp, self._num_format) 

901 

902 # check to see if a new line is needed 

903 if d_index % self._block_len == 0: 

904 num_str += "\n" 

905 # at the end of the block add a return 

906 if d_index == data_comp_arr.size: 

907 num_str += "\n" 

908 block_lines.append(num_str) 

909 return block_lines 

910 

911 # ----------------------------------------------------------------------- 

912 # set a few important properties 

913 # --> Latitude 

914 @property 

915 def lat(self) -> float | None: 

916 """latitude in decimal degrees""" 

917 return self.Header.latitude 

918 

919 @lat.setter 

920 def lat(self, input_lat) -> None: 

921 """set latitude and make sure it is converted to a float""" 

922 self.Header.latitude = input_lat 

923 

924 # --> Longitude 

925 @property 

926 def lon(self) -> float | None: 

927 """longitude in decimal degrees""" 

928 return self.Header.longitude 

929 

930 @lon.setter 

931 def lon(self, input_lon: float | None): 

932 """set latitude and make sure it is converted to a float""" 

933 self.Header.longitude = input_lon 

934 

935 # --> Elevation 

936 @property 

937 def elev(self) -> float: 

938 """Elevation in elevation units""" 

939 return self.Header.elevation 

940 

941 @elev.setter 

942 def elev(self, input_elev: float) -> None: 

943 """set elevation and make sure it is converted to a float""" 

944 self.Header.elevation = input_elev 

945 

946 # --> station 

947 @property 

948 def station(self) -> str | None: 

949 """station name""" 

950 if self.Header.dataid is not None: 

951 return self.Header.dataid.replace(r"/", "_") 

952 elif self.Measurement.refloc is not None: 

953 return self.Measurement.refloc.replace('"', "") 

954 elif self.Data.sectid is not None: 

955 return self.Data.sectid 

956 

957 @station.setter 

958 def station(self, new_station: str | int): 

959 """station name""" 

960 if not isinstance(new_station, str): 

961 new_station = f"{new_station}".replace(r"/", "_") 

962 self.Header.dataid = new_station 

963 self.Data.sectid = new_station 

964 

965 @property 

966 def survey_metadata(self) -> Survey: 

967 sm = Survey() 

968 

969 if self.Header.project is None: 

970 if self.Header.prospect is not None: 

971 sm.project = self.Header.prospect 

972 else: 

973 sm.project = self.Header.project 

974 

975 if self.Header.survey is None: 

976 sm.id = "0" 

977 else: 

978 sm.id = self.Header.survey 

979 if self.Header.acqby is not None: 

980 sm.acquired_by.author = self.Header.acqby 

981 if self.Header.loc is not None: 

982 sm.geographic_name = self.Header.loc 

983 sm.country = self.Header.country 

984 

985 for key, value in self.Info.info_dict.items(): 

986 if key is None: 

987 key = "extra" 

988 key = key.lower() 

989 if key.startswith("survey."): 

990 sm.update_attribute(key.split("survey.")[1], value) 

991 

992 sm.add_station(self.station_metadata) 

993 

994 return sm 

995 

996 @survey_metadata.setter 

997 def survey_metadata(self, survey: Survey) -> None: 

998 """ 

999 Update metadata from a survey metadata object 

1000 

1001 :param value: DESCRIPTION 

1002 :type value: TYPE 

1003 :return: DESCRIPTION 

1004 :rtype: TYPE 

1005 

1006 """ 

1007 

1008 if not isinstance(survey, Survey): 

1009 raise TypeError( 

1010 "Input must be a mt_metadata.transfer_function.Survey object" 

1011 f" not {type(survey)}" 

1012 ) 

1013 self.Header.survey = survey.id 

1014 self.Header.project = survey.project 

1015 self.Header.loc = survey.geographic_name 

1016 self.Header.country = survey.country 

1017 if survey.summary not in NULL_VALUES: 

1018 self.Info.info_dict[f"survey.summary"] = survey.summary 

1019 

1020 for key in survey.to_dict(single=True).keys(): 

1021 if "northwest" in key or "southeast" in key or "time_period" in key: 

1022 continue 

1023 value = survey.get_attr_from_name(key) 

1024 if value not in NULL_VALUES: 

1025 self.Info.info_dict[f"survey.{key}"] = value 

1026 

1027 @property 

1028 def station_metadata(self) -> tf.Station: 

1029 sm = tf.Station() 

1030 sm.add_run(Run(id=f"{self.station}a")) 

1031 if self.station is not None: 

1032 sm.id = self.station 

1033 sm.data_type = "MT" 

1034 sm.channels_recorded = self.Measurement.channels_recorded 

1035 # location 

1036 sm.location.latitude = self.lat 

1037 sm.location.longitude = self.lon 

1038 sm.location.elevation = self.elev 

1039 sm.location.datum = self.Header.datum 

1040 sm.location.declination.value = self.Header.declination.value 

1041 sm.orientation.reference_frame = self.Header.coordinate_system.split()[0] 

1042 if self.Header.loc is not None: 

1043 sm.geographic_name = self.Header.loc 

1044 

1045 # provenance 

1046 if self.Header.acqby is not None: 

1047 sm.acquired_by.author = self.Header.acqby 

1048 sm.provenance.creation_time = self.Header.filedate 

1049 sm.provenance.submitter.author = self.Header.fileby 

1050 sm.provenance.software.name = self.Header.fileby 

1051 sm.provenance.software.version = self.Header.progvers 

1052 sm.transfer_function.processed_date = self.Header.filedate 

1053 sm.transfer_function.runs_processed = sm.run_list 

1054 if self.station is not None: 

1055 sm.transfer_function.id = self.station 

1056 # dates 

1057 if self.Header.acqdate is not None: 

1058 sm.time_period.start = self.Header.acqdate 

1059 if self.Header.enddate is not None: 

1060 sm.time_period.end = self.Header.enddate 

1061 

1062 # processing information 

1063 for key, value in self.Info.info_dict.items(): 

1064 if key is None: 

1065 continue 

1066 if value in NULL_VALUES: 

1067 continue 

1068 key = key.lower() 

1069 

1070 if "provenance" in key: 

1071 sm.update_attribute(key, value) 

1072 elif "transfer_function" in key: 

1073 key = key.split("transfer_function.")[1] 

1074 if "processing_parameters" in key: 

1075 if isinstance(value, list): 

1076 for item in value: 

1077 if "=" in item: 

1078 param, val = item.split("=") 

1079 sm.transfer_function.processing_parameters.append( 

1080 f"{param}={val}" 

1081 ) 

1082 else: 

1083 sm.transfer_function.processing_parameters.append(item) 

1084 elif value not in NULL_VALUES: 

1085 param = key.split(".")[-1] 

1086 sm.transfer_function.processing_parameters.append( 

1087 f"{param}={value}" 

1088 ) 

1089 elif "remote_references" in key: 

1090 if key.count(".") > 1: 

1091 if key in ["remote_references.id"]: 

1092 sm.transfer_function.remote_references.append(value) 

1093 elif key in ["remote_references.geographic_name"]: 

1094 try: 

1095 sm.transfer_function.remote_references[ 

1096 -1 

1097 ] = f"{value}.{sm.transfer_function.remote_references[-1]}" 

1098 except IndexError: 

1099 sm.transfer_function.remote_references.append(value) 

1100 else: 

1101 # skip any keys that have remote_references.something_else 

1102 continue 

1103 else: 

1104 if isinstance(value, list): 

1105 sm.transfer_function.remote_references.extend(value) 

1106 elif isinstance(value, str): 

1107 sm.transfer_function.remote_references.append(value) 

1108 elif isinstance(value, (int, float)): 

1109 sm.transfer_function.remote_references.append(value) 

1110 else: 

1111 self.logger.warning( 

1112 f"Do not know how to handle remote_references {value}" 

1113 ) 

1114 continue 

1115 elif "runs_processed" in key: 

1116 sm.run_list = sm.transfer_function.runs_processed 

1117 else: 

1118 sm.transfer_function.update_attribute(key, value) 

1119 

1120 elif key.startswith("run."): 

1121 comp = None 

1122 key = key.split("run.")[1] 

1123 if "." in key: 

1124 # split the key into component and attribute 

1125 # e.g. run.mt01a.hx.comments 

1126 try: 

1127 comp, key = key.split(".", 1) 

1128 except (IndexError, ValueError): 

1129 continue 

1130 if comp is not None: 

1131 try: 

1132 ch = getattr(sm.runs[0], comp) 

1133 except AttributeError: 

1134 ch = None 

1135 if ch is None: 

1136 if comp in ["ex", "ey"]: 

1137 ch = Electric(component=comp) # type: ignore 

1138 sm.runs[0].add_channel(ch) 

1139 elif comp in ["hx", "hy", "hz", "rrhx", "rrhy"]: 

1140 ch = Magnetic(component=comp) # type: ignore 

1141 sm.runs[0].add_channel(ch) 

1142 else: 

1143 self.logger.warning( 

1144 f"Do not recognize channel {comp}, skipping..." 

1145 ) 

1146 try: 

1147 sm.runs[0].update_attribute(f"{key}.{comp}", value) 

1148 

1149 except Exception as e: 

1150 self.logger.warning( 

1151 f"Failed to update attribute {key}.{comp}: {e}" 

1152 ) 

1153 if key in ["comments"]: 

1154 if isinstance(value, list): 

1155 value = ",".join(value) 

1156 try: 

1157 ch.update_attribute(key, value) 

1158 except Exception as e: 

1159 self.logger.warning(f"Failed to update attribute {key}: {e}") 

1160 elif key.startswith("data_logger"): 

1161 sm.runs[0].update_attribute(key, value) 

1162 elif key.startswith("station."): 

1163 sm.update_attribute(key.split("station.")[1], value) 

1164 elif "processing." in key: 

1165 key = key.split("processing.")[1] 

1166 if key in ["software"]: 

1167 sm.transfer_function.software.name = value 

1168 elif key in ["tag"]: 

1169 if value.count(",") > 0: 

1170 sm.transfer_function.remote_references = value.split(",") 

1171 else: 

1172 sm.transfer_function.remote_references = value.split() 

1173 elif key in ["processedby", "processed_by"]: 

1174 sm.transfer_function.processed_by.name = value 

1175 elif key in ["runlist", "run_list"]: 

1176 if value.count(",") > 0: 

1177 runs = value.split(",") 

1178 else: 

1179 runs = value.split() 

1180 sm.run_list = [] 

1181 for rr in runs: 

1182 if rr not in sm.runs.keys(): 

1183 sm.add_run(Run(id=rr)) 

1184 sm.transfer_function.runs_processed = runs 

1185 elif key == "sitename": 

1186 sm.geographic_name = value 

1187 elif key == "signconvention": 

1188 sm.transfer_function.sign_convention = value 

1189 elif "mtft" in key or "emtf" in key or "mtedit" in key: 

1190 sm.transfer_function.processing_parameters.append(f"{key}={value}") 

1191 

1192 if self.Header.filedate is not None: 

1193 sm.transfer_function.processed_date = self.Header.filedate 

1194 # make any extra information in info list into a comment 

1195 sm.comments = "\n".join(self.Info.write_info()[1:]) 

1196 

1197 # add information to runs 

1198 for rr in sm.runs: 

1199 if rr.time_period.start in NULL_VALUES: 

1200 rr.time_period.start = sm.time_period.start 

1201 if rr.time_period.end in NULL_VALUES: 

1202 rr.time_period.end = sm.time_period.end 

1203 

1204 for ch in self.Measurement.channels_recorded: 

1205 try: 

1206 ch_obj = rr.channels[ch] 

1207 ch_obj.update(getattr(self, f"{ch}_metadata")) 

1208 except AttributeError: 

1209 pass 

1210 return sm 

1211 

1212 @station_metadata.setter 

1213 def station_metadata(self, sm: tf.Station) -> None: 

1214 """ 

1215 Set EDI metadata from station metadata object 

1216 

1217 :param sm: Station object to pull metadata from 

1218 :type sm: :class:`mt_metadata.transfer_functions.tf.Station` 

1219 

1220 """ 

1221 

1222 ### fill header information from station 

1223 self.Header.acqby = sm.acquired_by.author 

1224 self.Header.acqdate = sm.time_period.start 

1225 self.Header.coordinate_system = sm.orientation.reference_frame 

1226 self.Header.dataid = sm.id 

1227 self.Header.declination = sm.location.declination 

1228 self.Header.elevation = sm.location.elevation 

1229 self.Header.fileby = sm.provenance.submitter.author 

1230 self.Header.filedate = sm.provenance.creation_time 

1231 self.Header.latitude = sm.location.latitude 

1232 self.Header.longitude = sm.location.longitude 

1233 self.Header.datum = sm.location.datum 

1234 self.Header.units = sm.transfer_function.units 

1235 self.Header.enddate = sm.time_period.end 

1236 if sm.geographic_name is not None: 

1237 self.Header.loc = sm.geographic_name 

1238 

1239 ### write notes 

1240 # write comments, which would be anything in the info section from an edi 

1241 if isinstance(sm.comments, str): 

1242 self.Info.read_info(sm.comments.split("\n")) 

1243 # write transfer function info first 

1244 for k, v in sm.transfer_function.to_dict(single=True).items(): 

1245 if not v in NULL_VALUES: 

1246 if k in ["processing_parameters"]: 

1247 for item in v: 

1248 param, value = item.split("=", 1) 

1249 self.Info.info_dict[ 

1250 f"transfer_function.processing_parameters.{param}" 

1251 ] = value 

1252 

1253 else: 

1254 self.Info.info_dict[f"transfer_function.{k}"] = v 

1255 # write provenance 

1256 for k, v in sm.provenance.to_dict(single=True).items(): 

1257 if not v in NULL_VALUES: 

1258 self.Info.info_dict[f"provenance.{k}"] = v 

1259 # write field notes 

1260 for run in sm.runs: 

1261 r_dict = run.to_dict(single=True) 

1262 for r_key, r_value in r_dict.items(): 

1263 if r_value in NULL_VALUES: 

1264 continue 

1265 self.Info.info_dict[f"{run.id}.{r_key}"] = r_value 

1266 for ch in run.channels: 

1267 ch_dict = ch.to_dict(single=True) 

1268 for ch_key, ch_value in ch_dict.items(): 

1269 if ch_key not in self._channel_skip_list: 

1270 if ch_value in NULL_VALUES: 

1271 continue 

1272 self.Info.info_dict[ 

1273 f"{run.id}.{ch.component}.{ch_key}" 

1274 ] = ch_value 

1275 # write station information 

1276 self.Measurement.from_metadata(ch) 

1277 # add channel id to data section 

1278 try: 

1279 setattr(self.Data, f"{ch.component}", ch.channel_id) 

1280 except AttributeError: 

1281 self.logger.warning( 

1282 f"Cannot set {ch.component} channel id, skipping..." 

1283 ) 

1284 

1285 ### fill measurement 

1286 self.Measurement.refelev = sm.location.elevation 

1287 self.Measurement.reflat = sm.location.latitude 

1288 self.Measurement.reflon = sm.location.longitude 

1289 self.Measurement.refloc = sm.id 

1290 self.Measurement.maxchan = len(sm.channels_recorded) 

1291 

1292 def _get_electric_metadata(self, comp): 

1293 """ 

1294 get electric information from the various metadata 

1295 """ 

1296 comp = comp.lower() 

1297 electric = Electric(component=comp) 

1298 electric.positive.type = "electric" 

1299 electric.negative.type = "electric" 

1300 if comp in self.Measurement.measurements.keys(): 

1301 meas = self.Measurement.measurements[comp] 

1302 for attr in [ 

1303 "negative.x", 

1304 "negative.y", 

1305 "positive.x2", 

1306 "positive.y2", 

1307 "measurement_azimuth", 

1308 "translated_azimuth", 

1309 ]: 

1310 if electric.get_attr_from_name(attr) is None: 

1311 electric.update_attribute(attr, 0) 

1312 electric.dipole_length = meas.dipole_length 

1313 electric.channel_id = meas.id 

1314 electric.measurement_azimuth = meas.azimuth 

1315 electric.translated_azimuth = meas.azimuth 

1316 electric.component = meas.chtype 

1317 electric.channel_number = meas.channel_number 

1318 electric.negative.x = meas.x 

1319 electric.positive.x2 = meas.x2 

1320 electric.negative.y = meas.y 

1321 electric.positive.y2 = meas.y2 

1322 for k, v in self.Info.info_dict.items(): 

1323 if k is None: 

1324 continue 

1325 if f"{comp}." in k: 

1326 key = k.split(f"{comp}.")[1].strip() 

1327 if key == "manufacturer": 

1328 electric.negative.manufacturer = v 

1329 electric.positive.manufacturer = v 

1330 if key == "type": 

1331 electric.negative.type = v 

1332 electric.positive.type = v 

1333 if ( 

1334 electric.positive.x2 == 0 

1335 and electric.positive.y2 == 0.0 

1336 and electric.negative.x == 0 

1337 and electric.negative.y == 0.0 

1338 ): 

1339 electric.positive.x2 = electric.dipole_length * np.cos( 

1340 np.deg2rad(meas.azimuth) 

1341 ) 

1342 electric.positive.y2 = electric.dipole_length * np.sin( 

1343 np.deg2rad(meas.azimuth) 

1344 ) 

1345 for key, value in self.Info.info_dict.items(): 

1346 if key is None: 

1347 continue 

1348 elif ".type" in key: 

1349 continue 

1350 if f".{comp}." in key: 

1351 key = key.split(f".{comp}.", 1)[-1] 

1352 electric.update_attribute(key, value) 

1353 return electric 

1354 

1355 @property 

1356 def ex_metadata(self): 

1357 return self._get_electric_metadata("ex") 

1358 

1359 @property 

1360 def ey_metadata(self): 

1361 return self._get_electric_metadata("ey") 

1362 

1363 def _get_magnetic_metadata(self, comp): 

1364 """ 

1365 

1366 get magnetic metadata from the various sources 

1367 

1368 :param comp: DESCRIPTION 

1369 :type comp: TYPE 

1370 :return: DESCRIPTION 

1371 :rtype: TYPE 

1372 

1373 """ 

1374 

1375 magnetic = Magnetic(component=comp) 

1376 magnetic.sensor.type = "magnetic" 

1377 if comp in self.Measurement.measurements.keys(): 

1378 meas = self.Measurement.measurements[comp] 

1379 for attr in ["location.x", "location.y", "location.z"]: 

1380 if magnetic.get_attr_from_name(attr) is None: 

1381 magnetic.update_attribute(attr, 0) 

1382 magnetic.measurement_azimuth = meas.azm 

1383 magnetic.translated_azimuth = meas.azm 

1384 magnetic.component = meas.chtype 

1385 magnetic.channel_number = meas.channel_number 

1386 magnetic.channel_id = meas.id 

1387 magnetic.location.x = meas.x 

1388 magnetic.location.y = meas.y 

1389 try: 

1390 magnetic.sensor.id = meas.meas_magnetic.sensor 

1391 except AttributeError: 

1392 pass 

1393 for k, v in self.Info.info_dict.items(): 

1394 if k is None: 

1395 continue 

1396 if f"{comp}." in k: 

1397 key = k.split(f"{comp}.")[1].strip() 

1398 if key == "manufacturer": 

1399 magnetic.sensor.manufacturer = v 

1400 if key == "type": 

1401 magnetic.sensor.type = v 

1402 if key.startswith("sensor."): 

1403 magnetic.update_attribute(key, v) 

1404 return magnetic 

1405 

1406 @property 

1407 def hx_metadata(self): 

1408 return self._get_magnetic_metadata("hx") 

1409 

1410 @property 

1411 def hy_metadata(self): 

1412 return self._get_magnetic_metadata("hy") 

1413 

1414 @property 

1415 def hz_metadata(self): 

1416 return self._get_magnetic_metadata("hz") 

1417 

1418 @property 

1419 def rrhx_metadata(self): 

1420 return self._get_magnetic_metadata("rrhx") 

1421 

1422 @property 

1423 def rrhy_metadata(self): 

1424 return self._get_magnetic_metadata("rrhy")