Source code for mth5.groups.channel_dataset

# -*- coding: utf-8 -*-
"""
Created on Sat May 27 10:03:23 2023

@author: jpeacock
"""

# =============================================================================
# Imports
# =============================================================================
import inspect
import weakref

import h5py
import numpy as np
import pandas as pd
import xarray as xr
from loguru import logger

from mt_metadata import timeseries as metadata
from mt_metadata.utils.mttime import MTime
from mt_metadata.base import Base
from mt_metadata.timeseries.filters import ChannelResponseFilter

from mth5 import CHANNEL_DTYPE
from mth5.utils.exceptions import MTH5Error
from mth5.groups import FiltersGroup
from mth5.helpers import (
    to_numpy_type,
    from_numpy_type,
    inherit_doc_string,
)

from mth5.timeseries import ChannelTS
from mth5.timeseries.channel_ts import make_dt_coordinates


meta_classes = dict(inspect.getmembers(metadata, inspect.isclass))
# =============================================================================
[docs]class ChannelDataset: """ Holds a channel dataset. This is a simple container for the data to make sure that the user has the flexibility to turn the channel into an object they want to deal with. For now all the numpy type slicing can be used on `hdf5_dataset` :param dataset: dataset object for the channel :type dataset: :class:`h5py.Dataset` :param dataset_metadata: metadata container, defaults to None :type dataset_metadata: [ :class:`mth5.metadata.Electric` | :class:`mth5.metadata.Magnetic` | :class:`mth5.metadata.Auxiliary` ], optional :raises MTH5Error: If the dataset is not of the correct type Utilities will be written to create some common objects like: * xarray.DataArray * pandas.DataFrame * zarr * dask.Array The benefit of these other objects is that they can be indexed by time, and they have much more buit-in funcionality. .. code-block:: python >>> from mth5 import mth5 >>> mth5_obj = mth5.MTH5() >>> mth5_obj.open_mth5(r"/test.mth5", mode='a') >>> run = mth5_obj.stations_group.get_station('MT001').get_run('MT001a') >>> channel = run.get_channel('Ex') >>> channel Channel Electric: ------------------- component: Ey data type: electric data format: float32 data shape: (4096,) start: 1980-01-01T00:00:00+00:00 end: 1980-01-01T00:00:01+00:00 sample rate: 4096 """ def __init__( self, dataset, dataset_metadata=None, write_metadata=True, **kwargs ): for key, value in kwargs.items(): setattr(self, key, value) if dataset is not None and isinstance(dataset, (h5py.Dataset)): self.hdf5_dataset = weakref.ref(dataset)() self.logger = logger # set metadata to the appropriate class. Standards is not a # Base object so should be skipped. If the class name is not # defined yet set to Base class. self.metadata = Base() try: self.metadata = meta_classes[self._class_name]() except KeyError: self.metadata = Base() if not hasattr(self.metadata, "mth5_type"): self._add_base_attributes() self.metadata.hdf5_reference = self.hdf5_dataset.ref self.metadata.mth5_type = self._class_name # if the input data set already has filled attributes, namely if the # channel data already exists then read them in with our writing back if "mth5_type" in list(self.hdf5_dataset.attrs.keys()): self.metadata.from_dict( {self.hdf5_dataset.attrs["mth5_type"]: self.hdf5_dataset.attrs} ) # if metadata is input, make sure that its the same class type amd write # to the hdf5 dataset if dataset_metadata is not None: if not isinstance(dataset_metadata, type(self.metadata)): msg = ( f"metadata must be type metadata.{self._class_name} not " f"{type(dataset_metadata)}" ) self.logger.error(msg) raise MTH5Error(msg) # load from dict because of the extra attributes for MTH5 self.metadata.from_dict(dataset_metadata.to_dict()) self.metadata.hdf5_reference = self.hdf5_dataset.ref self.metadata.mth5_type = self._class_name # write out metadata to make sure that its in the file. if write_metadata: self.write_metadata() else: self.read_metadata() # if the attrs don't have the proper metadata keys yet write them if not "mth5_type" in list(self.hdf5_dataset.attrs.keys()): self.write_metadata() def _add_base_attributes(self): # add 2 attributes that will help with querying # 1) the metadata class name self.metadata.add_base_attribute( "mth5_type", self._class_name, { "type": str, "required": True, "style": "free form", "description": "type of group", "units": None, "options": [], "alias": [], "example": "group_name", "default": None, }, ) # 2) the HDF5 reference that can be used instead of paths self.metadata.add_base_attribute( "hdf5_reference", self.hdf5_dataset.ref, { "type": "h5py_reference", "required": True, "style": "free form", "description": "hdf5 internal reference", "units": None, "options": [], "alias": [], "example": "<HDF5 Group Reference>", "default": None, }, ) def __str__(self): try: lines = ["Channel {0}:".format(self._class_name)] lines.append("-" * (len(lines[0]) + 2)) info_str = "\t{0:<18}{1}" lines.append( info_str.format("component:", self.metadata.component) ) lines.append(info_str.format("data type:", self.metadata.type)) lines.append( info_str.format("data format:", self.hdf5_dataset.dtype) ) lines.append( info_str.format("data shape:", self.hdf5_dataset.shape) ) lines.append( info_str.format("start:", self.metadata.time_period.start) ) lines.append( info_str.format("end:", self.metadata.time_period.end) ) lines.append( info_str.format("sample rate:", self.metadata.sample_rate) ) return "\n".join(lines) except ValueError: return "MTH5 file is closed and cannot be accessed." def __repr__(self): return self.__str__() @property def _class_name(self): return self.__class__.__name__.split("Dataset")[0] @property def run_metadata(self): """run metadata""" meta_dict = dict(self.hdf5_dataset.parent.attrs) for key, value in meta_dict.items(): meta_dict[key] = from_numpy_type(value) run_metadata = metadata.Run() run_metadata.from_dict({"run": meta_dict}) run_metadata.add_channel(self.metadata) return run_metadata @property def station_metadata(self): """station metadata""" meta_dict = dict(self.hdf5_dataset.parent.parent.attrs) for key, value in meta_dict.items(): meta_dict[key] = from_numpy_type(value) station_metadata = metadata.Station() station_metadata.from_dict({"station": meta_dict}) station_metadata.add_run(self.run_metadata) return station_metadata @property def survey_metadata(self): """survey metadata""" meta_dict = dict(self.hdf5_dataset.parent.parent.parent.parent.attrs) for key, value in meta_dict.items(): meta_dict[key] = from_numpy_type(value) survey_metadata = metadata.Survey() survey_metadata.from_dict({"survey": meta_dict}) survey_metadata.add_station(self.station_metadata) return survey_metadata @property def survey_id(self): """shortcut to survey group""" return self.hdf5_dataset.parent.parent.parent.parent.attrs["id"] @property def channel_response_filter(self): # get the filters to make a channel response filters_group = FiltersGroup( self.hdf5_dataset.parent.parent.parent.parent["Filters"] ) f_list = [] for name in self.metadata.filter.name: name = name.replace("/", " per ").lower() try: f_list.append(filters_group.to_filter_object(name)) except KeyError: self.logger.warning(f"Could not locate filter {name}") continue return ChannelResponseFilter(filters_list=f_list) @property def start(self): return self.metadata.time_period._start_dt @start.setter def start(self, value): """set start time and validate through metadata validator""" if isinstance(value, MTime): self.metadata.time_period.start = value.iso_str else: self.metadata.time_period.start = value @property def end(self): """return end time based on the data""" return self.start + ((self.n_samples - 1) / self.sample_rate) @property def sample_rate(self): return self.metadata.sample_rate @sample_rate.setter def sample_rate(self, value): """set sample rate through metadata validator""" self.metadata.sample_rate = value @property def n_samples(self): return self.hdf5_dataset.size @property def time_index(self): """ Create a time index based on the metadata. This can help when asking for time windows from the data :return: DESCRIPTION :rtype: TYPE """ return make_dt_coordinates( self.start, self.sample_rate, self.n_samples )
[docs] def read_metadata(self): """ Read metadata from the HDF5 file into the metadata container, that way it can be validated. """ meta_dict = dict(self.hdf5_dataset.attrs) for key, value in meta_dict.items(): meta_dict[key] = from_numpy_type(value) self.metadata.from_dict({self._class_name: meta_dict})
[docs] def write_metadata(self): """ Write metadata from the metadata container to the HDF5 attrs dictionary. """ meta_dict = self.metadata.to_dict()[self.metadata._class_name.lower()] for key, value in meta_dict.items(): value = to_numpy_type(value) self.hdf5_dataset.attrs.create(key, value)
[docs] def replace_dataset(self, new_data_array): """ replace the entire dataset with a new one, nothing left behind :param new_data_array: new data array shape (npts, ) :type new_data_array: :class:`numpy.ndarray` """ if not isinstance(new_data_array, np.ndarray): try: new_data_array = np.array(new_data_array) except (ValueError, TypeError) as error: msg = f"{error} Input must be a numpy array not {type(new_data_array)}" self.logger.exception(msg) raise TypeError(msg) if new_data_array.shape != self.hdf5_dataset.shape: self.hdf5_dataset.resize(new_data_array.shape) self.hdf5_dataset[...] = new_data_array
[docs] def extend_dataset( self, new_data_array, start_time, sample_rate, fill=None, max_gap_seconds=1, fill_window=10, ): """ Append data according to how the start time aligns with existing data. If the start time is before existing start time the data is prepended, similarly if the start time is near the end data will be appended. If the start time is within the existing time range, existing data will be replace with the new data. If there is a gap between start or end time of the new data with the existing data you can either fill the data with a constant value or an error will be raise depending on the value of fill. :param new_data_array: new data array with shape (npts, ) :type new_data_array: :class:`numpy.ndarray` :param start_time: start time of the new data array in UTC :type start_time: string or :class:`mth5.utils.mttime.MTime` :param sample_rate: Sample rate of the new data array, must match existing sample rate :type sample_rate: float :param fill: If there is a data gap how do you want to fill the gap * None: will raise an :class:`mth5.utils.exceptions.MTH5Error` * 'mean': will fill with the mean of each data set within the fill window * 'median': will fill with the median of each data set within the fill window * value: can be an integer or float to fill the gap * 'nan': will fill the gap with NaN :type fill: string, None, float, integer :param max_gap_seconds: sets a maximum number of seconds the gap can be. Anything over this number will raise a :class:`mth5.utils.exceptions.MTH5Error`. :type max_gap_seconds: float or integer :param fill_window: number of points from the end of each data set to estimate fill value from. :type fill_window: integer :raises: :class:`mth5.utils.excptions.MTH5Error` if sample rate is not the same, or fill value is not understood, :Append Example: >>> ex = mth5_obj.get_channel('MT001', 'MT001a', 'Ex') >>> ex.n_samples 4096 >>> ex.end 2015-01-08T19:32:09.500000+00:00 >>> t = timeseries.ChannelTS('electric', ... data=2*np.cos(4 * np.pi * .05 * \ ... np.linspace(0,4096l num=4096) * ... .01), ... channel_metadata={'electric':{ ... 'component': 'ex', ... 'sample_rate': 8, ... 'time_period.start':(ex.end+(1)).iso_str}}) >>> ex.extend_dataset(t.ts, t.start, t.sample_rate, fill='median', ... max_gap_seconds=2) 2020-07-02T18:02:47 - mth5.groups.Electric.extend_dataset - INFO - filling data gap with 1.0385180759767025 >>> ex.n_samples 8200 >>> ex.end 2015-01-08T19:40:42.500000+00:00 """ fw = fill_window # check input parameters if sample_rate != self.sample_rate: msg = ( "new data must have the same sampling rate as existing data.\n" + f"\tnew sample rate = {sample_rate}\n" + f"\texisting sample rate = {self.sample_rate}" ) self.logger.error(msg) raise MTH5Error(msg) if not isinstance(new_data_array, np.ndarray): try: new_data_array = np.array(new_data_array) except (ValueError, TypeError) as error: msg = f"{error} Input must be a numpy array not {type(new_data_array)}" self.logger.exception(msg) raise TypeError(msg) if not isinstance(start_time, MTime): start_time = MTime(start_time) # get end time will need later end_time = start_time + (new_data_array.size / sample_rate) # check start time start_t_diff = self._get_diff_new_array_start(start_time) end_t_diff = self._get_diff_new_array_end(end_time) self.logger.info("Extending data.") self.logger.info(f"Existing start time {self.start}") self.logger.info(f"New start time {start_time}") self.logger.info(f"Existing end time {self.end}") self.logger.info(f"New end time {end_time}") # prepend data if start_t_diff < 0: self.logger.info("Prepending: ") self.logger.info( f"new start time {start_time} is before existing {self.start}" ) if end_time.iso_no_tz not in self.time_index: gap = abs(end_time - self.start) if gap > 0: if gap > max_gap_seconds: msg = ( f"Time gap of {gap} seconds " + f"is more than max_gap_seconds = {max_gap_seconds}." + " Consider making a new run." ) self.logger.error(msg) raise MTH5Error(msg) if fill is None: msg = ( f"A time gap of {gap} seconds is found " + "between new and existing data sets. \n" + f"\tnew end time: {end_time}\n" + f"\texisting start time: {self.start}" ) self.logger.error(msg) raise MTH5Error(msg) # set new start time old_slice = self.time_slice(self.start, end_time=self.end) old_start = self.start.copy() self.start = start_time # resize the existing data to make room for new data self.hdf5_dataset.resize( ( int( new_data_array.size + self.hdf5_dataset.size + gap * sample_rate ), ) ) # fill based on time, refill existing data first self.hdf5_dataset[ self.get_index_from_time(old_start) : ] = old_slice.ts.values self.hdf5_dataset[ 0 : self.get_index_from_time(end_time) ] = new_data_array if fill == "mean": fill_value = np.mean( np.array( [ new_data_array[-fw:].mean(), float(old_slice.ts[0:fw].mean()), ] ) ) elif fill == "median": fill_value = np.median( np.array( [ np.median(new_data_array[-fw:]), np.median(old_slice.ts[0:fw]), ] ) ) elif fill == "nan": fill_value = np.nan elif isinstance(fill, (int, float)): fill_value = fill else: msg = f"fill value {fill} is not understood" self.logger.error(msg) raise MTH5Error(msg) self.logger.info(f"filling data gap with {fill_value}") self.hdf5_dataset[ self.get_index_from_time( end_time ) : self.get_index_from_time(old_start) ] = fill_value else: new_size = ( self.n_samples + int(abs(start_t_diff) * sample_rate), ) overlap = abs(end_time - self.start) self.logger.warning( f"New data is overlapping by {overlap} s." + " Any overlap will be overwritten." ) # set new start time old_slice = self.time_slice(self.start, end_time=self.end) old_start = self.start.copy() self.start = start_time self.logger.debug( f"resizing data set from {self.n_samples} to {new_size}" ) self.hdf5_dataset.resize(new_size) # put back the existing data, which any overlapping times # will be overwritten self.hdf5_dataset[ self.get_index_from_time(old_start) : ] = old_slice.ts.values self.hdf5_dataset[ 0 : self.get_index_from_time(end_time) ] = new_data_array # append data elif start_t_diff > 0: old_end = self.end.copy() if start_time.iso_no_tz not in self.time_index: gap = abs(self.end - start_time) if gap > 0: if gap > max_gap_seconds: msg = ( f"Time gap of {gap} seconds " + f"is more than max_gap_seconds = {max_gap_seconds}." + " Consider making a new run." ) self.logger.error(msg) raise MTH5Error(msg) if fill is None: msg = ( f"A time gap of {gap} seconds is found " + "between new and existing data sets. \n" + f"\tnew start time: {start_time}\n" + f"\texisting end time: {self.end}" ) self.logger.error(msg) raise MTH5Error(msg) # resize the existing data to make room for new data self.hdf5_dataset.resize( ( int( new_data_array.size + self.hdf5_dataset.size + gap * sample_rate ), ) ) self.hdf5_dataset[ self.get_index_from_time(start_time) : ] = new_data_array old_index = self.get_index_from_time(old_end) if fill == "mean": fill_value = np.mean( np.array( [ new_data_array[0:fw].mean(), np.mean( self.hdf5_dataset[old_index - fw :] ), ] ) ) elif fill == "median": fill_value = np.median( np.array( [ np.median(new_data_array[0:fw]), np.median( self.hdf5_dataset[old_index - fw :] ), ] ) ) elif fill == "nan": fill_value = np.nan elif isinstance(fill, (int, float)): fill_value = fill else: msg = f"fill value {fill} is not understood" self.logger.error(msg) raise MTH5Error(msg) self.logger.info(f"filling data gap with {fill_value}") self.hdf5_dataset[ self.get_index_from_time( old_end ) : self.get_index_from_time(start_time) ] = fill_value else: # if the new data fits within the extisting time span if end_t_diff < 0: self.logger.debug( "New data fits within existing time span" + " all data in the window : " f"{start_time} -- {end_time} " + "will be overwritten." ) self.hdf5_dataset[ self.get_index_from_time( start_time ) : self.get_index_from_time(end_time) ] = new_data_array else: new_size = ( self.n_samples + int(abs(start_t_diff) * sample_rate), ) overlap = abs(self.end - start_time) self.logger.warning( f"New data is overlapping by {overlap} s." + " Any overlap will be overwritten." ) self.logger.debug( f"resizing data set from {self.n_samples} to {new_size}" ) self.hdf5_dataset.resize(new_size) # put back the existing data, which any overlapping times # will be overwritten self.hdf5_dataset[ self.get_index_from_time(start_time) : ] = new_data_array
[docs] def to_channel_ts(self): """ :return: a Timeseries with the appropriate time index and metadata :rtype: :class:`mth5.timeseries.ChannelTS` loads from memory (nearly half the size of xarray alone, not sure why) """ return ChannelTS( channel_type=self.metadata.type, data=self.hdf5_dataset[()], channel_metadata=self.metadata, run_metadata=self.run_metadata.copy(), station_metadata=self.station_metadata.copy(), survey_metadata=self.survey_metadata.copy(), channel_response_filter=self.channel_response_filter, )
[docs] def to_xarray(self): """ :return: an xarray DataArray with appropriate metadata and the appropriate time index. :rtype: :class:`xarray.DataArray` .. note:: that metadta will not be validated if changed in an xarray. loads from memory """ return xr.DataArray( self.hdf5_dataset[()], coords=[("time", self.time_index)], attrs=self.metadata.to_dict(single=True), )
[docs] def to_dataframe(self): """ :return: a dataframe where data is stored in the 'data' column and attributes are stored in the experimental attrs attribute :rtype: :class:`pandas.DataFrame` .. note:: that metadta will not be validated if changed in an xarray. loads into RAM """ df = pd.DataFrame( {"data": self.hdf5_dataset[()]}, index=self.time_index ) df.attrs.update(self.metadata.to_dict(single=True)) return df
[docs] def to_numpy(self): """ :return: a numpy structured array with 2 columns (time, channel_data) :rtype: :class:`numpy.core.records` .. note:: data is a builtin to numpy and cannot be used as a name loads into RAM """ return np.core.records.fromarrays( [self.time_index.to_numpy(), self.hdf5_dataset[()]], names="time,channel_data", )
[docs] def from_channel_ts( self, channel_ts_obj, how="replace", fill=None, max_gap_seconds=1, fill_window=10, ): """ fill data set from a :class:`mth5.timeseries.ChannelTS` object. Will check for time alignement, and metadata. :param channel_ts_obj: time series object :type channel_ts_obj: :class:`mth5.timeseries.ChannelTS` :param how: how the new array will be input to the existing dataset: - 'replace' -> replace the entire dataset nothing is left over. - 'extend' -> add onto the existing dataset, any overlapping values will be rewritten, if there are gaps between data sets those will be handled depending on the value of fill. :param fill: If there is a data gap how do you want to fill the gap: - None -> will raise an :class:`mth5.utils.exceptions.MTH5Error` - 'mean'-> will fill with the mean of each data set within the fill window - 'median' -> will fill with the median of each data set within the fill window - value -> can be an integer or float to fill the gap - 'nan' -> will fill the gap with NaN :type fill: string, None, float, integer :param max_gap_seconds: sets a maximum number of seconds the gap can be. Anything over this number will raise a :class:`mth5.utils.exceptions.MTH5Error`. :type max_gap_seconds: float or integer :param fill_window: number of points from the end of each data set to estimate fill value from. :type fill_window: integer """ if not isinstance(channel_ts_obj, ChannelTS): msg = ( f"Input must be a ChannelTS object not {type(channel_ts_obj)}" ) self.logger.error(msg) raise TypeError(msg) if how == "replace": self.metadata.from_dict(channel_ts_obj.channel_metadata.to_dict()) self.replace_dataset(channel_ts_obj.ts) # apparently need to reset these otherwise they get overwritten with None self.metadata.hdf5_reference = self.hdf5_dataset.ref self.metadata.mth5_type = self._class_name self.write_metadata() elif how == "extend": self.extend_dataset( channel_ts_obj.ts, channel_ts_obj.start, channel_ts_obj.sample_rate, fill=fill, )
# # TODO need to check on metadata.
[docs] def from_xarray( self, data_array, how="replace", fill=None, max_gap_seconds=1, fill_window=10, ): """ fill data set from a :class:`xarray.DataArray` object. Will check for time alignement, and metadata. :param data_array_obj: Xarray data array :type channel_ts_obj: :class:`xarray.DataArray` :param how: how the new array will be input to the existing dataset: - 'replace' -> replace the entire dataset nothing is left over. - 'extend' -> add onto the existing dataset, any overlapping values will be rewritten, if there are gaps between data sets those will be handled depending on the value of fill. :param fill: If there is a data gap how do you want to fill the gap: - None -> will raise an :class:`mth5.utils.exceptions.MTH5Error` - 'mean'-> will fill with the mean of each data set within the fill window - 'median' -> will fill with the median of each data set within the fill window - value -> can be an integer or float to fill the gap - 'nan' -> will fill the gap with NaN :type fill: string, None, float, integer :param max_gap_seconds: sets a maximum number of seconds the gap can be. Anything over this number will raise a :class:`mth5.utils.exceptions.MTH5Error`. :type max_gap_seconds: float or integer :param fill_window: number of points from the end of each data set to estimate fill value from. :type fill_window: integer """ if not isinstance(data_array, xr.DataArray): msg = f"Input must be a xarray.DataArray object not {type(data_array)}" self.logger.error(msg) raise TypeError(msg) if how == "replace": self.metadata.from_dict( {self.metadata._class_name: data_array.attrs} ) self.replace_dataset(data_array.values) self.write_metadata() elif how == "extend": self.extend_dataset( data_array.values, data_array.coords.indexes["time"][0].isoformat(), 1e9 / data_array.coords.indexes["time"][0].freq.nanos, fill=fill, )
# TODO need to check on metadata. def _get_diff_new_array_start(self, start_time): """ Make sure the new array has the same start time if not return the time difference :param start_time: start time of the new array :type start_time: string, int or :class:`mth5.utils.MTime` :return: time difference in seconds as new start time minus old. * A positive number means new start time is later than old start time. * A negative number means the new start time is earlier than the old start time. :rtype: float """ if not isinstance(start_time, MTime): start_time = MTime(start_time) t_diff = 0 if start_time != self.start: t_diff = start_time - self.start return t_diff def _get_diff_new_array_end(self, end_time): """ Make sure the new array has the same end time if not return the time difference :param end_time: end time of the new array :type end_time: string, int or :class:`mth5.utils.MTime` :return: time difference in seconds as new end time minus old. * A positive number means new end time is later than old end time. * A negative number means the new end time is earlier than the old end time. :rtype: float """ if not isinstance(end_time, MTime): end_time = MTime(end_time) t_diff = 0 if end_time != self.end: t_diff = end_time - self.end return t_diff @property def channel_entry(self): """ channel entry that will go into a full channel summary of the entire survey """ return np.array( [ ( self.survey_id, self.hdf5_dataset.parent.parent.attrs["id"], self.hdf5_dataset.parent.attrs["id"], self.hdf5_dataset.parent.parent.attrs["location.latitude"], self.hdf5_dataset.parent.parent.attrs[ "location.longitude" ], self.hdf5_dataset.parent.parent.attrs[ "location.elevation" ], self.metadata.component, self.metadata.time_period.start, self.metadata.time_period.end, self.hdf5_dataset.size, self.metadata.sample_rate, self.metadata.type, self.metadata.measurement_azimuth, self.metadata.measurement_tilt, self.metadata.units, self.hdf5_dataset.ref, self.hdf5_dataset.parent.ref, self.hdf5_dataset.parent.parent.ref, ) ], dtype=CHANNEL_DTYPE, )
[docs] def time_slice( self, start, end=None, n_samples=None, return_type="channel_ts", ): """ Get a time slice from the channel and return the appropriate type * numpy array with metadata * pandas.Dataframe with metadata * xarray.DataFrame with metadata * :class:`mth5.timeseries.ChannelTS` 'default' * dask.DataFrame with metadata 'not yet' :param start: start time of the slice :type start: string or :class:`mth5.utils.mttime.MTime` :param end: end time of the slice :type end: string or :class:`mth5.utils.mttime.MTime`, optional :param n_samples: number of samples to read in :type n_samples: integer, optional :return: the correct container for the time series. :rtype: [ :class:`xarray.DataArray` | :class:`pandas.DataFrame` | :class:`mth5.timeseries.ChannelTS` | :class:`numpy.ndarray` ] :raises: ValueError if both end_time and n_samples are None or given. :Example with number of samples: .. code-block:: >>> ex = mth5_obj.get_channel('FL001', 'FL001a', 'Ex') >>> ex_slice = ex.time_slice("2015-01-08T19:49:15", n_samples=4096) >>> ex_slice <xarray.DataArray (time: 4096)> array([0.93115046, 0.14233688, 0.87917119, ..., 0.26073634, 0.7137319 , 0.88154395]) Coordinates: * time (time) datetime64[ns] 2015-01-08T19:49:15 ... 2015-01-08T19:57:46.875000 Attributes: ac.end: None ac.start: None ... >>> type(ex_slice) mth5.timeseries.ChannelTS # plot the time series >>> ex_slice.ts.plot() :Example with start and end time: >>> ex_slice = ex.time_slice("2015-01-08T19:49:15", ... end_time="2015-01-09T19:49:15") :Raises Example: >>> ex_slice = ex.time_slice("2015-01-08T19:49:15", ... end_time="2015-01-09T19:49:15", ... n_samples=4096) ValueError: Must input either end_time or n_samples, not both. """ start_index, end_index, npts = self._get_slice_index_values( start, end, n_samples ) if npts > self.hdf5_dataset.size or end_index > self.hdf5_dataset.size: # leave as +1 to be inclusive end_index = self.hdf5_dataset.size msg = ( "Requested slice is larger than data. " f"Slice length = {npts}, data length = {self.hdf5_dataset.shape}. " f"Setting end_index to {end_index}" ) self.logger.warning(msg) if start_index < 0: # leave as +1 to be inclusive start_index = 0 msg = ( f"Requested start {start} is before data start {self.start}. " f"Setting start_index to {start_index} and start to {self.start}" ) self.logger.warning(msg) start = self.start # create a regional reference that can be used try: regional_ref = self.hdf5_dataset.regionref[start_index:end_index] except (OSError, RuntimeError): self.logger.debug( "file is in read mode cannot set an internal reference, using index values" ) regional_ref = slice(start_index, end_index) dt_index = make_dt_coordinates(start, self.sample_rate, npts) meta_dict = self.metadata.to_dict()[self.metadata._class_name] meta_dict["time_period.start"] = dt_index[0].isoformat() meta_dict["time_period.end"] = dt_index[-1].isoformat() data = None if return_type == "xarray": # need the +1 to be inclusive of the last point data = xr.DataArray( self.hdf5_dataset[regional_ref], coords=[("time", dt_index)] ) data.attrs.update(meta_dict) elif return_type == "pandas": data = pd.DataFrame( {"data": self.hdf5_dataset[regional_ref]}, index=dt_index ) data.attrs.update(meta_dict) elif return_type == "numpy": data = self.hdf5_dataset[regional_ref] elif return_type == "channel_ts": data = ChannelTS( self.metadata.type, data=self.hdf5_dataset[regional_ref], channel_metadata={self.metadata.type: meta_dict}, channel_response_filter=self.channel_response_filter, ) else: msg = "return_type not understood, must be [ pandas | numpy | channel_ts ]" self.logger.error(msg) raise ValueError(msg) return data
[docs] def get_index_from_time(self, given_time): """ get the appropriate index for a given time. :param given_time: time string :type given_time: string or MTime :return: index value :rtype: int """ if not isinstance(given_time, MTime): given_time = MTime(given_time) index = ( given_time - self.metadata.time_period.start ) * self.metadata.sample_rate return int(round(index))
[docs] def get_index_from_end_time(self, given_time): """ get the end index value. Add one to be inclusive of the found index. :param given_time: DESCRIPTION :type given_time: TYPE :return: DESCRIPTION :rtype: TYPE """ return self.get_index_from_time(given_time) + 1
def _get_slice_index_values(self, start, end=None, n_samples=None): """ Get the slice index values given (start, end) or (start, n_samples) :param start: DESCRIPTION :type start: TYPE :param end: DESCRIPTION, defaults to None :type end: TYPE, optional :param n_samples: DESCRIPTION, defaults to None :type n_samples: TYPE, optional :return: DESCRIPTION :rtype: TYPE """ start = MTime(start) if end is not None: end = MTime(end) if n_samples is not None: n_samples = int(n_samples) if n_samples is None and end is None: msg = "Must input either end_time or n_samples." self.logger.error(msg) raise ValueError(msg) if n_samples is not None and end is not None: msg = "Must input either end_time or n_samples, not both." self.logger.error(msg) raise ValueError(msg) # if end time is given if end is not None and n_samples is None: start_index = self.get_index_from_time(start) end_index = self.get_index_from_end_time(end) npts = int(end_index - start_index) # if n_samples are given elif end is None and n_samples is not None: start_index = self.get_index_from_time(start) # leave as +1 to be inclusive end_index = start_index + n_samples npts = n_samples return start_index, end_index, npts
[docs]@inherit_doc_string class ElectricDataset(ChannelDataset): def __init__(self, group, **kwargs): super().__init__(group, **kwargs)
[docs]@inherit_doc_string class MagneticDataset(ChannelDataset): def __init__(self, group, **kwargs): super().__init__(group, **kwargs)
[docs]@inherit_doc_string class AuxiliaryDataset(ChannelDataset): def __init__(self, group, **kwargs): super().__init__(group, **kwargs)