Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mth5 \ mth5 \ io \ scripps \ zenc.py: 39%

88 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-10 00:01 -0800

1# -*- coding: utf-8 -*- 

2""" 

3Created on Thu Jan 25 11:36:55 2024 

4 

5@author: jpeacock 

6""" 

7 

8# ============================================================================= 

9# Imports 

10# ============================================================================= 

11from collections import OrderedDict 

12 

13from loguru import logger 

14 

15from mth5.mth5 import MTH5 

16from mth5.timeseries import RunTS 

17 

18 

19# ============================================================================= 

20 

21 

class ZENC:
    """
    Deal with .zenc files, which are apparently used to process data in EMTF.
    It was specifically built for processing ZEN data in EMTF, but should
    work regardless of data logger.

    The format is a header and then n_channels x n_samples of float32 values.

    This class is meant to read/write .zenc files. Currently only the
    metadata header is written (see :meth:`to_zenc`).

    You need to input the path to an existing or new MTH5 file and a
    channel map to read/write.

    The `channel_map` needs to be in the form

    .. code-block::

        channel_map = {
            "channel_1_name":
                {"survey": survey_name,
                 "station": station_name,
                 "run": run_name,
                 "channel": channel_name,
                 "channel_number": channel_number},
            "channel_2_name":
                {"survey": survey_name,
                 "station": station_name,
                 "run": run_name,
                 "channel": channel_name,
                 "channel_number": channel_number},
            ...
            }

    Only entries whose top-level key is one of ``hx, hy, hz, ex, ey`` are
    kept; they are stored in that order.

    """

    def __init__(self, channel_map):
        """
        :param channel_map: dictionary describing where each channel lives
         in the MTH5 file, see the class documentation for the layout
        :type channel_map: dict
        :raises ValueError: if `channel_map` or one of its entries is not a
         dictionary
        :raises KeyError: if an entry does not have exactly the required keys

        """
        self.logger = logger
        # keys every per-channel dictionary must have, no more and no less
        self._channel_map_keys = [
            "survey",
            "station",
            "run",
            "channel",
            "channel_number",
        ]
        # order in which channels are stored in the channel map
        self._expected_channel_order = ["hx", "hy", "hz", "ex", "ey"]
        # goes through the property setter, so the input is validated here
        self.channel_map = channel_map

    @property
    def channel_map(self):
        """Validated channel map, ordered as hx, hy, hz, ex, ey."""
        return self._channel_map

    @channel_map.setter
    def channel_map(self, value):
        """
        need to make sure channel map is in the correct format

        :param value: dictionary of channels to use
        :type value: dict
        :raises ValueError: if `value` or any of its entries is not a dict
        :raises KeyError: if an entry does not have exactly the keys in
         `_channel_map_keys`

        """

        if not isinstance(value, dict):
            raise ValueError(
                f"Input channel_map must be a dictionary not type{type(value)}"
            )

        # compare as sets so keys of mixed types cannot break the comparison
        required_keys = set(self._channel_map_keys)
        for kdict in value.values():
            if not isinstance(kdict, dict):
                # report the type of the offending entry, not of the outer map
                raise ValueError(
                    f"Input channel must be a dictionary not type{type(kdict)}"
                )
            if set(kdict) != required_keys:
                raise KeyError(
                    f"Keys of channel dictionary must be {self._channel_map_keys} "
                    f"not {kdict.keys()}."
                )
        self._channel_map = self._sort_channel_map(value)

    def _sort_channel_map(self, channel_map):
        """
        Order the channel map by component name following
        `_expected_channel_order` (hx, hy, hz, ex, ey).

        Expected components missing from the input are skipped with an info
        message. Entries whose key is not an expected component are left out
        of the result and reported with a warning.

        :param channel_map: validated channel map
        :type channel_map: dict
        :return: channel map containing only expected components, in order
        :rtype: collections.OrderedDict

        """
        sorted_channel_map = OrderedDict()

        for ch in self._expected_channel_order:
            if ch in channel_map:
                sorted_channel_map[ch] = channel_map[ch]
            else:
                self.logger.info(f"Could not find {ch} in channel_map, skipping")

        # make it visible when user supplied channels are being dropped
        ignored = [
            key for key in channel_map if key not in self._expected_channel_order
        ]
        if ignored:
            self.logger.warning(
                f"Channels {ignored} are not in {self._expected_channel_order}, "
                "ignoring"
            )

        return sorted_channel_map

    def to_zenc(self, mth5_file, channel_map=None, zenc_file=None):
        """
        write out a .zenc file

        The channels listed in the channel map are read from `mth5_file`,
        combined into a single run and the metadata header is written to
        `zenc_file`.

        .. note:: Only the text header is written at the moment, writing the
         samples is not implemented yet.

        :param mth5_file: path to the MTH5 file to read, opened read-only
        :type mth5_file: str or pathlib.Path
        :param channel_map: channel map to use instead of the current one,
         defaults to None
        :type channel_map: dict, optional
        :param zenc_file: path of the file to write, defaults to None which
         uses `mth5_file` with the extension replaced by ``.zenc``
        :type zenc_file: str or pathlib.Path, optional
        :raises ValueError: if the output path is the same file as
         `mth5_file`
        :return: path of the written file
        :rtype: pathlib.Path

        """
        from pathlib import Path

        if channel_map is not None:
            self.channel_map = channel_map

        mth5_path = Path(mth5_file)
        if zenc_file is None:
            zenc_path = mth5_path.with_suffix(".zenc")
        else:
            zenc_path = Path(zenc_file)

        # opening the output with "w" truncates it, never do that to the input
        if zenc_path.resolve() == mth5_path.resolve():
            raise ValueError(
                f"Output file {zenc_path} is the input MTH5 file, "
                "refusing to overwrite it."
            )

        with MTH5() as m:
            m.open_mth5(mth5_file, mode="r")
            ch_list = [
                m.get_channel(
                    ch_dict["station"],
                    ch_dict["run"],
                    ch_dict["channel"],
                    survey=ch_dict["survey"],
                ).to_channel_ts()
                for ch_dict in self.channel_map.values()
            ]
            # build the run while the file is still open
            run = RunTS(ch_list)

        # write metadata
        with open(zenc_path, "w") as fid:
            fid.write("\n".join(self._write_metadata(run)))

        # TODO: write data as (hx, hy, hz, ex, ey, ...)

        return zenc_path

    def _write_metadata(self, run_ts):
        """
        write metadata for the zenc file.

        The first line is the sample rate, followed by one ``key: value``
        line per run metadata entry and then one ``key: value`` line per
        channel metadata entry, repeated for each channel of the run.

        of the form

            4096
            version: 1.0
            boxNumber: 74
            samplingFrequency: 4096
            timeDataStart: 2021-07-23 08:00:14
            timeDataEnd: 2021-07-23 08:14:58
            latitude: 58.22444
            longitude: -155.66579
            altitude: 251.30000
            rx_stn: 1
            TxFreq: 0
            TxDuty: inf
            numChans: 5
            channel: 1
            component: Hx
            length:
            sensorID: 4044
            azimuth: 0
            xyz1: 0:0:0
            xyz2: 0:0:0
            units: V
            countconversion: 9.5367431640625e-10
            next channel

        :param run_ts: run to describe, must provide the attributes used by
         :meth:`get_run_metadata`, a `channels` list and item access by
         channel name
        :type run_ts: mth5.timeseries.RunTS
        :return: header lines without line endings
        :rtype: list of str

        """

        # every entry has to be a string so the caller can join the lines
        lines = [str(run_ts.sample_rate)]
        for key, value in self.get_run_metadata(run_ts).items():
            lines.append(f"{key}: {value}")

        for ch in run_ts.channels:
            for key, value in self._get_ch_metadata(run_ts[ch]).items():
                lines.append(f"{key}: {value}")

        return lines

    def get_run_metadata(self, run_ts):
        """
        get run metadata from RunTS object

        Produces the run level part of the header

            4096
            version: 1.0
            boxNumber: 74
            samplingFrequency: 4096
            timeDataStart: 2021-07-23 08:00:14
            timeDataEnd: 2021-07-23 08:14:58
            latitude: 58.22444
            longitude: -155.66579
            altitude: 251.30000
            rx_stn: 1
            TxFreq: 0
            TxDuty: inf
            numChans: 5

        `version`, `TxFreq` and `TxDuty` are fixed values, everything else is
        read from `run_ts`. The leading sample rate line is not part of the
        returned dictionary.

        :param run_ts: run to describe
        :type run_ts: mth5.timeseries.RunTS
        :return: header keys mapped to their values, in header order
        :rtype: collections.OrderedDict

        """

        location = run_ts.station_metadata.location

        run_dict = OrderedDict()
        run_dict["version"] = 1.0
        run_dict["boxNumber"] = run_ts.run_metadata.data_logger.id
        run_dict["samplingFrequency"] = run_ts.sample_rate
        run_dict["timeDataStart"] = run_ts.start
        run_dict["timeDataEnd"] = run_ts.end
        run_dict["latitude"] = location.latitude
        run_dict["longitude"] = location.longitude
        run_dict["altitude"] = location.elevation
        run_dict["rx_stn"] = run_ts.station_metadata.id
        run_dict["TxFreq"] = 0
        run_dict["TxDuty"] = "inf"
        run_dict["numChans"] = len(run_ts.channels)

        return run_dict

    def _get_ch_metadata(self, ch):
        """
        get channel metadata from ChannelTS

        Produces the channel level part of the header

            channel: 1
            component: Hx
            length:
            sensorID: 4044
            azimuth: 0
            xyz1: 0:0:0
            xyz2: 0:0:0
            units: V
            countconversion: 9.5367431640625e-10

        `length` is None when the channel metadata has no `dipole_length`
        and `sensorID` is None for channels that are not of type magnetic.
        `xyz1`, `xyz2` and `countconversion` are fixed values.

        :param ch: channel to describe
        :type ch: mth5.timeseries.ChannelTS
        :return: header keys mapped to their values, in header order
        :rtype: collections.OrderedDict

        """

        metadata = ch.channel_metadata

        ch_dict = OrderedDict()
        ch_dict["channel"] = metadata.channel_number
        ch_dict["component"] = metadata.component
        # not every channel metadata object has a dipole length
        ch_dict["length"] = getattr(metadata, "dipole_length", None)

        if metadata.type.lower() == "magnetic":
            ch_dict["sensorID"] = metadata.sensor.id
        else:
            ch_dict["sensorID"] = None
        ch_dict["azimuth"] = metadata.measurement_azimuth
        ch_dict["xyz1"] = "0:0:0"
        ch_dict["xyz2"] = "0:0:0"
        ch_dict["units"] = metadata.units
        ch_dict["countconversion"] = 9.5367431640625e-10

        return ch_dict