# Source code for mth5.clients.geomag

# -*- coding: utf-8 -*-
"""
Created on Mon Nov 14 13:58:44 2022

@author: jpeacock
"""

# =============================================================================
# Imports
# =============================================================================
import requests
import json
import sys
import platform
from pathlib import Path

import numpy as np
import pandas as pd

from mth5 import __version__ as mth5_version
from mth5.timeseries import ChannelTS, RunTS
from mth5.mth5 import MTH5

from mt_metadata.utils.mttime import MTime
from mt_metadata.timeseries import Survey, Station, Run, Magnetic

# =============================================================================

"https://geomag.usgs.gov/ws/data/?id=FRN&type=adjusted&elements=H&sampling_period=1&format=json&starttime=2020-06-02T19:00:00Z&endtime=2020-06-02T22:07:46Z"


class GeomagClient:
    """
    Get geomagnetic data from USGS observatories.

    key words

    - **observatory**: Geomagnetic observatory ID
    - **type**: type of data to get 'adjusted'
    - **start**: start date time to request UTC
    - **end**: end date time to request UTC
    - **elements**: components to get
    - **sampling_period**: samples between measurements in seconds
    - **format**: JSON or IAGA2002

    .. seealso:: https://www.usgs.gov/tools/web-service-geomagnetism-data

    """

    def __init__(self, **kwargs):

        self._base_url = r"https://geomag.usgs.gov/ws/data/"
        # request timeout in seconds
        self._timeout = 120
        # maximum number of samples a single request may return
        self._max_length = 172800

        self._valid_observatories = [
            "BDT",
            "BOU",
            "TST",
            "BRW",
            "BRT",
            "BSL",
            "CMO",
            "CMT",
            "DED",
            "DHT",
            "FRD",
            "FRN",
            "GUA",
            "HON",
            "NEW",
            "SHU",
            "SIT",
            "SJG",
            "TUC",
            "USGS",
            "BLC",
            "BRD",
            "CBB",
            "EUA",
            "FCC",
            "IQA",
            "MEA",
            "OTT",
            "RES",
            "SNK",
            "STJ",
            "VIC",
            "YKC",
            "HAD",
            "HER",
            "KAK",
        ]

        self._valid_elements = [
            "D",
            "DIST",
            "DST",
            "E",
            "E-E",
            "E-N",
            "F",
            "G",
            "H",
            "SQ",
            "SV",
            "UK1",
            "UK2",
            "UK3",
            "UK4",
            "X",
            "Y",
            "Z",
        ]

        # map geomag element names to MTH5 channel component names
        self._ch_map = {"x": "hx", "y": "hy", "z": "hz"}

        self._valid_sampling_periods = [1, 60, 3600]
        self._valid_output_formats = ["json", "iaga2002"]

        self.type = "adjusted"
        self.sampling_period = 1
        self.elements = ["x", "y"]
        self.format = "json"
        self.observatory = "FRN"
        self.start = None
        self.end = None

        for key, value in kwargs.items():
            setattr(self, key, value)

    @property
    def user_agent(self):
        """
        User agent string sent with the URL request, identifying MTH5,
        the platform and the Python version.

        :return: user agent string
        :rtype: str

        """
        encoding = sys.getdefaultencoding() or "UTF-8"
        platform_ = (
            platform.platform().encode(encoding).decode("ascii", "ignore")
        )

        return f"MTH5 v{mth5_version} ({platform_}, Python {platform.python_version()})"

    @property
    def observatory(self):
        """Observatory IAGA code (uppercase), validated on set."""
        return self._id

    @observatory.setter
    def observatory(self, value):
        """
        Make sure value is in the accepted list of observatories.

        :param value: observatory code, case insensitive
        :type value: str
        :raises TypeError: if value is not a string
        :raises ValueError: if value is not a known observatory

        """
        if not isinstance(value, str):
            raise TypeError("input must be a string")

        value = value.upper()
        if value not in self._valid_observatories:
            raise ValueError(
                f"{value} not in accepted observatories see "
                "https://www.usgs.gov/tools/web-service-geomagnetism-data "
                "for more information."
            )

        self._id = value

    @property
    def elements(self):
        """List of requested elements (uppercase), validated on set."""
        return self._elements

    @elements.setter
    def elements(self, value):
        """
        Make sure elements are in the accepted element list.  Accepts a
        single string, a comma-separated string, or a list of strings.

        :raises TypeError: if an item is not a string
        :raises ValueError: if an item is not a known element

        """

        if isinstance(value, str):
            if value.count(",") > 0:
                value = [item.strip() for item in value.split(",")]

        if not isinstance(value, list):
            value = [value]

        elements = []
        for item in value:
            if not isinstance(item, str):
                raise TypeError(f"{item} in element list must be a string")
            item = item.upper()
            if item not in self._valid_elements:
                raise ValueError(
                    f"{item} is not an accepted element see "
                    "https://www.usgs.gov/tools/web-service-geomagnetism-data "
                    "for more information."
                )
            elements.append(item)

        self._elements = elements

    @property
    def sampling_period(self):
        """Sampling period in seconds, one of [1, 60, 3600]."""
        return self._sampling_period

    @sampling_period.setter
    def sampling_period(self, value):
        """
        Validate the sampling period value.

        :param value: sampling period in seconds; strings are converted
        :type value: int, float or str
        :raises TypeError: if value cannot be interpreted as a number
        :raises ValueError: if value is not an accepted period

        """

        if isinstance(value, str):
            try:
                value = int(value)
            except ValueError:
                raise ValueError(
                    f"{value} must be able to convert to an integer."
                )

        if not isinstance(value, (int, float)):
            raise TypeError(
                f"{value} must be an integer or float, not {type(value)}"
            )

        if value not in self._valid_sampling_periods:
            raise ValueError(f"{value} must be in [1, 60, 3600]")

        self._sampling_period = value

    @property
    def start(self):
        """Start time as 'YYYY-MM-DDThh:mm:ssZ', or None if not set."""
        # return None when unset so `if self.start is not None` checks work
        # instead of raising AttributeError on None.iso_no_tz
        if self._start is None:
            return None
        return f"{self._start.iso_no_tz}Z"

    @start.setter
    def start(self, value):
        if value is None:
            self._start = None
        else:
            self._start = MTime(value)

    @property
    def end(self):
        """End time as 'YYYY-MM-DDThh:mm:ssZ', or None if not set."""
        if self._end is None:
            return None
        return f"{self._end.iso_no_tz}Z"

    @end.setter
    def end(self, value):
        if value is None:
            self._end = None
        else:
            self._end = MTime(value)

    def get_chunks(self):
        """
        Split the requested time range into chunks of allowable size.

        :return: list of (start, end) UTC string pairs, or None if
         start or end has not been set
        :rtype: list of tuple of str, or None

        """

        if self.start is not None and self.end is not None:
            # chunk boundaries spaced by the maximum request length in time
            dt = np.arange(
                np.datetime64(self._start.iso_no_tz),
                np.datetime64(self._end.iso_no_tz),
                np.timedelta64(self._max_length * self.sampling_period, "s"),
            )
            dt = np.append(dt, np.array([np.datetime64(self._end.iso_no_tz)]))

            dt_request = [
                (
                    f"{MTime(dt[ii]).iso_no_tz}Z",
                    f"{MTime(dt[ii + 1]).iso_no_tz}Z",
                )
                for ii in range(len(dt) - 1)
            ]

            return dt_request

    def _get_request_params(self, start, end):
        """
        Get request query parameters.

        :param start: request start time 'YYYY-MM-DDThh:mm:ssZ'
        :type start: str
        :param end: request end time 'YYYY-MM-DDThh:mm:ssZ'
        :type end: str
        :return: query parameters for the web service
        :rtype: dict

        """
        return {
            "id": self.observatory,
            "type": self.type,
            "elements": ",".join(self.elements),
            "sampling_period": self.sampling_period,
            "format": "json",
            "starttime": start,
            "endtime": end,
        }

    def _get_request_dictionary(self, start, end):
        """
        Get the full keyword dictionary for :func:`requests.get`.

        :param start: request start time 'YYYY-MM-DDThh:mm:ssZ'
        :type start: str
        :param end: request end time 'YYYY-MM-DDThh:mm:ssZ'
        :type end: str
        :return: keyword arguments for requests.get
        :rtype: dict

        """

        return {
            "url": self._base_url,
            "headers": {"User-Agent": self.user_agent},
            "params": self._get_request_params(start, end),
            "timeout": self._timeout,
        }

    def _request_data(self, request_dictionary):
        """
        Request data from geomag for start and end times using
        `requests.get(**request_dictionary)`.

        :param request_dictionary: keyword arguments for requests.get
        :type request_dictionary: dict
        :return: server response
        :rtype: :class:`requests.Response`

        """

        return requests.get(**request_dictionary)

    def _to_station_metadata(self, request_metadata):
        """
        Translate the 'metadata' section of the JSON response into a
        :class:`mt_metadata.timeseries.Station` object.

        :param request_metadata: 'metadata' dictionary from the response
        :type request_metadata: dict
        :return: station metadata
        :rtype: :class:`mt_metadata.timeseries.Station`

        """

        sm = Station()
        sm.id = request_metadata["intermagnet"]["imo"]["name"]
        sm.fdsn.id = request_metadata["intermagnet"]["imo"]["iaga_code"]

        coords = request_metadata["intermagnet"]["imo"]["coordinates"]

        # longitudes are given 0-360; convert to -180..180
        if coords[0] > 180:
            sm.location.longitude = coords[0] - 360
        else:
            sm.location.longitude = coords[0]
        sm.location.latitude = coords[1]
        sm.location.elevation = coords[2]
        sm.provenance.creation_time = request_metadata["generated"]

        return sm

    def get_data(self, run_id="001"):
        """
        Get data from geomag client at USGS based on the request.  This might
        have to be done in chunks depending on the request size.  The returned
        output is a json object, which we should turn into a ChannelTS object

        For now read into a pandas dataframe and then into a ChannelTS

        In the future, if the request is large, think about writing
        directly to an MTH5 for better efficiency.

        :param run_id: run id for the metadata, defaults to "001"
        :type run_id: str, optional
        :return: requested data as a run
        :rtype: :class:`mth5.timeseries.RunTS`
        :raises ValueError: if start or end has not been set
        :raises IOError: if the server returns a non-200 status

        """

        chunks = self.get_chunks()
        if chunks is None:
            raise ValueError(
                "start and end must be set before requesting data."
            )

        ch = dict([(c.lower(), []) for c in self.elements])

        for interval in chunks:
            request_obj = self._request_data(
                self._get_request_dictionary(interval[0], interval[1])
            )
            if request_obj.status_code == 200:
                request_json = json.loads(request_obj.content)
                for element in request_json["values"]:
                    ch[element["metadata"]["element"].lower()].append(
                        pd.DataFrame(
                            {
                                "data": element["values"],
                            },
                            index=request_json["times"],
                        )
                    )
            else:
                raise IOError(
                    "Could not connect to server. Error code: "
                    f"{request_obj.status_code}"
                )

        survey_metadata = Survey(id="USGS-GEOMAG")
        # metadata from the last chunk; identical across chunks
        station_metadata = self._to_station_metadata(request_json["metadata"])
        run_metadata = Run(id=run_id)

        ch_list = []
        for key, df_list in ch.items():
            df = pd.concat(df_list).astype(float)
            ch_metadata = Magnetic()
            ch_metadata.component = self._ch_map[key]
            ch_metadata.sample_rate = 1.0 / self.sampling_period
            ch_metadata.units = "nanotesla"
            if "y" in ch_metadata.component:
                ch_metadata.measurement_azimuth = 90
            ch_metadata.location.latitude = station_metadata.location.latitude
            ch_metadata.location.longitude = (
                station_metadata.location.longitude
            )
            ch_metadata.location.elevation = (
                station_metadata.location.elevation
            )
            ch_metadata.time_period.start = df.index[0]
            ch_metadata.time_period.end = df.index[-1]
            run_metadata.time_period.start = df.index[0]
            run_metadata.time_period.end = df.index[-1]
            station_metadata.time_period.start = df.index[0]
            station_metadata.time_period.end = df.index[-1]
            survey_metadata.time_period.start = df.index[0]
            survey_metadata.time_period.end = df.index[-1]
            ch_list.append(
                ChannelTS(
                    channel_type="magnetic",
                    data=df,
                    channel_metadata=ch_metadata,
                    run_metadata=run_metadata,
                    station_metadata=station_metadata,
                    survey_metadata=survey_metadata,
                )
            )

        return RunTS(
            ch_list,
            run_metadata=run_metadata,
            station_metadata=station_metadata,
            survey_metadata=survey_metadata,
        )


class USGSGeomag:
    """
    Build MTH5 files from USGS geomagnetic observatory data requested
    through :class:`GeomagClient`.
    """

    def __init__(self, **kwargs):
        self.save_path = Path().cwd()
        self.filename = None
        self.interact = False
        self.request_columns = [
            "observatory",
            "type",
            "elements",
            "sampling_period",
            "start",
            "end",
        ]

        # parameters of hdf5 file
        self.compression = "gzip"
        self.compression_opts = 4
        self.shuffle = True
        self.fletcher32 = True
        self.data_level = 1
        self.mth5_version = "0.2.0"

        # map geomag element names to MTH5 channel component names
        self._ch_map = {"x": "hx", "y": "hy", "z": "hz"}

        for key, value in kwargs.items():
            setattr(self, key, value)

    def validate_request_df(self, request_df):
        """
        Make sure the input request dataframe has the appropriate columns

        :param request_df: request dataframe, or path to a CSV file with
         the request columns
        :type request_df: :class:`pandas.DataFrame`, str or Path
        :return: valid request dataframe with a 'run' column
        :rtype: :class:`pandas.DataFrame`
        :raises IOError: if a given file path does not exist
        :raises TypeError: if the input is not a DataFrame or path
        :raises ValueError: if the columns do not match request_columns

        """
        if not isinstance(request_df, pd.DataFrame):
            if isinstance(request_df, (str, Path)):
                fn = Path(request_df)
                if not fn.exists():
                    raise IOError(f"File {fn} does not exist. Check path")
                # dates are parsed later in add_run_id, no need to parse here
                request_df = pd.read_csv(fn)
            else:
                raise TypeError(
                    f"Request input must be a pandas.DataFrame, not {type(request_df)}."
                )

        if "run" in request_df.columns:
            if sorted(request_df.columns.tolist()) != sorted(
                self.request_columns + ["run"]
            ):
                raise ValueError(
                    f"Request must have columns {', '.join(self.request_columns)}"
                )
        else:
            if sorted(request_df.columns.tolist()) != sorted(
                self.request_columns
            ):
                raise ValueError(
                    f"Request must have columns {', '.join(self.request_columns)}"
                )
            request_df = self.add_run_id(request_df)

        return request_df

    def add_run_id(self, request_df):
        """
        Add run id to request df

        :param request_df: request dataframe
        :type request_df: :class:`pandas.DataFrame`
        :return: dataframe with a run number added to unique time windows
         for each observatory at each unique sampling period
        :rtype: :class:`pandas.DataFrame`

        """
        request_df.start = pd.to_datetime(request_df.start)
        request_df.end = pd.to_datetime(request_df.end)
        request_df["run"] = ""
        for obs in request_df.observatory.unique():
            for sr in request_df.loc[
                request_df.observatory == obs, "sampling_period"
            ].unique():
                sr_df = request_df.loc[
                    (request_df.observatory == obs)
                    & (request_df.sampling_period == sr)
                ].sort_values("start")
                # assign by the start-sorted index so run numbers follow
                # time order even when input rows are unordered
                request_df.loc[sr_df.index, "run"] = [
                    f"sp{sr}_{ii+1:03}" for ii in range(len(sr_df))
                ]

        return request_df

    def _make_filename(self, save_path, request_df):
        """
        Create filename from the information in the dataframe

        The filename will look like f"usgs_geomag_{obs}_{elements}.h5"

        :param save_path: directory (or full file path) to save to
        :type save_path: str or :class:`pathlib.Path`
        :param request_df: request dataframe
        :type request_df: :class:`pandas.DataFrame`
        :return: file name derived from dataframe
        :rtype: :class:`pathlib.Path`

        """
        elements = "".join(request_df.elements.explode().unique().tolist())
        obs = "_".join(sorted(request_df.observatory.unique().tolist()))

        save_path = Path(save_path)
        if save_path.is_dir():
            fn = f"usgs_geomag_{obs}_{elements}.h5"
            save_path = save_path.joinpath(fn)

        return save_path

    def make_mth5_from_geomag(self, request_df):
        """
        Download geomagnetic observatory data from USGS webservices into an
        MTH5 using a request dataframe or csv file.

        :param request_df: DataFrame with columns

            - 'observatory'     --> Observatory code
            - 'type'            --> data type [ 'variation' | 'adjusted' |
              'quasi-definitive' | 'definitive' ]
            - 'elements'        --> Elements to get [D, DIST, DST, E, E-E,
              E-N, F, G, H, SQ, SV, UK1, UK2, UK3, UK4, X, Y, Z]
            - 'sampling_period' --> sample period [ 1 | 60 | 3600 ]
            - 'start'           --> Start time YYYY-MM-DDThh:mm:ss
            - 'end'             --> End time YYYY-MM-DDThh:mm:ss

        :type request_df: :class:`pandas.DataFrame`, str or Path if csv file
        :return: if interact is True an MTH5 object is returned otherwise
         the path to the file is returned
        :rtype: Path or :class:`mth5.mth5.MTH5`

        .. seealso:: https://www.usgs.gov/tools/web-service-geomagnetism-data

        """

        request_df = self.validate_request_df(request_df)

        fn = self._make_filename(self.save_path, request_df)

        with MTH5(
            file_version=self.mth5_version,
            compression=self.compression,
            compression_opts=self.compression_opts,
            shuffle=self.shuffle,
            fletcher32=self.fletcher32,
            data_level=self.data_level,
        ) as m:
            m.open_mth5(fn)
            if self.mth5_version in ["0.1.0"]:
                survey_group = m.survey_group
                survey_group.metadata.id = "USGS-GEOMAG"
            elif self.mth5_version in ["0.2.0"]:
                survey_group = m.add_survey("USGS-GEOMAG")
            else:
                raise ValueError(
                    f"MTH5 version must be [ '0.1.0' | '0.2.0' ] not {self.mth5_version}"
                )

            for row in request_df.itertuples():
                geomag_client = GeomagClient(
                    observatory=row.observatory,
                    type=row.type,
                    elements=row.elements,
                    start=row.start,
                    end=row.end,
                    sampling_period=row.sampling_period,
                    # use the instance channel map rather than a
                    # hard-coded copy so user overrides are honored
                    _ch_map=self._ch_map,
                )

                run = geomag_client.get_data(run_id=row.run)
                station_group = survey_group.stations_group.add_station(
                    run.station_metadata.id,
                    station_metadata=run.station_metadata,
                )
                run_group = station_group.add_run(
                    run.run_metadata.id, run_metadata=run.run_metadata
                )
                run_group.from_runts(run)
                station_group.update_station_metadata()

            survey_group.update_survey_metadata()

        if self.interact:
            m.open_mth5(m.filename)
            return m
        else:
            return m.filename