#!/usr/bin/python
# vim: set expandtab ts=4 sw=4:
"""
Functions for handling and assessing IMFs.
Supporting tools for IMFs estimated using the emd.sift submodule.
"""
import logging
import numpy as np
from scipy import signal
from ._sift_core import (get_padded_extrema, interp_envelope,
zero_crossing_count)
from .support import ensure_2d, ensure_equal_dims
# Housekeeping for logging
logger = logging.getLogger(__name__)
[docs]def amplitude_normalise(X, thresh=1e-10, clip=False, interp_method='pchip',
max_iters=3):
"""Normalise the amplitude envelope of an IMF to be 1.
Multiple runs of normalisation are carried out until the desired threshold
is reached. This uses the method described as part of the AM-FM transform
[1]_
Parameters
----------
X : ndarray
Input array of IMFs to be normalised
thresh : float
Threshold for stopping normalisation (Default value = 1e-10)
clip : bool
Whether to clip the output between -1 and 1 (Default value = False)
interp_method : {'pchip','mono_pchip','splrep'}
Method used to interpolate envelopes (Default value = 'pchip')
max_iters : int
Maximum number of iterations of normalisation to perform
Returns
-------
ndarray
Amplitude normalised IMFs
References
----------
.. [1] Huang, N. E., Wu, Z., Long, S. R., Arnold, K. C., Chen, X., & Blank,
K. (2009). On Instantaneous Frequency. Advances in Adaptive Data Analysis,
1(2), 177–229. https://doi.org/10.1142/s1793536909000096
"""
logger.info('STARTED: Amplitude-Normalise')
if X.ndim == 2:
logger.debug('Normalising {0} samples across {1} IMFs'.format(*X.shape))
else:
logger.debug('Normalising {0} samples across {1} first-level and {2} second-level IMFs'.format(*X.shape))
logger.debug('Using {0} interpolation with threshold of {1} and max_iters {2}'.format(interp_method,
thresh,
max_iters))
# Don't normalise in place
X = X.copy()
orig_dim = X.ndim
if X.ndim == 2:
X = X[:, :, None]
extrema_opts = {'method': 'numpypad'} # Rilling doesn't make sense for combined extrema
for iimf in range(X.shape[1]):
for jimf in range(X.shape[2]):
env = interp_envelope(X[:, iimf, jimf], mode='combined',
interp_method=interp_method,
extrema_opts=extrema_opts)
if env is None:
continue_norm = False
else:
continue_norm = True
iters = 0
while continue_norm and (iters < max_iters):
iters += 1
X[:, iimf, jimf] = X[:, iimf, jimf] / env
env = interp_envelope(X[:, iimf, jimf], mode='combined',
interp_method=interp_method,
extrema_opts=extrema_opts)
if env is None:
continue_norm = False
else:
continue_norm = True
iter_val = np.abs(env.sum() - env.shape[0])
if iter_val < thresh:
continue_norm = False
logger.info('Normalise of IMF-{0}-{1} complete in {2} iters (val={3})'.format(iimf,
jimf,
iters,
iter_val))
if clip:
logger.debug('Clipping signal to -1:1 range')
# Make absolutely sure nothing daft is happening
X = np.clip(X, -1, 1)
if orig_dim == 2:
X = X[:, :, 0]
logger.info('COMPLETED: Amplitude-Normalise')
return X
[docs]def wrap_phase(IP, ncycles=1, mode='2pi'):
"""
Wrap a phase time-course.
Parameters
----------
IP : ndarray
Input array of unwrapped phase values
ncycles : int
Number of cycles per wrap (Default value = 1)
mode : {'2pi','-pi2pi'}
Flag to indicate the values to wrap phase within (Default value = '2pi')
Returns
-------
ndarray
Wrapped phase time-course
"""
if mode not in ['2pi', '-pi2pi']:
raise ValueError("Invalid mode value")
if mode == '2pi':
phases = (IP) % (ncycles * 2 * np.pi)
elif mode == '-pi2pi':
phases = (IP + (np.pi * ncycles)) % (ncycles * 2 * np.pi) - (np.pi * ncycles)
return phases
# --------------------------
# Assess IMF 'quality'
[docs]def is_imf(imf, avg_tol=5e-2, envelope_opts=None, extrema_opts=None):
"""Determine whether a signal is a 'true IMF'.
Two criteria are tested. Firstly, the number of extrema and number of
zero-crossings must differ by zero or one. Secondly,the mean of the upper
and lower envelopes must be within a tolerance of zero.
Parameters
----------
imf : 2d array
Array of signals to check [nsamples x nimfs]
avg_tol : float
Tolerance of acceptance for criterion two. The sum-square of the mean
of the upper and lower envelope must be below avg_tol of the sum-square
of the signal being checked.
envelope_opts : dict
Dictionary of envelope estimation options, must be identical to options
used when estimating IMFs.
extrema_opts : dict
Dictionary of extrema estimation options, must be identical to options
used when estimating IMFs.
Returns
-------
array [2 x nimfs]
Boolean array indicating whether each IMF passed each test.
Notes
-----
These are VERY strict criteria to apply to real data. The tests may
indicate a fail if the sift doesn't coverge well in a short segment of the
signal when the majority of the IMF is well behaved.
The tests are only valid if called with identical envelope_opts and
extrema_opts as were used in the sift estimation.
"""
imf = ensure_2d([imf], ['imf'], 'is_imf')
if envelope_opts is None:
envelope_opts = {}
checks = np.zeros((imf.shape[1], 2), dtype=bool)
for ii in range(imf.shape[1]):
# Extrema and zero-crossings differ by <=1
num_zc = zero_crossing_count(imf[:, ii])
num_ext = signal.find_peaks(imf[:, ii])[0].shape[0] + signal.find_peaks(-imf[:, ii])[0].shape[0]
# Mean of envelopes should be zero
upper = interp_envelope(imf[:, ii], mode='upper',
**envelope_opts, extrema_opts=extrema_opts)
lower = interp_envelope(imf[:, ii], mode='lower',
**envelope_opts, extrema_opts=extrema_opts)
# If upper or lower are None we should stop sifting altogether
if upper is None or lower is None:
logger.debug('IMF-{0} False - no peaks detected')
continue
# Find local mean
avg = np.mean([upper, lower], axis=0)[:, None]
avg_sum = np.sum(np.abs(avg))
imf_sum = np.sum(np.abs(imf[:, ii]))
diff = avg_sum / imf_sum
# TODO: Could probably add a Rilling-like criterion here. ie - is_imf
# is true if (1-alpha)% of time is within some thresh
checks[ii, 0] = np.abs(np.diff((num_zc, num_ext))) <= 1
checks[ii, 1] = diff < avg_tol
msg = 'IMF-{0} {1} - {2} extrema and {3} zero-crossings. Avg of envelopes is {4:.4}/{5:.4} ({6:.4}%)'
msg = msg.format(ii, np.alltrue(checks[ii, :]), num_ext, num_zc, avg_sum, imf_sum, 100*diff)
logger.debug(msg)
return checks
[docs]def check_decreasing_freq(IF, mode='proportion'):
"""Similar to method 1 in http://dx.doi.org/10.11601/ijates.v5i1.139.
Parameters
----------
IF : ndarray
nsamples x nimfs array of instantaneous frequency values
mode : {'proportion', 'sum', 'full'}
Flag indicating whether the proportion of overlapping samples
('proportion', default), the total number of overlapping samples
('sum') or the full nsamples x nimfs-1 array ('full') will be returned
Returns
-------
metric : ndarray
nimfs-1 length vector containing the proportion of samples in which the
IF of adjacent pairs of IMFs overlapped. This is returned per-sample if
input squash_time is None.
"""
# Find frequency differences
dIF = np.diff(IF, axis=1)
metric = dIF > 0
if mode == 'sum' or mode == 'proportion':
metric = np.nansum(dIF > 0, axis=0)
if mode == 'proportion':
metric = metric / dIF.shape[0]
return metric
[docs]def est_orthogonality(imf):
"""Compute the index of orthogonality from a set of IMFs.
Method is described in equation 6.5 of Huang et al (1998) [1]_.
Parameters
----------
imf : ndarray
Input array of IMFs
Returns
-------
ndarray
Matrix of orthogonality values [nimfs x nimfs]
References
----------
.. [1] Huang, N. E., Shen, Z., Long, S. R., Wu, M. C., Shih, H. H., Zheng,
Q., … Liu, H. H. (1998). The empirical mode decomposition and the Hilbert
spectrum for nonlinear and non-stationary time series analysis. Proceedings
of the Royal Society of London. Series A: Mathematical, Physical and
Engineering Sciences, 454(1971), 903–995.
https://doi.org/10.1098/rspa.1998.0193
"""
ortho = np.ones((imf.shape[1], imf.shape[1])) * np.nan
for ii in range(imf.shape[1]):
for jj in range(imf.shape[1]):
ortho[ii, jj] = np.abs(np.sum(imf[:, ii] * imf[:, jj])) \
/ (np.sqrt(np.sum(imf[:, jj] * imf[:, jj])) * np.sqrt(np.sum(imf[:, ii] * imf[:, ii])))
return ortho
[docs]def pseudo_mode_mixing_index(imf):
"""Compute the Pseudo Mode Mixing Index from a set of IMFs.
Section VI in Wang et al (2018) _[1]
Parameters
----------
imf : ndarray
Input array of IMFs
Returns
-------
ndarray
Vector of PSMI [nimfs,]
References
----------
.. [1] Wang, Y.-H., Hu, K., & Lo, M.-T. (2018). Uniform Phase Empirical
Mode Decomposition: An Optimal Hybridization of Masking Signal and Ensemble
Approaches. IEEE Access, 6, 34819–34833.
https://doi.org/10.1109/access.2018.2847634
"""
psmi = np.zeros((imf.shape[1],))
for ii in range(imf.shape[1]-1):
num = np.dot(imf[:, ii], imf[:, ii+1])
denom = np.linalg.norm(imf[:, ii])**2 + np.linalg.norm(imf[:, ii+1])**2 + 1e-8
psmi[ii] = np.max([num / denom, 0])
return psmi
# --------------------------
# Epoching
[docs]def find_extrema_locked_epochs(X, winsize, lock_to='peaks', percentile=None):
"""Define epochs around peaks or troughs within the data.
Parameters
----------
X : ndarray
Input time-series
winsize : int
Width of window to extract around each extrema
lock_to : {'max','min'}
Flag to select peak or trough locking (Default value = 'max')
percentile : float
Optional flag to selection only the upper percentile of extrema by
magnitude (Default value = None)
Returns
-------
ndarray
Array of start and end indices for epochs around extrema.
"""
if lock_to not in ['peaks', 'troughs', 'combined']:
raise ValueError("Invalid lock_to value")
locs, pks = get_padded_extrema(X, pad_width=0, mode=lock_to)
if percentile is not None:
thresh = np.percentile(pks, percentile)
locs = locs[pks > thresh]
pks = pks[pks > thresh]
winstep = int(winsize / 2)
# Get all trials
trls = np.r_[np.atleast_2d(locs - winstep), np.atleast_2d(locs + winstep)].T
# Reject trials which start before 0
inds = trls[:, 0] < 0
trls = trls[inds == False, :] # noqa: E712
# Reject trials which end after X.shape[0]
inds = trls[:, 1] > X.shape[0]
trls = trls[inds == False, :] # noqa: E712
return trls
[docs]def apply_epochs(X, trls):
"""Apply a set of epochs to a continuous dataset.
Parameters
----------
X : ndarray
Input dataset to be epoched
trls : ndarray
2D array of start and end indices for each epoch. The second dimension
should be of len==2 and contain start and end indices in order.
Returns
-------
ndarray
Epoched time-series
"""
Y = np.zeros((trls[0, 1] - trls[0, 0], X.shape[1], trls.shape[0]))
for ii in np.arange(trls.shape[0]):
Y[:, :, ii] = X[trls[ii, 0]:trls[ii, 1], :]
return Y
# Circular statistics
#
# These functions are a work in progress and not currently tested.
# Mostly based on equations from wikipedia.
# https://en.wikipedia.org/wiki/Circular_mean
#
# Everything works in radians to match the instantaneous phase estimates.
def _radians_to_complex(IP, IA=None):
"""Convert phase in radians to circular/complex coordinates."""
if IA is None:
IA = np.ones_like(IP)
ensure_equal_dims([IP, IA], ['IP', 'IA'], 'ip_to_complex')
# Actual computation using exponential formula - could equivalently use the
# sine/cosine form - computation time nearly identical
# phi = np.cos(IP) + 1j * np.sin(IP)
phi = IA * np.exp(1j * IP)
return phi
def ip_mean_resultant_vector(IP, IA=None, axis=0):
"""Compute the mean resultant vector of a set of phase values."""
if IA is None:
IA = np.ones_like(IP)
phi = _radians_to_complex(IP, IA=IA)
return phi.mean(axis=axis)
def ip_circular_mean(IP, IA=None, axis=0):
"""Compute the circular mean of a set of phase values."""
phi = ip_mean_resultant_vector(IP, IA=IA, axis=axis)
return np.angle(phi)
def ip_circular_variance(IP, IA=None, axis=0):
"""Compute the circular variance of a set of phase values."""
# https://en.wikipedia.org/wiki/Directional_statistics#Standard_deviation
phi = ip_mean_resultant_vector(IP, IA=IA, axis=axis)
return 1 - np.abs(phi)