# -*- coding: utf-8 -*-
"""
Created on Fri Dec 13 12:40:34 2024
@author: jpeacock
"""
from __future__ import annotations
# =============================================================================
# Imports
# =============================================================================
from typing import Optional
import h5py
import numpy as np
import pandas as pd
import xarray as xr
from mt_metadata.features import FeatureDecimationChannel
from mt_metadata.processing.fourier_coefficients.decimation import Decimation
from mth5.groups import BaseGroup, FeatureChannelDataset, RunGroup
from mth5.helpers import validate_name
from mth5.utils.exceptions import MTH5Error
# =============================================================================
"""feature -> FeatureMasterGroup -> FeatureGroup -> DecimationLevelGroup -> ChannelGroup -> FeatureChannelDataset"""
[docs]
# Domain aliases that select a time series feature run (FeatureTSRunGroup).
TIME_DOMAIN = ["ts", "time", "time series", "time_series"]
[docs]
# Domain aliases that select a Fourier Coefficient feature run (FeatureFCRunGroup).
FREQUENCY_DOMAIN = ["fc", "frequency", "fourier", "fourier_domain"]
[docs]
class MasterFeaturesGroup(BaseGroup):
    """
    Top-level container that holds one sub-group per feature.

    Features may be derived from Fourier Coefficients or from time series;
    each one lives in its own :class:`FeatureGroup` below this group.

    Hierarchy
    ---------
    MasterFeaturesGroup -> FeatureGroup -> feature run group ->
        - FC: FeatureDecimationGroup -> FeatureChannelDataset
        - Time series: channel datasets

    Parameters
    ----------
    group : h5py.Group
        HDF5 group backing this container.
    **kwargs
        Extra keyword arguments handed to ``BaseGroup``.

    Examples
    --------
    >>> import h5py
    >>> from mth5.groups.features import MasterFeaturesGroup
    >>> with h5py.File('data.h5', 'r') as f:
    ...     master = MasterFeaturesGroup(f['features'])
    ...     feature_list = master.groups_list
    """

    def __init__(self, group: h5py.Group, **kwargs) -> None:
        super().__init__(group, **kwargs)

    def add_feature_group(
        self,
        feature_name: str,
        feature_metadata: Optional[FeatureDecimationChannel] = None,
    ) -> FeatureGroup:
        """
        Create a feature group below this container.

        Parameters
        ----------
        feature_name : str
            Name of the new feature group.
        feature_metadata : FeatureDecimationChannel, optional
            Metadata describing the feature; passed on as the group
            metadata. Default is None.

        Returns
        -------
        FeatureGroup
            The feature group returned by ``_add_group``.

        Examples
        --------
        >>> master = MasterFeaturesGroup(h5_group)
        >>> feature = master.add_feature_group('coherency')
        """
        new_feature = self._add_group(
            feature_name,
            FeatureGroup,
            group_metadata=feature_metadata,
            match="name",
        )
        return new_feature

    def get_feature_group(self, feature_name: str) -> FeatureGroup:
        """
        Look up an existing feature group.

        Parameters
        ----------
        feature_name : str
            Name of the feature group to fetch.

        Returns
        -------
        FeatureGroup
            The matching feature group.

        Raises
        ------
        MTH5Error
            If the feature group does not exist.

        Examples
        --------
        >>> master = MasterFeaturesGroup(h5_group)
        >>> feature = master.get_feature_group('coherency')
        """
        found_feature = self._get_group(feature_name, FeatureGroup)
        return found_feature

    def remove_feature_group(self, feature_name: str) -> None:
        """
        Delete a feature group from this container.

        Deleting only drops the reference inside the HDF5 file; the file
        does not shrink. Copy the data worth keeping into a new file if a
        smaller file is the goal.

        Parameters
        ----------
        feature_name : str
            Name of the feature group to delete.

        Raises
        ------
        MTH5Error
            If the feature group does not exist.

        Examples
        --------
        >>> master = MasterFeaturesGroup(h5_group)
        >>> master.remove_feature_group('coherency')
        """
        self._remove_group(feature_name)
[docs]
class FeatureGroup(BaseGroup):
    """
    Container for one feature and all of its runs.

    A feature can hold Fourier Coefficient runs and time series runs side
    by side; the run type is chosen through a domain alias.

    Hierarchy
    ---------
    FeatureGroup -> feature run group ->
        - FC: FeatureFCRunGroup -> FeatureDecimationGroup -> FeatureChannelDataset
        - TS: FeatureTSRunGroup -> channel datasets

    Parameters
    ----------
    group : h5py.Group
        HDF5 group backing this feature.
    feature_metadata : optional
        Metadata for the feature; handed to ``BaseGroup`` as the group
        metadata. It should describe the feature and the parameters used
        to compute it.
    **kwargs
        Extra keyword arguments handed to ``BaseGroup``.

    Examples
    --------
    >>> feature = FeatureGroup(h5_group, feature_metadata=metadata)
    >>> run_group = feature.add_feature_run_group('run_1', domain='fc')
    """

    def __init__(
        self,
        group: h5py.Group,
        feature_metadata: Optional[object] = None,
        **kwargs,
    ) -> None:
        super().__init__(group, group_metadata=feature_metadata, **kwargs)

    @staticmethod
    def _run_group_class(domain: str) -> type:
        """Translate a domain alias into the run group class that stores it."""
        if domain in FREQUENCY_DOMAIN:
            return FeatureFCRunGroup
        if domain in TIME_DOMAIN:
            return FeatureTSRunGroup
        raise ValueError(
            f"feature_type {domain} not supported. Use either 'fc' "
            "for Fourier Coefficent or 'ts' for time series."
        )

    def add_feature_run_group(
        self,
        feature_name: str,
        feature_run_metadata: Optional[object] = None,
        domain: str = "fc",
    ) -> object:
        """
        Create a run group for this feature.

        The run group is a ``FeatureFCRunGroup`` or a ``FeatureTSRunGroup``
        depending on the domain. When metadata is supplied, its ``domain``
        attribute overrides the ``domain`` argument.

        Parameters
        ----------
        feature_name : str
            Name of the new feature run group.
        feature_run_metadata : optional
            Metadata for the run. Must expose a ``domain`` attribute when
            given. Default is None.
        domain : str, default='fc'
            One of the aliases in ``FREQUENCY_DOMAIN``
            ('fc', 'frequency', 'fourier', 'fourier_domain') or in
            ``TIME_DOMAIN`` ('ts', 'time', 'time series', 'time_series').

        Returns
        -------
        FeatureFCRunGroup or FeatureTSRunGroup
            The run group returned by ``_add_group``.

        Raises
        ------
        AttributeError
            If metadata is given but has no ``domain`` attribute.
        ValueError
            If the resolved domain is not a known alias.

        Examples
        --------
        >>> feature = FeatureGroup(h5_group)
        >>> fc_run = feature.add_feature_run_group('processing_run_1', domain='fc')
        >>> ts_run = feature.add_feature_run_group('ts_analysis', domain='ts')
        """
        if feature_run_metadata is not None:
            try:
                domain = feature_run_metadata.domain
            except AttributeError:
                raise AttributeError(
                    "Could not find attribute 'domain' in metadata object"
                )

        run_class = self._run_group_class(domain)
        return self._add_group(
            feature_name,
            run_class,
            group_metadata=feature_run_metadata,
            match="id",
        )

    def get_feature_run_group(
        self,
        feature_name: str,
        domain: str = "frequency",
    ) -> object:
        """
        Fetch an existing run group of the requested domain.

        Parameters
        ----------
        feature_name : str
            Name of the feature run group to fetch.
        domain : str, default='frequency'
            One of the aliases in ``FREQUENCY_DOMAIN`` or ``TIME_DOMAIN``;
            it decides which run group class wraps the result.

        Returns
        -------
        FeatureFCRunGroup or FeatureTSRunGroup
            The matching run group.

        Raises
        ------
        ValueError
            If the domain is not a known alias.
        MTH5Error
            If the feature run group does not exist.

        Examples
        --------
        >>> feature = FeatureGroup(h5_group)
        >>> fc_run = feature.get_feature_run_group('processing_run_1', domain='fc')
        """
        run_class = self._run_group_class(domain)
        return self._get_group(feature_name, run_class)

    def remove_feature_run_group(self, feature_name: str) -> None:
        """
        Delete a run group and everything stored beneath it.

        Deleting drops the reference only; the HDF5 file does not shrink.

        Parameters
        ----------
        feature_name : str
            Name of the feature run group to delete.

        Raises
        ------
        MTH5Error
            If the feature run group does not exist.

        Examples
        --------
        >>> feature = FeatureGroup(h5_group)
        >>> feature.remove_feature_run_group('processing_run_1')
        """
        self._remove_group(feature_name)
[docs]
class FeatureTSRunGroup(BaseGroup):
    """
    Container for the time series features of one run.

    Channel handling is delegated to a ``RunGroup`` wrapped around the same
    HDF5 group, so the feature hierarchy can reuse the regular time series
    channel machinery.

    Parameters
    ----------
    group : h5py.Group
        HDF5 group backing this run.
    feature_run_metadata : optional
        Metadata for the feature run (same type as timeseries.Run); handed
        to ``BaseGroup`` as the group metadata.
    **kwargs
        Extra keyword arguments handed to ``BaseGroup``.

    Notes
    -----
    A separate ``RunGroup`` instance is created for every
    ``FeatureTSRunGroup``, which may be slow when many are instantiated.

    Examples
    --------
    >>> ts_run = FeatureTSRunGroup(h5_group, feature_run_metadata=metadata)
    >>> channel = ts_run.add_feature_channel('Ex', 'electric', data)
    """

    def __init__(
        self,
        group: h5py.Group,
        feature_run_metadata: Optional[object] = None,
        **kwargs,
    ) -> None:
        super().__init__(group, group_metadata=feature_run_metadata, **kwargs)

        # The helper RunGroup shares the HDF5 group but is built without the
        # feature run metadata.
        self._run_group = RunGroup(group, feature_run_metadata=None)

    def add_feature_channel(
        self,
        channel_name: str,
        channel_type: str,
        data: Optional[np.ndarray] = None,
        channel_dtype: str = "int32",
        shape: Optional[tuple] = None,
        max_shape: tuple = (None,),
        chunks: bool = True,
        channel_metadata: Optional[object] = None,
        **kwargs,
    ) -> object:
        """
        Create a time series channel through the wrapped ``RunGroup``.

        Parameters
        ----------
        channel_name : str
            Name of the channel.
        channel_type : str
            Kind of channel (e.g. 'electric', 'magnetic').
        data : np.ndarray, optional
            Initial samples. Default is None.
        channel_dtype : str, default='int32'
            Data type of the channel.
        shape : tuple, optional
            Shape of the channel data. Default is None.
        max_shape : tuple, default=(None,)
            Maximum shape for expandable dimensions.
        chunks : bool, default=True
            Whether the dataset is chunked.
        channel_metadata : optional
            Metadata object (timeseries.Channel type). Default is None.
        **kwargs
            Extra keyword arguments passed on to ``RunGroup.add_channel``.

        Returns
        -------
        object
            Whatever ``RunGroup.add_channel`` returns.

        Examples
        --------
        >>> ts_run = FeatureTSRunGroup(h5_group)
        >>> channel = ts_run.add_feature_channel(
        ...     'Ex', 'electric', data=np.arange(1000))
        """
        channel_options = {
            "channel_dtype": channel_dtype,
            "shape": shape,
            "max_shape": max_shape,
            "chunks": chunks,
            "channel_metadata": channel_metadata,
        }
        return self._run_group.add_channel(
            channel_name,
            channel_type,
            data,
            **channel_options,
            **kwargs,
        )

    def get_feature_channel(self, channel_name: str) -> object:
        """
        Fetch a channel through the wrapped ``RunGroup``.

        Parameters
        ----------
        channel_name : str
            Name of the channel to fetch.

        Returns
        -------
        object
            Whatever ``RunGroup.get_channel`` returns.

        Raises
        ------
        MTH5Error
            If the channel does not exist.

        Examples
        --------
        >>> ts_run = FeatureTSRunGroup(h5_group)
        >>> channel = ts_run.get_feature_channel('Ex')
        """
        channel = self._run_group.get_channel(channel_name)
        return channel

    def remove_feature_channel(self, channel_name: str) -> None:
        """
        Delete a channel through the wrapped ``RunGroup``.

        Parameters
        ----------
        channel_name : str
            Name of the channel to delete.

        Raises
        ------
        MTH5Error
            If the channel does not exist.

        Examples
        --------
        >>> ts_run = FeatureTSRunGroup(h5_group)
        >>> ts_run.remove_feature_channel('Ex')
        """
        self._run_group.remove_channel(channel_name)
[docs]
class FeatureFCRunGroup(BaseGroup):
    """
    Container for the Fourier Coefficient features of one processing run.

    The run is organised by decimation level; every level is a
    ``FeatureDecimationGroup`` holding the channel datasets.

    Hierarchy
    ---------
    FeatureFCRunGroup -> FeatureDecimationGroup -> FeatureChannelDataset

    Attributes
    ----------
    metadata : Decimation
        Metadata including:
            - list of decimation levels
            - start time (earliest)
            - end time (latest)
            - method (fft, wavelet, ...)
            - list of channels used
            - starting sample rate
            - bands used
            - type (TS or FC)

    Parameters
    ----------
    group : h5py.Group
        HDF5 group backing this run.
    feature_run_metadata : Decimation, optional
        Metadata for the feature run; handed to ``BaseGroup`` as the group
        metadata. Default is None.
    **kwargs
        Extra keyword arguments handed to ``BaseGroup``.

    Examples
    --------
    >>> fc_run = FeatureFCRunGroup(h5_group, feature_run_metadata=metadata)
    >>> decimation = fc_run.add_decimation_level('level_0', dec_metadata)
    """

    def __init__(
        self,
        group: h5py.Group,
        feature_run_metadata: Optional[Decimation] = None,
        **kwargs,
    ) -> None:
        super().__init__(group, group_metadata=feature_run_metadata, **kwargs)

    # Plain read-only property. Wrapping it in ``BaseGroup.metadata.getter``
    # as well would install a property object as the getter, so every access
    # would fail with "'property' object is not callable".
    @property
    def decimation_level_summary(self) -> pd.DataFrame:
        """
        Summarise the decimation levels stored in this run.

        Every child of the HDF5 group whose ``mth5_type`` attribute equals
        ``"FeatureDecimation"`` contributes one row. Children missing one of
        the required attributes are skipped and reported at debug level.

        Returns
        -------
        pd.DataFrame
            DataFrame with columns:
                - name : str
                    Value of the ``decimation_level`` attribute.
                - start : datetime64[ns]
                    ``time_period.start`` with any ``+`` offset suffix removed.
                - end : datetime64[ns]
                    ``time_period.end`` with any ``+`` offset suffix removed.
                - hdf5_reference : h5py.ref_dtype
                    HDF5 reference to the decimation level group.

        Examples
        --------
        >>> fc_run = FeatureFCRunGroup(h5_group)
        >>> summary = fc_run.decimation_level_summary
        >>> print(summary[['name', 'start', 'end']])
        """
        summary_dtype = np.dtype(
            [
                ("name", "U20"),
                ("start", "datetime64[ns]"),
                ("end", "datetime64[ns]"),
                ("hdf5_reference", h5py.ref_dtype),
            ]
        )

        rows = []
        for _name, child in self.hdf5_group.items():
            try:
                if child.attrs["mth5_type"] not in ["FeatureDecimation"]:
                    continue
                rows.append(
                    (
                        child.attrs["decimation_level"],
                        child.attrs["time_period.start"].split("+")[0],
                        child.attrs["time_period.end"].split("+")[0],
                        child.ref,
                    )
                )
            except KeyError as error:
                self.logger.debug(f"Could not find key: {error}")

        return pd.DataFrame(np.array(rows, dtype=summary_dtype))

    def add_decimation_level(
        self,
        decimation_level_name: str,
        feature_decimation_level_metadata: Optional[object] = None,
    ) -> FeatureDecimationGroup:
        """
        Create a decimation level group in this run.

        Parameters
        ----------
        decimation_level_name : str
            Name of the decimation level.
        feature_decimation_level_metadata : optional
            Metadata for the decimation level; passed on as the group
            metadata. Default is None.

        Returns
        -------
        FeatureDecimationGroup
            The decimation level group returned by ``_add_group``.

        Examples
        --------
        >>> fc_run = FeatureFCRunGroup(h5_group)
        >>> decimation = fc_run.add_decimation_level('level_0', dec_metadata)
        """
        return self._add_group(
            decimation_level_name,
            FeatureDecimationGroup,
            group_metadata=feature_decimation_level_metadata,
            match="id",
        )

    def get_decimation_level(
        self, decimation_level_name: str
    ) -> FeatureDecimationGroup:
        """
        Fetch an existing decimation level group.

        Parameters
        ----------
        decimation_level_name : str
            Name of the decimation level to fetch.

        Returns
        -------
        FeatureDecimationGroup
            The matching decimation level group.

        Raises
        ------
        MTH5Error
            If the decimation level does not exist.

        Examples
        --------
        >>> fc_run = FeatureFCRunGroup(h5_group)
        >>> decimation = fc_run.get_decimation_level('level_0')
        """
        return self._get_group(decimation_level_name, FeatureDecimationGroup)

    def remove_decimation_level(self, decimation_level_name: str) -> None:
        """
        Delete a decimation level from this run.

        Parameters
        ----------
        decimation_level_name : str
            Name of the decimation level to delete.

        Raises
        ------
        MTH5Error
            If the decimation level does not exist.

        Examples
        --------
        >>> fc_run = FeatureFCRunGroup(h5_group)
        >>> fc_run.remove_decimation_level('level_0')
        """
        self._remove_group(decimation_level_name)
# def supports_aurora_processing_config(
# self, processing_config, remote
# ) -> bool:
# """
# An "all-or-nothing" check: Return True if every (valid) decimation needed to satisfy the processing_config
# is available in the FCGroup (self) otherwise return False (and we will build all FCs).
# Logic:
# 1. Get a list of all fc groups in the FCGroup (self)
# 2. Loop the processing_config decimations, checking if there is a corresponding, already built FCDecimation
# in the FCGroup.
# Parameters
# ----------
# processing_config: aurora.config.metadata.processing.Processing
# remote: bool
# Returns
# -------
# """
# pre_existing_fc_decimation_ids_to_check = self.groups_list
# levels_present = np.full(processing_config.num_decimation_levels, False)
# for i, dec_level in enumerate(processing_config.decimations):
# # Quit checking if dec_level wasn't there
# if i > 0:
# if not levels_present[i - 1]:
# return False
# # iterate over existing decimations
# for fc_decimation_id in pre_existing_fc_decimation_ids_to_check:
# fc_dec_group = self.get_decimation_level(fc_decimation_id)
# fc_decimation = fc_dec_group.metadata
# levels_present[i] = fc_decimation.has_fcs_for_aurora_processing(
# dec_level, remote
# )
# if levels_present[i]:
# pre_existing_fc_decimation_ids_to_check.remove(
# fc_decimation_id
# ) # no need to check this one again
# break # break inner for-loop over decimations
# return levels_present.all()
[docs]
class FeatureDecimationGroup(BaseGroup):
"""
Container for a single decimation level with multiple Fourier Coefficient channels.
This class manages Fourier Coefficient data organized by frequency, time, and channel.
Data is assumed to be uniformly sampled in both frequency and time domains.
Hierarchy
---------
FeatureDecimationGroup -> FeatureChannelDataset (multiple channels)
Data Assumptions
----------------
1. Data are uniformly sampled in frequency domain
2. Data are uniformly sampled in time domain
3. FFT moving window has uniform step size
Attributes
----------
start time : datetime
Start time of the decimation level
end time : datetime
End time of the decimation level
channels : list
List of channel names in this decimation level
decimation_factor : int
Factor by which data was decimated
decimation_level : int
Level index in decimation hierarchy
decimation_sample_rate : float
Sample rate after decimation (Hz)
method : str
Method used (FFT, wavelet, etc.)
anti_alias_filter : optional
Anti-aliasing filter used
prewhitening_type : optional
Type of prewhitening applied
harmonics_kept : list or 'all'
Harmonic indices kept in the data
window : dict
Window parameters (length, overlap, type, sample rate)
bands : list
Frequency bands in the data
Parameters
----------
group : h5py.Group
HDF5 group object for this FeatureDecimationGroup.
decimation_level_metadata : optional
Metadata for the decimation level. Default is None.
**kwargs
Additional keyword arguments passed to BaseGroup.
Examples
--------
>>> decimation = FeatureDecimationGroup(h5_group, metadata)
>>> channel = decimation.add_channel('Ex', fc_data=fc_array, fc_metadata=ch_metadata)
"""
def __init__(
    self,
    group: h5py.Group,
    decimation_level_metadata: Optional[object] = None,
    **kwargs,
) -> None:
    """
    Wrap ``group`` as a decimation level.

    ``decimation_level_metadata`` is handed to ``BaseGroup`` as the group
    metadata; any other keyword arguments are passed through unchanged.
    """
    super().__init__(
        group,
        group_metadata=decimation_level_metadata,
        **kwargs,
    )
# Plain read-only property. Wrapping it in ``BaseGroup.metadata.getter`` as
# well would install a property object as the getter, so every access would
# fail with "'property' object is not callable".
@property
def channel_summary(self) -> pd.DataFrame:
    """
    Summarise the channels stored in this decimation level.

    Every child of the HDF5 group whose ``mth5_type`` attribute equals
    ``"FCChannel"`` contributes one row. Children missing one of the
    required attributes are skipped and reported at debug level.

    Returns
    -------
    pd.DataFrame
        DataFrame with columns:
            - name : str
                Value of the ``name`` attribute.
            - start : datetime64[ns]
                ``time_period.start`` with any ``+`` offset suffix removed.
            - end : datetime64[ns]
                ``time_period.end`` with any ``+`` offset suffix removed.
            - n_frequency : int64
                Size of the dataset's first axis (``shape[0]``).
            - n_windows : int64
                Size of the dataset's second axis (``shape[1]``).
            - sample_rate_decimation_level : float64
                Value of the attribute of the same name.
            - sample_rate_window_step : float64
                Value of the attribute of the same name.
            - units : str
                Value of the ``units`` attribute.
            - hdf5_reference : h5py.ref_dtype
                HDF5 reference to the channel dataset.

    Examples
    --------
    >>> decimation = FeatureDecimationGroup(h5_group)
    >>> summary = decimation.channel_summary
    >>> print(summary[['name', 'n_frequency', 'n_windows']])
    """
    summary_dtype = np.dtype(
        [
            ("name", "U20"),
            ("start", "datetime64[ns]"),
            ("end", "datetime64[ns]"),
            ("n_frequency", np.int64),
            ("n_windows", np.int64),
            ("sample_rate_decimation_level", np.float64),
            ("sample_rate_window_step", np.float64),
            ("units", "U25"),
            ("hdf5_reference", h5py.ref_dtype),
        ]
    )

    rows = []
    for _name, dataset in self.hdf5_group.items():
        try:
            # NOTE(review): only children tagged "FCChannel" are listed;
            # confirm that FeatureChannelDataset writes this mth5_type.
            if dataset.attrs["mth5_type"] not in ["FCChannel"]:
                continue
            rows.append(
                (
                    dataset.attrs["name"],
                    dataset.attrs["time_period.start"].split("+")[0],
                    dataset.attrs["time_period.end"].split("+")[0],
                    dataset.shape[0],
                    dataset.shape[1],
                    dataset.attrs["sample_rate_decimation_level"],
                    dataset.attrs["sample_rate_window_step"],
                    dataset.attrs["units"],
                    dataset.ref,
                )
            )
        except KeyError as error:
            self.logger.debug(f"Cannot find a key: {error}")

    return pd.DataFrame(np.array(rows, dtype=summary_dtype))
[docs]
def from_dataframe(
    self,
    df: pd.DataFrame,
    channel_key: str,
    time_key: str = "time",
    frequency_key: str = "frequency",
) -> None:
    """
    Load Fourier Coefficient data from a pandas DataFrame.

    Every column of ``df`` is cast to ``complex128``, converted with
    ``Series.to_xarray`` and stored as a channel named after the column.
    The cast is done on a copy, so the caller's DataFrame is not modified.

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame; each column is treated as one channel.
    channel_key : str
        Name of the column containing coefficient values.
        NOTE: currently not used, all columns are loaded.
    time_key : str, default='time'
        Name of the time coordinate. NOTE: currently not used.
    frequency_key : str, default='frequency'
        Name of the frequency coordinate. NOTE: currently not used.

    Raises
    ------
    TypeError
        If df is not a pandas DataFrame.

    Examples
    --------
    >>> decimation = FeatureDecimationGroup(h5_group)
    >>> decimation.from_dataframe(df, channel_key='Ex', time_key='time')
    """
    if not isinstance(df, pd.DataFrame):
        msg = f"Must input a pandas dataframe not {type(df)}"
        self.logger.error(msg)
        raise TypeError(msg)

    for col in df.columns:
        # astype returns a new Series; assigning the cast result back into
        # ``df`` would silently rewrite the caller's data.
        coefficients = df[col].astype(np.complex128)
        self.add_channel(col, fc_data=coefficients.to_xarray().to_numpy())
[docs]
def from_xarray(
    self,
    data_array: xr.DataArray | xr.Dataset,
    sample_rate_decimation_level: float,
) -> None:
    """
    Load Fourier Coefficient data from an xarray DataArray or Dataset.

    Channel metadata (time period, frequency range, window step, units) is
    read from the xarray object. A DataArray becomes a single channel named
    after the array; for a Dataset every data variable becomes a channel,
    replacing a channel of the same name listed in ``channel_summary``.

    Parameters
    ----------
    data_array : xr.DataArray or xr.Dataset
        Input with 'time' and 'frequency' coordinates.
    sample_rate_decimation_level : float
        Sample rate of the decimation level (Hz).

    Raises
    ------
    TypeError
        If data_array is not an xarray Dataset or DataArray.
    IndexError
        If the 'time' coordinate has fewer than two entries, because the
        window step is taken from the first two time stamps.

    Notes
    -----
    For Dataset input, a variable is stored as is when its first axis has
    the length of the time coordinate and transposed when its second axis
    does. A variable matching neither is skipped with a warning. DataArray
    input is stored without any reordering.

    Examples
    --------
    >>> decimation = FeatureDecimationGroup(h5_group)
    >>> xr_data = xr.DataArray(
    ...     fc_values,
    ...     dims=['time', 'frequency'],
    ...     coords={'time': times, 'frequency': freqs},
    ...     name='Ex',
    ...     attrs={'units': 'mV/km'},
    ... )
    >>> decimation.from_xarray(xr_data, sample_rate_decimation_level=0.5)
    """
    if not isinstance(data_array, (xr.Dataset, xr.DataArray)):
        msg = f"Must input a xarray Dataset or DataArray not {type(data_array)}"
        self.logger.error(msg)
        raise TypeError(msg)

    ch_metadata = FeatureDecimationChannel()
    ch_metadata.time_period.start = data_array.time[0].values
    ch_metadata.time_period.end = data_array.time[-1].values
    ch_metadata.sample_rate_decimation_level = sample_rate_decimation_level
    ch_metadata.frequency_min = data_array.coords["frequency"].data.min()
    ch_metadata.frequency_max = data_array.coords["frequency"].data.max()
    # Spacing between the first two time stamps, expressed in seconds.
    step_size = (
        data_array.coords["time"].data[1] - data_array.coords["time"].data[0]
    )
    ch_metadata.sample_rate_window_step = step_size / np.timedelta64(1, "s")
    try:
        ch_metadata.units = data_array.units
    except AttributeError:
        self.logger.debug("Could not find 'units' in xarray")

    if isinstance(data_array, xr.DataArray):
        # Keep the stored metadata name in step with the dataset name, as
        # the Dataset branch below does for every variable.
        if data_array.name is not None:
            ch_metadata.name = data_array.name
        self.add_channel(
            data_array.name,
            fc_data=data_array.to_numpy(),
            fc_metadata=ch_metadata,
        )
        return

    for ch in data_array.data_vars.keys():
        variable = data_array[ch]
        ch_metadata.name = ch
        if ch in self.channel_summary.name.to_list():
            self.remove_channel(ch)

        # time index should be the first index
        if variable.time.size == variable.shape[0]:
            fc_data = variable.to_numpy()
        elif variable.time.size == variable.shape[1]:
            fc_data = variable.to_numpy().T
        else:
            self.logger.warning(
                f"Skipping {ch}: no axis of shape {variable.shape} matches "
                f"the time coordinate of size {variable.time.size}"
            )
            continue

        self.add_channel(
            ch,
            fc_data=fc_data,
            fc_metadata=ch_metadata,
            dtype=variable.dtype,
        )
[docs]
def to_xarray(self, channels: Optional[list] = None) -> xr.Dataset:
    """
    Bundle channels of this decimation level into one xarray Dataset.

    Each requested channel is fetched with ``get_channel`` and its
    ``to_xarray()`` result becomes a data variable keyed by channel name.

    Parameters
    ----------
    channels : list, optional
        Channel names to include. When None, every name in
        ``groups_list`` is used. Default is None.

    Returns
    -------
    xr.Dataset
        Dataset with one data variable per channel.

    Examples
    --------
    >>> decimation = FeatureDecimationGroup(h5_group)
    >>> xr_data = decimation.to_xarray()
    >>> subset = decimation.to_xarray(channels=['Ex', 'Ey'])
    """
    names = self.groups_list if channels is None else channels
    variables = {name: self.get_channel(name).to_xarray() for name in names}
    return xr.Dataset(variables)
[docs]
def from_numpy_array(
    self,
    nd_array: np.ndarray,
    ch_name: str | list,
) -> None:
    """
    Load Fourier Coefficient data from a numpy array.

    A 2D array is stored as one channel. A 3D array is split along its
    first axis and each slice is stored under the matching channel name.

    Parameters
    ----------
    nd_array : np.ndarray
        Array shaped (n_frequencies, n_windows) for a single channel or
        (n_channels, n_frequencies, n_windows) for several channels.
    ch_name : str or list
        Channel name (2D array) or list of channel names (3D array). A
        single string is accepted for a 3D array with exactly one channel.

    Raises
    ------
    TypeError
        If nd_array is not a numpy ndarray.
    ValueError
        If the array is not 2D or 3D, or if the number of channel names
        does not match the first axis of a 3D array.

    Examples
    --------
    >>> decimation = FeatureDecimationGroup(h5_group)
    >>> data_2d = np.random.randn(256, 100) + 1j * np.random.randn(256, 100)
    >>> decimation.from_numpy_array(data_2d, ch_name='Ex')
    >>> data_3d = np.random.randn(2, 256, 100) + 1j * np.random.randn(2, 256, 100)
    >>> decimation.from_numpy_array(data_3d, ch_name=['Ex', 'Ey'])
    """
    if not isinstance(nd_array, np.ndarray):
        msg = f"Must input a numpy ndarray not {type(nd_array)}"
        self.logger.error(msg)
        raise TypeError(msg)

    if nd_array.ndim == 3:
        # A bare string would otherwise be iterated character by character.
        names = [ch_name] if isinstance(ch_name, str) else list(ch_name)
        if len(names) != nd_array.shape[0]:
            raise ValueError(
                f"Got {len(names)} channel name(s) for "
                f"{nd_array.shape[0]} channel(s) in the input array"
            )
        # Iterating the array walks its first (channel) axis.
        for name, channel_data in zip(names, nd_array):
            self.add_channel(name, fc_data=channel_data)
    elif nd_array.ndim == 2:
        self.add_channel(ch_name, fc_data=nd_array)
    else:
        raise ValueError(
            "input array must be shaped (n_frequencies, n_windows) or "
            "(n_channels, n_frequencies, n_windows)"
        )
[docs]
def add_channel(
    self,
    fc_name: str,
    fc_data: Optional[np.ndarray | xr.DataArray | xr.Dataset | pd.DataFrame] = None,
    fc_metadata: Optional[FeatureDecimationChannel] = None,
    max_shape: tuple = (None, None),
    chunks: bool = True,
    dtype: type = complex,
    **kwargs,
) -> FeatureChannelDataset:
    """
    Add a Fourier Coefficient channel to the decimation level.

    Creates an HDF5 dataset named after the validated ``fc_name`` and wraps
    it in a ``FeatureChannelDataset``. If the dataset cannot be created
    (for example because it already exists), the existing channel of that
    name is returned instead.

    Parameters
    ----------
    fc_name : str
        Name for the channel; passed through ``validate_name``.
    fc_data : np.ndarray, xr.DataArray, xr.Dataset, pd.DataFrame, optional
        Input data. When None, a (1, 1) array of zeros is written and
        chunking is forced on. Default is None.
    fc_metadata : FeatureDecimationChannel, optional
        Metadata for the channel. When None, a new
        ``FeatureDecimationChannel`` named ``fc_name`` is used.
    max_shape : tuple, default=(None, None)
        Maximum shape for the HDF5 dataset dimensions.
    chunks : bool, default=True
        Whether to use HDF5 chunking.
    dtype : type, default=complex
        Data type for the dataset.
    **kwargs
        Accepted for interface compatibility; currently not forwarded to
        the dataset creation call.

    Returns
    -------
    FeatureChannelDataset
        The newly created channel, or the existing one when creation failed.

    Raises
    ------
    TypeError
        If fc_data is given but is not one of the supported types.
    MTH5Error
        If creation failed and no channel named ``fc_name`` exists
        (raised by ``get_channel``).

    Notes
    -----
    Data layout assumes (time, frequency) organization:
        - time index: window start times
        - frequency index: harmonic indices or float values
        - data: complex Fourier coefficients

    Examples
    --------
    >>> decimation = FeatureDecimationGroup(h5_group)
    >>> metadata = FeatureDecimationChannel(name='Ex')
    >>> fc_data = np.random.randn(100, 256) + 1j * np.random.randn(100, 256)
    >>> channel = decimation.add_channel('Ex', fc_data=fc_data, fc_metadata=metadata)
    """
    fc_name = validate_name(fc_name)

    if fc_metadata is None:
        fc_metadata = FeatureDecimationChannel(name=fc_name)

    if fc_data is not None:
        if not isinstance(
            fc_data, (np.ndarray, xr.DataArray, xr.Dataset, pd.DataFrame)
        ):
            msg = (
                "Need to input a numpy.array, xarray.DataArray, "
                f"xr.Dataset, pd.DataFrame not {type(fc_data)}"
            )
            # No exception is active here, so log a plain error rather than
            # an exception record with an empty traceback.
            self.logger.error(msg)
            raise TypeError(msg)
    else:
        chunks = True
        fc_data = np.zeros((1, 1), dtype=dtype)

    try:
        dataset = self.hdf5_group.create_dataset(
            fc_name,
            data=fc_data,
            dtype=dtype,
            chunks=chunks,
            maxshape=max_shape,
            **self.dataset_options,
        )
        fc_dataset = FeatureChannelDataset(dataset, dataset_metadata=fc_metadata)
    except (OSError, RuntimeError, ValueError) as error:
        self.logger.error(error)
        msg = f"estimate {fc_name} already exists, returning existing group."
        self.logger.debug(msg)
        # Look up the name the dataset was created under; the name held by
        # the metadata may be unset or different.
        fc_dataset = self.get_channel(fc_name)

    return fc_dataset
[docs]
def get_channel(self, fc_name: str) -> FeatureChannelDataset:
    """
    Fetch a Fourier Coefficient channel by name.

    The name is passed through ``validate_name``, the dataset is read from
    the HDF5 group and its attributes are used to rebuild the channel
    metadata.

    Parameters
    ----------
    fc_name : str
        Name of the channel to fetch.

    Returns
    -------
    FeatureChannelDataset
        The channel wrapped together with its metadata.

    Raises
    ------
    MTH5Error
        If the lookup raises ``KeyError`` (channel missing) or ``OSError``.

    Examples
    --------
    >>> decimation = FeatureDecimationGroup(h5_group)
    >>> channel = decimation.get_channel('Ex')
    >>> data = channel.to_numpy()
    """
    fc_name = validate_name(fc_name)

    try:
        h5_dataset = self.hdf5_group[fc_name]
        stored_attrs = dict(h5_dataset.attrs)
        channel_metadata = FeatureDecimationChannel(**stored_attrs)
        return FeatureChannelDataset(
            h5_dataset,
            dataset_metadata=channel_metadata,
        )
    except KeyError:
        msg = f"{fc_name} does not exist, check groups_list for existing names"
        self.logger.error(msg)
        raise MTH5Error(msg)
    except OSError as error:
        self.logger.error(error)
        raise MTH5Error(error)
[docs]
def remove_channel(self, fc_name: str) -> None:
    """
    Remove a Fourier Coefficient channel from the decimation level.

    The channel is looked up under the validated name first, which is the
    form ``add_channel`` and ``get_channel`` use. If that is missing, the
    lower-cased validated name is tried as well, so existing callers that
    relied on lower-casing keep working.

    Parameters
    ----------
    fc_name : str
        Name of the channel to remove.

    Raises
    ------
    MTH5Error
        If the channel does not exist under either form of the name.

    Notes
    -----
    Deleting removes the reference but does not reduce the HDF5 file size.
    To reduce the file size, copy the desired data to a new file.

    Examples
    --------
    >>> decimation = FeatureDecimationGroup(h5_group)
    >>> decimation.remove_channel('Ex')
    """
    exact_name = validate_name(fc_name)
    legacy_name = validate_name(fc_name.lower())

    # dict.fromkeys drops the duplicate when both forms are identical.
    for candidate in dict.fromkeys((exact_name, legacy_name)):
        try:
            del self.hdf5_group[candidate]
        except KeyError:
            continue
        self.logger.info(
            "Deleting a estimate does not reduce the HDF5 "
            "file size it simply remove the reference. If "
            "file size reduction is your goal, simply copy"
            " what you want into another file."
        )
        return

    msg = f"{exact_name} does not exist, check groups_list for existing names"
    self.logger.error(msg)
    raise MTH5Error(msg)
[docs]
def add_weights(
    self,
    weight_name: str,
    weight_data: Optional[np.ndarray] = None,
    weight_metadata: Optional[object] = None,
    max_shape: tuple = (None, None, None),
    chunks: bool = True,
    **kwargs,
) -> None:
    """
    Add weight or masking data for Fourier Coefficients.

    Intended to store weights or masks for quality control, frequency band
    selection, or time window filtering.

    Parameters
    ----------
    weight_name : str
        Name for the weight dataset.
    weight_data : np.ndarray, optional
        Weight values. Default is None.
    weight_metadata : optional
        Metadata for the weight dataset. Default is None.
    max_shape : tuple, default=(None, None, None)
        Maximum shape for expandable dimensions.
    chunks : bool, default=True
        Whether to use HDF5 chunking.
    **kwargs
        Additional keyword arguments for HDF5 dataset creation.

    Returns
    -------
    None

    Notes
    -----
    Weight datasets can track:
        - weight_channel: Per-channel weights
        - weight_band: Per-frequency-band weights
        - weight_time: Per-time-window weights

    This method is a placeholder for future implementation.
    NOTE(review): only the signature and docstring are visible here, so no
    dataset appears to be created yet; confirm before relying on it.

    Examples
    --------
    >>> decimation = FeatureDecimationGroup(h5_group)
    >>> decimation.add_weights('coherency_weights', weight_data=weights)
    """