# -*- coding: utf-8 -*-
"""
Channel time series module for MT data.
This module provides the `ChannelTS` class for handling magnetotelluric (MT)
time series data with comprehensive metadata management, calibration,
and signal processing capabilities.
Notes
-----
- Time series are stored in `xarray.DataArray` for efficient operations.
- Metadata follows the mt_metadata standard with Survey/Station/Run/Channel hierarchy.
- Supports instrument response removal, resampling, merging, and Obspy integration.
"""
from __future__ import annotations
# ==============================================================================
# Imports
# ==============================================================================
import inspect
from typing import Any
import mt_metadata.timeseries as metadata
import numpy as np
import pandas as pd
import scipy
import xarray as xr
from loguru import logger
from mt_metadata.common.list_dict import ListDict
from mt_metadata.common.mttime import MTime
from mt_metadata.common.units import get_unit_object
from mt_metadata.timeseries.filters import ChannelResponse
from obspy.core import Trace
from scipy import signal
from mth5.timeseries.ts_filters import RemoveInstrumentResponse
from mth5.timeseries.ts_helpers import get_decimation_sample_rates, make_dt_coordinates
from mth5.utils import fdsn_tools
# =============================================================================
# make a dictionary of available metadata classes
# =============================================================================
# ==============================================================================
# Channel Time Series Object
# ==============================================================================
[docs]
class ChannelTS:
"""
Time series container for a single MT channel with full metadata.
Stores equally-spaced time series data in an `xarray.DataArray` with
a time coordinate index. Integrates comprehensive metadata from
Survey/Station/Run/Channel hierarchy and supports calibration,
resampling, merging, and format conversions.
Parameters
----------
channel_type : {'electric', 'magnetic', 'auxiliary'}, default 'auxiliary'
Type of the channel.
data : array-like, optional
Time series data (numpy array, pandas DataFrame/Series, xarray.DataArray).
channel_metadata : mt_metadata.timeseries.Electric | Magnetic | Auxiliary | dict, optional
Channel-specific metadata.
station_metadata : mt_metadata.timeseries.Station | dict, optional
Station metadata.
run_metadata : mt_metadata.timeseries.Run | dict, optional
Run metadata.
survey_metadata : mt_metadata.timeseries.Survey | dict, optional
Survey metadata.
**kwargs
Additional attributes to set on the object.
Attributes
----------
ts : numpy.ndarray
The time series data array.
sample_rate : float
Sample rate in samples per second.
start : MTime
Start time (UTC).
end : MTime
End time (UTC), derived from start + duration.
n_samples : int
Number of samples.
component : str
Component name (e.g., 'ex', 'hy', 'temperature').
channel_response : ChannelResponse
Full instrument response filter chain.
Notes
-----
- End time is a derived property and cannot be set directly.
- Leverages xarray for efficient interpolation, resampling, and groupby operations.
- Metadata follows mt_metadata standards with automatic time period updates.
Examples
--------
Create an auxiliary channel with synthetic data::
>>> from mth5.timeseries import ChannelTS
>>> import numpy as np
>>> ts_obj = ChannelTS('auxiliary')
>>> ts_obj.sample_rate = 8
>>> ts_obj.start = '2020-01-01T12:00:00+00:00'
>>> ts_obj.ts = np.random.randn(4096)
>>> ts_obj.station_metadata.id = 'MT001'
>>> ts_obj.run_metadata.id = 'MT001a'
>>> ts_obj.component = 'temperature'
>>> print(ts_obj)
Calibrate and remove instrument response::
>>> calibrated = ts_obj.remove_instrument_response()
>>> calibrated.channel_metadata.units
"""
def __init__(
self,
channel_type: str = "auxiliary",
data: (
np.ndarray | pd.DataFrame | pd.Series | xr.DataArray | list | tuple | None
) = None,
channel_metadata: (
metadata.Electric | metadata.Magnetic | metadata.Auxiliary | dict | None
) = None,
station_metadata: metadata.Station | dict | None = None,
run_metadata: metadata.Run | dict | None = None,
survey_metadata: metadata.Survey | dict | None = None,
**kwargs: Any,
) -> None:
self._channel_type = self._validate_channel_type(channel_type)
self._survey_metadata = self._initialize_metadata()
[docs]
self.data_array = xr.DataArray([1], coords=[("time", [1])], name="ts")
self._channel_response = ChannelResponse() # type: ignore
self.survey_metadata = survey_metadata
self.station_metadata = station_metadata
self.run_metadata = run_metadata
self.channel_metadata = channel_metadata
self._sample_rate = self.get_sample_rate_supplied_at_init(channel_metadata)
# input data
if data is not None:
self.ts = data
else:
self._update_xarray_metadata()
for key in list(kwargs.keys()):
setattr(self, key, kwargs[key])
[docs]
def get_sample_rate_supplied_at_init(
self,
channel_metadata: (
metadata.Electric | metadata.Magnetic | metadata.Auxiliary | dict | None
),
) -> float | None:
"""
Extract sample_rate from channel_metadata if available.
Parameters
----------
channel_metadata : mt_metadata.timeseries.Electric | Magnetic | Auxiliary | dict | None
Metadata that may contain a sample_rate field.
Returns
-------
float | None
Sample rate if found, otherwise None.
Notes
-----
Supports nested dict structures like ``{"electric": {"sample_rate": 8.0}}``.
"""
sr = None
if channel_metadata is None:
sr = None
elif isinstance(channel_metadata, dict):
# check first two layers for sample_rate key
if "sample_rate" in channel_metadata.keys():
sr = channel_metadata["sample_rate"]
else:
for k, v in channel_metadata.items():
if isinstance(v, dict):
if "sample_rate" in v.keys():
sr = v["sample_rate"]
else:
try:
# if an mt_metadata.timeseries access attr
sr = channel_metadata.sample_rate
except AttributeError:
sr = None
return sr
def __str__(self) -> str:
"""
Return a summary string representation of the channel.
Returns
-------
str
Multi-line summary including survey, station, run, component,
sample rate, time range, and sample count.
"""
lines = [
f"Survey: {self.survey_metadata.id}",
f"Station: {self.station_metadata.id}",
f"Run: {self.run_metadata.id}",
f"Channel Type: {self.channel_type}",
f"Component: {self.component}",
f"Sample Rate: {self.sample_rate}",
f"Start: {self.start}",
f"End: {self.end}",
f"N Samples: {self.n_samples}",
]
return "\n\t".join(["Channel Summary:"] + lines)
def __repr__(self) -> str:
return self.__str__()
def __eq__(self, other: object) -> bool:
"""
Test equality with another ChannelTS.
Parameters
----------
other : object
Object to compare with.
Returns
-------
bool
True if metadata and data arrays are equal.
Raises
------
TypeError
If `other` is not a ChannelTS instance.
"""
if not isinstance(other, ChannelTS):
raise TypeError(f"Cannot compare ChannelTS with {type(other)}.")
if not other.channel_metadata == self.channel_metadata:
return False
if self.data_array.equals(other.data_array) is False:
msg = "timeseries are not equal"
self.logger.info(msg)
return False
return True
def __ne__(self, other: object) -> bool:
return not self.__eq__(other)
def __lt__(self, other: ChannelTS) -> bool:
"""
Compare start times of two channels.
Parameters
----------
other : ChannelTS
Channel to compare with.
Returns
-------
bool
True if self.start < other.start and sample rates match.
Raises
------
TypeError
If `other` is not a ChannelTS instance.
"""
if not isinstance(other, ChannelTS):
raise TypeError(f"Cannot compare ChannelTS with {type(other)}")
self.logger.info("Only testing start time")
return self.start < other.start
def __gt__(self, other: ChannelTS) -> bool:
if not isinstance(other, ChannelTS):
raise TypeError(f"Cannot compare ChannelTS with {type(other)}")
return self.start > other.start
def __add__(self, other: ChannelTS) -> ChannelTS:
"""
Combine two channels with the same component.
Combines using `xr.combine_by_coords`, computes a monotonic time index,
and reindexes with linear interpolation.
Parameters
----------
other : ChannelTS
Channel to combine with this one.
Returns
-------
ChannelTS
Combined channel with monotonic time index.
Raises
------
TypeError
If `other` is not a ChannelTS.
ValueError
If components differ.
Examples
--------
Merge two sequential segments::
>>> combined = ch1 + ch2
"""
if not isinstance(other, ChannelTS):
raise TypeError(f"Cannot combine {type(other)} with ChannelTS.")
if self.component != other.component:
raise ValueError(
"Cannot combine channels with different components. "
f"{self.component} != {other.component}"
)
if self.data_array.name != self.component:
self.data_array.name = self.component
if other.data_array.name != self.component:
other.data_array.name = self.component
# combine into a data set use override to keep attrs from original
combined_ds = xr.combine_by_coords(
[self.data_array, other.data_array], combine_attrs="override"
)
# Handle datetime.timedelta for Python 3.12+ compatibility
duration = combined_ds.time.max().values - combined_ds.time.min().values
if hasattr(duration, "total_seconds"):
# Python datetime.timedelta
duration_ns = duration.total_seconds() * 1e9
elif hasattr(duration, "view"):
# numpy timedelta64 - convert to nanoseconds
duration_ns = float(duration / np.timedelta64(1, "ns"))
else:
# Already numeric
duration_ns = float(duration)
n_samples = (self.sample_rate * duration_ns / 1e9) + 1
new_dt_index = make_dt_coordinates(
combined_ds.time.min().values, self.sample_rate, n_samples
)
new_channel = ChannelTS(
channel_type=self.channel_metadata.type,
channel_metadata=self.channel_metadata,
run_metadata=self.run_metadata,
station_metadata=self.station_metadata,
survey_metadata=self.survey_metadata,
channel_response=self.channel_response,
)
new_channel.data_array = combined_ds.interp(
time=new_dt_index, method="slinear"
).to_array()
new_channel.channel_metadata.time_period.start = new_channel.start
new_channel.channel_metadata.time_period.end = new_channel.end
new_channel.run_metadata.update_time_period()
new_channel.station_metadata.update_time_period()
new_channel.survey_metadata.update_time_period()
new_channel._update_xarray_metadata()
return new_channel
def _initialize_metadata(self) -> metadata.Survey:
"""
Create a Survey metadata hierarchy with default Station/Run/Channel.
Returns
-------
mt_metadata.timeseries.Survey
Initialized survey metadata with default IDs.
"""
survey_metadata = metadata.Survey(id="0")
survey_metadata.stations.append(metadata.Station(id="0"))
survey_metadata.stations[0].runs.append(metadata.Run(id="0"))
# Create temporary channel metadata with valid default components
channel_type_lower = self.channel_type.lower()
if channel_type_lower == "electric":
ch_metadata = meta_classes[self.channel_type](component="ex")
elif channel_type_lower == "magnetic":
ch_metadata = meta_classes[self.channel_type](component="hx")
elif channel_type_lower == "auxiliary":
ch_metadata = meta_classes[self.channel_type](component="temperature")
else:
# Fallback for unknown types - try with a generic component
ch_metadata = meta_classes[self.channel_type](component="temp")
ch_metadata.type = self.channel_type.lower()
survey_metadata.stations[0].runs[0].channels.append(ch_metadata)
return survey_metadata
def _validate_channel_type(self, channel_type: str | None) -> str:
"""
Validate and normalize channel type.
Parameters
----------
channel_type : str | None
Channel type string.
Returns
-------
str
Capitalized valid channel type: 'Electric', 'Magnetic', or 'Auxiliary'.
Raises
------
ValueError
If channel type is not recognized.
"""
if channel_type is None:
channel_type = "auxiliary"
if channel_type.lower() not in ["electric", "magnetic"]:
channel_type = "auxiliary"
if not channel_type.capitalize() in meta_classes.keys():
msg = (
"Channel type is undefined, must be [ electric | "
"magnetic | auxiliary ]"
)
self.logger.error(msg)
raise ValueError(msg)
return channel_type.capitalize()
def _validate_channel_metadata(
self,
channel_metadata: (
metadata.Electric | metadata.Magnetic | metadata.Auxiliary | dict
),
) -> metadata.Electric | metadata.Magnetic | metadata.Auxiliary:
"""
Validate and normalize channel metadata input.
Parameters
----------
channel_metadata : mt_metadata.timeseries.Electric | Magnetic | Auxiliary | dict
Metadata to validate.
Returns
-------
mt_metadata.timeseries.Electric | Magnetic | Auxiliary
Validated metadata object.
Raises
------
TypeError
If input is not an expected type.
"""
expected_types = (
metadata.Electric,
metadata.Magnetic,
metadata.Auxiliary,
)
if isinstance(channel_metadata, expected_types):
return channel_metadata.copy()
if not isinstance(channel_metadata, dict):
msg = (
f"input metadata must be type {type(self.channel_metadata)}"
f" or dict, not {type(channel_metadata)}"
)
self.logger.error(msg)
raise TypeError(msg)
channel_metadata_lower_keys = [x.lower() for x in channel_metadata.keys()]
if self.channel_type.lower() not in channel_metadata_lower_keys:
try:
self.channel_type = channel_metadata["type"]
except KeyError:
pass
channel_metadata = {self.channel_type: channel_metadata}
self.channel_type = list(channel_metadata.keys())[0]
# Create channel metadata with proper default component
channel_type_lower = self.channel_type.lower()
if channel_type_lower == "electric":
ch_metadata = meta_classes[self.channel_type]()
elif channel_type_lower == "magnetic":
ch_metadata = meta_classes[self.channel_type]()
elif channel_type_lower == "auxiliary":
ch_metadata = meta_classes[self.channel_type]()
else:
ch_metadata = meta_classes[self.channel_type]()
self.logger.debug("Loading from metadata dict")
ch_metadata.from_dict(channel_metadata)
return ch_metadata
def _validate_run_metadata(self, run_metadata: metadata.Run | dict) -> metadata.Run:
"""
Validate and normalize run metadata input.
Parameters
----------
run_metadata : mt_metadata.timeseries.Run | dict
Run metadata to validate.
Returns
-------
mt_metadata.timeseries.Run
Validated run metadata object.
Raises
------
TypeError
If input is not a Run object or dict.
"""
if not isinstance(run_metadata, metadata.Run):
if isinstance(run_metadata, dict):
if "run" not in [cc.lower() for cc in run_metadata.keys()]:
run_metadata = {"Run": run_metadata}
r_metadata = metadata.Run()
r_metadata.from_dict(run_metadata)
self.logger.debug("Loading from metadata dict")
return r_metadata
else:
msg = (
f"input metadata must be type {type(self.run_metadata)} "
f"or dict, not {type(run_metadata)}"
)
self.logger.error(msg)
raise TypeError(msg)
return run_metadata.copy()
def _validate_station_metadata(
self, station_metadata: metadata.Station | dict
) -> metadata.Station:
"""
Validate and normalize station metadata input.
Parameters
----------
station_metadata : mt_metadata.timeseries.Station | dict
Station metadata to validate.
Returns
-------
mt_metadata.timeseries.Station
Validated station metadata object.
Raises
------
TypeError
If input is not a Station object or dict.
"""
if not isinstance(station_metadata, metadata.Station):
if isinstance(station_metadata, dict):
if "station" not in [cc.lower() for cc in station_metadata.keys()]:
station_metadata = {"Station": station_metadata}
st_metadata = metadata.Station()
st_metadata.from_dict(station_metadata)
self.logger.debug("Loading from metadata dict")
return st_metadata
else:
msg = (
f"input metadata must be type {type(self.station_metadata)}"
f" or dict, not {type(station_metadata)}"
)
self.logger.error(msg)
raise TypeError(msg)
return station_metadata.copy()
def _validate_survey_metadata(
self, survey_metadata: metadata.Survey | dict
) -> metadata.Survey:
"""
Validate and normalize survey metadata input.
Parameters
----------
survey_metadata : mt_metadata.timeseries.Survey | dict
Survey metadata to validate.
Returns
-------
mt_metadata.timeseries.Survey
Validated survey metadata object.
Raises
------
TypeError
If input is not a Survey object or dict.
"""
if not isinstance(survey_metadata, metadata.Survey):
if isinstance(survey_metadata, dict):
if "survey" not in [cc.lower() for cc in survey_metadata.keys()]:
survey_metadata = {"Survey": survey_metadata}
sv_metadata = metadata.Survey()
sv_metadata.from_dict(survey_metadata)
self.logger.debug("Loading from metadata dict")
return sv_metadata
else:
msg = (
f"input metadata must be type {type(self.survey_metadata)}"
f" or dict, not {type(survey_metadata)}"
)
self.logger.error(msg)
raise TypeError(msg)
return survey_metadata.copy()
[docs]
def copy(self, data: bool = True) -> ChannelTS:
"""
Create a copy of the ChannelTS object.
Parameters
----------
data : bool, default True
Include data in the copy (True) or only metadata (False).
Returns
-------
ChannelTS
Copy of the channel.
Examples
--------
Copy metadata structure without data::
>>> ch_copy = ts_obj.copy(data=False)
"""
if not data:
return ChannelTS(
channel_type=self.channel_metadata.type,
channel_metadata=self.channel_metadata.copy(),
run_metadata=self.run_metadata.copy(),
station_metadata=self.station_metadata.copy(),
survey_metadata=self.survey_metadata.copy(),
channel_response=self.channel_response.copy(),
)
else:
return ChannelTS(
channel_type=self.channel_metadata.type,
data=self.ts,
channel_metadata=self.channel_metadata.copy(),
run_metadata=self.run_metadata.copy(),
station_metadata=self.station_metadata.copy(),
survey_metadata=self.survey_metadata.copy(),
channel_response=self.channel_response.copy(),
)
### Properties ------------------------------------------------------------
@property
@survey_metadata.setter
def survey_metadata(self, survey_metadata: metadata.Survey | dict | None) -> None:
"""
Set survey metadata.
Parameters
----------
survey_metadata : mt_metadata.timeseries.Survey | dict | None
Survey metadata object or dictionary.
"""
if survey_metadata is not None:
survey_metadata = self._validate_survey_metadata(survey_metadata)
self._survey_metadata.update(survey_metadata)
@property
@station_metadata.setter
def station_metadata(
self, station_metadata: metadata.Station | dict | None
) -> None:
"""
Set station metadata.
Parameters
----------
station_metadata : mt_metadata.timeseries.Station | dict | None
Station metadata to set.
"""
if station_metadata is not None:
station_metadata = self._validate_station_metadata(station_metadata)
runs = ListDict()
if self.run_metadata.id not in ["0", 0, None]:
runs.append(self.run_metadata.copy())
runs.extend(station_metadata.runs)
if len(runs) == 0:
runs[0] = metadata.Run(id="0")
# be sure there is a level below
if len(runs[0].channels) == 0:
# Create channel metadata with proper default component
channel_type_lower = self.channel_type.lower()
if channel_type_lower == "electric":
ch_metadata = meta_classes[self.channel_type](component="ex")
elif channel_type_lower == "magnetic":
ch_metadata = meta_classes[self.channel_type](component="hx")
elif channel_type_lower == "auxiliary":
ch_metadata = meta_classes[self.channel_type](
component="temperature"
)
else:
ch_metadata = meta_classes[self.channel_type]()
ch_metadata.type = self.channel_type.lower()
runs[0].channels.append(ch_metadata)
stations = ListDict()
stations.append(station_metadata)
stations[0].runs = runs
self.survey_metadata.stations = stations
@property
@run_metadata.setter
def run_metadata(self, run_metadata: metadata.Run | dict | None) -> None:
"""
Set run metadata.
Parameters
----------
run_metadata : mt_metadata.timeseries.Run | dict | None
Run metadata to set.
"""
# need to make sure the first index is the desired channel
if run_metadata is not None:
run_metadata = self._validate_run_metadata(run_metadata)
runs = ListDict()
runs.append(run_metadata)
channels = ListDict()
if self.component is not None:
key = str(self.component)
channels.append(self.station_metadata.runs[0].channels[key])
# add existing channels
channels.extend(self.run_metadata.channels, skip_keys=[key, "0"])
# add channels from input metadata
channels.extend(run_metadata.channels)
runs[0].channels = channels
runs.extend(self.station_metadata.runs, skip_keys=[run_metadata.id, "0"])
self._survey_metadata.stations[0].runs = runs
@property
@channel_metadata.setter
def channel_metadata(
self,
channel_metadata: (
metadata.Electric | metadata.Magnetic | metadata.Auxiliary | dict | None
),
) -> None:
"""
Set channel metadata.
Parameters
----------
channel_metadata : mt_metadata.timeseries.Electric | Magnetic | Auxiliary | dict | None
Channel metadata to set.
Raises
------
ValueError
If the channel component is None.
"""
if channel_metadata is not None:
channel_metadata = self._validate_channel_metadata(channel_metadata)
if channel_metadata.component is not None:
channels = ListDict()
if channel_metadata.component in self.run_metadata.channels.keys():
channels.append(
self.run_metadata.channels[channel_metadata.component]
)
channels[0].update(channel_metadata)
else:
channels.append(channel_metadata)
channels.extend(
self.run_metadata.channels,
skip_keys=[channel_metadata.component, None],
)
self.run_metadata.channels = channels
self.channel_type = self.run_metadata.channels[0].type
else:
raise ValueError("Channel 'component' cannot be None")
def _check_pd_index(self, ts_arr: pd.DataFrame | pd.Series) -> pd.DatetimeIndex:
"""
Check and return the time index from a pandas DataFrame or Series.
Parameters
----------
ts_arr : pandas.DataFrame | pandas.Series
Time series data.
Returns
-------
pandas.DatetimeIndex
Time index (existing or reconstructed from start/sample_rate).
"""
if isinstance(ts_arr.index[0], pd._libs.tslibs.timestamps.Timestamp):
return ts_arr.index
else:
return make_dt_coordinates(self.start, self.sample_rate, ts_arr.shape[0])
def _validate_dataframe_input(
self, ts_arr: pd.DataFrame
) -> tuple[pd.DataFrame, pd.DatetimeIndex]:
"""
Validate pandas DataFrame input.
Parameters
----------
ts_arr : pandas.DataFrame
DataFrame containing a 'data' column.
Returns
-------
tuple[pandas.DataFrame, pandas.DatetimeIndex]
Validated DataFrame and time index.
Raises
------
ValueError
If 'data' column is missing or has object dtype that can't convert.
"""
if "data" not in ts_arr.columns:
msg = (
"Data frame needs to have a column named `data` "
"where the time series data is stored"
)
self.logger.error(msg)
raise ValueError(msg)
if isinstance(type(ts_arr.data.dtype), type(np.object_)):
try:
ts_arr["data"] = ts_arr.data.astype(float)
except ValueError:
raise ValueError(
"DataFrame dtype is 'object' and cannot convert "
"data to float values, check data dtype."
)
dt = self._check_pd_index(ts_arr)
return ts_arr, dt
def _validate_series_input(
self, ts_arr: pd.Series
) -> tuple[pd.Series, pd.DatetimeIndex]:
"""
Validate pandas Series input.
Parameters
----------
ts_arr : pandas.Series
Series containing time series data.
Returns
-------
tuple[pandas.Series, pandas.DatetimeIndex]
Validated Series and time index.
Raises
------
ValueError
If Series has object dtype that can't convert to float.
"""
if isinstance(type(ts_arr.dtype), type(np.object_)):
try:
ts_arr = ts_arr.astype(float)
except ValueError:
raise ValueError(
"Series dtype is 'object' and cannot convert "
"data to float values, check data dtype."
)
dt = self._check_pd_index(ts_arr)
return ts_arr, dt
@property
[docs]
def ts(self) -> np.ndarray:
"""
Time series data as a numpy array.
Returns
-------
numpy.ndarray
The time series data.
"""
return self.data_array.data
@ts.setter
def ts(
self,
ts_arr: np.ndarray | list | tuple | pd.DataFrame | pd.Series | xr.DataArray,
) -> None:
"""
Set the time series data.
Parameters
----------
ts_arr : numpy.ndarray | list | tuple | pandas.DataFrame | pandas.Series | xarray.DataArray
Time series data. DataFrames must have a 'data' column.
Raises
------
TypeError
If data type is not supported.
Notes
-----
- For pandas DataFrames/Series, time index is extracted or reconstructed.
- For xarray.DataArray, metadata is extracted from attrs.
"""
if isinstance(ts_arr, (np.ndarray, list, tuple)):
if not isinstance(ts_arr, np.ndarray):
ts_arr = np.array(ts_arr)
# Validate an input array to make sure its 1D
if len(ts_arr.shape) == 2:
if 1 in ts_arr.shape:
ts_arr = ts_arr.reshape(ts_arr.size)
else:
msg = f"Input array must be 1-D array not {ts_arr.shape}"
self.logger.error(msg)
raise ValueError(msg)
dt = make_dt_coordinates(self.start, self.sample_rate, ts_arr.size)
self.data_array = xr.DataArray(
ts_arr, coords=[("time", dt)], name=self.component
)
self._update_xarray_metadata()
elif isinstance(ts_arr, pd.core.frame.DataFrame):
ts_arr, dt = self._validate_dataframe_input(ts_arr)
self.data_array = xr.DataArray(
ts_arr["data"], coords=[("time", dt)], name=self.component
)
self._update_xarray_metadata()
elif isinstance(ts_arr, pd.core.series.Series):
ts_arr, dt = self._validate_series_input(ts_arr)
self.data_array = xr.DataArray(
ts_arr.values, coords=[("time", dt)], name=self.component
)
self._update_xarray_metadata()
elif isinstance(ts_arr, xr.DataArray):
# TODO: need to validate the input xarray
self.data_array = ts_arr
# need to pull out the metadata as a separate dictionary
meta_dict = dict([(k, v) for k, v in ts_arr.attrs.items()])
# need to get station and run metadata out
survey_dict = {}
station_dict = {}
run_dict = {}
for key in [k for k in meta_dict.keys() if "survey." in k]:
survey_dict[key.split("station.")[-1]] = meta_dict.pop(key)
for key in [k for k in meta_dict.keys() if "station." in k]:
station_dict[key.split("station.")[-1]] = meta_dict.pop(key)
for key in [k for k in meta_dict.keys() if "run." in k]:
run_dict[key.split("run.")[-1]] = meta_dict.pop(key)
self.channel_type = meta_dict["type"]
# Create channel metadata with proper default component
channel_type_lower = self.channel_type.lower()
if channel_type_lower == "electric":
ch_metadata = meta_classes[self.channel_type](component="ex")
elif channel_type_lower == "magnetic":
ch_metadata = meta_classes[self.channel_type](component="hx")
elif channel_type_lower == "auxiliary":
ch_metadata = meta_classes[self.channel_type](component="temperature")
else:
ch_metadata = meta_classes[self.channel_type]()
ch_metadata.from_dict({self.channel_type: meta_dict})
self.survey_metadata.from_dict({"survey": survey_dict})
self.station_metadata.from_dict({"station": station_dict})
self.run_metadata.from_dict({"run": run_dict})
self.channel_metadata = ch_metadata
# need to run this incase things are different.
self._update_xarray_metadata()
else:
msg = (
"Data type {0} not supported".format(type(ts_arr))
+ ", ts needs to be a numpy.ndarray, pandas DataFrame, "
+ "or xarray.DataArray."
)
raise TypeError(msg)
@property
[docs]
def time_index(self) -> np.ndarray:
"""
Time index as a numpy array.
Returns
-------
numpy.ndarray
Array of datetime64[ns] timestamps.
"""
try:
return self.data_array.time.to_numpy()
except AttributeError:
return self.data_array.time.values
@property
[docs]
def channel_type(self) -> str:
"""
Channel type.
Returns
-------
str
Channel type: 'Electric', 'Magnetic', or 'Auxiliary'.
"""
return self._channel_type
@channel_type.setter
def channel_type(self, value: str) -> None:
"""change channel type means changing the metadata type"""
value = self._validate_channel_type(value)
if value != self._channel_type:
m_dict = self.channel_metadata.to_dict(single=True)
msg = (
f"Changing metadata from {self.channel_type} to {value}, "
"will translate any similar attributes."
)
# Create channel metadata with proper default component
if value.lower() == "electric":
channel_metadata = meta_classes[value](component="ex")
elif value.lower() == "magnetic":
channel_metadata = meta_classes[value](component="hx")
elif value.lower() == "auxiliary":
channel_metadata = meta_classes[value](component="temperature")
else:
channel_metadata = meta_classes[value]()
self.logger.debug(msg)
for key in channel_metadata.to_dict(single=True).keys():
# need to skip type otherwise it keeps the same type
if key in ["type"]:
continue
# Skip component when changing channel types to avoid validation conflicts
# The new metadata already has appropriate default component for the type
if key in ["component"] and self._channel_type != value:
continue
try:
channel_metadata.update_attribute(key, m_dict[key])
except KeyError:
pass
self._channel_type = value
self.run_metadata.channels[0] = channel_metadata
def _update_xarray_metadata(self) -> None:
"""
Update xarray attrs with current metadata.
Notes
-----
Synchronizes channel_metadata fields into data_array.attrs and adds
station/run IDs for convenient access.
"""
self.logger.debug("Updating xarray attributes")
self.channel_metadata.time_period.start = self.start.iso_no_tz
self.channel_metadata.time_period.end = self.end.iso_no_tz
self.channel_metadata.sample_rate = self.sample_rate
self.data_array.attrs.update(
self.channel_metadata.to_dict()[self.channel_metadata._class_name]
)
# add station and run id's here, for now this is all we need but may need
# more metadata down the road.
self.data_array.attrs["station.id"] = self.station_metadata.id
self.data_array.attrs["run.id"] = self.run_metadata.id
self.data_array.name = self.component
@property
[docs]
def component(self):
"""component"""
return self.channel_metadata.component
@component.setter
def component(self, comp):
"""set component in metadata and carry through"""
if self.channel_metadata.type == "electric":
if comp[0].lower() != "e":
msg = (
"The current timeseries is an electric channel. "
"Cannot change channel type, create a new ChannelTS object."
)
self.logger.error(msg)
raise ValueError(msg)
elif self.channel_metadata.type == "magnetic":
if comp[0].lower() not in ["h", "b"]:
msg = (
"The current timeseries is a magnetic channel. "
"Cannot change channel type, create a new ChannelTS object."
)
self.logger.error(msg)
raise ValueError(msg)
if self.channel_metadata.type == "auxiliary":
if comp[0].lower() in ["e", "h", "b"]:
msg = (
"The current timeseries is an auxiliary channel. "
"Cannot change channel type, create a new ChannelTS object."
)
self.logger.error(msg)
raise ValueError(msg)
self.channel_metadata.component = comp
# need to update the keys in the list dict
channels = ListDict()
channels.append(self.channel_metadata)
if len(self.run_metadata.channels) > 1:
for ch in self.run_metadata.channels[1:]:
channels.append(ch)
self.run_metadata.channels = channels
self._update_xarray_metadata()
# --> number of samples just to make sure there is consistency
@property
[docs]
def n_samples(self):
"""number of samples"""
return int(self.ts.size)
@n_samples.setter
def n_samples(self, n_samples):
"""number of samples (int)"""
self.logger.warning(
"Cannot set the number of samples. Use `ChannelTS.resample` or `get_slice`"
)
[docs]
def has_data(self):
"""
check to see if there is an index in the time series
"""
if self.data_array.data.size > 1:
if isinstance(
self.data_array.indexes["time"][0],
pd._libs.tslibs.timestamps.Timestamp,
):
return True
return False
else:
return False
[docs]
def is_high_frequency(self, threshold_dt=1e-4):
"""
Quasi hard-coded condition to check if data are logged at more than 10kHz
can be parameterized in future
"""
if (
self.data_array.coords.indexes["time"][1]
- self.data_array.coords.indexes["time"][0]
).total_seconds() < threshold_dt:
return True
else:
return False
[docs]
def compute_sample_rate(self):
"""
Two cases, high_frequency (HF) data and not HF data.
# Original comment about the HF case:
Taking the median(diff(timestamps)) is more accurate for high sample rates, the way pandas.date_range
rounds nanoseconds is not consistent between samples, therefore taking the median provides better results
if the time series is long this can be inefficient so test first
"""
if self.is_high_frequency():
dt_array = np.diff(self.data_array.coords.indexes["time"])
best_dt, counts = scipy.stats.mode(dt_array)
# Calculate total seconds of the best dt and calculate sample rate
best_dt_seconds = float(best_dt) / 1e9
sr = 1 / best_dt_seconds
else:
t_diff = (
self.data_array.coords.indexes["time"][-1]
- self.data_array.coords.indexes["time"][0]
)
sr = self.data_array.size / t_diff.total_seconds()
return np.round(sr, 0)
# --> sample rate
@property
[docs]
def sample_rate(self):
"""sample rate in samples/second"""
if self.has_data():
if self._sample_rate is None:
self._sample_rate = self.compute_sample_rate()
return self._sample_rate
else:
self.logger.debug("Data has not been set yet, sample rate is from metadata")
sr = self.channel_metadata.sample_rate
if sr is None:
sr = 0.0
if sr >= 1:
return np.round(sr, 0)
else:
return np.round(sr, 6)
@sample_rate.setter
def sample_rate(self, sample_rate):
"""
sample rate in samples/second
:param sample_rate: sample rate in samples per second
:type sample_rate: float
"""
if self.has_data():
self.logger.warning(
"Resetting sample_rate assumes same start time and "
"same number of samples, resulting in new end time. "
"If you want to downsample existing time series "
"use the method channelTS.resample()"
)
self.logger.debug(
f"Resetting sample rate from {self.sample_rate} to {sample_rate}"
)
new_dt = make_dt_coordinates(self.start, sample_rate, self.n_samples)
self.data_array.coords["time"] = new_dt
else:
if self.channel_metadata.sample_rate not in [0.0, None]:
self.logger.warning(
f"Resetting ChannelTS.channel_metadata.sample_rate to {sample_rate}. "
)
self.channel_metadata.sample_rate = sample_rate
self._sample_rate = sample_rate
self._update_xarray_metadata()
@property
[docs]
def sample_interval(self):
"""
Sample interval = 1 / sample_rate
:return: sample interval as time distance between time samples
:rtype: float
"""
if self.sample_rate != 0:
return 1.0 / self.sample_rate
return 0.0
## set time and set index
@property
[docs]
def start(self):
"""MTime object"""
if self.has_data():
return MTime(
time_stamp=self.data_array.coords.indexes["time"][0].isoformat()
)
else:
self.logger.debug(
"Data not set yet, pulling start time from "
"metadata.time_period.start"
)
return MTime(time_stamp=self.channel_metadata.time_period.start)
@start.setter
def start(self, start_time):
"""
start time of time series in UTC given in some format or a datetime
object.
Resets epoch seconds if the new value is not equivalent to previous
value.
Resets how the ts data frame is indexed, setting the starting time to
the new start time.
:param start_time: start time of time series, can be string or epoch seconds
"""
if not isinstance(start_time, MTime):
start_time = MTime(time_stamp=start_time)
self.channel_metadata.time_period.start = start_time.isoformat()
if self.has_data():
if start_time == MTime(
time_stamp=self.data_array.coords.indexes["time"][0].isoformat()
):
return
else:
new_dt = make_dt_coordinates(
start_time, self.sample_rate, self.n_samples
)
self.data_array.coords["time"] = new_dt
# make a time series that the data can be indexed by
else:
self.logger.debug("No data, just updating metadata start")
self._survey_metadata.stations[0].runs[0].update_time_period()
self._survey_metadata.stations[0].update_time_period()
self._survey_metadata.update_time_period()
self._update_xarray_metadata()
@property
[docs]
def end(self):
"""MTime object"""
if self.has_data():
return MTime(
time_stamp=self.data_array.coords.indexes["time"][-1].isoformat()
)
else:
self.logger.debug(
"Data not set yet, pulling end time from metadata.time_period.end"
)
return MTime(time_stamp=self.channel_metadata.time_period.end)
@end.setter
def end(self, end_time):
"""
end time of time series in UTC given in some format or a datetime
object.
Resets epoch seconds if the new value is not equivalent to previous
value.
Resets how the ts data frame is indexed, setting the starting time to
the new start time.
"""
self.logger.warning(
"Cannot set `end`. If you want a slice, then use get_slice method"
)
@property
[docs]
def channel_response(self):
"""
Full channel response filter
:return: full channel response filter
:rtype: :class:`mt_metadata.timeseries.filters.ChannelResponse`
"""
return self._channel_response
@channel_response.setter
def channel_response(self, value):
"""
:param value: channel response filter
:type value: :class:`mt_metadata.timeseries.filters.`
"""
if value is None:
return
if not isinstance(value, ChannelResponse):
msg = (
"channel response must be a "
"mt_metadata.timeseries.filters.ChannelResponse object "
f"not {type(value)}."
)
self.logger.error(msg)
raise TypeError(msg)
self._channel_response = value
# update channel metadata
if self.channel_metadata.filter_names != value.names:
for ch_filter in self._channel_response.filters_list:
if ch_filter.name in self.channel_metadata.filter_names:
# update existing filter info
existing_filter = self.channel_metadata.get_filter(ch_filter.name)
existing_filter.applied = False
existing_filter.stage = ch_filter.sequence_number
existing_filter.comments = ch_filter.comments
else:
self.channel_metadata.add_filter(
name=ch_filter.name,
applied=False,
stage=ch_filter.sequence_number,
comments=ch_filter.comments,
)
[docs]
def get_calibration_operation(self):
if self.channel_response.units_out == self.channel_metadata.unit_object.name:
calibration_operation = "divide"
elif self.channel_response.units_in == self.channel_metadata.unit_object.name:
calibration_operation = "multiply"
self.logger.warning(
"Unexpected Inverse Filter is being corrected -- something maybe wrong here "
)
else:
msg = "cannot determine multiply or divide via units -- setting to divide"
self.logger.warning(msg)
calibration_operation = "divide"
return calibration_operation
[docs]
def get_calibrated_units(self):
"""
Follows the FDSN standard which has the filter stages starting with physical units to digital counts.
The channel_response is expected to have a list of filter "stages" of which the first stage
has input units corresponding to the the physical quantity that the instrument measures, and the last is
normally counts.
channel_response can be viewed as the chaining together of all of these filters.
Thus it is normal for channel_response.units_out will be in the same units as the archived raw
time series, and for the units after the response is corrected for will be the units_in of
The units of the channel metadata are compared to the input and output units of the channel_response.
:return: tuple, calibration_operation, either "mulitply" or divide", and a string for calibrated units
:rtype: tuple (of two strings_
"""
if self.channel_response.units_out == self.channel_metadata.unit_object.name:
calibrated_units = self.channel_response.units_in
elif (
self.channel_response.units_in == None
and self.channel_response.units_out == None
):
msg = "No Units are associated with the channel_response"
self.logger.warning(msg)
msg = "cannot determine multiply or divide via units -- setting to divide:/"
self.logger.warning(msg)
calibrated_units = self.channel_metadata.units
else:
logger.critical(
"channel response filter units are likely corrupt or channel_ts has no units"
)
calibrated_units = self.channel_response.units_in
unit_object = get_unit_object(calibrated_units)
calibrated_units = unit_object.name
return calibrated_units
[docs]
def remove_instrument_response(
self, include_decimation=False, include_delay=False, **kwargs
):
"""
Remove instrument response from the given channel response filter
The order of operations is important (if applied):
1) detrend
2) zero mean
3) zero pad
4) time window
5) frequency window
6) remove response
7) undo time window
8) bandpass
:param include_decimation: Include decimation in response,
defaults to True
:type include_decimation: bool, optional
:param include_delay: include delay in complex response,
defaults to False
:type include_delay: bool, optional
**kwargs**
:param plot: to plot the calibration process [ False | True ]
:type plot: boolean, default True
:param detrend: Remove linar trend of the time series
:type detrend: boolean, default True
:param zero_mean: Remove the mean of the time series
:type zero_mean: boolean, default True
:param zero_pad: pad the time series to the next power of 2 for efficiency
:type zero_pad: boolean, default True
:param t_window: Time domain windown name see `scipy.signal.windows` for options
:type t_window: string, default None
:param t_window_params: Time domain window parameters, parameters can be
found in `scipy.signal.windows`
:type t_window_params: dictionary
:param f_window: Frequency domain windown name see `scipy.signal.windows` for options
:type f_window: string, defualt None
:param f_window_params: Frequency window parameters, parameters can be
found in `scipy.signal.windows`
:type f_window_params: dictionary
:param bandpass: bandpass freequency and order {"low":, "high":, "order":,}
:type bandpass: dictionary
"""
def bool_flip(x):
return bool(int(x) - 1)
if hasattr(self.channel_metadata, "filter"):
if self.channel_metadata.filter.applied is []:
self.logger.warning("No filters to apply to calibrate time series data")
return self.copy()
elif self.channel_metadata.filters is []:
self.logger.warning("No filters to apply to calibrate time series data")
return self.copy()
calibrated_ts = self.copy(data=False)
# Make a list of the filters whose response will be removed.
# We make the list here so that we have access to the indices to flip
indices_to_flip = self.channel_response.get_indices_of_filters_to_remove(
include_decimation=include_decimation,
include_delay=include_delay,
)
filters_to_remove = [
self.channel_response.filters_list[i] for i in indices_to_flip
]
remover = RemoveInstrumentResponse(
self.ts,
self.time_index,
self.sample_interval,
self.channel_response,
**kwargs,
)
calibration_operation = self.get_calibration_operation()
calibrated_ts.ts = remover.remove_instrument_response(
filters_to_remove=filters_to_remove,
operation=calibration_operation,
)
# update "applied" booleans
if hasattr(calibrated_ts.channel_metadata, "filter"):
applied_filters = calibrated_ts.channel_metadata.filter.applied
for idx in indices_to_flip:
applied_filters[idx] = bool_flip(applied_filters[idx])
calibrated_ts.channel_metadata.filter.applied = applied_filters
else:
for idx in indices_to_flip:
calibrated_ts.channel_metadata.filters[idx].applied = bool_flip(
calibrated_ts.channel_metadata.filters[idx].applied
)
# update units
calibrated_units = self.get_calibrated_units()
calibrated_ts.data_array.attrs["units"] = calibrated_units
calibrated_ts.channel_metadata.units = calibrated_units
calibrated_ts._update_xarray_metadata()
return calibrated_ts
[docs]
def get_slice(self, start, end=None, n_samples=None):
"""
Get a slice from the time series given a start and end time.
Looks for >= start & <= end
Uses loc to be exact with milliseconds
:param start: start time of the slice
:type start: string, MTime
:param end: end time of the slice
:type end: string, MTime
:param n_samples: number of sample to get after start time
:type n_samples: integer
:return: slice of the channel requested
:rtype: ChannelTS
"""
if n_samples is None and end is None:
msg = "Must input either end_time or n_samples."
self.logger.error(msg)
raise ValueError(msg)
if n_samples is not None and end is not None:
msg = "Must input either end_time or n_samples, not both."
self.logger.error(msg)
raise ValueError(msg)
if not isinstance(start, MTime):
start = MTime(time_stamp=start)
if n_samples is not None:
n_samples = int(n_samples)
end = start + ((n_samples - 1) / self.sample_rate)
if end is not None:
if not isinstance(end, MTime):
end = MTime(time_stamp=end)
chunk = self.data_array.indexes["time"].slice_indexer(
start=np.datetime64(start.iso_no_tz),
end=np.datetime64(end.iso_no_tz),
)
new_ts = self.data_array.isel(indexers={"time": chunk})
new_ch_ts = ChannelTS(
channel_type=self.channel_type,
data=new_ts,
survey_metadata=self.survey_metadata,
channel_response=self.channel_response,
)
return new_ch_ts
# decimate data
[docs]
def decimate(self, new_sample_rate, inplace=False, max_decimation=8):
"""
decimate the data by using scipy.signal.decimate
:param dec_factor: decimation factor
:type dec_factor: int
* refills ts.data with decimated data and replaces sample_rate
"""
sr_list = get_decimation_sample_rates(
self.sample_rate, new_sample_rate, max_decimation
)
# need to fill nans with 0 otherwise they wipeout the decimation values
# and all becomes nan.
new_ts = self.data_array.fillna(0)
for step_sr in sr_list:
new_ts = new_ts.sps_filters.decimate(step_sr)
new_ts.attrs["sample_rate"] = new_sample_rate
self.channel_metadata.sample_rate = new_ts.attrs["sample_rate"]
if inplace:
self.ts = new_ts
else:
new_ts.attrs.update(
self.channel_metadata.to_dict()[self.channel_metadata._class_name]
)
# return new_ts
return ChannelTS(
self.channel_metadata.type,
data=new_ts,
metadata=self.channel_metadata,
)
[docs]
def resample_poly(self, new_sample_rate, pad_type="mean", inplace=False):
"""
Use scipy.signal.resample_poly to resample data while using an FIR
filter to remove aliasing.
:param new_sample_rate: DESCRIPTION
:type new_sample_rate: TYPE
:param pad_type: DESCRIPTION, defaults to "mean"
:type pad_type: TYPE, optional
:return: DESCRIPTION
:rtype: TYPE
"""
# need to fill nans with 0 otherwise they wipeout the decimation values
# and all becomes nan.
new_ts = self.data_array.fillna(0)
new_ts = new_ts.sps_filters.resample_poly(new_sample_rate, pad_type=pad_type)
new_ts.attrs["sample_rate"] = new_sample_rate
self.channel_metadata.sample_rate = new_ts.attrs["sample_rate"]
if inplace:
self.ts = new_ts
else:
new_ts.attrs.update(
self.channel_metadata.to_dict()[self.channel_metadata._class_name]
)
# return new_ts
return ChannelTS(
self.channel_metadata.type,
data=new_ts,
metadata=self.channel_metadata,
)
[docs]
def merge(
self,
other,
gap_method="slinear",
new_sample_rate=None,
resample_method="poly",
):
"""
merg two channels or list of channels together in the following steps
1. xr.combine_by_coords([original, other])
2. compute monotonic time index
3. reindex(new_time_index, method=gap_method)
If you want a different method or more control use merge
:param other: Another channel
:type other: :class:`mth5.timeseries.ChannelTS`
:raises TypeError: If input is not a ChannelTS
:raises ValueError: if the components are different
:return: Combined channel with monotonic time index and same metadata
:rtype: :class:`mth5.timeseries.ChannelTS`
"""
if new_sample_rate is not None:
merge_sample_rate = new_sample_rate
if resample_method == "decimate":
combine_list = [self.decimate(new_sample_rate).data_array]
elif resample_method == "poly":
combine_list = [self.resample_poly(new_sample_rate).data_array]
else:
merge_sample_rate = self.sample_rate
combine_list = [self.data_array]
if isinstance(other, (list, tuple)):
for ch in other:
if not isinstance(ch, ChannelTS):
raise TypeError(f"Cannot combine {type(ch)} with ChannelTS.")
if self.component != ch.component:
raise ValueError(
"Cannot combine channels with different components. "
f"{self.component} != {ch.component}"
)
if new_sample_rate is not None:
if resample_method == "decimate":
ch = ch.decimate(new_sample_rate)
elif resample_method == "poly":
ch = ch.resample_poly(new_sample_rate)
combine_list.append(ch.data_array)
else:
if not isinstance(other, ChannelTS):
raise TypeError(f"Cannot combine {type(other)} with ChannelTS.")
if self.component != other.component:
raise ValueError(
"Cannot combine channels with different components. "
f"{self.component} != {other.component}"
)
if new_sample_rate is not None:
if resample_method == "decimate":
other = other.decimate(new_sample_rate)
elif resample_method == "poly":
other = other.resample_poly(new_sample_rate)
combine_list.append(other.data_array)
# combine into a data set use override to keep attrs from original
combined_ds = xr.combine_by_coords(combine_list, combine_attrs="override")
n_samples = (
merge_sample_rate
* float(combined_ds.time.max().values - combined_ds.time.min().values)
/ 1e9
) + 1
new_dt_index = make_dt_coordinates(
combined_ds.time.min().values, merge_sample_rate, n_samples
)
channel_metadata = self.channel_metadata.copy()
channel_metadata.sample_rate = merge_sample_rate
run_metadata = self.run_metadata.copy()
run_metadata.sample_rate = merge_sample_rate
new_channel = ChannelTS(
channel_type=self.channel_metadata.type,
channel_metadata=channel_metadata,
run_metadata=self.run_metadata,
station_metadata=self.station_metadata,
survey_metadata=self.survey_metadata,
channel_response=self.channel_response,
)
new_channel.data_array = combined_ds.interp(
time=new_dt_index, method=gap_method
).to_array()
new_channel.channel_metadata.time_period.start = new_channel.start
new_channel.channel_metadata.time_period.end = new_channel.end
new_channel.run_metadata.update_time_period()
new_channel.station_metadata.update_time_period()
new_channel.survey_metadata.update_time_period()
new_channel._update_xarray_metadata()
return new_channel
[docs]
def to_xarray(self):
"""
Returns a :class:`xarray.DataArray` object of the channel timeseries
this way metadata from the metadata class is updated upon return.
:return: Returns a :class:`xarray.DataArray` object of the channel timeseries
this way metadata from the metadata class is updated upon return.
:rtype: :class:`xarray.DataArray`
>>> import numpy as np
>>> from mth5.timeseries import ChannelTS
>>> ex = ChannelTS("electric")
>>> ex.start = "2020-01-01T12:00:00"
>>> ex.sample_rate = 16
>>> ex.ts = np.random.rand(4096)
"""
self._update_xarray_metadata()
return self.data_array
[docs]
def to_obspy_trace(self, network_code=None, encoding=None):
"""
Convert the time series to an :class:`obspy.core.trace.Trace` object. This
will be helpful for converting between data pulled from IRIS and data going
into IRIS.
:param network_code: two letter code provided by FDSN DMC
:type network_code: string
:return: DESCRIPTION
:rtype: TYPE
"""
encoding_dict = {
"INT16": np.int16,
"INT32": np.int32,
"INT64": np.int32,
"FLOAT32": np.float32,
"FLOAT64": np.float64,
}
if self.ts.dtype.type in [np.int64]:
obspy_trace = Trace(self.ts.astype(np.int32))
if encoding:
try:
obspy_trace = Trace(self.ts.astype(encoding_dict[encoding]))
except KeyError:
raise KeyError(
f"{encoding} is not understood. Acceptable values are {list(encoding_dict.keys())}"
)
else:
obspy_trace = Trace(self.ts)
# add metadata
obspy_trace.stats.channel = fdsn_tools.make_channel_code(self.channel_metadata)
obspy_trace.stats.starttime = self.start.isoformat()
obspy_trace.stats.sampling_rate = self.sample_rate
if self.station_metadata.fdsn.id is None:
self.station_metadata.fdsn.id = self.station_metadata.id
obspy_trace.stats.station = self.station_metadata.fdsn.id.upper()
obspy_trace.stats.network = network_code
return obspy_trace
[docs]
def from_obspy_trace(self, obspy_trace):
"""
Fill data from an :class:`obspy.core.Trace`
:param obspy.core.trace obspy_trace: Obspy trace object
"""
if not isinstance(obspy_trace, Trace):
msg = f"Input must be obspy.core.Trace, not {type(obspy_trace)}"
self.logger.error(msg)
raise TypeError(msg)
if obspy_trace.stats.channel[1].lower() in ["e", "q"]:
self.channel_type = "electric"
measurement = "electric"
elif obspy_trace.stats.channel[1].lower() in ["h", "b", "f"]:
self.channel_type = "magnetic"
measurement = "magnetic"
else:
try:
measurement = fdsn_tools.measurement_code_dict_reverse[
obspy_trace.stats.channel[1]
]
except KeyError:
measurement = "auxiliary"
self.channel_type = "auxiliary"
mt_code = fdsn_tools.make_mt_channel(
fdsn_tools.read_channel_code(obspy_trace.stats.channel)
)
self.channel_metadata.component = mt_code
self.channel_metadata.type = measurement
self.sample_rate = obspy_trace.stats.sampling_rate
self.start = obspy_trace.stats.starttime.isoformat()
self.station_metadata.fdsn.id = obspy_trace.stats.station
# Handle None network values
if (
obspy_trace.stats.network is not None
and obspy_trace.stats.network != "None"
):
self.station_metadata.fdsn.network = obspy_trace.stats.network
self.station_metadata.id = obspy_trace.stats.station
self.channel_metadata.units = "counts"
self.ts = obspy_trace.data
self.run_metadata.id = f"sr{int(self.sample_rate)}_001"
[docs]
def plot(self):
"""
Simple plot of the data
:return: figure object
:rtype: matplotlib.figure
"""
return self.data_array.plot()
[docs]
def welch_spectra(self, window_length=2**12, **kwargs):
"""
get welch spectra
:param window_length: DESCRIPTION
:type window_length: TYPE
:param **kwargs: DESCRIPTION
:type **kwargs: TYPE
:return: DESCRIPTION
:rtype: TYPE
"""
plot_frequency, power = signal.welch(
self.ts, fs=self.sample_rate, nperseg=window_length, **kwargs
)
return plot_frequency, power
[docs]
def plot_spectra(self, spectra_type="welch", window_length=2**12, **kwargs):
"""
:param spectra_type: spectra type, defaults to "welch"
:type spectra_type: string, optional
:param window_length: window length of the welch method should be a
power of 2, defaults to 2 ** 12
:type window_length: int, optional
:param **kwargs: DESCRIPTION
:type **kwargs: TYPE
"""
from matplotlib import pyplot as plt
if spectra_type == "welch":
plot_frequency, power = self.welch_spectra(
window_length=window_length, **kwargs
)
fig = plt.figure()
ax = fig.add_subplot(1, 1, 1)
ax.loglog(1.0 / plot_frequency, power, lw=1.5)
ax.set_xlabel("Period (s)", fontdict={"size": 10, "weight": "bold"})
ax.set_ylabel("Power (dB)", fontdict={"size": 10, "weight": "bold"})
ax.axis("tight")
ax.grid(which="both")
ax2 = ax.twiny()
ax2.loglog(plot_frequency, power, lw=0)
ax2.set_xlabel("Frequency (Hz)", fontdict={"size": 10, "weight": "bold"})
ax2.set_xlim([1 / cc for cc in ax.get_xlim()])
plt.show()