# Source code for mth5.mth5

# -*- coding: utf-8 -*-
"""
==================
MTH5
==================

MTH5 deals with reading and writing an MTH5 file, which are HDF5 files
developed for magnetotelluric (MT) data.  The code is based on h5py and
therefore numpy.  This is the simplest and we are not really dealing with
large tables of data to warrant using pytables.

Created on Sun Dec  9 20:50:41 2018

:copyright: Jared Peacock (jpeacock@usgs.gov)
    
:license: MIT

"""

# =============================================================================
# Imports
# =============================================================================
from pathlib import Path
from platform import platform

import h5py

from mth5.utils.exceptions import MTH5Error
from mth5 import __version__ as mth5_version
from mth5 import groups as groups
from mth5.tables import ChannelSummaryTable, TFSummaryTable
from mth5 import helpers
from mth5.utils.mth5_logger import setup_logger
from mth5 import CHANNEL_DTYPE, TF_DTYPE

from mt_metadata.utils.mttime import get_now_utc
from mt_metadata.timeseries import Experiment
from mt_metadata.transfer_functions.core import TF

# =============================================================================
# Acceptable parameters
# =============================================================================
# Recognized values for the HDF5 "file.type" attribute and for file-name
# suffix validation.  The dotted variants are included because the
# ``filename`` setter tests ``Path(value).suffix``, and ``Path.suffix``
# keeps its leading dot (e.g. ".mth5") -- without the dotted entries a
# valid "station.mth5" would be spuriously renamed to "station.h5".
acceptable_file_types = ["mth5", "MTH5", "h5", "H5", ".mth5", ".MTH5", ".h5", ".H5"]
# MTH5 schema versions this module can read and write.
acceptable_file_versions = ["0.1.0", "0.2.0"]
# Data levels following the NASA ESDS definitions (0 = raw ... 3 = derived).
acceptable_data_levels = [0, 1, 2, 3]

# =============================================================================
# MT HDF5 file
# =============================================================================


class MTH5:
    """
    MTH5 is the main container for the HDF5 file format developed for
    magnetotelluric (MT) data.  It is built with h5py and therefore numpy.

    The structure follows the different levels of MT data collection:

    For version 0.1.0::

        Survey
        |- Reports
        |- Standards
        |- Filters
        |- Stations
           |- Run
              |- Channel

    For version 0.2.0::

        Experiment
        |- Reports
        |- Standards
        |- Surveys
           |- Reports
           |- Standards
           |- Filters
           |- Stations
              |- Run
                 |- Channel

    All time-series data are stored as individual channels with the
    appropriate metadata defined for the given channel (electric, magnetic,
    auxiliary).  Each level is represented by an mth5 group class object
    with methods to add, remove, and get a group from the level below, and
    a ``metadata`` attribute holding the appropriate metadata class object.
    Metadata is stored in the HDF5 group attributes as (key, value) pairs.

    :param filename: name of the existing or to-be-created file
    :type filename: string or :class:`pathlib.Path`
    :param compression: lossless compression type; 'lzf', 'gzip' or 'szip'
        (availability depends on the HDF5 installation)
    :type compression: string
    :param compression_opts: compression options (for 'gzip' an integer
        0-9), defaults to 4
    :type compression_opts: string or int depending on compression type
    :param shuffle: enable the HDF5 shuffle filter to improve compression
        of runs of similar values; lossless, defaults to True
    :type shuffle: boolean
    :param fletcher32: add a Fletcher-32 checksum to each chunk to detect
        data corruption, defaults to True
    :type fletcher32: boolean
    :param data_level: data level following `NASA ESDS
        <https://earthdata.nasa.gov/collaborate/open-data-services-and-software/data-information-policy/data-levels>`_
        (0 = raw, 1 = raw + response/metadata, 2 = derived product),
        defaults to 1
    :type data_level: integer
    :param file_version: version of the file [ '0.1.0' | '0.2.0' ],
        defaults to "0.2.0"
    :type file_version: string, optional

    :Usage:

    >>> from mth5 import mth5
    >>> mth5_obj = mth5.MTH5(file_version='0.1.0')
    >>> mth5_obj.open_mth5(r"/home/mtdata/mt01.mth5", 'w')
    >>> station = mth5_obj.add_station('MT001')

    .. note:: When replacing an entire array with a new one use ``[...]``,
        otherwise the data will not be updated.

    .. warning:: You can only replace entire arrays with arrays of the
        same size.  Otherwise you need to delete the existing data and
        make a new dataset.

    .. seealso:: https://www.hdfgroup.org/ and https://www.h5py.org/
    """

    def __init__(
        self,
        filename=None,
        compression="gzip",
        compression_opts=4,
        shuffle=True,
        fletcher32=True,
        data_level=1,
        file_version="0.2.0",
    ):
        self.logger = setup_logger(f"{__name__}.{self.__class__.__name__}")
        # make these private so the user can't accidentally change anything.
        self.__hdf5_obj = None
        (
            self.__compression,
            self.__compression_opts,
        ) = helpers.validate_compression(compression, compression_opts)
        self.__shuffle = shuffle
        self.__fletcher32 = fletcher32
        # these go through property setters that validate the input
        self.data_level = data_level
        self.filename = filename
        self.file_version = file_version
        self.file_type = "mth5"
        self._set_default_groups()

    def __str__(self):
        # show the HDF5 group tree when the file is open
        if self.h5_is_read():
            return helpers.get_tree(self.__hdf5_obj)
        return "HDF5 file is closed and cannot be accessed."

    def __repr__(self):
        return self.__str__()

    # NOTE(review): __exit__ is defined but no __enter__ is visible in this
    # file, so ``with MTH5(...)`` would fail unless __enter__ is defined
    # elsewhere -- confirm.
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close_mth5()
        # do not suppress exceptions raised inside the with-block
        return False

    @property
    def dataset_options(self):
        """summary of dataset options passed to h5py.create_dataset"""
        return {
            "compression": self.__compression,
            "compression_opts": self.__compression_opts,
            "shuffle": self.__shuffle,
            "fletcher32": self.__fletcher32,
        }

    @property
    def file_attributes(self):
        """root-level HDF5 attributes written when a file is initialized"""
        return {
            "file.type": "MTH5",
            "file.version": self.file_version,
            "file.access.platform": platform(),
            "file.access.time": get_now_utc(),
            "mth5.software.version": mth5_version,
            "mth5.software.name": "mth5",
            "data_level": self.data_level,
        }

    @property
    def filename(self):
        """file name of the hdf5 file"""
        # prefer the name reported by the open h5py.File object
        if self.h5_is_read():
            return Path(self.__hdf5_obj.filename)
        msg = (
            "MTH5 file is not open or has not been created yet. "
            + "Returning default name"
        )
        self.logger.warning(msg)
        return self.__filename

    @filename.setter
    def filename(self, value):
        """make sure file has the proper extension"""
        self.__filename = None
        if value is not None:
            if not isinstance(value, Path):
                value = Path(value)
            # NOTE(review): Path.suffix keeps its leading dot (".mth5");
            # unless acceptable_file_types contains dotted entries, every
            # file name is rewritten to .h5 here -- confirm the list.
            if value.suffix not in acceptable_file_types:
                msg = (
                    f"file extension {value.suffix} is not correct. "
                    "Changing to default .h5"
                )
                self.logger.info(msg)
                self.__filename = value.with_suffix(".h5")
            else:
                self.__filename = value

    @property
    def file_type(self):
        """File Type should be MTH5"""
        if self.h5_is_read():
            # need the try statement for when a file is initialized; it does
            # not have the attributes yet.
            try:
                return self.__hdf5_obj.attrs["file.type"]
            except KeyError:
                return self.__file_type
        return self.__file_type

    @file_type.setter
    def file_type(self, value):
        """set file type while validating input

        :raises ValueError: if value is not a string or not an
            acceptable file type
        """
        if not isinstance(value, str):
            msg = f"Input file type must be a string not {type(value)}"
            self.logger.error(msg)
            raise ValueError(msg)
        if value not in acceptable_file_types:
            msg = (
                f"Input file.type is not valid, must be {acceptable_file_types}"
            )
            self.logger.error(msg)
            raise ValueError(msg)
        self.__file_type = value
        # keep the open file's attribute in sync
        if self.h5_is_read():
            self.__hdf5_obj.attrs["file.type"] = value

    @property
    def file_version(self):
        """mth5 file version"""
        if self.h5_is_read():
            # need the try statement for when a file is initialized; it does
            # not have the attributes yet.
            try:
                return self.__hdf5_obj.attrs["file.version"]
            except KeyError:
                return self.__file_version
        return self.__file_version

    @file_version.setter
    def file_version(self, value):
        """set file version while validating input

        :raises ValueError: if value is not a string or not an
            acceptable file version
        """
        if not isinstance(value, str):
            msg = f"Input file version must be a string not {type(value)}"
            self.logger.error(msg)
            raise ValueError(msg)
        if value not in acceptable_file_versions:
            msg = f"Input file.version is not valid, must be {acceptable_file_versions}"
            self.logger.error(msg)
            raise ValueError(msg)
        self.__file_version = value
        # default group names depend on the file version
        self._set_default_groups()
        if self.h5_is_read():
            self.__hdf5_obj.attrs["file.version"] = value

    @property
    def software_name(self):
        """software name that wrote the file"""
        if self.h5_is_read():
            return self.__hdf5_obj.attrs["mth5.software.name"]
        return "mth5"

    @property
    def data_level(self):
        """data level"""
        if self.h5_is_read():
            try:
                return self.__hdf5_obj.attrs["data_level"]
            except KeyError:
                return self.__data_level
        else:
            return self.__data_level

    @data_level.setter
    def data_level(self, value):
        """set data level while validating input

        :raises ValueError: if value is not an integer or not an
            acceptable data level
        """
        if not isinstance(value, int):
            # NOTE(review): message says "file type" but this validates
            # data_level -- likely a copy-paste slip in the message text.
            msg = f"Input file type must be an integer not {type(value)}"
            self.logger.error(msg)
            raise ValueError(msg)
        if value not in acceptable_data_levels:
            msg = f"Input data_level is not valid, must be {acceptable_data_levels}"
            self.logger.error(msg)
            raise ValueError(msg)
        self.__data_level = value
        if self.h5_is_read():
            self.__hdf5_obj.attrs["data_level"] = value

    def _set_default_groups(self):
        """set the default root/subgroup names based on the file version"""
        if self.file_version in ["0.1.0"]:
            self._default_root_name = "Survey"
            self._default_subgroup_names = [
                "Stations",
                "Reports",
                "Filters",
                "Standards",
            ]
            self._root_path = "/Survey"
        elif self.file_version in ["0.2.0"]:
            self._default_root_name = "Experiment"
            self._default_subgroup_names = [
                "Surveys",
                "Reports",
                "Standards",
            ]
            self._root_path = "/Experiment"

    @property
    def experiment_group(self):
        """Convenience property for the /Experiment group (0.2.0 only).

        Returns None if the file is closed or the version has no
        Experiment group.
        """
        if self.h5_is_read():
            if self.file_version in ["0.2.0"]:
                return groups.ExperimentGroup(
                    self.__hdf5_obj[f"{self._root_path}"],
                    **self.dataset_options,
                )
            else:
                self.logger.info(
                    f"File version {self.file_version} does not have an Experiment Group"
                )
                return None
        self.logger.info("File is closed cannot access /Experiment")
        return None

    @property
    def survey_group(self):
        """Convenience property for the /Survey group (0.1.0 only).

        Returns None if the file is closed or the version has no single
        Survey group (0.2.0 falls through and returns None implicitly).
        """
        if self.file_version in ["0.1.0"]:
            if self.h5_is_read():
                return groups.SurveyGroup(
                    self.__hdf5_obj[f"{self._root_path}"],
                    **self.dataset_options,
                )
            self.logger.info("File is closed cannot access /Survey")
            return None
        elif self.file_version in ["0.2.0"]:
            self.logger.info(
                f"File version {self.file_version} does not have a survey_group, try surveys_group"
            )

    @property
    def surveys_group(self):
        """Convenience property for the /Surveys master group (0.2.0 only).

        Returns None if the file is closed; 0.1.0 falls through and
        returns None implicitly.
        """
        if self.file_version in ["0.1.0"]:
            self.logger.info(
                f"File version {self.file_version} does not have a survey_group, try surveys_group"
            )
        elif self.file_version in ["0.2.0"]:
            if self.h5_is_read():
                return groups.MasterSurveyGroup(
                    self.__hdf5_obj[f"{self._root_path}/Surveys"],
                    **self.dataset_options,
                )
            self.logger.info("File is closed cannot access /Surveys")
            return None

    @property
    def reports_group(self):
        """Convenience property for the root-level Reports group."""
        if self.h5_is_read():
            return groups.ReportsGroup(
                self.__hdf5_obj[f"{self._root_path}/Reports"],
                **self.dataset_options,
            )
        self.logger.info("File is closed cannot access /Reports")
        return None

    @property
    def filters_group(self):
        """Convenience property for the Filters group (0.1.0 only;
        0.2.0 keeps filters at the survey level)."""
        if self.h5_is_read():
            if self.file_version in ["0.1.0"]:
                return groups.FiltersGroup(
                    self.__hdf5_obj[f"{self._root_path}/Filters"],
                    **self.dataset_options,
                )
            else:
                self.logger.info(
                    "File version 0.2.0 does not have a FiltersGroup at the experiment level"
                )
                return None
        self.logger.info("File is closed cannot access /Filters")
        return None

    @property
    def standards_group(self):
        """Convenience property for the root-level Standards group."""
        if self.h5_is_read():
            return groups.StandardsGroup(
                self.__hdf5_obj[f"{self._root_path}/Standards"],
                **self.dataset_options,
            )
        self.logger.info("File is closed cannot access /Standards")
        return None

    @property
    def stations_group(self):
        """Convenience property for the /Survey/Stations group (0.1.0 only)."""
        if self.h5_is_read():
            if self.file_version not in ["0.1.0"]:
                self.logger.info(
                    f"File version {self.file_version} does not have a Stations. "
                    "try surveys_group."
                )
                return None
            return groups.MasterStationGroup(
                self.__hdf5_obj[f"{self._root_path}/Stations"],
                **self.dataset_options,
            )
        self.logger.info("File is closed cannot access /Stations")
        return None

    @property
    def station_list(self):
        """list of existing station names (across all surveys for 0.2.0)"""
        if self.file_version in ["0.1.0"]:
            return self.stations_group.groups_list
        elif self.file_version in ["0.2.0"]:
            station_list = []
            for survey in self.surveys_group.groups_list:
                sg = self.surveys_group.get_survey(survey)
                station_list += sg.stations_group.groups_list
            return station_list
    def open_mth5(self, filename=None, mode="a"):
        """
        Open an MTH5 file, creating and initializing it if it does not
        already exist.

        :param filename: file to open; defaults to the name given at
            construction time
        :type filename: string or :class:`pathlib.Path`, optional
        :param mode: h5py file mode ('r', 'r+', 'a', 'w', 'w-', 'x'),
            defaults to 'a'
        :type mode: string, optional
        :raises MTH5Error: if the mode is not understood or an existing
            file fails validation

        :Example:

        >>> from mth5 import mth5
        >>> mth5_object = mth5.MTH5(file_version='0.1.0')
        >>> survey_object = mth5_object.open_mth5('Test.mth5', 'w')
        """
        if filename is not None:
            # NOTE(review): assigning self.__filename directly bypasses the
            # filename property setter and its suffix validation -- confirm
            # this is intentional.
            self.__filename = filename
            if not isinstance(self.__filename, Path):
                self.__filename = Path(filename)
        if self.__filename.exists():
            if mode in ["w"]:
                self.logger.warning(
                    f"{self.__filename.name} will be overwritten in 'w' mode"
                )
                try:
                    self._initialize_file(mode)
                except OSError as error:
                    # the file is probably still open elsewhere; h5py cannot
                    # truncate a file with live references
                    msg = (
                        f"{error}. Need to close any references to {self.__filename} first. "
                        + "Then reopen the file in the preferred mode"
                    )
                    self.logger.exception(msg)
            elif mode in ["a", "w-", "x", "r+"]:
                self.__hdf5_obj = h5py.File(self.__filename, mode=mode)
                self._set_default_groups()
                # writable modes refuse to proceed with an invalid file
                if not self.validate_file():
                    msg = "Input file is not a valid MTH5 file"
                    self.logger.error(msg)
                    raise MTH5Error(msg)
            elif mode in ["r"]:
                self.__hdf5_obj = h5py.File(self.__filename, mode=mode)
                self._set_default_groups()
                # read-only mode only logs validation problems
                self.validate_file()
            else:
                msg = "mode {0} is not understood".format(mode)
                self.logger.error(msg)
                raise MTH5Error(msg)
        else:
            if mode in ["a", "w", "w-", "x"]:
                self._initialize_file(mode=mode)
            else:
                msg = "Cannot open new file in mode {0} ".format(mode)
                self.logger.error(msg)
                raise MTH5Error(msg)
        # TODO need to add a validation step to check for version and legit file
        # older files may predate the summary datasets; add them on open
        if not "channel_summary" in self.__hdf5_obj[self._root_path].keys():
            self._initialize_summary()
def _initialize_file(self, mode="w"): """ Initialize the default groups for the file :return: Survey Group :rtype: groups.SurveyGroup """ # open an hdf5 file self.__hdf5_obj = h5py.File(self.__filename, mode) # write general metadata self.__hdf5_obj.attrs.update(self.file_attributes) # create the default group root = self.__hdf5_obj.create_group(self._default_root_name) if self._default_root_name == "Survey": root_group = groups.SurveyGroup(root) root_group.write_metadata() for group_name in self._default_subgroup_names: try: self.__hdf5_obj.create_group( f"{self._default_root_name}/{group_name}" ) except ValueError: pass m5_grp = getattr(self, f"{group_name.lower()}_group") m5_grp.initialize_group() self._initialize_summary() self.logger.info( f"Initialized MTH5 {self.file_version} file {self.filename} in mode {mode}" ) def _initialize_summary(self): try: # initiate channel and tf summary datasets self.__hdf5_obj[self._default_root_name].create_dataset( "channel_summary", shape=(1,), maxshape=(None,), dtype=CHANNEL_DTYPE, **self.dataset_options, ) except ValueError: pass try: self.__hdf5_obj[self._default_root_name].create_dataset( "tf_summary", shape=(1,), maxshape=(None,), dtype=TF_DTYPE, **self.dataset_options, ) except ValueError: pass
[docs] def validate_file(self): """ Validate an open mth5 file will test the attribute values and group names :return: Boolean [ True = valid, False = not valid] :rtype: Boolean """ if self.h5_is_read(): if self.file_type not in acceptable_file_types: msg = f"Unacceptable file type {self.file_type}" self.logger.error(msg) return False if self.file_version not in acceptable_file_versions: msg = f"Unacceptable file version {self.file_version}" self.logger.error(msg) return False if self.data_level not in acceptable_data_levels: msg = f"Unacceptable data_level {self.data_level}" self.logger.error(msg) return False if self.file_version in ["0.1.0"]: for gr in self.survey_group.groups_list: if "summary" in gr: continue if gr not in self._default_subgroup_names: msg = f"Unacceptable group {gr}" self.logger.error(msg) return False elif self.file_version in ["0.2.0"]: for gr in self.experiment_group.groups_list: if "summary" in gr: continue if gr not in self._default_subgroup_names: msg = f"Unacceptable group {gr}" self.logger.error(msg) return False return True self.logger.warning("HDF5 file is not open") return False
    def close_mth5(self):
        """
        Close the MTH5 file, making sure summaries are refreshed and
        everything is flushed to disk.
        """
        try:
            # update summary tables before closing a writable file
            if self.h5_is_write():
                self.channel_summary.summarize()
                self.tf_summary.summarize()
            self.logger.info(f"Flushing and closing {str(self.filename)}")
            self.__hdf5_obj.flush()
            self.__hdf5_obj.close()
        except (AttributeError, ValueError):
            # file was never opened or is already closed; make sure no
            # stray h5py handles are left open
            helpers.close_open_files()
[docs] def h5_is_write(self): """ check to see if the hdf5 file is open and writeable """ if isinstance(self.__hdf5_obj, h5py.File): try: if "w" in self.__hdf5_obj.mode or "+" in self.__hdf5_obj.mode: return True return False except ValueError: return False return False
[docs] def h5_is_read(self): """ check to see if the hdf5 file is open and readable :return: True if readable, False if not :rtype: Boolean """ if isinstance(self.__hdf5_obj, h5py.File): try: if self.__hdf5_obj.mode in ["r", "r+", "a", "w", "w-", "x"]: return True return False except ValueError: return False return False
[docs] def has_group(self, group_name): """ Check to see if the group name exists """ if self.h5_is_read(): def has_name(name): if group_name == name: return True if self.__hdf5_obj.visit(has_name): return True return False
def _make_h5_path( self, survey=None, station=None, run=None, channel=None, tf_id=None ): """ create an h5 path from inputs """ if self.file_version == "0.1.0": h5_path = self._root_path elif self.file_version == "0.2.0": if survey is None: raise ValueError("Survey must be input for file type 0.2.0") else: survey = helpers.validate_name(survey) h5_path = f"{self._root_path}/Surveys/{survey}" if station is not None: station = helpers.validate_name(station) h5_path += f"/Stations/{station}" if tf_id is not None: tf_id = helpers.validate_name(tf_id) h5_path += f"/Transfer_Functions/{tf_id}" elif run is not None: run = helpers.validate_name(run) h5_path += f"/{run}" if channel is not None: channel = helpers.validate_name(channel) h5_path += f"/{channel}" return h5_path
    def from_reference(self, h5_reference):
        """
        Get an MTH5 group or channel object from an HDF5 reference.

        The type of object returned is decided by the referenced node's
        "mth5_type" attribute; transfer functions are converted to
        :class:`mt_metadata.transfer_functions.core.TF` objects.

        :param h5_reference: HDF5 reference to a group or dataset
        :return: the matching mth5 group/dataset object, a TF object for
            transfer functions, or the raw h5py node if the type is
            unknown
        """
        ref_dict = {
            "survey": groups.SurveyGroup,
            "station": groups.StationGroup,
            "run": groups.RunGroup,
            "electric": groups.ElectricDataset,
            "magnetic": groups.MagneticDataset,
            "auxiliary": groups.AuxiliaryDataset,
            "transferfunction": groups.TransferFunctionGroup,
        }
        # in the future should allow this to return the proper container.
        referenced = self.__hdf5_obj[h5_reference]
        mth5_type = referenced.attrs["mth5_type"].lower()
        try:
            if mth5_type == "transferfunction":
                group = ref_dict[mth5_type](referenced)
                return group.to_tf_object()
            return ref_dict[mth5_type](referenced)
        except KeyError:
            # NOTE(review): this except also swallows a KeyError raised
            # inside to_tf_object or the group constructor -- confirm
            # that is acceptable.
            self.logger.info(
                f"Could not identify the MTH5 type {mth5_type}, "
                + "returning h5 group."
            )
            return referenced
    def to_experiment(self):
        """
        Create an :class:`mt_metadata.timeseries.Experiment` object from
        the metadata contained in the MTH5 file.

        :returns: :class:`mt_metadata.timeseries.Experiment`, or None
            implicitly if the file is closed
        """
        if self.h5_is_read():
            if self.file_version in ["0.1.0"]:
                # 0.1.0 has a single survey; wrap it in an Experiment
                experiment = Experiment()
                experiment.surveys.append(self.survey_group.metadata)
            elif self.file_version in ["0.2.0"]:
                experiment = self.experiment_group.metadata
            # NOTE(review): if the file is closed this method falls
            # through and returns None -- confirm callers handle that.
            return experiment
[docs] def from_experiment(self, experiment, survey_index=0, update=False): """ Fill out an MTH5 from a :class:`mt_metadata.timeseries.Experiment` object given a survey_id :param experiment: Experiment metadata :type experiment: :class:`mt_metadata.timeseries.Experiment` :param survey_index: Index of the survey to write :type survey_index: int, defaults to 0 """ if self.h5_is_write(): if self.file_version in ["0.1.0"]: sg = self.survey_group sg.metadata.from_dict( experiment.surveys[survey_index].to_dict() ) sg.write_metadata() for station in experiment.surveys[0].stations: mt_station = self.add_station( station.id, station_metadata=station ) if update: mt_station.metadata.update(station) mt_station.write_metadata() for run in station.runs: mt_run = mt_station.add_run(run.id, run_metadata=run) if update: mt_run.metadata.update(run) mt_run.write_metadata() for channel in run.channels: mt_ch = mt_run.add_channel( channel.component, channel.type, None, channel_metadata=channel, ) if update: mt_ch.metadata.update(channel) mt_ch.write_metadata() ### need to update from input metadata for time period ### and channels, runs, stations. 
mt_run.update_run_metadata() mt_station.update_station_metadata() sg.update_survey_metadata() for k, v in experiment.surveys[0].filters.items(): self.filters_group.add_filter(v) elif self.file_version in ["0.2.0"]: for survey in experiment.surveys: sg = self.add_survey(survey.id, survey_metadata=survey) for station in survey.stations: mt_station = self.add_station( station.id, station_metadata=station, survey=sg.metadata.id, ) if update: mt_station.metadata.update(station) mt_station.write_metadata() for run in station.runs: mt_run = mt_station.add_run( run.id, run_metadata=run ) if update: mt_run.metadata.update(run) mt_run.write_metadata() for channel in run.channels: mt_ch = mt_run.add_channel( channel.component, channel.type, None, channel_metadata=channel, ) if update: mt_ch.metadata.update(channel) mt_ch.write_metadata() mt_run.update_run_metadata() mt_station.update_station_metadata() sg.update_survey_metadata() for k, v in survey.filters.items(): sg.filters_group.add_filter(v)
@property def channel_summary(self): """return a dataframe of channels""" return ChannelSummaryTable( self.__hdf5_obj[f"{self._root_path}/channel_summary"] ) @property def tf_summary(self): """return a dataframe of channels""" return TFSummaryTable(self.__hdf5_obj[f"{self._root_path}/tf_summary"])
[docs] def add_survey(self, survey_name, survey_metadata=None): """ Add a survey with metadata if given with the path: ``/Experiment/Surveys/survey_name`` If the survey already exists, will return that survey and nothing is added. :param survey_name: Name of the survey, should be the same as metadata.id :type survey_name: string :param survey_metadata: survey metadata container, defaults to None :type survey_metadata: :class:`mth5.metadata.survey`, optional :return: A convenience class for the added survey :rtype: :class:`mth5_groups.SurveyGroup` :Example: :: >>> from mth5 import mth5 >>> mth5_obj = mth5.MTH5() >>> mth5_obj.open_mth5(r"/test.mth5", mode='a') >>> # one option >>> new_survey = mth5_obj.add_survey('MT001') >>> # another option >>> new_station = mth5_obj.experiment_group.surveys_group.add_survey('MT001') """ return self.surveys_group.add_survey( survey_name, survey_metadata=survey_metadata )
[docs] def get_survey(self, survey_name): """ Get a survey with the same name as survey_name :param survey_name: existing survey name :type survey_name: string :return: convenience survey class :rtype: :class:`mth5.mth5_groups.surveyGroup` :raises MTH5Error: if the survey name is not found. :Example: >>> from mth5 import mth5 >>> mth5_obj = mth5.MTH5() >>> mth5_obj.open_mth5(r"/test.mth5", mode='a') >>> # one option >>> existing_survey = mth5_obj.get_survey('MT001') >>> # another option >>> existing_staiton = mth5_obj.experiment_group.surveys_group.get_survey('MT001') MTH5Error: MT001 does not exist, check groups_list for existing names """ survey_path = self._make_h5_path(survey=survey_name) try: return groups.SurveyGroup( self.__hdf5_obj[survey_path], **self.dataset_options, ) except KeyError: msg = ( f"{survey_path} does not exist, check survey_list for " "existing names." ) self.logger.warning(msg) raise MTH5Error(msg)
[docs] def remove_survey(self, survey_name): """ Remove a survey from the file. .. note:: Deleting a survey is not as simple as del(survey). In HDF5 this does not free up memory, it simply removes the reference to that survey. The common way to get around this is to copy what you want into a new file, or overwrite the survey. :param survey_name: existing survey name :type survey_name: string :Example: :: >>> from mth5 import mth5 >>> mth5_obj = mth5.MTH5() >>> mth5_obj.open_mth5(r"/test.mth5", mode='a') >>> # one option >>> mth5_obj.remove_survey('MT001') >>> # another option >>> mth5_obj.experiment_group.surveys_group.remove_survey('MT001') """ survey_path = self._make_h5_path(survey=survey_name) try: del self.__hdf5_obj[f"{survey_path}"] self.logger.info( "Deleting a survey does not reduce the HDF5" "file size it simply remove the reference. If " "file size reduction is your goal, simply copy" " what you want into another file." ) except KeyError: msg = ( f"{survey_path} does not exist, " "check station_list for existing names" ) self.logger.warning(msg) raise MTH5Error(msg)
[docs] def add_station(self, station_name, station_metadata=None, survey=None): """ Convenience function to add a station using ``mth5.stations_group.add_station`` Add a station with metadata if given with the path [v0.1.0]: ``/Survey/Stations/station_name`` Add a station with metadata if given with the path [v0.2.0]: ``Experiment/Surveys/survey/Stations/station_name`` If the station already exists, will return that station and nothing is added. :param station_name: Name of the station, should be the same as metadata.archive_id :type station_name: string :param station_metadata: Station metadata container, defaults to None :type station_metadata: :class:`mth5.metadata.Station`, optional :param survey: existing survey name, needed for file version >= 0.2.0 :type survey: string :return: A convenience class for the added station :rtype: :class:`mth5_groups.StationGroup` :Example: >>> new_staiton = mth5_obj.add_station('MT001') """ if self.file_version in ["0.1.0"]: return self.stations_group.add_station( station_name, station_metadata=station_metadata ) elif self.file_version in ["0.2.0"]: if survey is None: msg = "Need to input 'survey' for file version %s" self.logger.error(msg, self.file_version) raise ValueError(msg % self.file_version) sg = self.get_survey(survey) return sg.stations_group.add_station( station_name, station_metadata=station_metadata )
[docs] def get_station(self, station_name, survey=None): """ Convenience function to get a station using Get a station with the same name as station_name :param station_name: existing station name :type station_name: string :param survey: existing survey name, needed for file version >= 0.2.0 :type survey: string :return: convenience station class :rtype: :class:`mth5.mth5_groups.StationGroup` :raises MTH5Error: if the station name is not found. :Example: >>> existing_staiton = mth5_obj.get_station('MT001') MTH5Error: MT001 does not exist, check station_list for existing names """ station_path = self._make_h5_path(survey=survey, station=station_name) try: return groups.StationGroup(self.__hdf5_obj[station_path]) except KeyError: raise MTH5Error(f"Could not find station {station_name}")
[docs] def remove_station(self, station_name, survey=None): """ Convenience function to remove a station using Remove a station from the file. .. note:: Deleting a station is not as simple as del(station). In HDF5 this does not free up memory, it simply removes the reference to that station. The common way to get around this is to copy what you want into a new file, or overwrite the station. :param station_name: existing station name :type station_name: string :param survey: existing survey name, needed for file version >= 0.2.0 :type survey: string :Example: >>> mth5_obj.remove_station('MT001') """ station_name = helpers.validate_name(station_name) if self.file_version in ["0.1.0"]: return self.stations_group.remove_station(station_name) elif self.file_version in ["0.2.0"]: if survey is None: msg = "Need to input 'survey' for file version %s" self.logger.error(msg, self.file_version) raise ValueError(msg % self.file_version) survey = helpers.validate_name(survey) sg = self.get_survey(survey) return sg.stations_group.remove_station(station_name)
[docs] def add_run(self, station_name, run_name, run_metadata=None, survey=None): """ Convenience function to add a run using Add a run to a given station. :param run_name: run name, should be archive_id{a-z} :type run_name: string :param survey: existing survey name, needed for file version >= 0.2.0 :type survey: string :param metadata: metadata container, defaults to None :type metadata: :class:`mth5.metadata.Station`, optional :example: >>> new_run = mth5_obj.add_run('MT001', 'MT001a') .. todo:: auto fill run name if none is given. .. todo:: add ability to add a run with data. """ return self.get_station(station_name, survey=survey).add_run( run_name, run_metadata=run_metadata )
[docs] def get_run(self, station_name, run_name, survey=None): """ Convenience function to get a run using ``mth5.stations_group.get_station(station_name).get_run()`` get a run from run name for a given station :param station_name: existing station name :type station_name: string :param run_name: existing run name :type run_name: string :param survey: existing survey name, needed for file version >= 0.2.0 :type survey: string :return: Run object :rtype: :class:`mth5.mth5_groups.RunGroup` :Example: >>> existing_run = mth5_obj.get_run('MT001', 'MT001a') """ run_path = self._make_h5_path( survey=survey, station=station_name, run=run_name ) try: return groups.RunGroup(self.__hdf5_obj[run_path]) except KeyError: raise MTH5Error(f"Could not find {run_path}")
[docs] def remove_run(self, station_name, run_name, survey=None): """ Remove a run from the station. .. note:: Deleting a run is not as simple as del(run). In HDF5 this does not free up memory, it simply removes the reference to that station. The common way to get around this is to copy what you want into a new file, or overwrite the run. :param station_name: existing station name :type station_name: string :param run_name: existing run name :type run_name: string :param survey: existing survey name, needed for file version >= 0.2.0 :type survey: string :Example: >>> mth5_obj.remove_station('MT001', 'MT001a') """ return self.get_station(station_name, survey=survey).remove_run( run_name )
[docs] def add_channel( self, station_name, run_name, channel_name, channel_type, data, channel_dtype="int32", max_shape=(None,), chunks=True, channel_metadata=None, survey=None, ): """ Convenience function to add a channel using ``mth5.stations_group.get_station().get_run().add_channel()`` add a channel to a given run for a given station :param station_name: existing station name :type station_name: string :param run_name: existing run name :type run_name: string :param channel_name: name of the channel :type channel_name: string :param channel_type: [ electric | magnetic | auxiliary ] :type channel_type: string :raises MTH5Error: If channel type is not correct :param channel_metadata: metadata container, defaults to None :type channel_metadata: [ :class:`mth5.metadata.Electric` | :class:`mth5.metadata.Magnetic` | :class:`mth5.metadata.Auxiliary` ], optional :param survey: existing survey name, needed for file version >= 0.2.0 :type survey: string :return: Channel container :rtype: [ :class:`mth5.mth5_groups.ElectricDatset` | :class:`mth5.mth5_groups.MagneticDatset` | :class:`mth5.mth5_groups.AuxiliaryDatset` ] :Example: >>> new_channel = mth5_obj.add_channel('MT001', 'MT001a''Ex', >>> ... 'electric', None) >>> new_channel Channel Electric: ------------------- component: None data type: electric data format: float32 data shape: (1,) start: 1980-01-01T00:00:00+00:00 end: 1980-01-01T00:00:00+00:00 sample rate: None """ return self.get_run(station_name, run_name, survey=survey).add_channel( channel_name, channel_type, data, channel_metadata=channel_metadata, channel_dtype=channel_dtype, max_shape=max_shape, chunks=chunks, **self.dataset_options, )
[docs] def get_channel(self, station_name, run_name, channel_name, survey=None): """ Convenience function to get a channel using ``mth5.stations_group.get_station().get_run().get_channel()`` Get a channel from an existing name. Returns the appropriate container. :param station_name: existing station name :type station_name: string :param run_name: existing run name :type run_name: string :param channel_name: name of the channel :type channel_name: string :return: Channel container :rtype: [ :class:`mth5.mth5_groups.ElectricDatset` | :class:`mth5.mth5_groups.MagneticDatset` | :class:`mth5.mth5_groups.AuxiliaryDatset` ] :param survey: existing survey name, needed for file version >= 0.2.0 :type survey: string :raises MTH5Error: If no channel is found :Example: >>> existing_channel = mth5_obj.get_channel(station_name, >>> ... run_name, >>> ... channel_name) >>> existing_channel Channel Electric: ------------------- component: Ex data type: electric data format: float32 data shape: (4096,) start: 1980-01-01T00:00:00+00:00 end: 1980-01-01T00:00:01+00:00 sample rate: 4096 """ run_path = self._make_h5_path( survey=survey, station=station_name, run=run_name ) rg = groups.RunGroup(self.__hdf5_obj[run_path]) try: return rg.get_channel(helpers.validate_name(channel_name)) except (AttributeError, KeyError): raise MTH5Error( f"Could not find channel, {run_path}/{channel_name}" )
[docs] def remove_channel(self, station_name, run_name, channel_name, survey=None): """ Convenience function to remove a channel using ``mth5.stations_group.get_station().get_run().remove_channel()`` Remove a channel from a given run and station. .. note:: Deleting a channel is not as simple as del(channel). In HDF5 this does not free up memory, it simply removes the reference to that channel. The common way to get around this is to copy what you want into a new file, or overwrite the channel. :param station_name: existing station name :type station_name: string :param run_name: existing run name :type run_name: string :param channel_name: existing station name :type channel_name: string :param survey: existing survey name, needed for file version >= 0.2.0 :type survey: string :Example: >>> mth5_obj.remove_channel('MT001', 'MT001a', 'Ex') """ station_name = helpers.validate_name(station_name) run_name = helpers.validate_name(run_name) channel_name = helpers.validate_name(channel_name) return ( self.get_station(station_name, survey=survey) .get_run(run_name) .remove_channel(channel_name) )
    def add_transfer_function(self, tf_object):
        """
        Add a transfer function to the file.

        For file version 0.2.0 the survey group is looked up from
        ``tf_object.survey_metadata.id``; if the id is ``"unknown_survey"``
        the existing ``unknown_survey*`` groups are searched for matching
        metadata, and a new numbered group is created when none match.
        For other versions the single survey group is used.  The station,
        runs, and channels referenced by the TF are created if missing,
        then the transfer function itself is added (or the existing one
        returned if it already exists).

        :param tf_object: transfer function to add
        :type tf_object: :class:`mt_metadata.transfer_functions.core.TF`
        :return: transfer function group
        :rtype: :class:`mth5.groups.TransferFunctionGroup`
        :raises ValueError: if tf_object is not a TF instance
        """
        if not isinstance(tf_object, TF):
            msg = "Input must be a TF object not %s"
            self.logger.error(msg, type(tf_object))
            raise ValueError(msg % type(tf_object))
        if self.file_version == "0.2.0":
            try:
                # need to check survey metadata to make sure it matches,
                # if it doesn't need to make a new survey group so that
                # when a TF is pulled it gets the proper survey metadata.
                # this should eventually search over each unknonw survey
                # for matching metadata so there aren't 100 groups
                survey_group = self.get_survey(tf_object.survey_metadata.id)
                if tf_object.survey_metadata.id == "unknown_survey":
                    # NOTE(review): 'match' is first assigned inside the
                    # loop; this relies on groups_list containing at least
                    # one "unknown_survey" entry (guaranteed here because
                    # get_survey("unknown_survey") just succeeded) —
                    # otherwise 'if not match' below would raise
                    # NameError.  Confirm and consider initializing
                    # match = False.
                    for sg_id in self.surveys_group.groups_list:
                        if "unknown_survey" in sg_id:
                            match = True
                            survey_group = self.get_survey(sg_id)
                            sg_dict = survey_group.metadata.to_dict(
                                single=True, required=False
                            )
                            # compare all survey metadata fields except
                            # identifiers/references
                            for (
                                key,
                                value,
                            ) in tf_object.survey_metadata.to_dict(
                                single=True
                            ).items():
                                if key in [
                                    "hdf5_reference",
                                    "mth5_type",
                                    "id",
                                ]:
                                    continue
                                if sg_dict[key] != value:
                                    match = False
                                    break
                            if match:
                                break
                    # create a new survey group with a new id, this is likely
                    # not the best way to do this, it should be strongly
                    # encouraged that the user assigne a survey id.
                    if not match:
                        # find the first unused unknown_survey_NNN name
                        count = 1
                        survey_id = f"unknown_survey_{count:03}"
                        while survey_id in self.surveys_group.groups_list:
                            count += 1
                            survey_id = f"unknown_survey_{count:03}"
                        tf_object.survey_metadata.id = survey_id
                        survey_group = self.add_survey(
                            tf_object.survey_metadata.id,
                            survey_metadata=tf_object.survey_metadata,
                        )
            except MTH5Error:
                # survey did not exist at all yet; create it
                survey_group = self.add_survey(
                    tf_object.survey_metadata.id,
                    survey_metadata=tf_object.survey_metadata,
                )
        else:
            survey_group = self.survey_group
        # might need a better test here
        if survey_group.metadata.id is None:
            survey_group.metadata.update(tf_object.survey_metadata)
            survey_group.write_metadata()
        try:
            station_group = survey_group.stations_group.get_station(
                tf_object.station_metadata.id
            )
            station_group.metadata.update(tf_object.to_ts_station_metadata())
            station_group.write_metadata()
        except MTH5Error:
            station_group = survey_group.stations_group.add_station(
                tf_object.station_metadata.id,
                station_metadata=tf_object.to_ts_station_metadata(),
            )
        ## need to check for runs and channels
        for (
            run_id
        ) in tf_object.station_metadata.transfer_function.runs_processed:
            if run_id in ["", None, "None"]:
                continue
            try:
                run_group = station_group.get_run(run_id)
            except MTH5Error:
                run = tf_object.station_metadata.get_run(run_id)
                run_group = station_group.add_run(run_id, run_metadata=run)
                # NOTE(review): channels are back-filled only for newly
                # added runs ('run' is undefined on the get_run success
                # path) — presumably existing runs already carry their
                # channels; confirm this placement against upstream.
                for ch in run.channels:
                    try:
                        ch_dataset = run_group.get_channel(ch.component)
                    except MTH5Error:
                        # ch_dataset is unused; add_channel is called for
                        # its side effect of creating the dataset
                        ch_dataset = run_group.add_channel(
                            ch.component, ch.type, None, channel_metadata=ch
                        )
        try:
            tf_group = (
                station_group.transfer_functions_group.add_transfer_function(
                    tf_object.station, tf_object=tf_object
                )
            )
        except (OSError, RuntimeError, ValueError):
            # TF dataset already present; return the existing group
            msg = f"TF {tf_object.station} already exists, returning existing group."
            self.logger.debug(msg)
            tf_group = (
                station_group.transfer_functions_group.get_transfer_function(
                    tf_object.station
                )
            )
        return tf_group
[docs] def get_transfer_function(self, station_id, tf_id, survey=None): """ Get a transfer function :param survey_id: DESCRIPTION :type survey_id: TYPE :param station_id: DESCRIPTION :type station_id: TYPE :param tf_id: DESCRIPTION :type tf_id: TYPE :return: DESCRIPTION :rtype: TYPE """ try: tf_df = self.tf_summary.to_dataframe() ref = ( tf_df.loc[ (tf_df.station == station_id) & (tf_df.tf_id == tf_id) & (tf_df.survey == "unknown_survey") ] .iloc[0] .hdf5_reference ) return self.from_reference(ref) except IndexError: tf_path = self._make_h5_path( survey=survey, station=station_id, tf_id=tf_id ) try: tg = groups.TransferFunctionGroup(self.__hdf5_obj[tf_path]) return tg.to_tf_object() except KeyError: raise MTH5Error(f"Could not find {tf_path}")
[docs] def remove_transfer_function(self, station_id, tf_id, survey=None): """ remove a transfer function :param survey_id: DESCRIPTION :type survey_id: TYPE :param station_id: DESCRIPTION :type station_id: TYPE :param tf_id: DESCRIPTION :type tf_id: TYPE :return: DESCRIPTION :rtype: TYPE """ station_group = self.get_station(station_id, survey=survey) station_group.transfer_functions_group.remove_transfer_function(tf_id)