Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mth5 \ mth5 \ io \ usgs_ascii \ header.py: 85%
264 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:01 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:01 -0800
1# -*- coding: utf-8 -*-
2"""
3Created on Thu Aug 27 16:54:09 2020
5:author: Jared Peacock
7:license: MIT
9"""
11import json
12from collections import OrderedDict
14# =============================================================================
15# Imports
16# =============================================================================
17from pathlib import Path
18from urllib.error import HTTPError
19from urllib.request import Request, urlopen
21import numpy as np
22from loguru import logger
23from mt_metadata.timeseries import Electric, Magnetic, Run, Station, Survey
26# =============================================================================
27# Metadata for usgs ascii file
28# =============================================================================
31class AsciiMetadata:
32 """
33 Container for all the important metadata in a USGS ascii file.
35 ========================= =================================================
36 Attributes Description
37 ========================= =================================================
38 survey_id Survey name
39 site_id Site name
40 run_id Run number
41 site_latitude Site latitude in decimal degrees WGS84
42 site_longitude Site longitude in decimal degrees WGS84
43 site_elevation Site elevation according to national map meters
44 start Start time of station YYYY-MM-DDThh:mm:ss UTC
45 end Stop time of station YYYY-MM-DDThh:mm:ss UTC
46 sample_rate Sampling rate samples/second
47 n_samples Number of samples
48 n_channels Number of channels
49 coordinate_system [ Geographic North | Geomagnetic North ]
50 chn_settings Channel settings, see below
51 missing_data_flag Missing data value
52 ========================= =================================================
54 :chn_settings:
56 ========================= =================================================
57 Keys Description
58 ========================= =================================================
59 ChnNum site_id+channel number
60 ChnID Component [ ex | ey | hx | hy | hz ]
61 InstrumentID Data logger + sensor number
62 Azimuth Setup angle of componet in degrees relative to
63 coordinate_system
64 Dipole_Length Dipole length in meters
65 ========================= =================================================
68 """
70 def __init__(self, fn=None, **kwargs):
71 self.logger = logger
73 self.fn = fn
74 self.missing_data_flag = np.nan
75 self.coordinate_system = None
76 self._metadata_len = 30
77 from mt_metadata.common import DataTypeEnum, TimePeriodDate
79 # Survey, Station, Run: instantiate and set required fields directly
80 self._survey_metadata = Survey()
81 self._survey_metadata.id = ""
82 self._survey_metadata.datum = "WGS 84"
83 self._survey_metadata.geographic_name = ""
84 self._survey_metadata.name = ""
85 self._survey_metadata.project = ""
86 self._survey_metadata.summary = ""
87 self._survey_metadata.time_period = TimePeriodDate(
88 start_date="1970-01-01", end_date="1970-01-02"
89 )
91 self._station_metadata = Station()
92 self._station_metadata.id = ""
93 self._station_metadata.channels_recorded = []
94 self._station_metadata.geographic_name = ""
95 self._station_metadata.run_list = []
96 self._station_metadata.data_type = DataTypeEnum.BBMT
98 self._run_metadata = Run()
99 self._run_metadata.id = ""
100 self._run_metadata.sample_rate = 0.0
101 self._run_metadata.channels_recorded_auxiliary = []
102 self._run_metadata.channels_recorded_electric = []
103 self._run_metadata.channels_recorded_magnetic = []
104 self._run_metadata.data_type = DataTypeEnum.BBMT
106 # Electric required fields (all base fields)
107 self.ex_metadata = Electric(component="ex") # type: ignore
108 self.ey_metadata = Electric(component="ey") # type: ignore
110 self.hx_metadata = Magnetic(component="hx") # type: ignore
111 self.hy_metadata = Magnetic(component="hy") # type: ignore
112 self.hz_metadata = Magnetic(component="hz") # type: ignore
114 self.channel_order = ["hx", "ex", "hy", "ey", "hz"]
116 self.national_map_url = r"https://epqs.nationalmap.gov/v1/json?x={0:.5f}&y={1:.5f}&units=Meters&wkid=4326&includeDate=False"
118 self._key_dict = OrderedDict(
119 **{
120 "SurveyID": "survey_id",
121 "SiteID": "site_id",
122 "RunID": "run_id",
123 "SiteLatitude": "latitude",
124 "SiteLongitude": "longitude",
125 "SiteElevation": "elevation",
126 "AcqStartTime": "start",
127 "AcqStopTime": "end",
128 "AcqSmpFreq": "sample_rate",
129 "AcqNumSmp": "n_samples",
130 "Nchan": "n_channels",
131 "Channel coordinates relative to geographic north": "",
132 "ChnSettings": "",
133 "MissingDataFlag": "missing_data_flag",
134 "DataSet": "data_set",
135 }
136 )
138 self._chn_dict = OrderedDict(
139 **{
140 "ChnNum": "channel_number",
141 "ChnID": "component",
142 "InstrumentID": "sensor.id",
143 "Azimuth": "measurement_azimuth",
144 "Dipole_Length": "dipole_length",
145 }
146 )
147 self._chn_fmt = {
148 "ChnNum": "<8",
149 "ChnID": "<6",
150 "InstrumentID": "<12",
151 "Azimuth": ">7.1f",
152 "Dipole_Length": ">14.1f",
153 }
155 for key in kwargs.keys():
156 setattr(self, key, kwargs[key])
158 def __str__(self):
159 return "\n".join(self.write_metadata())
161 def __repr__(self):
162 return self.__str__()
164 @property
165 def fn(self):
166 return self._fn
168 @fn.setter
169 def fn(self, value):
170 if value is not None:
171 self._fn = Path(value)
172 else:
173 self._fn = None
175 @property
176 def file_size(self):
177 if self.fn is not None:
178 if self.fn.exists():
179 return self.fn.stat().st_size
181 @property
182 def survey_id(self):
183 return self._survey_metadata.id
185 @survey_id.setter
186 def survey_id(self, value):
187 self._survey_metadata.id = value
189 @property
190 def run_id(self):
191 return self._run_metadata.id
193 @run_id.setter
194 def run_id(self, value):
195 self._run_metadata.id = value
197 @property
198 def site_id(self):
199 return self._station_metadata.id
201 @site_id.setter
202 def site_id(self, station):
203 self._station_metadata.id = station
205 @property
206 def latitude(self):
207 return self._station_metadata.location.latitude
209 @latitude.setter
210 def latitude(self, lat):
211 self._station_metadata.location.latitude = lat
213 @property
214 def longitude(self):
215 return self._station_metadata.location.longitude
217 @longitude.setter
218 def longitude(self, lon):
219 self._station_metadata.location.longitude = lon
221 @property
222 def elevation(self):
223 """
224 get elevation from national map
225 """
226 # the url for national map elevation query
227 nm_url = Request(self.national_map_url.format(self.longitude, self.latitude))
229 # call the url and get the response
230 try:
231 response = urlopen(nm_url)
232 except HTTPError:
233 self.logger.error("could not connect to get elevation from national map.")
234 self.logger.debug(nm_url.format(self.longitude, self.latitude))
235 return self.station_metadata.location.elevation
236 # read the json response and convert to a float. Be defensive:
237 # network requests may return non-json or empty responses in CI
238 # environments. If parsing fails, fall back to stored station value.
239 try:
240 body = response.read().decode()
241 info = json.loads(body)
242 try:
243 nm_elev = round(float(info.get("value", 0)), 1)
244 except (ValueError, TypeError):
245 self.logger.warning(
246 "could not read elevation from national map url. Setting to 0"
247 )
248 nm_elev = 0
249 except json.JSONDecodeError:
250 self.logger.warning(
251 "national map returned non-json response; using stored elevation"
252 )
253 nm_elev = self.station_metadata.location.elevation
254 except Exception:
255 # Any other issue should fall back to stored elevation
256 self.logger.debug(
257 "unexpected error parsing national map response; using stored elevation"
258 )
259 nm_elev = self.station_metadata.location.elevation
261 return nm_elev
263 @elevation.setter
264 def elevation(self, value):
265 self.station_metadata.location.elevation = value
267 @property
268 def start(self):
269 return self._station_metadata.time_period.start
271 @start.setter
272 def start(self, time_string):
273 # Always convert to UTC ISO string
274 import pandas as pd
276 ts = pd.Timestamp(time_string)
277 if ts.tz is None:
278 ts = ts.tz_localize("UTC")
279 else:
280 ts = ts.tz_convert("UTC")
281 self.station_metadata.time_period.start = ts.isoformat()
283 @property
284 def end(self):
285 return self._station_metadata.time_period.end
287 @end.setter
288 def end(self, time_string):
289 # Always convert to UTC ISO string
290 import pandas as pd
292 ts = pd.Timestamp(time_string)
293 if ts.tz is None:
294 ts = ts.tz_localize("UTC")
295 else:
296 ts = ts.tz_convert("UTC")
297 self._station_metadata.time_period.end = ts.isoformat()
299 @property
300 def n_channels(self):
301 return self._chn_num
303 @n_channels.setter
304 def n_channels(self, n_channel):
305 try:
306 self._chn_num = int(n_channel)
307 except ValueError:
308 self.logger.warning(f"{n_channel} is not a number, setting n_channels to 0")
310 @property
311 def sample_rate(self):
312 return self._run_metadata.sample_rate
314 @sample_rate.setter
315 def sample_rate(self, df):
316 self._run_metadata.sample_rate = float(df)
318 @property
319 def n_samples(self):
320 return self._n_samples
322 @n_samples.setter
323 def n_samples(self, n_samples):
324 self._n_samples = int(n_samples)
326 @property
327 def survey_metadata(self):
328 return self._survey_metadata
330 @survey_metadata.setter
331 def survey_metadata(self, value):
332 if isinstance(value, Survey):
333 self._survey_metadata.update(value)
334 else:
335 raise TypeError("Input must be a mt_metadata.timeseries.Survey instance")
337 @property
338 def station_metadata(self):
339 return self._station_metadata
341 @station_metadata.setter
342 def station_metadata(self, value):
343 if isinstance(value, Station):
344 self._station_metadata.update(value)
345 else:
346 raise TypeError("Input must be a mt_metadata.timeseries.Station instance")
348 @property
349 def run_metadata(self):
350 return self._run_metadata
352 @run_metadata.setter
353 def run_metadata(self, value):
354 if isinstance(value, Run):
355 self._run_metadata.update(value)
356 else:
357 raise TypeError("Input must be a mt_metadata.timeseries.Run instance")
359 def get_component_info(self, comp):
360 """
362 :param comp: DESCRIPTION
363 :type comp: TYPE
364 :return: DESCRIPTION
365 :rtype: TYPE
367 """
369 for key, kdict in self.channel_dict.items():
370 if kdict["ChnID"].lower() == comp.lower():
371 return kdict
372 return None
374 def read_metadata(self, fn=None, meta_lines=None):
375 """
376 Read in a meta from the raw string or file. Populate all metadata
377 as attributes.
379 :param fn: full path to USGS ascii file
380 :type fn: string
382 :param meta_lines: lines of metadata to read
383 :type meta_lines: list
384 """
385 chn_find = False
387 if fn is not None:
388 self.fn = fn
389 if self.fn is not None:
390 with self.fn.open("r") as fid:
391 meta_lines = [fid.readline() for ii in range(self._metadata_len)]
392 for ii, line in enumerate(meta_lines):
393 if "DataSet" in line:
394 break
395 if line.find(":") > 0:
396 key, value = line.strip().split(":", 1)
397 value = value.strip()
399 if len(value) < 1:
400 chn_find = True
401 continue
402 attr = self._key_dict[key]
403 setattr(self, attr, value)
404 elif "coordinate" in line:
405 self.coordinate_system = " ".join(line.strip().split()[-2:])
406 else:
407 if chn_find is True:
408 if "chnnum" in line.lower():
409 ch_keys = dict(
410 [(kk, ii) for ii, kk in enumerate(line.strip().split())]
411 )
412 else:
413 line_list = line.strip().split()
414 if len(line_list) == len(ch_keys.keys()):
415 ch = getattr(
416 self,
417 f"{line_list[ch_keys['ChnID']].lower()}_metadata",
418 )
420 ch.channel_number = line_list[ch_keys["ChnNum"]]
421 ch.measurement_azimuth = line_list[ch_keys["Azimuth"]]
422 if ch.type in ["electric"]:
423 ch.dipole_length = line_list[ch_keys["Dipole_Length"]]
424 else:
425 ch.sensor.id = line_list[ch_keys["InstrumentID"]]
426 self.run_metadata.data_logger.id = line_list[
427 ch_keys["InstrumentID"]
428 ]
430 ch.time_period.start = self.start
431 ch.time_period.end = self.end
432 ch.sample_rate = self.sample_rate
433 self._run_metadata.add_channel(ch)
434 else:
435 self.logger.warning("Not sure what line this is")
436 self._run_metadata.time_period.start = self.start
437 self._run_metadata.time_period.end = self.end
438 self._run_metadata.sample_rate = self.sample_rate
440 self._station_metadata.add_run(self.run_metadata)
442 return ii + 1
444 def write_metadata(self, chn_list=["Ex", "Ey", "Hx", "Hy", "Hz"]):
445 """
446 Write out metadata in the format of USGS ascii.
448 :return: list of metadate lines.
450 .. note:: meant to use '\n'.join(lines) to write out in a file.
452 """
454 lines = []
455 for key, attr in self._key_dict.items():
456 if key in ["ChnSettings"]:
457 lines.append("{0}:".format(key))
458 lines.append(" ".join(self._chn_dict.keys()))
459 for chn_key in self.channel_order:
460 chn_line = []
461 ch = getattr(self, f"{chn_key}_metadata")
462 for comp_key, comp_attr in self._chn_dict.items():
463 try:
464 value = ch.get_attr_from_name(comp_attr)
465 if value is None:
466 value = self.run_metadata.data_logger.id
467 except AttributeError:
468 value = 0
469 chn_line.append(f"{value:{self._chn_fmt[comp_key]}}")
470 lines.append("".join(chn_line))
471 elif key in ["DataSet"]:
472 lines.append("{0}:".format(key))
473 return lines
474 else:
475 try:
476 value = getattr(self, attr)
477 lines.append(f"{key}: {value}")
478 except AttributeError:
479 lines.append(f"{key}")
480 return lines