# -*- coding: utf-8 -*-
"""
Created on Wed Dec 23 17:18:29 2020
.. note:: Need to keep these groups together, if you split them into files you
get a circular import.
:copyright:
Jared Peacock (jpeacock@usgs.gov)
:license: MIT
"""
# =============================================================================
# Imports
# =============================================================================
import inspect
import weakref
import h5py
import numpy as np
import pandas as pd
import xarray as xr
from mt_metadata import timeseries as metadata
from mt_metadata.utils.mttime import MTime
from mt_metadata.base import Base
from mt_metadata.timeseries.filters import ChannelResponseFilter
from mth5 import CHUNK_SIZE, CHANNEL_DTYPE
from mth5.groups.base import BaseGroup
from mth5.groups import FiltersGroup, TransferFunctionGroup
from mth5.utils.exceptions import MTH5Error
from mth5.helpers import (
to_numpy_type,
from_numpy_type,
inherit_doc_string,
validate_name,
)
from mth5.timeseries import ChannelTS, RunTS
from mth5.timeseries.channel_ts import make_dt_coordinates
from mth5.utils.mth5_logger import setup_logger
meta_classes = dict(inspect.getmembers(metadata, inspect.isclass))
# =============================================================================
# Standards Group
# =============================================================================
[docs]class MasterStationGroup(BaseGroup):
"""
Utility class to holds information about the stations within a survey and
accompanying metadata. This class is next level down from Survey for
stations ``/Survey/Stations``. This class provides methods to add and
get stations. A summary table of all existing stations is also provided
as a convenience look up table to make searching easier.
To access MasterStationGroup from an open MTH5 file:
>>> from mth5 import mth5
>>> mth5_obj = mth5.MTH5()
>>> mth5_obj.open_mth5(r"/test.mth5", mode='a')
>>> stations = mth5_obj.stations_group
To check what stations exist
>>> stations.groups_list
['summary', 'MT001', 'MT002', 'MT003']
To access the hdf5 group directly use `SurveyGroup.hdf5_group`.
>>> stations.hdf5_group.ref
<HDF5 Group Reference>
.. note:: All attributes should be input into the metadata object, that
way all input will be validated against the metadata standards.
If you change attributes in metadata object, you should run the
`SurveyGroup.write_metadata()` method. This is a temporary
solution, working on an automatic updater if metadata is changed.
>>> stations.metadata.existing_attribute = 'update_existing_attribute'
>>> stations.write_metadata()
If you want to add a new attribute this should be done using the
`metadata.add_base_attribute` method.
>>> stations.metadata.add_base_attribute('new_attribute',
>>> ... 'new_attribute_value',
>>> ... {'type':str,
>>> ... 'required':True,
>>> ... 'style':'free form',
>>> ... 'description': 'new attribute desc.',
>>> ... 'units':None,
>>> ... 'options':[],
>>> ... 'alias':[],
>>> ... 'example':'new attribute
To add a station:
>>> new_station = stations.add_station('new_station')
>>> stations
/Survey/Stations:
====================
--> Dataset: summary
......................
|- Group: new_station
---------------------
--> Dataset: summary
......................
Add a station with metadata:
>>> from mth5.metadata import Station
>>> station_metadata = Station()
>>> station_metadata.id = 'MT004'
>>> station_metadata.time_period.start = '2020-01-01T12:30:00'
>>> station_metadata.location.latitude = 40.000
>>> station_metadata.location.longitude = -120.000
>>> new_station = stations.add_station('Test_01', station_metadata)
>>> # to look at the metadata
>>> new_station.metadata
{
"station": {
"acquired_by.author": null,
"acquired_by.comments": null,
"id": "MT004",
...
}
}
.. seealso:: `mth5.metadata` for details on how to add metadata from
various files and python objects.
To remove a station:
>>> stations.remove_station('new_station')
>>> stations
/Survey/Stations:
====================
--> Dataset: summary
......................
.. note:: Deleting a station is not as simple as del(station). In HDF5
this does not free up memory, it simply removes the reference
to that station. The common way to get around this is to
copy what you want into a new file, or overwrite the station.
To get a station:
>>> existing_station = stations.get_station('existing_station_name')
>>> existing_station
/Survey/Stations/existing_station_name:
=======================================
--> Dataset: summary
......................
|- Group: run_01
----------------
--> Dataset: summary
......................
--> Dataset: Ex
......................
--> Dataset: Ey
......................
--> Dataset: Hx
......................
--> Dataset: Hy
......................
--> Dataset: Hz
......................
A summary table is provided to make searching easier. The table
summarized all stations within a survey. To see what names are in the
summary table:
>>> stations.summary_table.dtype.descr
[('id', ('|S5', {'h5py_encoding': 'ascii'})),
('start', ('|S32', {'h5py_encoding': 'ascii'})),
('end', ('|S32', {'h5py_encoding': 'ascii'})),
('components', ('|S100', {'h5py_encoding': 'ascii'})),
('measurement_type', ('|S12', {'h5py_encoding': 'ascii'})),
('sample_rate', '<f8')]
.. note:: When a station is added an entry is added to the summary table,
where the information is pulled from the metadata.
>>> stations.summary_table
index | id | start | end
| components | measurement_type | sample_rate
-------------------------------------------------------------------------
--------------------------------------------------
0 | Test_01 | 1980-01-01T00:00:00+00:00 | 1980-01-01T00:00:00+00:00
| Ex,Ey,Hx,Hy,Hz | BBMT | 100
"""
def __init__(self, group, **kwargs):
super().__init__(group, **kwargs)
@property
def channel_summary(self):
"""
Summary of all channels in the file.
"""
ch_list = []
for station in self.groups_list:
s_group = StationGroup(self.hdf5_group[station])
for run in s_group.groups_list:
r_group = RunGroup(s_group.hdf5_group[run])
for ch in r_group.groups_list:
ds_type = r_group.hdf5_group[ch].attrs["mth5_type"]
if ds_type.lower() in ["electric"]:
ch_dataset = ElectricDataset(r_group.hdf5_group[ch])
elif ds_type.lower() in ["magnetic"]:
ch_dataset = MagneticDataset(r_group.hdf5_group[ch])
elif ds_type.lower() in ["auxiliary"]:
ch_dataset = AuxiliaryDataset(r_group.hdf5_group[ch])
ch_list.append(ch_dataset.channel_entry)
ch_list = np.array(ch_list)
return pd.DataFrame(ch_list.flatten())
@property
def station_summary(self):
"""
Summary of stations in the file
:return: DESCRIPTION
:rtype: TYPE
"""
st_list = []
for key, group in self.hdf5_group.items():
entry = {
"station": key,
"start": group.attrs["time_period.start"],
"end": group.attrs["time_period.end"],
"latitude": group.attrs["location.latitude"],
"longitude": group.attrs["location.longitude"],
}
st_list.append(entry)
df = pd.DataFrame(st_list)
df.start = pd.to_datetime(df.start)
df.end = pd.to_datetime(df.end)
return df
[docs] def add_station(self, station_name, station_metadata=None):
"""
Add a station with metadata if given with the path:
``/Survey/Stations/station_name``
If the station already exists, will return that station and nothing
is added.
:param station_name: Name of the station, should be the same as
metadata.id
:type station_name: string
:param station_metadata: Station metadata container, defaults to None
:type station_metadata: :class:`mth5.metadata.Station`, optional
:return: A convenience class for the added station
:rtype: :class:`mth5_groups.StationGroup`
:Example: ::
>>> from mth5 import mth5
>>> mth5_obj = mth5.MTH5()
>>> mth5_obj.open_mth5(r"/test.mth5", mode='a')
>>> # one option
>>> stations = mth5_obj.stations_group
>>> new_station = stations.add_station('MT001')
>>> # another option
>>> new_staiton = mth5_obj.stations_group.add_station('MT001')
.. todo:: allow dictionaries, json string, xml elements as metadata
input.
"""
if station_name is None:
raise Exception("station name is None, do not know what to name it")
station_name = validate_name(station_name)
try:
station_group = self.hdf5_group.create_group(station_name)
self.logger.debug("Created group %s", station_group.name)
if station_metadata is None:
station_metadata = metadata.Station(id=station_name)
else:
if validate_name(station_metadata.id) != station_name:
msg = (
f"Station group name {station_name} must be same as "
+ f"station id {station_metadata.id}"
)
self.logger.error(msg)
raise MTH5Error(msg)
station_obj = StationGroup(
station_group,
station_metadata=station_metadata,
**self.dataset_options,
)
station_obj.initialize_group()
# be sure to add a table entry
# self.summary_table.add_row(station_obj.table_entry)
except ValueError:
msg = "Station %s already exists, returning existing group."
self.logger.info(msg, station_name)
station_obj = self.get_station(station_name)
return station_obj
[docs] def get_station(self, station_name):
"""
Get a station with the same name as station_name
:param station_name: existing station name
:type station_name: string
:return: convenience station class
:rtype: :class:`mth5.mth5_groups.StationGroup`
:raises MTH5Error: if the station name is not found.
:Example:
>>> from mth5 import mth5
>>> mth5_obj = mth5.MTH5()
>>> mth5_obj.open_mth5(r"/test.mth5", mode='a')
>>> # one option
>>> stations = mth5_obj.stations_group
>>> existing_station = stations.get_station('MT001')
>>> # another option
>>> existing_staiton = mth5_obj.stations_group.get_station('MT001')
MTH5Error: MT001 does not exist, check station_list for existing names
"""
station_name = validate_name(station_name)
try:
return StationGroup(
self.hdf5_group[station_name], **self.dataset_options
)
except KeyError:
msg = (
f"{station_name} does not exist, "
+ "check station_list for existing names"
)
self.logger.debug("Error" + msg)
raise MTH5Error(msg)
[docs] def remove_station(self, station_name):
"""
Remove a station from the file.
.. note:: Deleting a station is not as simple as del(station). In HDF5
this does not free up memory, it simply removes the reference
to that station. The common way to get around this is to
copy what you want into a new file, or overwrite the station.
:param station_name: existing station name
:type station_name: string
:Example: ::
>>> from mth5 import mth5
>>> mth5_obj = mth5.MTH5()
>>> mth5_obj.open_mth5(r"/test.mth5", mode='a')
>>> # one option
>>> stations = mth5_obj.stations_group
>>> stations.remove_station('MT001')
>>> # another option
>>> mth5_obj.stations_group.remove_station('MT001')
"""
station_name = validate_name(station_name)
try:
del self.hdf5_group[station_name]
self.logger.info(
"Deleting a station does not reduce the HDF5"
+ "file size it simply remove the reference. If "
+ "file size reduction is your goal, simply copy"
+ " what you want into another file."
)
except KeyError:
msg = (
f"{station_name} does not exist, "
+ "check station_list for existing names"
)
self.logger.debug("Error" + msg)
raise MTH5Error(msg)
# =============================================================================
# Station Group
# =============================================================================
[docs]class StationGroup(BaseGroup):
"""
StationGroup is a utility class to hold information about a single station
and accompanying metadata. This class is the next level down from
Stations --> ``/Survey/Stations/station_name``.
This class provides methods to add and get runs. A summary table of all
existing runs in the station is also provided as a convenience look up
table to make searching easier.
:param group: HDF5 group for a station, should have a path
``/Survey/Stations/station_name``
:type group: :class:`h5py.Group`
:param station_metadata: metadata container, defaults to None
:type station_metadata: :class:`mth5.metadata.Station`, optional
:Usage:
:Access StationGroup from an open MTH5 file:
>>> from mth5 import mth5
>>> mth5_obj = mth5.MTH5()
>>> mth5_obj.open_mth5(r"/test.mth5", mode='a')
>>> station = mth5_obj.stations_group.get_station('MT001')
:Check what runs exist:
>>> station.groups_list
['MT001a', 'MT001b', 'MT001c', 'MT001d']
To access the hdf5 group directly use `StationGroup.hdf5_group`.
>>> station.hdf5_group.ref
<HDF5 Group Reference>
.. note:: All attributes should be input into the metadata object, that
way all input will be validated against the metadata standards.
If you change attributes in metadata object, you should run the
`SurveyGroup.write_metadata()` method. This is a temporary
solution, working on an automatic updater if metadata is changed.
>>> station.metadata.existing_attribute = 'update_existing_attribute'
>>> station.write_metadata()
If you want to add a new attribute this should be done using the
`metadata.add_base_attribute` method.
>>> station.metadata.add_base_attribute('new_attribute',
>>> ... 'new_attribute_value',
>>> ... {'type':str,
>>> ... 'required':True,
>>> ... 'style':'free form',
>>> ... 'description': 'new attribute desc.',
>>> ... 'units':None,
>>> ... 'options':[],
>>> ... 'alias':[],
>>> ... 'example':'new attribute
:To add a run:
>>> new_run = stations.add_run('MT001e')
>>> new_run
/Survey/Stations/Test_01:
=========================
|- Group: MT001e
-----------------
--> Dataset: summary
......................
--> Dataset: summary
......................
:Add a run with metadata:
>>> from mth5.metadata import Run
>>> run_metadata = Run()
>>> run_metadata.time_period.start = '2020-01-01T12:30:00'
>>> run_metadata.time_period.end = '2020-01-03T16:30:00'
>>> run_metadata.location.latitude = 40.000
>>> run_metadata.location.longitude = -120.000
>>> new_run = runs.add_run('Test_01', run_metadata)
>>> # to look at the metadata
>>> new_run.metadata
{
"run": {
"acquired_by.author": "new_user",
"acquired_by.comments": "First time",
"channels_recorded_auxiliary": ['T'],
...
}
}
.. seealso:: `mth5.metadata` for details on how to add metadata from
various files and python objects.
:Remove a run:
>>> station.remove_run('new_run')
>>> station
/Survey/Stations/Test_01:
=========================
--> Dataset: summary
......................
.. note:: Deleting a station is not as simple as del(station). In HDF5
this does not free up memory, it simply removes the reference
to that station. The common way to get around this is to
copy what you want into a new file, or overwrite the station.
:Get a run:
>>> existing_run = stations.get_station('existing_run')
>>> existing_run
/Survey/Stations/MT001/MT001a:
=======================================
--> Dataset: summary
......................
--> Dataset: Ex
......................
--> Dataset: Ey
......................
--> Dataset: Hx
......................
--> Dataset: Hy
......................
--> Dataset: Hz
......................
:summary Table:
A summary table is provided to make searching easier. The table
summarized all stations within a survey. To see what names are in the
summary table:
>>> new_run.summary_table.dtype.descr
[('id', ('|S20', {'h5py_encoding': 'ascii'})),
('start', ('|S32', {'h5py_encoding': 'ascii'})),
('end', ('|S32', {'h5py_encoding': 'ascii'})),
('components', ('|S100', {'h5py_encoding': 'ascii'})),
('measurement_type', ('|S12', {'h5py_encoding': 'ascii'})),
('sample_rate', '<f8'),
('hdf5_reference', ('|O', {'ref': h5py.h5r.Reference}))]
.. note:: When a run is added an entry is added to the summary table,
where the information is pulled from the metadata.
>>> station.summary_table
index | id | start | end | components | measurement_type | sample_rate |
hdf5_reference
--------------------------------------------------------------------------
-------------
"""
def __init__(self, group, station_metadata=None, **kwargs):
super().__init__(group, group_metadata=station_metadata, **kwargs)
self._defaults_summary_keys = [
"id",
"start",
"end",
"components",
"measurement_type",
"sample_rate",
"hdf5_reference",
"mth5_type",
]
self._default_subgroup_names = [
"Transfer_Functions",
]
[docs] def initialize_group(self, **kwargs):
"""
Initialize group by making a summary table and writing metadata
"""
for key, value in kwargs.items():
setattr(self, key, value)
self.write_metadata()
for group_name in self._default_subgroup_names:
self.hdf5_group.create_group(f"{group_name}")
m5_grp = getattr(self, f"{group_name.lower()}_group")
m5_grp.initialize_group()
@property
def master_station_group(self):
"""shortcut to master station group"""
return MasterStationGroup(self.hdf5_group.parent)
@property
def transfer_functions_group(self):
"""Convinience method for /Station/Transfer_Functions"""
return TransferFunctionsGroup(
self.hdf5_group["Transfer_Functions"], **self.dataset_options
)
@BaseGroup.metadata.getter
def metadata(self):
"""Overwrite get metadata to include run information in the station"""
self._metadata.runs = []
for key in self.groups_list:
if key.lower() == "transfer_functions":
continue
try:
key_group = self.get_run(key)
self._metadata.runs.append(key_group.metadata)
except MTH5Error:
self.logger.warning(f"Could not find run {key}")
return self._metadata
@property
def name(self):
return self.metadata.id
@name.setter
def name(self, name):
self.metadata.id = name
@property
def run_summary(self):
"""
Summary of runs in the station
:return: DESCRIPTION
:rtype: TYPE
"""
run_list = []
for key, group in self.hdf5_group.items():
if group.attrs["mth5_type"].lower() in ["run"]:
comps = ",".join(
[
ii.decode()
for ii in group.attrs[
"channels_recorded_auxiliary"
].tolist()
+ group.attrs["channels_recorded_electric"].tolist()
+ group.attrs["channels_recorded_magnetic"].tolist()
]
)
run_list.append(
(
group.attrs["id"],
group.attrs["time_period.start"].split("+")[0],
group.attrs["time_period.end"].split("+")[0],
comps,
group.attrs["data_type"],
group.attrs["sample_rate"],
self.hdf5_group.ref,
)
)
run_summary = np.array(
run_list,
dtype=np.dtype(
[
("id", "U20"),
("start", "datetime64[ns]"),
("end", "datetime64[ns]"),
("components", "U100"),
("measurement_type", "U12"),
("sample_rate", float),
("hdf5_reference", h5py.ref_dtype),
]
),
)
return pd.DataFrame(run_summary)
[docs] def make_run_name(self, alphabet=False):
"""
Make a run name that will be the next alphabet letter extracted from
the run list. Expects that all runs are labled as id{a-z}.
:return: metadata.id + next letter
:rtype: string
>>> station.metadata.id = 'MT001'
>>> station.make_run_name()
'MT001a'
"""
run_list = sorted(
[group[-1:] for group in self.groups_list if self.name in group]
)
next_letter = None
if len(run_list) == 0:
if alphabet:
next_letter = "a"
else:
next_letter = "001"
else:
try:
next_letter = chr(ord(run_list[-1]) + 1)
except TypeError:
try:
next_letter = f"{int(run_list[-1]) + 1}"
except ValueError:
self.logger.info("Could not create a new run name")
return next_letter
[docs] def locate_run(self, sample_rate, start):
"""
Locate a run based on sample rate and start time from the summary table
:param sample_rate: sample rate in samples/seconds
:type sample_rate: float
:param start: start time
:type start: string or :class:`mth5.utils.mttime.MTime`
:return: appropriate run name, None if not found
:rtype: string or None
"""
if not isinstance(start, MTime):
start = MTime(start)
if self.run_summary.size < 1:
return None
sr_find = self.run_summary[
(self.run_summary.sample_rate == sample_rate)
& (self.run_summary.start == start)
]
if sr_find.size < 1:
return None
return sr_find
[docs] def add_run(self, run_name, run_metadata=None):
"""
Add a run to a station.
:param run_name: run name, should be id{a-z}
:type run_name: string
:param metadata: metadata container, defaults to None
:type metadata: :class:`mth5.metadata.Station`, optional
need to be able to fill an entry in the summary table.
.. todo:: auto fill run name if none is given.
.. todo:: add ability to add a run with data.
"""
run_name = validate_name(run_name)
try:
run_group = self.hdf5_group.create_group(run_name)
if run_metadata is None:
run_metadata = metadata.Run(id=run_name)
elif validate_name(run_metadata.id) != run_name:
msg = "Run name %s must be the same as run_metadata.id %s"
self.logger.error(msg, run_name, run_metadata.id)
raise MTH5Error(msg % (run_name, run_metadata.id))
run_obj = RunGroup(
run_group, run_metadata=run_metadata, **self.dataset_options
)
run_obj.initialize_group()
except ValueError:
msg = "run %s already exists, returning existing group."
self.logger.info(msg, run_name)
run_obj = self.get_run(run_name)
return run_obj
[docs] def get_run(self, run_name):
"""
get a run from run name
:param run_name: existing run name
:type run_name: string
:return: Run object
:rtype: :class:`mth5.mth5_groups.RunGroup`
>>> existing_run = station.get_run('MT001')
"""
run_name = validate_name(run_name)
try:
return RunGroup(self.hdf5_group[run_name], **self.dataset_options)
except KeyError:
msg = (
f"{run_name} does not exist, "
+ "check groups_list for existing names"
)
self.logger.debug("Error" + msg)
raise MTH5Error(msg)
[docs] def remove_run(self, run_name):
"""
Remove a run from the station.
.. note:: Deleting a station is not as simple as del(station). In HDF5
this does not free up memory, it simply removes the reference
to that station. The common way to get around this is to
copy what you want into a new file, or overwrite the station.
:param station_name: existing station name
:type station_name: string
:Example: ::
>>> from mth5 import mth5
>>> mth5_obj = mth5.MTH5()
>>> mth5_obj.open_mth5(r"/test.mth5", mode='a')
>>> # one option
>>> stations = mth5_obj.stations_group
>>> stations.remove_station('MT001')
>>> # another option
>>> mth5_obj.stations_group.remove_station('MT001')
"""
run_name = validate_name(run_name)
try:
del self.hdf5_group[run_name]
self.logger.info(
"Deleting a run does not reduce the HDF5"
+ "file size it simply remove the reference. If "
+ "file size reduction is your goal, simply copy"
+ " what you want into another file."
)
except KeyError:
msg = (
f"{run_name} does not exist, "
+ "check station_list for existing names"
)
self.logger.debug("Error" + msg)
raise MTH5Error(msg)
# =============================================================================
# Transfer Functions Group
# =============================================================================
[docs]class TransferFunctionsGroup(BaseGroup):
"""
Object to hold transfer functions
The is the high level group, all transfer functions for the station are
held here and each one will have its own TransferFunctionGroup.
This has add, get, remove_transfer_function.
"""
def __init__(self, group, **kwargs):
super().__init__(group, **kwargs)
[docs] def tf_summary(self, as_dataframe=True):
"""
Summary of all transfer functions in this group
:return: DESCRIPTION
:rtype: TYPE
"""
tf_list = []
for tf_id in self.groups_list:
tf_group = self.get_transfer_function(tf_id)
tf_entry = tf_group.tf_entry
tf_entry["station_hdf5_reference"][:] = self.hdf5_group.parent.ref
tf_entry["station"][:] = self.hdf5_group.parent.attrs["id"]
tf_entry["latitude"][:] = self.hdf5_group.parent.attrs[
"location.latitude"
]
tf_entry["longitude"][:] = self.hdf5_group.parent.attrs[
"location.longitude"
]
tf_entry["elevation"][:] = self.hdf5_group.parent.attrs[
"location.elevation"
]
tf_list.append(tf_entry)
tf_list = np.array(tf_list)
if as_dataframe:
return pd.DataFrame(tf_list.flatten())
return tf_list
[docs] def add_transfer_function(self, name, tf_object=None):
"""
Add a transfer function to the group
:param name: name of the transfer function
:type name: string
:param tf_object: Transfer Function object
:type tf_object: :class:`mt_metadata.transfer_function.core.TF`
:return: DESCRIPTION
:rtype: TYPE
>>> from mth5.mth5 import MTH5
>>> m = MTH5()
>>> m.open_mth5("example.h5", "a")
>>> station_group = m.get_station("mt01", survey="test")
>>> tf_group = station_group.transfer_functions_group
>>> tf_group.add_transfer_function("mt01_4096", tf_object)
"""
name = validate_name(name)
tf_group = TransferFunctionGroup(
self.hdf5_group.create_group(name), **self.dataset_options
)
if tf_object is not None:
tf_group.from_tf_object(tf_object)
return tf_group
[docs] def get_transfer_function(self, tf_id):
"""
Get transfer function from id
:param tf_id: name of transfer function
:type tf_id: string
:return: Transfer function group
:rtype: :class:`mth5.groups.TransferFunctionGroup`
>>> from mth5.mth5 import MTH5
>>> m = MTH5()
>>> m.open_mth5("example.h5", "a")
>>> station_group = m.get_station("mt01", survey="test")
>>> tf_group = station_group.transfer_functions_group.get_transfer_function("mt01_4096")
"""
tf_id = validate_name(tf_id)
try:
return TransferFunctionGroup(
self.hdf5_group[tf_id], **self.dataset_options
)
except KeyError:
msg = (
f"{tf_id} does not exist, "
+ "check station_list for existing names"
)
self.logger.debug("Error" + msg)
raise MTH5Error(msg)
[docs] def remove_transfer_function(self, tf_id):
"""
Remove a transfer function from the group
:param tf_id: DESCRIPTION
:type tf_id: TYPE
:return: DESCRIPTION
:rtype: TYPE
>>> from mth5.mth5 import MTH5
>>> m = MTH5()
>>> m.open_mth5("example.h5", "a")
>>> station_group = m.get_station("mt01", survey="test")
>>> tf_group = station_group.transfer_functions_group
>>> tf_group.remove_transfer_function("mt01_4096")
"""
tf_id = validate_name(tf_id)
try:
del self.hdf5_group[tf_id]
self.logger.info(
"Deleting a station does not reduce the HDF5"
+ "file size it simply remove the reference. If "
+ "file size reduction is your goal, simply copy"
+ " what you want into another file."
)
except KeyError:
msg = (
f"{tf_id} does not exist, "
+ "check station_list for existing names"
)
self.logger.debug("Error" + msg)
raise MTH5Error(msg)
[docs] def get_tf_object(self, tf_id):
"""
This is the function you want to use to get a proper
:class:`mt_metadata.transfer_functions.core.TF` object with all the
appropriate metadata.
:param tf_id: name of the transfer function to get
:type tf_id: string
:return: Full transfer function with appropriate metadata
:rtype: :class:`mt_metadata.transfer_functions.core.TF`
>>> from mth5.mth5 import MTH5
>>> m = MTH5()
>>> m.open_mth5("example.h5", "a")
>>> station_group = m.get_station("mt01", survey="test")
>>> tf_group = station_group.transfer_functions_group
>>> tf_object = tf_group.get_tf_object("mt01_4096")
"""
tf_group = self.get_transfer_function(tf_id)
return tf_group.to_tf_object()
# =============================================================================
# Run Group
# =============================================================================
[docs]class RunGroup(BaseGroup):
"""
RunGroup is a utility class to hold information about a single run
and accompanying metadata. This class is the next level down from
Stations --> ``/Survey/Stations/station/station{a-z}``.
This class provides methods to add and get channels. A summary table of
all existing channels in the run is also provided as a convenience look up
table to make searching easier.
:param group: HDF5 group for a station, should have a path
``/Survey/Stations/station_name/run_name``
:type group: :class:`h5py.Group`
:param station_metadata: metadata container, defaults to None
:type station_metadata: :class:`mth5.metadata.Station`, optional
:Access RunGroup from an open MTH5 file:
>>> from mth5 import mth5
>>> mth5_obj = mth5.MTH5()
>>> mth5_obj.open_mth5(r"/test.mth5", mode='a')
>>> run = mth5_obj.stations_group.get_station('MT001').get_run('MT001a')
:Check what channels exist:
>>> station.groups_list
['Ex', 'Ey', 'Hx', 'Hy']
To access the hdf5 group directly use `RunGroup.hdf5_group`
>>> station.hdf5_group.ref
<HDF5 Group Reference>
.. note:: All attributes should be input into the metadata object, that
way all input will be validated against the metadata standards.
If you change attributes in metadata object, you should run the
`SurveyGroup.write_metadata()` method. This is a temporary
solution, working on an automatic updater if metadata is changed.
>>> run.metadata.existing_attribute = 'update_existing_attribute'
>>> run.write_metadata()
If you want to add a new attribute this should be done using the
`metadata.add_base_attribute` method.
>>> station.metadata.add_base_attribute('new_attribute',
>>> ... 'new_attribute_value',
>>> ... {'type':str,
>>> ... 'required':True,
>>> ... 'style':'free form',
>>> ... 'description': 'new attribute desc.',
>>> ... 'units':None,
>>> ... 'options':[],
>>> ... 'alias':[],
>>> ... 'example':'new attribute
:Add a channel:
>>> new_channel = run.add_channel('Ex', 'electric',
>>> ... data=numpy.random.rand(4096))
>>> new_run
/Survey/Stations/MT001/MT001a:
=======================================
--> Dataset: summary
......................
--> Dataset: Ex
......................
--> Dataset: Ey
......................
--> Dataset: Hx
......................
--> Dataset: Hy
......................
:Add a channel with metadata:
>>> from mth5.metadata import Electric
>>> ex_metadata = Electric()
>>> ex_metadata.time_period.start = '2020-01-01T12:30:00'
>>> ex_metadata.time_period.end = '2020-01-03T16:30:00'
>>> new_ex = run.add_channel('Ex', 'electric',
>>> ... channel_metadata=ex_metadata)
>>> # to look at the metadata
>>> new_ex.metadata
{
"electric": {
"ac.end": 1.2,
"ac.start": 2.3,
...
}
}
.. seealso:: `mth5.metadata` for details on how to add metadata from
various files and python objects.
:Remove a channel:
>>> run.remove_channel('Ex')
>>> station
/Survey/Stations/MT001/MT001a:
=======================================
--> Dataset: summary
......................
--> Dataset: Ey
......................
--> Dataset: Hx
......................
--> Dataset: Hy
......................
.. note:: Deleting a station is not as simple as del(station). In HDF5
this does not free up memory, it simply removes the reference
to that station. The common way to get around this is to
copy what you want into a new file, or overwrite the station.
:Get a channel:
>>> existing_ex = stations.get_channel('Ex')
>>> existing_ex
Channel Electric:
-------------------
data type: Ex
data type: electric
data format: float32
data shape: (4096,)
start: 1980-01-01T00:00:00+00:00
end: 1980-01-01T00:32:+08:00
sample rate: 8
:summary Table:
A summary table is provided to make searching easier. The table
summarized all stations within a survey. To see what names are in the
summary table:
>>> run.summary_table.dtype.descr
[('component', ('|S5', {'h5py_encoding': 'ascii'})),
('start', ('|S32', {'h5py_encoding': 'ascii'})),
('end', ('|S32', {'h5py_encoding': 'ascii'})),
('n_samples', '<i4'),
('measurement_type', ('|S12', {'h5py_encoding': 'ascii'})),
('units', ('|S25', {'h5py_encoding': 'ascii'})),
('hdf5_reference', ('|O', {'ref': h5py.h5r.Reference}))]
.. note:: When a run is added an entry is added to the summary table,
where the information is pulled from the metadata.
>>> new_run.summary_table
index | component | start | end | n_samples | measurement_type | units |
hdf5_reference
--------------------------------------------------------------------------
-------------
"""
def __init__(self, group, run_metadata=None, **kwargs):
super().__init__(group, group_metadata=run_metadata, **kwargs)
# summary of channels in run
self._defaults_summary_attrs = {
"name": "summary",
"max_shape": (20,),
"dtype": np.dtype(
[
("component", "S20"),
("start", "S32"),
("end", "S32"),
("n_samples", int),
("measurement_type", "S12"),
("units", "S25"),
("hdf5_reference", h5py.ref_dtype),
]
),
}
@property
def station_group(self):
"""shortcut to station group"""
return StationGroup(self.hdf5_group.parent)
@property
def station_metadata(self):
"""station metadata"""
meta_dict = dict(self.hdf5_group.parent.attrs)
for key, value in meta_dict.items():
meta_dict[key] = from_numpy_type(value)
station_metadata = metadata.Station()
station_metadata.from_dict({"station": meta_dict})
return station_metadata
@property
def master_station_group(self):
"""shortcut to master station group"""
return MasterStationGroup(self.hdf5_group.parent.parent)
@property
def survey_metadata(self):
"""survey metadata"""
meta_dict = dict(self.hdf5_group.parent.parent.parent.attrs)
for key, value in meta_dict.items():
meta_dict[key] = from_numpy_type(value)
survey_metadata = metadata.Survey()
survey_metadata.from_dict({"survey": meta_dict})
return survey_metadata
@BaseGroup.metadata.getter
def metadata(self):
"""Overwrite get metadata to include channel information in the runs"""
self._metadata.channels = []
for ch in self.groups_list:
ch_group = self.get_channel(ch)
self._metadata.channels.append(ch_group.metadata)
self._metadata.hdf5_reference = self.hdf5_group.ref
return self._metadata
@property
def channel_summary(self):
"""
summary of channels in run
:return: DESCRIPTION
:rtype: TYPE
"""
ch_list = []
for key, group in self.hdf5_group.items():
try:
ch_type = group.attrs["type"]
if ch_type in ["electric", "magnetic", "auxiliary"]:
ch_list.append(
(
group.attrs["component"],
group.attrs["time_period.start"].split("+")[0],
group.attrs["time_period.end"].split("+")[0],
group.size,
group.attrs["type"],
group.attrs["units"],
group.ref,
)
)
except KeyError:
pass
ch_summary = np.array(
ch_list,
dtype=np.dtype(
[
("component", "U20"),
("start", "datetime64[ns]"),
("end", "datetime64[ns]"),
("n_samples", np.int64),
("measurement_type", "U12"),
("units", "U25"),
("hdf5_reference", h5py.ref_dtype),
]
),
)
return pd.DataFrame(ch_summary)
[docs] def add_channel(
self,
channel_name,
channel_type,
data,
channel_dtype="int32",
max_shape=(None,),
chunks=True,
channel_metadata=None,
**kwargs,
):
"""
add a channel to the run
:param channel_name: name of the channel
:type channel_name: string
:param channel_type: [ electric | magnetic | auxiliary ]
:type channel_type: string
:raises MTH5Error: If channel type is not correct
:param channel_metadata: metadata container, defaults to None
:type channel_metadata: [ :class:`mth5.metadata.Electric` |
:class:`mth5.metadata.Magnetic` |
:class:`mth5.metadata.Auxiliary` ], optional
:return: Channel container
:rtype: [ :class:`mth5.mth5_groups.ElectricDatset` |
:class:`mth5.mth5_groups.MagneticDatset` |
:class:`mth5.mth5_groups.AuxiliaryDatset` ]
>>> new_channel = run.add_channel('Ex', 'electric', None)
>>> new_channel
Channel Electric:
-------------------
component: None
data type: electric
data format: float32
data shape: (1,)
start: 1980-01-01T00:00:00+00:00
end: 1980-01-01T00:00:00+00:00
sample rate: None
"""
channel_name = validate_name(channel_name.lower())
for key, value in kwargs.items():
setattr(self, key, value)
if data is not None:
if data.size < 1024:
chunks = None
try:
if data is not None:
channel_group = self.hdf5_group.create_dataset(
channel_name,
data=data,
dtype=data.dtype,
chunks=chunks,
maxshape=max_shape,
**self.dataset_options,
)
# initialize an resizable data array
# need to set the chunk size to something useful, if the chunk
# size is 1 this causes performance issues and bloating of the
# hdf5 file. Set to 8196 for now. Should add a parameter for
# this
else:
if channel_metadata is not None:
# can estimate a size, this will help with allocating
# and set the chunk size to a realistic value
if (
channel_metadata.time_period.start
!= channel_metadata.time_period.end
):
if channel_metadata.sample_rate > 0:
estimate_size = (
int(
(
channel_metadata.time_period._end_dt
- channel_metadata.time_period._start_dt
)
* channel_metadata.sample_rate
),
)
else:
estimate_size = (1,)
chunks = CHUNK_SIZE
else:
estimate_size = (1,)
chunks = CHUNK_SIZE
if estimate_size[0] > 2**31:
estimate_size = (1,)
self.logger.warning(
"Estimated size is too large. Check start and end "
"times, initializing with size (1,)"
)
channel_group = self.hdf5_group.create_dataset(
channel_name,
shape=estimate_size,
maxshape=max_shape,
dtype=channel_dtype,
chunks=chunks,
**self.dataset_options,
)
if channel_metadata and channel_metadata.component is None:
channel_metadata.component = channel_name
if channel_type.lower() in ["magnetic"]:
channel_obj = MagneticDataset(
channel_group, dataset_metadata=channel_metadata
)
elif channel_type.lower() in ["electric"]:
channel_obj = ElectricDataset(
channel_group, dataset_metadata=channel_metadata
)
elif channel_type.lower() in ["auxiliary"]:
channel_obj = AuxiliaryDataset(
channel_group, dataset_metadata=channel_metadata
)
else:
msg = (
"`channel_type` must be in [ electric | magnetic | "
+ "auxiliary ]. Input was {0}".format(channel_type)
)
self.logger.error(msg)
raise MTH5Error(msg)
except (OSError, RuntimeError, ValueError):
msg = f"channel {channel_name} already exists, returning existing group."
self.logger.debug(msg)
channel_obj = self.get_channel(channel_name)
if data is not None:
self.logger.debug(
"Replacing data with new shape %s", data.shape
)
channel_obj.replace_dataset(data)
self.logger.debug("Updating metadata")
channel_obj.metadata.update(channel_metadata)
channel_obj.write_metadata()
self.logger.debug("Done with %s", channel_name)
# need to make sure the channel name is passed.
if channel_obj.metadata.component is None:
channel_obj.metadata.component = channel_name
channel_obj.write_metadata()
return channel_obj
[docs] def get_channel(self, channel_name):
"""
Get a channel from an existing name. Returns the appropriate
container.
:param channel_name: name of the channel
:type channel_name: string
:return: Channel container
:rtype: [ :class:`mth5.mth5_groups.ElectricDatset` |
:class:`mth5.mth5_groups.MagneticDatset` |
:class:`mth5.mth5_groups.AuxiliaryDatset` ]
:raises MTH5Error: If no channel is found
:Example:
>>> existing_channel = run.get_channel('Ex')
MTH5Error: Ex does not exist, check groups_list for existing names'
>>> run.groups_list
['Ey', 'Hx', 'Hz']
>>> existing_channel = run.get_channel('Ey')
>>> existing_channel
Channel Electric:
-------------------
component: Ey
data type: electric
data format: float32
data shape: (4096,)
start: 1980-01-01T00:00:00+00:00
end: 1980-01-01T00:00:01+00:00
sample rate: 4096
"""
channel_name = validate_name(channel_name.lower())
try:
ch_dataset = self.hdf5_group[channel_name]
except KeyError:
msg = (
f"{channel_name} does not exist, "
+ "check groups_list for existing names"
)
self.logger.debug("Error" + msg)
raise MTH5Error(msg)
if ch_dataset.attrs["mth5_type"].lower() in ["electric"]:
ch_metadata = meta_classes["Electric"]()
ch_metadata.from_dict({"Electric": ch_dataset.attrs})
channel = ElectricDataset(
ch_dataset,
dataset_metadata=ch_metadata,
write_metadata=False,
)
elif ch_dataset.attrs["mth5_type"].lower() in ["magnetic"]:
ch_metadata = meta_classes["Magnetic"]()
ch_metadata.from_dict({"Magnetic": ch_dataset.attrs})
channel = MagneticDataset(
ch_dataset,
dataset_metadata=ch_metadata,
write_metadata=False,
)
elif ch_dataset.attrs["mth5_type"].lower() in ["auxiliary"]:
ch_metadata = meta_classes["Auxiliary"]()
ch_metadata.from_dict({"Auxiliary": ch_dataset.attrs})
channel = AuxiliaryDataset(
ch_dataset,
dataset_metadata=ch_metadata,
write_metadata=False,
)
else:
channel = ChannelDataset(ch_dataset)
channel.read_metadata()
return channel
[docs] def remove_channel(self, channel_name):
"""
Remove a run from the station.
.. note:: Deleting a channel is not as simple as del(channel). In HDF5
this does not free up memory, it simply removes the reference
to that channel. The common way to get around this is to
copy what you want into a new file, or overwrite the channel.
:param station_name: existing station name
:type station_name: string
:Example:
>>> from mth5 import mth5
>>> mth5_obj = mth5.MTH5()
>>> mth5_obj.open_mth5(r"/test.mth5", mode='a')
>>> run = mth5_obj.stations_group.get_station('MT001').get_run('MT001a')
>>> run.remove_channel('Ex')
.. todo:: Need to remove summary table entry as well.
"""
channel_name = validate_name(channel_name.lower())
try:
del self.hdf5_group[channel_name]
self.logger.info(
"Deleting a channel does not reduce the HDF5"
+ "file size it simply remove the reference. If "
+ "file size reduction is your goal, simply copy"
+ " what you want into another file."
)
except KeyError:
msg = (
f"{channel_name} does not exist, "
+ "check groups_list for existing names"
)
self.logger.debug("Error" + msg)
raise MTH5Error(msg)
[docs] def to_runts(self, start=None, end=None, n_samples=None):
"""
create a :class:`mth5.timeseries.RunTS` object from channels of the
run
:return: DESCRIPTION
:rtype: TYPE
"""
ch_list = []
for channel in self.groups_list:
if channel in ["summary"]:
continue
ch_obj = self.get_channel(channel)
if start is not None:
ts_obj = ch_obj.time_slice(start, end=end, n_samples=n_samples)
else:
ts_obj = ch_obj.to_channel_ts()
ch_list.append(ts_obj)
return RunTS(
ch_list,
run_metadata=self.metadata,
station_metadata=self.station_metadata,
survey_metadata=self.survey_metadata,
)
[docs] def from_runts(self, run_ts_obj, **kwargs):
"""
create channel datasets from a :class:`mth5.timeseries.RunTS` object
and update metadata.
:parameter :class:`mth5.timeseries.RunTS` run_ts_obj: Run object with all
the appropriate channels and metadata.
Will create a run group and appropriate channel datasets.
"""
if not isinstance(run_ts_obj, RunTS):
msg = f"Input must be a mth5.timeseries.RunTS object not {type(run_ts_obj)}"
self.logger.error(msg)
raise MTH5Error(msg)
self.metadata.update(run_ts_obj.run_metadata)
channels = []
for comp in run_ts_obj.channels:
ch = getattr(run_ts_obj, comp)
if ch.station_metadata.id is not None:
if ch.station_metadata.id != self.station_group.metadata.id:
if ch.station_metadata.id not in ["0", None]:
self.logger.warning(
f"Channel station.id {ch.station_metadata.id} != "
+ f" group station.id {self.station_group.metadata.id}"
)
if ch.run_metadata.id is not None:
if ch.run_metadata.id != self.metadata.id:
if ch.run_metadata.id not in ["0", None]:
self.logger.warning(
f"Channel run.id {ch.run_metadata.id} != "
+ f" group run.id {self.metadata.id}"
)
channels.append(self.from_channel_ts(ch))
self.update_run_metadata()
return channels
[docs] def from_channel_ts(self, channel_ts_obj):
"""
create a channel data set from a :class:`mth5.timeseries.ChannelTS` object and
update metadata.
:param channel_ts_obj: a single time series object
:type channel_ts_obj: :class:`mth5.timeseries.ChannelTS`
:return: new channel dataset
:rtype: :class:`mth5.groups.ChannelDataset
"""
if not isinstance(channel_ts_obj, ChannelTS):
msg = f"Input must be a mth5.timeseries.ChannelTS object not {type(channel_ts_obj)}"
self.logger.error(msg)
raise MTH5Error(msg)
## Need to add in the filters
if channel_ts_obj.channel_response_filter.filters_list != []:
from mth5.groups import FiltersGroup
fg = FiltersGroup(self.hdf5_group.parent.parent.parent["Filters"])
for ff in channel_ts_obj.channel_response_filter.filters_list:
fg.add_filter(ff)
ch_obj = self.add_channel(
channel_ts_obj.component,
channel_ts_obj.channel_metadata.type,
channel_ts_obj.ts,
channel_metadata=channel_ts_obj.channel_metadata,
)
# need to update the channels recorded
if channel_ts_obj.channel_metadata.type == "electric":
if self.metadata.channels_recorded_electric is None:
self.metadata.channels_recorded_electric = [
channel_ts_obj.component
]
elif (
channel_ts_obj.component
not in self.metadata.channels_recorded_electric
):
self.metadata.channels_recorded_electric.append(
channel_ts_obj.component
)
elif channel_ts_obj.channel_metadata.type == "magnetic":
if self.metadata.channels_recorded_magnetic is None:
self.metadata.channels_recorded_magnetic = [
channel_ts_obj.component
]
elif (
channel_ts_obj.component
not in self.metadata.channels_recorded_magnetic
):
self.metadata.channels_recorded_magnetic.append(
channel_ts_obj.component
)
elif channel_ts_obj.channel_metadata.type == "auxiliary":
if self.metadata.channels_recorded_auxiliary is None:
self.metadata.channels_recorded_auxiliary = [
channel_ts_obj.component
]
elif (
channel_ts_obj.component
not in self.metadata.channels_recorded_auxiliary
):
self.metadata.channels_recorded_auxiliary.append(
channel_ts_obj.component
)
return ch_obj
[docs] def plot(self, start=None, end=None, n_samples=None):
"""
Produce a simple matplotlib plot using runts
"""
runts = self.to_runts(start=start, end=end, n_samples=n_samples)
runts.plot()
[docs]class ChannelDataset:
"""
Holds a channel dataset. This is a simple container for the data to make
sure that the user has the flexibility to turn the channel into an object
they want to deal with.
For now all the numpy type slicing can be used on `hdf5_dataset`
:param dataset: dataset object for the channel
:type dataset: :class:`h5py.Dataset`
:param dataset_metadata: metadata container, defaults to None
:type dataset_metadata: [ :class:`mth5.metadata.Electric` |
:class:`mth5.metadata.Magnetic` |
:class:`mth5.metadata.Auxiliary` ], optional
:raises MTH5Error: If the dataset is not of the correct type
Utilities will be written to create some common objects like:
* xarray.DataArray
* pandas.DataFrame
* zarr
* dask.Array
The benefit of these other objects is that they can be indexed by time,
and they have much more buit-in funcionality.
.. code-block:: python
>>> from mth5 import mth5
>>> mth5_obj = mth5.MTH5()
>>> mth5_obj.open_mth5(r"/test.mth5", mode='a')
>>> run = mth5_obj.stations_group.get_station('MT001').get_run('MT001a')
>>> channel = run.get_channel('Ex')
>>> channel
Channel Electric:
-------------------
component: Ey
data type: electric
data format: float32
data shape: (4096,)
start: 1980-01-01T00:00:00+00:00
end: 1980-01-01T00:00:01+00:00
sample rate: 4096
"""
def __init__(
self, dataset, dataset_metadata=None, write_metadata=True, **kwargs
):
for key, value in kwargs.items():
setattr(self, key, value)
if dataset is not None and isinstance(dataset, (h5py.Dataset)):
self.hdf5_dataset = weakref.ref(dataset)()
self.logger = setup_logger(f"{__name__}.{self._class_name}")
# set metadata to the appropriate class. Standards is not a
# Base object so should be skipped. If the class name is not
# defined yet set to Base class.
self.metadata = Base()
try:
self.metadata = meta_classes[self._class_name]()
except KeyError:
self.metadata = Base()
if not hasattr(self.metadata, "mth5_type"):
self._add_base_attributes()
self.metadata.hdf5_reference = self.hdf5_dataset.ref
self.metadata.mth5_type = self._class_name
# if the input data set already has filled attributes, namely if the
# channel data already exists then read them in with our writing back
if "mth5_type" in list(self.hdf5_dataset.attrs.keys()):
self.metadata.from_dict(
{self.hdf5_dataset.attrs["mth5_type"]: self.hdf5_dataset.attrs}
)
# if metadata is input, make sure that its the same class type amd write
# to the hdf5 dataset
if dataset_metadata is not None:
if not isinstance(dataset_metadata, type(self.metadata)):
msg = "metadata must be type metadata.%s not %s"
self.logger.error(msg, self._class_name, type(dataset_metadata))
raise MTH5Error(
msg % (self._class_name, type(dataset_metadata))
)
# load from dict because of the extra attributes for MTH5
self.metadata.from_dict(dataset_metadata.to_dict())
self.metadata.hdf5_reference = self.hdf5_dataset.ref
self.metadata.mth5_type = self._class_name
# write out metadata to make sure that its in the file.
if write_metadata:
self.write_metadata()
# if the attrs don't have the proper metadata keys yet write them
if not "mth5_type" in list(self.hdf5_dataset.attrs.keys()):
self.write_metadata()
def _add_base_attributes(self):
# add 2 attributes that will help with querying
# 1) the metadata class name
self.metadata.add_base_attribute(
"mth5_type",
self._class_name,
{
"type": str,
"required": True,
"style": "free form",
"description": "type of group",
"units": None,
"options": [],
"alias": [],
"example": "group_name",
"default": None,
},
)
# 2) the HDF5 reference that can be used instead of paths
self.metadata.add_base_attribute(
"hdf5_reference",
self.hdf5_dataset.ref,
{
"type": "h5py_reference",
"required": True,
"style": "free form",
"description": "hdf5 internal reference",
"units": None,
"options": [],
"alias": [],
"example": "<HDF5 Group Reference>",
"default": None,
},
)
def __str__(self):
try:
lines = ["Channel {0}:".format(self._class_name)]
lines.append("-" * (len(lines[0]) + 2))
info_str = "\t{0:<18}{1}"
lines.append(info_str.format("component:", self.metadata.component))
lines.append(info_str.format("data type:", self.metadata.type))
lines.append(
info_str.format("data format:", self.hdf5_dataset.dtype)
)
lines.append(
info_str.format("data shape:", self.hdf5_dataset.shape)
)
lines.append(
info_str.format("start:", self.metadata.time_period.start)
)
lines.append(info_str.format("end:", self.metadata.time_period.end))
lines.append(
info_str.format("sample rate:", self.metadata.sample_rate)
)
return "\n".join(lines)
except ValueError:
return "MTH5 file is closed and cannot be accessed."
def __repr__(self):
return self.__str__()
@property
def _class_name(self):
return self.__class__.__name__.split("Dataset")[0]
@property
def run_group(self):
"""shortcut to run group"""
return RunGroup(self.hdf5_dataset.parent)
@property
def run_metadata(self):
"""run metadata"""
meta_dict = dict(self.hdf5_dataset.parent.attrs)
for key, value in meta_dict.items():
meta_dict[key] = from_numpy_type(value)
run_metadata = metadata.Run()
run_metadata.from_dict({"run": meta_dict})
return run_metadata
@property
def station_group(self):
"""shortcut to station group"""
return StationGroup(self.hdf5_dataset.parent.parent)
@property
def station_metadata(self):
"""station metadata"""
meta_dict = dict(self.hdf5_dataset.parent.parent.attrs)
for key, value in meta_dict.items():
meta_dict[key] = from_numpy_type(value)
station_metadata = metadata.Station()
station_metadata.from_dict({"station": meta_dict})
return station_metadata
@property
def master_station_group(self):
"""shortcut to master station group"""
return MasterStationGroup(self.hdf5_dataset.parent.parent.parent)
@property
def survey_metadata(self):
"""survey metadata"""
meta_dict = dict(self.hdf5_dataset.parent.parent.parent.parent.attrs)
for key, value in meta_dict.items():
meta_dict[key] = from_numpy_type(value)
survey_metadata = metadata.Survey()
survey_metadata.from_dict({"survey": meta_dict})
return survey_metadata
@property
def survey_id(self):
"""shortcut to survey group"""
return self.hdf5_dataset.parent.parent.parent.parent.attrs["id"]
@property
def channel_response_filter(self):
# get the filters to make a channel response
filters_group = FiltersGroup(
self.hdf5_dataset.parent.parent.parent.parent["Filters"]
)
f_list = []
for name in self.metadata.filter.name:
name = name.replace("/", " per ").lower()
try:
f_list.append(filters_group.to_filter_object(name))
except KeyError:
self.logger.warning("Could not locate filter %s", name)
continue
return ChannelResponseFilter(filters_list=f_list)
@property
def start(self):
return self.metadata.time_period._start_dt
@start.setter
def start(self, value):
"""set start time and validate through metadata validator"""
if isinstance(value, MTime):
self.metadata.time_period.start = value.iso_str
else:
self.metadata.time_period.start = value
@property
def end(self):
"""return end time based on the data"""
return self.start + (self.n_samples / self.sample_rate)
@property
def sample_rate(self):
return self.metadata.sample_rate
@sample_rate.setter
def sample_rate(self, value):
"""set sample rate through metadata validator"""
self.metadata.sample_rate = value
@property
def n_samples(self):
return self.hdf5_dataset.size
@property
def time_index(self):
"""
Create a time index based on the metadata. This can help when asking
for time windows from the data
:return: DESCRIPTION
:rtype: TYPE
"""
return make_dt_coordinates(
self.start, self.sample_rate, self.n_samples, self._logger
)
[docs] def replace_dataset(self, new_data_array):
"""
replace the entire dataset with a new one, nothing left behind
:param new_data_array: new data array shape (npts, )
:type new_data_array: :class:`numpy.ndarray`
"""
if not isinstance(new_data_array, np.ndarray):
try:
new_data_array = np.array(new_data_array)
except (ValueError, TypeError) as error:
msg = f"{error} Input must be a numpy array not {type(new_data_array)}"
self.logger.exception(msg)
raise TypeError(msg)
if new_data_array.shape != self.hdf5_dataset.shape:
self.hdf5_dataset.resize(new_data_array.shape)
self.hdf5_dataset[...] = new_data_array
[docs] def extend_dataset(
self,
new_data_array,
start_time,
sample_rate,
fill=None,
max_gap_seconds=1,
fill_window=10,
):
"""
Append data according to how the start time aligns with existing
data. If the start time is before existing start time the data is
prepended, similarly if the start time is near the end data will be
appended.
If the start time is within the existing time range, existing data
will be replace with the new data.
If there is a gap between start or end time of the new data with
the existing data you can either fill the data with a constant value
or an error will be raise depending on the value of fill.
:param new_data_array: new data array with shape (npts, )
:type new_data_array: :class:`numpy.ndarray`
:param start_time: start time of the new data array in UTC
:type start_time: string or :class:`mth5.utils.mttime.MTime`
:param sample_rate: Sample rate of the new data array, must match
existing sample rate
:type sample_rate: float
:param fill: If there is a data gap how do you want to fill the gap
* None: will raise an :class:`mth5.utils.exceptions.MTH5Error`
* 'mean': will fill with the mean of each data set within
the fill window
* 'median': will fill with the median of each data set
within the fill window
* value: can be an integer or float to fill the gap
* 'nan': will fill the gap with NaN
:type fill: string, None, float, integer
:param max_gap_seconds: sets a maximum number of seconds the gap can
be. Anything over this number will raise
a :class:`mth5.utils.exceptions.MTH5Error`.
:type max_gap_seconds: float or integer
:param fill_window: number of points from the end of each data set
to estimate fill value from.
:type fill_window: integer
:raises: :class:`mth5.utils.excptions.MTH5Error` if sample rate is
not the same, or fill value is not understood,
:Append Example:
>>> ex = mth5_obj.get_channel('MT001', 'MT001a', 'Ex')
>>> ex.n_samples
4096
>>> ex.end
2015-01-08T19:32:09.500000+00:00
>>> t = timeseries.ChannelTS('electric',
... data=2*np.cos(4 * np.pi * .05 * \
... np.linspace(0,4096l num=4096) *
... .01),
... channel_metadata={'electric':{
... 'component': 'ex',
... 'sample_rate': 8,
... 'time_period.start':(ex.end+(1)).iso_str}})
>>> ex.extend_dataset(t.ts, t.start, t.sample_rate, fill='median',
... max_gap_seconds=2)
2020-07-02T18:02:47 - mth5.groups.Electric.extend_dataset - INFO -
filling data gap with 1.0385180759767025
>>> ex.n_samples
8200
>>> ex.end
2015-01-08T19:40:42.500000+00:00
"""
fw = fill_window
# check input parameters
if sample_rate != self.sample_rate:
msg = (
"new data must have the same sampling rate as existing data.\n"
+ f"\tnew sample rate = {sample_rate}\n"
+ f"\texisting sample rate = {self.sample_rate}"
)
self.logger.error(msg)
raise MTH5Error(msg)
if not isinstance(new_data_array, np.ndarray):
try:
new_data_array = np.array(new_data_array)
except (ValueError, TypeError) as error:
msg = f"{error} Input must be a numpy array not {type(new_data_array)}"
self.logger.exception(msg)
raise TypeError(msg)
if not isinstance(start_time, MTime):
start_time = MTime(start_time)
# get end time will need later
end_time = start_time + (new_data_array.size / sample_rate)
# check start time
start_t_diff = self._get_diff_new_array_start(start_time)
end_t_diff = self._get_diff_new_array_end(end_time)
self.logger.info("Extending data.")
self.logger.info(f"Existing start time {self.start}")
self.logger.info(f"New start time {start_time}")
self.logger.info(f"Existing end time {self.end}")
self.logger.info(f"New end time {end_time}")
# prepend data
if start_t_diff < 0:
self.logger.info("Prepending: ")
self.logger.info(
f"new start time {start_time} is before existing {self.start}"
)
if end_time.iso_no_tz not in self.time_index:
gap = abs(end_time - self.start)
if gap > 0:
if gap > max_gap_seconds:
msg = (
f"Time gap of {gap} seconds "
+ f"is more than max_gap_seconds = {max_gap_seconds}."
+ " Consider making a new run."
)
self.logger.error(msg)
raise MTH5Error(msg)
if fill is None:
msg = (
f"A time gap of {gap} seconds is found "
+ "between new and existing data sets. \n"
+ f"\tnew end time: {end_time}\n"
+ f"\texisting start time: {self.start}"
)
self.logger.error(msg)
raise MTH5Error(msg)
# set new start time
old_slice = self.time_slice(self.start, end_time=self.end)
old_start = self.start.copy()
self.start = start_time
# resize the existing data to make room for new data
self.hdf5_dataset.resize(
(
int(
new_data_array.size
+ self.hdf5_dataset.size
+ gap * sample_rate
),
)
)
# fill based on time, refill existing data first
self.hdf5_dataset[
self.get_index_from_time(old_start) :
] = old_slice.ts.values
self.hdf5_dataset[
0 : self.get_index_from_time(end_time)
] = new_data_array
if fill == "mean":
fill_value = np.mean(
np.array(
[
new_data_array[-fw:].mean(),
float(old_slice.ts[0:fw].mean()),
]
)
)
elif fill == "median":
fill_value = np.median(
np.array(
[
np.median(new_data_array[-fw:]),
np.median(old_slice.ts[0:fw]),
]
)
)
elif fill == "nan":
fill_value = np.nan
elif isinstance(fill, (int, float)):
fill_value = fill
else:
msg = f"fill value {fill} is not understood"
self.logger.error(msg)
raise MTH5Error(msg)
self.logger.info(f"filling data gap with {fill_value}")
self.hdf5_dataset[
self.get_index_from_time(
end_time
) : self.get_index_from_time(old_start)
] = fill_value
else:
new_size = (
self.n_samples + int(abs(start_t_diff) * sample_rate),
)
overlap = abs(end_time - self.start)
self.logger.warning(
f"New data is overlapping by {overlap} s."
+ " Any overlap will be overwritten."
)
# set new start time
old_slice = self.time_slice(self.start, end_time=self.end)
old_start = self.start.copy()
self.start = start_time
self.logger.debug(
f"resizing data set from {self.n_samples} to {new_size}"
)
self.hdf5_dataset.resize(new_size)
# put back the existing data, which any overlapping times
# will be overwritten
self.hdf5_dataset[
self.get_index_from_time(old_start) :
] = old_slice.ts.values
self.hdf5_dataset[
0 : self.get_index_from_time(end_time)
] = new_data_array
# append data
elif start_t_diff > 0:
old_end = self.end.copy()
if start_time.iso_no_tz not in self.time_index:
gap = abs(self.end - start_time)
if gap > 0:
if gap > max_gap_seconds:
msg = (
f"Time gap of {gap} seconds "
+ f"is more than max_gap_seconds = {max_gap_seconds}."
+ " Consider making a new run."
)
self.logger.error(msg)
raise MTH5Error(msg)
if fill is None:
msg = (
f"A time gap of {gap} seconds is found "
+ "between new and existing data sets. \n"
+ f"\tnew start time: {start_time}\n"
+ f"\texisting end time: {self.end}"
)
self.logger.error(msg)
raise MTH5Error(msg)
# resize the existing data to make room for new data
self.hdf5_dataset.resize(
(
int(
new_data_array.size
+ self.hdf5_dataset.size
+ gap * sample_rate
),
)
)
self.hdf5_dataset[
self.get_index_from_time(start_time) :
] = new_data_array
old_index = self.get_index_from_time(old_end)
if fill == "mean":
fill_value = np.mean(
np.array(
[
new_data_array[0:fw].mean(),
np.mean(
self.hdf5_dataset[old_index - fw :]
),
]
)
)
elif fill == "median":
fill_value = np.median(
np.array(
[
np.median(new_data_array[0:fw]),
np.median(
self.hdf5_dataset[old_index - fw :]
),
]
)
)
elif fill == "nan":
fill_value = np.nan
elif isinstance(fill, (int, float)):
fill_value = fill
else:
msg = f"fill value {fill} is not understood"
self.logger.error(msg)
raise MTH5Error(msg)
self.logger.info(f"filling data gap with {fill_value}")
self.hdf5_dataset[
self.get_index_from_time(
old_end
) : self.get_index_from_time(start_time)
] = fill_value
else:
# if the new data fits within the extisting time span
if end_t_diff < 0:
self.logger.debug(
"New data fits within existing time span"
+ " all data in the window : "
f"{start_time} -- {end_time} " + "will be overwritten."
)
self.hdf5_dataset[
self.get_index_from_time(
start_time
) : self.get_index_from_time(end_time)
] = new_data_array
else:
new_size = (
self.n_samples + int(abs(start_t_diff) * sample_rate),
)
overlap = abs(self.end - start_time)
self.logger.warning(
f"New data is overlapping by {overlap} s."
+ " Any overlap will be overwritten."
)
self.logger.debug(
f"resizing data set from {self.n_samples} to {new_size}"
)
self.hdf5_dataset.resize(new_size)
# put back the existing data, which any overlapping times
# will be overwritten
self.hdf5_dataset[
self.get_index_from_time(start_time) :
] = new_data_array
[docs] def to_channel_ts(self):
"""
:return: a Timeseries with the appropriate time index and metadata
:rtype: :class:`mth5.timeseries.ChannelTS`
loads from memory (nearly half the size of xarray alone, not sure why)
"""
return ChannelTS(
channel_type=self.metadata.type,
data=self.hdf5_dataset[()],
channel_metadata=self.metadata,
run_metadata=self.run_metadata.copy(),
station_metadata=self.station_metadata.copy(),
survey_metadata=self.survey_metadata.copy(),
channel_response_filter=self.channel_response_filter,
)
[docs] def to_xarray(self):
"""
:return: an xarray DataArray with appropriate metadata and the
appropriate time index.
:rtype: :class:`xarray.DataArray`
.. note:: that metadta will not be validated if changed in an xarray.
loads from memory
"""
return xr.DataArray(
self.hdf5_dataset[()],
coords=[("time", self.time_index)],
attrs=self.metadata.to_dict(single=True),
)
[docs] def to_dataframe(self):
"""
:return: a dataframe where data is stored in the 'data' column and
attributes are stored in the experimental attrs attribute
:rtype: :class:`pandas.DataFrame`
.. note:: that metadta will not be validated if changed in an xarray.
loads into RAM
"""
df = pd.DataFrame(
{"data": self.hdf5_dataset[()]}, index=self.time_index
)
df.attrs.update(self.metadata.to_dict(single=True))
return df
[docs] def to_numpy(self):
"""
:return: a numpy structured array with 2 columns (time, channel_data)
:rtype: :class:`numpy.core.records`
.. note:: data is a builtin to numpy and cannot be used as a name
loads into RAM
"""
return np.core.records.fromarrays(
[self.time_index.to_numpy(), self.hdf5_dataset[()]],
names="time,channel_data",
)
[docs] def from_channel_ts(
self,
channel_ts_obj,
how="replace",
fill=None,
max_gap_seconds=1,
fill_window=10,
):
"""
fill data set from a :class:`mth5.timeseries.ChannelTS` object.
Will check for time alignement, and metadata.
:param channel_ts_obj: time series object
:type channel_ts_obj: :class:`mth5.timeseries.ChannelTS`
:param how: how the new array will be input to the existing dataset:
- 'replace' -> replace the entire dataset nothing is left over.
- 'extend' -> add onto the existing dataset, any overlapping
values will be rewritten, if there are gaps between data sets
those will be handled depending on the value of fill.
:param fill: If there is a data gap how do you want to fill the gap:
- None -> will raise an :class:`mth5.utils.exceptions.MTH5Error`
- 'mean'-> will fill with the mean of each data set within
the fill window
- 'median' -> will fill with the median of each data set
within the fill window
- value -> can be an integer or float to fill the gap
- 'nan' -> will fill the gap with NaN
:type fill: string, None, float, integer
:param max_gap_seconds: sets a maximum number of seconds the gap can
be. Anything over this number will raise
a :class:`mth5.utils.exceptions.MTH5Error`.
:type max_gap_seconds: float or integer
:param fill_window: number of points from the end of each data set
to estimate fill value from.
:type fill_window: integer
"""
if not isinstance(channel_ts_obj, ChannelTS):
msg = f"Input must be a ChannelTS object not {type(channel_ts_obj)}"
self.logger.error(msg)
raise TypeError(msg)
if how == "replace":
self.metadata.from_dict(channel_ts_obj.channel_metadata.to_dict())
self.replace_dataset(channel_ts_obj.ts)
# apparently need to reset these otherwise they get overwritten with None
self.metadata.hdf5_reference = self.hdf5_dataset.ref
self.metadata.mth5_type = self._class_name
self.write_metadata()
elif how == "extend":
self.extend_dataset(
channel_ts_obj.ts,
channel_ts_obj.start,
channel_ts_obj.sample_rate,
fill=fill,
)
#
# TODO need to check on metadata.
[docs] def from_xarray(
self,
data_array,
how="replace",
fill=None,
max_gap_seconds=1,
fill_window=10,
):
"""
fill data set from a :class:`xarray.DataArray` object.
Will check for time alignement, and metadata.
:param data_array_obj: Xarray data array
:type channel_ts_obj: :class:`xarray.DataArray`
:param how: how the new array will be input to the existing dataset:
- 'replace' -> replace the entire dataset nothing is left over.
- 'extend' -> add onto the existing dataset, any overlapping
values will be rewritten, if there are gaps between data sets
those will be handled depending on the value of fill.
:param fill: If there is a data gap how do you want to fill the gap:
- None -> will raise an :class:`mth5.utils.exceptions.MTH5Error`
- 'mean'-> will fill with the mean of each data set within
the fill window
- 'median' -> will fill with the median of each data set
within the fill window
- value -> can be an integer or float to fill the gap
- 'nan' -> will fill the gap with NaN
:type fill: string, None, float, integer
:param max_gap_seconds: sets a maximum number of seconds the gap can
be. Anything over this number will raise a
:class:`mth5.utils.exceptions.MTH5Error`.
:type max_gap_seconds: float or integer
:param fill_window: number of points from the end of each data set
to estimate fill value from.
:type fill_window: integer
"""
if not isinstance(data_array, xr.DataArray):
msg = f"Input must be a xarray.DataArray object not {type(data_array)}"
self.logger.error(msg)
raise TypeError(msg)
if how == "replace":
self.metadata.from_dict(
{self.metadata._class_name: data_array.attrs}
)
self.replace_dataset(data_array.values)
self.write_metadata()
elif how == "extend":
self.extend_dataset(
data_array.values,
data_array.coords.indexes["time"][0].isoformat(),
1e9 / data_array.coords.indexes["time"][0].freq.nanos,
fill=fill,
)
# TODO need to check on metadata.
def _get_diff_new_array_start(self, start_time):
"""
Make sure the new array has the same start time if not return the
time difference
:param start_time: start time of the new array
:type start_time: string, int or :class:`mth5.utils.MTime`
:return: time difference in seconds as new start time minus old.
* A positive number means new start time is later than old
start time.
* A negative number means the new start time is earlier than
the old start time.
:rtype: float
"""
if not isinstance(start_time, MTime):
start_time = MTime(start_time)
t_diff = 0
if start_time != self.start:
t_diff = start_time - self.start
return t_diff
def _get_diff_new_array_end(self, end_time):
"""
Make sure the new array has the same end time if not return the
time difference
:param end_time: end time of the new array
:type end_time: string, int or :class:`mth5.utils.MTime`
:return: time difference in seconds as new end time minus old.
* A positive number means new end time is later than old
end time.
* A negative number means the new end time is earlier than
the old end time.
:rtype: float
"""
if not isinstance(end_time, MTime):
end_time = MTime(end_time)
t_diff = 0
if end_time != self.end:
t_diff = end_time - self.end
return t_diff
@property
def table_entry(self):
"""
Creat a table entry to put into the run summary table.
"""
return np.array(
[
(
self.metadata.component,
self.metadata.time_period._start_dt.iso_no_tz,
self.metadata.time_period._end_dt.iso_no_tz,
self.hdf5_dataset.size,
self.metadata.type,
self.metadata.units,
self.hdf5_dataset.ref,
)
],
dtype=np.dtype(
[
("component", "U20"),
("start", "datetime64[ns]"),
("end", "datetime64[ns]"),
("n_samples", int),
("measurement_type", "U12"),
("units", "U25"),
("hdf5_reference", h5py.ref_dtype),
]
),
)
@property
def channel_entry(self):
"""
channel entry that will go into a full channel summary of the entire survey
"""
return np.array(
[
(
self.survey_id,
self.hdf5_dataset.parent.parent.attrs["id"],
self.hdf5_dataset.parent.attrs["id"],
self.hdf5_dataset.parent.parent.attrs["location.latitude"],
self.hdf5_dataset.parent.parent.attrs["location.longitude"],
self.hdf5_dataset.parent.parent.attrs["location.elevation"],
self.metadata.component,
self.metadata.time_period.start,
self.metadata.time_period.end,
self.hdf5_dataset.size,
self.metadata.sample_rate,
self.metadata.type,
self.metadata.measurement_azimuth,
self.metadata.measurement_tilt,
self.metadata.units,
self.hdf5_dataset.ref,
self.hdf5_dataset.parent.ref,
self.hdf5_dataset.parent.parent.ref,
)
],
dtype=CHANNEL_DTYPE,
)
[docs] def time_slice(
self,
start,
end=None,
n_samples=None,
return_type="channel_ts",
):
"""
Get a time slice from the channel and return the appropriate type
* numpy array with metadata
* pandas.Dataframe with metadata
* xarray.DataFrame with metadata
* :class:`mth5.timeseries.ChannelTS` 'default'
* dask.DataFrame with metadata 'not yet'
:param start: start time of the slice
:type start: string or :class:`mth5.utils.mttime.MTime`
:param end: end time of the slice
:type end: string or :class:`mth5.utils.mttime.MTime`, optional
:param n_samples: number of samples to read in
:type n_samples: integer, optional
:return: the correct container for the time series.
:rtype: [ :class:`xarray.DataArray` | :class:`pandas.DataFrame` |
:class:`mth5.timeseries.ChannelTS` | :class:`numpy.ndarray` ]
:raises: ValueError if both end_time and n_samples are None or given.
:Example with number of samples:
.. code-block::
>>> ex = mth5_obj.get_channel('FL001', 'FL001a', 'Ex')
>>> ex_slice = ex.time_slice("2015-01-08T19:49:15", n_samples=4096)
>>> ex_slice
<xarray.DataArray (time: 4096)>
array([0.93115046, 0.14233688, 0.87917119, ..., 0.26073634, 0.7137319 ,
0.88154395])
Coordinates:
* time (time) datetime64[ns] 2015-01-08T19:49:15 ... 2015-01-08T19:57:46.875000
Attributes:
ac.end: None
ac.start: None
...
>>> type(ex_slice)
mth5.timeseries.ChannelTS
# plot the time series
>>> ex_slice.ts.plot()
:Example with start and end time:
>>> ex_slice = ex.time_slice("2015-01-08T19:49:15",
... end_time="2015-01-09T19:49:15")
:Raises Example:
>>> ex_slice = ex.time_slice("2015-01-08T19:49:15",
... end_time="2015-01-09T19:49:15",
... n_samples=4096)
ValueError: Must input either end_time or n_samples, not both.
"""
if not isinstance(start, MTime):
start = MTime(start)
if end is not None:
if not isinstance(end, MTime):
end = MTime(end)
if n_samples is not None:
n_samples = int(n_samples)
if n_samples is None and end is None:
msg = "Must input either end_time or n_samples."
self.logger.error(msg)
raise ValueError(msg)
if n_samples is not None and end is not None:
msg = "Must input either end_time or n_samples, not both."
self.logger.error(msg)
raise ValueError(msg)
# if end time is given
if end is not None and n_samples is None:
start_index = self.get_index_from_time(start)
end_index = self.get_index_from_time(end)
npts = int(end_index - start_index)
# if n_samples are given
elif end is None and n_samples is not None:
start_index = self.get_index_from_time(start)
end_index = start_index + (n_samples - 1)
npts = n_samples
if npts > self.hdf5_dataset.size or end_index > self.hdf5_dataset.size:
msg = (
"Requested slice is larger than data. "
f"Slice length = {npts}, data length = {self.hdf5_dataset.shape}. "
f"Setting end_index to {self.hdf5_dataset.shape}"
)
end_index = self.hdf5_dataset.size - 1
self.logger.warning(msg)
# create a regional reference that can be used, need +1 to be inclusive
try:
regional_ref = self.hdf5_dataset.regionref[
start_index : end_index + 1
]
except (OSError, RuntimeError):
self.logger.debug(
"file is in read mode cannot set an internal reference, using index values"
)
regional_ref = slice(start_index, end_index)
dt_index = make_dt_coordinates(
start, self.sample_rate, npts, self.logger
)
meta_dict = self.metadata.to_dict()[self.metadata._class_name]
meta_dict["time_period.start"] = dt_index[0].isoformat()
meta_dict["time_period.end"] = dt_index[-1].isoformat()
data = None
if return_type == "xarray":
# need the +1 to be inclusive of the last point
data = xr.DataArray(
self.hdf5_dataset[regional_ref], coords=[("time", dt_index)]
)
data.attrs.update(meta_dict)
elif return_type == "pandas":
data = pd.DataFrame(
{"data": self.hdf5_dataset[regional_ref]}, index=dt_index
)
data.attrs.update(meta_dict)
elif return_type == "numpy":
data = self.hdf5_dataset[regional_ref]
elif return_type == "channel_ts":
data = ChannelTS(
self.metadata.type,
data=self.hdf5_dataset[regional_ref],
channel_metadata={self.metadata.type: meta_dict},
channel_response_filter=self.channel_response_filter,
)
else:
msg = "return_type not understood, must be [ pandas | numpy | channel_ts ]"
self.logger.error(msg)
raise ValueError(msg)
return data
[docs] def get_index_from_time(self, given_time):
"""
get the appropriate index for a given time.
:param given_time: time string
:type given_time: string or MTime
:return: index value
:rtype: int
"""
if not isinstance(given_time, MTime):
given_time = MTime(given_time)
index = (
given_time - self.metadata.time_period.start
) * self.metadata.sample_rate
return int(round(index))
[docs]@inherit_doc_string
class ElectricDataset(ChannelDataset):
def __init__(self, group, **kwargs):
super().__init__(group, **kwargs)
[docs]@inherit_doc_string
class MagneticDataset(ChannelDataset):
def __init__(self, group, **kwargs):
super().__init__(group, **kwargs)
[docs]@inherit_doc_string
class AuxiliaryDataset(ChannelDataset):
def __init__(self, group, **kwargs):
super().__init__(group, **kwargs)