Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mt_metadata \ mt_metadata \ transfer_functions \ io \ zfiles \ zmm.py: 73%

536 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-10 00:11 -0800

1# -*- coding: utf-8 -*- 

2""" 

3Created on Thu Sep 28 12:34:23 2017 

4@author: jrpeacock 

5 

6Translated from code by B. Murphy. 

7""" 

8 

9# ============================================================================== 

10# Imports 

11# ============================================================================== 

12from pathlib import Path 

13 

14import numpy as np 

15import xarray as xr 

16from loguru import logger 

17 

18from mt_metadata import DEFAULT_CHANNEL_NOMENCLATURE 

19from mt_metadata.common.list_dict import ListDict 

20from mt_metadata.timeseries import Electric, Magnetic, Run, Survey 

21from mt_metadata.transfer_functions.io.tools import get_nm_elev 

22from mt_metadata.transfer_functions.tf import Station 

23 

24from .metadata import Channel 

25 

26 

27# ============================================================================== 

28PERIOD_FORMAT = ".10g" 

29 

30 

31class ZMMError(Exception): 

32 pass 

33 

34 

35class ZMMHeader(object): 

36 """ 

37 Container for Header of an Egbert file 

38 """ 

39 

40 def __init__(self, fn=None, **kwargs): 

41 self.processing_type = None 

42 self.num_channels = None 

43 self.num_freq = None 

44 self._header_count = 0 

45 self._component_dict = None 

46 self.ex = None 

47 self.ey = None 

48 self.hx = None 

49 self.hy = None 

50 self.hz = None 

51 self._zfn = None 

52 self.fn = fn 

53 self.station_metadata = Station() 

54 self._channel_order = ["hx", "hy", "hz", "ex", "ey"] 

55 self._header_lines = [ 

56 "TRANSFER FUNCTIONS IN MEASUREMENT COORDINATES", 

57 "********* WITH FULL ERROR COVARIANCE ********", 

58 ] 

59 

60 @property 

61 def fn(self): 

62 return self._zfn 

63 

64 @fn.setter 

65 def fn(self, value): 

66 if value is None: 

67 return 

68 value = Path(value) 

69 if value.suffix.lower() in [".zmm", ".zrr", ".zss"]: 

70 self._zfn = value 

71 else: 

72 msg = f"Input file must be a *.zmm or *.zrr file not {value.suffix}" 

73 logger.error(msg) 

74 raise ValueError(msg) 

75 

76 @property 

77 def latitude(self): 

78 return self.station_metadata.location.latitude 

79 

80 @latitude.setter 

81 def latitude(self, lat): 

82 self.station_metadata.location.latitude = lat 

83 

84 @property 

85 def longitude(self): 

86 return self.station_metadata.location.longitude 

87 

88 @longitude.setter 

89 def longitude(self, lon): 

90 self.station_metadata.location.longitude = lon 

91 

92 @property 

93 def elevation(self): 

94 return self.station_metadata.location.elevation 

95 

96 @elevation.setter 

97 def elevation(self, value): 

98 self.station_metadata.location.elevation = value 

99 

100 @property 

101 def declination(self): 

102 return self.station_metadata.location.declination.value 

103 

104 @declination.setter 

105 def declination(self, value): 

106 self.station_metadata.location.declination.value = value 

107 

108 @property 

109 def station(self): 

110 return self.station_metadata.id 

111 

112 @station.setter 

113 def station(self, value): 

114 self.station_metadata.id = value 

115 

116 def read_header(self, fn: str | Path | None = None) -> None: 

117 """ 

118 Read the header information from a ZMM file. 

119 

120 Parameters 

121 ---------- 

122 fn : str | Path | None, optional 

123 The file name to read, by default None 

124 """ 

125 

126 if fn is not None: 

127 self.fn = fn 

128 with open(self.fn, "r") as fid: 

129 line = fid.readline() 

130 

131 self._header_count = 0 

132 header_list = [] 

133 while "period" not in line: 

134 header_list.append(line) 

135 self._header_count += 1 

136 

137 line = fid.readline() 

138 self.station_metadata.comments.value = "" 

139 self.station_metadata.transfer_function.processing_type = header_list[2].strip() 

140 station = header_list[3].lower().strip() 

141 if station.count(":") > 0: 

142 station = station.split(":")[1] 

143 station = station.strip() 

144 self.station = station 

145 self.station_metadata.runs = ListDict() 

146 self.station_metadata.add_run(Run(id=f"{self.station}a")) 

147 self.station_metadata.transfer_function.id = self.station 

148 

149 for ii, line in enumerate(header_list): 

150 if line.find("**") >= 0: 

151 if self.station_metadata.comments.value: 

152 self.station_metadata.comments.value += ( 

153 "\n" + line.replace("*", "").strip() 

154 ) 

155 else: 

156 self.station_metadata.comments.value = line.replace("*", "").strip() 

157 elif ii == 2: 

158 self.processing_type = line.lower().strip() 

159 elif "station" in line: 

160 self.station_metadata.id = line.split(":")[1].strip() 

161 elif "coordinate" in line: 

162 line_list = line.strip().split() 

163 self.latitude = line_list[1] 

164 lon = float(line_list[2]) 

165 if lon > 180: 

166 lon -= 360 

167 self.longitude = lon 

168 

169 self.station_metadata.location.declination.value = float(line_list[-1]) 

170 elif "number" in line: 

171 line_list = line.strip().split() 

172 self.num_channels = int(line_list[3]) 

173 self.num_freq = int(line_list[-1]) 

174 elif "orientations" in line: 

175 pass 

176 elif line.strip()[-2:].lower() in ["ex", "ey", "hx", "hy", "hz"]: 

177 line_list = line.strip().split() 

178 comp = line_list[-1].lower() 

179 channel_dict = {"channel": comp} 

180 channel_dict["number"] = int( 

181 line_list[0] 

182 ) # Changed from "chn_num" to "number" 

183 channel_dict["azimuth"] = float( 

184 line_list[1] 

185 ) # Changed from "azm" to "azimuth" 

186 channel_dict["tilt"] = float(line_list[2]) 

187 channel_dict["dl"] = line_list[3] 

188 if channel_dict["number"] == 0: # Changed from "chn_num" to "number" 

189 channel_dict["number"] = self.num_channels 

190 setattr(self, comp, Channel(**channel_dict)) 

191 

192 if comp in ["ex", "ey"]: 

193 ch = Electric() 

194 elif comp in ["hx", "hy", "hz"]: 

195 ch = Magnetic() 

196 ch.component = comp 

197 ch.measurement_azimuth = channel_dict[ 

198 "azimuth" 

199 ] # Changed from "azm" to "azimuth" 

200 ch.measurement_tilt = channel_dict["tilt"] 

201 ch.translated_azimuth = channel_dict[ 

202 "azimuth" 

203 ] # Changed from "azm" to "azimuth" 

204 ch.translated_tilt = channel_dict["tilt"] 

205 ch.channel_number = channel_dict[ 

206 "number" 

207 ] # Changed from "chn_num" to "number" 

208 

209 self.station_metadata.runs[0].add_channel(ch) 

210 

211 def write_header(self) -> list[str]: 

212 """ 

213 write a zmm header 

214 

215 TRANSFER FUNCTIONS IN MEASUREMENT COORDINATES 

216 ********** WITH FULL ERROR COVARINCE********* 

217 

218 300 

219 coordinate 34.727 -115.735 declination 13.10 

220 number of channels 5 number of frequencies 38 

221 orientations and tilts of each channel 

222 1 0.00 0.00 300 Hx 

223 2 90.00 0.00 300 Hy 

224 3 0.00 0.00 300 Hz 

225 4 0.00 0.00 300 Ex 

226 5 90.00 0.00 300 Ey 

227 

228 returns 

229 ------- 

230 list[str] 

231 The formatted header lines. 

232 

233 """ 

234 lines = [ 

235 self._header_lines[0], 

236 self._header_lines[1], 

237 ] 

238 lines += [f"{self.station_metadata.transfer_function.processing_type}"] 

239 lines += [f"station: {self.station}"] 

240 lines += [ 

241 f"coordinate {self.latitude:>9.3f} {self.longitude:>9.3f} declination {self.declination:>8.2f}" 

242 ] 

243 lines += [ 

244 f"number of channels {self.num_channels:>3d} number of frequencies {self.num_freq:>3d}" 

245 ] 

246 lines += [" orientations and tilts of each channel"] 

247 for ii, ch in enumerate(self._channel_order): 

248 try: 

249 channel = getattr(self, ch) 

250 if channel.number == None: 

251 channel.number = int(ii) 

252 if channel.tilt is None: 

253 channel.tilt = 0.0 

254 if channel.azimuth is None: 

255 channel.azimuth = 0.0 

256 lines += [ 

257 ( 

258 f"{channel.number:>5d} " 

259 f"{channel.azimuth:>8.2f} " 

260 f"{channel.tilt:>8.2f} " 

261 f"{self.station:>3} " 

262 f"{channel.channel.capitalize():>3}" 

263 ) 

264 ] 

265 except (AttributeError, TypeError): 

266 logger.warning(f"Could not find {ch}") 

267 continue 

268 return lines 

269 

270 @property 

271 def channel_dict(self) -> dict[str, str]: 

272 channels = {} 

273 for cc in ["ex", "ey", "hx", "hy", "hz"]: 

274 ch = getattr(self, cc) 

275 if ch is not None: 

276 # Ensure channel value is extracted as string from ChannelEnum 

277 channel_value = ch.channel 

278 if hasattr(channel_value, "value"): 

279 channels[cc] = channel_value.value 

280 else: 

281 channels[cc] = str(channel_value) 

282 return channels 

283 

284 @property 

285 def channels_recorded(self) -> list[str]: 

286 channels = {} 

287 for cc in ["ex", "ey", "hx", "hy", "hz"]: 

288 ch = getattr(self, cc) 

289 if ch is not None: 

290 # Ensure channel value is extracted as string from ChannelEnum 

291 channel_value = ch.channel 

292 if hasattr(channel_value, "value"): 

293 channels[ch.index] = channel_value.value 

294 else: 

295 channels[ch.index] = str(channel_value) 

296 ordered_channels = [channels[k] for k in sorted(channels.keys())] 

297 return ordered_channels 

298 

299 @property 

300 def input_channels(self) -> list[str]: 

301 return self.channels_recorded[0:2] 

302 

303 @property 

304 def output_channels(self) -> list[str]: 

305 return self.channels_recorded[2:] 

306 

307 @property 

308 def has_tipper(self) -> bool: 

309 if "hz" in self.channels_recorded: 

310 return True 

311 return False 

312 

313 @property 

314 def has_impedance(self) -> bool: 

315 if "ex" in self.channels_recorded and "ey" in self.channels_recorded: 

316 return True 

317 return False 

318 

319 

320class ZMM(ZMMHeader): 

321 """ 

322 Container for Egberts zrr format. 

323 

324 """ 

325 

326 def __init__(self, fn: str | Path | None = None, **kwargs): 

327 super().__init__() 

328 

329 self.fn = fn 

330 self._header_count = 0 

331 self.transfer_functions = None 

332 self.sigma_e = None 

333 self.sigma_s = None 

334 self.periods = None 

335 self.dataset = None 

336 self.decimation_dict = {} 

337 self.channel_nomenclature = DEFAULT_CHANNEL_NOMENCLATURE 

338 

339 self._transfer_function = self._initialize_transfer_function() 

340 

341 for key in list(kwargs.keys()): 

342 setattr(self, key, kwargs[key]) 

343 if self.fn is not None: 

344 self.read() 

345 

346 def __str__(self) -> str: 

347 lines = [f"Station: {self.station}", "-" * 50] 

348 lines.append(f"\tSurvey: {self.survey_metadata.id}") 

349 lines.append(f"\tProject: {self.survey_metadata.project}") 

350 lines.append(f"\tAcquired by: {self.station_metadata.acquired_by.author}") 

351 lines.append(f"\tAcquired date: {self.station_metadata.time_period.start}") 

352 lines.append(f"\tLatitude: {self.latitude:.3f}") 

353 lines.append(f"\tLongitude: {self.longitude:.3f}") 

354 lines.append(f"\tElevation: {self.elevation:.3f}") 

355 if "ex" in self.output_channels: 

356 lines.append("\tImpedance: True") 

357 else: 

358 lines.append("\tImpedance: False") 

359 if "hz" in self.output_channels: 

360 lines.append("\tTipper: True") 

361 else: 

362 lines.append("\tTipper: False") 

363 if self.periods is not None: 

364 lines.append(f"\tNumber of periods: {self.periods.size}") 

365 lines.append( 

366 f"\t\tPeriod Range: {self.periods.min():.5E} -- {self.periods.max():.5E} s" 

367 ) 

368 lines.append( 

369 f"\t\tFrequency Range {1./self.periods.max():.5E} -- {1./self.periods.min():.5E} s" 

370 ) 

371 return "\n".join(lines) 

372 

373 def __repr__(self): 

374 lines = [] 

375 lines.append(f"station='{self.station}'") 

376 lines.append(f"latitude={self.latitude:.2f}") 

377 lines.append(f"longitude={self.longitude:.2f}") 

378 lines.append(f"elevation={self.elevation:.2f}") 

379 

380 return f"MT( {(', ').join(lines)} )" 

381 

382 def __eq__(self, other: object) -> bool: 

383 """ 

384 compare equals 

385 

386 :param other: DESCRIPTION 

387 :type other: TYPE 

388 :return: DESCRIPTION 

389 :rtype: TYPE 

390 

391 """ 

392 

393 if not isinstance(other, ZMM): 

394 raise TypeError(f"Cannot compare type {type(other)} with ZMM.") 

395 

396 is_equal = True 

397 if self.station_metadata != other.station_metadata: 

398 is_equal = False 

399 if not self.dataset.equals(other.dataset): 

400 is_equal = False 

401 logger.info("Datasets are not equal") 

402 print(self.dataset.fillna(0) != other.dataset.fillna(0).all()) 

403 return is_equal 

404 

405 @property 

406 def channel_nomenclature(self) -> dict[str, str]: 

407 return self._channel_nomenclature 

408 

409 @channel_nomenclature.setter 

410 def channel_nomenclature(self, ch_dict: dict[str, str]) -> None: 

411 """ 

412 channel dictionary 

413 """ 

414 

415 if not isinstance(ch_dict, dict): 

416 raise TypeError( 

417 "Channel_nomenclature must be a dictionary with keys " 

418 "['ex', 'ey', 'hx', 'hy', 'hz']." 

419 ) 

420 

421 self._channel_nomenclature = ch_dict 

422 # unpack channel nomenclature dict 

423 for comp in DEFAULT_CHANNEL_NOMENCLATURE.keys(): 

424 try: 

425 setattr(self, f"_{comp}", ch_dict[comp]) 

426 except KeyError: 

427 setattr(self, f"_{comp}", comp) 

428 ch_dict[comp] = comp 

429 

430 self._ex_ey = [self._ex, self._ey] 

431 self._hx_hy = [self._hx, self._hy] 

432 self._ex_ey_hz = [self._ex, self._ey, self._hz] 

433 

434 @property 

435 def _ch_input_dict(self) -> dict[str, list[str]]: 

436 return { 

437 "isp": self._hx_hy, 

438 "res": self._ex_ey_hz, 

439 "tf": self._hx_hy, 

440 "tf_error": self._hx_hy, 

441 "all": [self._ex, self._ey, self._hz, self._hx, self._hy], 

442 } 

443 

444 @property 

445 def _ch_output_dict(self) -> dict[str, list[str]]: 

446 return { 

447 "isp": self._hx_hy, 

448 "res": self._ex_ey_hz, 

449 "tf": self._ex_ey_hz, 

450 "tf_error": self._ex_ey_hz, 

451 "all": [self._ex, self._ey, self._hz, self._hx, self._hy], 

452 } 

453 

454 def _initialize_transfer_function(self, periods: list[float] = [1]) -> None: 

455 """ 

456 create an empty x array for the data. For now this accommodates 

457 a single processed station. 

458 

459 

460 Parameters 

461 ---------- 

462 periods : list[float], optional 

463 List of periods to create the transfer function for, defaults to [1]. 

464 

465 Returns 

466 ------- 

467 None 

468 

469 """ 

470 # create an empty array for the transfer function 

471 tf = xr.DataArray( 

472 data=0.0 + 0j, 

473 dims=["period", "output", "input"], 

474 coords={ 

475 "period": periods, 

476 "output": self._ch_output_dict["all"], 

477 "input": self._ch_input_dict["all"], 

478 }, 

479 name="transfer_function", 

480 ) 

481 

482 tf_err = xr.DataArray( 

483 data=0.0, 

484 dims=["period", "output", "input"], 

485 coords={ 

486 "period": periods, 

487 "output": self._ch_output_dict["all"], 

488 "input": self._ch_input_dict["all"], 

489 }, 

490 name="error", 

491 ) 

492 

493 inv_signal_power = xr.DataArray( 

494 data=0.0 + 0j, 

495 dims=["period", "output", "input"], 

496 coords={ 

497 "period": periods, 

498 "output": self._ch_output_dict["all"], 

499 "input": self._ch_input_dict["all"], 

500 }, 

501 name="inverse_signal_power", 

502 ) 

503 

504 residual_covariance = xr.DataArray( 

505 data=0.0 + 0j, 

506 dims=["period", "output", "input"], 

507 coords={ 

508 "period": periods, 

509 "output": self._ch_output_dict["all"], 

510 "input": self._ch_input_dict["all"], 

511 }, 

512 name="residual_covariance", 

513 ) 

514 

515 # will need to add in covariance in some fashion 

516 return xr.Dataset( 

517 { 

518 tf.name: tf, 

519 tf_err.name: tf_err, 

520 inv_signal_power.name: inv_signal_power, 

521 residual_covariance.name: residual_covariance, 

522 }, 

523 coords={ 

524 "period": periods, 

525 "output": self._ch_output_dict["all"], 

526 "input": self._ch_input_dict["all"], 

527 }, 

528 ) 

529 

530 @property 

531 def frequencies(self) -> np.typing.NDArray[np.float64] | None: 

532 if self.periods is None: 

533 return None 

534 return 1.0 / self.periods 

535 

536 def initialize_arrays(self) -> None: 

537 """ 

538 make initial arrays based on number of frequencies and channels 

539 """ 

540 if self.num_freq is None: 

541 return 

542 self.periods = np.zeros(self.num_freq) 

543 self.transfer_functions = np.zeros( 

544 (self.num_freq, self.num_channels - 2, 2), dtype=np.complex64 

545 ) 

546 

547 # residual covariance -- square matrix with dimension as number of 

548 # predicted channels 

549 self.sigma_e = np.zeros( 

550 (self.num_freq, self.num_channels - 2, self.num_channels - 2), 

551 dtype=np.complex64, 

552 ) 

553 

554 # inverse coherent signal power -- square matrix, with dimension as the 

555 # number of predictor channels 

556 # since EMTF and this code assume N predictors is 2, 

557 # this dimension is hard-coded 

558 self.sigma_s = np.zeros((self.num_freq, 2, 2), dtype=np.complex64) 

559 

560 def read(self, fn: str | Path | None = None, get_elevation: bool = False) -> None: 

561 """ 

562 Read in Egbert zrr/zmm file 

563 

564 Parameters 

565 ---------- 

566 fn : str | Path | None, optional 

567 The file name to read, by default None 

568 

569 get_elevation : bool, optional 

570 If True, fetch elevation from the National Map, by default False 

571 

572 Raises 

573 ------ 

574 ZMMError 

575 If the file cannot be read or is not in the expected format. 

576 """ 

577 if fn is not None: 

578 self.fn = fn 

579 self.read_header() 

580 self.channel_nomenclature = self.channel_dict 

581 self.initialize_arrays() 

582 

583 self._transfer_function = self._initialize_transfer_function() 

584 self.dataset = self._initialize_transfer_function() 

585 

586 ### read each data block and fill the appropriate array 

587 for ii, period_block in enumerate(self._get_period_blocks()): 

588 data_block = self._read_period_block(period_block) 

589 self.periods[ii] = data_block["period"] 

590 

591 self._fill_tf_array_from_block(data_block["tf"], ii) 

592 self._fill_sig_array_from_block(data_block["sig"], ii) 

593 self._fill_res_array_from_block(data_block["res"], ii) 

594 self._fill_dataset() 

595 

596 self.station_metadata.id = self.station 

597 self.station_metadata.data_type = "MT" 

598 self.station_metadata.channels_recorded = self.channels_recorded 

599 # provenance 

600 self.station_metadata.provenance.software.name = "EMTF" 

601 self.station_metadata.provenance.software.version = "1" 

602 self.station_metadata.transfer_function.runs_processed = ( 

603 self.station_metadata.run_list 

604 ) 

605 self.station_metadata.transfer_function.software.name = "EMTF" 

606 self.station_metadata.transfer_function.software.version = "1" 

607 self.station_metadata.runs[0].sample_rate = np.median( 

608 np.array([d["sample_rate"] for k, d in self.decimation_dict.items()]) 

609 ) 

610 

611 # add information to runs 

612 for rr in self.station_metadata.runs: 

613 if self.transfer_functions.shape[1] >= 2: 

614 rr.ex = self.ex_metadata 

615 rr.ey = self.ey_metadata 

616 rr.hx = self.hx_metadata 

617 rr.hy = self.hy_metadata 

618 if self.hz is not None: 

619 rr.hz = self.hz_metadata 

620 

621 if self.elevation in [0, None] and get_elevation: 

622 if self.latitude != 0 and self.longitude != 0: 

623 self.elevation = get_nm_elev( 

624 self.latitude, 

625 self.longitude, 

626 ) 

627 

628 def write( 

629 self, fn: str | Path | None = None, decimation_levels: dict | None = None 

630 ) -> None: 

631 """ 

632 write a zmm file 

633 

634 decimation_levels should be a dictionary with keys 

635 

636 * decimation_level 

637 

638 values will be a dictionary with keys 

639 

640 * frequency_band, value = (min, max) 

641 * n_points, value = int 

642 * sampling_freq, value = float 

643 

644 Parameters 

645 ---------- 

646 fn : str | Path | None, optional 

647 The file name to write, by default None 

648 

649 decimation_levels : dict, optional 

650 A dictionary containing decimation levels and their properties, by default None. 

651 

652 Raises 

653 ------ 

654 ZMMError 

655 If the file cannot be written or is not in the expected format. 

656 """ 

657 if fn is not None: 

658 self.fn = fn 

659 lines = self.write_header() 

660 lines += [ 

661 "", 

662 ] # add 1 space separating header from data 

663 for p in self.dataset.period.data: 

664 a = self.dataset.sel(period=p) 

665 try: 

666 dec_dict = self.decimation_dict[f"{p:{PERIOD_FORMAT}}"] 

667 except KeyError: 

668 dec_dict = { 

669 "level": 0, 

670 "bands": (0, 0), 

671 "npts": 0, 

672 "sample_rate": self.station_metadata.runs[0].sample_rate, 

673 } 

674 lines += [ 

675 ( 

676 f"period : {p:^18.5f} " 

677 f"decimation level {dec_dict['level']:^8d}" 

678 f"freq. band from {dec_dict['bands'][0]:>5d} to {dec_dict['bands'][1]:>5d}" 

679 ) 

680 ] 

681 lines += [ 

682 f"number of data point {dec_dict['npts']} sampling freq. {dec_dict['sample_rate']} Hz" 

683 ] 

684 # write tf 

685 lines += [" Transfer Functions"] 

686 for c_out in self.output_channels: 

687 line = "" 

688 for c_in in self.input_channels: 

689 c_in_name = self.channel_nomenclature[c_in] 

690 c_out_name = self.channel_nomenclature[c_out] 

691 tf_element = a.transfer_function.loc[ 

692 dict(output=c_out_name, input=c_in_name) 

693 ].data 

694 line += f"{tf_element.real:>12.4E}{tf_element.imag:>12.4E}" 

695 lines += [line] 

696 # write signal power 

697 lines += [" Inverse Coherent Signal Power Matrix"] 

698 for ii, c_out in enumerate(self.input_channels): 

699 line = "" 

700 for c_in in self.input_channels[: ii + 1]: 

701 c_in_name = self.channel_nomenclature[c_in] 

702 c_out_name = self.channel_nomenclature[c_out] 

703 tf_element = a.inverse_signal_power.loc[ 

704 dict(output=c_out_name, input=c_in_name) 

705 ].data 

706 line += f"{tf_element.real:>12.4E}{tf_element.imag:>12.4E}" 

707 lines += [line] 

708 # write residual covariance 

709 lines += [" Residual Covariance"] 

710 for ii, c_out in enumerate(self.output_channels): 

711 line = "" 

712 for c_in in self.output_channels[: ii + 1]: 

713 c_in_name = self.channel_nomenclature[c_in] 

714 c_out_name = self.channel_nomenclature[c_out] 

715 tf_element = a.residual_covariance.loc[ 

716 dict(output=c_out_name, input=c_in_name) 

717 ].data 

718 line += f"{tf_element.real:>12.4E}{tf_element.imag:>12.4E}" 

719 lines += [line] 

720 with open(self.fn, "w") as fid: 

721 fid.write("\n".join(lines)) 

722 return self.fn 

723 

724 def _get_period_blocks(self) -> list[list[str]]: 

725 """ 

726 split file into period blocks 

727 """ 

728 

729 with open(self.fn, "r") as fid: 

730 fn_str = fid.read() 

731 period_strings = fn_str.lower().split("period") 

732 period_blocks = [] 

733 for per in period_strings: 

734 period_blocks.append(per.split("\n")) 

735 return period_blocks[1:] 

736 

737 def _read_period_block(self, period_block: list[str]) -> dict: 

738 """ 

739 read block: 

740 period : 0.01587 decimation level 1 freq. band from 46 to 80 

741 number of data point 951173 sampling freq. 0.004 Hz 

742 Transfer Functions 

743 0.1474E+00 -0.2049E-01 0.1618E+02 0.1107E+02 

744 -0.1639E+02 -0.1100E+02 0.5559E-01 0.1249E-01 

745 Inverse Coherent Signal Power Matrix 

746 0.2426E+03 -0.2980E-06 

747 0.9004E+02 -0.2567E+01 0.1114E+03 0.1192E-06 

748 Residual Covaraince 

749 0.8051E-05 0.0000E+00 

750 -0.2231E-05 -0.2863E-06 0.8866E-05 0.0000E+00 

751 """ 

752 

753 period = float(period_block[0].strip().split(":")[1].split()[0].strip()) 

754 level = int(period_block[0].strip().split("level")[1].split()[0].strip()) 

755 bands = ( 

756 int(period_block[0].strip().split("from")[1].split()[0].strip()), 

757 int(period_block[0].strip().split("to")[1].split()[0].strip()), 

758 ) 

759 

760 npts = int(period_block[1].strip().split("point")[1].split()[0].strip()) 

761 sr = float(period_block[1].strip().split("freq.")[1].split()[0].strip()) 

762 self.decimation_dict[f"{period:{PERIOD_FORMAT}}"] = { 

763 "level": level, 

764 "bands": bands, 

765 "npts": npts, 

766 "sample_rate": sr, 

767 } 

768 data_dict = {"period": period, "tf": [], "sig": [], "res": []} 

769 key = "tf" 

770 for line in period_block[2:]: 

771 if "transfer" in line.lower(): 

772 key = "tf" 

773 continue 

774 elif "signal" in line.lower(): 

775 key = "sig" 

776 continue 

777 elif "residual" in line.lower(): 

778 key = "res" 

779 continue 

780 line_list = [float(xx) for xx in line.strip().split()] 

781 values = [ 

782 complex(line_list[ii], line_list[ii + 1]) 

783 for ii in range(0, len(line_list), 2) 

784 ] 

785 data_dict[key].append(values) 

786 return data_dict 

787 

788 def _flatten_list(self, x_list: list[list]) -> list: 

789 """ 

790 flatten = lambda l: [item for sublist in l for item in sublist] 

791 

792 Returns 

793 ------- 

794 None. 

795 

796 """ 

797 

798 flat_list = [item for sublist in x_list for item in sublist] 

799 

800 return flat_list 

801 

802 def _fill_tf_array_from_block(self, tf_block: list[complex], index: int) -> None: 

803 """ 

804 fill tf arrays from data blocks 

805 """ 

806 tf_block = self._flatten_list(tf_block) 

807 for kk, jj in enumerate(range(0, len(tf_block), 2)): 

808 self.transfer_functions[index, kk, 0] = tf_block[jj] 

809 self.transfer_functions[index, kk, 1] = tf_block[jj + 1] 

810 

811 def _fill_sig_array_from_block(self, sig_block: list[complex], index: int) -> None: 

812 """ 

813 fill signal array 

814 """ 

815 sig_block = self._flatten_list(sig_block) 

816 self.sigma_s[index, 0, 0] = sig_block[0] 

817 self.sigma_s[index, 1, 0] = sig_block[1] 

818 self.sigma_s[index, 0, 1] = sig_block[1].conjugate() 

819 self.sigma_s[index, 1, 1] = sig_block[2] 

820 

821 def _fill_res_array_from_block(self, res_block: list[complex], index: int) -> None: 

822 """ 

823 fill residual covariance array 

824 """ 

825 for jj in range(self.num_channels - 2): 

826 values = res_block[jj] 

827 for kk in range(jj + 1): 

828 if jj == kk: 

829 self.sigma_e[index, jj, kk] = values[kk] 

830 else: 

831 self.sigma_e[index, jj, kk] = values[kk] 

832 self.sigma_e[index, kk, jj] = values[kk].conjugate() 

833 

834 def _fill_dataset(self) -> None: 

835 """ 

836 fill the dataset 

837 

838 :return: DESCRIPTION 

839 :rtype: TYPE 

840 

841 """ 

842 

843 self.dataset = self._initialize_transfer_function(periods=self.periods) 

844 

845 self.dataset.transfer_function.loc[ 

846 dict(input=self.input_channels, output=self.output_channels) 

847 ] = self.transfer_functions 

848 self.dataset.inverse_signal_power.loc[ 

849 dict(input=self.input_channels, output=self.input_channels) 

850 ] = self.sigma_s 

851 self.dataset.residual_covariance.loc[ 

852 dict(input=self.output_channels, output=self.output_channels) 

853 ] = self.sigma_e 

854 

855 def calculate_impedance( 

856 self, angle: float = 0.0 

857 ) -> tuple[np.typing.NDArray[np.complex64], np.typing.NDArray[np.float64]]: 

858 """ 

859 calculate the impedances from the transfer functions 

860 

861 Parameters 

862 ---------- 

863 angle : float, optional 

864 The angle to rotate the impedance tensor. 

865 

866 Returns 

867 ------- 

868 None 

869 """ 

870 

871 # check to see if there are actually electric fields in the TFs 

872 if not hasattr(self, "ex") or not hasattr(self, "ey"): 

873 msg = ( 

874 "Cannot return apparent resistivity and phase " 

875 "data because these TFs do not contain electric " 

876 "fields as a predicted channel." 

877 ) 

878 logger.error(msg) 

879 raise ZMMError(msg) 

880 # transform the TFs first... 

881 # build transformation matrix for predictor channels 

882 # (horizontal magnetic fields) 

883 hx_index = self.hx.index 

884 hy_index = self.hy.index 

885 u = np.eye(2, 2) 

886 u[hx_index, hx_index] = np.cos(np.deg2rad(self.hx.azimuth - angle)) 

887 u[hx_index, hy_index] = np.sin(np.deg2rad(self.hx.azimuth - angle)) 

888 u[hy_index, hx_index] = np.cos(np.deg2rad(self.hy.azimuth - angle)) 

889 u[hy_index, hy_index] = np.sin(np.deg2rad(self.hy.azimuth - angle)) 

890 u = np.linalg.inv(u) 

891 

892 # build transformation matrix for predicted channels (electric fields) 

893 ex_index = self.ex.index 

894 ey_index = self.ey.index 

895 v = np.eye(self.transfer_functions.shape[1], self.transfer_functions.shape[1]) 

896 v[ex_index - 2, ex_index - 2] = np.cos(np.deg2rad(self.ex.azimuth - angle)) 

897 v[ey_index - 2, ex_index - 2] = np.sin(np.deg2rad(self.ex.azimuth - angle)) 

898 v[ex_index - 2, ey_index - 2] = np.cos(np.deg2rad(self.ey.azimuth - angle)) 

899 v[ey_index - 2, ey_index - 2] = np.sin(np.deg2rad(self.ey.azimuth - angle)) 

900 

901 # matrix multiplication... 

902 rotated_transfer_functions = np.matmul( 

903 v, np.matmul(self.transfer_functions, u.T) 

904 ) 

905 rotated_sigma_s = np.matmul(u, np.matmul(self.sigma_s, u.T)) 

906 rotated_sigma_e = np.matmul(v, np.matmul(self.sigma_e, v.T)) 

907 

908 # now pull out the impedance tensor 

909 z = np.zeros((self.num_freq, 2, 2), dtype=np.complex64) 

910 z[:, 0, 0] = rotated_transfer_functions[:, ex_index - 2, hx_index] # Zxx 

911 z[:, 0, 1] = rotated_transfer_functions[:, ex_index - 2, hy_index] # Zxy 

912 z[:, 1, 0] = rotated_transfer_functions[:, ey_index - 2, hx_index] # Zyx 

913 z[:, 1, 1] = rotated_transfer_functions[:, ey_index - 2, hy_index] # Zyy 

914 

915 # and the variance information 

916 var = np.zeros((self.num_freq, 2, 2)) 

917 var[:, 0, 0] = np.real( 

918 rotated_sigma_e[:, ex_index - 2, ex_index - 2] 

919 * rotated_sigma_s[:, hx_index, hx_index] 

920 ) 

921 var[:, 0, 1] = np.real( 

922 rotated_sigma_e[:, ex_index - 2, ex_index - 2] 

923 * rotated_sigma_s[:, hy_index, hy_index] 

924 ) 

925 var[:, 1, 0] = np.real( 

926 rotated_sigma_e[:, ey_index - 2, ey_index - 2] 

927 * rotated_sigma_s[:, hx_index, hx_index] 

928 ) 

929 var[:, 1, 1] = np.real( 

930 rotated_sigma_e[:, ey_index - 2, ey_index - 2] 

931 * rotated_sigma_s[:, hy_index, hy_index] 

932 ) 

933 

934 error = np.sqrt(var) 

935 

936 return z, error 

937 

938 def calculate_tippers( 

939 self, angle: float = 0.0 

940 ) -> tuple[np.typing.NDArray[np.complex64], np.typing.NDArray[np.float64]]: 

941 """ 

942 calculate induction vectors 

943 

944 Parameters 

945 ---------- 

946 angle : float, optional 

947 The angle to rotate the tipper tensor. 

948 

949 Returns 

950 ------- 

951 tipper : np.ndarray 

952 The tipper tensor. 

953 """ 

954 

955 # check to see if there is a vertical magnetic field in the TFs 

956 if self.hz is None: 

957 raise ZMMError( 

958 "Cannot return tipper data because the TFs do not " 

959 "contain the vertical magnetic field as a " 

960 "predicted channel." 

961 ) 

962 # transform the TFs first... 

963 # build transformation matrix for predictor channels 

964 # (horizontal magnetic fields) 

965 hx_index = self.hx.index 

966 hy_index = self.hy.index 

967 u = np.eye(2, 2) 

968 u[hx_index, hx_index] = np.cos(np.deg2rad(self.hx.azimuth - angle)) 

969 u[hx_index, hy_index] = np.sin(np.deg2rad(self.hx.azimuth - angle)) 

970 u[hy_index, hx_index] = np.cos(np.deg2rad(self.hy.azimuth - angle)) 

971 u[hy_index, hy_index] = np.sin(np.deg2rad(self.hy.azimuth - angle)) 

972 u = np.linalg.inv(u) 

973 

974 # don't need to transform predicated channels (assuming no tilt in Hz) 

975 hz_index = self.hz.index 

976 v = np.eye(self.transfer_functions.shape[1], self.transfer_functions.shape[1]) 

977 

978 # matrix multiplication... 

979 rotated_transfer_functions = np.matmul( 

980 v, np.matmul(self.transfer_functions, u.T) 

981 ) 

982 rotated_sigma_s = np.matmul(u, np.matmul(self.sigma_s, u.T)) 

983 rotated_sigma_e = np.matmul(v, np.matmul(self.sigma_e, v.T)) 

984 

985 # now pull out tipper information 

986 tipper = np.zeros((self.num_freq, 2), dtype=np.complex64) 

987 tipper[:, 0] = rotated_transfer_functions[:, hz_index - 2, hx_index] # Tx 

988 tipper[:, 1] = rotated_transfer_functions[:, hz_index - 2, hy_index] # Ty 

989 

990 # and the variance/error information 

991 var = np.zeros((self.num_freq, 2)) 

992 var[:, 0] = np.real( 

993 rotated_sigma_e[:, hz_index - 2, hz_index - 2] 

994 * rotated_sigma_s[:, hx_index, hx_index] 

995 ) # Tx 

996 var[:, 1] = np.real( 

997 rotated_sigma_e[:, hz_index - 2, hz_index - 2] 

998 * rotated_sigma_s[:, hy_index, hy_index] 

999 ) # Ty 

1000 error = np.sqrt(var) 

1001 

1002 tipper = tipper.reshape((self.num_freq, 1, 2)) 

1003 error = error.reshape((self.num_freq, 1, 2)) 

1004 

1005 return tipper, error 

1006 

1007 @property 

1008 def survey_metadata(self): 

1009 sm = Survey() 

1010 sm.add_station(self.station_metadata) 

1011 

1012 return sm 

1013 

1014 def _get_electric_metadata(self, comp: str) -> Electric: 

1015 """ 

1016 get electric information from the various metadata 

1017 

1018 Parameters 

1019 ---------- 

1020 comp : str 

1021 The component name (e.g., "ex", "ey"). 

1022 

1023 Returns 

1024 ------- 

1025 Electric 

1026 The electric metadata for the specified component. 

1027 """ 

1028 comp = comp.lower() 

1029 electric = Electric() 

1030 electric.positive.type = "electric" 

1031 electric.negative.type = "electric" 

1032 if hasattr(self, comp): 

1033 meas = getattr(self, comp) 

1034 electric.measurement_azimuth = meas.azimuth 

1035 electric.measurement_tilt = meas.tilt 

1036 electric.component = comp 

1037 electric.channel_number = meas.number 

1038 electric.channel_id = meas.number 

1039 return electric 

1040 

1041 @property 

1042 def ex_metadata(self) -> Electric: 

1043 return self._get_electric_metadata("ex") 

1044 

1045 @property 

1046 def ey_metadata(self) -> Electric: 

1047 return self._get_electric_metadata("ey") 

1048 

1049 def _get_magnetic_metadata(self, comp: str) -> Magnetic: 

1050 """ 

1051 

1052 get magnetic metadata from the various sources 

1053 

1054 Parameters 

1055 ---------- 

1056 comp : str 

1057 The component name (e.g., "hx", "hy", "hz"). 

1058 

1059 Returns 

1060 ------- 

1061 Magnetic 

1062 The magnetic metadata for the specified component. 

1063 

1064 """ 

1065 

1066 comp = comp.lower() 

1067 magnetic = Magnetic() 

1068 if hasattr(self, comp): 

1069 meas = getattr(self, comp) 

1070 magnetic.measurement_azimuth = meas.azimuth 

1071 magnetic.measurement_tilt = meas.tilt 

1072 magnetic.component = comp 

1073 magnetic.channel_number = meas.number 

1074 magnetic.channel_id = meas.number 

1075 return magnetic 

1076 

1077 @property 

1078 def hx_metadata(self) -> Magnetic: 

1079 return self._get_magnetic_metadata("hx") 

1080 

1081 @property 

1082 def hy_metadata(self) -> Magnetic: 

1083 return self._get_magnetic_metadata("hy") 

1084 

1085 @property 

1086 def hz_metadata(self) -> Magnetic: 

1087 return self._get_magnetic_metadata("hz")