Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mth5 \ mth5 \ io \ usgs_ascii \ usgs_ascii.py: 84%
162 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:01 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:01 -0800
1# -*- coding: utf-8 -*-
2"""
3Created on Thu Aug 27 16:54:09 2020
5:author: Jared Peacock
7:license: MIT
9"""
11# =============================================================================
12# Imports
13# =============================================================================
15import datetime
16import gzip
17import time
18from pathlib import Path
20import numpy as np
21import pandas as pd
23from mth5.io.usgs_ascii import AsciiMetadata
24from mth5.timeseries import ChannelTS, RunTS
27# =============================================================================
28# Metadata for usgs ascii file
29# =============================================================================
32class USGSascii(AsciiMetadata):
33 """
34 Read and write USGS ascii formatted time series.
36 =================== =======================================================
37 Attributes Description
38 =================== =======================================================
39 ts Pandas dataframe holding the time series data
40 fn Full path to .asc file
41 station_dir Full path to station directory
42 meta_notes Notes of how the station was collected
43 =================== =======================================================
45 :Example: ::
47 >>> zc = Z3DCollection()
48 >>> fn_list = zc.get_time_blocks(z3d_path)
49 >>> zm = USGSasc()
50 >>> zm.SurveyID = 'iMUSH'
51 >>> zm.get_z3d_db(fn_list[0])
52 >>> zm.read_mtft24_cfg()
53 >>> zm.CoordinateSystem = 'Geomagnetic North'
54 >>> zm.SurveyID = 'MT'
55 >>> zm.write_asc_file(str_fmt='%15.7e')
56 >>> zm.write_station_info_metadata()
58 """
60 def __init__(self, fn=None, **kwargs):
61 super().__init__(fn, **kwargs)
62 self.ts = None
63 self.station_dir = Path().cwd()
64 self.meta_notes = None
65 for key in kwargs.keys():
66 setattr(self, key, kwargs[key])
68 ### need copy in the metadata otherwise the order in the channels
69 ### gets messed up
71 @property
72 def hx(self):
73 """HX"""
74 if self.ts is not None:
75 return ChannelTS(
76 "magnetic",
77 data=self.ts.hx.to_numpy(),
78 channel_metadata=self.hx_metadata.copy(),
79 run_metadata=self.run_metadata.copy(),
80 station_metadata=self.station_metadata.copy(),
81 survey_metadata=self.survey_metadata.copy(),
82 )
83 return None
85 @property
86 def hy(self):
87 """hy"""
88 if self.ts is not None:
89 return ChannelTS(
90 "magnetic",
91 data=self.ts.hy.to_numpy(),
92 channel_metadata=self.hy_metadata.copy(),
93 run_metadata=self.run_metadata.copy(),
94 station_metadata=self.station_metadata.copy(),
95 survey_metadata=self.survey_metadata.copy(),
96 )
97 return None
99 @property
100 def hz(self):
101 """hz"""
102 if self.ts is not None:
103 return ChannelTS(
104 "magnetic",
105 data=self.ts.hz.to_numpy(),
106 channel_metadata=self.hz_metadata.copy(),
107 run_metadata=self.run_metadata.copy(),
108 station_metadata=self.station_metadata.copy(),
109 survey_metadata=self.survey_metadata.copy(),
110 )
111 return None
113 @property
114 def ex(self):
115 """ex"""
116 if self.ts is not None:
117 return ChannelTS(
118 "electric",
119 data=self.ts.ex.to_numpy(),
120 channel_metadata=self.ex_metadata.copy(),
121 run_metadata=self.run_metadata.copy(),
122 station_metadata=self.station_metadata.copy(),
123 survey_metadata=self.survey_metadata.copy(),
124 )
125 return None
127 @property
128 def ey(self):
129 """ey"""
130 if self.ts is not None:
131 return ChannelTS(
132 "electric",
133 data=self.ts.ey.to_numpy(),
134 channel_metadata=self.ey_metadata.copy(),
135 run_metadata=self.run_metadata.copy(),
136 station_metadata=self.station_metadata.copy(),
137 survey_metadata=self.survey_metadata.copy(),
138 )
139 return None
141 def to_run_ts(self):
142 """Get xarray for run"""
143 if self.ts is not None:
144 return RunTS(
145 array_list=[self.hx, self.hy, self.hz, self.ex, self.ey],
146 run_metadata=self.run_metadata.copy(),
147 station_metadata=self.station_metadata.copy(),
148 survey_metadata=self.survey_metadata.copy(),
149 )
150 return None
152 def read(self, fn=None):
153 """
154 Read in a USGS ascii file and fill attributes accordingly.
156 :param fn: full path to .asc file to be read in
157 :type fn: string
158 """
159 if fn is not None:
160 self.fn = fn
161 st = datetime.datetime.now()
162 data_line = self.read_metadata()
163 self.ts = pd.read_csv(
164 self.fn,
165 sep="\s+",
166 skiprows=data_line,
167 dtype=np.float32,
168 )
169 dt_index = pd.date_range(
170 start=self.start.time_stamp, periods=self.n_samples, end=self.end.time_stamp
171 )
172 self.ts.index = dt_index
173 self.ts.columns = self.ts.columns.str.lower()
175 et = datetime.datetime.now()
176 read_time = et - st
177 self.logger.info(f"Reading took {read_time.total_seconds()}")
179 def _make_file_name(self, save_path=None, compression=True, compress_type="zip"):
180 """
181 get the file name to save to
183 :param save_path: full path to directory to save file to
184 :type save_path: string
186 :param compression: compress file
187 :type compression: [ True | False ]
189 :return: save_fn
190 :rtype: string
192 """
193 # make the file name to save to
194 if save_path is not None:
195 save_path = Path(save_path)
196 save_fn = save_path.joinpath(
197 "{0}_{1}T{2}_{3:.0f}.asc".format(
198 self.SiteID,
199 self._start_time.strftime("%Y-%m-%d"),
200 self._start_time.strftime("%H%M%S"),
201 self.AcqSmpFreq,
202 ),
203 )
204 else:
205 save_fn = self.station_dir.joinpath(
206 "{0}_{1}T{2}_{3:.0f}.asc".format(
207 self.SiteID,
208 self._start_time.strftime("%Y-%m-%d"),
209 self._start_time.strftime("%H%M%S"),
210 self.AcqSmpFreq,
211 ),
212 )
213 if compression:
214 # Use conventional suffixes: zip -> .zip, gzip -> .gz
215 if compress_type == "zip":
216 save_fn = save_fn.with_suffix(save_fn.suffix + ".zip")
217 elif compress_type == "gzip":
218 save_fn = save_fn.with_suffix(save_fn.suffix + ".gz")
219 return save_fn
221 def write(
222 self,
223 save_fn=None,
224 chunk_size=1024,
225 str_fmt="%15.7e",
226 full=True,
227 compress=False,
228 save_dir=None,
229 compress_type="zip",
230 convert_electrics=True,
231 ):
232 """
233 Write an ascii file in the USGS ascii format.
235 :param save_fn: full path to file name to save the merged ascii to
236 :type save_fn: string
238 :param chunck_size: chunck size to write file in blocks, larger numbers
239 are typically slower.
240 :type chunck_size: int
242 :param str_fmt: format of the data as written
243 :type str_fmt: string
245 :param full: write out the complete file, mostly for testing.
246 :type full: boolean [ True | False ]
248 :param compress: compress file
249 :type compress: boolean [ True | False ]
251 :param compress_type: compress file using zip or gzip
252 :type compress_type: boolean [ zip | gzip ]
253 """
254 # get the filename to save to
255 save_fn = self._make_file_name(
256 save_path=save_dir,
257 compression=compress,
258 compress_type=compress_type,
259 )
260 # get the number of characters in the desired string
261 s_num = int(str_fmt[1 : str_fmt.find(".")])
263 # convert electric fields into mV/km
264 if convert_electrics:
265 self.convert_electrics()
266 self.logger.debug(f"==> {save_fn}")
267 self.logger.debug("START --> {time.ctime()}")
268 st = datetime.datetime.now()
270 # write meta data first
271 # sort channel information same as columns
272 meta_lines = self.write_metadata(
273 chn_list=[c.capitalize() for c in self.ts.columns]
274 )
275 if compress == True and compress_type == "gzip":
276 # gzip expects bytes; encode strings before writing
277 with gzip.open(save_fn, "wb") as fid:
278 h_line = [
279 "".join(
280 [
281 "{0:>{1}}".format(c.capitalize(), s_num)
282 for c in self.ts.columns
283 ]
284 )
285 ]
286 header_bytes = ("\n".join(meta_lines + h_line) + "\n").encode("utf-8")
287 fid.write(header_bytes)
289 # write out data
290 if full is False:
291 out = np.array(self.ts[0:chunk_size])
292 out[np.where(out == 0)] = float(self.MissingDataFlag)
293 out = np.char.mod(str_fmt, out)
294 lines = "\n".join(
295 ["".join(out[ii, :]) for ii in range(out.shape[0])]
296 )
297 fid.write((lines + "\n").encode("utf-8"))
298 self.logger.debug(f"END --> {time.ctime()}")
299 et = datetime.datetime.now()
300 write_time = et - st
301 self.logger.debug(
302 f"Writing took: {write_time.total_seconds()} seconds"
303 )
304 return
305 for chunk in range(int(self.ts.shape[0] / chunk_size)):
306 out = np.array(
307 self.ts[chunk * chunk_size : (chunk + 1) * chunk_size]
308 )
309 out[np.where(out == 0)] = float(self.MissingDataFlag)
310 out = np.char.mod(str_fmt, out)
311 lines = "\n".join(
312 ["".join(out[ii, :]) for ii in range(out.shape[0])]
313 )
314 fid.write((lines + "\n").encode("utf-8"))
315 else:
316 if compress == True and compress_type == "zip":
317 # Create a temporary .asc file, then zip it into the final
318 # .zip archive. `save_fn` is expected to be a Path ending with
319 # ".zip" (from _make_file_name).
320 import zipfile
322 zip_path = Path(save_fn)
323 # inner asc filename is the zip name without the .zip suffix
324 asc_name = zip_path.name[:-4]
325 asc_temp = zip_path.parent / asc_name
327 # write the ascii file
328 need_zip = False
329 with open(asc_temp, "w", encoding="utf-8") as fid:
330 h_line = [
331 "".join(
332 [
333 "{0:>{1}}".format(c.capitalize(), s_num)
334 for c in self.ts.columns
335 ]
336 )
337 ]
338 fid.write("\n".join(meta_lines + h_line) + "\n")
340 # write out data
341 if full is False:
342 out = np.array(self.ts[0:chunk_size])
343 out[np.where(out == 0)] = float(self.MissingDataFlag)
344 out = np.char.mod(str_fmt, out)
345 lines = "\n".join(
346 ["".join(out[ii, :]) for ii in range(out.shape[0])]
347 )
348 fid.write(lines + "\n")
349 self.logger.debug(f"END --> {time.ctime()}")
350 et = datetime.datetime.now()
351 write_time = et - st
352 self.logger.debug(
353 f"Writing took: {write_time.total_seconds()} seconds"
354 )
355 need_zip = True
356 else:
357 for chunk in range(int(self.ts.shape[0] / chunk_size)):
358 out = np.array(
359 self.ts[chunk * chunk_size : (chunk + 1) * chunk_size]
360 )
361 out[np.where(out == 0)] = float(self.MissingDataFlag)
362 out = np.char.mod(str_fmt, out)
363 lines = "\n".join(
364 ["".join(out[ii, :]) for ii in range(out.shape[0])]
365 )
366 fid.write(lines + "\n")
368 # create zip and clean up (only if we need it)
369 if need_zip:
370 try:
371 with zipfile.ZipFile(
372 zip_path, "w", compression=zipfile.ZIP_DEFLATED
373 ) as zf:
374 zf.write(asc_temp, arcname=asc_temp.name)
375 finally:
376 try:
377 asc_temp.unlink(missing_ok=True)
378 except Exception:
379 pass
380 return
382 # default uncompressed write
383 with open(save_fn, "w", encoding="utf-8") as fid:
384 h_line = [
385 "".join(
386 [
387 "{0:>{1}}".format(c.capitalize(), s_num)
388 for c in self.ts.columns
389 ]
390 )
391 ]
392 fid.write("\n".join(meta_lines + h_line) + "\n")
394 # write out data
395 if full is False:
396 out = np.array(self.ts[0:chunk_size])
397 out[np.where(out == 0)] = float(self.MissingDataFlag)
398 out = np.char.mod(str_fmt, out)
399 lines = "\n".join(
400 ["".join(out[ii, :]) for ii in range(out.shape[0])]
401 )
402 fid.write(lines + "\n")
403 self.logger.debug(f"END --> {time.ctime()}")
404 et = datetime.datetime.now()
405 write_time = et - st
406 self.logger.debug(
407 f"Writing took: {write_time.total_seconds()} seconds"
408 )
409 return
410 for chunk in range(int(self.ts.shape[0] / chunk_size)):
411 out = np.array(
412 self.ts[chunk * chunk_size : (chunk + 1) * chunk_size]
413 )
414 out[np.where(out == 0)] = float(self.MissingDataFlag)
415 out = np.char.mod(str_fmt, out)
416 lines = "\n".join(
417 ["".join(out[ii, :]) for ii in range(out.shape[0])]
418 )
419 fid.write(lines + "\n")
420 # for some fucking reason, all interal variables don't exist anymore
421 # and if you try to do the zipping nothing happens, so have to do
422 # it externally. WTF
423 self.logger.debug(f"END --> {time.ctime()}")
424 et = datetime.datetime.now()
425 write_time = et - st
426 self.logger.debug("Writing took: {write_time.total_seconds()} seconds")
429def read_ascii(fn):
430 """
431 read USGS ASCII formatted file
433 :param fn: DESCRIPTION
434 :type fn: TYPE
435 :return: DESCRIPTION
436 :rtype: TYPE
438 """
440 asc_obj = USGSascii(fn)
441 asc_obj.read_ascii_file()
443 return asc_obj.to_run_ts()