"""
Collection of digital filters and filter front ends for filters defined in the *scipy*
package
In the filtering module of *signal_processing* tool box three base filters have been
implemented:
1. Ideal block filter (*bandpass_block_filter*)
2. Kaiser filter (*kaiser_bandpass_filter*)
3. Butterworth filter (*butter_bandpass_filter*).
Notes
-----
* All filters need at least a low *or* a high cut-off frequency as an input argument.
* In case only a low cut-off frequency is passed, the filters behave as a high-pass
filter, as only the frequencies below f_cut_low are suppress
* In case only a high cut-off frequency is passed, the filters behave as a low
pass-filter, as only the frequencies above the f_cut_high are suppressed.
* In case both the low and high cut-off frequencies are given, the filters are
band-pass filters.
* The cut-off frequencies and the sample frequency are given in Hz.
* The *kaiser_bandpass_filter* and *butter_bandpass_filter* are in fact front-ends to
the *scipy* filters.
To use the scipy filter, quite some steps are required to calculate
the filter coefficients.
The implementations in this *signal_processing* package take care of that and then
call the *scipy* filters.
* The Ideal block filter *bandpass_block_filter* is a literal copy of the
Matlab *band_pass2* filter
* On top of the base filters an extra front end is defined, *filter_signal*,
which receives the same input arguments as all three filtered mentioned above,
plus an argument *filter_type*. With this function, we can pick which filter we
want to use: *kaiser*, *block* or
*butterworth*. The default choice is *block*.
"""
import logging
import numpy as np
from numpy import pi
from scipy.ndimage import shift
from scipy.signal import butter, filtfilt, firwin, kaiserord, lfilter
BUTTER_DEFAULT_ORDER = 2
logger = logging.getLogger(__name__)
[docs]
def matrix_fft(
om_spec=None,
spec=None,
eps=None,
om_DFT=None,
DFT=None,
om_RAO=None,
RAO=None,
t=None,
TT=None,
s=4,
):
"""Matrix FFT routine obtained from matlab
Parameters
----------
om_spec : array_like
frequencies of spectrum
spec : array_like
spectrum density
eps : float
eps must always be in degrees
om_DFT : array_like
frequencies of DFT
DFT : int
DFT components
om_RAO : array_like
array with frequencies
RAO : float
rao array
t : array_like
array with time values
TT : array_like
array with values
s : int
Switch to determine the mode of operation, must be integer between 1 and
5
Returns
-------
:class:list
List with arrays. The amount of arrays returned depends on the mode used
Notes
-----
* FFT routine copied from HMS matfft by RvD. Modified input argument
treatment: named input arguments are used Major difference is that this
routine is only for 1-D signals, in the matlab code the array may contain
more than one 1-D signals.
* In case more than one signal needs to be filtered, in the python version
you need to loop over the signals. Since the filter is linear it does not
influence the calculation time
* Depending on the option s, the following input argument are used an may
not be None::
1. spec 2 DFT use: [om_DFT,DFT] = matfft(om_spec,spec,eps,1)
2. apply RAO use: [om_DFT,DFT] = matfft(om_DFT, DFT,om_RAO,
RAO,2)
3. DFT 2 TT use: [t ,TT] = matfft(om_DFT, DFT,3)
4. TT 2 DFT use: [om_DFT,DFT] = matfft(t,TT,4)
5. DFT 2 spec use: [om_spec,spec,eps] = matfft(om_DFT, DFT,5)
eps always in degrees!!!
* The original code was followed as close as possible which means the
variable naming does not follow the PIP standards
"""
# check if the correct arguments are passed
if s == 1:
if om_spec is None or spec is None or eps is None:
raise ValueError("om_spec, spec, and eps arguments must be given")
elif s == 2:
if om_DFT is None or DFT is None or om_RAO is None or RAO is None:
raise ValueError("om_DFT, DFT, om_RAO, RAO arguments must be given")
elif s in (3, 5):
if om_DFT is None or DFT is None:
raise ValueError("om_DFT, and DFT arguments must be given")
elif s == 4:
if t is None or TT is None:
raise ValueError("t, and TT arguments must be given")
else:
raise ValueError("Option s must be integer between 1 and 5")
if s == 3:
om = om_DFT
N = om.size
dw = om[1]
dt = 2 * pi / dw / N
t = np.linspace(0, (N - 1) * dt, N)
ns = DFT.size
DFTL = np.zeros(N, dtype=complex)
DFTL[:ns] = DFT[:]
if N > ns:
# zero padding if omega is larger than DFT
DFTL[ns:] = complex(0, 0)
if N % 2 == 0:
n_fft = N - 1
else:
n_fft = N
TT = np.real(np.fft.ifft(DFTL, n=n_fft))
if n_fft != N:
# in case we have used the add n_fft with a even N, append a zero at
# the end to correct
# the length
TT = np.append(TT, np.array([0]))
output = [t, TT]
elif s == 4:
dt = t[1] - t[0]
if max(np.abs(np.diff(t) - dt)) > dt / 100.0:
raise ValueError("time range must be equidistant")
if TT.shape[0] != t.size:
raise ValueError("first dim. of TT must equal length(t)")
dt = (t[-1] - t[0]) / (t.size - 1)
N = t.size # number of timesteps
dw = 2 * pi / N / dt # frequency step [rad/s]
om = np.linspace(0, (N - 1) * dw, N, endpoint=True)
if N % 2 == 0:
n_fft = N - 1
else:
n_fft = N
DFT = np.fft.fft(TT, n=n_fft)
if n_fft != N:
# in case we have used the add n_fft with a even N, append a zero
# at the end to correct the length
DFT = np.append(DFT, np.array([0 + 0 * 1j]))
if N % 2 == 0:
logger.debug(f"In even section {N}")
# some magic
DFT[: int(N / 2) + 1] = 2 * DFT[: int(N / 2) + 1]
DFT[0] /= 2.0
DFT[int(N / 2) + 1] /= 2.0
DFT[int(N / 2) + 1 :] = 0
else:
logger.debug(f"In odd section {N}")
DFT[1 : int((N - 1) / 2)] = 2 * DFT[1 : int((N - 1) / 2)]
DFT[int((N - 1) / 2) + 1 :] = 0
output = [om, DFT]
else:
raise ValueError("Only s=3 and s=4 are implemented")
return output
[docs]
def bandpass_block_filter(T, X, wfiltlo=None, wfiltup=None):
"""
Ideal band pass filter
Parameters
----------
T: arry_like
time base 'T' (seconds !!) of size N OR delta time already.
X: array_like
signal 'X' of size N. Note that I took out the M dimensional signals
wfiltlo: float, optional
Filter frequency 'wfiltlo': lower boundary (high pass filter) in
rad/s. In case wfillo is not given, zero is imposed, i.e. the band
pass filter turns into a low pass filter. The high cut-off frequency
must be given in that case
wfiltup: float, optional
filter frequency 'wfiltup': upper boundary (low pass filter) in
rad/s. In case wfilup is not given, the Nyquist frequency is imposed
, i.e. the band pass filter turns into a high pass filter.
The low cut-off frequency must be given in that case
Returns
-------
ndarray
1-D array with the band passed filtered signal
Raises
------
ValueError:
* In case no cut-off frequencies are defined
* In case that wfiltlo >= wfiltup
Examples
--------
Define as signal with some noise
>>> x = np.linspace(0, 10, num=500, endpoint=True)
>>> fs = 1 / (x[1] - x[0])
>>> y_orig = np.sin(2 * pi * x / 2.5)
>>> y_noise = y_orig + np.random.random_sample(x.size)
Filter the signal with the ideal block filter to recover the peak period at 2.5 s.
Set the low and high cut-off frequencies at 3 s and 1 s, respectively
>>> f_low = 2 * pi / 3.0 # T = 3.0 s. f_low is the frequency in rad/s
>>> f_hig = 2 * pi / 1.0 # T = 1.0 s. f_hig is the frequency in rad/s
>>> y_recov = bandpass_block_filter(x, y_noise, wfiltlo=f_low, wfiltup=f_hig)
The recovered signal *y_recov* now contains the filter signal.
Notes
-----
* Explicity extend the input signal with one value in case we have an odd
signal. This is to avoid a bug in the fft making the function extremely
slow for an odd number of points.
* For the rest the original matlab code is followed as close as possible,
which means that the variable naming does not follow the PIP standards
"""
N = X.size
if N % 2 == 1:
# we have an odd signal, make sure that is is even, otherwise the filter
# becomes extremely slow
X = np.append(X, X[-1])
N = X.size
extended_array = True
else:
extended_array = False
# put the ValueExceptions on the top
if wfiltlo is None and wfiltup is None:
raise ValueError(
"At least one cut-off frequcny needs to be given. Please define "
"either wfiltlo or wfiltup"
)
if wfiltlo is not None and wfiltup is not None and abs(wfiltlo) >= abs(wfiltup):
raise ValueError(
"Low pass filter frequency must be < high pass filter frequency."
)
try:
T = T - T[0]
if T.size != X.size:
raise ValueError("Length of array X must be equal to length of array T")
dt = (T[-1] - T[0]) / (N - 1)
except (TypeError, IndexError):
# if the reference to T[0] fails, it is assumed that dt is passed
# through T already
dt = T
wNyq = pi / dt # Nyquist frequency = 1/2 sampling frequency
if wfiltup is None or (wfiltup is not None and wfiltup > wNyq):
wfiltup = wNyq
logger.warning(
"High pass filter frequency > Nyquist frequency\n"
"High pass filter frequency set to Nyquist frequency = pi/dT"
)
if wfiltlo is None:
wfiltlo = 0
# New code Feb 2007, using matfft and applying RAO
# equidistant time (3 times original time trace)
TT = np.linspace(0, 3 * (N - 1) * dt, 3 * (N - 1) + 1, endpoint=True)
XX = np.zeros(3 * X.size - 2)
XX[: N - 1] = -X[-1:0:-1] + 2 * X[0]
XX[N - 1 : 2 * N - 1] = X[:]
XX[2 * N - 1 : 3 * N - 1] = -X[-1:0:-1] + 2 * X[-1]
if wfiltup < 0 or wfiltlo < 0:
# lijnspiegeling
XX[: N - 1] = -X[1::-1]
XX[2 * N] = X[:]
XX[2 * N :] = X[-1::-1]
# note: left side of FFT contains twice energy, right side = zero
om_DFT, DFT = matrix_fft(t=TT, TT=XX, s=4)
RAO = np.where((om_DFT > abs(wfiltup)) | (om_DFT < abs(wfiltlo)), 0, 1)
DFTr = DFT * RAO
TT, YY = matrix_fft(om_DFT=om_DFT, DFT=DFTr, s=3)
Y = YY[int(np.floor(int(YY.size / 3.0))) : int(np.ceil(2.0 * YY.size / 3.0))]
if extended_array:
# we added a zero at the beginning to make the signal even. Now remove
# the last point again to keep the size of the output signal the same
# as the input
Y = Y[:-1]
return Y
[docs]
def band_pass_block(omega, fs=1, lowcut=None, highcut=None):
"""A perfect block filter with unity response
Parameters
----------
omega: class:`numpy.ndarray`
Frequencies in rad/s
fs: float
Sample frequency in Hz
lowcut: float
Cut-off low frequency in Hz
highcut: float
Cut-off high frequency in Hz
Returns
-------
:class:`numpy.ndarray`
1-D array with 1 in between lowcut, highcut and 0 elsewhere
Examples
--------
Create a array with some radial frequencies running from 0 to 2.5 rad/s
>>> frequencies = np.linspace(0, 2.5, 10)
Calculate the block response
>>> band_pass_block(omega=frequencies, fs=1.0, lowcut=0.1, highcut=0.2)
array([ 0., 0., 0., 1., 1., 0., 0., 0., 0., 0.])
"""
h_block = np.ones(omega.shape)
h_zero = np.zeros(omega.shape)
if lowcut is not None:
h_block = np.where(omega * fs / (2 * np.pi) <= lowcut, h_zero, h_block)
if highcut is not None:
h_block = np.where(omega * fs / (2 * np.pi) > highcut, h_zero, h_block)
return h_block
[docs]
def kaiser_bandpass_coefficients(lowcut, highcut, fs, f_width_edge=None, ripple_db=200):
"""Calculates the Kaiser band pass filter coefficients
Parameters
----------
lowcut: float
The low cut-off frequency [Hz]
highcut: float
The high cut-off frequency [Hz]
fs: float
Sample frequency [Hz]
f_width_edge : float, optional
Give the distance in Hz for the edges of the filter. If None, take 1 Hz.
Default = None
ripple_db: float, option.
The desired attenuation in the stop band, in dB. Default = 200 db
Returns
-------
tuple:
(*band_pass_coefficients*, *n_delay*)
- *band_pass_coefficients*: list with the band pass coefficients
- *n_delay*: the number of delay samples
"""
f_nyq = fs / 2.0
if highcut is not None and highcut >= f_nyq:
logger.info(
"The high cut frequency of {} is higher than Nyquist {}".format(
highcut, f_nyq
)
)
highcut = None
if lowcut is not None and lowcut == 0:
lowcut = None
if lowcut is None and highcut is None:
raise AssertionError(
"Both low and high cut off are set to none (of set outside the "
"range 0~f_nyq. Please make sure that you have proper boundaries"
)
# turn edge width from Hz to fraction of Nyquist maximum frequency
if f_width_edge is not None:
width = f_width_edge / f_nyq
else:
width = 1.0 / f_nyq
# Compute the order and Kaiser parameter for the FIR filter.
n_kaiser, beta = kaiserord(ripple_db, width)
# Use firwin with a Kaiser window to create a high pass FIR filter.
# This is done by modifying the lp to hp taps
if lowcut is not None:
# low cut is defined so create the high-pass filter coefficient here.
# Do this by taking
# the low-pass filter and apply a spectral inversion
hp_taps = firwin(n_kaiser, lowcut / f_nyq, window=("kaiser", beta))
# spectral inversion: multiply with -1 and add one to zero frequency
hp_taps *= -1
hp_taps[int(hp_taps.size / 2)] += 1
else:
# low cut is None, which means that we only do a low-pass filter
hp_taps = None
if highcut is not None:
# Use firwin with Kaiser to create the low pas filter up the highest
# frequency
lp_taps = firwin(n_kaiser, highcut / f_nyq, window=("kaiser", beta))
if hp_taps is not None:
# in case the high-pass filter is defined as well, remove unity at
# zero frequency again
lp_taps[int(hp_taps.size / 2)] -= 1
else:
lp_taps = None
if lowcut is not None and highcut is not None:
# combine the low pass and high pass coefficients into band pass
# coefficients
taps = [sum(pair) for pair in zip(hp_taps, lp_taps)]
elif highcut is None:
taps = hp_taps
elif lowcut is None:
taps = lp_taps
else:
raise AssertionError(
"Kaiser filter is used but both low and high cutoff frequency is " "none"
)
return taps, n_kaiser
[docs]
def kaiser_bandpass_filter(
data,
lowcut=None,
highcut=None,
fs=1,
f_width_edge=0.01,
ripple_db=100.0,
cval=None,
replace_cval_with_unfiltered_signal=False,
replace_cval_with_nan=False,
use_filtfilt=True,
):
"""
Filter the data with the FIR kaiser
Parameters
----------
data: ndarray
Input signal 1D
lowcut: float or None, optional
Lower frequency in Hz. If the low-cut frequency is not given, the filter
turns into a low-pass filter with only a high cut-off frequency defined.
The high-cut must be defined in that case
highcut: float or None, optional
higher frequency in Hz. If the high-cut frequency is not given the filter
turns into a high-pass filter with only the low cut-off frequency
defined. The low-cut frequency must be defined in that case
fs: float, optional
Sample frequency Hz. Default = 1.0 Hz
f_width_edge: float, optional
width of the edges. Default = 0.01
ripple_db: float, optional
Suppression of the stop band in dB. Default = 100
cval: float, optional
Constant value just for the shift. Default = 0
replace_cval_with_unfiltered_signal: bool, optional
You may choose to replace the zero introduced due to the shift.
Default = False
replace_cval_with_nan: bool, optional
Replace the cval introduced by the shift correction with nans
use_filtfilt: bool, optional
If true use the filtfilt which takes out the phase shift. Default = True
Returns
-------
ndarray:
Filtered signal
Raises
------
ValueError:
* In case no cut off frequency is defined
* In case the low cut-off frequency is higher than or equal to the high
cut-off frequency
Notes
-----
* If both the low and high cut-off frequencies *lowcut* and *highcut* are
defined, the kaiser filter is a band pass filter
* In case only the low cut-off frequency *lowcut* is defined, all
frequencies higher than lowcut will pass (i.e. the filter is a high-pass
filter)
* In case only the high cut-off frequency *highcut* is defined, all
frequencies lower than highcut will pass (i.e. the filter is a low-pass
filter)
"""
if highcut is None and lowcut is None:
raise ValueError(
"At least one cut-off frequency, *highcut* or *lowcut* must be " "given"
)
if highcut is not None and lowcut is not None and lowcut >= highcut:
raise ValueError(
"The low cut-off frequency must be smaller than the high cut-off "
"frequency"
)
if replace_cval_with_unfiltered_signal and replace_cval_with_nan:
raise AssertionError(
"Both flags replace_cval_with_unfiltered_signal and "
"replace_cval_with_nan are."
"while only one is allowed"
)
if highcut is not None and highcut > fs / 2:
logger.info(
"Clipping the high cut off frequency to the Nyquist "
"frequency {} - {}"
"".format(highcut, fs / 2)
)
highcut = fs / 2
if lowcut is not None and lowcut == 0:
lowcut = 0.001 * fs
taps, n_delay = kaiser_bandpass_coefficients(
lowcut, highcut, fs, f_width_edge, ripple_db=ripple_db
)
n_shift = int(n_delay / 2)
if use_filtfilt:
y_fir = filtfilt(taps, [1.0], data)
else:
y_fir = lfilter(taps, [1.0], data)
if cval is not None:
# is cval is given as a value, correct the phase shift
y_fir = shift(y_fir, -n_shift, cval=cval)
if replace_cval_with_unfiltered_signal:
# the last part from n_shift contains the value cval; replace it
# with the unfiltered signal
delta_y = data[n_shift] - y_fir[n_shift]
y_fir[-n_shift:] = data[-n_shift:] + delta_y
elif replace_cval_with_unfiltered_signal:
y_fir[-n_shift:] = np.nan
return y_fir
[docs]
def butter_bandpass_coefficients(f_cut_low, f_cut_high, fs, order=BUTTER_DEFAULT_ORDER):
"""Return the filter coefficients of a Butterworth bandpass filter
Parameters
----------
f_cut_low :
Frequency low in Hz: suppress all frequencies below this value
f_cut_high :
Frequency high in Hz: suppress all frequencies above this value
fs :
the sample frequencies in Hz
order :
order of the filter: higher order is sharper drop off edge at the cutoff
frequencies (order n has 20n dB/decaded drop off) (Default value = 2)
Returns
-------
tuple:
(b, a) with the band pass filter coefficients
Notes
-----
* This function is a front-end to the *butter* function from the
*scipy.signal* module.
* The *butter* function requires the low and/or high cut-off frequency to be
expressed as a fraction of the Nyquist frequency defined as 0.5 * f_sample
* This function allows to get the Butterworth filter coefficients using the
absolute high and low cut frequency as an input
* This function is used from the *butterworth_filter* function, which is a
front end to the digital Butterworth filter
"""
nyq = 0.5 * fs
low = f_cut_low / nyq
high = f_cut_high / nyq
return butter(order, [low, high], btype="bandpass")
[docs]
def butter_highpass_coefficients(f_cut_low, fs, order=BUTTER_DEFAULT_ORDER):
"""Return the filter coefficients of a Butterworth high-pass filter
Parameters
----------
f_cut_low :
suppress all frequencies below this value
fs :
the sample frequencies
order :
order of the filter: higher order is sharper drop off edge at the cutoff
frequencies (order n has 20n dB/decade drop off) (Default value = 2)
Returns
-------
tuple:
(b, a) with the high pass filter coefficients
Notes
-----
* The high-pass filter requires a low cut-off frequency: all frequencies
below f_cut_low will be suppressed
"""
nyq = 0.5 * fs
low = f_cut_low / nyq
return butter(order, low, btype="highpass")
[docs]
def butter_lowpass_coefficients(f_cut_high, fs, order=BUTTER_DEFAULT_ORDER):
"""Return the filter coefficients of a Butterworth low-pass filter
Parameters
----------
f_cut_high :
suppress all frequencies above this value
fs :
the sample frequencies
order :
order of the filter: higher order is sharper drop off edge at the cutoff
frequencies (order n has 20n dB/decade drop off) (Default value = 2)
Returns
-------
tuple:
(b, a) with the low pass filter coefficients
Notes
-----
* The low-pass filter requires a high cut-off frequency: all frequencies
above f_cut_high will be suppressed
"""
nyq = 0.5 * fs
high = f_cut_high / nyq
return butter(order, high, btype="lowpass")
[docs]
def butterworth_filter(
data,
f_cut_low=None,
f_cut_high=None,
fs=1,
order=BUTTER_DEFAULT_ORDER,
use_filtfilt=True,
):
"""Apply the Butterworth filter (high-, low, or band-pass) to the signal
'data'.
Parameters
----------
data : ndarray
Array with the signal to be filtered
f_cut_low : float or None, optional
low frequency below which to suppress all signals (Default value = None)
f_cut_high : float or None, optional
high frequency above which to suppress all signals (Default value =None)
fs : float, optional
sample frequency (Default value = 1)
order : int
order of the filter (Default value = 2)
use_filtfilt: bool, optional
If true use the filtfilt filter from scipy to take out the phase shift.
Default = True.
If False, the lfilter is used instead, causing a phase shift.
Returns
-------
ndarray:
Filtered signal
Raises
------
ValueError:
* In case no cut-off frequencies are defined
* In case that wfiltlo >= wfiltup
Examples
--------
First create a noise signal with 2 harmonics with a period of 2.5 s and 1.0 s
>>> x = np.linspace(0, 10, num=500)
>>> fs = 1 / (x[1] - x[0])
>>> y_orig = np.sin(2 * pi * x / 2.5) + np.sin(2 * pi * x / 1.0)
>>> y_noise = y_orig + np.random.random_sample(x.size)
Filter the signal with the high pass butt block filter to recover the peak with a
period of 1.0 s.
Note that for the high-pass filter, we have to define the low-cut frequency
f_low (as all the frequencies below f_low are suppressed)
>>> f_low = 1 / 2.0 # T = 2.0 s. f_low is the frequency in Hz
>>> y_recover = butterworth_filter(y_noise, f_cut_low=f_low, order=4, fs=fs)
The recovered signal *y_recov* now contains the filtered signal.
We can also apply a low-pass filter to remove the peak with a frequency of 1 Hz
>>> f_high = 1 / 2.0 # T = 2.0 s. f_low is the frequency in Hz
>>> y_recover = butterworth_filter(y_noise, f_cut_high=f_high, order=4, fs=fs)
Perhaps better would be to use a band pass filter by both defining a high
and low cut-off frequency. If we want to recover the peak with a period of
1.0 seconds we can do
>>> f_high = 1 / 0.5 # T = 0.5 s. Supress all frequencies above 2 Hz
>>> f_low = 1 / 2.0 # T = 2.0 s. Suppress all frequency below 0.5 Hz
>>> y_recover = butterworth_filter(y_noise, f_cut_high=f_high, f_cut_low=f_low,
... fs=fs)
Notes
-----
* The *order* argument is used to control the sharpness of the filter a high order
will result in a stronger suppression of frequencies outside the cut-off limits.
* An *order* which is too high also leads to an overflow of the filter. Use
trial-and-error to find an optimal *order*. Typically, *order* should be
between 2 and 5
* In case the filter type is set to *high*, only the low cut-off frequence
f_cut_low needs to be defined: all the frequencies below f_cut_low will be
suppressed (hence: high-pass)
* In case the filter type is set to *low*, only the high cut-off frequence
f_cut_high needs to be defined: all the frequencies above f_cut_high will
be suppressed (hence: low-pass)
"""
if f_cut_high is None and f_cut_low is None:
raise ValueError(
"At least one of the cut-off frequencies must be given "
"(f_cut_high or f_cut_low)"
)
if f_cut_high is not None and f_cut_low is not None and f_cut_low >= f_cut_high:
raise ValueError(
"The low cut-off frequency must be smaller than the high cut-off "
)
if f_cut_high is None:
# only the low cut-off frequency is given, so we have a high-pass filter
coefficients = butter_highpass_coefficients(f_cut_low, fs, order=order)
elif f_cut_low is None:
# only the high cut-off frequency is given, so we have a low-pass
# filter
coefficients = butter_lowpass_coefficients(f_cut_high, fs, order=order)
else:
# both the high and low cut off frequencies are given, so we have a
# band pass filter
coefficients = butter_bandpass_coefficients(
f_cut_low, f_cut_high, fs, order=order
)
b, a = coefficients[0], coefficients[1]
if use_filtfilt:
data_filtered = filtfilt(b, a, data)
else:
data_filtered = lfilter(b, a, data)
return data_filtered
[docs]
def remove_phase_shift(signal, degrees=True):
"""Apply a phase shift to the angles of *signal* to get rid of phase-jumps at
zero degrees
Parameters
----------
signal :
the input array with the yaw angle in degrees or rad
degrees :
flag to indicate that the yaw is in degrees. Default is True
Returns
-------
ndarray:
array with shifted phase such that there are no phase-jumps at zero degrees
Notes
-----
* The yaw angle sometimes has discontinuities when the phase angle goes from 0 to
360. To avoid this, in this routine the range of the yaw is put as good as
possible in the range between 0 and 360.
* In case the MRU 6-DOF motions need to be filtered in order to obtain
the accelerations, the yaw needs to be put into the range 0~360, otherwise the
discontinuity at 0 degrees will lead to an artificial spike in the acceleration.
* In case the signal passes the zero degree two times, this routine will fail.
Examples
--------
Lets assume we have a yaw signal which passed the 0 degrees. If we want to
get the derivative of
the yaw, we need to put it into a range 0~360 such that we get rid of the
discontinuity at 0.
>>> yaw = np.hstack([np.linspace(350, 360, num=5, endpoint=False),
... np.linspace(0, 10, num=5)])
>>> print(yaw)
[ 350. 352. 354. 356. 358. 0. 2.5 5. 7.5 10. ]
In case we want to calculate the first derivative for the yaw, we get a spike at 0
>>> dyawdt = np.diff(yaw)
>>> print(dyawdt)
[ 2. 2. 2. 2. -358. 2.5 2.5 2.5 2.5]
In order to get rid of this spike, first remove the phase shift
>>> yaw2 = remove_phase_shift(yaw)
>>> print(yaw2)
[ 0. 2. 4. 6. 8. 10. 12.5 15. 17.5 20. ]
Now the derivative yields proper values with the spike of -358 degrees
>>> dyaw2dt = np.diff(yaw2)
>>> print(dyaw2dt)
[ 2. 2. 2. 2. 2. 2.5 2.5 2.5 2.5]
"""
if degrees:
phase = np.deg2rad(signal)
else:
phase = signal
# put phase into range -pi ~ pi
new_signal = np.angle(np.exp(1j * phase))
# find the minimum phase angle and set that angle to zero
index_min = np.argmin(new_signal)
min_phase = new_signal[index_min]
# set the minimum phase to zero and move back to degrees if needed
new_signal -= min_phase
# convert the angle back to degrees is required
if degrees:
new_signal = np.rad2deg(new_signal)
# start the phase signal at zero
new_signal -= new_signal[0]
return new_signal
[docs]
def filter_signal(
signal,
filter_type="block",
f_cut_low=None,
f_cut_high=None,
f_sampling=1,
f_width_edge=0.05,
ripple_db=100,
order=BUTTER_DEFAULT_ORDER,
constant_value=None,
use_filtfilt=True,
):
"""Filter the signal of input *signal* base on the input argument
Parameters
----------
signal: ndarray
Input 1D signal
filter_type: {"mean", "butterworth", "kaiser", "block", "none"}
Type of the filter to use. Default = block.
f_cut_low: Quantity or float or None
Low cut-off frequency in Hz. Default = None, which means that f_cut_high must
be given and that we are dealing with a low-pass filter
f_cut_high: Quantity or float or None
High cut-off frequency in Hz. Default = None, which means that f_cut_low must
be given and that we are dealing with a high-pass filter
f_sampling: Quantity or float
Sampling frequency. Default = 1 Hz
f_width_edge: Quantity or float
Sharpness of the stop band of the Kaiser filter. Only used when
filter_type == "kaiser". A smaller f_width_edge approaches the ideal block
filter better. Default = 0.05 Hz
ripple_db: float, optional
The desired attenuation in the stop band in dB. Only used when
filter_type == "kaiser". Default = 100 dB
order: int
Order of the Butterworth filter. Only used when filter_type=="butterworth".
Default = 2.
Higher values causes to approach the ideal block band filter, but also a more
unstable filter.
constant_value: float
Constant to use for the Kaiser filter when a phase shift correction is applied.
Default = None, which means that the phase shift is not correct. Since we use
filtfilt by default, this is not required. Only used when
filter_type == "kaiser"
use_filtfilt: bool
If true use the filtfilt filter from scipy to take out the phase shift.
Default = True.
Returns
-------
ndarray:
Filtered signal
Raises
------
ValueError:
* In case no cut-off frequencies are defined
* In case that wfiltlo >= wfiltup
Notes
-----
* This function is a front end to the other filters in this module
* The Nyquist frequency follows from the sample frequency as f_nyq = f_s / 2
* At least one of the cut-off frequencies must be properly defined, i.e. in the
range 0 ~ f_nyq
* For f_cut_low == None (or 0), the filter is acting as a low-pass filter with its
cut-off frequency given by f_cut_high. All frequencies above f_cut_high will be
suppressed
* For f_cut_high == None or f_cut_high >= f_nyquist, the filter is acting as a
high-pass filter with its cut-off frequency given by f_cut_low.
All frequencies below f_cut_low will be suppressed.
* If both f_cut_low and f_cut_high are in the range 0 ~ f_nyquist, the filter is
acting as a band pass filter
"""
if f_cut_high is None and f_cut_low is None:
raise ValueError(
"Both the low and high cut off frequencies are None. At least set "
"one cut off frequency"
)
if f_cut_low is not None and f_cut_high is not None and f_cut_low >= f_cut_high:
raise ValueError(
"The low cut-off frequency must be smaller than the high cut-off "
)
# it is better to set the high and low cut to None if they are outside the
# valid range 0 ~ f_nyq
if f_cut_high is not None and f_cut_high >= f_sampling / 2:
f_cut_high = None
if f_cut_low is not None and f_cut_low == 0:
f_cut_low = None
if filter_type == "butterworth":
filtered = butterworth_filter(
data=signal,
f_cut_low=f_cut_low,
f_cut_high=f_cut_high,
fs=f_sampling,
order=order,
)
elif filter_type == "mean":
filtered = signal - signal.mean()
elif filter_type == "block":
delta_t = 1.0 / f_sampling
# The block filter does not allow a pure high or low-pass filter, only band.
# Mimic pure high or low pass by settings the cut-off frequencies to 0 or the
# Nyquist frequency
if f_cut_low is None:
# In this case, we want to model a low-pass filter (with the f_cut_high as
# the maximum f)
# set omega_cut to 0
omega_cut_low = 0
else:
omega_cut_low = 2 * pi * f_cut_low
if f_cut_high is None:
# In this case, we want to model a high-pass filter (with the f_cut_low as
# the minimum f) # set omega_cut to the Nyquist frequency f_n = f_s / 2
omega_cut_high = 2 * pi * f_sampling / 2.0
else:
omega_cut_high = 2 * pi * f_cut_high
filtered = bandpass_block_filter(
delta_t, signal, wfiltlo=omega_cut_low, wfiltup=omega_cut_high
)
elif filter_type == "kaiser":
filtered = kaiser_bandpass_filter(
signal,
lowcut=f_cut_low,
highcut=f_cut_high,
fs=f_sampling,
f_width_edge=f_width_edge,
ripple_db=ripple_db,
cval=constant_value,
use_filtfilt=use_filtfilt,
)
elif filter_type == "none":
# No filtering at all. Pass the signal without changing.
filtered = signal
else:
logger.warning("Filter not yet implemented")
raise ValueError(f"Filter not recognised: {filter_type}")
return filtered