# -*- coding: utf-8 -*-
"""
.. module:: timeseries
:synopsis: Deal with MT time series
.. todo:: Check the conversion to netcdf. There are some weird serializations of
lists and arrays that goes on, seems easiest to convert all lists to strings and then
convert them back if read in.
:copyright:
Jared Peacock (jpeacock@usgs.gov)
:license:
MIT
"""
# ==============================================================================
# Imports
# ==============================================================================
import inspect
import xarray as xr
import numpy as np
from loguru import logger
from matplotlib import pyplot as plt
from mt_metadata import timeseries as metadata
from mt_metadata.utils.mttime import MTime
from mt_metadata.utils.list_dict import ListDict
from mt_metadata.timeseries.filters import ChannelResponseFilter
from .channel_ts import ChannelTS
from .ts_helpers import (
make_dt_coordinates,
get_decimation_sample_rates,
)
from obspy.core import Stream
# =============================================================================
# make a dictionary of available metadata classes
# =============================================================================
meta_classes = dict(inspect.getmembers(metadata, inspect.isclass))
# =============================================================================
# run container
# =============================================================================
[docs]class RunTS:
"""
holds all run ts in one aligned array
components --> {'ex': ex_xarray, 'ey': ey_xarray}
ToDo, have a single Survey object under the hood and properties to other
metadata objects for get/set.
"""
def __init__(
self,
array_list=None,
run_metadata=None,
station_metadata=None,
survey_metadata=None,
):
self.logger = logger
self._survey_metadata = self._initialize_metadata()
self._dataset = xr.Dataset()
self._filters = {}
self.survey_metadata = survey_metadata
self.station_metadata = station_metadata
self.run_metadata = run_metadata
# load the arrays first this will write run and station metadata
if array_list is not None:
self.dataset = array_list
def __str__(self):
s_list = [
f"Survey: {self.survey_metadata.id}",
f"Station: {self.station_metadata.id}",
f"Run: {self.run_metadata.id}",
f"Start: {self.start}",
f"End: {self.end}",
f"Sample Rate: {self.sample_rate}",
f"Components: {self.channels}",
]
return "\n\t".join(["RunTS Summary:"] + s_list)
def __repr__(self):
return self.__str__()
def __eq__(self, other):
if not isinstance(other, RunTS):
raise TypeError(f"Cannot compare RunTS with {type(other)}.")
if not other.survey_metadata == self.survey_metadata:
return False
if not other.station_metadata == self.station_metadata:
return False
if not other.run_metadata == self.run_metadata:
return False
if self.dataset.equals(other.dataset) is False:
return False
return True
def __neq__(self, other):
return not self.__eq__(other)
def __add__(self, other):
"""
Add two runs together in the following steps
1. xr.combine_by_coords([original, other])
2. compute monotonic time index
3. reindex(new_time_index, method='nearest')
If you want a different method or more control use merge
:param other: Another run
:type other: :class:`mth5.timeseries.RunTS`
:raises TypeError: If input is not a RunTS
:raises ValueError: if the components are different
:return: Combined channel with monotonic time index and same metadata
:rtype: :class:`mth5.timeseries.RunTS`
"""
if not isinstance(other, RunTS):
raise TypeError(f"Cannot combine {type(other)} with RunTS.")
# combine into a data set use override to keep attrs from original
combined_ds = xr.combine_by_coords(
[self.dataset, other.dataset], combine_attrs="override"
)
n_samples = (
self.sample_rate
* float(
combined_ds.time.max().values - combined_ds.time.min().values
)
/ 1e9
) + 1
new_dt_index = make_dt_coordinates(
combined_ds.time.min().values, self.sample_rate, n_samples
)
new_run = RunTS(
run_metadata=self.run_metadata,
station_metadata=self.station_metadata,
survey_metadata=self.survey_metadata,
)
new_run.dataset = combined_ds.interp(
time=new_dt_index, method="slinear"
)
new_run.run_metadata.update_time_period()
new_run.station_metadata.update_time_period()
new_run.survey_metadata.update_time_period()
new_run.filters = self.filters
new_run.filters.update(other.filters)
return new_run
def _initialize_metadata(self):
"""
Create a single `Survey` object to store all metadata
:param channel_type: DESCRIPTION
:type channel_type: TYPE
:return: DESCRIPTION
:rtype: TYPE
"""
survey_metadata = metadata.Survey(id="0")
survey_metadata.stations.append(metadata.Station(id="0"))
survey_metadata.stations[0].runs.append(metadata.Run(id="0"))
return survey_metadata
def _validate_run_metadata(self, run_metadata):
"""
validate run metadata
"""
if not isinstance(run_metadata, metadata.Run):
if isinstance(run_metadata, dict):
if "run" not in [cc.lower() for cc in run_metadata.keys()]:
run_metadata = {"Run": run_metadata}
r_metadata = metadata.Run()
r_metadata.from_dict(run_metadata)
self.logger.debug("Loading from metadata dict")
return r_metadata
else:
msg = (
"input metadata must be type {type(self.run_metadata)} "
"or dict, not {type(run_metadata)}"
)
self.logger.error(msg)
raise TypeError(msg)
return run_metadata.copy()
def _validate_station_metadata(self, station_metadata):
"""
validate station metadata
"""
if not isinstance(station_metadata, metadata.Station):
if isinstance(station_metadata, dict):
if "station" not in [
cc.lower() for cc in station_metadata.keys()
]:
station_metadata = {"Station": station_metadata}
st_metadata = metadata.Station()
st_metadata.from_dict(station_metadata)
self.logger.debug("Loading from metadata dict")
return st_metadata
else:
msg = (
f"input metadata must be type {type(self.station_metadata)} "
"or dict, not {type(station_metadata)}"
)
self.logger.error(msg)
raise TypeError(msg)
return station_metadata.copy()
def _validate_survey_metadata(self, survey_metadata):
"""
validate station metadata
"""
if not isinstance(survey_metadata, metadata.Survey):
if isinstance(survey_metadata, dict):
if "survey" not in [
cc.lower() for cc in survey_metadata.keys()
]:
survey_metadata = {"Survey": survey_metadata}
sv_metadata = metadata.Survey()
sv_metadata.from_dict(survey_metadata)
self.logger.debug("Loading from metadata dict")
return sv_metadata
else:
msg = (
f"input metadata must be type {type(self.survey_metadata)} "
"or dict, not {type(survey_metadata)}"
)
self.logger.error(msg)
raise TypeError(msg)
return survey_metadata.copy()
def _validate_array_list(self, array_list):
"""check to make sure all entries are a :class:`ChannelTS` object"""
if not isinstance(array_list, (tuple, list)):
msg = f"array_list must be a list or tuple, not {type(array_list)}"
self.logger.error(msg)
raise TypeError(msg)
valid_list = []
station_metadata = metadata.Station()
run_metadata = metadata.Run()
channels = ListDict()
for index, item in enumerate(array_list):
if not isinstance(item, (ChannelTS, xr.DataArray)):
msg = f"array entry {index} must be ChannelTS object not {type(item)}"
self.logger.error(msg)
raise TypeError(msg)
if isinstance(item, ChannelTS):
valid_list.append(item.to_xarray())
# if a channelTS is input then it comes with run and station metadata
# use those first, then the user can update later.
if item.station_metadata.id not in ["0", None]:
if station_metadata.id not in ["0", None]:
station_metadata.update(
item.station_metadata, match=["id"]
)
else:
station_metadata.update(item.station_metadata)
if item.run_metadata.id not in ["0", None]:
if run_metadata.id not in ["0", None]:
run_metadata.update(item.run_metadata, match=["id"])
else:
run_metadata.update(item.run_metadata)
channels.append(item.channel_metadata)
# get the filters from the channel
if item.channel_response_filter.filters_list != []:
for ff in item.channel_response_filter.filters_list:
self._filters[ff.name] = ff
else:
valid_list.append(item)
# need to make sure that the station metadata was actually updated,
# should have an ID.
run_metadata.channels = channels
if station_metadata.id not in ["0", None]:
station_metadata.runs = ListDict()
station_metadata.runs.append(run_metadata)
# need to add the other runs that are in the metadata for
# completeness.
for run in self.station_metadata.runs:
if run.id not in [run_metadata.id, "0", None]:
station_metadata.runs.append(run)
self.station_metadata = station_metadata
# if the run metadata was updated
elif run_metadata.id not in ["0", None]:
self.run_metadata = run_metadata
# if the run metadata or station metadata was not updated from channel
# metadata, then update just the channels.
else:
self.run_metadata.channels = channels
# first need to align the time series.
valid_list = self._align_channels(valid_list)
return valid_list
def _align_channels(self, valid_list):
"""
check for common start and end times, if not resample each.
:param valid_list: DESCRIPTION
:type valid_list: TYPE
:return: DESCRIPTION
:rtype: TYPE
"""
earliest_start = self._get_earliest_start(valid_list)
latest_end = self._get_latest_end(valid_list)
reindex = False
if not self._check_common_start(valid_list):
self.logger.info(
f"Channels do not have a common start, using earliest: {earliest_start}"
)
reindex = True
if not self._check_common_end(valid_list):
self.logger.info(
f"Channels do not have a common end, using latest: {latest_end}"
)
reindex = True
if reindex:
sample_rate = self._check_sample_rate(valid_list)
new_time_index = self._get_common_time_index(
earliest_start, latest_end, sample_rate
)
tolerance = f"{(1e9 / sample_rate):.0f}N"
aligned_list = []
for ch in valid_list:
aligned_list.append(
ch.reindex(
time=new_time_index,
method="nearest",
tolerance=tolerance,
)
)
else:
aligned_list = valid_list
return aligned_list
def _check_sample_rate(self, valid_list):
# probably should test for sampling rate.
sr_test = list(
set(
[(item.sample_rate) for item in valid_list]
+ [np.round(item.sps_filters.fs, 3) for item in valid_list]
)
)
if len(sr_test) != 1:
msg = f"sample rates are not all the same {sr_test}"
self.logger.error(msg)
raise ValueError(msg)
return sr_test[0]
def _check_common_start(self, valid_list):
"""
check to see if there are different starting times
:param valid_list: DESCRIPTION
:type valid_list: TYPE
:return: DESCRIPTION
:rtype: TYPE
"""
start_list = list(
set([item.coords["time"].values[0] for item in valid_list])
)
if len(start_list) != 1:
return False
return True
def _check_common_end(self, valid_list):
"""
check to see if there are different end times
:param valid_list: DESCRIPTION
:type valid_list: TYPE
:return: DESCRIPTION
:rtype: TYPE
"""
end_list = list(
set([item.coords["time"].values[-1] for item in valid_list])
)
if len(end_list) != 1:
return False
return True
def _get_earliest_start(self, valid_list):
"""
get the earliest start time
"""
return min([item.coords["time"].values[0] for item in valid_list])
def _get_latest_end(self, valid_list):
"""
get the earliest start time
"""
return max([item.coords["time"].values[-1] for item in valid_list])
def _get_common_time_index(self, start, end, sample_rate):
"""
get common time index
"""
n_samples = int(sample_rate * float(end - start) / 1e9) + 1
return make_dt_coordinates(start, sample_rate, n_samples)
def _get_channel_response_filter(self, ch_name):
"""
Get the channel response filter from the filter dictionary
:param ch_name: DESCRIPTION
:type ch_name: TYPE
:return: DESCRIPTION
:rtype: TYPE
"""
filter_list = []
if ch_name in self.dataset.keys():
for filter_name in self.dataset[ch_name].attrs["filter.name"]:
try:
filter_list.append(self.filters[filter_name])
except KeyError:
self.logger.debug(
f"Could not find {filter_name} in filters"
)
return ChannelResponseFilter(filters_list=filter_list)
def __getattr__(self, name):
# change to look for keys directly and use type to set channel type
if name in self.dataset.keys():
ch_response_filter = self._get_channel_response_filter(name)
# if cannot get filters, but the filters name indicates that
# filters should be there don't input the channel response filter
# cause then an empty filters_list will set filter.name to []
if ch_response_filter.filters_list == []:
ch_response_filter = None
return ChannelTS(
self.dataset[name].attrs["type"],
self.dataset[name],
run_metadata=self.run_metadata.copy(),
station_metadata=self.station_metadata.copy(),
channel_response_filter=ch_response_filter,
)
else:
# this is a hack for now until figure out who is calling shape, size
if name[0] == "_":
return None
if name not in ["shape", "size"]:
try:
return super().__getattribute__(name)
except AttributeError:
msg = f"RunTS has no attribute {name}"
self.logger.error(msg)
raise NameError(msg)
[docs] def copy(self, data=True):
"""
:param data: DESCRIPTION, defaults to True
:type data: TYPE, optional
:return: DESCRIPTION
:rtype: TYPE
"""
if not data:
return RunTS(
run_metadata=self.run_metadata.copy(),
station_metadata=self.station_metadata.copy(),
survey_metadata=self.survey_metadata.copy(),
)
else:
return RunTS(
array_list=self.dataset,
run_metadata=self.run_metadata.copy(),
station_metadata=self.station_metadata.copy(),
survey_metadata=self.survey_metadata.copy(),
)
### Properties ------------------------------------------------------------
@property
def survey_metadata(self):
"""
survey metadata
"""
return self._survey_metadata
@survey_metadata.setter
def survey_metadata(self, survey_metadata):
"""
:param survey_metadata: survey metadata object or dictionary
:type survey_metadata: :class:`mt_metadata.timeseries.Survey` or dict
"""
if survey_metadata is not None:
survey_metadata = self._validate_survey_metadata(survey_metadata)
self._survey_metadata.update(
self._validate_survey_metadata(survey_metadata)
)
@property
def station_metadata(self):
"""
station metadata
"""
return self.survey_metadata.stations[0]
@station_metadata.setter
def station_metadata(self, station_metadata):
"""
set station metadata from a valid input
"""
if station_metadata is not None:
station_metadata = self._validate_station_metadata(
station_metadata
)
runs = ListDict()
if self.run_metadata.id not in ["0", 0]:
runs.append(self.run_metadata.copy())
runs.extend(station_metadata.runs)
if len(runs) == 0:
runs[0] = metadata.Run(id="0")
# be sure there is a level below
if len(runs[0].channels) == 0:
ch_metadata = metadata.Auxiliary()
ch_metadata.type = "auxiliary"
runs[0].channels.append(ch_metadata)
stations = ListDict()
stations.append(station_metadata)
stations[0].runs = runs
self.survey_metadata.stations = stations
@property
def run_metadata(self):
"""
station metadata
"""
run_metadata = self.survey_metadata.stations[0].runs[0]
return run_metadata
@run_metadata.setter
def run_metadata(self, run_metadata):
"""
set run metadata from a valid input
"""
if run_metadata is not None:
run_metadata = self._validate_run_metadata(run_metadata)
runs = ListDict()
runs.append(run_metadata)
runs.extend(
self.station_metadata.runs, skip_keys=[run_metadata.id, "0"]
)
self._survey_metadata.stations[0].runs = runs
[docs] def has_data(self):
"""check to see if there is data"""
if len(self.channels) > 0:
return True
return False
@property
def summarize_metadata(self):
"""
Get a summary of all the metadata
:return: A summary of all channel metadata in one place
:rtype: dictionary
"""
meta_dict = {}
for comp in self.dataset.data_vars:
for mkey, mvalue in self.dataset[comp].attrs.items():
meta_dict[f"{comp}.{mkey}"] = mvalue
return meta_dict
[docs] def set_dataset(self, array_list, align_type="outer"):
"""
:param array_list: list of xarrays
:type array_list: list of :class:`mth5.timeseries.ChannelTS` objects
:param align_type: how the different times will be aligned
* ’outer’: use the union of object indexes
* ’inner’: use the intersection of object indexes
* ’left’: use indexes from the first object with each dimension
* ’right’: use indexes from the last object with each dimension
* ’exact’: instead of aligning, raise ValueError when indexes to
be aligned are not equal
* ’override’: if indexes are of same size, rewrite indexes to
be those of the first object with that dimension. Indexes for
the same dimension must have the same size in all objects.
:type align_type: string
"""
if isinstance(array_list, (list, tuple)):
x_array_list = self._validate_array_list(array_list)
# input as a dictionary
xdict = dict([(x.component.lower(), x) for x in x_array_list])
self._dataset = xr.Dataset(xdict)
elif isinstance(array_list, xr.Dataset):
self._dataset = array_list
self.validate_metadata()
self._dataset.attrs.update(self.run_metadata.to_dict(single=True))
[docs] def add_channel(self, channel):
"""
Add a channel to the dataset, can be an :class:`xarray.DataArray` or
:class:`mth5.timeseries.ChannelTS` object.
Need to be sure that the coordinates and dimensions are the same as the
existing dataset, namely coordinates are time, and dimensions are the same,
if the dimesions are larger than the existing dataset then the added channel
will be clipped to the dimensions of the existing dataset.
If the start time is not the same nan's will be placed at locations where the
timing does not match the current start time. This is a feature of xarray.
:param channel: a channel xarray or ChannelTS to add to the run
:type channel: :class:`xarray.DataArray` or :class:`mth5.timeseries.ChannelTS`
"""
if isinstance(channel, xr.DataArray):
c = ChannelTS()
c.ts = channel
elif isinstance(channel, ChannelTS):
c = channel
self.run_metadata.channels.append(c.channel_metadata)
for ff in c.channel_response_filter.filters_list:
self._filters[ff.name] = ff
else:
raise ValueError(
"Input Channel must be type xarray.DataArray or ChannelTS"
)
### need to validate the channel to make sure sample rate is the same
if c.sample_rate != self.sample_rate:
msg = (
f"Channel sample rate is not correct, current {self.sample_rate} "
+ f"input {c.sample_rate}"
)
self.logger.error(msg)
raise ValueError(msg)
### should probably check for other metadata like station and run?
if len(self.dataset.dims) == 0:
self.dataset = c.data_array.to_dataset()
else:
self.dataset = xr.merge([self.dataset, c.data_array.to_dataset()])
@property
def dataset(self):
""":class:`xarray.Dataset`"""
return self._dataset
@dataset.setter
def dataset(self, array_list):
"""Set the dataset"""
msg = (
"Data will be aligned using the min and max time. "
"If that is not correct use set_dataset and change the alignment type."
)
self.logger.debug(msg)
self.set_dataset(array_list)
@property
def start(self):
"""Start time UTC"""
if self.has_data():
return MTime(self.dataset.coords["time"].to_index()[0].isoformat())
return self.run_metadata.time_period.start
@property
def end(self):
"""End time UTC"""
if self.has_data():
return MTime(
self.dataset.coords["time"].to_index()[-1].isoformat()
)
return self.run_metadata.time_period.end
@property
def sample_rate(self):
"""
Sample rate, this is estimated by the mdeian difference between
samples in time, if data is present. Otherwise return the metadata
sample rate.
"""
if self.has_data():
try:
return round(
1.0
/ np.float64(
(
np.median(
np.diff(self.dataset.coords["time"].to_index())
/ np.timedelta64(1, "s")
)
)
),
0,
)
except AttributeError:
self.logger.warning(
"Something weird happend with xarray time indexing"
)
raise ValueError(
"Something weird happend with xarray time indexing"
)
return self.run_metadata.sample_rate
@property
def sample_interval(self):
"""
Sample interval = 1 / sample_rate
:return: DESCRIPTION
:rtype: TYPE
"""
if self.sample_rate != 0:
return 1.0 / self.sample_rate
return 0.0
@property
def channels(self):
"""List of channel names in dataset"""
return [cc for cc in list(self.dataset.data_vars)]
@property
def filters(self):
"""Dictionary of filters used by the channels"""
return self._filters
@filters.setter
def filters(self, value):
"""
a dictionary of filters found in the channel objects.
Should use the dictionary methods to update a dictionary.
:param value: dictionary of :module:`mt_metadata.timeseries.filters`
objects
:type value: dictionary
:raises TypeError: If input is anything other than a dictionary
"""
if not isinstance(value, dict):
raise TypeError("input must be a dictionary")
self._filters = value
[docs] def to_obspy_stream(self):
"""
convert time series to an :class:`obspy.core.Stream` which is like a
list of :class:`obspy.core.Trace` objects.
:return: An Obspy Stream object from the time series data
:rtype: :class:`obspy.core.Stream`
"""
trace_list = []
for channel in self.channels:
ts_obj = getattr(self, channel)
trace_list.append(ts_obj.to_obspy_trace())
return Stream(traces=trace_list)
[docs] def from_obspy_stream(self, obspy_stream, run_metadata=None):
"""
Get a run from an :class:`obspy.core.stream` which is a list of
:class:`obspy.core.Trace` objects.
:param obspy_stream: Obspy Stream object
:type obspy_stream: :class:`obspy.core.Stream`
"""
if not isinstance(obspy_stream, Stream):
msg = f"Input must be obspy.core.Stream not {type(obspy_stream)}"
self.logger.error(msg)
raise TypeError(msg)
array_list = []
station_list = []
for obs_trace in obspy_stream:
channel_ts = ChannelTS()
channel_ts.from_obspy_trace(obs_trace)
if channel_ts.channel_metadata.component == "e1":
channel_ts.channel_metadata.component = "ex"
if channel_ts.channel_metadata.component == "e2":
channel_ts.channel_metadata.component = "ey"
if channel_ts.channel_metadata.component == "h1":
channel_ts.channel_metadata.component = "hx"
if channel_ts.channel_metadata.component == "h2":
channel_ts.channel_metadata.component = "hy"
if channel_ts.channel_metadata.component == "h3":
channel_ts.channel_metadata.component = "hz"
if run_metadata:
try:
ch = [
ch
for ch in run_metadata.channels
if ch.component == channel_ts.component
][0]
channel_ts.channel_metadata.update(ch)
except IndexError:
self.logger.warning(
f"could not find {channel_ts.component}"
)
# else:
# run_metadata = metadata.Run(id="001")
station_list.append(channel_ts.station_metadata.fdsn.id)
array_list.append(channel_ts)
### need to merge metadata into something useful, station name is the only
### name that is preserved
try:
station = list(set([ss for ss in station_list if ss is not None]))[
0
]
except IndexError:
station = None
msg = "Could not find station name"
self.logger.warn(msg)
self.station_metadata.fdsn.id = station
self.set_dataset(array_list)
# need to be sure update any input metadata.
if run_metadata is not None:
self.station_metadata.runs = ListDict()
self.station_metadata.add_run(run_metadata)
self.validate_metadata()
[docs] def get_slice(self, start, end=None, n_samples=None):
"""
:param start: DESCRIPTION
:type start: TYPE
:param end: DESCRIPTION, defaults to None
:type end: TYPE, optional
:param n_samples: DESCRIPTION, defaults to None
:type n_samples: TYPE, optional
:raises ValueError: DESCRIPTION
:return: DESCRIPTION
:rtype: TYPE
"""
"""
Get just a chunk of data from the run, this will attempt to find the
closest points to the given parameters.
.. note:: We use pandas `slice_indexer` because xarray slice does not
seem to work as well, even though they should be based on the same
code.
:param start: start time of the slice
:type start: string or :class:`mt_metadata.utils.mttime.MTime`
:param end: end time of the slice, defaults to None
:type end: string or :class:`mt_metadata.utils.mttime.MTime`, optional
:param n_samples: number of samples to get, defaults to None
:type n_samples: int, optional
:raises ValueError: If end and n_samples are not input
:return: slice of data requested
:rtype: :class:`mth5.timeseries.RunTS`
"""
if not isinstance(start, MTime):
start = MTime(start)
if n_samples is not None:
seconds = (n_samples - 1) / self.sample_rate
end = start + seconds
elif end is not None:
if not isinstance(end, MTime):
end = MTime(end)
else:
raise ValueError("Must input n_samples or end")
chunk = self.dataset.indexes["time"].slice_indexer(
start=np.datetime64(start.iso_no_tz),
end=np.datetime64(end.iso_no_tz),
)
new_runts = RunTS()
new_runts.station_metadata = self.station_metadata
new_runts.run_metadata = self.run_metadata
new_runts.filters = self.filters
new_runts.dataset = self._dataset.isel(indexers={"time": chunk})
return new_runts
[docs] def calibrate(self, **kwargs):
"""
Calibrate the data according to the filters in each channel.
:return: calibrated run
:rtype: :class:`mth5.timeseries.RunTS`
"""
new_run = RunTS()
new_run.run_metadata = self.run_metadata
new_run.station_metadata = self.station_metadata
for channel in self.channels:
ch_ts = getattr(self, channel)
calibrated_ch_ts = ch_ts.remove_instrument_response(**kwargs)
new_run.add_channel(calibrated_ch_ts)
return new_run
[docs] def decimate(self, new_sample_rate, inplace=False, max_decimation=8):
"""
decimate data to new sample rate.
:param new_sample_rate: DESCRIPTION
:type new_sample_rate: TYPE
:param inplace: DESCRIPTION, defaults to False
:type inplace: TYPE, optional
:return: DESCRIPTION
:rtype: TYPE
"""
sr_list = get_decimation_sample_rates(
self.sample_rate, new_sample_rate, max_decimation
)
# need to fill nans with 0 otherwise they wipeout the decimation values
# and all becomes nan.
new_ds = self.dataset.fillna(0)
for step_sr in sr_list:
new_ds = new_ds.sps_filters.decimate(step_sr)
new_ds.attrs["sample_rate"] = new_sample_rate
self.run_metadata.sample_rate = new_ds.attrs["sample_rate"]
if inplace:
self.dataset = new_ds
else:
# return new_ds
return RunTS(
new_ds,
run_metadata=self.run_metadata,
station_metadata=self.station_metadata,
survey_metadata=self.survey_metadata,
)
[docs] def resample_poly(self, new_sample_rate, pad_type="mean", inplace=False):
"""
Use scipy.signal.resample_poly to resample data while using an FIR
filter to remove aliasing.
:param new_sample_rate: DESCRIPTION
:type new_sample_rate: TYPE
:param pad_type: DESCRIPTION, defaults to "mean"
:type pad_type: TYPE, optional
:return: DESCRIPTION
:rtype: TYPE
"""
# need to fill nans with 0 otherwise they wipeout the decimation values
# and all becomes nan.
new_ds = self.dataset.fillna(0)
new_ds = new_ds.sps_filters.resample_poly(
new_sample_rate, pad_type=pad_type
)
new_ds.attrs["sample_rate"] = new_sample_rate
self.run_metadata.sample_rate = new_ds.attrs["sample_rate"]
if inplace:
self.dataset = new_ds
else:
return RunTS(
new_ds,
run_metadata=self.run_metadata,
station_metadata=self.station_metadata,
survey_metadata=self.survey_metadata,
)
[docs] def resample(self, new_sample_rate, inplace=False):
"""
Resample data to new sample rate.
:param new_sample_rate: DESCRIPTION
:type new_sample_rate: TYPE
:param inplace: DESCRIPTION, defaults to False
:type inplace: TYPE, optional
:return: DESCRIPTION
:rtype: TYPE
"""
new_dt_freq = "{0:.0f}N".format(1e9 / (new_sample_rate))
new_ds = self.dataset.resample(time=new_dt_freq).nearest(
tolerance=new_dt_freq
)
new_ds.attrs["sample_rate"] = new_sample_rate
self.run_metadata.sample_rate = new_ds.attrs["sample_rate"]
if inplace:
self.dataset = new_ds
else:
# return new_ds
return RunTS(
new_ds,
run_metadata=self.run_metadata,
station_metadata=self.station_metadata,
survey_metadata=self.survey_metadata,
)
[docs] def merge(
self,
other,
gap_method="slinear",
new_sample_rate=None,
resample_method="poly",
):
"""
merg two runs or list of runs together in the following steps
1. xr.combine_by_coords([original, other])
2. compute monotonic time index
3. reindex(new_time_index, method=gap_method)
If you want a different method or more control use merge
:param other: Another run
:type other: :class:`mth5.timeseries.RunTS`
:raises TypeError: If input is not a RunTS
:raises ValueError: if the components are different
:return: Combined run with monotonic time index and same metadata
:rtype: :class:`mth5.timeseries.RunTS`
"""
if new_sample_rate is not None:
merge_sample_rate = new_sample_rate
if resample_method == "decimate":
combine_list = [self.decimate(new_sample_rate).dataset]
elif resample_method == "poly":
combine_list = [self.resample_poly(new_sample_rate).dataset]
else:
merge_sample_rate = self.sample_rate
combine_list = [self.dataset]
ts_filters = self.filters
if isinstance(other, (list, tuple)):
for run in other:
if not isinstance(run, RunTS):
raise TypeError(f"Cannot combine {type(run)} with RunTS.")
if new_sample_rate is not None:
if resample_method == "decimate":
run = run.decimate(new_sample_rate)
elif resample_method == "poly":
run = run.resample_poly(new_sample_rate)
combine_list.append(run.dataset)
ts_filters.update(run.filters)
else:
if not isinstance(other, RunTS):
raise TypeError(f"Cannot combine {type(other)} with RunTS.")
if new_sample_rate is not None:
if resample_method == "decimate":
other = other.decimate(new_sample_rate)
elif resample_method == "poly":
other = other.resample_poly(new_sample_rate)
combine_list.append(other.dataset)
ts_filters.update(other.filters)
# combine into a data set use override to keep attrs from original
combined_ds = xr.combine_by_coords(
combine_list, combine_attrs="override"
)
n_samples = (
merge_sample_rate
* float(
combined_ds.time.max().values - combined_ds.time.min().values
)
/ 1e9
) + 1
new_dt_index = make_dt_coordinates(
combined_ds.time.min().values, merge_sample_rate, n_samples
)
run_metadata = self.run_metadata.copy()
run_metadata.sample_rate = merge_sample_rate
new_run = RunTS(
run_metadata=self.run_metadata,
station_metadata=self.station_metadata,
survey_metadata=self.survey_metadata,
)
## tried reindex then interpolate_na, but that has issues if the
## intial time index does not exactly match up with the new time index
## and then get a bunch of Nan, unless use nearest or pad, but then
## gaps are not filled correctly, just do a interp seems easier.
new_run.dataset = combined_ds.interp(
time=new_dt_index, method=gap_method
)
# update channel attributes
for ch in new_run.channels:
new_run.dataset[ch].attrs["time_period.start"] = new_run.start
new_run.dataset[ch].attrs["time_period.end"] = new_run.end
new_run.run_metadata.update_time_period()
new_run.station_metadata.update_time_period()
new_run.survey_metadata.update_time_period()
new_run.filters = ts_filters
return new_run
[docs] def plot(
self,
color_map={
"ex": (1, 0.2, 0.2),
"ey": (1, 0.5, 0),
"hx": (0, 0.5, 1),
"hy": (0.5, 0.2, 1),
"hz": (0.2, 1, 1),
},
channel_order=None,
):
"""
plot the time series probably slow for large data sets
"""
if channel_order is not None:
ch_list = channel_order()
else:
ch_list = self.channels
n_channels = len(self.channels)
fig = plt.figure()
fig.subplots_adjust(hspace=0)
ax_list = []
for ii, comp in enumerate(ch_list, 1):
try:
color = color_map[comp]
except KeyError:
color = (0, 0.4, 0.8)
if ii == 1:
ax = plt.subplot(n_channels, 1, ii)
else:
ax = plt.subplot(n_channels, 1, ii, sharex=ax_list[0])
self.dataset[comp].plot.line(ax=ax, color=color)
ax.grid(which="major", color=(0.65, 0.65, 0.65), ls="--", lw=0.75)
ax.grid(which="minor", color=(0.85, 0.85, 0.85), ls="--", lw=0.5)
ax.set_axisbelow(True)
if ii != len(ch_list):
plt.setp(ax.get_xticklabels(), visible=False)
ax_list.append(ax)
return fig
[docs] def plot_spectra(
self,
spectra_type="welch",
color_map={
"ex": (1, 0.2, 0.2),
"ey": (1, 0.5, 0),
"hx": (0, 0.5, 1),
"hy": (0.5, 0.2, 1),
"hz": (0.2, 1, 1),
},
**kwargs,
):
"""
Plot spectra using spectra type, only 'welch' is supported now.
:param spectra_type: spectra type, defaults to "welch"
:type spectra_type: string, optional
:param color_map: colors of channels, defaults to {
"ex": (1, 0.2, 0.2),
"ey": (1, 0.5, 0),
"hx": (0, 0.5, 1),
"hy": (0.5, 0.2, 1),
"hz": (0.2, 1, 1),
}
:type color_map: dictionary, optional
:param **kwargs: key words for the spectra type
:type **kwargs: dictionary
"""
fig = plt.figure()
ax = fig.add_subplot(1, 1, 1)
line_list = []
label_list = []
for comp in self.channels:
ch = getattr(self, comp)
plot_freq, power = ch.welch_spectra(**kwargs)
(l1,) = ax.loglog(
1.0 / plot_freq, power, lw=1.5, color=color_map[comp]
)
line_list.append(l1)
label_list.append(comp)
ax.set_xlabel("Period (s)", fontdict={"size": 10, "weight": "bold"})
ax.set_ylabel("Power (dB)", fontdict={"size": 10, "weight": "bold"})
ax.axis("tight")
ax.grid(which="both")
ax2 = ax.twiny()
ax2.loglog(plot_freq, power, lw=0)
ax2.set_xlabel(
"Frequency (Hz)", fontdict={"size": 10, "weight": "bold"}
)
ax2.set_xlim([1 / cc for cc in ax.get_xlim()])
ax.legend(line_list, label_list)
plt.show()
return fig