Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mt_metadata \ mt_metadata \ transfer_functions \ io \ jfiles \ jfile.py: 87%

210 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-10 00:11 -0800

1# -*- coding: utf-8 -*- 

2""" 

3.. py:module:: JFile 

4 :synopsis: Deal with J-Files of the format propsed by Alan Jones 

5 

6.. codeauthor:: Jared Peacock <jpeacock@usgs.gov> 

7 

8""" 

9 

10# ============================================================================== 

11from pathlib import Path 

12 

13import numpy as np 

14from loguru import logger 

15 

16from mt_metadata.common.mttime import MTime 

17from mt_metadata.timeseries import Electric, Magnetic, Run, Survey 

18from mt_metadata.transfer_functions.io.tools import get_nm_elev 

19from mt_metadata.transfer_functions.tf import Station 

20 

21from .metadata import Header 

22 

23 

24# ============================================================================== 

25# Class to read j_file 

26# ============================================================================== 

27class JFile: 

28 """ 

29 be able to read and write a j-file 

30 """ 

31 

32 def __init__(self, fn: str | Path | None = None, **kwargs): 

33 self.header = Header() 

34 

35 self._jfn = None 

36 self.fn = fn 

37 

38 self.z = None 

39 self.z_err = None 

40 self.t = None 

41 self.t_err = None 

42 self.frequency = None 

43 

44 for key, value in kwargs.items(): 

45 setattr(self, key, value) 

46 

47 if self.fn is not None: 

48 self.read() 

49 

50 def __str__(self): 

51 lines = [f"Station: {self.header.station}", "-" * 50] 

52 lines.append(f"\tSurvey: {self.survey_metadata.id}") 

53 lines.append(f"\tProject: {self.survey_metadata.project}") 

54 lines.append(f"\tAcquired by: {self.station_metadata.acquired_by.author}") 

55 lines.append(f"\tAcquired date: {self.station_metadata.time_period.start}") 

56 lines.append(f"\tLatitude: {self.station_metadata.location.latitude:.3f}") 

57 lines.append(f"\tLongitude: {self.station_metadata.location.longitude:.3f}") 

58 lines.append(f"\tElevation: {self.station_metadata.location.elevation:.3f}") 

59 if self.z is not None: 

60 if (self.z == 0).all(): 

61 lines.append("\tImpedance: False") 

62 else: 

63 lines.append("\tImpedance: True") 

64 else: 

65 lines.append("\tImpedance: False") 

66 

67 if self.t is not None: 

68 if (self.t == 0).all(): 

69 lines.append("\tTipper: False") 

70 else: 

71 lines.append("\tTipper: True") 

72 else: 

73 lines.append("\tTipper: False") 

74 

75 if self.frequency is not None: 

76 lines.append(f"\tNumber of periods: {self.frequency.size}") 

77 lines.append( 

78 f"\t\tPeriod Range: {1./self.frequency.max():.5E} -- {1./self.frequency.min():.5E} s" 

79 ) 

80 lines.append( 

81 f"\t\tFrequency Range {self.frequency.min():.5E} -- {self.frequency.max():.5E} s" 

82 ) 

83 

84 return "\n".join(lines) 

85 

86 def __repr__(self): 

87 lines = [] 

88 lines.append(f"station='{self.header.station}'") 

89 lines.append(f"latitude={self.header.latitude:.2f}") 

90 lines.append(f"longitude={self.header.longitude:.2f}") 

91 lines.append(f"elevation={self.header.elevation:.2f}") 

92 

93 return f"JFile({(', ').join(lines)})" 

94 

95 @property 

96 def fn(self): 

97 return self._jfn 

98 

99 @fn.setter 

100 def fn(self, value: str | Path | None) -> None: 

101 """ 

102 set the j-file name 

103 

104 Parameters 

105 ---------- 

106 value : str | Path | None 

107 The j-file name to set. 

108 

109 Raises 

110 ------ 

111 ValueError 

112 If the file is not found or cannot be opened. 

113 """ 

114 if value is None: 

115 return 

116 value = Path(value) 

117 if value.suffix in [".j"]: 

118 self._jfn = value 

119 else: 

120 msg = f"Input file must be a *.j file not {value.suffix}" 

121 logger.error(msg) 

122 raise ValueError(msg) 

123 

124 @property 

125 def periods(self) -> None | np.typing.NDArray[np.float64]: 

126 if self.frequency is not None: 

127 return 1.0 / self.frequency 

128 

129 def _validate_j_file(self) -> list[str]: 

130 """ 

131 change the lat, lon, elev lines to something machine readable, 

132 if they are not. 

133 """ 

134 if self.fn is not None: 

135 if not self.fn.exists(): 

136 msg = f"Could not find {self.fn}, check path" 

137 logger.error(msg) 

138 raise NameError(msg) 

139 

140 with open(str(self.fn), "r", errors="replace") as fid: 

141 j_lines = fid.readlines() 

142 

143 for variable in ["lat", "lon", "elev"]: 

144 for ii, line in enumerate(j_lines): 

145 if variable in line.lower(): 

146 name = line.split("=")[0] 

147 try: 

148 value = float(line.split("=")[1].strip()) 

149 except ValueError: 

150 value = 0.0 

151 logger.debug(f"Changed {name[1:]} to 0.0") 

152 j_lines[ii] = "{0} = {1}\n".format(name, value) 

153 break 

154 

155 return j_lines 

156 

157 def read(self, fn: str | Path | None = None, get_elevation=False): 

158 """ 

159 Read data from a j file 

160 

161 parameters 

162 ---------- 

163 fn : str | Path | None 

164 full path to j-file to read, defaults to None 

165 

166 get_elevation : bool, optional 

167 if True, will try to get elevation from the NM elevation service, 

168 defaults to False 

169 

170 Raises 

171 ------ 

172 ValueError 

173 If the file is not found or cannot be opened. 

174 NameError 

175 If the file is not a valid j-file. 

176 

177 Returns 

178 ------- 

179 None 

180 Reads the data into the instance variables. 

181 

182 """ 

183 # read data 

184 z_index_dict = { 

185 "zxx": (0, 0), 

186 "zxy": (0, 1), 

187 "zyx": (1, 0), 

188 "zyy": (1, 1), 

189 } 

190 t_index_dict = {"tzx": (0, 0), "tzy": (0, 1)} 

191 

192 if fn is not None: 

193 self.fn = fn 

194 

195 logger.debug(f"Reading {self.fn}") 

196 

197 j_line_list = self._validate_j_file() 

198 

199 self.header.read_header(j_line_list) 

200 self.header.read_metadata(j_line_list) 

201 

202 data_lines = [ 

203 j_line for j_line in j_line_list if not ">" in j_line and not "#" in j_line 

204 ][:] 

205 

206 self.header.station = data_lines[0].strip() 

207 

208 # sometimes birrp outputs some missing periods, so the best way to deal with 

209 # this that I could come up with was to get things into dictionaries with 

210 # key words that are the period values, then fill in Z and T from there 

211 # leaving any missing values as 0 

212 

213 # make empty dictionary that have keys as the component 

214 z_dict = dict([(z_key, {}) for z_key in list(z_index_dict.keys())]) 

215 t_dict = dict([(t_key, {}) for t_key in list(t_index_dict.keys())]) 

216 

217 # initialize d_key to avoid NameError 

218 d_key = None 

219 

220 for d_line in data_lines[1:]: 

221 # check to see if we are at the beginning of a component block, if so 

222 # set the dictionary key to that value 

223 line_parts = d_line.strip().split() 

224 if line_parts and ( 

225 line_parts[0].lower().startswith("z") 

226 or line_parts[0].lower().startswith("t") 

227 ): 

228 d_key = line_parts[0].lower() 

229 # if we are at the number of periods line, skip it 

230 elif len(d_line.strip().split()) == 1 and "r" not in d_line.lower(): 

231 continue 

232 elif "r" in d_line.lower(): 

233 break 

234 # get the numbers into the correct dictionary with a key as period and 

235 # for now we will leave the numbers as a list, which we will parse later 

236 else: 

237 # split the line up into each number 

238 d_list = d_line.strip().split() 

239 

240 # make a copy of the list to be sure we don't rewrite any values, 

241 # not sure if this is necessary at the moment 

242 d_value_list = list(d_list) 

243 for d_index, d_value in enumerate(d_list): 

244 # check to see if the column number can be converted into a float 

245 # if it can't, then it will be set to 0, which is assumed to be 

246 # a masked number when writing to an .edi file 

247 

248 try: 

249 d_value = float(d_value) 

250 # need to check for masked points represented by 

251 # birrp as -999, apparently 

252 if d_value == -999 or np.isnan(d_value): 

253 d_value_list[d_index] = "0.0" 

254 else: 

255 d_value_list[d_index] = str(d_value) 

256 except ValueError: 

257 d_value_list[d_index] = "0.0" 

258 

259 # put the numbers in the correct dictionary as: 

260 # key = period, value = [real, imaginary, error] 

261 if d_key is not None and d_key in list(z_index_dict.keys()): 

262 z_dict[d_key][d_value_list[0]] = d_value_list[1:4] 

263 elif d_key is not None and d_key in list(t_index_dict.keys()): 

264 t_dict[d_key][d_value_list[0]] = d_value_list[1:4] 

265 

266 # --> now we need to get the set of periods for all components 

267 # check to see if there is any tipper data output 

268 

269 all_periods = [] 

270 for z_key in list(z_index_dict.keys()): 

271 for f_key in list(z_dict[z_key].keys()): 

272 all_periods.append(f_key) 

273 

274 if len(list(t_dict["tzx"].keys())) == 0: 

275 logger.debug(f"Could not find any Tipper data in {self.fn}") 

276 find_tipper = False 

277 

278 else: 

279 for t_key in list(t_index_dict.keys()): 

280 for f_key in list(t_dict[t_key].keys()): 

281 all_periods.append(f_key) 

282 find_tipper = True 

283 

284 all_periods = np.array(sorted(list(set(all_periods))), dtype=float) 

285 all_periods = all_periods[np.nonzero(all_periods)] 

286 num_per = len(all_periods) 

287 

288 # fill arrays using the period key from all_periods 

289 self.z = np.zeros((num_per, 2, 2), dtype=complex) 

290 self.z_err = np.zeros((num_per, 2, 2), dtype=float) 

291 

292 self.t = np.zeros((num_per, 1, 2), dtype=complex) 

293 self.t_err = np.zeros((num_per, 1, 2), dtype=float) 

294 

295 for p_index, per in enumerate(all_periods): 

296 # Convert float period back to string for dictionary lookup 

297 per_key = str(per) 

298 for z_key in sorted(z_index_dict.keys()): 

299 kk = z_index_dict[z_key][0] 

300 ll = z_index_dict[z_key][1] 

301 try: 

302 self.z[p_index, kk, ll] = float( 

303 z_dict[z_key][per_key][0] 

304 ) + 1j * float(z_dict[z_key][per_key][1]) 

305 self.z_err[p_index, kk, ll] = float(z_dict[z_key][per_key][2]) 

306 except KeyError: 

307 logger.debug(f"No value found for period {per:.4g}") 

308 logger.debug(f"For component {z_key}") 

309 if find_tipper is True: 

310 for t_key in sorted(t_index_dict.keys()): 

311 kk = t_index_dict[t_key][0] 

312 ll = t_index_dict[t_key][1] 

313 try: 

314 self.t[p_index, kk, ll] = float( 

315 t_dict[t_key][per_key][0] 

316 ) + 1j * float(t_dict[t_key][per_key][1]) 

317 self.t_err[p_index, kk, ll] = float(t_dict[t_key][per_key][2]) 

318 except KeyError: 

319 logger.debug(f"No value found for period {per:.4g}") 

320 logger.debug(f"For component {t_key}") 

321 

322 # put the results into mtpy objects 

323 self.frequency = 1.0 / all_periods 

324 self.z[np.where(self.z == np.inf)] = 0 + 0j 

325 self.t[np.where(self.t == np.inf)] = 0 + 0j 

326 self.z_err[np.where(self.z_err == np.inf)] = 10**6 

327 self.t_err[np.where(self.t_err == np.inf)] = 10**6 

328 

329 if self.header.elevation == 0 and get_elevation: 

330 if self.header.latitude != 0 and self.header.longitude != 0: 

331 self.header.elevation = get_nm_elev( 

332 self.header.latitude, self.header.longitude 

333 ) 

334 

335 @property 

336 def station_metadata(self): 

337 sm = Station() 

338 r1 = Run(id="001") 

339 if self.header.birrp_parameters.deltat < 0: 

340 r1.sample_rate = abs(self.header.birrp_parameters.deltat) 

341 else: 

342 r1.sample_rate = 1.0 / (self.header.birrp_parameters.deltat) 

343 

344 if not np.all(self.z == 0): 

345 for ii, comp in enumerate(["ex", "ey", "hx", "hy"], 1): 

346 if comp.startswith("e"): 

347 ch = Electric(component=comp, channel_id=ii) 

348 elif comp.startswith("h"): 

349 ch = Magnetic(component=comp, channel_id=ii) 

350 r1.add_channel(ch) 

351 

352 if not np.all(self.t == 0): 

353 ch = Magnetic(component="hz", channel_id=5) 

354 r1.add_channel(ch) 

355 

356 sm.runs.append(r1) 

357 sm.id = self.header.station 

358 sm.data_type = "MT" 

359 

360 sm.location.latitude = self.header.latitude 

361 sm.location.longitude = self.header.longitude 

362 sm.location.elevation = self.header.elevation 

363 

364 # provenance 

365 sm.provenance.software.name = "BIRRP" 

366 sm.provenance.software.version = "5" 

367 sm.transfer_function.id = self.header.station 

368 if self.fn is not None: 

369 sm.transfer_function.processed_date = MTime( 

370 time_stamp=self.fn.stat().st_ctime 

371 ).isoformat() 

372 sm.transfer_function.runs_processed = sm.run_list 

373 # add birrp parameters 

374 for key, value in self.header.birrp_parameters.to_dict(single=True).items(): 

375 sm.transfer_function.processing_parameters.append(f"{key} = {value}") 

376 

377 return sm 

378 

379 @property 

380 def survey_metadata(self): 

381 sm = Survey() 

382 sm.add_station(self.station_metadata) 

383 

384 return sm