Source code for mth5.timeseries.ts_filters

#!/usr/bin/env python

time series filters


# =================================================================
import numpy as np
from scipy import signal

from matplotlib import pyplot as plt
from matplotlib.lines import Line2D
from mth5.utils.mth5_logger import setup_logger

logger = setup_logger(__file__)
# =================================================================

[docs]def butter_bandpass(lowcut, highcut, fs, order=5): """ Butterworth bandpass filter using scipy.signal :param lowcut: low cut frequency in Hz :type lowcut: float :param highcut: high cut frequency in Hz :type highcut: float :param fs: Sample rate :type fs: float :param order: Butterworth order, defaults to 5 :type order: int, optional :return: SOS scipy.signal format :rtype: scipy.signal.SOS? """ nyq = 0.5 * fs if lowcut is not None: low = lowcut / nyq if highcut is not None: high = highcut / nyq if lowcut and highcut: sos = signal.butter( order, [low, high], analog=False, btype="band", output="sos" ) elif highcut is None: sos = signal.butter(order, low, analog=False, btype="low", output="sos") elif lowcut is None: sos = signal.butter( order, high, analog=False, btype="high", output="sos" ) return sos
[docs]def butter_bandpass_filter(data, lowcut, highcut, fs, order=5): """ :param data: 1D time series data :type data: np.ndarray :param lowcut: low cut frequency in Hz :type lowcut: float :param highcut: high cut frequency in Hz :type highcut: float :param fs: Sample rate :type fs: float :param order: Butterworth order, defaults to 5 :type order: int, optional :return: filtered data :rtype: np.ndarray """ sos = butter_bandpass(lowcut, highcut, fs, order=order) y = signal.sosfiltfilt(sos, data) return y
[docs]def low_pass(data, low_pass_freq, cutoff_freq, sampling_rate): """ :param data: 1D time series data :type data: np.ndarray :param low_pass_freq: low pass frequency in Hz :type low_pass_freq: float :param cutoff_freq: cut off frequency in Hz :type cutoff_freq: float :param sampling_rate: Sample rate in samples per second :type sampling_rate: float :return: lowpass filtered data :rtype: np.ndarray """ nyq = 0.5 * sampling_rate filt_order, wn = signal.buttord( low_pass_freq / nyq, cutoff_freq / nyq, 3, 40 ) b, a = signal.butter(filt_order, wn, btype="low") data_filtered = signal.filtfilt(b, a, data) return data_filtered
[docs]def zero_pad(input_array, power=2, pad_fill=0): """ :param input_array: 1D array :type input_array: np.ndarray :param power: base power to used to pad to, defaults to 2 which is optimal for the FFT :type power: int, optional :param pad_fill: fill value for padded values, defaults to 0 :type pad_fill: float, optional :return: zero padded array :rtype: np.ndarray """ len_array = input_array.shape[0] if power == 2: npow = int(np.ceil(np.log2(len_array))) if power == 10: npow = int(np.ceil(np.log10(len_array))) if npow > 32: logger.warning( "Exceeding memory allocation inherent in your computer 2**32. " "Limiting the zero pad to 2**32" ) pad_array = np.zeros(power**npow) if pad_fill != 0: pad_array[:] = pad_fill pad_array[0:len_array] = input_array return pad_array
[docs]class RemoveInstrumentResponse: """ Remove instrument response from the given channel response filter The order of operations is important (if applied): 1) detrend 2) zero mean 3) zero pad 4) time window 5) frequency window 6) remove response 7) undo time window 8) bandpass :param ts: time series data to remove response from :type ts: np.ndarray((N,) , dtype=float) :param time_array: time index that corresponds to the time series :type time_array: np.ndarray((N,) , dtype=np.datetime[ns]) :param sample_interval: seconds per sample (time interval between samples) :type sample_interval: float :param channel_response_filter: Channel response filter with all filters included to convert from counts to physical units :type channel_response_filter: `class`:mt_metadata.timeseries.filters.ChannelResponseFilter` **kwargs** :param plot: to plot the calibration process [ False | True ] :type plot: boolean, default True :param detrend: Remove linar trend of the time series :type detrend: boolean, default True :param zero_mean: Remove the mean of the time series :type zero_mean: boolean, default True :param zero_pad: pad the time series to the next power of 2 for efficiency :type zero_pad: boolean, default True :param t_window: Time domain windown name see `` for options :type t_window: string, default None :param t_window_params: Time domain window parameters, parameters can be found in `` :type t_window_params: dictionary :param f_window: Frequency domain windown name see `` for options :type f_window: string, defualt None :param f_window_params: Frequency window parameters, parameters can be found in `` :type f_window_params: dictionary :param bandpass: bandpass freequency and order {"low":, "high":, "order":,} :type bandpass: dictionary """ def __init__( self, ts, time_array, sample_interval, channel_response_filter, **kwargs, ): self.logger = setup_logger(f"{__name__}.{self.__class__.__name__}") self.ts = ts self.time_array = time_array self.sample_interval = sample_interval self.channel_response_filter = channel_response_filter self.plot = False self.detrend = True self.zero_mean = True self.zero_pad = True self.t_window = None self.t_window_params = {} self.f_window = None self.f_window_params = {} self.bandpass = {} self.fig = None self.nrows = None self.subplot_dict = {} for key, value in kwargs.items(): setattr(self, key, value) def _subplots(self, x, y, color, num, label): """ helper function to make subplots for if plotting is desired :param x: x array :type x: np.ndarray :param y: y array :type y: np.ndarray :param color: color of the line :type color: tuple :param num: subplot number :type num: integer :param label: legend label :type label: string """ ax_t = self.fig.get_axes()[0] ax_f = self.fig.get_axes()[1] ax = self.fig.add_subplot(self.nrows, 2, num, sharex=ax_t) ax2 = self.fig.add_subplot(self.nrows, 2, num + 1, sharex=ax_f) # plot time series line = Line2D([0], [0], color=color) ax.plot(x, y, color=color) f = np.fft.rfftfreq(x.size, d=self.sample_interval) # plot spectra data = np.fft.rfft(y) ax2.loglog(f, abs(data), color=color) ax.legend( [line], [label], loc="upper left", borderaxespad=0.01, borderpad=0.1, markerscale=0.02, ).set_zorder(1000)
[docs] @staticmethod def get_window(window, window_params, size): """ Get window from scipy.signal :param window: name of the window :type window: string :param window_params: dictionary of window parameters :type window_params: dictionary :param size: number of points in the window :type size: integer :return: window function :rtype: class:`scipy.signal` """ return getattr(, window)(size, **window_params)
[docs] def apply_detrend(self, ts): """ Detrend time series using scipy.detrend('linear') :param ts: input time series :type ts: np.ndarray :return: detrended time series :rtype: np.ndarray """ ts = signal.detrend(np.nan_to_num(ts), type="linear") if self.plot: self._subplots( self.time_array, ts, (0.45, 0.1, 0.5), self.subplot_dict["detrend"], "Detrended", ) return ts
[docs] def apply_zero_mean(self, ts): """ Remove the mean from the time series :param ts: input time series :type ts: np.ndarray :return: zero mean time series :rtype: np.ndarray """ ts = ts - ts.mean() if self.plot: self._subplots( self.time_array, ts, (0.55, 0.1, 0.4), self.subplot_dict["zero_mean"], "Zero Mean", ) return ts
[docs] def apply_zero_pad(self, ts): """ zero pad to power of 2, at the end of the time series to make the FFT more efficient :param ts: input time series :type ts: np.ndarray :return: zero padded time series :rtype: np.ndarray """ pad_ts = zero_pad(ts) if self.plot: self._subplots( self.time_array, pad_ts[0 : self.time_array.size], (0.7, 0.1, 0.25), self.subplot_dict["pad"], "Zero Pad", ) return pad_ts
[docs] def apply_t_window(self, ts): """ Apply a window in the time domain. Get the available windows from `` :param ts: input time series :type ts: np.ndarray :return: windowed time series :rtype: np.ndarray """ w = self.get_window(self.t_window, self.t_window_params, self.ts.size) ts = ts * w if self.plot: self._subplots( self.time_array, ts, (0.7, 0.1, 0.25), self.subplot_dict["t_window"], f"t_window {self.t_window.capitalize()}", ) return ts
[docs] def apply_f_window(self, data): """ Apply a frequency domain window. Get the available windows from `` Need to create a window twice the size of the input because we are only taking the rfft which gives just half the spectra and then take only half the window :param data: input spectra :type data: np.ndarray :return: windowed spectra :rtype: np.ndarray """ w = self.get_window(self.f_window, self.f_window_params, 2 * data.size)[ data.size : ] data = data * w if self.plot: f = np.fft.rfftfreq(2 * data.size, d=self.sample_interval)[1:] ax_t = self.fig.get_axes()[0] ax_f = self.fig.get_axes()[1] ax = self.fig.add_subplot( self.nrows, 2, self.subplot_dict["f_window"], sharex=ax_t ) ax2 = self.fig.add_subplot( self.nrows, 2, self.subplot_dict["f_window"] + 1, sharex=ax_f ) ax.plot( self.time_array, abs(np.fft.irfft(data)[0 : self.time_array.size]), color=(0.85, 0.1, 0.15), ) ax2.loglog(f, abs(data), color=(0.85, 0.1, 0.15)) axt2 = ax2.twinx() axt2.semilogx(f, w, color=(0.5, 0.5, 0.5), zorder=0) line = Line2D([0], [0], color=(0.85, 0.1, 0.15)) ax.legend( [line], [f"Freq Window {self.f_window.capitalize()}"], loc="upper left", borderaxespad=0.01, borderpad=0.1, ) return data
[docs] def apply_bandpass(self, ts): """ apply a bandpass filter to the calibrated data :param ts: calibrated time series :type ts: np.ndarray :return: bandpassed time series :rtype: np.ndarray """ ts = butter_bandpass_filter( ts, self.bandpass["low"], self.bandpass["high"], self.sample_interval, order=self.bandpass["order"], ) if self.plot: self._subplots( self.time_array, ts, (0.875, 0.1, 0.1), self.subplot_dict["bandpass"], "Band Pass", ) return ts
def _get_subplot_count(self): """ helper function to get subplot information :return: dictionary of subplot information :rtype: dictionary """ order = [ "detrend", "zero_mean", "t_window", "pad", "f_window", "bandpass", ] pdict = { "pad": self.zero_pad, "zero_mean": self.zero_mean, "detrend": self.detrend, "t_window": self.t_window, "f_window": self.f_window, "bandpass": self.bandpass, } subplot_dict = {} count = 3 nrows = 2 for key in order: value = pdict[key] if value: subplot_dict[key] = count count += 2 nrows += 1 self.nrows = nrows return subplot_dict
[docs] def remove_instrument_response(self, operation="divide"): """ Remove instrument response following the recipe provided :return: calibrated time series :rtype: np.ndarray """ ts = np.copy(self.ts) f = np.fft.rfftfreq(ts.size, d=self.sample_interval) step = 1 if self.plot: self.subplot_dict = self._get_subplot_count() self.fig = plt.figure(figsize=[10, 12]) self.fig.clf() ax = self.fig.add_subplot(self.nrows, 2, 1) ax2 = self.fig.add_subplot(self.nrows, 2, 2) (l1,) = ax.plot(self.time_array, ts, color="k", lw=2) ax.set_xlim((self.time_array[0], self.time_array[-1])) (l2,) = ax2.loglog(f, abs(np.fft.rfft(ts)), "k", lw=2) ax2.set_xlim((f[0], f[-1])) ax.legend( [l1], ["Original"], loc="upper left", borderaxespad=0.01, borderpad=0.1, ) # detrend if self.detrend: ts = self.apply_detrend(ts) self.logger.debug(f"Step {step}: Applying Linear Detrend") step += 1 # zero mean if self.zero_mean: ts = self.apply_zero_mean(ts) self.logger.debug(f"Step {step}: Removing Mean") step += 1 # filter in time domain if self.t_window is not None: ts = self.apply_t_window(ts) self.logger.debug( f"Step {step}: Applying {self.t_window} Time Window" ) step += 1 if self.plot: wax = self.fig.get_axes()[ self.subplot_dict["t_window"] - 1 ].twinx() (tw,) = wax.plot( self.time_array, self.get_window( self.t_window, self.t_window_params, ts.size ), color=(0.75, 0.75, 0.75), zorder=0, ) if self.zero_pad: # pad the time series to a power of 2, this may be overkill, especially for long time series ts = self.apply_zero_pad(ts) self.logger.debug(f"Step {step}: Applying Zero Padding") step += 1 # get the real frequencies of the FFT f = np.fft.rfftfreq(ts.size, d=self.sample_interval) if self.channel_response_filter.filters_list is []: raise ValueError( "There are no filters in channel_response to remove" ) # compute the complex response given the frequency range of the FFT # the complex response assumes frequencies are in reverse order and flip them on input # so we need to flip the complex reponse so it aligns with the fft. cr = self.channel_response_filter.complex_response(f)[::-1] # remove the DC term at frequency == 0 cr[-1] = abs(cr[-2]) + 0.0j data = np.fft.rfft(ts) # remove DC term data[-1] = abs(data[-1]) + 0.0j # if a window is requested then create it here and mulitply by the data # the windows are designed to be symmetrical about frequency = 0 # here we are taking only the real part of the FFT so we cut the window in half if self.f_window is not None: data = self.apply_f_window(data) self.logger.debug( f"Step {step}: Applying {self.f_window} Frequency Window" ) step += 1 if operation == "divide": # calibrate the time series, compute real part of fft, divide out # channel response, inverse fft calibrated_ts = np.fft.irfft(data / cr)[0 : self.ts.size] self.logger.debug( f"Step {step}: Removing Calibration by {operation}" ) step += 1 elif operation == "multiply": # calibrate the time series, compute real part of fft, multiply out # channel response, inverse fft calibrated_ts = np.fft.irfft(data * cr)[0 : self.ts.size] self.logger.debug( f"Step {step}: Removing Calibration by {operation}" ) step += 1 # If a time window was applied, need to un-apply it to reconstruct the signal. if self.t_window is not None: w = self.get_window( self.t_window, self.t_window_params, calibrated_ts.size ) calibrated_ts = calibrated_ts / w self.logger.debug(f"Step {step}: Un-applying Time Window") step += 1 if self.bandpass: calibrated_ts = self.apply_bandpass(calibrated_ts) self.logger.debug(f"Step {step}: Applying Bandpass Filter") step += 1 if self.plot: self._subplots( self.time_array[0 : calibrated_ts.size], calibrated_ts, (1, 0.1, 0.1), self.nrows * 2 - 1, "Calibrated", ) self.fig.get_axes()[-2].set_ylabel( self.channel_response_filter.units_in ) if self.t_window is not None: wax = self.fig.get_axes()[-2].twinx() (tw,) = wax.plot( self.time_array, 1.0 / w, color=(0.5, 0.5, 0.5), zorder=0 ) self.fig.tight_layout() return calibrated_ts
[docs]def adaptive_notch_filter( bx, df=100, notches=[50, 100], notchradius=0.5, freqrad=0.9, rp=0.1, dbstop_limit=5.0, ): """ :param bx: time series to filter :type bx: np.ndarray :param df: sample rate in samples per second, defaults to 100 :type df: float, optional :param notches: list of frequencies to locate notches at in Hz, defaults to [50, 100] :type notches: list, optional :param notchradius: notch radius, defaults to 0.5 :type notchradius: float, optional :param freqrad: radius to search for a peak at the notch frequency, defaults to 0.9 :type freqrad: float, optional :param rp: ripple of Chebyshev type 1 filter, lower numbers means less ripples, defaults to 0.1 :type rp: float, optional :param dbstop_limit: limits the difference between the peak at the notch and surrounding spectra. Any difference above dbstop_limit will be filtered, anything less will not, defaults to 5.0 :type dbstop_limit: float, optional :return: notch filtered data :rtype: np.ndarray :return: list of notch frequencies :rtype: list Example --------- >>> import RemovePeriodicNoise_Kate as rmp >>> # make a variable for the file to load in >>> fn = r"/home/MT/mt01_20130101_000000.BX" >>> # load in file, if the time series is not an ascii file >>> # might need to add keywords to np.loadtxt or use another >>> # method to read in the file >>> bx = np.loadtxt(fn) >>> # create a list of frequencies to filter out >>> freq_notches = [50, 150, 200] >>> # filter data >>> bx_filt, filt_lst = rmp.adaptiveNotchFilter(bx, df=100. >>> ... notches=freq_notches) >>> #save the filtered data into a file >>> np.savetxt(r"/home/MT/Filtered/mt01_20130101_000000.BX", bx_filt) Notes: Most of the time the default parameters work well, the only thing you need to change is the notches and perhaps the radius. I would test it out with a few time series to find the optimum parameters. Then make a loop over all you time series data. Something like >>> import os >>> dirpath = r"/home/MT" >>> #make a director to save filtered time series >>> save_path = r"/home/MT/Filtered" >>> if not os.path.exists(save_path): >>> os.mkdir(save_path) >>> for fn in os.listdir(dirpath): >>> bx = np.loadtxt(os.path.join(dirpath, fn) >>> bx_filt, filt_lst = rmp.adaptiveNotchFilter(bx, df=100. >>> ... notches=freq_notches) >>> np.savetxt(os.path.join(save_path, fn), bx_filt) """ bx = np.array(bx) if type(notches) is list: notches = np.array(notches) elif type(notches) in [float, int]: notches = np.array([notches], dtype=np.float) df = float(df) # make sure df is a float dt = 1.0 / df # sampling rate # transform data into frequency domain to find notches BX = np.fft.fft(zero_pad(bx)) n = len(BX) # length of array dfn = df / n # frequency step dfnn = int(freqrad / dfn) # radius of frequency search fn = notchradius # filter radius freq = np.fft.fftfreq(n, dt) filtlst = [] for notch in notches: if notch > freq.max(): break else: fspot = int(round(notch / dfn)) nspot = np.where( abs(BX) == max(abs(BX[max([fspot - dfnn, 0]) : min([fspot + dfnn, n])])) )[0][0] med_bx = np.median( abs( BX[ max([nspot - dfnn * 10, 0]) : min( [nspot + dfnn * 10, n] ) ] ) ** 2 ) # calculate difference between peak and surrounding spectra in dB dbstop = 10 * np.log10(abs(BX[nspot]) ** 2 / med_bx) if np.nan_to_num(dbstop) == 0.0 or dbstop < dbstop_limit: filtlst.append("No need to filter \n") pass else: filtlst.append([freq[nspot], dbstop]) ws = 2 * np.array([freq[nspot] - fn, freq[nspot] + fn]) / df wp = ( 2 * np.array([freq[nspot] - 2 * fn, freq[nspot] + 2 * fn]) / df ) ford, wn = signal.cheb1ord(wp, ws, 1, dbstop) b, a = signal.cheby1(1, 0.5, wn, btype="bandstop") bx = signal.filtfilt(b, a, bx) return bx, filtlst