Source code for mth5.timeseries.scipy_filters

# -*- coding: utf-8 -*-
"""
Scipy filter wrappers for xarray.

This module provides xarray-compatible wrappers for scipy.signal filtering
functions, enabling efficient filtering operations on labeled N-dimensional
arrays with automatic dimension handling.

Notes
-----
- Adapted from xr-scipy: https://github.com/fujiisoup/xr-scipy
- Filters can be applied along any dimension with automatic sampling rate detection.
- Supports both IIR and FIR filter types with forward/backward filtering.

Examples
--------
Apply a bandpass filter to an xarray DataArray::

    >>> import xarray as xr
    >>> import numpy as np
    >>> data = xr.DataArray(np.random.randn(1000), dims=['time'])
    >>> data['time'] = np.arange(1000) / 100.0  # 100 Hz
    >>> filtered = data.sps_filters.bandpass(10, 40)  # 10-40 Hz

"""

from __future__ import annotations

import warnings
from fractions import Fraction
from typing import Any

import numpy as np
import pandas as pd
import scipy.signal
import xarray as xr
from loguru import logger


# =============================================================================
# Imports
# =============================================================================


try:
    from scipy.signal import sosfiltfilt
except ImportError:
[docs] sosfiltfilt = None
# ============================================================================= def _firwin_ba(*args, **kwargs): if not kwargs.get("pass_zero"): args = (args[0] + 1,) + args[1:] # numtaps must be odd return scipy.signal.firwin(*args, **kwargs), np.array([1]) _BA_FUNCS = { "iir": scipy.signal.iirfilter, "fir": _firwin_ba, } _ORDER_DEFAULTS = { "iir": 8, "fir": 29, } ### Warnings
class UnevenSamplingWarning(Warning):
    """Warning category used when a coordinate is not evenly sampled."""


class FilteringNaNWarning(Warning):
    """Warning category used when data passed to a filter contains NaNs."""


class DecimationWarning(Warning):
    """Warning category for decimation-related issues."""


# Always show these warnings instead of only the first occurrence per location.
for _warning_category in (
    UnevenSamplingWarning,
    FilteringNaNWarning,
    DecimationWarning,
):
    warnings.filterwarnings("always", category=_warning_category)
del _warning_category
def get_maybe_only_dim(darray: xr.DataArray | xr.Dataset, dim: str | None) -> str:
    """
    Resolve the dimension to operate along.

    An explicitly given `dim` is returned unchanged. When `dim` is None the
    object must have exactly one dimension, which is then returned.

    Parameters
    ----------
    darray : xarray.DataArray | xarray.Dataset
        Object whose dimensions are inspected.
    dim : str | None
        Dimension name, or None to auto-detect for 1-D objects.

    Returns
    -------
    str
        The dimension name.

    Raises
    ------
    ValueError
        If `dim` is None and the object does not have exactly one dimension.
    """
    if dim is not None:
        return dim

    if len(darray.dims) != 1:
        raise ValueError("Specify the dimension")

    if isinstance(darray, xr.DataArray):
        return str(darray.dims[0])
    if isinstance(darray, xr.Dataset):
        # Dataset dimensions are a mapping, so take the first key of ``sizes``
        return str(next(iter(darray.sizes.keys())))
    # any other 1-D object falls through and yields None, as before
def get_maybe_last_dim_axis(
    darray: xr.DataArray | xr.Dataset, dim: str | None = None
) -> tuple[str, int]:
    """
    Return a dimension name together with its axis index.

    Parameters
    ----------
    darray : xarray.DataArray | xarray.Dataset
        Input array. The code reads ``ndim``, ``dims`` and ``get_axis_num``
        from it.
    dim : str | None, default None
        Dimension name. If None, the last dimension is used.

    Returns
    -------
    tuple[str, int]
        Dimension name and corresponding axis index.
    """
    if dim is not None:
        axis = darray.get_axis_num(dim)
        return str(dim), axis

    last_axis = darray.ndim - 1
    return str(darray.dims[last_axis]), last_axis
def get_sampling_step(
    darray: xr.DataArray | xr.Dataset, dim: str | None = None, rtol: float = 1e-3
) -> float:
    """
    Compute the average sampling step along a dimension.

    The step is ``(last - first) / (n - 1)`` of the coordinate values. For
    datetime-like coordinates stored with ns, us or ms resolution the result
    is scaled to seconds; any other coordinate is returned in its own units.

    Parameters
    ----------
    darray : xarray.DataArray | xarray.Dataset
        Input array with coordinates.
    dim : str | None, default None
        Dimension name. Auto-detected for 1-D arrays.
    rtol : float, default 1e-3
        Relative tolerance for detecting uneven sampling.

    Returns
    -------
    float
        Average sampling step.

    Raises
    ------
    ValueError
        If the coordinate has fewer than 2 samples.

    Warnings
    --------
    UnevenSamplingWarning
        If the average step differs from the first step by more than `rtol`.

    Notes
    -----
    The conversion used to be selected with ``hasattr(value, "view")``, which
    is true for every numpy value. Float coordinates were therefore
    reinterpreted bit-wise as int64 and produced a meaningless step. The
    conversion is now chosen from the actual type of the difference.

    Examples
    --------
    Get sampling step from a time series::

        >>> dt = get_sampling_step(data, dim='time')
        >>> sample_rate = 1.0 / dt

    """
    dim = get_maybe_only_dim(darray, dim)
    coord = darray.coords[dim]
    n_samples = len(coord)
    if n_samples < 2:
        raise ValueError(
            f"Cannot compute sampling step for coordinate with less than 2 samples. "
            f"Got {n_samples} samples."
        )

    # number of integer ticks per second for the coordinate's time resolution
    dtype_code = coord.dtype.descr[0][1]
    if "ns" in dtype_code:
        t_scale = 1e9
    elif "us" in dtype_code:
        t_scale = 1e6
    elif "ms" in dtype_code:
        t_scale = 1e3
    else:
        t_scale = 1

    # work on the raw values so the differences are plain scalars
    values = np.asarray(coord.values)
    span = values[-1] - values[0]
    first_step = values[1] - values[0]
    n_intervals = n_samples - 1

    if hasattr(span, "total_seconds"):
        # datetime.timedelta / pandas.Timedelta (object-dtype coordinates)
        dt_avg = float(span.total_seconds()) / n_intervals
        dt_first = float(first_step.total_seconds())
    elif isinstance(span, np.timedelta64):
        # integer tick count in the coordinate's resolution, scaled to seconds
        dt_avg = (float(span.astype("int64")) / n_intervals) / t_scale
        dt_first = float(first_step.astype("int64")) / t_scale
    else:
        # plain numeric coordinate: keep its own units
        dt_avg = (float(span) / n_intervals) / t_scale
        dt_first = float(first_step) / t_scale

    # compare against magnitudes so descending coordinates do not always warn
    if abs(dt_avg - dt_first) > rtol * min(abs(dt_first), abs(dt_avg)):
        # show warning at caller level to see which signal it is related to
        warnings.warn(
            f"Average sampling {dt_avg:.3g} != first sampling step {dt_first:.3g}",
            UnevenSamplingWarning,
            stacklevel=2,
        )
    return dt_avg
def frequency_filter(
    darray: xr.DataArray | xr.Dataset,
    f_crit: float | list[float] | tuple[float, float],
    order: int | None = None,
    irtype: str = "iir",
    filtfilt: bool = True,
    apply_kwargs: dict[str, Any] | None = None,
    in_nyq: bool = False,
    dim: str | None = None,
    **kwargs: Any,
) -> xr.DataArray | xr.Dataset:
    """
    Apply a frequency filter to an xarray object.

    Supports IIR (infinite impulse response) and FIR (finite impulse
    response) filters, optionally applied forwards and backwards.

    Parameters
    ----------
    darray : xarray.DataArray | xarray.Dataset
        Data to be filtered.
    f_crit : float | list[float] | tuple[float, float]
        Critical frequency or frequencies. Scalar for lowpass/highpass,
        2-element for bandpass/bandstop. The input is never modified.
    order : int | None, default None
        Filter order. If None, uses the default (8 for IIR, 29 for FIR).
    irtype : {'iir', 'fir'}, default 'iir'
        Impulse response type.
    filtfilt : bool, default True
        Apply the filter forwards and backwards.
    apply_kwargs : dict | None, default None
        Additional kwargs passed to the function that applies the filter.
    in_nyq : bool, default False
        If True, `f_crit` values are already normalized by the Nyquist
        frequency.
    dim : str | None, default None
        Dimension along which to filter. Auto-detected for 1-D arrays.
    **kwargs
        Passed to the filter design function.

    Returns
    -------
    xarray.DataArray | xarray.Dataset
        Filtered data.

    Raises
    ------
    ValueError
        If `irtype` is not 'iir' or 'fir', or if `dim` is ambiguous.

    Warnings
    --------
    FilteringNaNWarning
        If the input contains NaN values.

    Examples
    --------
    Apply a 4th-order IIR lowpass at 10 Hz::

        >>> filtered = frequency_filter(data, 10, order=4, btype='low')

    FIR bandpass filter from 5 to 15 Hz::

        >>> filtered = frequency_filter(data, [5, 15], irtype='fir', btype='band')

    """
    if irtype not in _BA_FUNCS:
        raise ValueError(
            "Wrong argument for irtype: {}, must be one of {}".format(
                irtype, _BA_FUNCS.keys()
            )
        )
    if order is None:
        order = _ORDER_DEFAULTS[irtype]
    if apply_kwargs is None:
        apply_kwargs = {}
    dim = get_maybe_only_dim(darray, dim)

    # np.array always copies; np.asarray would hand back the caller's own
    # float64 array and the in-place scaling below would then modify it
    f_crit_norm = np.array(f_crit, dtype=np.float64)
    if not in_nyq:  # normalize by Nyquist frequency
        f_crit_norm *= 2 * get_sampling_step(darray, dim)

    data_values = np.asarray(
        darray.to_array() if isinstance(darray, xr.Dataset) else darray
    )
    if np.any(np.isnan(data_values)):
        # only warn since simple forward-filter or FIR is valid
        warnings.warn(
            "data contains NaNs, filter will propagate them",
            FilteringNaNWarning,
            stacklevel=2,
        )

    if sosfiltfilt and irtype == "iir":
        # second-order sections are preferred for IIR when available
        sos = scipy.signal.iirfilter(order, f_crit_norm, output="sos", **kwargs)
        coefficients = (sos,)
        apply_func = sosfiltfilt if filtfilt else scipy.signal.sosfilt
    else:
        coefficients = _BA_FUNCS[irtype](order, f_crit_norm, **kwargs)
        apply_func = scipy.signal.filtfilt if filtfilt else scipy.signal.lfilter

    # coefficient arrays have no core dimension; the data is filtered along dim
    return xr.apply_ufunc(
        apply_func,
        *coefficients,
        darray,
        input_core_dims=[[] for _ in coefficients] + [[dim]],
        output_core_dims=[[dim]],
        kwargs=apply_kwargs,
    )
def _update_ftype_kwargs(kwargs, iirvalue, firvalue): if kwargs.get("irtype", "iir") == "iir": kwargs.setdefault("btype", iirvalue) else: # fir kwargs.setdefault("pass_zero", firvalue) return kwargs
def lowpass(
    darray: xr.DataArray | xr.Dataset, f_cutoff: float, *args: Any, **kwargs: Any
) -> xr.DataArray | xr.Dataset:
    """
    Apply a lowpass filter.

    Defaults the band type to lowpass and delegates to `frequency_filter`.

    Parameters
    ----------
    darray : xarray.DataArray | xarray.Dataset
        Data to filter.
    f_cutoff : float
        Cutoff frequency in Hz.
    *args
        Passed to `frequency_filter`:
        (order, irtype, filtfilt, apply_kwargs, in_nyq, dim).
    **kwargs
        Passed to `frequency_filter` and on to the filter design.

    Returns
    -------
    xarray.DataArray | xarray.Dataset
        Lowpass-filtered data.

    Examples
    --------
    Remove components above 50 Hz::

        >>> filtered = lowpass(data, 50)

    """
    design_kwargs = _update_ftype_kwargs(kwargs, "lowpass", True)
    return frequency_filter(darray, f_cutoff, *args, **design_kwargs)
def highpass(
    darray: xr.DataArray | xr.Dataset, f_cutoff: float, *args: Any, **kwargs: Any
) -> xr.DataArray | xr.Dataset:
    """
    Apply a highpass filter.

    Defaults the band type to highpass and delegates to `frequency_filter`.

    Parameters
    ----------
    darray : xarray.DataArray | xarray.Dataset
        Data to filter.
    f_cutoff : float
        Cutoff frequency in Hz.
    *args
        Passed to `frequency_filter`:
        (order, irtype, filtfilt, apply_kwargs, in_nyq, dim).
    **kwargs
        Passed to `frequency_filter` and on to the filter design.

    Returns
    -------
    xarray.DataArray | xarray.Dataset
        Highpass-filtered data.

    Examples
    --------
    Remove components below 1 Hz::

        >>> filtered = highpass(data, 1.0)

    """
    design_kwargs = _update_ftype_kwargs(kwargs, "highpass", False)
    return frequency_filter(darray, f_cutoff, *args, **design_kwargs)
def bandpass(
    darray: xr.DataArray | xr.Dataset,
    f_low: float,
    f_high: float,
    *args: Any,
    **kwargs: Any,
) -> xr.DataArray | xr.Dataset:
    """
    Apply a bandpass filter.

    Defaults the band type to bandpass and delegates to `frequency_filter`
    with ``[f_low, f_high]`` as critical frequencies.

    Parameters
    ----------
    darray : xarray.DataArray | xarray.Dataset
        Data to filter.
    f_low : float
        Lower cutoff frequency in Hz.
    f_high : float
        Upper cutoff frequency in Hz.
    *args
        Passed to `frequency_filter`:
        (order, irtype, filtfilt, apply_kwargs, in_nyq, dim).
    **kwargs
        Passed to `frequency_filter` and on to the filter design.

    Returns
    -------
    xarray.DataArray | xarray.Dataset
        Bandpass-filtered data.

    Examples
    --------
    Keep components between 10 and 50 Hz::

        >>> filtered = bandpass(data, 10, 50)

    """
    design_kwargs = _update_ftype_kwargs(kwargs, "bandpass", False)
    band_edges = [f_low, f_high]
    return frequency_filter(darray, band_edges, *args, **design_kwargs)
def bandstop(
    darray: xr.DataArray | xr.Dataset,
    f_low: float,
    f_high: float,
    *args: Any,
    **kwargs: Any,
) -> xr.DataArray | xr.Dataset:
    """
    Apply a bandstop (notch) filter.

    Defaults the band type to bandstop and delegates to `frequency_filter`
    with ``[f_low, f_high]`` as critical frequencies.

    Parameters
    ----------
    darray : xarray.DataArray | xarray.Dataset
        Data to filter.
    f_low : float
        Lower cutoff frequency in Hz.
    f_high : float
        Upper cutoff frequency in Hz.
    *args
        Passed to `frequency_filter`:
        (order, irtype, filtfilt, apply_kwargs, in_nyq, dim).
    **kwargs
        Passed to `frequency_filter` and on to the filter design.

    Returns
    -------
    xarray.DataArray | xarray.Dataset
        Bandstop-filtered data (removes frequencies between f_low and f_high).

    Examples
    --------
    Remove 50-60 Hz powerline noise::

        >>> filtered = bandstop(data, 50, 60)

    """
    design_kwargs = _update_ftype_kwargs(kwargs, "bandstop", True)
    band_edges = [f_low, f_high]
    return frequency_filter(darray, band_edges, *args, **design_kwargs)
# def notch( # darray, # notch_freq, # notch_radius=0.5, # frequency_radius=0.9, # ripple=0.1, # db_stop_limit=5.0, # ): # ford, wn = signal.cheb1ord(wp, ws, 1, dbstop) # b, a = signal.cheby1(1, 0.5, wn, btype="bandstop") # bx = signal.filtfilt(b, a, bx)
def decimate(
    darray: xr.Dataset | xr.DataArray,
    target_sample_rate: float,
    n_order: int = 8,
    dim: str | None = None,
) -> xr.DataArray | xr.Dataset:
    """
    Decimate data using a Chebyshev filter followed by downsampling.

    Applies a Chebyshev type I filter of order `n_order` forwards and
    backwards (sosfiltfilt) and then keeps every q-th sample, where q is the
    current sample rate divided by `target_sample_rate`, rounded to the
    nearest integer.

    Parameters
    ----------
    darray : xarray.DataArray | xarray.Dataset
        Data to decimate.
    target_sample_rate : float
        Target sample rate in samples per second (or per space unit).
    n_order : int, default 8
        Order of the Chebyshev type I filter.
    dim : str | None, default None
        Dimension to decimate along. Auto-detected for 1-D arrays.

    Returns
    -------
    xarray.DataArray | xarray.Dataset
        Decimated data; coordinates are subsampled along with the data.

    Raises
    ------
    ValueError
        If `dim` is None and the array is not 1-D, or if the decimation
        factor rounds to less than 1 (target rate well above the current
        rate).
    ImportError
        If ``sosfiltfilt`` is not available in scipy.signal.

    Warnings
    --------
    UserWarning
        If the decimation factor is larger than 13.

    Notes
    -----
    If sample_rate / target_sample_rate > 13, call decimate multiple times.

    Examples
    --------
    Decimate from 100 Hz to 10 Hz::

        >>> decimated = decimate(data, target_sample_rate=10.0)

    """
    dim = get_maybe_only_dim(darray, dim)

    dt = get_sampling_step(darray, dim)
    q = int(np.rint(1 / (dt * target_sample_rate)))
    if q < 1:
        # a factor of 0 would otherwise surface as a ZeroDivisionError below
        raise ValueError(
            f"Cannot decimate to target sample rate {target_sample_rate}: "
            f"the decimation factor must be at least 1, got {q}."
        )
    if q > 13:
        warnings.warn(
            f"Decimation factor is larger than 13 ({q}), the resulting "
            "decimated array maybe incorrect. Suggest calling decimate "
            "multiple times."
        )

    # anti-alias filter: passband edge at 0.8 of the new Nyquist frequency
    sos = scipy.signal.cheby1(n_order, 0.05, 0.8 / q, output="sos")
    if sosfiltfilt is None:
        raise ImportError("sosfiltfilt not available in scipy.signal")

    ret = xr.apply_ufunc(
        sosfiltfilt,
        sos,
        darray,
        input_core_dims=[[], [dim]],
        output_core_dims=[[dim]],
        kwargs={},
    )
    # keep every q-th sample along dim
    return ret.isel(**{dim: slice(None, None, q)})
def resample_poly(
    darray: xr.DataArray | xr.Dataset,
    new_sample_rate: float,
    dim: str | None = None,
    pad_type: str = "mean",
) -> xr.DataArray | xr.Dataset:
    """
    Resample using polyphase filtering.

    Computes a rational resampling ratio (up/down), applies
    scipy.signal.resample_poly along `dim` and builds a matching coordinate
    for the resampled data.

    Parameters
    ----------
    darray : xarray.DataArray | xarray.Dataset
        Data to resample. It is cast to float before resampling.
    new_sample_rate : float
        Target sample rate.
    dim : str | None, default None
        Dimension to resample along. Auto-detected for 1-D arrays.
    pad_type : str, default 'mean'
        Passed to scipy.signal.resample_poly as ``padtype``.

    Returns
    -------
    xarray.DataArray | xarray.Dataset
        Resampled data with updated coordinates.

    Raises
    ------
    ValueError
        If `dim` is None and the array is not 1-D.

    Notes
    -----
    When the old sample rate is not an integer multiple of the new one a
    warning is logged and the new coordinate is rebuilt: with
    ``pandas.date_range`` for a dimension named ``"time"``, otherwise with
    ``numpy.linspace``. The end time is only computed for the ``"time"``
    case; computing it up front made the ``linspace`` fallback fail on
    numeric coordinates.

    Examples
    --------
    Resample to 50 Hz::

        >>> resampled = resample_poly(data, new_sample_rate=50.0)

    """
    dim = get_maybe_only_dim(darray, dim)

    # sampling step is computed once and reused below
    dt = get_sampling_step(darray, dim)
    old_sample_rate = 1.0 / dt

    fraction = Fraction(new_sample_rate / old_sample_rate).limit_denominator()

    # need to resample the time coordinate because it will change and that
    # is illegal in apply_ufunc.
    ret = xr.apply_ufunc(
        scipy.signal.resample_poly,
        darray.astype(float),
        fraction.numerator,
        fraction.denominator,
        input_core_dims=[[dim], [], []],
        output_core_dims=[[dim]],
        exclude_dims={dim},
        kwargs={"padtype": pad_type},
    )

    new_step = 1 / (dt * new_sample_rate)
    if new_step % 1 == 0:
        q = int(np.rint(new_step))
        # directly downsample without AAF on dimension
        # this only works if q is an integer, otherwise
        # the index gets messed up from fractional spacing
        new_dim = darray[dim].values[slice(None, None, q)]
    else:
        logger.warning(
            "New sample rate is not an even number of original sample rate. "
            f"The ratio is {new_step}. Use the new dimensions with caution."
        )
        if dim in ["time"]:
            # need to reset the end time
            end_time = darray[dim].values[0] + np.timedelta64(
                int(np.rint(((ret[dim].size - 1) / new_sample_rate) * 1e9)), "ns"
            )
            new_dim = pd.date_range(
                darray[dim].values[0],
                end_time,
                periods=ret[dim].size,
            )
        else:
            end_index = (
                int(np.rint((ret[dim].size - (darray[dim].size / new_step)))) - 1
            )
            new_dim = np.linspace(
                darray[dim].values[0], darray[dim].values[end_index], ret[dim].size
            )

    # check to make sure the dimension size is the same as the new array
    n_samples_data = len(ret[dim])
    n_samples_axis = len(new_dim)
    if n_samples_data != n_samples_axis:
        logger.warning(
            f"conflicting axes sizes {n_samples_data} data and {n_samples_axis}"
            " axes after resampling"
        )
        logger.info(f"trimming {dim} axis from {n_samples_axis} to {n_samples_data}")
        new_dim = new_dim[:n_samples_data]

    ret[dim] = new_dim

    return ret
def savgol_filter(
    darray: xr.DataArray | xr.Dataset,
    window_length: int,
    polyorder: int,
    deriv: int = 0,
    delta: float | None = None,
    dim: str | None = None,
    mode: str = "interp",
    cval: float = 0.0,
) -> xr.DataArray | xr.Dataset:
    """
    Apply a Savitzky-Golay filter along a dimension.

    Wraps scipy.signal.savgol_filter with xarray.apply_ufunc.

    Parameters
    ----------
    darray : xarray.DataArray | xarray.Dataset
        Data to filter.
    window_length : int
        Length of the filter window. When `delta` is None the value is
        divided by the detected sampling step and rounded, i.e. it is
        interpreted in units of the `dim` coordinate, and an even result is
        increased by one.
    polyorder : int
        Order of the polynomial used for fitting.
    deriv : int, default 0
        Order of derivative to compute (0 = no differentiation).
    delta : float | None, default None
        Sample spacing. If None, it is taken from the `dim` coordinate via
        `get_sampling_step`.
    dim : str | None, default None
        Dimension along which to filter. Auto-detected for 1-D arrays.
    mode : str, default 'interp'
        Extension mode passed to scipy.signal.savgol_filter.
    cval : float, default 0.0
        Fill value passed to scipy.signal.savgol_filter.

    Returns
    -------
    xarray.DataArray | xarray.Dataset
        Filtered data.

    Raises
    ------
    ValueError
        If `dim` is None for multi-dimensional arrays.

    Examples
    --------
    Compute first derivative with an explicit sample spacing::

        >>> deriv1 = savgol_filter(data, 11, 2, deriv=1, delta=0.01)

    """
    dim = get_maybe_only_dim(darray, dim)
    if delta is None:
        delta = get_sampling_step(darray, dim)
        # NOTE(review): the nesting of the next three lines was lost when this
        # source was extracted; they are assumed to apply only when delta is
        # auto-detected -- confirm against the upstream file.
        # window_length is converted from coordinate units to a sample count
        window_length = int(np.rint(window_length / delta))
        if window_length % 2 == 0:  # must be odd
            window_length += 1
    return xr.apply_ufunc(
        scipy.signal.savgol_filter,
        darray,
        input_core_dims=[[dim]],
        output_core_dims=[[dim]],
        kwargs=dict(
            window_length=window_length,
            polyorder=polyorder,
            deriv=deriv,
            delta=delta,
            mode=mode,
            cval=cval,
        ),
    )
def detrend(
    darray: xr.DataArray | xr.Dataset,
    dim: str | None = None,
    trend_type: str = "linear",
) -> xr.DataArray | xr.Dataset:
    """
    Remove a trend from data along a dimension.

    Wraps scipy.signal.detrend with xarray.apply_ufunc.

    Parameters
    ----------
    darray : xarray.DataArray | xarray.Dataset
        Data to detrend.
    dim : str | None, default None
        Dimension along which to detrend. Auto-detected for 1-D arrays.
    trend_type : {'linear', 'constant'}, default 'linear'
        Passed to scipy.signal.detrend as ``type``.

    Returns
    -------
    xarray.DataArray | xarray.Dataset
        Detrended data.

    Raises
    ------
    ValueError
        If `dim` is None and the array is not 1-D.

    Examples
    --------
    Remove linear trend::

        >>> detrended = detrend(data, trend_type='linear')

    Remove mean (DC component)::

        >>> demeaned = detrend(data, trend_type='constant')

    """
    detrend_dim = get_maybe_only_dim(darray, dim)
    core_dims = [detrend_dim]
    return xr.apply_ufunc(
        scipy.signal.detrend,
        darray,
        input_core_dims=[core_dims],
        output_core_dims=[core_dims],
        kwargs={"type": trend_type},
    )
@xr.register_dataarray_accessor("sps_filters")
@xr.register_dataset_accessor("sps_filters")
class FilterAccessor:
    """
    Accessor exposing the module's filtering functions on xarray objects.

    Registered under ``.sps_filters`` for both DataArray and Dataset. Every
    method delegates to the module-level function of the same purpose with
    the wrapped object as first argument.

    Attributes
    ----------
    darray : xarray.DataArray | xarray.Dataset
        The wrapped xarray object.

    Examples
    --------
    Apply filters via accessor::

        >>> data.sps_filters.low(10)  # lowpass at 10 Hz
        >>> data.sps_filters.bandpass(5, 15)  # bandpass 5-15 Hz

    """

    def __init__(self, darray: xr.DataArray | xr.Dataset) -> None:
        self.darray: xr.DataArray | xr.Dataset = darray

    @property
    def dt(self) -> float:
        """Sampling step of the wrapped object (its dimension is auto-detected)."""
        return get_sampling_step(self.darray)

    @property
    def fs(self) -> float:
        """Sampling frequency in inverse units of self.dt."""
        step = self.dt
        return 1.0 / step

    @property
    def dx(self) -> np.ndarray:
        """Sampling steps for all axes as array."""
        steps = []
        for name in self.darray.dims:
            steps.append(get_sampling_step(self.darray, str(name)))
        return np.array(steps)

    # NOTE: the arguments are coded explicitly where possible for
    # tab-completion to work, using a decorator wrapper with *args would
    # not expose them

    def low(
        self, f_cutoff: float, *args: Any, **kwargs: Any
    ) -> xr.DataArray | xr.Dataset:
        """
        Apply lowpass filter.

        Parameters
        ----------
        f_cutoff : float
            Cutoff frequency in Hz.
        *args
            Passed to `lowpass`:
            (order, irtype, filtfilt, apply_kwargs, in_nyq, dim).
        **kwargs
            Passed to `lowpass`.

        Returns
        -------
        xarray.DataArray | xarray.Dataset
            Lowpass-filtered data.
        """
        wrapped = self.darray
        return lowpass(wrapped, f_cutoff, *args, **kwargs)

    def high(
        self, f_cutoff: float, *args: Any, **kwargs: Any
    ) -> xr.DataArray | xr.Dataset:
        """
        Apply highpass filter.

        Parameters
        ----------
        f_cutoff : float
            Cutoff frequency in Hz.
        *args
            Passed to `highpass`:
            (order, irtype, filtfilt, apply_kwargs, in_nyq, dim).
        **kwargs
            Passed to `highpass`.

        Returns
        -------
        xarray.DataArray | xarray.Dataset
            Highpass-filtered data.
        """
        wrapped = self.darray
        return highpass(wrapped, f_cutoff, *args, **kwargs)

    def bandpass(
        self, f_low: float, f_high: float, *args: Any, **kwargs: Any
    ) -> xr.DataArray | xr.Dataset:
        """
        Apply bandpass filter.

        Parameters
        ----------
        f_low : float
            Lower cutoff frequency in Hz.
        f_high : float
            Upper cutoff frequency in Hz.
        *args
            Passed to `bandpass`:
            (order, irtype, filtfilt, apply_kwargs, in_nyq, dim).
        **kwargs
            Passed to `bandpass`.

        Returns
        -------
        xarray.DataArray | xarray.Dataset
            Bandpass-filtered data.
        """
        wrapped = self.darray
        return bandpass(wrapped, f_low, f_high, *args, **kwargs)

    def bandstop(
        self, f_low: float, f_high: float, *args: Any, **kwargs: Any
    ) -> xr.DataArray | xr.Dataset:
        """
        Apply bandstop filter.

        Parameters
        ----------
        f_low : float
            Lower cutoff frequency in Hz.
        f_high : float
            Upper cutoff frequency in Hz.
        *args
            Passed to `bandstop`:
            (order, irtype, filtfilt, apply_kwargs, in_nyq, dim).
        **kwargs
            Passed to `bandstop`.

        Returns
        -------
        xarray.DataArray | xarray.Dataset
            Bandstop-filtered data.
        """
        wrapped = self.darray
        return bandstop(wrapped, f_low, f_high, *args, **kwargs)

    def freq(
        self,
        f_crit: float | list[float] | tuple[float, float],
        order: int | None = None,
        irtype: str = "iir",
        filtfilt: bool = True,
        apply_kwargs: dict[str, Any] | None = None,
        in_nyq: bool = False,
        dim: str | None = None,
        **kwargs: Any,
    ) -> xr.DataArray | xr.Dataset:
        """
        Apply general frequency filter.

        Parameters
        ----------
        f_crit : float | list[float] | tuple[float, float]
            Critical frequency or frequencies in Hz.
        order : int | None, default None
            Filter order.
        irtype : {'iir', 'fir'}, default 'iir'
            Impulse response type.
        filtfilt : bool, default True
            Apply forward-backward filtering.
        apply_kwargs : dict | None, default None
            Additional filter function kwargs.
        in_nyq : bool, default False
            If True, f_crit is normalized by Nyquist frequency.
        dim : str | None, default None
            Dimension along which to filter.
        **kwargs
            Passed to `frequency_filter`.

        Returns
        -------
        xarray.DataArray | xarray.Dataset
            Filtered data.
        """
        return frequency_filter(
            self.darray,
            f_crit,
            order=order,
            irtype=irtype,
            filtfilt=filtfilt,
            apply_kwargs=apply_kwargs,
            in_nyq=in_nyq,
            dim=dim,
            **kwargs,
        )

    # calling the accessor itself is the same as calling ``freq``
    __call__ = freq

    def savgol(
        self,
        window_length: int,
        polyorder: int,
        deriv: int = 0,
        delta: float | None = None,
        dim: str | None = None,
        mode: str = "interp",
        cval: float = 0.0,
    ) -> xr.DataArray | xr.Dataset:
        """
        Apply Savitzky-Golay filter.

        Parameters
        ----------
        window_length : int
            Filter window length.
        polyorder : int
            Polynomial order.
        deriv : int, default 0
            Derivative order.
        delta : float | None, default None
            Sample spacing.
        dim : str | None, default None
            Dimension to filter along.
        mode : str, default 'interp'
            Extension mode.
        cval : float, default 0.0
            Constant fill value.

        Returns
        -------
        xarray.DataArray | xarray.Dataset
            Filtered data.
        """
        return savgol_filter(
            self.darray,
            window_length,
            polyorder,
            deriv=deriv,
            delta=delta,
            dim=dim,
            mode=mode,
            cval=cval,
        )

    def decimate(
        self, target_sample_rate: float, n_order: int = 8, dim: str | None = None
    ) -> xr.DataArray | xr.Dataset:
        """
        Decimate signal.

        Parameters
        ----------
        target_sample_rate : float
            Target sample rate.
        n_order : int, default 8
            Chebyshev filter order.
        dim : str | None, default None
            Dimension to decimate along.

        Returns
        -------
        xarray.DataArray | xarray.Dataset
            Decimated data.
        """
        return decimate(
            self.darray, target_sample_rate, n_order=n_order, dim=dim
        )

    def detrend(
        self, trend_type: str = "linear", dim: str | None = None
    ) -> xr.DataArray | xr.Dataset:
        """
        Remove trend from data.

        Parameters
        ----------
        trend_type : {'linear', 'constant'}, default 'linear'
            Type of trend to remove.
        dim : str | None, default None
            Dimension to detrend along.

        Returns
        -------
        xarray.DataArray | xarray.Dataset
            Detrended data.
        """
        return detrend(self.darray, dim=dim, trend_type=trend_type)

    def resample_poly(
        self, target_sample_rate: float, pad_type: str = "mean", dim: str | None = None
    ) -> xr.DataArray | xr.Dataset:
        """
        Resample using polyphase filtering.

        Parameters
        ----------
        target_sample_rate : float
            Target sample rate.
        pad_type : str, default 'mean'
            Padding type for resampling.
        dim : str | None, default None
            Dimension to resample along.

        Returns
        -------
        xarray.DataArray | xarray.Dataset
            Resampled data.
        """
        return resample_poly(
            self.darray, target_sample_rate, dim=dim, pad_type=pad_type
        )