Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mt_metadata \ mt_metadata \ transfer_functions \ io \ jfiles \ jfile.py: 87%
210 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:11 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:11 -0800
1# -*- coding: utf-8 -*-
2"""
3.. py:module:: JFile
4 :synopsis: Deal with J-Files of the format propsed by Alan Jones
6.. codeauthor:: Jared Peacock <jpeacock@usgs.gov>
8"""
10# ==============================================================================
11from pathlib import Path
13import numpy as np
14from loguru import logger
16from mt_metadata.common.mttime import MTime
17from mt_metadata.timeseries import Electric, Magnetic, Run, Survey
18from mt_metadata.transfer_functions.io.tools import get_nm_elev
19from mt_metadata.transfer_functions.tf import Station
21from .metadata import Header
24# ==============================================================================
25# Class to read j_file
26# ==============================================================================
27class JFile:
28 """
29 be able to read and write a j-file
30 """
32 def __init__(self, fn: str | Path | None = None, **kwargs):
33 self.header = Header()
35 self._jfn = None
36 self.fn = fn
38 self.z = None
39 self.z_err = None
40 self.t = None
41 self.t_err = None
42 self.frequency = None
44 for key, value in kwargs.items():
45 setattr(self, key, value)
47 if self.fn is not None:
48 self.read()
50 def __str__(self):
51 lines = [f"Station: {self.header.station}", "-" * 50]
52 lines.append(f"\tSurvey: {self.survey_metadata.id}")
53 lines.append(f"\tProject: {self.survey_metadata.project}")
54 lines.append(f"\tAcquired by: {self.station_metadata.acquired_by.author}")
55 lines.append(f"\tAcquired date: {self.station_metadata.time_period.start}")
56 lines.append(f"\tLatitude: {self.station_metadata.location.latitude:.3f}")
57 lines.append(f"\tLongitude: {self.station_metadata.location.longitude:.3f}")
58 lines.append(f"\tElevation: {self.station_metadata.location.elevation:.3f}")
59 if self.z is not None:
60 if (self.z == 0).all():
61 lines.append("\tImpedance: False")
62 else:
63 lines.append("\tImpedance: True")
64 else:
65 lines.append("\tImpedance: False")
67 if self.t is not None:
68 if (self.t == 0).all():
69 lines.append("\tTipper: False")
70 else:
71 lines.append("\tTipper: True")
72 else:
73 lines.append("\tTipper: False")
75 if self.frequency is not None:
76 lines.append(f"\tNumber of periods: {self.frequency.size}")
77 lines.append(
78 f"\t\tPeriod Range: {1./self.frequency.max():.5E} -- {1./self.frequency.min():.5E} s"
79 )
80 lines.append(
81 f"\t\tFrequency Range {self.frequency.min():.5E} -- {self.frequency.max():.5E} s"
82 )
84 return "\n".join(lines)
86 def __repr__(self):
87 lines = []
88 lines.append(f"station='{self.header.station}'")
89 lines.append(f"latitude={self.header.latitude:.2f}")
90 lines.append(f"longitude={self.header.longitude:.2f}")
91 lines.append(f"elevation={self.header.elevation:.2f}")
93 return f"JFile({(', ').join(lines)})"
95 @property
96 def fn(self):
97 return self._jfn
99 @fn.setter
100 def fn(self, value: str | Path | None) -> None:
101 """
102 set the j-file name
104 Parameters
105 ----------
106 value : str | Path | None
107 The j-file name to set.
109 Raises
110 ------
111 ValueError
112 If the file is not found or cannot be opened.
113 """
114 if value is None:
115 return
116 value = Path(value)
117 if value.suffix in [".j"]:
118 self._jfn = value
119 else:
120 msg = f"Input file must be a *.j file not {value.suffix}"
121 logger.error(msg)
122 raise ValueError(msg)
124 @property
125 def periods(self) -> None | np.typing.NDArray[np.float64]:
126 if self.frequency is not None:
127 return 1.0 / self.frequency
129 def _validate_j_file(self) -> list[str]:
130 """
131 change the lat, lon, elev lines to something machine readable,
132 if they are not.
133 """
134 if self.fn is not None:
135 if not self.fn.exists():
136 msg = f"Could not find {self.fn}, check path"
137 logger.error(msg)
138 raise NameError(msg)
140 with open(str(self.fn), "r", errors="replace") as fid:
141 j_lines = fid.readlines()
143 for variable in ["lat", "lon", "elev"]:
144 for ii, line in enumerate(j_lines):
145 if variable in line.lower():
146 name = line.split("=")[0]
147 try:
148 value = float(line.split("=")[1].strip())
149 except ValueError:
150 value = 0.0
151 logger.debug(f"Changed {name[1:]} to 0.0")
152 j_lines[ii] = "{0} = {1}\n".format(name, value)
153 break
155 return j_lines
157 def read(self, fn: str | Path | None = None, get_elevation=False):
158 """
159 Read data from a j file
161 parameters
162 ----------
163 fn : str | Path | None
164 full path to j-file to read, defaults to None
166 get_elevation : bool, optional
167 if True, will try to get elevation from the NM elevation service,
168 defaults to False
170 Raises
171 ------
172 ValueError
173 If the file is not found or cannot be opened.
174 NameError
175 If the file is not a valid j-file.
177 Returns
178 -------
179 None
180 Reads the data into the instance variables.
182 """
183 # read data
184 z_index_dict = {
185 "zxx": (0, 0),
186 "zxy": (0, 1),
187 "zyx": (1, 0),
188 "zyy": (1, 1),
189 }
190 t_index_dict = {"tzx": (0, 0), "tzy": (0, 1)}
192 if fn is not None:
193 self.fn = fn
195 logger.debug(f"Reading {self.fn}")
197 j_line_list = self._validate_j_file()
199 self.header.read_header(j_line_list)
200 self.header.read_metadata(j_line_list)
202 data_lines = [
203 j_line for j_line in j_line_list if not ">" in j_line and not "#" in j_line
204 ][:]
206 self.header.station = data_lines[0].strip()
208 # sometimes birrp outputs some missing periods, so the best way to deal with
209 # this that I could come up with was to get things into dictionaries with
210 # key words that are the period values, then fill in Z and T from there
211 # leaving any missing values as 0
213 # make empty dictionary that have keys as the component
214 z_dict = dict([(z_key, {}) for z_key in list(z_index_dict.keys())])
215 t_dict = dict([(t_key, {}) for t_key in list(t_index_dict.keys())])
217 # initialize d_key to avoid NameError
218 d_key = None
220 for d_line in data_lines[1:]:
221 # check to see if we are at the beginning of a component block, if so
222 # set the dictionary key to that value
223 line_parts = d_line.strip().split()
224 if line_parts and (
225 line_parts[0].lower().startswith("z")
226 or line_parts[0].lower().startswith("t")
227 ):
228 d_key = line_parts[0].lower()
229 # if we are at the number of periods line, skip it
230 elif len(d_line.strip().split()) == 1 and "r" not in d_line.lower():
231 continue
232 elif "r" in d_line.lower():
233 break
234 # get the numbers into the correct dictionary with a key as period and
235 # for now we will leave the numbers as a list, which we will parse later
236 else:
237 # split the line up into each number
238 d_list = d_line.strip().split()
240 # make a copy of the list to be sure we don't rewrite any values,
241 # not sure if this is necessary at the moment
242 d_value_list = list(d_list)
243 for d_index, d_value in enumerate(d_list):
244 # check to see if the column number can be converted into a float
245 # if it can't, then it will be set to 0, which is assumed to be
246 # a masked number when writing to an .edi file
248 try:
249 d_value = float(d_value)
250 # need to check for masked points represented by
251 # birrp as -999, apparently
252 if d_value == -999 or np.isnan(d_value):
253 d_value_list[d_index] = "0.0"
254 else:
255 d_value_list[d_index] = str(d_value)
256 except ValueError:
257 d_value_list[d_index] = "0.0"
259 # put the numbers in the correct dictionary as:
260 # key = period, value = [real, imaginary, error]
261 if d_key is not None and d_key in list(z_index_dict.keys()):
262 z_dict[d_key][d_value_list[0]] = d_value_list[1:4]
263 elif d_key is not None and d_key in list(t_index_dict.keys()):
264 t_dict[d_key][d_value_list[0]] = d_value_list[1:4]
266 # --> now we need to get the set of periods for all components
267 # check to see if there is any tipper data output
269 all_periods = []
270 for z_key in list(z_index_dict.keys()):
271 for f_key in list(z_dict[z_key].keys()):
272 all_periods.append(f_key)
274 if len(list(t_dict["tzx"].keys())) == 0:
275 logger.debug(f"Could not find any Tipper data in {self.fn}")
276 find_tipper = False
278 else:
279 for t_key in list(t_index_dict.keys()):
280 for f_key in list(t_dict[t_key].keys()):
281 all_periods.append(f_key)
282 find_tipper = True
284 all_periods = np.array(sorted(list(set(all_periods))), dtype=float)
285 all_periods = all_periods[np.nonzero(all_periods)]
286 num_per = len(all_periods)
288 # fill arrays using the period key from all_periods
289 self.z = np.zeros((num_per, 2, 2), dtype=complex)
290 self.z_err = np.zeros((num_per, 2, 2), dtype=float)
292 self.t = np.zeros((num_per, 1, 2), dtype=complex)
293 self.t_err = np.zeros((num_per, 1, 2), dtype=float)
295 for p_index, per in enumerate(all_periods):
296 # Convert float period back to string for dictionary lookup
297 per_key = str(per)
298 for z_key in sorted(z_index_dict.keys()):
299 kk = z_index_dict[z_key][0]
300 ll = z_index_dict[z_key][1]
301 try:
302 self.z[p_index, kk, ll] = float(
303 z_dict[z_key][per_key][0]
304 ) + 1j * float(z_dict[z_key][per_key][1])
305 self.z_err[p_index, kk, ll] = float(z_dict[z_key][per_key][2])
306 except KeyError:
307 logger.debug(f"No value found for period {per:.4g}")
308 logger.debug(f"For component {z_key}")
309 if find_tipper is True:
310 for t_key in sorted(t_index_dict.keys()):
311 kk = t_index_dict[t_key][0]
312 ll = t_index_dict[t_key][1]
313 try:
314 self.t[p_index, kk, ll] = float(
315 t_dict[t_key][per_key][0]
316 ) + 1j * float(t_dict[t_key][per_key][1])
317 self.t_err[p_index, kk, ll] = float(t_dict[t_key][per_key][2])
318 except KeyError:
319 logger.debug(f"No value found for period {per:.4g}")
320 logger.debug(f"For component {t_key}")
322 # put the results into mtpy objects
323 self.frequency = 1.0 / all_periods
324 self.z[np.where(self.z == np.inf)] = 0 + 0j
325 self.t[np.where(self.t == np.inf)] = 0 + 0j
326 self.z_err[np.where(self.z_err == np.inf)] = 10**6
327 self.t_err[np.where(self.t_err == np.inf)] = 10**6
329 if self.header.elevation == 0 and get_elevation:
330 if self.header.latitude != 0 and self.header.longitude != 0:
331 self.header.elevation = get_nm_elev(
332 self.header.latitude, self.header.longitude
333 )
335 @property
336 def station_metadata(self):
337 sm = Station()
338 r1 = Run(id="001")
339 if self.header.birrp_parameters.deltat < 0:
340 r1.sample_rate = abs(self.header.birrp_parameters.deltat)
341 else:
342 r1.sample_rate = 1.0 / (self.header.birrp_parameters.deltat)
344 if not np.all(self.z == 0):
345 for ii, comp in enumerate(["ex", "ey", "hx", "hy"], 1):
346 if comp.startswith("e"):
347 ch = Electric(component=comp, channel_id=ii)
348 elif comp.startswith("h"):
349 ch = Magnetic(component=comp, channel_id=ii)
350 r1.add_channel(ch)
352 if not np.all(self.t == 0):
353 ch = Magnetic(component="hz", channel_id=5)
354 r1.add_channel(ch)
356 sm.runs.append(r1)
357 sm.id = self.header.station
358 sm.data_type = "MT"
360 sm.location.latitude = self.header.latitude
361 sm.location.longitude = self.header.longitude
362 sm.location.elevation = self.header.elevation
364 # provenance
365 sm.provenance.software.name = "BIRRP"
366 sm.provenance.software.version = "5"
367 sm.transfer_function.id = self.header.station
368 if self.fn is not None:
369 sm.transfer_function.processed_date = MTime(
370 time_stamp=self.fn.stat().st_ctime
371 ).isoformat()
372 sm.transfer_function.runs_processed = sm.run_list
373 # add birrp parameters
374 for key, value in self.header.birrp_parameters.to_dict(single=True).items():
375 sm.transfer_function.processing_parameters.append(f"{key} = {value}")
377 return sm
379 @property
380 def survey_metadata(self):
381 sm = Survey()
382 sm.add_station(self.station_metadata)
384 return sm