Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mth5 \ mth5 \ clients \ geomag.py: 98%
235 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-27 20:09 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-27 20:09 -0800
1# -*- coding: utf-8 -*-
2"""
3Created on Mon Nov 14 13:58:44 2022
5@author: jpeacock
6"""
8import json
9import platform
10import sys
11from pathlib import Path
13import numpy as np
14import pandas as pd
16# =============================================================================
17# Imports
18# =============================================================================
19import requests
20from mt_metadata.common.mttime import MTime
21from mt_metadata.timeseries import Magnetic, Run, Station, Survey
23from mth5 import __version__ as mth5_version
24from mth5.mth5 import MTH5
25from mth5.timeseries import ChannelTS, RunTS
28# =============================================================================
30"https://geomag.usgs.gov/ws/data/?id=FRN&type=adjusted&elements=H&sampling_period=1&format=json&starttime=2020-06-02T19:00:00Z&endtime=2020-06-02T22:07:46Z"
33class GeomagClient:
34 """
35 Get geomagnetic data from observatories.
37 key words
39 - **observatory**: Geogmangetic observatory ID
40 - **type**: type of data to get 'adjusted'
41 - **start**: start date time to request UTC
42 - **end**: end date time to request UTC
43 - **elements**: components to get
44 - **sampling_period**: samples between measurements in seconds
45 - **format**: JSON or IAGA2002
47 .. seealso:: https://www.usgs.gov/tools/web-service-geomagnetism-data
49 """
51 def __init__(self, **kwargs):
52 self._base_url = r"https://geomag.usgs.gov/ws/data/"
53 self._timeout = 120
55 self._valid_observatories = [
56 "BDT",
57 "BOU",
58 "TST",
59 "BRW",
60 "BRT",
61 "BSL",
62 "CMO",
63 "CMT",
64 "DED",
65 "DHT",
66 "FRD",
67 "FRN",
68 "GUA",
69 "HON",
70 "NEW",
71 "SHU",
72 "SIT",
73 "SJG",
74 "TUC",
75 "USGS",
76 "BLC",
77 "BRD",
78 "CBB",
79 "EUA",
80 "FCC",
81 "IQA",
82 "MEA",
83 "OTT",
84 "RES",
85 "SNK",
86 "STJ",
87 "VIC",
88 "YKC",
89 "HAD",
90 "HER",
91 "KAK",
92 ]
94 self._valid_elements = [
95 "D",
96 "DIST",
97 "DST",
98 "E",
99 "E-E",
100 "E-N",
101 "F",
102 "G",
103 "H",
104 "SQ",
105 "SV",
106 "UK1",
107 "UK2",
108 "UK3",
109 "UK4",
110 "X",
111 "Y",
112 "Z",
113 ]
115 self._ch_map = {"x": "hx", "y": "hy", "z": "hz"}
117 self._valid_sampling_periods = [1, 60, 3600]
118 self._valid_output_formats = ["json", "iaga2002"]
120 self.type = "adjusted"
121 self.sampling_period = 1
122 self.elements = ["x", "y"]
123 self.format = "json"
124 self._timeout = 120
125 self.observatory = "FRN"
126 self._max_length = 144000
127 self.start = None
128 self.end = None
130 for key, value in kwargs.items():
131 setattr(self, key, value)
133 @property
134 def user_agent(self):
135 """
136 User agent for the URL request
138 :return: DESCRIPTION
139 :rtype: TYPE
141 """
142 encoding = sys.getdefaultencoding() or "UTF-8"
143 platform_ = platform.platform().encode(encoding).decode("ascii", "ignore")
145 return f"MTH5 v{mth5_version} ({platform_}, Python {platform.python_version()})"
147 @property
148 def observatory(self):
149 return self._id
151 @observatory.setter
152 def observatory(self, value):
153 """
154 make sure value is in accepted list of observatories
156 :param value: DESCRIPTION
157 :type value: TYPE
158 :return: DESCRIPTION
159 :rtype: TYPE
161 """
162 if not isinstance(value, str):
163 raise TypeError("input must be a string")
165 value = value.upper()
166 if value not in self._valid_observatories:
167 raise ValueError(
168 f"{value} not in accepted observatories see "
169 "https://www.usgs.gov/tools/web-service-geomagnetism-data "
170 "for more information."
171 )
173 self._id = value
175 @property
176 def elements(self):
177 return self._elements
179 @elements.setter
180 def elements(self, value):
181 """
182 make sure elements are in accepted elements
183 """
185 if isinstance(value, str):
186 if value.count(",") > 0:
187 value = [item.strip() for item in value.split(",")]
189 if not isinstance(value, list):
190 value = [value]
192 elements = []
193 for item in value:
194 if not isinstance(item, str):
195 raise TypeError(f"{item} in element list must be a string")
196 item = item.upper()
197 if item not in self._valid_elements:
198 raise ValueError(
199 f"{item} is not an accepted element see "
200 "https://www.usgs.gov/tools/web-service-geomagnetism-data "
201 "for more information."
202 )
203 elements.append(item)
205 self._elements = elements
207 @property
208 def sampling_period(self):
209 return self._sampling_period
211 @sampling_period.setter
212 def sampling_period(self, value):
213 """
214 validate sample period value
216 :param value: DESCRIPTION
217 :type value: TYPE
218 :return: DESCRIPTION
219 :rtype: TYPE
221 """
223 if isinstance(value, str):
224 try:
225 value = int(value)
226 except ValueError:
227 raise ValueError(f"{value} must be able to convert to an integer.")
229 if not isinstance(value, (int, float)):
230 raise TypeError(
231 f"{value} must be an integer or float not type({type(value)}"
232 )
234 if value not in self._valid_sampling_periods:
235 raise ValueError(f"{value} must be in [1, 60, 3600]")
237 self._sampling_period = value
239 @property
240 def start(self):
241 return f"{self._start.iso_no_tz}Z"
243 @start.setter
244 def start(self, value):
245 if value is None:
246 self._start = None
247 else:
248 self._start = MTime(time_stamp=value)
250 @property
251 def end(self):
252 return f"{self._end.iso_no_tz}Z"
254 @end.setter
255 def end(self, value):
256 if value is None:
257 self._end = None
258 else:
259 self._end = MTime(time_stamp=value)
261 def get_chunks(self):
262 """
263 Get the number of chunks of allowable sized to request, includes the elements
265 So the max length is the maximum time period that can be requested but includes
266 the number of elements in the request. So if the max length is 172800 seconds
267 and the sampling period is 1 second, then the maximum number of elements that can
268 be requested is 172800 / (1 * len(elements)).
270 :return: DESCRIPTION
271 :rtype: TYPE
273 """
275 if self.start is not None and self.end is not None:
276 dt = np.arange(
277 np.datetime64(self._start.iso_no_tz),
278 np.datetime64(self._end.iso_no_tz),
279 np.timedelta64(
280 int(
281 (round(self._max_length / len(self.elements)))
282 * self.sampling_period
283 ),
284 "s",
285 ),
286 )
287 dt = np.append(dt, np.array([np.datetime64(self._end.iso_no_tz)]))
289 dt_request = [
290 (
291 f"{MTime(time_stamp=dt[ii]).iso_no_tz}Z",
292 f"{MTime(time_stamp=dt[ii + 1]).iso_no_tz}Z",
293 )
294 for ii in range(len(dt) - 1)
295 ]
297 return dt_request
299 def _get_request_params(self, start, end):
300 """
301 Get request parameters
303 :param start: DESCRIPTION
304 :type start: TYPE
305 :param end: DESCRIPTION
306 :type end: TYPE
307 :return: DESCRIPTION
308 :rtype: TYPE
310 """
311 return {
312 "id": self.observatory,
313 "type": self.type,
314 "elements": ",".join(self.elements),
315 "sampling_period": self.sampling_period,
316 "format": "json",
317 "starttime": start,
318 "endtime": end,
319 }
321 def _get_request_dictionary(self, start, end):
322 """
323 get the request dictionary
325 :param start: DESCRIPTION
326 :type start: TYPE
327 :param end: DESCRIPTION
328 :type end: TYPE
329 :return: DESCRIPTION
330 :rtype: TYPE
332 """
334 return {
335 "url": self._base_url,
336 "headers": {"User-Agent": self.user_agent},
337 "params": self._get_request_params(start, end),
338 "timeout": self._timeout,
339 }
341 def _request_data(self, request_dictionary):
342 """
343 request data from geomag for start and end times using
344 `request.get(**request_dictionary)
346 :param request_dictionary: DESCRIPTION
347 :type request_dictionary: TYPE
348 :return: DESCRIPTION
349 :rtype: TYPE
351 """
353 return requests.get(**request_dictionary)
355 def _to_station_metadata(self, request_metadata):
356 """
358 :param request_metadata: DESCRIPTION
359 :type request_metadata: TYPE
360 :return: DESCRIPTION
361 :rtype: TYPE
363 """
365 sm = Station()
366 sm.id = request_metadata["intermagnet"]["imo"]["name"]
367 sm.fdsn.id = request_metadata["intermagnet"]["imo"]["iaga_code"]
369 coords = request_metadata["intermagnet"]["imo"]["coordinates"]
371 if coords[0] > 180:
372 sm.location.longitude = coords[0] - 360
373 else:
374 sm.location.longitude = coords[0]
375 sm.location.latitude = coords[1]
376 sm.location.elevation = coords[2]
377 sm.provenance.creation_time = request_metadata["generated"]
379 return sm
381 def get_data(self, run_id="001"):
382 """
383 Get data from geomag client at USGS based on the request. This might
384 have to be done in chunks depending on the request size. The returned
385 output is a json object, which we should turn into a ChannelTS object
387 For now read into a pandas dataframe and then into a ChannelTS
389 In the future, if the request is large, think about writing
390 directly to an MTH5 for better efficiency.
392 :return: DESCRIPTION
393 :rtype: TYPE
395 """
397 ch = dict([(c.lower(), []) for c in self.elements])
399 for interval in self.get_chunks():
400 request_obj = self._request_data(
401 self._get_request_dictionary(interval[0], interval[1])
402 )
403 if request_obj.status_code == 200:
404 request_json = json.loads(request_obj.content)
405 for element in request_json["values"]:
406 ch[element["metadata"]["element"].lower()].append(
407 pd.DataFrame(
408 {
409 "data": element["values"],
410 },
411 index=request_json["times"],
412 )
413 )
414 else:
415 raise IOError(
416 "Could not connect to server. Error code: "
417 f"{request_obj.status_code}"
418 )
420 survey_metadata = Survey(id="USGS-GEOMAG")
421 station_metadata = self._to_station_metadata(request_json["metadata"])
422 run_metadata = Run(id=run_id)
424 ch_list = []
425 for key, df_list in ch.items():
426 df = pd.concat(df_list).astype(float)
427 ch_metadata = Magnetic()
428 ch_metadata.component = self._ch_map[key]
429 ch_metadata.sample_rate = 1.0 / self.sampling_period
430 ch_metadata.units = "nanotesla"
431 if "y" in ch_metadata.component:
432 ch_metadata.measurement_azimuth = 90
433 ch_metadata.location.latitude = station_metadata.location.latitude
434 ch_metadata.location.longitude = station_metadata.location.longitude
435 ch_metadata.location.elevation = station_metadata.location.elevation
436 ch_metadata.time_period.start = df.index[0]
437 ch_metadata.time_period.end = df.index[-1]
438 run_metadata.time_period.start = df.index[0]
439 run_metadata.time_period.end = df.index[-1]
440 station_metadata.time_period.start = df.index[0]
441 station_metadata.time_period.end = df.index[-1]
442 survey_metadata.time_period.start = df.index[0]
443 survey_metadata.time_period.end = df.index[-1]
444 ch_list.append(
445 ChannelTS(
446 channel_type="magnetic",
447 data=df,
448 channel_metadata=ch_metadata,
449 run_metadata=run_metadata,
450 station_metadata=station_metadata,
451 survey_metadata=survey_metadata,
452 )
453 )
455 return RunTS(
456 ch_list,
457 run_metadata=run_metadata,
458 station_metadata=station_metadata,
459 survey_metadata=survey_metadata,
460 )
463class USGSGeomag:
464 def __init__(self, **kwargs):
465 self.save_path = Path().cwd()
466 self.mth5_filename = None
467 self.interact = False
468 self.request_columns = [
469 "observatory",
470 "type",
471 "elements",
472 "sampling_period",
473 "start",
474 "end",
475 ]
477 # parameters of hdf5 file
478 self.h5_compression = "gzip"
479 self.h5_compression_opts = 4
480 self.h5_shuffle = True
481 self.h5_fletcher32 = True
482 self.h5_data_level = 1
483 self.mth5_file_mode = "w"
484 self.mth5_version = "0.2.0"
485 self._ch_map = {"x": "hx", "y": "hy", "z": "hz"}
487 for key, value in kwargs.items():
488 setattr(self, key, value)
490 @property
491 def h5_kwargs(self):
492 h5_params = dict(
493 file_version=self.mth5_version,
494 compression=self.h5_compression,
495 compression_opts=self.h5_compression_opts,
496 shuffle=self.h5_shuffle,
497 fletcher32=self.h5_fletcher32,
498 data_level=self.h5_data_level,
499 )
501 for key, value in self.__dict__.items():
502 if key.startswith("h5"):
503 h5_params[key[3:]] = value
505 return h5_params
507 def validate_request_df(self, request_df):
508 """
509 Make sure the input request dataframe has the appropriate columns
511 :param request_df: request dataframe
512 :type request_df: :class:`pandas.DataFrame`
513 :return: valid request dataframe
514 :rtype: :class:`pandas.DataFrame`
516 """
518 if not isinstance(request_df, pd.DataFrame):
519 if isinstance(request_df, (str, Path)):
520 fn = Path(request_df)
521 if not fn.exists():
522 raise IOError(f"File {fn} does not exist. Check path")
523 request_df = pd.read_csv(fn, infer_datetime_format=True)
524 else:
525 raise TypeError(
526 f"Request input must be a pandas.DataFrame, not {type(request_df)}."
527 )
529 if "run" in request_df.columns:
530 if sorted(request_df.columns.tolist()) != sorted(
531 self.request_columns + ["run"]
532 ):
533 raise ValueError(
534 f"Request must have columns {', '.join(self.request_columns)}"
535 )
536 else:
537 if sorted(request_df.columns.tolist()) != sorted(self.request_columns):
538 raise ValueError(
539 f"Request must have columns {', '.join(self.request_columns)}"
540 )
542 request_df = self.add_run_id(request_df)
544 return request_df
546 def add_run_id(self, request_df):
547 """
548 Add run id to request df
550 :param request_df: request dataframe
551 :type request_df: :class:`pandas.DataFrame`
552 :return: add a run number to unique time windows for each observatory
553 at each unique sampling period.
554 :rtype: :class:`pandas.DataFrame`
556 """
558 request_df.start = pd.to_datetime(request_df.start)
559 request_df.end = pd.to_datetime(request_df.end)
560 request_df["run"] = ""
562 for obs in request_df.observatory.unique():
563 for sr in request_df.loc[
564 request_df.observatory == obs, "sampling_period"
565 ].unique():
566 sr_df = request_df.loc[
567 (request_df.observatory == obs) & (request_df.sampling_period == sr)
568 ].sort_values("start")
569 request_df.loc[
570 (request_df.observatory == obs)
571 & (request_df.sampling_period == sr),
572 "run",
573 ] = [f"sp{sr}_{ii+1:03}" for ii in range(len(sr_df))]
575 return request_df
577 def _make_filename(self, save_path, request_df):
578 """
580 Create filename from the information in the dataframe
582 The filename will look like f"usgs_geomag_{obs}_{elements}.h5"
584 :param request_df: request dataframe
585 :type request_df: :class:`pandas.DataFrame`
586 :return: file name derived from dataframe
587 :rtype: :class:`pathlib.Path`
589 """
591 elements = "".join(request_df.elements.explode().unique().tolist())
592 obs = "_".join(sorted(request_df.observatory.unique().tolist()))
594 save_path = Path(save_path)
595 if save_path.is_dir():
596 fn = f"usgs_geomag_{obs}_{elements}.h5"
597 save_path = save_path.joinpath(fn)
599 return save_path
601 def make_mth5_from_geomag(self, request_df):
602 """
603 Download geomagnetic observatory data from USGS webservices into an
604 MTH5 using a request dataframe or csv file.
606 :param request_df: DataFrame with columns
608 - 'observatory' --> Observatory code
609 - 'type' --> data type [ 'variation' | 'adjusted' | 'quasi-definitive' | 'definitive' ]
610 - 'elements' --> Elements to get [D, DIST, DST, E, E-E, E-N, F, G, H, SQ, SV, UK1, UK2, UK3, UK4, X, Y, Z]
611 - 'sampling_period' --> sample period [ 1 | 60 | 3600 ]
612 - 'start' --> Start time YYYY-MM-DDThh:mm:ss
613 - 'end' --> End time YYYY-MM-DDThh:mm:ss
615 :type request_df: :class:`pandas.DataFrame`, str or Path if csv file
618 :return: if interact is True an MTH5 object is returned otherwise the
619 path to the file is returned
620 :rtype: Path or :class:`mth5.mth5.MTH5`
622 .. seealso:: https://www.usgs.gov/tools/web-service-geomagnetism-data
624 """
626 request_df = self.validate_request_df(request_df)
628 fn = self._make_filename(self.save_path, request_df)
630 with MTH5(**self.h5_kwargs) as m:
631 m.open_mth5(fn, self.mth5_file_mode)
633 if self.mth5_version in ["0.1.0"]:
634 survey_group = m.survey_group
635 survey_group.metadata.id = "USGS-GEOMAG"
636 elif self.mth5_version in ["0.2.0"]:
637 survey_group = m.add_survey("USGS-GEOMAG")
638 else:
639 raise ValueError(
640 f"MTH5 version must be [ '0.1.0' | '0.2.0' ] not {self.mth5_version}"
641 )
643 for row in request_df.itertuples():
644 geomag_client = GeomagClient(
645 observatory=row.observatory,
646 type=row.type,
647 elements=row.elements,
648 start=row.start,
649 end=row.end,
650 sampling_period=row.sampling_period,
651 **{"_ch_map": {"x": "hx", "y": "hy", "z": "hz"}},
652 )
654 run = geomag_client.get_data(run_id=row.run)
655 station_group = survey_group.stations_group.add_station(
656 run.station_metadata.id,
657 station_metadata=run.station_metadata,
658 )
659 run_group = station_group.add_run(
660 run.run_metadata.id, run_metadata=run.run_metadata
661 )
662 run_group.from_runts(run)
663 station_group.update_metadata()
664 survey_group.update_metadata()
666 if self.interact:
667 m.open_mth5(m.filename, self.mth5_file_mode)
668 return m
669 else:
670 return m.filename