Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mth5 \ mth5 \ clients \ geomag.py: 98%

235 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-27 20:09 -0800

1# -*- coding: utf-8 -*- 

2""" 

3Created on Mon Nov 14 13:58:44 2022 

4 

5@author: jpeacock 

6""" 

7 

8import json 

9import platform 

10import sys 

11from pathlib import Path 

12 

13import numpy as np 

14import pandas as pd 

15 

16# ============================================================================= 

17# Imports 

18# ============================================================================= 

19import requests 

20from mt_metadata.common.mttime import MTime 

21from mt_metadata.timeseries import Magnetic, Run, Station, Survey 

22 

23from mth5 import __version__ as mth5_version 

24from mth5.mth5 import MTH5 

25from mth5.timeseries import ChannelTS, RunTS 

26 

27 

28# ============================================================================= 

29 

30"https://geomag.usgs.gov/ws/data/?id=FRN&type=adjusted&elements=H&sampling_period=1&format=json&starttime=2020-06-02T19:00:00Z&endtime=2020-06-02T22:07:46Z" 

31 

32 

33class GeomagClient: 

34 """ 

35 Get geomagnetic data from observatories. 

36 

37 key words 

38 

39 - **observatory**: Geogmangetic observatory ID 

40 - **type**: type of data to get 'adjusted' 

41 - **start**: start date time to request UTC 

42 - **end**: end date time to request UTC 

43 - **elements**: components to get 

44 - **sampling_period**: samples between measurements in seconds 

45 - **format**: JSON or IAGA2002 

46 

47 .. seealso:: https://www.usgs.gov/tools/web-service-geomagnetism-data 

48 

49 """ 

50 

51 def __init__(self, **kwargs): 

52 self._base_url = r"https://geomag.usgs.gov/ws/data/" 

53 self._timeout = 120 

54 

55 self._valid_observatories = [ 

56 "BDT", 

57 "BOU", 

58 "TST", 

59 "BRW", 

60 "BRT", 

61 "BSL", 

62 "CMO", 

63 "CMT", 

64 "DED", 

65 "DHT", 

66 "FRD", 

67 "FRN", 

68 "GUA", 

69 "HON", 

70 "NEW", 

71 "SHU", 

72 "SIT", 

73 "SJG", 

74 "TUC", 

75 "USGS", 

76 "BLC", 

77 "BRD", 

78 "CBB", 

79 "EUA", 

80 "FCC", 

81 "IQA", 

82 "MEA", 

83 "OTT", 

84 "RES", 

85 "SNK", 

86 "STJ", 

87 "VIC", 

88 "YKC", 

89 "HAD", 

90 "HER", 

91 "KAK", 

92 ] 

93 

94 self._valid_elements = [ 

95 "D", 

96 "DIST", 

97 "DST", 

98 "E", 

99 "E-E", 

100 "E-N", 

101 "F", 

102 "G", 

103 "H", 

104 "SQ", 

105 "SV", 

106 "UK1", 

107 "UK2", 

108 "UK3", 

109 "UK4", 

110 "X", 

111 "Y", 

112 "Z", 

113 ] 

114 

115 self._ch_map = {"x": "hx", "y": "hy", "z": "hz"} 

116 

117 self._valid_sampling_periods = [1, 60, 3600] 

118 self._valid_output_formats = ["json", "iaga2002"] 

119 

120 self.type = "adjusted" 

121 self.sampling_period = 1 

122 self.elements = ["x", "y"] 

123 self.format = "json" 

124 self._timeout = 120 

125 self.observatory = "FRN" 

126 self._max_length = 144000 

127 self.start = None 

128 self.end = None 

129 

130 for key, value in kwargs.items(): 

131 setattr(self, key, value) 

132 

133 @property 

134 def user_agent(self): 

135 """ 

136 User agent for the URL request 

137 

138 :return: DESCRIPTION 

139 :rtype: TYPE 

140 

141 """ 

142 encoding = sys.getdefaultencoding() or "UTF-8" 

143 platform_ = platform.platform().encode(encoding).decode("ascii", "ignore") 

144 

145 return f"MTH5 v{mth5_version} ({platform_}, Python {platform.python_version()})" 

146 

147 @property 

148 def observatory(self): 

149 return self._id 

150 

151 @observatory.setter 

152 def observatory(self, value): 

153 """ 

154 make sure value is in accepted list of observatories 

155 

156 :param value: DESCRIPTION 

157 :type value: TYPE 

158 :return: DESCRIPTION 

159 :rtype: TYPE 

160 

161 """ 

162 if not isinstance(value, str): 

163 raise TypeError("input must be a string") 

164 

165 value = value.upper() 

166 if value not in self._valid_observatories: 

167 raise ValueError( 

168 f"{value} not in accepted observatories see " 

169 "https://www.usgs.gov/tools/web-service-geomagnetism-data " 

170 "for more information." 

171 ) 

172 

173 self._id = value 

174 

175 @property 

176 def elements(self): 

177 return self._elements 

178 

179 @elements.setter 

180 def elements(self, value): 

181 """ 

182 make sure elements are in accepted elements 

183 """ 

184 

185 if isinstance(value, str): 

186 if value.count(",") > 0: 

187 value = [item.strip() for item in value.split(",")] 

188 

189 if not isinstance(value, list): 

190 value = [value] 

191 

192 elements = [] 

193 for item in value: 

194 if not isinstance(item, str): 

195 raise TypeError(f"{item} in element list must be a string") 

196 item = item.upper() 

197 if item not in self._valid_elements: 

198 raise ValueError( 

199 f"{item} is not an accepted element see " 

200 "https://www.usgs.gov/tools/web-service-geomagnetism-data " 

201 "for more information." 

202 ) 

203 elements.append(item) 

204 

205 self._elements = elements 

206 

207 @property 

208 def sampling_period(self): 

209 return self._sampling_period 

210 

211 @sampling_period.setter 

212 def sampling_period(self, value): 

213 """ 

214 validate sample period value 

215 

216 :param value: DESCRIPTION 

217 :type value: TYPE 

218 :return: DESCRIPTION 

219 :rtype: TYPE 

220 

221 """ 

222 

223 if isinstance(value, str): 

224 try: 

225 value = int(value) 

226 except ValueError: 

227 raise ValueError(f"{value} must be able to convert to an integer.") 

228 

229 if not isinstance(value, (int, float)): 

230 raise TypeError( 

231 f"{value} must be an integer or float not type({type(value)}" 

232 ) 

233 

234 if value not in self._valid_sampling_periods: 

235 raise ValueError(f"{value} must be in [1, 60, 3600]") 

236 

237 self._sampling_period = value 

238 

239 @property 

240 def start(self): 

241 return f"{self._start.iso_no_tz}Z" 

242 

243 @start.setter 

244 def start(self, value): 

245 if value is None: 

246 self._start = None 

247 else: 

248 self._start = MTime(time_stamp=value) 

249 

250 @property 

251 def end(self): 

252 return f"{self._end.iso_no_tz}Z" 

253 

254 @end.setter 

255 def end(self, value): 

256 if value is None: 

257 self._end = None 

258 else: 

259 self._end = MTime(time_stamp=value) 

260 

261 def get_chunks(self): 

262 """ 

263 Get the number of chunks of allowable sized to request, includes the elements 

264 

265 So the max length is the maximum time period that can be requested but includes 

266 the number of elements in the request. So if the max length is 172800 seconds 

267 and the sampling period is 1 second, then the maximum number of elements that can 

268 be requested is 172800 / (1 * len(elements)). 

269 

270 :return: DESCRIPTION 

271 :rtype: TYPE 

272 

273 """ 

274 

275 if self.start is not None and self.end is not None: 

276 dt = np.arange( 

277 np.datetime64(self._start.iso_no_tz), 

278 np.datetime64(self._end.iso_no_tz), 

279 np.timedelta64( 

280 int( 

281 (round(self._max_length / len(self.elements))) 

282 * self.sampling_period 

283 ), 

284 "s", 

285 ), 

286 ) 

287 dt = np.append(dt, np.array([np.datetime64(self._end.iso_no_tz)])) 

288 

289 dt_request = [ 

290 ( 

291 f"{MTime(time_stamp=dt[ii]).iso_no_tz}Z", 

292 f"{MTime(time_stamp=dt[ii + 1]).iso_no_tz}Z", 

293 ) 

294 for ii in range(len(dt) - 1) 

295 ] 

296 

297 return dt_request 

298 

299 def _get_request_params(self, start, end): 

300 """ 

301 Get request parameters 

302 

303 :param start: DESCRIPTION 

304 :type start: TYPE 

305 :param end: DESCRIPTION 

306 :type end: TYPE 

307 :return: DESCRIPTION 

308 :rtype: TYPE 

309 

310 """ 

311 return { 

312 "id": self.observatory, 

313 "type": self.type, 

314 "elements": ",".join(self.elements), 

315 "sampling_period": self.sampling_period, 

316 "format": "json", 

317 "starttime": start, 

318 "endtime": end, 

319 } 

320 

321 def _get_request_dictionary(self, start, end): 

322 """ 

323 get the request dictionary 

324 

325 :param start: DESCRIPTION 

326 :type start: TYPE 

327 :param end: DESCRIPTION 

328 :type end: TYPE 

329 :return: DESCRIPTION 

330 :rtype: TYPE 

331 

332 """ 

333 

334 return { 

335 "url": self._base_url, 

336 "headers": {"User-Agent": self.user_agent}, 

337 "params": self._get_request_params(start, end), 

338 "timeout": self._timeout, 

339 } 

340 

341 def _request_data(self, request_dictionary): 

342 """ 

343 request data from geomag for start and end times using 

344 `request.get(**request_dictionary) 

345 

346 :param request_dictionary: DESCRIPTION 

347 :type request_dictionary: TYPE 

348 :return: DESCRIPTION 

349 :rtype: TYPE 

350 

351 """ 

352 

353 return requests.get(**request_dictionary) 

354 

355 def _to_station_metadata(self, request_metadata): 

356 """ 

357 

358 :param request_metadata: DESCRIPTION 

359 :type request_metadata: TYPE 

360 :return: DESCRIPTION 

361 :rtype: TYPE 

362 

363 """ 

364 

365 sm = Station() 

366 sm.id = request_metadata["intermagnet"]["imo"]["name"] 

367 sm.fdsn.id = request_metadata["intermagnet"]["imo"]["iaga_code"] 

368 

369 coords = request_metadata["intermagnet"]["imo"]["coordinates"] 

370 

371 if coords[0] > 180: 

372 sm.location.longitude = coords[0] - 360 

373 else: 

374 sm.location.longitude = coords[0] 

375 sm.location.latitude = coords[1] 

376 sm.location.elevation = coords[2] 

377 sm.provenance.creation_time = request_metadata["generated"] 

378 

379 return sm 

380 

381 def get_data(self, run_id="001"): 

382 """ 

383 Get data from geomag client at USGS based on the request. This might 

384 have to be done in chunks depending on the request size. The returned 

385 output is a json object, which we should turn into a ChannelTS object 

386 

387 For now read into a pandas dataframe and then into a ChannelTS 

388 

389 In the future, if the request is large, think about writing 

390 directly to an MTH5 for better efficiency. 

391 

392 :return: DESCRIPTION 

393 :rtype: TYPE 

394 

395 """ 

396 

397 ch = dict([(c.lower(), []) for c in self.elements]) 

398 

399 for interval in self.get_chunks(): 

400 request_obj = self._request_data( 

401 self._get_request_dictionary(interval[0], interval[1]) 

402 ) 

403 if request_obj.status_code == 200: 

404 request_json = json.loads(request_obj.content) 

405 for element in request_json["values"]: 

406 ch[element["metadata"]["element"].lower()].append( 

407 pd.DataFrame( 

408 { 

409 "data": element["values"], 

410 }, 

411 index=request_json["times"], 

412 ) 

413 ) 

414 else: 

415 raise IOError( 

416 "Could not connect to server. Error code: " 

417 f"{request_obj.status_code}" 

418 ) 

419 

420 survey_metadata = Survey(id="USGS-GEOMAG") 

421 station_metadata = self._to_station_metadata(request_json["metadata"]) 

422 run_metadata = Run(id=run_id) 

423 

424 ch_list = [] 

425 for key, df_list in ch.items(): 

426 df = pd.concat(df_list).astype(float) 

427 ch_metadata = Magnetic() 

428 ch_metadata.component = self._ch_map[key] 

429 ch_metadata.sample_rate = 1.0 / self.sampling_period 

430 ch_metadata.units = "nanotesla" 

431 if "y" in ch_metadata.component: 

432 ch_metadata.measurement_azimuth = 90 

433 ch_metadata.location.latitude = station_metadata.location.latitude 

434 ch_metadata.location.longitude = station_metadata.location.longitude 

435 ch_metadata.location.elevation = station_metadata.location.elevation 

436 ch_metadata.time_period.start = df.index[0] 

437 ch_metadata.time_period.end = df.index[-1] 

438 run_metadata.time_period.start = df.index[0] 

439 run_metadata.time_period.end = df.index[-1] 

440 station_metadata.time_period.start = df.index[0] 

441 station_metadata.time_period.end = df.index[-1] 

442 survey_metadata.time_period.start = df.index[0] 

443 survey_metadata.time_period.end = df.index[-1] 

444 ch_list.append( 

445 ChannelTS( 

446 channel_type="magnetic", 

447 data=df, 

448 channel_metadata=ch_metadata, 

449 run_metadata=run_metadata, 

450 station_metadata=station_metadata, 

451 survey_metadata=survey_metadata, 

452 ) 

453 ) 

454 

455 return RunTS( 

456 ch_list, 

457 run_metadata=run_metadata, 

458 station_metadata=station_metadata, 

459 survey_metadata=survey_metadata, 

460 ) 

461 

462 

463class USGSGeomag: 

464 def __init__(self, **kwargs): 

465 self.save_path = Path().cwd() 

466 self.mth5_filename = None 

467 self.interact = False 

468 self.request_columns = [ 

469 "observatory", 

470 "type", 

471 "elements", 

472 "sampling_period", 

473 "start", 

474 "end", 

475 ] 

476 

477 # parameters of hdf5 file 

478 self.h5_compression = "gzip" 

479 self.h5_compression_opts = 4 

480 self.h5_shuffle = True 

481 self.h5_fletcher32 = True 

482 self.h5_data_level = 1 

483 self.mth5_file_mode = "w" 

484 self.mth5_version = "0.2.0" 

485 self._ch_map = {"x": "hx", "y": "hy", "z": "hz"} 

486 

487 for key, value in kwargs.items(): 

488 setattr(self, key, value) 

489 

490 @property 

491 def h5_kwargs(self): 

492 h5_params = dict( 

493 file_version=self.mth5_version, 

494 compression=self.h5_compression, 

495 compression_opts=self.h5_compression_opts, 

496 shuffle=self.h5_shuffle, 

497 fletcher32=self.h5_fletcher32, 

498 data_level=self.h5_data_level, 

499 ) 

500 

501 for key, value in self.__dict__.items(): 

502 if key.startswith("h5"): 

503 h5_params[key[3:]] = value 

504 

505 return h5_params 

506 

507 def validate_request_df(self, request_df): 

508 """ 

509 Make sure the input request dataframe has the appropriate columns 

510 

511 :param request_df: request dataframe 

512 :type request_df: :class:`pandas.DataFrame` 

513 :return: valid request dataframe 

514 :rtype: :class:`pandas.DataFrame` 

515 

516 """ 

517 

518 if not isinstance(request_df, pd.DataFrame): 

519 if isinstance(request_df, (str, Path)): 

520 fn = Path(request_df) 

521 if not fn.exists(): 

522 raise IOError(f"File {fn} does not exist. Check path") 

523 request_df = pd.read_csv(fn, infer_datetime_format=True) 

524 else: 

525 raise TypeError( 

526 f"Request input must be a pandas.DataFrame, not {type(request_df)}." 

527 ) 

528 

529 if "run" in request_df.columns: 

530 if sorted(request_df.columns.tolist()) != sorted( 

531 self.request_columns + ["run"] 

532 ): 

533 raise ValueError( 

534 f"Request must have columns {', '.join(self.request_columns)}" 

535 ) 

536 else: 

537 if sorted(request_df.columns.tolist()) != sorted(self.request_columns): 

538 raise ValueError( 

539 f"Request must have columns {', '.join(self.request_columns)}" 

540 ) 

541 

542 request_df = self.add_run_id(request_df) 

543 

544 return request_df 

545 

546 def add_run_id(self, request_df): 

547 """ 

548 Add run id to request df 

549 

550 :param request_df: request dataframe 

551 :type request_df: :class:`pandas.DataFrame` 

552 :return: add a run number to unique time windows for each observatory 

553 at each unique sampling period. 

554 :rtype: :class:`pandas.DataFrame` 

555 

556 """ 

557 

558 request_df.start = pd.to_datetime(request_df.start) 

559 request_df.end = pd.to_datetime(request_df.end) 

560 request_df["run"] = "" 

561 

562 for obs in request_df.observatory.unique(): 

563 for sr in request_df.loc[ 

564 request_df.observatory == obs, "sampling_period" 

565 ].unique(): 

566 sr_df = request_df.loc[ 

567 (request_df.observatory == obs) & (request_df.sampling_period == sr) 

568 ].sort_values("start") 

569 request_df.loc[ 

570 (request_df.observatory == obs) 

571 & (request_df.sampling_period == sr), 

572 "run", 

573 ] = [f"sp{sr}_{ii+1:03}" for ii in range(len(sr_df))] 

574 

575 return request_df 

576 

577 def _make_filename(self, save_path, request_df): 

578 """ 

579 

580 Create filename from the information in the dataframe 

581 

582 The filename will look like f"usgs_geomag_{obs}_{elements}.h5" 

583 

584 :param request_df: request dataframe 

585 :type request_df: :class:`pandas.DataFrame` 

586 :return: file name derived from dataframe 

587 :rtype: :class:`pathlib.Path` 

588 

589 """ 

590 

591 elements = "".join(request_df.elements.explode().unique().tolist()) 

592 obs = "_".join(sorted(request_df.observatory.unique().tolist())) 

593 

594 save_path = Path(save_path) 

595 if save_path.is_dir(): 

596 fn = f"usgs_geomag_{obs}_{elements}.h5" 

597 save_path = save_path.joinpath(fn) 

598 

599 return save_path 

600 

601 def make_mth5_from_geomag(self, request_df): 

602 """ 

603 Download geomagnetic observatory data from USGS webservices into an 

604 MTH5 using a request dataframe or csv file. 

605 

606 :param request_df: DataFrame with columns 

607 

608 - 'observatory' --> Observatory code 

609 - 'type' --> data type [ 'variation' | 'adjusted' | 'quasi-definitive' | 'definitive' ] 

610 - 'elements' --> Elements to get [D, DIST, DST, E, E-E, E-N, F, G, H, SQ, SV, UK1, UK2, UK3, UK4, X, Y, Z] 

611 - 'sampling_period' --> sample period [ 1 | 60 | 3600 ] 

612 - 'start' --> Start time YYYY-MM-DDThh:mm:ss 

613 - 'end' --> End time YYYY-MM-DDThh:mm:ss 

614 

615 :type request_df: :class:`pandas.DataFrame`, str or Path if csv file 

616 

617 

618 :return: if interact is True an MTH5 object is returned otherwise the 

619 path to the file is returned 

620 :rtype: Path or :class:`mth5.mth5.MTH5` 

621 

622 .. seealso:: https://www.usgs.gov/tools/web-service-geomagnetism-data 

623 

624 """ 

625 

626 request_df = self.validate_request_df(request_df) 

627 

628 fn = self._make_filename(self.save_path, request_df) 

629 

630 with MTH5(**self.h5_kwargs) as m: 

631 m.open_mth5(fn, self.mth5_file_mode) 

632 

633 if self.mth5_version in ["0.1.0"]: 

634 survey_group = m.survey_group 

635 survey_group.metadata.id = "USGS-GEOMAG" 

636 elif self.mth5_version in ["0.2.0"]: 

637 survey_group = m.add_survey("USGS-GEOMAG") 

638 else: 

639 raise ValueError( 

640 f"MTH5 version must be [ '0.1.0' | '0.2.0' ] not {self.mth5_version}" 

641 ) 

642 

643 for row in request_df.itertuples(): 

644 geomag_client = GeomagClient( 

645 observatory=row.observatory, 

646 type=row.type, 

647 elements=row.elements, 

648 start=row.start, 

649 end=row.end, 

650 sampling_period=row.sampling_period, 

651 **{"_ch_map": {"x": "hx", "y": "hy", "z": "hz"}}, 

652 ) 

653 

654 run = geomag_client.get_data(run_id=row.run) 

655 station_group = survey_group.stations_group.add_station( 

656 run.station_metadata.id, 

657 station_metadata=run.station_metadata, 

658 ) 

659 run_group = station_group.add_run( 

660 run.run_metadata.id, run_metadata=run.run_metadata 

661 ) 

662 run_group.from_runts(run) 

663 station_group.update_metadata() 

664 survey_group.update_metadata() 

665 

666 if self.interact: 

667 m.open_mth5(m.filename, self.mth5_file_mode) 

668 return m 

669 else: 

670 return m.filename