Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mth5 \ mth5 \ io \ zen \ zen.py: 85%

488 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-27 20:09 -0800

1# -*- coding: utf-8 -*- 

2""" 

3==================== 

4Zen 

5==================== 

6 

7 * Tools for reading and writing files for Zen and processing software 

8 * Tools for copying data from SD cards 

9 * Tools for copying schedules to SD cards 

10 

11Created on Tue Jun 11 10:53:23 2013 

12Updated August 2020 (JP) 

13 

14:copyright: 

15 Jared Peacock (jpeacock@usgs.gov) 

16 

17:license: 

18 MIT 

19 

20""" 

21 

22# ============================================================================== 

23 

24from __future__ import annotations 

25 

26import datetime 

27import struct 

28from pathlib import Path 

29from typing import Any, BinaryIO 

30 

31import numpy as np 

32from loguru import logger 

33from mt_metadata.common.mttime import MTime 

34from mt_metadata.timeseries import AppliedFilter, Electric, Magnetic, Run, Station 

35from mt_metadata.timeseries.filters import ( 

36 ChannelResponse, 

37 CoefficientFilter, 

38 FrequencyResponseTableFilter, 

39) 

40 

41from mth5.io.zen import Z3DHeader, Z3DMetadata, Z3DSchedule 

42from mth5.io.zen.coil_response import CoilResponse 

43from mth5.timeseries import ChannelTS 

44 

45 

46# ============================================================================== 

47# 

48# ============================================================================== 

49class Z3D: 

50 """ 

51 A class for reading and processing Z3D files output by Zen data loggers. 

52 

53 This class handles the parsing of Z3D binary files which contain GPS-stamped 

54 time series data from magnetotelluric measurements. It provides methods for 

55 reading file headers, metadata, schedule information, and time series data, 

56 as well as converting between different units and formats. 

57 

58 Parameters 

59 ---------- 

60 fn : str or Path, optional 

61 Full path to the .Z3D file to be read. Default is None. 

62 **kwargs : dict 

63 Additional keyword arguments including: 

64 - stamp_len : int, default 64 

65 GPS stamp length in bits 

66 

67 Attributes 

68 ---------- 

69 fn : Path or None 

70 Path to the Z3D file 

71 calibration_fn : str or None 

72 Path to calibration file 

73 header : Z3DHeader 

74 Header information object 

75 schedule : Z3DSchedule 

76 Schedule information object 

77 metadata : Z3DMetadata 

78 Metadata information object 

79 gps_stamps : numpy.ndarray or None 

80 Array of GPS time stamps 

81 time_series : numpy.ndarray or None 

82 Time series data array 

83 sample_rate : float or None 

84 Data sampling rate in Hz 

85 units : str 

86 Data units, default 'counts' 

87 

88 Notes 

89 ----- 

90 GPS data type is formatted as:: 

91 

92 numpy.dtype([('flag0', numpy.int32), 

93 ('flag1', numpy.int32), 

94 ('time', numpy.int32), 

95 ('lat', numpy.float64), 

96 ('lon', numpy.float64), 

97 ('num_sat', numpy.int32), 

98 ('gps_sens', numpy.int32), 

99 ('temperature', numpy.float32), 

100 ('voltage', numpy.float32), 

101 ('num_fpga', numpy.int32), 

102 ('num_adc', numpy.int32), 

103 ('pps_count', numpy.int32), 

104 ('dac_tune', numpy.int32), 

105 ('block_len', numpy.int32)]) 

106 

107 Examples 

108 -------- 

109 >>> from mth5.io.zen import Z3D 

110 >>> z3d = Z3D(r"/path/to/data/station_20150522_080000_256_EX.Z3D") 

111 >>> z3d.read_z3d() 

112 >>> print(f"Found {z3d.gps_stamps.shape[0]} GPS time stamps") 

113 >>> print(f"Found {z3d.time_series.size} data points") 

114 """ 

115 

116 def __init__(self, fn: str | Path | None = None, **kwargs: Any) -> None: 

117 """ 

118 Initialize Z3D file reader object. 

119 

120 Parameters 

121 ---------- 

122 fn : str or Path, optional 

123 Full path to the Z3D file to be processed, by default None 

124 **kwargs : dict 

125 Additional keyword arguments: 

126 - stamp_len : int, default 64 

127 GPS stamp length in bits 

128 

129 Examples 

130 -------- 

131 >>> z3d = Z3D("/path/to/file.Z3D") 

132 >>> z3d.read_z3d() 

133 >>> print(z3d.station) 

134 """ 

135 self.logger = logger 

136 self.fn = fn 

137 self.calibration_fn = None 

138 

139 self.header = Z3DHeader(fn) 

140 self.schedule = Z3DSchedule(fn) 

141 self.metadata = Z3DMetadata(fn) 

142 

143 self._gps_stamp_length = kwargs.pop("stamp_len", 64) 

144 self._gps_bytes = self._gps_stamp_length / 4 

145 

146 self.gps_stamps = None 

147 

148 self._gps_flag_0 = np.int32(2147483647) 

149 self._gps_flag_1 = np.int32(-2147483648) 

150 self._gps_f0 = self._gps_flag_0.tobytes() 

151 self._gps_f1 = self._gps_flag_1.tobytes() 

152 self.gps_flag = self._gps_f0 + self._gps_f1 

153 

154 self._gps_dtype = np.dtype( 

155 [ 

156 ("flag0", np.int32), 

157 ("flag1", np.int32), 

158 ("time", np.int32), 

159 ("lat", np.float64), 

160 ("lon", np.float64), 

161 ("gps_sens", np.int32), 

162 ("num_sat", np.int32), 

163 ("temperature", np.float32), 

164 ("voltage", np.float32), 

165 ("num_fpga", np.int32), 

166 ("num_adc", np.int32), 

167 ("pps_count", np.int32), 

168 ("dac_tune", np.int32), 

169 ("block_len", np.int32), 

170 ] 

171 ) 

172 

173 self._week_len = 604800 

174 # '1980, 1, 6, 0, 0, 0, -1, -1, 0 

175 self._gps_epoch = MTime(time_stamp="1980-01-06T00:00:00") 

176 self._leap_seconds = 18 

177 self._block_len = 2**16 

178 # the number in the cac files is for volts, we want mV 

179 self._counts_to_mv_conversion = 9.5367431640625e-10 * 1e3 

180 self.num_sec_to_skip = 1 

181 

182 self.units = "digital counts" 

183 self.sample_rate = None 

184 self.time_series = None 

185 self._max_time_diff = 20 

186 

187 self.ch_dict = {"hx": 1, "hy": 2, "hz": 3, "ex": 4, "ey": 5} 

188 

189 for key, value in kwargs.items(): 

190 setattr(self, key, value) 

191 

192 @property 

193 def fn(self) -> Path | None: 

194 """ 

195 Get the Z3D file path. 

196 

197 Returns 

198 ------- 

199 Path or None 

200 Path to the Z3D file, or None if not set. 

201 """ 

202 return self._fn 

203 

204 @fn.setter 

205 def fn(self, fn: str | Path | None) -> None: 

206 """ 

207 Set the Z3D file path. 

208 

209 Parameters 

210 ---------- 

211 fn : str, Path, or None 

212 Path to the Z3D file to set. 

213 """ 

214 if fn is not None: 

215 self._fn = Path(fn) 

216 else: 

217 self._fn = None 

218 

219 @property 

220 def file_size(self) -> int: 

221 """ 

222 Get the size of the Z3D file in bytes. 

223 

224 Returns 

225 ------- 

226 int 

227 File size in bytes, or 0 if no file is set. 

228 """ 

229 if self.fn is not None: 

230 return self.fn.stat().st_size 

231 return 0 

232 

233 @property 

234 def n_samples(self) -> int: 

235 """ 

236 Get the number of data samples in the file. 

237 

238 Returns 

239 ------- 

240 int 

241 Number of data samples. Calculated from file size if time_series 

242 is not loaded, otherwise returns the actual array size. 

243 

244 Notes 

245 ----- 

246 Calculation assumes 4 bytes per sample and accounts for metadata blocks. 

247 If sample_rate is available, adds buffer for GPS stamps. 

248 """ 

249 if self.time_series is None: 

250 if self.sample_rate: 

251 return int( 

252 (self.file_size - 512 * (1 + self.metadata.count)) / 4 

253 + 8 * self.sample_rate 

254 ) 

255 else: 

256 # assume just the 3 general metadata blocks 

257 return int((self.file_size - 512 * 3) / 4) 

258 else: 

259 return self.time_series.size 

260 

261 @property 

262 def station(self) -> str | None: 

263 """ 

264 Get the station name. 

265 

266 Returns 

267 ------- 

268 str or None 

269 Station identifier name. 

270 """ 

271 return self.metadata.station 

272 

273 @station.setter 

274 def station(self, station: str) -> None: 

275 """ 

276 Set the station name. 

277 

278 Parameters 

279 ---------- 

280 station : str 

281 Station identifier name to set. 

282 """ 

283 self.metadata.station = station 

284 

285 @property 

286 def dipole_length(self) -> float: 

287 """ 

288 Get the dipole length for electric field measurements. 

289 

290 Returns 

291 ------- 

292 float 

293 Dipole length in meters. Calculated from electrode positions 

294 if not directly specified in metadata. Returns 0 for magnetic 

295 channels or if positions are not available. 

296 

297 Notes 

298 ----- 

299 Length is calculated from xyz coordinates using Euclidean distance 

300 formula when position data is available in metadata. 

301 """ 

302 length = 0 

303 if self.metadata.ch_length is not None: 

304 length = float(self.metadata.ch_length) 

305 elif hasattr(self.metadata, "ch_offset_xyz1"): 

306 # only ex and ey have xyz2 

307 if hasattr(self.metadata, "ch_offset_xyz2"): 

308 x1, y1, z1 = [ 

309 float(offset) for offset in self.metadata.ch_offset_xyz1.split(":") 

310 ] 

311 x2, y2, z2 = [ 

312 float(offset) for offset in self.metadata.ch_offset_xyz2.split(":") 

313 ] 

314 length = np.sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2 + (z2 - z1) ** 2) 

315 length = np.round(length, 2) 

316 else: 

317 length = 0 

318 elif self.metadata.ch_xyz1 is not None: 

319 x1, y1 = [float(d) for d in self.metadata.ch_xyz1.split(":")] 

320 x2, y2 = [float(d) for d in self.metadata.ch_xyz2.split(":")] 

321 length = np.sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2) * 100.0 

322 length = np.round(length, 2) 

323 return length 

324 

325 @property 

326 def azimuth(self) -> float | None: 

327 """ 

328 Get the azimuth of instrument setup. 

329 

330 Returns 

331 ------- 

332 float or None 

333 Azimuth angle in degrees from north, or None if not available. 

334 """ 

335 if self.metadata.ch_azimuth is not None: 

336 return float(self.metadata.ch_azimuth) 

337 elif self.metadata.rx_xazimuth is not None: 

338 return float(self.metadata.rx_xazimuth) 

339 else: 

340 return None 

341 

342 @property 

343 def component(self) -> str: 

344 """ 

345 Get the channel component identifier. 

346 

347 Returns 

348 ------- 

349 str 

350 Channel component name in lowercase (e.g., 'ex', 'hy', 'hz'). 

351 """ 

352 return self.metadata.ch_cmp.lower() 

353 

354 @property 

355 def latitude(self) -> float | None: 

356 """ 

357 Get the latitude in decimal degrees. 

358 

359 Returns 

360 ------- 

361 float or None 

362 Latitude coordinate in decimal degrees, or None if not available. 

363 """ 

364 return self.header.lat 

365 

366 @property 

367 def longitude(self) -> float | None: 

368 """ 

369 Get the longitude in decimal degrees. 

370 

371 Returns 

372 ------- 

373 float or None 

374 Longitude coordinate in decimal degrees, or None if not available. 

375 """ 

376 return self.header.long 

377 

378 @property 

379 def elevation(self) -> float | None: 

380 """ 

381 Get the elevation in meters. 

382 

383 Returns 

384 ------- 

385 float or None 

386 Elevation above sea level in meters, or None if not available. 

387 """ 

388 return self.header.alt 

389 

390 @property 

391 def sample_rate(self) -> float | None: 

392 """ 

393 Get the sampling rate in Hz. 

394 

395 Returns 

396 ------- 

397 float or None 

398 Data sampling rate in samples per second, or None if not available. 

399 """ 

400 return self.header.ad_rate 

401 

402 @sample_rate.setter 

403 def sample_rate(self, sampling_rate: float | None) -> None: 

404 """ 

405 Set the sampling rate. 

406 

407 Parameters 

408 ---------- 

409 sampling_rate : float or None 

410 Sampling rate in Hz to set. 

411 """ 

412 if sampling_rate is not None: 

413 self.header.ad_rate = float(sampling_rate) 

414 

415 @property 

416 def start(self) -> MTime: 

417 """ 

418 Get the start time of the data. 

419 

420 Returns 

421 ------- 

422 MTime 

423 Start time from GPS stamps if available, otherwise scheduled time. 

424 """ 

425 if self.gps_stamps is not None: 

426 return self.get_UTC_date_time( 

427 self.header.gpsweek, float(self.gps_stamps["time"][0]) 

428 ) 

429 return self.zen_schedule 

430 

431 @property 

432 def end(self) -> MTime | float: 

433 """ 

434 Get the end time of the data. 

435 

436 Returns 

437 ------- 

438 MTime or float 

439 End time from GPS stamps if available, otherwise calculated 

440 from start time and number of samples. 

441 """ 

442 if self.gps_stamps is not None: 

443 return self.get_UTC_date_time( 

444 self.header.gpsweek, float(self.gps_stamps["time"][-1]) 

445 ) 

446 return self.start + (self.n_samples / self.sample_rate) 

447 

448 @property 

449 def zen_schedule(self) -> MTime: 

450 """ 

451 Get the zen schedule date and time. 

452 

453 Returns 

454 ------- 

455 MTime 

456 Scheduled start time from header or schedule object. 

457 """ 

458 if self.header.old_version is True: 

459 return MTime(time_stamp=self.header.schedule) 

460 return self.schedule.initial_start 

461 

462 @zen_schedule.setter 

463 def zen_schedule(self, schedule_dt: MTime | str | datetime.datetime) -> None: 

464 """ 

465 Set the zen schedule datetime. 

466 

467 Parameters 

468 ---------- 

469 schedule_dt : MTime, str, or datetime.datetime 

470 Schedule datetime to set. 

471 

472 Raises 

473 ------ 

474 TypeError 

475 If schedule_dt is not a valid time type. 

476 """ 

477 if not isinstance(schedule_dt, MTime): 

478 schedule_dt = MTime(time_stamp=schedule_dt) 

479 raise TypeError("New schedule datetime must be type datetime.datetime") 

480 self.schedule.initial_start = schedule_dt 

481 

482 @property 

483 def coil_number(self) -> str | None: 

484 """ 

485 Get the coil number identifier. 

486 

487 Returns 

488 ------- 

489 str or None 

490 Coil antenna number identifier, or None if not available. 

491 """ 

492 if self.metadata.cal_ant is not None: 

493 return self.metadata.cal_ant 

494 elif self.metadata.ch_number is not None: 

495 return self.metadata.ch_number 

496 else: 

497 return None 

498 

499 @property 

500 def channel_number(self) -> int: 

501 """ 

502 Get the channel number. 

503 

504 Returns 

505 ------- 

506 int 

507 Channel number identifier. Maps component names to standard 

508 channel numbers or uses metadata channel number. 

509 """ 

510 if self.metadata.ch_number: 

511 ch_num = int(float(self.metadata.ch_number)) 

512 if ch_num > 6: 

513 try: 

514 ch_num = self.ch_dict[self.component] 

515 except KeyError: 

516 ch_num = 6 

517 return ch_num 

518 else: 

519 try: 

520 return self.ch_dict[self.component] 

521 except KeyError: 

522 return 0 

523 

524 @property 

525 def channel_metadata(self): 

526 """ 

527 Generate Channel metadata object from Z3D file information. 

528 

529 Creates either an Electric or Magnetic metadata object based on the 

530 component type, populated with channel-specific parameters, sensor 

531 information, and data statistics. 

532 

533 Returns 

534 ------- 

535 Electric or Magnetic 

536 Channel metadata object appropriate for the measurement type: 

537 - Electric: includes dipole length, AC/DC statistics 

538 - Magnetic: includes sensor details, field min/max values 

539 

540 Notes 

541 ----- 

542 Electric channels (ex, ey) get dipole length and voltage statistics. 

543 Magnetic channels (hx, hy, hz) get sensor information and field 

544 strength statistics computed from the first and last seconds of data. 

545 

546 Examples 

547 -------- 

548 >>> z3d = Z3D("/path/to/file.Z3D") 

549 >>> z3d.read_z3d() 

550 >>> ch_meta = z3d.channel_metadata 

551 >>> print(f"Channel component: {ch_meta.component}") 

552 """ 

553 

554 # fill the time series object 

555 if "e" in self.component: 

556 ch = Electric() 

557 ch.dipole_length = self.dipole_length 

558 ch.ac.start = ( 

559 self.time_series[0 : int(self.sample_rate)].std() 

560 * self.header.ch_factor 

561 ) 

562 ch.ac.end = ( 

563 self.time_series[-int(self.sample_rate) :].std() * self.header.ch_factor 

564 ) 

565 ch.dc.start = ( 

566 self.time_series[0 : int(self.sample_rate)].mean() 

567 * self.header.ch_factor 

568 ) 

569 ch.dc.end = ( 

570 self.time_series[-int(self.sample_rate) :].mean() 

571 * self.header.ch_factor 

572 ) 

573 elif "h" in self.component: 

574 ch = Magnetic() 

575 ch.sensor.id = self.coil_number 

576 ch.sensor.manufacturer = "Geotell" 

577 ch.sensor.model = "ANT-4" 

578 ch.sensor.type = "induction coil" 

579 ch.h_field_max.start = ( 

580 self.time_series[0 : int(self.sample_rate)].max() 

581 * self.header.ch_factor 

582 ) 

583 ch.h_field_max.end = ( 

584 self.time_series[-int(self.sample_rate) :].max() * self.header.ch_factor 

585 ) 

586 ch.h_field_min.start = ( 

587 self.time_series[0 : int(self.sample_rate)].min() 

588 * self.header.ch_factor 

589 ) 

590 ch.h_field_min.end = ( 

591 self.time_series[-int(self.sample_rate) :].min() * self.header.ch_factor 

592 ) 

593 ch.time_period.start = self.start.isoformat() 

594 ch.time_period.end = self.end.isoformat() 

595 ch.component = self.component 

596 ch.sample_rate = self.sample_rate 

597 ch.measurement_azimuth = self.azimuth 

598 if ch.component in ["ey", "e2"] and self.azimuth == 0: 

599 ch.measurement_azimuth = 90 

600 ch.units = "digital counts" 

601 ch.channel_number = self.channel_number 

602 for count, f in enumerate(self.channel_response.filters_list, 1): 

603 ch.add_filter(AppliedFilter(name=f.name, applied=True, stage=count)) 

604 return ch 

605 

606 @property 

607 def station_metadata(self): 

608 """ 

609 Generate Station metadata object from Z3D file information. 

610 

611 Creates a Station metadata object populated with location and timing 

612 information extracted from the Z3D file header and metadata. 

613 

614 Returns 

615 ------- 

616 Station 

617 Station metadata object with populated fields including station ID, 

618 coordinates, elevation, time period, and operator information. 

619 

620 Examples 

621 -------- 

622 >>> z3d = Z3D("/path/to/file.Z3D") 

623 >>> z3d.read_all_info() 

624 >>> station_meta = z3d.station_metadata 

625 >>> print(station_meta.id) 

626 """ 

627 

628 sm = Station() 

629 sm.id = self.station 

630 sm.fdsn.id = self.station 

631 sm.location.latitude = self.latitude 

632 sm.location.longitude = self.longitude 

633 sm.location.elevation = self.elevation 

634 sm.time_period.start = self.start.isoformat() 

635 sm.time_period.end = self.end.isoformat() 

636 sm.acquired_by.author = self.metadata.gdp_operator 

637 

638 return sm 

639 

640 @property 

641 def run_metadata(self): 

642 """ 

643 Generate Run metadata object from Z3D file information. 

644 

645 Creates a Run metadata object populated with data logger information, 

646 timing details, and measurement parameters extracted from the Z3D file. 

647 

648 Returns 

649 ------- 

650 Run 

651 Run metadata object with populated fields including data logger 

652 details, sample rate, time period, and data type information. 

653 

654 Examples 

655 -------- 

656 >>> z3d = Z3D("/path/to/file.Z3D") 

657 >>> z3d.read_all_info() 

658 >>> run_meta = z3d.run_metadata 

659 >>> print(f"Sample rate: {run_meta.sample_rate}") 

660 """ 

661 rm = Run() 

662 rm.data_logger.firmware.version = self.header.version 

663 rm.data_logger.id = self.header.data_logger 

664 rm.data_logger.manufacturer = "Zonge International" 

665 rm.data_logger.model = "ZEN" 

666 rm.time_period.start = self.start.isoformat() 

667 rm.time_period.end = self.end.isoformat() 

668 rm.sample_rate = self.sample_rate 

669 rm.data_type = "BBMT" 

670 rm.time_period.start = self.start.isoformat() 

671 rm.time_period.end = self.end.isoformat() 

672 rm.acquired_by.author = self.metadata.gdp_operator 

673 rm.id = f"sr{int(self.sample_rate)}_001" 

674 

675 return rm 

676 

677 @property 

678 def counts2mv_filter(self): 

679 """ 

680 Create a counts to milliVolt coefficient filter. 

681 

682 Generate a coefficient filter for converting digital counts to milliVolt 

683 using the channel factor from the Z3D file header. 

684 

685 Returns 

686 ------- 

687 CoefficientFilter 

688 Filter object configured for counts to milliVolt conversion with 

689 gain set to the inverse of the channel factor. 

690 

691 Notes 

692 ----- 

693 The gain is set to 1/channel_factor because this represents the 

694 inverse operation when the instrument response has been divided 

695 from the data during processing. 

696 

697 Examples 

698 -------- 

699 >>> z3d = Z3D("/path/to/file.Z3D") 

700 >>> z3d.read_all_info() 

701 >>> filter_obj = z3d.counts2mv_filter 

702 >>> print(f"Conversion gain: {filter_obj.gain}") 

703 """ 

704 

705 c2mv = CoefficientFilter() 

706 c2mv.units_in = "milliVolt" 

707 c2mv.units_out = "digital counts" 

708 c2mv.name = "zen_counts2mv" 

709 c2mv.gain = 1.0 / self.header.ch_factor 

710 c2mv.comments = "digital counts to milliVolt" 

711 

712 return c2mv 

713 

714 @property 

715 def coil_response(self): 

716 """ 

717 Make the coile response into a FAP filter 

718 

719 Phase must be in radians 

720 """ 

721 fap = None 

722 # if there is no calibration file get from Z3D file, though it seems 

723 # like these are not read in properly. 

724 if self.calibration_fn in [None, "None"]: 

725 # looks like zen outputs radial frequency 

726 if self.metadata.cal_ant is not None: 

727 fap = FrequencyResponseTableFilter() 

728 fap.units_in = "nanoTesla" 

729 fap.units_out = "milliVolt" 

730 fap.frequencies = (1 / (2 * np.pi)) * self.metadata.coil_cal.frequency 

731 fap.amplitudes = self.metadata.coil_cal.amplitude 

732 fap.phases = self.metadata.coil_cal.phase / 1e3 

733 fap.name = f"ant4_{self.coil_number}_response" 

734 fap.comments = "induction coil response read from z3d file" 

735 else: 

736 c = CoilResponse(self.calibration_fn) 

737 if c.has_coil_number(self.coil_number): 

738 fap = c.get_coil_response_fap(self.coil_number) 

739 return fap 

740 

741 @property 

742 def zen_response(self): 

743 """ 

744 Zen response, not sure the full calibration comes directly from the 

745 Z3D file, so skipping for now. Will have to read a Zen##.cal file 

746 to get the full calibration. This shouldn't be a big issue cause it 

747 should roughly be the same for all channels and since the TF is 

748 computing the ratio they will cancel out. Though we should look 

749 more into this if just looking at calibrate time series. 

750 

751 """ 

752 return None 

753 # fap = None 

754 # find = False 

755 # return 

756 # if self.metadata.board_cal not in [None, []]: 

757 # if self.metadata.board_cal[0][0] == "": 

758 # return fap 

759 # sr_dict = {256: 0, 1024: 1, 4096: 4} 

760 # sr_int = sr_dict[int(self.sample_rate)] 

761 # fap_table = self.metadata.board_cal[ 

762 # np.where(self.metadata.board_cal.rate == sr_int) 

763 # ] 

764 # frequency = fap_table.frequency 

765 # amplitude = fap_table.amplitude 

766 # phase = fap_table.phase / 1e3 

767 # find = True 

768 # elif self.metadata.cal_board is not None: 

769 # try: 

770 # fap_dict = self.metadata.cal_board[int(self.sample_rate)] 

771 # frequency = fap_dict["frequency"] 

772 # amplitude = fap_dict["amplitude"] 

773 # phase = fap_dict["phase"] 

774 # find = True 

775 # except KeyError: 

776 # try: 

777 # fap_str = self.metadata.cal_board["cal.ch"] 

778 # for ss in fap_str.split(";"): 

779 # freq, _, resp = ss.split(",") 

780 # ff, amp, phs = [float(item) for item in resp.split(":")] 

781 # if float(freq) == self.sample_rate: 

782 # frequency = ff 

783 # amplitude = amp 

784 # phase = phs / 1e3 

785 # find = True 

786 # except KeyError: 

787 # return fap 

788 # if find: 

789 # freq = np.logspace(np.log10(6.00000e-04), np.log10(8.19200e03), 48) 

790 # amp = np.ones(48) 

791 # phases = np.zeros(48) 

792 # for item_f, item_a, item_p in zip(frequency, amplitude, phase): 

793 # index = np.abs(freq - item_f).argmin() 

794 # freq[index] = item_f 

795 # amp[index] = item_a 

796 # phases[index] = item_p 

797 # fap = FrequencyResponseTableFilter() 

798 # fap.units_in = "milliVolt" 

799 # fap.units_out = "milliVolt" 

800 # fap.frequencies = freq 

801 # fap.amplitudes = amp 

802 # fap.phases = phases 

803 # fap.name = ( 

804 # f"{self.header.data_logger.lower()}_{self.sample_rate:.0f}_response" 

805 # ) 

806 # fap.comments = "data logger response read from z3d file" 

807 # return fap 

808 # return None 

809 

810 @property 

811 def channel_response(self): 

812 """ 

813 Generate comprehensive channel response for the Z3D data. 

814 

815 Creates a ChannelResponse object containing all applicable filters 

816 including coil response, dipole conversion, and counts-to-milliVolt 

817 transformation. 

818 

819 Returns 

820 ------- 

821 ChannelResponse 

822 Channel response object with appropriate filter chain for 

823 converting raw Z3D data to physical units. 

824 

825 Notes 

826 ----- 

827 The filter chain includes: 

828 - Coil response (for magnetic channels) or dipole filter (for electric) 

829 - Counts to milliVolt conversion filter 

830 """ 

831 filter_list = [] 

832 # don't have a good handle on the zen response yet. 

833 # if self.zen_response: 

834 # filter_list.append(self.zen_response) 

835 frequencies = np.empty(0) 

836 if self.coil_response: 

837 filter_list.append(self.coil_response) 

838 frequencies = self.coil_response.frequencies 

839 elif self.dipole_filter: 

840 filter_list.append(self.dipole_filter) 

841 

842 filter_list.append(self.counts2mv_filter) 

843 return ChannelResponse(filters_list=filter_list, frequencies=frequencies) 

844 

845 @property 

846 def dipole_filter(self): 

847 """ 

848 Create dipole conversion filter for electric field measurements. 

849 

850 Generate a coefficient filter for converting electric field measurements 

851 from milliVolt per kilometer to milliVolt using the dipole length. 

852 

853 Returns 

854 ------- 

855 CoefficientFilter or None 

856 Filter object for dipole conversion if dipole_length is non-zero, 

857 None otherwise. 

858 

859 Notes 

860 ----- 

861 The gain is set to dipole_length/1000 to convert from mV/km to mV. 

862 This represents the physical dipole length scaling for electric 

863 field measurements. 

864 

865 Examples 

866 -------- 

867 >>> z3d = Z3D("/path/to/electric.Z3D") 

868 >>> z3d.read_all_info() 

869 >>> if z3d.dipole_filter is not None: 

870 ... print(f"Dipole length: {z3d.dipole_length} m") 

871 """ 

872 dipole = None 

873 # needs to be the inverse for processing 

874 if self.dipole_length != 0: 

875 dipole = CoefficientFilter() 

876 dipole.units_in = "milliVolt per kilometer" 

877 dipole.units_out = "milliVolt" 

878 dipole.name = f"dipole_{self.dipole_length:.2f}m" 

879 dipole.gain = self.dipole_length / 1000.0 

880 dipole.comments = "convert to electric field" 

881 return dipole 

882 

883 def _get_gps_stamp_type(self, old_version: bool = False) -> None: 

884 """ 

885 Set the correct GPS stamp data type. 

886 

887 Configure GPS stamp structure for different Z3D file versions. 

888 Older versions use 36-bit stamps while newer versions use 64-bit stamps. 

889 

890 Parameters 

891 ---------- 

892 old_version : bool, default False 

893 If True, configure for older Z3D file format with 36-bit stamps. 

894 If False, use newer 64-bit stamp format. 

895 """ 

896 if old_version is True: 

897 self._gps_dtype = np.dtype( 

898 [ 

899 ("gps", np.int32), 

900 ("time", np.int32), 

901 ("lat", np.float64), 

902 ("lon", np.float64), 

903 ("block_len", np.int32), 

904 ("gps_accuracy", np.int32), 

905 ("temperature", np.float32), 

906 ] 

907 ) 

908 self._gps_stamp_length = 36 

909 self._gps_bytes = self._gps_stamp_length / 4 

910 self._gps_flag_0 = -1 

911 self._block_len = int(self._gps_stamp_length + self.sample_rate * 4) 

912 self.gps_flag = self._gps_f0 

913 else: 

914 return 

915 

916 # ====================================== 

917 def _read_header( 

918 self, fn: str | Path | None = None, fid: BinaryIO | None = None 

919 ) -> None: 

920 """ 

921 Read header information from Z3D file. 

922 

923 Parameters 

924 ---------- 

925 fn : str, Path, or None, optional 

926 Full path to Z3D file to read. If None, uses current fn attribute. 

927 fid : BinaryIO or None, optional 

928 Open file object. If provided, reads from this instead of opening fn. 

929 

930 Notes 

931 ----- 

932 Populates the header object's attributes and handles version-specific 

933 logic for older Z3D file formats. 

934 

935 Examples 

936 -------- 

937 Read header from file path: 

938 

939 >>> z3d = Z3D() 

940 >>> z3d._read_header("/path/to/file.Z3D") 

941 

942 Read header from open file object: 

943 

944 >>> with open("/path/to/file.Z3D", 'rb') as f: 

945 ... z3d._read_header(fid=f) 

946 """ 

947 if fn is not None: 

948 self.fn = fn 

949 self.header.read_header(fn=self.fn, fid=fid) 

950 if self.header.old_version: 

951 if self.header.box_number is None: 

952 self.header.box_number = "6666" 

953 

954 # ====================================== 

955 def _read_schedule( 

956 self, fn: str | Path | None = None, fid: BinaryIO | None = None 

957 ) -> None: 

958 """ 

959 Read schedule information from Z3D file. 

960 

961 Parameters 

962 ---------- 

963 fn : str, Path, or None, optional 

964 Full path to Z3D file to read. If None, uses current fn attribute. 

965 fid : BinaryIO or None, optional 

966 Open file object. If provided, reads from this instead of opening fn. 

967 

968 Notes 

969 ----- 

970 Populates the schedule object's attributes. For older file versions, 

971 extracts schedule information from the header. 

972 

973 Examples 

974 -------- 

975 Read schedule from file path: 

976 

977 >>> z3d = Z3D() 

978 >>> z3d._read_schedule("/path/to/file.Z3D") 

979 

980 Read schedule from open file object: 

981 

982 >>> with open("/path/to/file.Z3D", 'rb') as f: 

983 ... z3d._read_schedule(fid=f) 

984 """ 

985 if fn is not None: 

986 self.fn = fn 

987 self.schedule.read_schedule(fn=self.fn, fid=fid) 

988 if self.header.old_version: 

989 self.schedule.initial_start = MTime( 

990 time_stamp=self.header.schedule, gps_time=True 

991 ) 

992 

993 # ====================================== 

994 def _read_metadata( 

995 self, fn: str | Path | None = None, fid: BinaryIO | None = None 

996 ) -> None: 

997 """ 

998 Read metadata information from Z3D file. 

999 

1000 Parameters 

1001 ---------- 

1002 fn : str, Path, or None, optional 

1003 Full path to Z3D file to read. If None, uses current fn attribute. 

1004 fid : BinaryIO or None, optional 

1005 Open file object. If provided, reads from this instead of opening fn. 

1006 

1007 Notes 

1008 ----- 

1009 Populates the metadata object's attributes. For older file versions, 

1010 sets schedule metadata length to 0. 

1011 

1012 Examples 

1013 -------- 

1014 Read metadata from file path: 

1015 

1016 >>> z3d = Z3D() 

1017 >>> z3d._read_metadata("/path/to/file.Z3D") 

1018 

1019 Read metadata from open file object: 

1020 

1021 >>> with open("/path/to/file.Z3D", 'rb') as f: 

1022 ... z3d._read_metadata(fid=f) 

1023 """ 

1024 if fn is not None: 

1025 self.fn = fn 

1026 if self.header.old_version: 

1027 self.metadata._schedule_metadata_len = 0 

1028 self.metadata.read_metadata(fn=self.fn, fid=fid) 

1029 

1030 # ===================================== 

1031 def read_all_info(self) -> None: 

1032 """ 

1033 Read header, schedule, and metadata from Z3D file. 

1034 

1035 Convenience method to read all file information in one call. 

1036 Opens the file once and reads all sections sequentially. 

1037 

1038 Raises 

1039 ------ 

1040 FileNotFoundError 

1041 If the Z3D file does not exist. 

1042 """ 

1043 with open(self.fn, "rb") as file_id: 

1044 self._read_header(fid=file_id) 

1045 self._read_schedule(fid=file_id) 

1046 self._read_metadata(fid=file_id) 

1047 

1048 def _find_first_gps_flag(self, fid: BinaryIO) -> int: 

1049 """ 

1050 Find the first GPS flag in the file. 

1051 

1052 The GPS flag should be at the end of the metadata, but sometimes 

1053 there are extra bytes. This method searches byte by byte to find 

1054 the first GPS flag. 

1055 

1056 Parameters 

1057 ---------- 

1058 fid : BinaryIO 

1059 File object positioned after metadata. 

1060 

1061 Returns 

1062 ------- 

1063 int 

1064 File position of the first GPS flag. 

1065 

1066 Notes 

1067 ----- 

1068 Includes failsafe to prevent infinite loops by limiting search 

1069 to first 15000 bytes. 

1070 """ 

1071 find_gps_flag = False 

1072 fid_tell = self.metadata.m_tell - 1 

1073 fid.seek(fid_tell) 

1074 # adding a fail safe to not have an infinite loop 

1075 while not find_gps_flag or fid_tell < 15000: 

1076 fid_tell += 1 

1077 fid.seek(fid_tell) 

1078 line = fid.read(4) 

1079 try: 

1080 line = np.frombuffer(line, np.int32)[0] 

1081 if line == np.int32(2147483647): 

1082 return fid_tell 

1083 except AttributeError: 

1084 continue 

1085 

1086 def _read_raw_string(self, fid: BinaryIO) -> np.ndarray: 

1087 """ 

1088 Read raw binary data from Z3D file into array. 

1089 

1090 Reads the entire data portion of the file as 32-bit integers, 

1091 starting from after the metadata section. 

1092 

1093 Parameters 

1094 ---------- 

1095 fid : BinaryIO 

1096 Open file object positioned after metadata. 

1097 

1098 Returns 

1099 ------- 

1100 numpy.ndarray 

1101 Array of int32 values containing both data and GPS stamps. 

1102 """ 

1103 # move the read value to where the end of the metadata is 

1104 self.metadata.m_tell = self._find_first_gps_flag(fid) 

1105 fid.seek(self.metadata.m_tell) 

1106 

1107 # initalize a data array filled with zeros, everything goes into 

1108 # this array then we parse later 

1109 data = np.zeros(self.n_samples, dtype=np.int32) 

1110 # go over a while loop until the data cound exceed the file size 

1111 data_count = 0 

1112 while True: 

1113 # need to make sure the last block read is a multiple of 32 bit 

1114 read_len = min( 

1115 [ 

1116 self._block_len, 

1117 int(32 * ((self.file_size - fid.tell()) // 32)), 

1118 ] 

1119 ) 

1120 test_str = np.frombuffer(fid.read(read_len), dtype=np.int32) 

1121 if len(test_str) == 0: 

1122 break 

1123 data[data_count : data_count + len(test_str)] = test_str 

1124 data_count += test_str.size 

1125 

1126 # # now we need to trim off the extra zeros at the end 

1127 # data = data[:data_count] 

1128 return data 

1129 

1130 def _unpack_data(self, data: np.ndarray, gps_stamp_index: list[int]) -> np.ndarray: 

1131 """ 

1132 Unpack GPS stamps from raw data array and extract clean time series. 

1133 

1134 Extract GPS timestamp information from the raw data array at 

1135 specified indices and return a contiguous array of time series data 

1136 with GPS stamps removed and all data blocks concatenated. 

1137 

1138 Parameters 

1139 ---------- 

1140 data : numpy.ndarray 

1141 Raw data array containing both time series and GPS stamps. 

1142 gps_stamp_index : list of int 

1143 Indices where GPS stamps are located in the data array. 

1144 

1145 Returns 

1146 ------- 

1147 numpy.ndarray 

1148 Clean time series data array with GPS stamps removed and 

1149 all data blocks concatenated, including final block after last GPS stamp. 

1150 """ 

1151 if len(gps_stamp_index) == 0: 

1152 return data 

1153 

1154 # Extract GPS stamps and calculate block lengths 

1155 time_series_blocks = [] 

1156 

1157 for ii, gps_find in enumerate(gps_stamp_index): 

1158 try: 

1159 data[gps_find + 1] 

1160 except IndexError: 

1161 self.logger.warning( 

1162 f"Failed gps stamp {ii+1} out of {len(gps_stamp_index)}" 

1163 ) 

1164 break 

1165 

1166 if ( 

1167 self.header.old_version is True 

1168 or data[gps_find + 1] == self._gps_flag_1 

1169 ): 

1170 # Extract GPS stamp 

1171 gps_str = struct.pack( 

1172 "<" + "i" * int(self._gps_bytes), 

1173 *data[int(gps_find) : int(gps_find + self._gps_bytes)], 

1174 ) 

1175 self.gps_stamps[ii] = np.frombuffer(gps_str, dtype=self._gps_dtype) 

1176 

1177 # Calculate block length and extract data block before this GPS stamp 

1178 if ii > 0: 

1179 block_start = gps_stamp_index[ii - 1] + int(self._gps_bytes) 

1180 block_end = gps_find 

1181 block_len = block_end - block_start 

1182 self.gps_stamps[ii]["block_len"] = block_len 

1183 

1184 # Extract the data block 

1185 if block_len > 0: 

1186 time_series_blocks.append(data[block_start:block_end]) 

1187 elif ii == 0: 

1188 # First GPS stamp - data before it (if any) 

1189 self.gps_stamps[ii]["block_len"] = 0 

1190 if gps_find > 0: 

1191 time_series_blocks.append(data[:gps_find]) 

1192 

1193 # Handle the final block of data after the last GPS stamp 

1194 if len(gps_stamp_index) > 0: 

1195 last_gps_pos = gps_stamp_index[-1] 

1196 remaining_data_start = last_gps_pos + int(self._gps_bytes) 

1197 

1198 if remaining_data_start < data.size: 

1199 final_block = data[remaining_data_start:] 

1200 # Only include final block if it contains significant non-zero data 

1201 non_zero_count = np.count_nonzero(final_block) 

1202 if non_zero_count > 0: 

1203 self.logger.debug( 

1204 f"Including final block with {final_block.size} samples " 

1205 f"({non_zero_count} non-zero)" 

1206 ) 

1207 time_series_blocks.append(final_block) 

1208 

1209 # Concatenate all blocks to form clean time series 

1210 if time_series_blocks: 

1211 clean_data = np.concatenate(time_series_blocks) 

1212 # Remove any remaining zeros 

1213 clean_data = clean_data[np.nonzero(clean_data)] 

1214 else: 

1215 clean_data = np.array([], dtype=data.dtype) 

1216 

1217 return clean_data 

1218 

1219 # ====================================== 

1220 def read_z3d(self, z3d_fn: str | Path | None = None) -> None: 

1221 """ 

1222 Read and parse Z3D file data. 

1223 

1224 Comprehensive method to read a Z3D file and populate all object attributes. 

1225 Performs the following operations: 

1226 

1227 1. Read file as chunks of 32-bit integers 

1228 2. Extract and validate GPS stamps 

1229 3. Check GPS time stamp consistency (1 second intervals) 

1230 4. Verify data block lengths match sampling rate 

1231 5. Convert GPS time to seconds relative to GPS week 

1232 6. Skip initial buffered data (first 2 seconds) 

1233 7. Populate time series array with non-zero data 

1234 

1235 Parameters 

1236 ---------- 

1237 z3d_fn : str, Path, or None, optional 

1238 Path to Z3D file to read. If None, uses current fn attribute. 

1239 

1240 Raises 

1241 ------ 

1242 ZenGPSError 

1243 If data is too short or GPS timing issues prevent parsing. 

1244 

1245 Examples 

1246 -------- 

1247 >>> z3d = Z3D(r"/path/to/data/station_20150522_080000_256_EX.Z3D") 

1248 >>> z3d.read_z3d() 

1249 >>> print(f"Read {z3d.time_series.size} data points") 

1250 """ 

1251 if z3d_fn is not None: 

1252 self.fn = z3d_fn 

1253 self.logger.debug(f"Reading {self.fn}") 

1254 st = MTime().now() 

1255 

1256 # using the with statement works in Python versions 2.7 or higher 

1257 # the added benefit of the with statement is that it will close the 

1258 # file object upon reading completion. 

1259 with open(self.fn, "rb") as file_id: 

1260 self._read_header(fid=file_id) 

1261 self._read_schedule(fid=file_id) 

1262 self._read_metadata(fid=file_id) 

1263 

1264 if self.header.old_version is True: 

1265 self._get_gps_stamp_type(True) 

1266 data = self._read_raw_string(file_id) 

1267 self.raw_data = data.copy() 

1268 

1269 # find the gps stamps 

1270 gps_stamp_find = self.get_gps_stamp_index(data, self.header.old_version) 

1271 

1272 # skip the first two stamps and trim data 

1273 try: 

1274 data = data[gps_stamp_find[self.num_sec_to_skip] :] 

1275 except IndexError: 

1276 msg = f"Data is too short, cannot open file {self.fn}" 

1277 self.logger.error(msg) 

1278 raise ZenGPSError(msg) 

1279 # find gps stamps of the trimmed data 

1280 gps_stamp_find = self.get_gps_stamp_index(data, self.header.old_version) 

1281 

1282 # read data chunks and GPS stamps 

1283 self.gps_stamps = np.zeros(len(gps_stamp_find), dtype=self._gps_dtype) 

1284 self.time_series = self._unpack_data(data, gps_stamp_find) 

1285 

1286 # validate everything 

1287 if not self.validate_gps_time(): 

1288 self.logger.debug( 

1289 f"GPS stamps are not 1 second apart for file {self.fn.name}." 

1290 ) 

1291 if not self.validate_time_blocks(): 

1292 self.logger.debug( 

1293 f"Time block between stamps was not the sample rate for file {self.fn.name}" 

1294 ) 

1295 self.convert_gps_time() 

1296 self.zen_schedule = self.check_start_time() 

1297 self.logger.debug(f"found {self.gps_stamps.shape[0]} GPS time stamps") 

1298 self.logger.debug(f"found {self.time_series.size} data points") 

1299 

1300 # time it 

1301 et = MTime().now() 

1302 read_time = et - st 

1303 self.logger.debug(f"Reading data took: {read_time:.3f} seconds") 

1304 

1305 # ================================================= 

1306 def get_gps_stamp_index( 

1307 self, ts_data: np.ndarray, old_version: bool = False 

1308 ) -> list[int]: 

1309 """ 

1310 Locate GPS time stamp indices in time series data. 

1311 

1312 Searches for GPS flag patterns in the data array. For newer files, 

1313 verifies that flag_1 follows flag_0. 

1314 

1315 Parameters 

1316 ---------- 

1317 ts_data : numpy.ndarray 

1318 Time series data array containing GPS stamps. 

1319 old_version : bool, default False 

1320 If True, only searches for single GPS flag (old format). 

1321 If False, validates flag pairs (new format). 

1322 

1323 Returns 

1324 ------- 

1325 list of int 

1326 List of indices where GPS stamps are located. 

1327 """ 

1328 # find the gps stamps 

1329 gps_stamp_find = np.where(ts_data == self._gps_flag_0)[0] 

1330 

1331 if old_version is False: 

1332 gps_stamp_find = [ 

1333 gps_find 

1334 for gps_find in gps_stamp_find 

1335 if ts_data[gps_find + 1] == self._gps_flag_1 

1336 ] 

1337 return list(gps_stamp_find) 

1338 

1339 # ================================================= 

1340 def trim_data(self) -> None: 

1341 """ 

1342 Trim the first 2 seconds of data due to SD buffer issues. 

1343 

1344 Remove the first 2 GPS stamps and corresponding time series data 

1345 to account for SD card buffering artifacts in early data. 

1346 

1347 Notes 

1348 ----- 

1349 This method may be deprecated after field testing confirms 

1350 the buffer behavior is consistent across all instruments. 

1351 

1352 .. deprecated:: 

1353 This method will be deprecated after field testing. 

1354 """ 

1355 # the block length is the number of data points before the time stamp 

1356 # therefore the first block length is 0. The indexing in python 

1357 # goes to the last index - 1 so we need to put in 3 

1358 ts_skip = self.gps_stamps["block_len"][0:3].sum() 

1359 self.gps_stamps = self.gps_stamps[2:] 

1360 self.gps_stamps[0]["block_len"] = 0 

1361 self.time_series = self.time_series[ts_skip:] 

1362 

1363 # ================================================= 

1364 def check_start_time(self) -> MTime: 

1365 """ 

1366 Validate scheduled start time against first GPS stamp. 

1367 

1368 Compare the scheduled start time from the file header with 

1369 the actual first GPS timestamp to identify timing discrepancies. 

1370 

1371 Returns 

1372 ------- 

1373 MTime 

1374 UTC start time from the first valid GPS stamp. 

1375 

1376 Notes 

1377 ----- 

1378 Logs warnings if the difference exceeds the maximum allowed 

1379 time difference (default 20 seconds). 

1380 """ 

1381 # make sure the time is in gps time 

1382 zen_start_utc = self.get_UTC_date_time( 

1383 self.header.gpsweek, float(self.gps_stamps["time"][0]) 

1384 ) 

1385 

1386 # estimate the time difference between the two 

1387 time_diff = zen_start_utc - self.schedule.initial_start 

1388 if time_diff > self._max_time_diff: 

1389 self.logger.warning(f"ZEN Scheduled time was {self.schedule.initial_start}") 

1390 self.logger.warning(f"1st good stamp was {zen_start_utc}") 

1391 self.logger.warning(f"difference of {time_diff:.2f} seconds") 

1392 

1393 self.logger.debug(f"Scheduled time was {self.schedule.initial_start}") 

1394 self.logger.debug(f"1st good stamp was {zen_start_utc}") 

1395 self.logger.debug(f"difference of {time_diff:.2f} seconds") 

1396 

1397 return zen_start_utc 

1398 

1399 # ================================================== 

1400 def validate_gps_time(self) -> bool: 

1401 """ 

1402 Validate that GPS time stamps are consistently 1 second apart. 

1403 

1404 Returns 

1405 ------- 

1406 bool 

1407 True if all GPS stamps are properly spaced, False otherwise. 

1408 

1409 Notes 

1410 ----- 

1411 Logs debug information for any stamps that are more than 1 second apart. 

1412 """ 

1413 # need to put the gps time into seconds 

1414 t_diff = np.diff(self.gps_stamps["time"]) / 1024 

1415 

1416 bad_times = np.where(abs(t_diff) > 1)[0] 

1417 if len(bad_times) > 0: 

1418 for bb in bad_times: 

1419 self.logger.warning( 

1420 f"Data block {bb} has a time difference between GPS stamps > 1 second [{t_diff[bb]} s]" 

1421 ) 

1422 return False 

1423 return True 

1424 

1425 # =================================================== 

1426 def validate_time_blocks(self) -> bool: 

1427 """ 

1428 Validate GPS time stamps and verify data block lengths. 

1429 

1430 Check that each GPS stamp block contains the expected number 

1431 of data points (should equal sample rate for 1-second blocks). 

1432 

1433 Returns 

1434 ------- 

1435 bool 

1436 True if all blocks have correct length, False otherwise. 

1437 

1438 Notes 

1439 ----- 

1440 If bad blocks are detected near the beginning (index < 5), 

1441 this method will automatically skip those blocks and trim 

1442 the time series data accordingly. 

1443 """ 

1444 # first check if the gps stamp blocks are of the correct length 

1445 bad_blocks = np.where(self.gps_stamps["block_len"][1:] != self.header.ad_rate)[ 

1446 0 

1447 ] 

1448 

1449 if len(bad_blocks) > 0: 

1450 if bad_blocks.max() < 5: 

1451 ts_skip = self.gps_stamps["block_len"][0 : bad_blocks[-1] + 1].sum() 

1452 self.gps_stamps = self.gps_stamps[bad_blocks[-1] :] 

1453 self.time_series = self.time_series[ts_skip:] 

1454 

1455 self.logger.warning(f"Skipped the first {bad_blocks[-1]} seconds") 

1456 self.logger.warning(f"Skipped first {ts_skip} points in time series") 

1457 return True 

1458 for bb in bad_blocks: 

1459 self.logger.warning( 

1460 f"Data block {bb} has {self.gps_stamps['block_len'][bb+1]} samples, " 

1461 f"expected {self.header.ad_rate} samples." 

1462 ) 

1463 return False 

1464 return True 

1465 

1466 # ================================================== 

1467 def convert_gps_time(self) -> None: 

1468 """ 

1469 Convert GPS time integers to floating point seconds. 

1470 

1471 Transform GPS time from integer format to float and convert 

1472 from GPS time units to seconds relative to the GPS week. 

1473 

1474 Notes 

1475 ----- 

1476 GPS time is initially stored as integers in units of 1/1024 seconds. 

1477 This method converts to floating point seconds and applies the 

1478 necessary scaling factors. 

1479 """ 

1480 # need to convert gps_time to type float from int 

1481 dt = self._gps_dtype.descr 

1482 if self.header.old_version is True: 

1483 dt[1] = ("time", np.float32) 

1484 else: 

1485 dt[2] = ("time", np.float32) 

1486 self.gps_stamps = self.gps_stamps.astype(np.dtype(dt)) 

1487 

1488 # convert to seconds 

1489 # these are seconds relative to the gps week 

1490 time_conv = self.gps_stamps["time"].copy() / 1024.0 

1491 time_ms = (time_conv - np.floor(time_conv)) * 1.024 

1492 time_conv = np.floor(time_conv) + time_ms 

1493 

1494 self.gps_stamps["time"][:] = time_conv 

1495 

1496 # ================================================== 

1497 def convert_counts_to_mv(self, data: np.ndarray) -> np.ndarray: 

1498 """ 

1499 Convert time series data from counts to milliVolt. 

1500 

1501 Parameters 

1502 ---------- 

1503 data : numpy.ndarray 

1504 Time series data in digital counts. 

1505 

1506 Returns 

1507 ------- 

1508 numpy.ndarray 

1509 Time series data converted to milliVolt. 

1510 """ 

1511 data *= self._counts_to_mv_conversion 

1512 return data 

1513 

1514 def convert_mv_to_counts(self, data: np.ndarray) -> np.ndarray: 

1515 """ 

1516 Convert time series data from milliVolt to counts. 

1517 

1518 Parameters 

1519 ---------- 

1520 data : numpy.ndarray 

1521 Time series data in milliVolt. 

1522 

1523 Returns 

1524 ------- 

1525 numpy.ndarray 

1526 Time series data converted to digital counts. 

1527 

1528 Notes 

1529 ----- 

1530 Assumes no other scaling has been applied to the data. 

1531 """ 

1532 data /= self._counts_to_mv_conversion 

1533 return data 

1534 

1535 # ================================================== 

1536 def get_gps_time(self, gps_int: int, gps_week: int = 0) -> tuple[float, int]: 

1537 """ 

1538 Convert GPS integer timestamp to seconds and GPS week. 

1539 

1540 Parameters 

1541 ---------- 

1542 gps_int : int 

1543 Integer from the GPS time stamp line. 

1544 gps_week : int, default 0 

1545 Relative GPS week. If seconds exceed one week, this is incremented. 

1546 

1547 Returns 

1548 ------- 

1549 tuple[float, int] 

1550 GPS time in seconds from beginning of GPS week, and updated GPS week. 

1551 

1552 Notes 

1553 ----- 

1554 GPS integers are in units of 1/1024 seconds. This method handles 

1555 week rollovers when seconds exceed 604800. 

1556 """ 

1557 gps_seconds = gps_int / 1024.0 

1558 

1559 gps_ms = (gps_seconds - np.floor(gps_int / 1024.0)) * (1.024) 

1560 

1561 cc = 0 

1562 if gps_seconds > self._week_len: 

1563 gps_week += 1 

1564 cc = gps_week * self._week_len 

1565 gps_seconds -= self._week_len 

1566 gps_time = np.floor(gps_seconds) + gps_ms + cc 

1567 

1568 return gps_time, gps_week 

1569 

1570 # ================================================== 

1571 def get_UTC_date_time(self, gps_week: int, gps_time: float) -> MTime: 

1572 """ 

1573 Convert GPS week and time to UTC datetime. 

1574 

1575 Calculate the actual UTC date and time of measurement from 

1576 GPS week number and seconds within that week. 

1577 

1578 Parameters 

1579 ---------- 

1580 gps_week : int 

1581 GPS week number when data was collected. 

1582 gps_time : float 

1583 Number of seconds from beginning of GPS week. 

1584 

1585 Returns 

1586 ------- 

1587 MTime 

1588 UTC datetime object for the measurement time. 

1589 

1590 Notes 

1591 ----- 

1592 Automatically handles GPS time rollover when seconds exceed 

1593 one week (604800 seconds). 

1594 """ 

1595 # need to check to see if the time in seconds is more than a gps week 

1596 # if it is add 1 to the gps week and reduce the gps time by a week 

1597 if gps_time > self._week_len: 

1598 gps_week += 1 

1599 gps_time -= self._week_len 

1600 # compute seconds using weeks and gps time 

1601 # Convert gps_time to Python float to avoid precision issues with numpy types 

1602 utc_seconds = ( 

1603 self._gps_epoch.epoch_seconds 

1604 + (gps_week * self._week_len) 

1605 + float(gps_time) 

1606 ) 

1607 

1608 # compute date and time from seconds and return a datetime object 

1609 # easier to manipulate later, must be in nanoseconds 

1610 return MTime(time_stamp=utc_seconds, gps_time=True) 

1611 

1612 # ================================================= 

1613 def to_channelts(self) -> ChannelTS: 

1614 """ 

1615 Convert Z3D data to ChannelTS time series object. 

1616 

1617 Create a ChannelTS object populated with the time series data 

1618 and all associated metadata from the Z3D file. 

1619 

1620 Returns 

1621 ------- 

1622 ChannelTS 

1623 Time series object with data, metadata, and instrument response. 

1624 """ 

1625 return ChannelTS( 

1626 self.channel_metadata.type, 

1627 data=self.time_series, 

1628 channel_metadata=self.channel_metadata, 

1629 station_metadata=self.station_metadata, 

1630 run_metadata=self.run_metadata, 

1631 channel_response=self.channel_response, 

1632 ) 

1633 

1634 

1635# ============================================================================== 

1636# Error instances for Zen 

1637# ============================================================================== 

1638class ZenGPSError(Exception): 

1639 """Exception raised for GPS timing errors in Z3D files.""" 

1640 

1641 

1642class ZenSamplingRateError(Exception): 

1643 """Exception raised for sampling rate inconsistencies.""" 

1644 

1645 

1646class ZenInputFileError(Exception): 

1647 """Exception raised for Z3D file input/reading errors.""" 

1648 

1649 

1650def read_z3d( 

1651 fn: str | Path, 

1652 calibration_fn: str | Path | None = None, 

1653 logger_file_handler: Any | None = None, 

1654) -> ChannelTS | None: 

1655 """ 

1656 Read a Z3D file and return a ChannelTS object. 

1657 

1658 Convenience function to read Z3D files with error handling. 

1659 

1660 Parameters 

1661 ---------- 

1662 fn : str or Path 

1663 Path to the Z3D file to read. 

1664 calibration_fn : str, Path, or None, optional 

1665 Path to calibration file. Default is None. 

1666 logger_file_handler : optional 

1667 Logger file handler to add to Z3D logger. Default is None. 

1668 

1669 Returns 

1670 ------- 

1671 ChannelTS or None 

1672 Time series object if successful, None if GPS timing errors occur. 

1673 

1674 Examples 

1675 -------- 

1676 >>> ts = read_z3d("/path/to/data/station_EX.Z3D") 

1677 >>> if ts is not None: 

1678 ... print(f"Read {ts.n_samples} samples") 

1679 """ 

1680 z3d_obj = Z3D(fn, calibration_fn=calibration_fn) 

1681 if logger_file_handler: 

1682 z3d_obj.logger.addHandler(logger_file_handler) 

1683 try: 

1684 z3d_obj.read_z3d() 

1685 except ZenGPSError as error: 

1686 z3d_obj.logger.exception(error) 

1687 z3d_obj.logger.warning(f"Skipping {fn}, check file for GPS timing.") 

1688 return None 

1689 return z3d_obj.to_channelts()