Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mth5 \ mth5 \ io \ metronix \ metronix_atss.py: 99%
100 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:01 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:01 -0800
1# -*- coding: utf-8 -*-
2"""
3ATSS (Audio Time Series System) file reader for Metronix data.
5This module provides functionality to read and process Metronix ATSS binary
6time series files and their associated JSON metadata files. ATSS files contain
7double precision floating point time series data equivalent to numpy arrays
8of type np.float64.
10The ATSS format consists of two files:
11- .atss file: Binary time series data (np.float64 values)
12- .json file: Metadata in JSON format
14This implementation is translated from:
15https://github.com/bfrmtx/MTHotel/blob/main/python/include/atss_file.py
17Classes
18-------
19ATSS : MetronixFileNameMetadata
20 Main class for reading ATSS files and converting to ChannelTS objects.
22Functions
23---------
24read_atss : function
25 Convenience function to read ATSS file and return ChannelTS object.
27Notes
28-----
29ATSS files store time series data as consecutive double precision floating
30point numbers in binary format, making them efficient for large datasets.
32Examples
33--------
34>>> from mth5.io.metronix.metronix_atss import ATSS, read_atss
35>>>
36>>> # Using the ATSS class directly
37>>> atss = ATSS('data/station001.atss')
38>>> data = atss.read_atss()
39>>> channel_ts = atss.to_channel_ts()
40>>>
41>>> # Using the convenience function
42>>> channel_ts = read_atss('data/station001.atss')
44Author
45------
46jpeacock
48Created
49-------
50Tue Nov 26 15:54:12 2024
51"""
53# =============================================================================
54# Imports
55# =============================================================================
56from __future__ import annotations
58from pathlib import Path
59from typing import Any
61import numpy as np
62from loguru import logger
63from mt_metadata.timeseries import Run, Station, Survey
64from mt_metadata.timeseries.auxiliary import Auxiliary
65from mt_metadata.timeseries.electric import Electric
66from mt_metadata.timeseries.filters import ChannelResponse
67from mt_metadata.timeseries.magnetic import Magnetic
69from mth5.io.metronix import MetronixChannelJSON, MetronixFileNameMetadata
70from mth5.timeseries import ChannelTS
73# =============================================================================
class ATSS(MetronixFileNameMetadata):
    """
    ATSS (Audio Time Series System) file reader for Metronix data.

    Handles reading and processing of Metronix ATSS binary time series files
    and their associated JSON metadata files. ATSS files contain double
    precision floating point time series data equivalent to numpy arrays of
    type np.float64.

    Parameters
    ----------
    fn : str or Path, optional
        Path to the ATSS file. If provided, metadata will be automatically
        loaded if the corresponding JSON file exists.
    **kwargs
        Additional keyword arguments passed to parent class.

    Attributes
    ----------
    header : MetronixChannelJSON
        Metadata handler for the associated JSON file.

    Notes
    -----
    ATSS files come in pairs:

    - .atss file: Binary time series data (np.float64)
    - .json file: Metadata in JSON format

    Attributes such as ``fn``, ``fn_exists``, ``n_samples``, ``component``,
    ``sample_rate``, ``system_number`` and ``system_name`` are not defined
    here; they are expected to be provided by the parent class.

    Examples
    --------
    >>> atss = ATSS('data/station001_run001_ch001.atss')
    >>> data = atss.read_atss()
    >>> channel_ts = atss.to_channel_ts()
    """

    def __init__(self, fn: str | Path | None = None, **kwargs: Any) -> None:
        super().__init__(fn=fn, **kwargs)

        # Start with an empty header so the attribute always exists, then
        # fill it from the sibling JSON file when one is present.
        self.header = MetronixChannelJSON()
        if self.has_metadata_file():
            self.header.read(self.metadata_fn)

    @property
    def metadata_fn(self) -> Path | None:
        """
        Path to the metadata JSON file.

        Returns the path to the JSON metadata file that corresponds to this
        ATSS file. The JSON file has the same base name as the ATSS file
        but with a .json extension.

        Returns
        -------
        Path or None
            Path to the JSON metadata file, or None if no ATSS file is set.

        Examples
        --------
        >>> atss = ATSS('data/station001.atss')
        >>> atss.metadata_fn
        PosixPath('data/station001.json')
        """
        if self.fn is None:
            return None
        return self.fn.parent.joinpath(f"{self.fn.stem}.json")

    def has_metadata_file(self) -> bool:
        """
        Check if metadata JSON file exists.

        Returns
        -------
        bool
            True if the metadata JSON file exists, False otherwise
            (including when no ATSS file is set).

        Examples
        --------
        >>> atss = ATSS('data/station001.atss')
        >>> atss.has_metadata_file()
        True
        """
        if self.fn is not None:
            return self.metadata_fn.exists()
        return False

    def read_atss(
        self, fn: str | Path | None = None, start: int = 0, stop: int = 0
    ) -> np.ndarray:
        """
        Read binary ATSS time series data.

        Reads double precision floating point time series data from the ATSS
        binary file. Data is stored as consecutive np.float64 values.

        Parameters
        ----------
        fn : str or Path, optional
            Path to ATSS file. If given it replaces the current file path;
            if None, the current file path is used.
        start : int, default 0
            Starting sample index (0-based).
        stop : int, default 0
            Ending sample index (exclusive). If 0, reads to end of file.

        Returns
        -------
        np.ndarray
            Time series data as 1D array of np.float64 values containing
            samples ``start`` up to, but not including, ``stop``.

        Raises
        ------
        ValueError
            If stop index exceeds the number of samples in the file, or if
            a non-zero stop is smaller than start.

        Examples
        --------
        >>> atss = ATSS('data/station001.atss')
        >>> data = atss.read_atss()  # Read entire file
        >>> data_slice = atss.read_atss(start=1000, stop=2000)  # 1000 samples
        """
        if fn is not None:
            self.fn = fn

        if stop > self.n_samples:
            raise ValueError(f"stop {stop} > samples {self.n_samples}")
        if stop != 0 and stop < start:
            raise ValueError(f"stop {stop} < start {start}")

        bytes_per_sample = np.dtype(np.float64).itemsize
        with open(self.fn, "rb") as fid:
            # Skip the samples that precede the requested window.
            fid.seek(start * bytes_per_sample)
            if stop == 0:
                # stop == 0 is the "read to end of file" sentinel.
                data_bytes = fid.read()
            else:
                # The read is relative to the seek position, so the byte
                # count must cover only the samples between start and stop.
                data_bytes = fid.read((stop - start) * bytes_per_sample)

        return np.frombuffer(data_bytes, dtype=np.float64)

    def write_atss(self, data_array: np.ndarray, filename: str | Path) -> None:
        """
        Write time series data to ATSS binary file.

        Writes numpy array data as double precision floating point values
        to a binary ATSS file.

        Parameters
        ----------
        data_array : np.ndarray
            Time series data to write. It is converted to np.float64 before
            writing so the file always holds 8-byte floats regardless of the
            input dtype.
        filename : str or Path
            Output file path for the ATSS binary file.

        Examples
        --------
        >>> import numpy as np
        >>> atss = ATSS()
        >>> data = np.random.randn(10000)
        >>> atss.write_atss(data, 'output.atss')
        """
        # Without this conversion an integer or float32 array would be
        # written with its own item size and be misread by read_atss.
        samples = np.asarray(data_array, dtype=np.float64)
        with open(filename, "wb") as fid:
            fid.write(samples.tobytes())

    @property
    def channel_metadata(self) -> Electric | Magnetic | Auxiliary:
        """
        Channel metadata from the JSON header file.

        Returns
        -------
        Electric or Magnetic or Auxiliary
            Channel metadata object as returned by
            ``header.get_channel_metadata()``.
        """
        return self.header.get_channel_metadata()

    @property
    def channel_response(self) -> ChannelResponse:
        """
        Channel response information from the JSON header file.

        Returns
        -------
        ChannelResponse
            Channel response as returned by ``header.get_channel_response()``.
        """
        return self.header.get_channel_response()

    @property
    def channel_type(self) -> str | None:
        """
        Determine channel type from component name.

        Channel type is determined from the component identifier:

        - Components starting with 'e': electric
        - Components starting with 'h': magnetic
        - All others: auxiliary

        Returns
        -------
        str or None
            Channel type: 'electric', 'magnetic', or 'auxiliary'. None when
            ``fn_exists`` is false.
        """
        if not self.fn_exists:
            return None

        component = self.component
        if component.startswith("e"):
            return "electric"
        if component.startswith("h"):
            return "magnetic"
        return "auxiliary"

    @property
    def run_id(self) -> str | None:
        """
        Extract run ID from file path.

        Expects file path structure: .../station/run/timeseries.atss
        The run ID is the name of the file's parent directory. It is derived
        from the path alone; the file itself is not touched.

        Returns
        -------
        str or None
            Run identifier, or None if no file is set.
        """
        if self.fn is None:
            return None
        return self.fn.parent.name

    @property
    def station_id(self) -> str | None:
        """
        Extract station ID from file path.

        Expects file path structure: .../station/run/timeseries.atss
        The station ID is the name of the file's grandparent directory. It
        is derived from the path alone; the file itself is not touched.

        Returns
        -------
        str or None
            Station identifier, or None if no file is set.
        """
        if self.fn is None:
            return None
        return self.fn.parent.parent.name

    @property
    def survey_id(self) -> str | None:
        """
        Extract survey ID from file path.

        Expects file path structure:
        .../survey/stations/station/run/timeseries.atss
        The survey ID is the name of the directory four levels above the
        file. It is derived from the path alone; the file itself is not
        touched.

        Returns
        -------
        str or None
            Survey identifier, or None if no file is set.
        """
        if self.fn is None:
            return None
        return self.fn.parent.parent.parent.parent.name

    @property
    def run_metadata(self) -> Run:
        """
        Generate run-level metadata.

        Creates a Run metadata object populated with information from the
        ATSS file and its associated JSON metadata.

        Returns
        -------
        Run
            Run metadata object with data logger info, sample rate,
            and channel metadata.
        """
        run = Run(id=self.run_id)
        run.data_logger.id = self.system_number
        run.data_logger.manufacturer = "Metronix Geophysics"
        run.data_logger.model = self.system_name
        run.sample_rate = self.sample_rate
        run.add_channel(self.channel_metadata)
        # Refresh the run's time period after the channel has been added.
        run.update_time_period()
        return run

    @property
    def station_metadata(self) -> Station:
        """
        Generate station-level metadata.

        Creates a Station metadata object populated with location information
        from the JSON metadata and run information.

        Returns
        -------
        Station
            Station metadata object with location coordinates and run
            metadata.
        """
        station = Station(id=self.station_id)
        location = self.header.metadata
        station.location.latitude = location.latitude
        station.location.longitude = location.longitude
        station.location.elevation = location.elevation
        station.add_run(self.run_metadata)
        # Refresh the station's time period after the run has been added.
        station.update_time_period()
        return station

    @property
    def survey_metadata(self) -> Survey:
        """
        Generate survey-level metadata.

        Creates a Survey metadata object that includes station metadata
        and overall time period information.

        Returns
        -------
        Survey
            Survey metadata object containing station information.
        """
        survey = Survey(id=self.survey_id)
        survey.add_station(self.station_metadata)
        # Refresh the survey's time period after the station has been added.
        survey.update_time_period()
        return survey

    def to_channel_ts(self, fn: str | Path | None = None) -> ChannelTS:
        """
        Create a ChannelTS object from ATSS data.

        Converts the ATSS time series data and metadata into a ChannelTS
        object suitable for use with MTH5 workflows.

        Parameters
        ----------
        fn : str or Path, optional
            Path to ATSS file. If given it replaces the current file path
            and the JSON header is reloaded for the new file; if None, the
            current file path and header are used.

        Returns
        -------
        ChannelTS
            Time series object with data, metadata, and response information.

        Warnings
        --------
        Can be slow due to pandas datetime index creation for large datasets.
        A warning is logged if the metadata JSON file is missing.

        Examples
        --------
        >>> atss = ATSS('data/station001.atss')
        >>> channel_ts = atss.to_channel_ts()
        >>> print(channel_ts.sample_rate)
        1024.0
        """
        if fn is not None:
            self.fn = fn
            # The header loaded at construction time describes the previous
            # file; rebuild it from the JSON that sits next to the new one.
            self.header = MetronixChannelJSON()
            if self.has_metadata_file():
                self.header.read(self.metadata_fn)

        if not self.has_metadata_file():
            logger.warning(
                f"Could not find Metronix metadata JSON file for {self.fn.name}."
            )

        return ChannelTS(
            channel_type=self.channel_type,
            data=self.read_atss(),
            channel_metadata=self.channel_metadata,
            channel_response=self.channel_response,
            run_metadata=self.run_metadata,
            station_metadata=self.station_metadata,
            survey_metadata=self.survey_metadata,
        )
def read_atss(
    fn: str | Path,
    calibration_fn: str | Path | None = None,
    logger_file_handler: Any = None,
) -> ChannelTS:
    """
    Read an ATSS file and return it as a ChannelTS object.

    One-call shortcut: builds an :class:`ATSS` reader for ``fn`` and
    immediately converts it with ``to_channel_ts``.

    Parameters
    ----------
    fn : str or Path
        Path to the ATSS file to read.
    calibration_fn : str or Path, optional
        Path to calibration file. Accepted but not used by this function.
    logger_file_handler : Any, optional
        Logger file handler. Accepted but not used by this function.

    Returns
    -------
    ChannelTS
        Time series object with data and metadata from the ATSS file.

    Examples
    --------
    >>> channel_ts = read_atss('data/station001.atss')
    >>> print(f"Loaded {len(channel_ts.ts)} samples")
    """
    return ATSS(fn).to_channel_ts()
472# ##################################################################################################################
475# def stop_date_time(file_name):
476# nsamples = samples(file_name)
477# # get the sample_rate from the file name
478# channel = read_header(file_name)
479# # get the sample rate; the read_header function ensures that the sample rate is in Hz
480# sample_rate = channel["sample_rate"]
481# # get the start date time ISO 8601 like "1970-01-01T00:00:00.0"
482# start_date_time = channel["datetime"]
483# # calculate the stop date time
484# stop_date_time = np.datetime64(start_date_time) + np.timedelta64(
485# int(nsamples / sample_rate), "s"
486# )
487# return stop_date_time
490# def duration(file_name):
491# # get the start date time ISO 8601 like "1970-01-01T00:00:00.0"
492# start_date_time = read_header(file_name)["datetime"]
493# # get the stop date time ISO 8601 like "1970-01-01T00:00:00.0"
494# stop_date_time = stop_date_time(file_name)
495# # calculate the duration
496# duration = stop_date_time - np.datetime64(start_date_time)
497# # return the duration in HH:MM:SS
498# return str(duration)
501# # check if atss and json exist, and return the samples
502# def exits_both(file_name):
503# sfile_name = atss_basename(file_name) + ".json"
504# # if not exist, terminate with FileNotFoundError
505# if not os.path.exists(sfile_name):
506# raise FileNotFoundError(f"File {sfile_name} not found")
507# #
508# sfile_name = atss_basename(file_name) + ".atss"
509# # if not exist, terminate with FileNotFoundError
510# if not os.path.exists(sfile_name):
511# raise FileNotFoundError(f"File {sfile_name} not found")
512# # if both exist, return the number of samples
513# return samples(file_name)
516# def cal_mfs_06e(spc, file_name, wl):
517# # the calibration data for the MFS-06e sensor
518# # the spc is the complex spectrum, calculated by the fft "backward" function
519# # file_name is the file name of the channel, we take the sample rate from the header
520# #
521# # get the channel from the file
522# if not os.path.exists(file_name + ".json"):
523# raise FileNotFoundError(f"File {file_name}.json not found")
524# channel = read_header(file_name)
525# fs = channel["sample_rate"]
526# chopper = channel["sensor_calibration"]["chopper"]
527# if chopper == 1:
528# # calculate the frequency for each bin
529# for i, x in enumerate(spc):
530# if i == 0:
531# continue
532# f = i * fs / wl
533# p1 = complex(0.0, (f / 4.0))
534# p2 = complex(0.0, (f / 8192.0))
535# p4 = complex(0.0, (f / 28300.0))
536# trf = 800.0 * (
537# (p1 / (1.0 + p1)) * (1.0 / (1.0 + p2)) * (1.0 / (1.0 + p4))
538# )
539# spc[i] = spc[i] / trf
540# else:
541# # calculate the frequency for each bin
542# for i in enumerate(spc):
543# if i == 0:
544# continue
545# f = i * fs / wl
546# p1 = complex(0.0, (f / 4.0))
547# p2 = complex(0.0, (f / 8192.0))
548# p3 = complex(0.0, (f / 0.720))
549# p4 = complex(0.0, (f / 28300.0))
550# trf = 800.0 * (
551# (p1 / (1.0 + p1))
552# * (1.0 / (1.0 + p2))
553# * (p3 / (1.0 + p3))
554# * (1.0 / (1.0 + p4))
555# )
556# spc[i] = spc[i] / trf