Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mth5 \ mth5 \ clients \ zen.py: 96%

67 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-27 20:09 -0800

1#!/usr/bin/env python 

2# coding: utf-8 

3 

4# ============================================================================= 

5# Imports 

6# ============================================================================= 

7from pathlib import Path 

8 

9from mth5 import read_file 

10from mth5.clients.base import ClientBase 

11from mth5.io.zen import Z3DCollection 

12from mth5.mth5 import MTH5 

13 

14 

15# ============================================================================= 

16 

17 

class ZenClient(ClientBase):
    """
    Client that builds an MTH5 file from a directory of Zen Z3D files.

    The files under ``data_path`` are indexed with a
    :class:`mth5.io.zen.Z3DCollection`; :meth:`make_mth5_from_zen` then
    writes one station group per station and one run group per run.

    Attributes set here (others come from :class:`ClientBase`):

    * ``calibration_path`` -- optional path to calibration data, validated
      on assignment.
    * ``collection`` -- the ``Z3DCollection`` built from ``self.data_path``.
    * ``station_stem`` -- optional prefix prepended to station ids that do
      not already start with it (``None`` disables the prefixing).
    """

    # NOTE(review): ``sample_rates`` keeps its mutable list default so the
    # public signature is unchanged.  This class only forwards it to
    # ``ClientBase``; it is safe as long as the base class does not mutate
    # the list in place -- confirm against ``ClientBase``.
    def __init__(
        self,
        data_path,
        sample_rates=[4096, 1024, 256],
        save_path=None,
        calibration_path=None,
        mth5_filename="from_zen.h5",
        **kwargs,
    ):
        """
        :param data_path: location of the Z3D files, forwarded to
         ``ClientBase`` and used to build the ``Z3DCollection``
        :param sample_rates: sample rates to include, defaults to
         [4096, 1024, 256]
        :param save_path: forwarded to ``ClientBase``, defaults to None
        :param calibration_path: path to calibration data; must exist if
         given, defaults to None
        :param mth5_filename: forwarded to ``ClientBase``, defaults to
         "from_zen.h5"
        :param kwargs: any further keyword arguments accepted by
         ``ClientBase``
        :raises IOError: if ``calibration_path`` is given but does not exist
        """
        super().__init__(
            data_path,
            save_path=save_path,
            sample_rates=sample_rates,
            mth5_filename=mth5_filename,
            **kwargs,
        )

        self.calibration_path = calibration_path
        self.collection = Z3DCollection(self.data_path)
        self.station_stem = None

    @property
    def calibration_path(self):
        """Path to calibration data, or None if not set."""
        return self._calibration_path

    @calibration_path.setter
    def calibration_path(self, value):
        """
        Set the calibration path.

        :param value: path to the calibration data, or None to clear it
        :type value: str, pathlib.Path, or None
        :raises IOError: if ``value`` is not None and does not exist
        """
        if value is None:
            self._calibration_path = None
            return

        # Validate before storing so a failed assignment does not leave an
        # invalid path on the instance.
        candidate = Path(value)
        if not candidate.exists():
            raise IOError(f"Could not find {candidate}")
        self._calibration_path = candidate

    def get_run_dict(self):
        """
        Get run information from the collection.

        :return: result of ``Z3DCollection.get_runs`` for the configured
         sample rates and calibration path.  ``make_mth5_from_zen`` iterates
         it as ``{station_id: {run_id: run_table}}``.
        """
        return self.collection.get_runs(
            sample_rates=self.sample_rates,
            calibration_path=self.calibration_path,
        )

    def get_survey(self, station_dict):
        """
        Get the survey name from the runs of a single station.

        The first survey found (in run order) is returned.  If the runs
        disagree a warning is logged, rather than silently picking an
        arbitrary element of a set as before.

        :param station_dict: mapping of run id to a run table exposing a
         ``survey`` column (presumably a pandas DataFrame -- it is used via
         ``.survey.unique()``)
        :type station_dict: dict
        :return: survey name
        :raises IndexError: if ``station_dict`` is empty
        """
        surveys = [run_df.survey.unique()[0] for run_df in station_dict.values()]
        # dict.fromkeys de-duplicates while preserving first-seen order.
        distinct = list(dict.fromkeys(surveys))
        if len(distinct) > 1:
            self.logger.warning(
                f"Found multiple surveys {distinct}, using {distinct[0]}"
            )
        return distinct[0]

    def _get_station_group_id(self, station_id):
        """
        Get the station group id from the station id.

        :param station_id: station id as found in the collection
        :type station_id: str
        :return: ``station_id`` prefixed with ``station_stem`` when a stem
         is set and the id does not already start with it; otherwise
         ``station_id`` unchanged
        :rtype: str
        """
        stem = self.station_stem
        if stem is None or station_id.startswith(stem):
            return station_id
        return f"{stem}{station_id}"

    def make_mth5_from_zen(self, survey_id=None, combine=True, **kwargs):
        """
        Make an MTH5 file from Zen Z3D files, split into stations and runs.

        :param survey_id: survey to write every station into.  If None the
         survey is looked up per station with :meth:`get_survey`,
         defaults to None
        :type survey_id: str, optional
        :param combine: if True, additionally merge all runs of a station
         into one run named "sr1_0001" using ``new_sample_rate=1``,
         defaults to True
        :type combine: bool, optional
        :param kwargs: attribute overrides; every non-None value is set on
         this client with ``setattr`` before processing
        :return: ``self.save_path``, the path the MTH5 file was opened with
        """
        for key, value in kwargs.items():
            if value is not None:
                setattr(self, key, value)

        runs = self.get_run_dict()
        combined_run_id = "sr1_0001"

        with MTH5(**self.h5_kwargs) as m:
            m.open_mth5(self.save_path, self.mth5_file_mode)

            for station_id, station_dict in runs.items():
                # Resolve the survey per station.  The argument itself is
                # not overwritten, so a survey found for the first station
                # no longer sticks to all following stations.
                if survey_id is None:
                    station_survey_id = self.get_survey(station_dict)
                else:
                    station_survey_id = survey_id
                survey_group = m.add_survey(station_survey_id)

                station_group_id = self._get_station_group_id(station_id)
                station_group = survey_group.stations_group.add_station(
                    station_group_id
                )
                station_group.metadata.update(
                    self.collection.station_metadata_dict[station_id]
                )
                # The collection metadata carries the raw id; make sure the
                # stored id matches the (possibly prefixed) group name.
                station_group.metadata.id = station_group_id
                station_group.write_metadata()

                run_list = []
                for run_id, run_df in station_dict.items():
                    run_group = station_group.add_run(run_id)
                    for row in run_df.itertuples():
                        ch_ts = read_file(
                            row.fn,
                            calibration_fn=row.calibration_fn,
                        )
                        run_group.from_channel_ts(ch_ts)
                    run_group.update_metadata()
                    if combine:
                        run_list.append(run_group.to_runts())

                # Skip the merge when the station has no runs; indexing
                # run_list[0] would otherwise raise IndexError.
                if combine and run_list:
                    # Combine runs and down sample to 1 second.
                    combined_run = run_list[0].merge(
                        run_list[1:], new_sample_rate=1
                    )
                    combined_run.run_metadata.id = combined_run_id
                    combined_run_group = station_group.add_run(combined_run_id)
                    combined_run_group.from_runts(combined_run)
                    combined_run_group.update_metadata()

                station_group.update_metadata()
                survey_group.update_metadata()

        self.logger.info(f"Wrote MTH5 file to: {self.save_path}")

        return self.save_path