Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mth5 \ mth5 \ io \ zen \ zen.py: 85%
488 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-27 20:09 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-27 20:09 -0800
1# -*- coding: utf-8 -*-
2"""
3====================
4Zen
5====================
7 * Tools for reading and writing files for Zen and processing software
8 * Tools for copying data from SD cards
9 * Tools for copying schedules to SD cards
11Created on Tue Jun 11 10:53:23 2013
12Updated August 2020 (JP)
14:copyright:
15 Jared Peacock (jpeacock@usgs.gov)
17:license:
18 MIT
20"""
22# ==============================================================================
24from __future__ import annotations
26import datetime
27import struct
28from pathlib import Path
29from typing import Any, BinaryIO
31import numpy as np
32from loguru import logger
33from mt_metadata.common.mttime import MTime
34from mt_metadata.timeseries import AppliedFilter, Electric, Magnetic, Run, Station
35from mt_metadata.timeseries.filters import (
36 ChannelResponse,
37 CoefficientFilter,
38 FrequencyResponseTableFilter,
39)
41from mth5.io.zen import Z3DHeader, Z3DMetadata, Z3DSchedule
42from mth5.io.zen.coil_response import CoilResponse
43from mth5.timeseries import ChannelTS
46# ==============================================================================
47#
48# ==============================================================================
49class Z3D:
50 """
51 A class for reading and processing Z3D files output by Zen data loggers.
53 This class handles the parsing of Z3D binary files which contain GPS-stamped
54 time series data from magnetotelluric measurements. It provides methods for
55 reading file headers, metadata, schedule information, and time series data,
56 as well as converting between different units and formats.
58 Parameters
59 ----------
60 fn : str or Path, optional
61 Full path to the .Z3D file to be read. Default is None.
62 **kwargs : dict
63 Additional keyword arguments including:
64 - stamp_len : int, default 64
65 GPS stamp length in bits
67 Attributes
68 ----------
69 fn : Path or None
70 Path to the Z3D file
71 calibration_fn : str or None
72 Path to calibration file
73 header : Z3DHeader
74 Header information object
75 schedule : Z3DSchedule
76 Schedule information object
77 metadata : Z3DMetadata
78 Metadata information object
79 gps_stamps : numpy.ndarray or None
80 Array of GPS time stamps
81 time_series : numpy.ndarray or None
82 Time series data array
83 sample_rate : float or None
84 Data sampling rate in Hz
85 units : str
86 Data units, default 'counts'
88 Notes
89 -----
90 GPS data type is formatted as::
92 numpy.dtype([('flag0', numpy.int32),
93 ('flag1', numpy.int32),
94 ('time', numpy.int32),
95 ('lat', numpy.float64),
96 ('lon', numpy.float64),
97 ('num_sat', numpy.int32),
98 ('gps_sens', numpy.int32),
99 ('temperature', numpy.float32),
100 ('voltage', numpy.float32),
101 ('num_fpga', numpy.int32),
102 ('num_adc', numpy.int32),
103 ('pps_count', numpy.int32),
104 ('dac_tune', numpy.int32),
105 ('block_len', numpy.int32)])
107 Examples
108 --------
109 >>> from mth5.io.zen import Z3D
110 >>> z3d = Z3D(r"/path/to/data/station_20150522_080000_256_EX.Z3D")
111 >>> z3d.read_z3d()
112 >>> print(f"Found {z3d.gps_stamps.shape[0]} GPS time stamps")
113 >>> print(f"Found {z3d.time_series.size} data points")
114 """
116 def __init__(self, fn: str | Path | None = None, **kwargs: Any) -> None:
117 """
118 Initialize Z3D file reader object.
120 Parameters
121 ----------
122 fn : str or Path, optional
123 Full path to the Z3D file to be processed, by default None
124 **kwargs : dict
125 Additional keyword arguments:
126 - stamp_len : int, default 64
127 GPS stamp length in bits
129 Examples
130 --------
131 >>> z3d = Z3D("/path/to/file.Z3D")
132 >>> z3d.read_z3d()
133 >>> print(z3d.station)
134 """
135 self.logger = logger
136 self.fn = fn
137 self.calibration_fn = None
139 self.header = Z3DHeader(fn)
140 self.schedule = Z3DSchedule(fn)
141 self.metadata = Z3DMetadata(fn)
143 self._gps_stamp_length = kwargs.pop("stamp_len", 64)
144 self._gps_bytes = self._gps_stamp_length / 4
146 self.gps_stamps = None
148 self._gps_flag_0 = np.int32(2147483647)
149 self._gps_flag_1 = np.int32(-2147483648)
150 self._gps_f0 = self._gps_flag_0.tobytes()
151 self._gps_f1 = self._gps_flag_1.tobytes()
152 self.gps_flag = self._gps_f0 + self._gps_f1
154 self._gps_dtype = np.dtype(
155 [
156 ("flag0", np.int32),
157 ("flag1", np.int32),
158 ("time", np.int32),
159 ("lat", np.float64),
160 ("lon", np.float64),
161 ("gps_sens", np.int32),
162 ("num_sat", np.int32),
163 ("temperature", np.float32),
164 ("voltage", np.float32),
165 ("num_fpga", np.int32),
166 ("num_adc", np.int32),
167 ("pps_count", np.int32),
168 ("dac_tune", np.int32),
169 ("block_len", np.int32),
170 ]
171 )
173 self._week_len = 604800
174 # '1980, 1, 6, 0, 0, 0, -1, -1, 0
175 self._gps_epoch = MTime(time_stamp="1980-01-06T00:00:00")
176 self._leap_seconds = 18
177 self._block_len = 2**16
178 # the number in the cac files is for volts, we want mV
179 self._counts_to_mv_conversion = 9.5367431640625e-10 * 1e3
180 self.num_sec_to_skip = 1
182 self.units = "digital counts"
183 self.sample_rate = None
184 self.time_series = None
185 self._max_time_diff = 20
187 self.ch_dict = {"hx": 1, "hy": 2, "hz": 3, "ex": 4, "ey": 5}
189 for key, value in kwargs.items():
190 setattr(self, key, value)
192 @property
193 def fn(self) -> Path | None:
194 """
195 Get the Z3D file path.
197 Returns
198 -------
199 Path or None
200 Path to the Z3D file, or None if not set.
201 """
202 return self._fn
204 @fn.setter
205 def fn(self, fn: str | Path | None) -> None:
206 """
207 Set the Z3D file path.
209 Parameters
210 ----------
211 fn : str, Path, or None
212 Path to the Z3D file to set.
213 """
214 if fn is not None:
215 self._fn = Path(fn)
216 else:
217 self._fn = None
219 @property
220 def file_size(self) -> int:
221 """
222 Get the size of the Z3D file in bytes.
224 Returns
225 -------
226 int
227 File size in bytes, or 0 if no file is set.
228 """
229 if self.fn is not None:
230 return self.fn.stat().st_size
231 return 0
233 @property
234 def n_samples(self) -> int:
235 """
236 Get the number of data samples in the file.
238 Returns
239 -------
240 int
241 Number of data samples. Calculated from file size if time_series
242 is not loaded, otherwise returns the actual array size.
244 Notes
245 -----
246 Calculation assumes 4 bytes per sample and accounts for metadata blocks.
247 If sample_rate is available, adds buffer for GPS stamps.
248 """
249 if self.time_series is None:
250 if self.sample_rate:
251 return int(
252 (self.file_size - 512 * (1 + self.metadata.count)) / 4
253 + 8 * self.sample_rate
254 )
255 else:
256 # assume just the 3 general metadata blocks
257 return int((self.file_size - 512 * 3) / 4)
258 else:
259 return self.time_series.size
261 @property
262 def station(self) -> str | None:
263 """
264 Get the station name.
266 Returns
267 -------
268 str or None
269 Station identifier name.
270 """
271 return self.metadata.station
273 @station.setter
274 def station(self, station: str) -> None:
275 """
276 Set the station name.
278 Parameters
279 ----------
280 station : str
281 Station identifier name to set.
282 """
283 self.metadata.station = station
285 @property
286 def dipole_length(self) -> float:
287 """
288 Get the dipole length for electric field measurements.
290 Returns
291 -------
292 float
293 Dipole length in meters. Calculated from electrode positions
294 if not directly specified in metadata. Returns 0 for magnetic
295 channels or if positions are not available.
297 Notes
298 -----
299 Length is calculated from xyz coordinates using Euclidean distance
300 formula when position data is available in metadata.
301 """
302 length = 0
303 if self.metadata.ch_length is not None:
304 length = float(self.metadata.ch_length)
305 elif hasattr(self.metadata, "ch_offset_xyz1"):
306 # only ex and ey have xyz2
307 if hasattr(self.metadata, "ch_offset_xyz2"):
308 x1, y1, z1 = [
309 float(offset) for offset in self.metadata.ch_offset_xyz1.split(":")
310 ]
311 x2, y2, z2 = [
312 float(offset) for offset in self.metadata.ch_offset_xyz2.split(":")
313 ]
314 length = np.sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2 + (z2 - z1) ** 2)
315 length = np.round(length, 2)
316 else:
317 length = 0
318 elif self.metadata.ch_xyz1 is not None:
319 x1, y1 = [float(d) for d in self.metadata.ch_xyz1.split(":")]
320 x2, y2 = [float(d) for d in self.metadata.ch_xyz2.split(":")]
321 length = np.sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2) * 100.0
322 length = np.round(length, 2)
323 return length
325 @property
326 def azimuth(self) -> float | None:
327 """
328 Get the azimuth of instrument setup.
330 Returns
331 -------
332 float or None
333 Azimuth angle in degrees from north, or None if not available.
334 """
335 if self.metadata.ch_azimuth is not None:
336 return float(self.metadata.ch_azimuth)
337 elif self.metadata.rx_xazimuth is not None:
338 return float(self.metadata.rx_xazimuth)
339 else:
340 return None
342 @property
343 def component(self) -> str:
344 """
345 Get the channel component identifier.
347 Returns
348 -------
349 str
350 Channel component name in lowercase (e.g., 'ex', 'hy', 'hz').
351 """
352 return self.metadata.ch_cmp.lower()
354 @property
355 def latitude(self) -> float | None:
356 """
357 Get the latitude in decimal degrees.
359 Returns
360 -------
361 float or None
362 Latitude coordinate in decimal degrees, or None if not available.
363 """
364 return self.header.lat
366 @property
367 def longitude(self) -> float | None:
368 """
369 Get the longitude in decimal degrees.
371 Returns
372 -------
373 float or None
374 Longitude coordinate in decimal degrees, or None if not available.
375 """
376 return self.header.long
378 @property
379 def elevation(self) -> float | None:
380 """
381 Get the elevation in meters.
383 Returns
384 -------
385 float or None
386 Elevation above sea level in meters, or None if not available.
387 """
388 return self.header.alt
390 @property
391 def sample_rate(self) -> float | None:
392 """
393 Get the sampling rate in Hz.
395 Returns
396 -------
397 float or None
398 Data sampling rate in samples per second, or None if not available.
399 """
400 return self.header.ad_rate
402 @sample_rate.setter
403 def sample_rate(self, sampling_rate: float | None) -> None:
404 """
405 Set the sampling rate.
407 Parameters
408 ----------
409 sampling_rate : float or None
410 Sampling rate in Hz to set.
411 """
412 if sampling_rate is not None:
413 self.header.ad_rate = float(sampling_rate)
415 @property
416 def start(self) -> MTime:
417 """
418 Get the start time of the data.
420 Returns
421 -------
422 MTime
423 Start time from GPS stamps if available, otherwise scheduled time.
424 """
425 if self.gps_stamps is not None:
426 return self.get_UTC_date_time(
427 self.header.gpsweek, float(self.gps_stamps["time"][0])
428 )
429 return self.zen_schedule
431 @property
432 def end(self) -> MTime | float:
433 """
434 Get the end time of the data.
436 Returns
437 -------
438 MTime or float
439 End time from GPS stamps if available, otherwise calculated
440 from start time and number of samples.
441 """
442 if self.gps_stamps is not None:
443 return self.get_UTC_date_time(
444 self.header.gpsweek, float(self.gps_stamps["time"][-1])
445 )
446 return self.start + (self.n_samples / self.sample_rate)
448 @property
449 def zen_schedule(self) -> MTime:
450 """
451 Get the zen schedule date and time.
453 Returns
454 -------
455 MTime
456 Scheduled start time from header or schedule object.
457 """
458 if self.header.old_version is True:
459 return MTime(time_stamp=self.header.schedule)
460 return self.schedule.initial_start
462 @zen_schedule.setter
463 def zen_schedule(self, schedule_dt: MTime | str | datetime.datetime) -> None:
464 """
465 Set the zen schedule datetime.
467 Parameters
468 ----------
469 schedule_dt : MTime, str, or datetime.datetime
470 Schedule datetime to set.
472 Raises
473 ------
474 TypeError
475 If schedule_dt is not a valid time type.
476 """
477 if not isinstance(schedule_dt, MTime):
478 schedule_dt = MTime(time_stamp=schedule_dt)
479 raise TypeError("New schedule datetime must be type datetime.datetime")
480 self.schedule.initial_start = schedule_dt
482 @property
483 def coil_number(self) -> str | None:
484 """
485 Get the coil number identifier.
487 Returns
488 -------
489 str or None
490 Coil antenna number identifier, or None if not available.
491 """
492 if self.metadata.cal_ant is not None:
493 return self.metadata.cal_ant
494 elif self.metadata.ch_number is not None:
495 return self.metadata.ch_number
496 else:
497 return None
499 @property
500 def channel_number(self) -> int:
501 """
502 Get the channel number.
504 Returns
505 -------
506 int
507 Channel number identifier. Maps component names to standard
508 channel numbers or uses metadata channel number.
509 """
510 if self.metadata.ch_number:
511 ch_num = int(float(self.metadata.ch_number))
512 if ch_num > 6:
513 try:
514 ch_num = self.ch_dict[self.component]
515 except KeyError:
516 ch_num = 6
517 return ch_num
518 else:
519 try:
520 return self.ch_dict[self.component]
521 except KeyError:
522 return 0
524 @property
525 def channel_metadata(self):
526 """
527 Generate Channel metadata object from Z3D file information.
529 Creates either an Electric or Magnetic metadata object based on the
530 component type, populated with channel-specific parameters, sensor
531 information, and data statistics.
533 Returns
534 -------
535 Electric or Magnetic
536 Channel metadata object appropriate for the measurement type:
537 - Electric: includes dipole length, AC/DC statistics
538 - Magnetic: includes sensor details, field min/max values
540 Notes
541 -----
542 Electric channels (ex, ey) get dipole length and voltage statistics.
543 Magnetic channels (hx, hy, hz) get sensor information and field
544 strength statistics computed from the first and last seconds of data.
546 Examples
547 --------
548 >>> z3d = Z3D("/path/to/file.Z3D")
549 >>> z3d.read_z3d()
550 >>> ch_meta = z3d.channel_metadata
551 >>> print(f"Channel component: {ch_meta.component}")
552 """
554 # fill the time series object
555 if "e" in self.component:
556 ch = Electric()
557 ch.dipole_length = self.dipole_length
558 ch.ac.start = (
559 self.time_series[0 : int(self.sample_rate)].std()
560 * self.header.ch_factor
561 )
562 ch.ac.end = (
563 self.time_series[-int(self.sample_rate) :].std() * self.header.ch_factor
564 )
565 ch.dc.start = (
566 self.time_series[0 : int(self.sample_rate)].mean()
567 * self.header.ch_factor
568 )
569 ch.dc.end = (
570 self.time_series[-int(self.sample_rate) :].mean()
571 * self.header.ch_factor
572 )
573 elif "h" in self.component:
574 ch = Magnetic()
575 ch.sensor.id = self.coil_number
576 ch.sensor.manufacturer = "Geotell"
577 ch.sensor.model = "ANT-4"
578 ch.sensor.type = "induction coil"
579 ch.h_field_max.start = (
580 self.time_series[0 : int(self.sample_rate)].max()
581 * self.header.ch_factor
582 )
583 ch.h_field_max.end = (
584 self.time_series[-int(self.sample_rate) :].max() * self.header.ch_factor
585 )
586 ch.h_field_min.start = (
587 self.time_series[0 : int(self.sample_rate)].min()
588 * self.header.ch_factor
589 )
590 ch.h_field_min.end = (
591 self.time_series[-int(self.sample_rate) :].min() * self.header.ch_factor
592 )
593 ch.time_period.start = self.start.isoformat()
594 ch.time_period.end = self.end.isoformat()
595 ch.component = self.component
596 ch.sample_rate = self.sample_rate
597 ch.measurement_azimuth = self.azimuth
598 if ch.component in ["ey", "e2"] and self.azimuth == 0:
599 ch.measurement_azimuth = 90
600 ch.units = "digital counts"
601 ch.channel_number = self.channel_number
602 for count, f in enumerate(self.channel_response.filters_list, 1):
603 ch.add_filter(AppliedFilter(name=f.name, applied=True, stage=count))
604 return ch
606 @property
607 def station_metadata(self):
608 """
609 Generate Station metadata object from Z3D file information.
611 Creates a Station metadata object populated with location and timing
612 information extracted from the Z3D file header and metadata.
614 Returns
615 -------
616 Station
617 Station metadata object with populated fields including station ID,
618 coordinates, elevation, time period, and operator information.
620 Examples
621 --------
622 >>> z3d = Z3D("/path/to/file.Z3D")
623 >>> z3d.read_all_info()
624 >>> station_meta = z3d.station_metadata
625 >>> print(station_meta.id)
626 """
628 sm = Station()
629 sm.id = self.station
630 sm.fdsn.id = self.station
631 sm.location.latitude = self.latitude
632 sm.location.longitude = self.longitude
633 sm.location.elevation = self.elevation
634 sm.time_period.start = self.start.isoformat()
635 sm.time_period.end = self.end.isoformat()
636 sm.acquired_by.author = self.metadata.gdp_operator
638 return sm
640 @property
641 def run_metadata(self):
642 """
643 Generate Run metadata object from Z3D file information.
645 Creates a Run metadata object populated with data logger information,
646 timing details, and measurement parameters extracted from the Z3D file.
648 Returns
649 -------
650 Run
651 Run metadata object with populated fields including data logger
652 details, sample rate, time period, and data type information.
654 Examples
655 --------
656 >>> z3d = Z3D("/path/to/file.Z3D")
657 >>> z3d.read_all_info()
658 >>> run_meta = z3d.run_metadata
659 >>> print(f"Sample rate: {run_meta.sample_rate}")
660 """
661 rm = Run()
662 rm.data_logger.firmware.version = self.header.version
663 rm.data_logger.id = self.header.data_logger
664 rm.data_logger.manufacturer = "Zonge International"
665 rm.data_logger.model = "ZEN"
666 rm.time_period.start = self.start.isoformat()
667 rm.time_period.end = self.end.isoformat()
668 rm.sample_rate = self.sample_rate
669 rm.data_type = "BBMT"
670 rm.time_period.start = self.start.isoformat()
671 rm.time_period.end = self.end.isoformat()
672 rm.acquired_by.author = self.metadata.gdp_operator
673 rm.id = f"sr{int(self.sample_rate)}_001"
675 return rm
677 @property
678 def counts2mv_filter(self):
679 """
680 Create a counts to milliVolt coefficient filter.
682 Generate a coefficient filter for converting digital counts to milliVolt
683 using the channel factor from the Z3D file header.
685 Returns
686 -------
687 CoefficientFilter
688 Filter object configured for counts to milliVolt conversion with
689 gain set to the inverse of the channel factor.
691 Notes
692 -----
693 The gain is set to 1/channel_factor because this represents the
694 inverse operation when the instrument response has been divided
695 from the data during processing.
697 Examples
698 --------
699 >>> z3d = Z3D("/path/to/file.Z3D")
700 >>> z3d.read_all_info()
701 >>> filter_obj = z3d.counts2mv_filter
702 >>> print(f"Conversion gain: {filter_obj.gain}")
703 """
705 c2mv = CoefficientFilter()
706 c2mv.units_in = "milliVolt"
707 c2mv.units_out = "digital counts"
708 c2mv.name = "zen_counts2mv"
709 c2mv.gain = 1.0 / self.header.ch_factor
710 c2mv.comments = "digital counts to milliVolt"
712 return c2mv
714 @property
715 def coil_response(self):
716 """
717 Make the coile response into a FAP filter
719 Phase must be in radians
720 """
721 fap = None
722 # if there is no calibration file get from Z3D file, though it seems
723 # like these are not read in properly.
724 if self.calibration_fn in [None, "None"]:
725 # looks like zen outputs radial frequency
726 if self.metadata.cal_ant is not None:
727 fap = FrequencyResponseTableFilter()
728 fap.units_in = "nanoTesla"
729 fap.units_out = "milliVolt"
730 fap.frequencies = (1 / (2 * np.pi)) * self.metadata.coil_cal.frequency
731 fap.amplitudes = self.metadata.coil_cal.amplitude
732 fap.phases = self.metadata.coil_cal.phase / 1e3
733 fap.name = f"ant4_{self.coil_number}_response"
734 fap.comments = "induction coil response read from z3d file"
735 else:
736 c = CoilResponse(self.calibration_fn)
737 if c.has_coil_number(self.coil_number):
738 fap = c.get_coil_response_fap(self.coil_number)
739 return fap
741 @property
742 def zen_response(self):
743 """
744 Zen response, not sure the full calibration comes directly from the
745 Z3D file, so skipping for now. Will have to read a Zen##.cal file
746 to get the full calibration. This shouldn't be a big issue cause it
747 should roughly be the same for all channels and since the TF is
748 computing the ratio they will cancel out. Though we should look
749 more into this if just looking at calibrate time series.
751 """
752 return None
753 # fap = None
754 # find = False
755 # return
756 # if self.metadata.board_cal not in [None, []]:
757 # if self.metadata.board_cal[0][0] == "":
758 # return fap
759 # sr_dict = {256: 0, 1024: 1, 4096: 4}
760 # sr_int = sr_dict[int(self.sample_rate)]
761 # fap_table = self.metadata.board_cal[
762 # np.where(self.metadata.board_cal.rate == sr_int)
763 # ]
764 # frequency = fap_table.frequency
765 # amplitude = fap_table.amplitude
766 # phase = fap_table.phase / 1e3
767 # find = True
768 # elif self.metadata.cal_board is not None:
769 # try:
770 # fap_dict = self.metadata.cal_board[int(self.sample_rate)]
771 # frequency = fap_dict["frequency"]
772 # amplitude = fap_dict["amplitude"]
773 # phase = fap_dict["phase"]
774 # find = True
775 # except KeyError:
776 # try:
777 # fap_str = self.metadata.cal_board["cal.ch"]
778 # for ss in fap_str.split(";"):
779 # freq, _, resp = ss.split(",")
780 # ff, amp, phs = [float(item) for item in resp.split(":")]
781 # if float(freq) == self.sample_rate:
782 # frequency = ff
783 # amplitude = amp
784 # phase = phs / 1e3
785 # find = True
786 # except KeyError:
787 # return fap
788 # if find:
789 # freq = np.logspace(np.log10(6.00000e-04), np.log10(8.19200e03), 48)
790 # amp = np.ones(48)
791 # phases = np.zeros(48)
792 # for item_f, item_a, item_p in zip(frequency, amplitude, phase):
793 # index = np.abs(freq - item_f).argmin()
794 # freq[index] = item_f
795 # amp[index] = item_a
796 # phases[index] = item_p
797 # fap = FrequencyResponseTableFilter()
798 # fap.units_in = "milliVolt"
799 # fap.units_out = "milliVolt"
800 # fap.frequencies = freq
801 # fap.amplitudes = amp
802 # fap.phases = phases
803 # fap.name = (
804 # f"{self.header.data_logger.lower()}_{self.sample_rate:.0f}_response"
805 # )
806 # fap.comments = "data logger response read from z3d file"
807 # return fap
808 # return None
810 @property
811 def channel_response(self):
812 """
813 Generate comprehensive channel response for the Z3D data.
815 Creates a ChannelResponse object containing all applicable filters
816 including coil response, dipole conversion, and counts-to-milliVolt
817 transformation.
819 Returns
820 -------
821 ChannelResponse
822 Channel response object with appropriate filter chain for
823 converting raw Z3D data to physical units.
825 Notes
826 -----
827 The filter chain includes:
828 - Coil response (for magnetic channels) or dipole filter (for electric)
829 - Counts to milliVolt conversion filter
830 """
831 filter_list = []
832 # don't have a good handle on the zen response yet.
833 # if self.zen_response:
834 # filter_list.append(self.zen_response)
835 frequencies = np.empty(0)
836 if self.coil_response:
837 filter_list.append(self.coil_response)
838 frequencies = self.coil_response.frequencies
839 elif self.dipole_filter:
840 filter_list.append(self.dipole_filter)
842 filter_list.append(self.counts2mv_filter)
843 return ChannelResponse(filters_list=filter_list, frequencies=frequencies)
845 @property
846 def dipole_filter(self):
847 """
848 Create dipole conversion filter for electric field measurements.
850 Generate a coefficient filter for converting electric field measurements
851 from milliVolt per kilometer to milliVolt using the dipole length.
853 Returns
854 -------
855 CoefficientFilter or None
856 Filter object for dipole conversion if dipole_length is non-zero,
857 None otherwise.
859 Notes
860 -----
861 The gain is set to dipole_length/1000 to convert from mV/km to mV.
862 This represents the physical dipole length scaling for electric
863 field measurements.
865 Examples
866 --------
867 >>> z3d = Z3D("/path/to/electric.Z3D")
868 >>> z3d.read_all_info()
869 >>> if z3d.dipole_filter is not None:
870 ... print(f"Dipole length: {z3d.dipole_length} m")
871 """
872 dipole = None
873 # needs to be the inverse for processing
874 if self.dipole_length != 0:
875 dipole = CoefficientFilter()
876 dipole.units_in = "milliVolt per kilometer"
877 dipole.units_out = "milliVolt"
878 dipole.name = f"dipole_{self.dipole_length:.2f}m"
879 dipole.gain = self.dipole_length / 1000.0
880 dipole.comments = "convert to electric field"
881 return dipole
883 def _get_gps_stamp_type(self, old_version: bool = False) -> None:
884 """
885 Set the correct GPS stamp data type.
887 Configure GPS stamp structure for different Z3D file versions.
888 Older versions use 36-bit stamps while newer versions use 64-bit stamps.
890 Parameters
891 ----------
892 old_version : bool, default False
893 If True, configure for older Z3D file format with 36-bit stamps.
894 If False, use newer 64-bit stamp format.
895 """
896 if old_version is True:
897 self._gps_dtype = np.dtype(
898 [
899 ("gps", np.int32),
900 ("time", np.int32),
901 ("lat", np.float64),
902 ("lon", np.float64),
903 ("block_len", np.int32),
904 ("gps_accuracy", np.int32),
905 ("temperature", np.float32),
906 ]
907 )
908 self._gps_stamp_length = 36
909 self._gps_bytes = self._gps_stamp_length / 4
910 self._gps_flag_0 = -1
911 self._block_len = int(self._gps_stamp_length + self.sample_rate * 4)
912 self.gps_flag = self._gps_f0
913 else:
914 return
916 # ======================================
917 def _read_header(
918 self, fn: str | Path | None = None, fid: BinaryIO | None = None
919 ) -> None:
920 """
921 Read header information from Z3D file.
923 Parameters
924 ----------
925 fn : str, Path, or None, optional
926 Full path to Z3D file to read. If None, uses current fn attribute.
927 fid : BinaryIO or None, optional
928 Open file object. If provided, reads from this instead of opening fn.
930 Notes
931 -----
932 Populates the header object's attributes and handles version-specific
933 logic for older Z3D file formats.
935 Examples
936 --------
937 Read header from file path:
939 >>> z3d = Z3D()
940 >>> z3d._read_header("/path/to/file.Z3D")
942 Read header from open file object:
944 >>> with open("/path/to/file.Z3D", 'rb') as f:
945 ... z3d._read_header(fid=f)
946 """
947 if fn is not None:
948 self.fn = fn
949 self.header.read_header(fn=self.fn, fid=fid)
950 if self.header.old_version:
951 if self.header.box_number is None:
952 self.header.box_number = "6666"
954 # ======================================
955 def _read_schedule(
956 self, fn: str | Path | None = None, fid: BinaryIO | None = None
957 ) -> None:
958 """
959 Read schedule information from Z3D file.
961 Parameters
962 ----------
963 fn : str, Path, or None, optional
964 Full path to Z3D file to read. If None, uses current fn attribute.
965 fid : BinaryIO or None, optional
966 Open file object. If provided, reads from this instead of opening fn.
968 Notes
969 -----
970 Populates the schedule object's attributes. For older file versions,
971 extracts schedule information from the header.
973 Examples
974 --------
975 Read schedule from file path:
977 >>> z3d = Z3D()
978 >>> z3d._read_schedule("/path/to/file.Z3D")
980 Read schedule from open file object:
982 >>> with open("/path/to/file.Z3D", 'rb') as f:
983 ... z3d._read_schedule(fid=f)
984 """
985 if fn is not None:
986 self.fn = fn
987 self.schedule.read_schedule(fn=self.fn, fid=fid)
988 if self.header.old_version:
989 self.schedule.initial_start = MTime(
990 time_stamp=self.header.schedule, gps_time=True
991 )
993 # ======================================
994 def _read_metadata(
995 self, fn: str | Path | None = None, fid: BinaryIO | None = None
996 ) -> None:
997 """
998 Read metadata information from Z3D file.
1000 Parameters
1001 ----------
1002 fn : str, Path, or None, optional
1003 Full path to Z3D file to read. If None, uses current fn attribute.
1004 fid : BinaryIO or None, optional
1005 Open file object. If provided, reads from this instead of opening fn.
1007 Notes
1008 -----
1009 Populates the metadata object's attributes. For older file versions,
1010 sets schedule metadata length to 0.
1012 Examples
1013 --------
1014 Read metadata from file path:
1016 >>> z3d = Z3D()
1017 >>> z3d._read_metadata("/path/to/file.Z3D")
1019 Read metadata from open file object:
1021 >>> with open("/path/to/file.Z3D", 'rb') as f:
1022 ... z3d._read_metadata(fid=f)
1023 """
1024 if fn is not None:
1025 self.fn = fn
1026 if self.header.old_version:
1027 self.metadata._schedule_metadata_len = 0
1028 self.metadata.read_metadata(fn=self.fn, fid=fid)
1030 # =====================================
1031 def read_all_info(self) -> None:
1032 """
1033 Read header, schedule, and metadata from Z3D file.
1035 Convenience method to read all file information in one call.
1036 Opens the file once and reads all sections sequentially.
1038 Raises
1039 ------
1040 FileNotFoundError
1041 If the Z3D file does not exist.
1042 """
1043 with open(self.fn, "rb") as file_id:
1044 self._read_header(fid=file_id)
1045 self._read_schedule(fid=file_id)
1046 self._read_metadata(fid=file_id)
1048 def _find_first_gps_flag(self, fid: BinaryIO) -> int:
1049 """
1050 Find the first GPS flag in the file.
1052 The GPS flag should be at the end of the metadata, but sometimes
1053 there are extra bytes. This method searches byte by byte to find
1054 the first GPS flag.
1056 Parameters
1057 ----------
1058 fid : BinaryIO
1059 File object positioned after metadata.
1061 Returns
1062 -------
1063 int
1064 File position of the first GPS flag.
1066 Notes
1067 -----
1068 Includes failsafe to prevent infinite loops by limiting search
1069 to first 15000 bytes.
1070 """
1071 find_gps_flag = False
1072 fid_tell = self.metadata.m_tell - 1
1073 fid.seek(fid_tell)
1074 # adding a fail safe to not have an infinite loop
1075 while not find_gps_flag or fid_tell < 15000:
1076 fid_tell += 1
1077 fid.seek(fid_tell)
1078 line = fid.read(4)
1079 try:
1080 line = np.frombuffer(line, np.int32)[0]
1081 if line == np.int32(2147483647):
1082 return fid_tell
1083 except AttributeError:
1084 continue
1086 def _read_raw_string(self, fid: BinaryIO) -> np.ndarray:
1087 """
1088 Read raw binary data from Z3D file into array.
1090 Reads the entire data portion of the file as 32-bit integers,
1091 starting from after the metadata section.
1093 Parameters
1094 ----------
1095 fid : BinaryIO
1096 Open file object positioned after metadata.
1098 Returns
1099 -------
1100 numpy.ndarray
1101 Array of int32 values containing both data and GPS stamps.
1102 """
1103 # move the read value to where the end of the metadata is
1104 self.metadata.m_tell = self._find_first_gps_flag(fid)
1105 fid.seek(self.metadata.m_tell)
1107 # initalize a data array filled with zeros, everything goes into
1108 # this array then we parse later
1109 data = np.zeros(self.n_samples, dtype=np.int32)
1110 # go over a while loop until the data cound exceed the file size
1111 data_count = 0
1112 while True:
1113 # need to make sure the last block read is a multiple of 32 bit
1114 read_len = min(
1115 [
1116 self._block_len,
1117 int(32 * ((self.file_size - fid.tell()) // 32)),
1118 ]
1119 )
1120 test_str = np.frombuffer(fid.read(read_len), dtype=np.int32)
1121 if len(test_str) == 0:
1122 break
1123 data[data_count : data_count + len(test_str)] = test_str
1124 data_count += test_str.size
1126 # # now we need to trim off the extra zeros at the end
1127 # data = data[:data_count]
1128 return data
1130 def _unpack_data(self, data: np.ndarray, gps_stamp_index: list[int]) -> np.ndarray:
1131 """
1132 Unpack GPS stamps from raw data array and extract clean time series.
1134 Extract GPS timestamp information from the raw data array at
1135 specified indices and return a contiguous array of time series data
1136 with GPS stamps removed and all data blocks concatenated.
1138 Parameters
1139 ----------
1140 data : numpy.ndarray
1141 Raw data array containing both time series and GPS stamps.
1142 gps_stamp_index : list of int
1143 Indices where GPS stamps are located in the data array.
1145 Returns
1146 -------
1147 numpy.ndarray
1148 Clean time series data array with GPS stamps removed and
1149 all data blocks concatenated, including final block after last GPS stamp.
1150 """
1151 if len(gps_stamp_index) == 0:
1152 return data
1154 # Extract GPS stamps and calculate block lengths
1155 time_series_blocks = []
1157 for ii, gps_find in enumerate(gps_stamp_index):
1158 try:
1159 data[gps_find + 1]
1160 except IndexError:
1161 self.logger.warning(
1162 f"Failed gps stamp {ii+1} out of {len(gps_stamp_index)}"
1163 )
1164 break
1166 if (
1167 self.header.old_version is True
1168 or data[gps_find + 1] == self._gps_flag_1
1169 ):
1170 # Extract GPS stamp
1171 gps_str = struct.pack(
1172 "<" + "i" * int(self._gps_bytes),
1173 *data[int(gps_find) : int(gps_find + self._gps_bytes)],
1174 )
1175 self.gps_stamps[ii] = np.frombuffer(gps_str, dtype=self._gps_dtype)
1177 # Calculate block length and extract data block before this GPS stamp
1178 if ii > 0:
1179 block_start = gps_stamp_index[ii - 1] + int(self._gps_bytes)
1180 block_end = gps_find
1181 block_len = block_end - block_start
1182 self.gps_stamps[ii]["block_len"] = block_len
1184 # Extract the data block
1185 if block_len > 0:
1186 time_series_blocks.append(data[block_start:block_end])
1187 elif ii == 0:
1188 # First GPS stamp - data before it (if any)
1189 self.gps_stamps[ii]["block_len"] = 0
1190 if gps_find > 0:
1191 time_series_blocks.append(data[:gps_find])
1193 # Handle the final block of data after the last GPS stamp
1194 if len(gps_stamp_index) > 0:
1195 last_gps_pos = gps_stamp_index[-1]
1196 remaining_data_start = last_gps_pos + int(self._gps_bytes)
1198 if remaining_data_start < data.size:
1199 final_block = data[remaining_data_start:]
1200 # Only include final block if it contains significant non-zero data
1201 non_zero_count = np.count_nonzero(final_block)
1202 if non_zero_count > 0:
1203 self.logger.debug(
1204 f"Including final block with {final_block.size} samples "
1205 f"({non_zero_count} non-zero)"
1206 )
1207 time_series_blocks.append(final_block)
1209 # Concatenate all blocks to form clean time series
1210 if time_series_blocks:
1211 clean_data = np.concatenate(time_series_blocks)
1212 # Remove any remaining zeros
1213 clean_data = clean_data[np.nonzero(clean_data)]
1214 else:
1215 clean_data = np.array([], dtype=data.dtype)
1217 return clean_data
1219 # ======================================
1220 def read_z3d(self, z3d_fn: str | Path | None = None) -> None:
1221 """
1222 Read and parse Z3D file data.
1224 Comprehensive method to read a Z3D file and populate all object attributes.
1225 Performs the following operations:
1227 1. Read file as chunks of 32-bit integers
1228 2. Extract and validate GPS stamps
1229 3. Check GPS time stamp consistency (1 second intervals)
1230 4. Verify data block lengths match sampling rate
1231 5. Convert GPS time to seconds relative to GPS week
1232 6. Skip initial buffered data (first 2 seconds)
1233 7. Populate time series array with non-zero data
1235 Parameters
1236 ----------
1237 z3d_fn : str, Path, or None, optional
1238 Path to Z3D file to read. If None, uses current fn attribute.
1240 Raises
1241 ------
1242 ZenGPSError
1243 If data is too short or GPS timing issues prevent parsing.
1245 Examples
1246 --------
1247 >>> z3d = Z3D(r"/path/to/data/station_20150522_080000_256_EX.Z3D")
1248 >>> z3d.read_z3d()
1249 >>> print(f"Read {z3d.time_series.size} data points")
1250 """
1251 if z3d_fn is not None:
1252 self.fn = z3d_fn
1253 self.logger.debug(f"Reading {self.fn}")
1254 st = MTime().now()
1256 # using the with statement works in Python versions 2.7 or higher
1257 # the added benefit of the with statement is that it will close the
1258 # file object upon reading completion.
1259 with open(self.fn, "rb") as file_id:
1260 self._read_header(fid=file_id)
1261 self._read_schedule(fid=file_id)
1262 self._read_metadata(fid=file_id)
1264 if self.header.old_version is True:
1265 self._get_gps_stamp_type(True)
1266 data = self._read_raw_string(file_id)
1267 self.raw_data = data.copy()
1269 # find the gps stamps
1270 gps_stamp_find = self.get_gps_stamp_index(data, self.header.old_version)
1272 # skip the first two stamps and trim data
1273 try:
1274 data = data[gps_stamp_find[self.num_sec_to_skip] :]
1275 except IndexError:
1276 msg = f"Data is too short, cannot open file {self.fn}"
1277 self.logger.error(msg)
1278 raise ZenGPSError(msg)
1279 # find gps stamps of the trimmed data
1280 gps_stamp_find = self.get_gps_stamp_index(data, self.header.old_version)
1282 # read data chunks and GPS stamps
1283 self.gps_stamps = np.zeros(len(gps_stamp_find), dtype=self._gps_dtype)
1284 self.time_series = self._unpack_data(data, gps_stamp_find)
1286 # validate everything
1287 if not self.validate_gps_time():
1288 self.logger.debug(
1289 f"GPS stamps are not 1 second apart for file {self.fn.name}."
1290 )
1291 if not self.validate_time_blocks():
1292 self.logger.debug(
1293 f"Time block between stamps was not the sample rate for file {self.fn.name}"
1294 )
1295 self.convert_gps_time()
1296 self.zen_schedule = self.check_start_time()
1297 self.logger.debug(f"found {self.gps_stamps.shape[0]} GPS time stamps")
1298 self.logger.debug(f"found {self.time_series.size} data points")
1300 # time it
1301 et = MTime().now()
1302 read_time = et - st
1303 self.logger.debug(f"Reading data took: {read_time:.3f} seconds")
1305 # =================================================
1306 def get_gps_stamp_index(
1307 self, ts_data: np.ndarray, old_version: bool = False
1308 ) -> list[int]:
1309 """
1310 Locate GPS time stamp indices in time series data.
1312 Searches for GPS flag patterns in the data array. For newer files,
1313 verifies that flag_1 follows flag_0.
1315 Parameters
1316 ----------
1317 ts_data : numpy.ndarray
1318 Time series data array containing GPS stamps.
1319 old_version : bool, default False
1320 If True, only searches for single GPS flag (old format).
1321 If False, validates flag pairs (new format).
1323 Returns
1324 -------
1325 list of int
1326 List of indices where GPS stamps are located.
1327 """
1328 # find the gps stamps
1329 gps_stamp_find = np.where(ts_data == self._gps_flag_0)[0]
1331 if old_version is False:
1332 gps_stamp_find = [
1333 gps_find
1334 for gps_find in gps_stamp_find
1335 if ts_data[gps_find + 1] == self._gps_flag_1
1336 ]
1337 return list(gps_stamp_find)
1339 # =================================================
1340 def trim_data(self) -> None:
1341 """
1342 Trim the first 2 seconds of data due to SD buffer issues.
1344 Remove the first 2 GPS stamps and corresponding time series data
1345 to account for SD card buffering artifacts in early data.
1347 Notes
1348 -----
1349 This method may be deprecated after field testing confirms
1350 the buffer behavior is consistent across all instruments.
1352 .. deprecated::
1353 This method will be deprecated after field testing.
1354 """
1355 # the block length is the number of data points before the time stamp
1356 # therefore the first block length is 0. The indexing in python
1357 # goes to the last index - 1 so we need to put in 3
1358 ts_skip = self.gps_stamps["block_len"][0:3].sum()
1359 self.gps_stamps = self.gps_stamps[2:]
1360 self.gps_stamps[0]["block_len"] = 0
1361 self.time_series = self.time_series[ts_skip:]
1363 # =================================================
1364 def check_start_time(self) -> MTime:
1365 """
1366 Validate scheduled start time against first GPS stamp.
1368 Compare the scheduled start time from the file header with
1369 the actual first GPS timestamp to identify timing discrepancies.
1371 Returns
1372 -------
1373 MTime
1374 UTC start time from the first valid GPS stamp.
1376 Notes
1377 -----
1378 Logs warnings if the difference exceeds the maximum allowed
1379 time difference (default 20 seconds).
1380 """
1381 # make sure the time is in gps time
1382 zen_start_utc = self.get_UTC_date_time(
1383 self.header.gpsweek, float(self.gps_stamps["time"][0])
1384 )
1386 # estimate the time difference between the two
1387 time_diff = zen_start_utc - self.schedule.initial_start
1388 if time_diff > self._max_time_diff:
1389 self.logger.warning(f"ZEN Scheduled time was {self.schedule.initial_start}")
1390 self.logger.warning(f"1st good stamp was {zen_start_utc}")
1391 self.logger.warning(f"difference of {time_diff:.2f} seconds")
1393 self.logger.debug(f"Scheduled time was {self.schedule.initial_start}")
1394 self.logger.debug(f"1st good stamp was {zen_start_utc}")
1395 self.logger.debug(f"difference of {time_diff:.2f} seconds")
1397 return zen_start_utc
1399 # ==================================================
1400 def validate_gps_time(self) -> bool:
1401 """
1402 Validate that GPS time stamps are consistently 1 second apart.
1404 Returns
1405 -------
1406 bool
1407 True if all GPS stamps are properly spaced, False otherwise.
1409 Notes
1410 -----
1411 Logs debug information for any stamps that are more than 1 second apart.
1412 """
1413 # need to put the gps time into seconds
1414 t_diff = np.diff(self.gps_stamps["time"]) / 1024
1416 bad_times = np.where(abs(t_diff) > 1)[0]
1417 if len(bad_times) > 0:
1418 for bb in bad_times:
1419 self.logger.warning(
1420 f"Data block {bb} has a time difference between GPS stamps > 1 second [{t_diff[bb]} s]"
1421 )
1422 return False
1423 return True
1425 # ===================================================
1426 def validate_time_blocks(self) -> bool:
1427 """
1428 Validate GPS time stamps and verify data block lengths.
1430 Check that each GPS stamp block contains the expected number
1431 of data points (should equal sample rate for 1-second blocks).
1433 Returns
1434 -------
1435 bool
1436 True if all blocks have correct length, False otherwise.
1438 Notes
1439 -----
1440 If bad blocks are detected near the beginning (index < 5),
1441 this method will automatically skip those blocks and trim
1442 the time series data accordingly.
1443 """
1444 # first check if the gps stamp blocks are of the correct length
1445 bad_blocks = np.where(self.gps_stamps["block_len"][1:] != self.header.ad_rate)[
1446 0
1447 ]
1449 if len(bad_blocks) > 0:
1450 if bad_blocks.max() < 5:
1451 ts_skip = self.gps_stamps["block_len"][0 : bad_blocks[-1] + 1].sum()
1452 self.gps_stamps = self.gps_stamps[bad_blocks[-1] :]
1453 self.time_series = self.time_series[ts_skip:]
1455 self.logger.warning(f"Skipped the first {bad_blocks[-1]} seconds")
1456 self.logger.warning(f"Skipped first {ts_skip} points in time series")
1457 return True
1458 for bb in bad_blocks:
1459 self.logger.warning(
1460 f"Data block {bb} has {self.gps_stamps['block_len'][bb+1]} samples, "
1461 f"expected {self.header.ad_rate} samples."
1462 )
1463 return False
1464 return True
1466 # ==================================================
1467 def convert_gps_time(self) -> None:
1468 """
1469 Convert GPS time integers to floating point seconds.
1471 Transform GPS time from integer format to float and convert
1472 from GPS time units to seconds relative to the GPS week.
1474 Notes
1475 -----
1476 GPS time is initially stored as integers in units of 1/1024 seconds.
1477 This method converts to floating point seconds and applies the
1478 necessary scaling factors.
1479 """
1480 # need to convert gps_time to type float from int
1481 dt = self._gps_dtype.descr
1482 if self.header.old_version is True:
1483 dt[1] = ("time", np.float32)
1484 else:
1485 dt[2] = ("time", np.float32)
1486 self.gps_stamps = self.gps_stamps.astype(np.dtype(dt))
1488 # convert to seconds
1489 # these are seconds relative to the gps week
1490 time_conv = self.gps_stamps["time"].copy() / 1024.0
1491 time_ms = (time_conv - np.floor(time_conv)) * 1.024
1492 time_conv = np.floor(time_conv) + time_ms
1494 self.gps_stamps["time"][:] = time_conv
1496 # ==================================================
1497 def convert_counts_to_mv(self, data: np.ndarray) -> np.ndarray:
1498 """
1499 Convert time series data from counts to milliVolt.
1501 Parameters
1502 ----------
1503 data : numpy.ndarray
1504 Time series data in digital counts.
1506 Returns
1507 -------
1508 numpy.ndarray
1509 Time series data converted to milliVolt.
1510 """
1511 data *= self._counts_to_mv_conversion
1512 return data
1514 def convert_mv_to_counts(self, data: np.ndarray) -> np.ndarray:
1515 """
1516 Convert time series data from milliVolt to counts.
1518 Parameters
1519 ----------
1520 data : numpy.ndarray
1521 Time series data in milliVolt.
1523 Returns
1524 -------
1525 numpy.ndarray
1526 Time series data converted to digital counts.
1528 Notes
1529 -----
1530 Assumes no other scaling has been applied to the data.
1531 """
1532 data /= self._counts_to_mv_conversion
1533 return data
1535 # ==================================================
1536 def get_gps_time(self, gps_int: int, gps_week: int = 0) -> tuple[float, int]:
1537 """
1538 Convert GPS integer timestamp to seconds and GPS week.
1540 Parameters
1541 ----------
1542 gps_int : int
1543 Integer from the GPS time stamp line.
1544 gps_week : int, default 0
1545 Relative GPS week. If seconds exceed one week, this is incremented.
1547 Returns
1548 -------
1549 tuple[float, int]
1550 GPS time in seconds from beginning of GPS week, and updated GPS week.
1552 Notes
1553 -----
1554 GPS integers are in units of 1/1024 seconds. This method handles
1555 week rollovers when seconds exceed 604800.
1556 """
1557 gps_seconds = gps_int / 1024.0
1559 gps_ms = (gps_seconds - np.floor(gps_int / 1024.0)) * (1.024)
1561 cc = 0
1562 if gps_seconds > self._week_len:
1563 gps_week += 1
1564 cc = gps_week * self._week_len
1565 gps_seconds -= self._week_len
1566 gps_time = np.floor(gps_seconds) + gps_ms + cc
1568 return gps_time, gps_week
1570 # ==================================================
1571 def get_UTC_date_time(self, gps_week: int, gps_time: float) -> MTime:
1572 """
1573 Convert GPS week and time to UTC datetime.
1575 Calculate the actual UTC date and time of measurement from
1576 GPS week number and seconds within that week.
1578 Parameters
1579 ----------
1580 gps_week : int
1581 GPS week number when data was collected.
1582 gps_time : float
1583 Number of seconds from beginning of GPS week.
1585 Returns
1586 -------
1587 MTime
1588 UTC datetime object for the measurement time.
1590 Notes
1591 -----
1592 Automatically handles GPS time rollover when seconds exceed
1593 one week (604800 seconds).
1594 """
1595 # need to check to see if the time in seconds is more than a gps week
1596 # if it is add 1 to the gps week and reduce the gps time by a week
1597 if gps_time > self._week_len:
1598 gps_week += 1
1599 gps_time -= self._week_len
1600 # compute seconds using weeks and gps time
1601 # Convert gps_time to Python float to avoid precision issues with numpy types
1602 utc_seconds = (
1603 self._gps_epoch.epoch_seconds
1604 + (gps_week * self._week_len)
1605 + float(gps_time)
1606 )
1608 # compute date and time from seconds and return a datetime object
1609 # easier to manipulate later, must be in nanoseconds
1610 return MTime(time_stamp=utc_seconds, gps_time=True)
1612 # =================================================
1613 def to_channelts(self) -> ChannelTS:
1614 """
1615 Convert Z3D data to ChannelTS time series object.
1617 Create a ChannelTS object populated with the time series data
1618 and all associated metadata from the Z3D file.
1620 Returns
1621 -------
1622 ChannelTS
1623 Time series object with data, metadata, and instrument response.
1624 """
1625 return ChannelTS(
1626 self.channel_metadata.type,
1627 data=self.time_series,
1628 channel_metadata=self.channel_metadata,
1629 station_metadata=self.station_metadata,
1630 run_metadata=self.run_metadata,
1631 channel_response=self.channel_response,
1632 )
1635# ==============================================================================
1636# Error instances for Zen
1637# ==============================================================================
1638class ZenGPSError(Exception):
1639 """Exception raised for GPS timing errors in Z3D files."""
1642class ZenSamplingRateError(Exception):
1643 """Exception raised for sampling rate inconsistencies."""
1646class ZenInputFileError(Exception):
1647 """Exception raised for Z3D file input/reading errors."""
1650def read_z3d(
1651 fn: str | Path,
1652 calibration_fn: str | Path | None = None,
1653 logger_file_handler: Any | None = None,
1654) -> ChannelTS | None:
1655 """
1656 Read a Z3D file and return a ChannelTS object.
1658 Convenience function to read Z3D files with error handling.
1660 Parameters
1661 ----------
1662 fn : str or Path
1663 Path to the Z3D file to read.
1664 calibration_fn : str, Path, or None, optional
1665 Path to calibration file. Default is None.
1666 logger_file_handler : optional
1667 Logger file handler to add to Z3D logger. Default is None.
1669 Returns
1670 -------
1671 ChannelTS or None
1672 Time series object if successful, None if GPS timing errors occur.
1674 Examples
1675 --------
1676 >>> ts = read_z3d("/path/to/data/station_EX.Z3D")
1677 >>> if ts is not None:
1678 ... print(f"Read {ts.n_samples} samples")
1679 """
1680 z3d_obj = Z3D(fn, calibration_fn=calibration_fn)
1681 if logger_file_handler:
1682 z3d_obj.logger.addHandler(logger_file_handler)
1683 try:
1684 z3d_obj.read_z3d()
1685 except ZenGPSError as error:
1686 z3d_obj.logger.exception(error)
1687 z3d_obj.logger.warning(f"Skipping {fn}, check file for GPS timing.")
1688 return None
1689 return z3d_obj.to_channelts()