Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mth5 \ mth5 \ io \ usgs_ascii \ usgs_ascii.py: 84%

162 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-10 00:01 -0800

1# -*- coding: utf-8 -*- 

2""" 

3Created on Thu Aug 27 16:54:09 2020 

4 

5:author: Jared Peacock 

6 

7:license: MIT 

8 

9""" 

10 

11# ============================================================================= 

12# Imports 

13# ============================================================================= 

14 

15import datetime 

16import gzip 

17import time 

18from pathlib import Path 

19 

20import numpy as np 

21import pandas as pd 

22 

23from mth5.io.usgs_ascii import AsciiMetadata 

24from mth5.timeseries import ChannelTS, RunTS 

25 

26 

27# ============================================================================= 

28# Metadata for usgs ascii file 

29# ============================================================================= 

30 

31 

32class USGSascii(AsciiMetadata): 

33 """ 

34 Read and write USGS ascii formatted time series. 

35 

36 =================== ======================================================= 

37 Attributes Description 

38 =================== ======================================================= 

39 ts Pandas dataframe holding the time series data 

40 fn Full path to .asc file 

41 station_dir Full path to station directory 

42 meta_notes Notes of how the station was collected 

43 =================== ======================================================= 

44 

45 :Example: :: 

46 

47 >>> zc = Z3DCollection() 

48 >>> fn_list = zc.get_time_blocks(z3d_path) 

49 >>> zm = USGSasc() 

50 >>> zm.SurveyID = 'iMUSH' 

51 >>> zm.get_z3d_db(fn_list[0]) 

52 >>> zm.read_mtft24_cfg() 

53 >>> zm.CoordinateSystem = 'Geomagnetic North' 

54 >>> zm.SurveyID = 'MT' 

55 >>> zm.write_asc_file(str_fmt='%15.7e') 

56 >>> zm.write_station_info_metadata() 

57 

58 """ 

59 

60 def __init__(self, fn=None, **kwargs): 

61 super().__init__(fn, **kwargs) 

62 self.ts = None 

63 self.station_dir = Path().cwd() 

64 self.meta_notes = None 

65 for key in kwargs.keys(): 

66 setattr(self, key, kwargs[key]) 

67 

68 ### need copy in the metadata otherwise the order in the channels 

69 ### gets messed up 

70 

71 @property 

72 def hx(self): 

73 """HX""" 

74 if self.ts is not None: 

75 return ChannelTS( 

76 "magnetic", 

77 data=self.ts.hx.to_numpy(), 

78 channel_metadata=self.hx_metadata.copy(), 

79 run_metadata=self.run_metadata.copy(), 

80 station_metadata=self.station_metadata.copy(), 

81 survey_metadata=self.survey_metadata.copy(), 

82 ) 

83 return None 

84 

85 @property 

86 def hy(self): 

87 """hy""" 

88 if self.ts is not None: 

89 return ChannelTS( 

90 "magnetic", 

91 data=self.ts.hy.to_numpy(), 

92 channel_metadata=self.hy_metadata.copy(), 

93 run_metadata=self.run_metadata.copy(), 

94 station_metadata=self.station_metadata.copy(), 

95 survey_metadata=self.survey_metadata.copy(), 

96 ) 

97 return None 

98 

99 @property 

100 def hz(self): 

101 """hz""" 

102 if self.ts is not None: 

103 return ChannelTS( 

104 "magnetic", 

105 data=self.ts.hz.to_numpy(), 

106 channel_metadata=self.hz_metadata.copy(), 

107 run_metadata=self.run_metadata.copy(), 

108 station_metadata=self.station_metadata.copy(), 

109 survey_metadata=self.survey_metadata.copy(), 

110 ) 

111 return None 

112 

113 @property 

114 def ex(self): 

115 """ex""" 

116 if self.ts is not None: 

117 return ChannelTS( 

118 "electric", 

119 data=self.ts.ex.to_numpy(), 

120 channel_metadata=self.ex_metadata.copy(), 

121 run_metadata=self.run_metadata.copy(), 

122 station_metadata=self.station_metadata.copy(), 

123 survey_metadata=self.survey_metadata.copy(), 

124 ) 

125 return None 

126 

127 @property 

128 def ey(self): 

129 """ey""" 

130 if self.ts is not None: 

131 return ChannelTS( 

132 "electric", 

133 data=self.ts.ey.to_numpy(), 

134 channel_metadata=self.ey_metadata.copy(), 

135 run_metadata=self.run_metadata.copy(), 

136 station_metadata=self.station_metadata.copy(), 

137 survey_metadata=self.survey_metadata.copy(), 

138 ) 

139 return None 

140 

141 def to_run_ts(self): 

142 """Get xarray for run""" 

143 if self.ts is not None: 

144 return RunTS( 

145 array_list=[self.hx, self.hy, self.hz, self.ex, self.ey], 

146 run_metadata=self.run_metadata.copy(), 

147 station_metadata=self.station_metadata.copy(), 

148 survey_metadata=self.survey_metadata.copy(), 

149 ) 

150 return None 

151 

152 def read(self, fn=None): 

153 """ 

154 Read in a USGS ascii file and fill attributes accordingly. 

155 

156 :param fn: full path to .asc file to be read in 

157 :type fn: string 

158 """ 

159 if fn is not None: 

160 self.fn = fn 

161 st = datetime.datetime.now() 

162 data_line = self.read_metadata() 

163 self.ts = pd.read_csv( 

164 self.fn, 

165 sep="\s+", 

166 skiprows=data_line, 

167 dtype=np.float32, 

168 ) 

169 dt_index = pd.date_range( 

170 start=self.start.time_stamp, periods=self.n_samples, end=self.end.time_stamp 

171 ) 

172 self.ts.index = dt_index 

173 self.ts.columns = self.ts.columns.str.lower() 

174 

175 et = datetime.datetime.now() 

176 read_time = et - st 

177 self.logger.info(f"Reading took {read_time.total_seconds()}") 

178 

179 def _make_file_name(self, save_path=None, compression=True, compress_type="zip"): 

180 """ 

181 get the file name to save to 

182 

183 :param save_path: full path to directory to save file to 

184 :type save_path: string 

185 

186 :param compression: compress file 

187 :type compression: [ True | False ] 

188 

189 :return: save_fn 

190 :rtype: string 

191 

192 """ 

193 # make the file name to save to 

194 if save_path is not None: 

195 save_path = Path(save_path) 

196 save_fn = save_path.joinpath( 

197 "{0}_{1}T{2}_{3:.0f}.asc".format( 

198 self.SiteID, 

199 self._start_time.strftime("%Y-%m-%d"), 

200 self._start_time.strftime("%H%M%S"), 

201 self.AcqSmpFreq, 

202 ), 

203 ) 

204 else: 

205 save_fn = self.station_dir.joinpath( 

206 "{0}_{1}T{2}_{3:.0f}.asc".format( 

207 self.SiteID, 

208 self._start_time.strftime("%Y-%m-%d"), 

209 self._start_time.strftime("%H%M%S"), 

210 self.AcqSmpFreq, 

211 ), 

212 ) 

213 if compression: 

214 # Use conventional suffixes: zip -> .zip, gzip -> .gz 

215 if compress_type == "zip": 

216 save_fn = save_fn.with_suffix(save_fn.suffix + ".zip") 

217 elif compress_type == "gzip": 

218 save_fn = save_fn.with_suffix(save_fn.suffix + ".gz") 

219 return save_fn 

220 

221 def write( 

222 self, 

223 save_fn=None, 

224 chunk_size=1024, 

225 str_fmt="%15.7e", 

226 full=True, 

227 compress=False, 

228 save_dir=None, 

229 compress_type="zip", 

230 convert_electrics=True, 

231 ): 

232 """ 

233 Write an ascii file in the USGS ascii format. 

234 

235 :param save_fn: full path to file name to save the merged ascii to 

236 :type save_fn: string 

237 

238 :param chunck_size: chunck size to write file in blocks, larger numbers 

239 are typically slower. 

240 :type chunck_size: int 

241 

242 :param str_fmt: format of the data as written 

243 :type str_fmt: string 

244 

245 :param full: write out the complete file, mostly for testing. 

246 :type full: boolean [ True | False ] 

247 

248 :param compress: compress file 

249 :type compress: boolean [ True | False ] 

250 

251 :param compress_type: compress file using zip or gzip 

252 :type compress_type: boolean [ zip | gzip ] 

253 """ 

254 # get the filename to save to 

255 save_fn = self._make_file_name( 

256 save_path=save_dir, 

257 compression=compress, 

258 compress_type=compress_type, 

259 ) 

260 # get the number of characters in the desired string 

261 s_num = int(str_fmt[1 : str_fmt.find(".")]) 

262 

263 # convert electric fields into mV/km 

264 if convert_electrics: 

265 self.convert_electrics() 

266 self.logger.debug(f"==> {save_fn}") 

267 self.logger.debug("START --> {time.ctime()}") 

268 st = datetime.datetime.now() 

269 

270 # write meta data first 

271 # sort channel information same as columns 

272 meta_lines = self.write_metadata( 

273 chn_list=[c.capitalize() for c in self.ts.columns] 

274 ) 

275 if compress == True and compress_type == "gzip": 

276 # gzip expects bytes; encode strings before writing 

277 with gzip.open(save_fn, "wb") as fid: 

278 h_line = [ 

279 "".join( 

280 [ 

281 "{0:>{1}}".format(c.capitalize(), s_num) 

282 for c in self.ts.columns 

283 ] 

284 ) 

285 ] 

286 header_bytes = ("\n".join(meta_lines + h_line) + "\n").encode("utf-8") 

287 fid.write(header_bytes) 

288 

289 # write out data 

290 if full is False: 

291 out = np.array(self.ts[0:chunk_size]) 

292 out[np.where(out == 0)] = float(self.MissingDataFlag) 

293 out = np.char.mod(str_fmt, out) 

294 lines = "\n".join( 

295 ["".join(out[ii, :]) for ii in range(out.shape[0])] 

296 ) 

297 fid.write((lines + "\n").encode("utf-8")) 

298 self.logger.debug(f"END --> {time.ctime()}") 

299 et = datetime.datetime.now() 

300 write_time = et - st 

301 self.logger.debug( 

302 f"Writing took: {write_time.total_seconds()} seconds" 

303 ) 

304 return 

305 for chunk in range(int(self.ts.shape[0] / chunk_size)): 

306 out = np.array( 

307 self.ts[chunk * chunk_size : (chunk + 1) * chunk_size] 

308 ) 

309 out[np.where(out == 0)] = float(self.MissingDataFlag) 

310 out = np.char.mod(str_fmt, out) 

311 lines = "\n".join( 

312 ["".join(out[ii, :]) for ii in range(out.shape[0])] 

313 ) 

314 fid.write((lines + "\n").encode("utf-8")) 

315 else: 

316 if compress == True and compress_type == "zip": 

317 # Create a temporary .asc file, then zip it into the final 

318 # .zip archive. `save_fn` is expected to be a Path ending with 

319 # ".zip" (from _make_file_name). 

320 import zipfile 

321 

322 zip_path = Path(save_fn) 

323 # inner asc filename is the zip name without the .zip suffix 

324 asc_name = zip_path.name[:-4] 

325 asc_temp = zip_path.parent / asc_name 

326 

327 # write the ascii file 

328 need_zip = False 

329 with open(asc_temp, "w", encoding="utf-8") as fid: 

330 h_line = [ 

331 "".join( 

332 [ 

333 "{0:>{1}}".format(c.capitalize(), s_num) 

334 for c in self.ts.columns 

335 ] 

336 ) 

337 ] 

338 fid.write("\n".join(meta_lines + h_line) + "\n") 

339 

340 # write out data 

341 if full is False: 

342 out = np.array(self.ts[0:chunk_size]) 

343 out[np.where(out == 0)] = float(self.MissingDataFlag) 

344 out = np.char.mod(str_fmt, out) 

345 lines = "\n".join( 

346 ["".join(out[ii, :]) for ii in range(out.shape[0])] 

347 ) 

348 fid.write(lines + "\n") 

349 self.logger.debug(f"END --> {time.ctime()}") 

350 et = datetime.datetime.now() 

351 write_time = et - st 

352 self.logger.debug( 

353 f"Writing took: {write_time.total_seconds()} seconds" 

354 ) 

355 need_zip = True 

356 else: 

357 for chunk in range(int(self.ts.shape[0] / chunk_size)): 

358 out = np.array( 

359 self.ts[chunk * chunk_size : (chunk + 1) * chunk_size] 

360 ) 

361 out[np.where(out == 0)] = float(self.MissingDataFlag) 

362 out = np.char.mod(str_fmt, out) 

363 lines = "\n".join( 

364 ["".join(out[ii, :]) for ii in range(out.shape[0])] 

365 ) 

366 fid.write(lines + "\n") 

367 

368 # create zip and clean up (only if we need it) 

369 if need_zip: 

370 try: 

371 with zipfile.ZipFile( 

372 zip_path, "w", compression=zipfile.ZIP_DEFLATED 

373 ) as zf: 

374 zf.write(asc_temp, arcname=asc_temp.name) 

375 finally: 

376 try: 

377 asc_temp.unlink(missing_ok=True) 

378 except Exception: 

379 pass 

380 return 

381 

382 # default uncompressed write 

383 with open(save_fn, "w", encoding="utf-8") as fid: 

384 h_line = [ 

385 "".join( 

386 [ 

387 "{0:>{1}}".format(c.capitalize(), s_num) 

388 for c in self.ts.columns 

389 ] 

390 ) 

391 ] 

392 fid.write("\n".join(meta_lines + h_line) + "\n") 

393 

394 # write out data 

395 if full is False: 

396 out = np.array(self.ts[0:chunk_size]) 

397 out[np.where(out == 0)] = float(self.MissingDataFlag) 

398 out = np.char.mod(str_fmt, out) 

399 lines = "\n".join( 

400 ["".join(out[ii, :]) for ii in range(out.shape[0])] 

401 ) 

402 fid.write(lines + "\n") 

403 self.logger.debug(f"END --> {time.ctime()}") 

404 et = datetime.datetime.now() 

405 write_time = et - st 

406 self.logger.debug( 

407 f"Writing took: {write_time.total_seconds()} seconds" 

408 ) 

409 return 

410 for chunk in range(int(self.ts.shape[0] / chunk_size)): 

411 out = np.array( 

412 self.ts[chunk * chunk_size : (chunk + 1) * chunk_size] 

413 ) 

414 out[np.where(out == 0)] = float(self.MissingDataFlag) 

415 out = np.char.mod(str_fmt, out) 

416 lines = "\n".join( 

417 ["".join(out[ii, :]) for ii in range(out.shape[0])] 

418 ) 

419 fid.write(lines + "\n") 

420 # for some fucking reason, all interal variables don't exist anymore 

421 # and if you try to do the zipping nothing happens, so have to do 

422 # it externally. WTF 

423 self.logger.debug(f"END --> {time.ctime()}") 

424 et = datetime.datetime.now() 

425 write_time = et - st 

426 self.logger.debug("Writing took: {write_time.total_seconds()} seconds") 

427 

428 

429def read_ascii(fn): 

430 """ 

431 read USGS ASCII formatted file 

432 

433 :param fn: DESCRIPTION 

434 :type fn: TYPE 

435 :return: DESCRIPTION 

436 :rtype: TYPE 

437 

438 """ 

439 

440 asc_obj = USGSascii(fn) 

441 asc_obj.read_ascii_file() 

442 

443 return asc_obj.to_run_ts()