Source code for emd.cycles

#!/usr/bin/python

# vim: set expandtab ts=4 sw=4:

"""
Identification and analysis of cycles in an oscillatory signal.

Routines:

bin_by_phase
phase_align
get_cycle_inds
get_cycle_stat
get_control_points
get_cycle_chain
mean_vector
kdt_match

"""

import numpy as np
from scipy import interpolate as interp

from . import spectra, utils, sift

# Housekeeping for logging
import logging
logger = logging.getLogger(__name__)


def bin_by_phase(ip, x, nbins=24, weights=None, variance_metric='variance',
                 bin_edges=None):
    """
    Compute the distribution of x across phase bins of the instantaneous phase.

    Parameters
    ----------
    ip : ndarray
        Input vector of instantaneous phase values
    x : ndarray
        Input array of values to be binned; the first dimension must match the
        length of ip
    nbins : integer
         number of phase bins to define (Default value = 24)
    weights : ndarray (optional)
         Optional set of linear weights to apply before averaging (Default value = None)
    variance_metric : {'variance','std','sem'}
         Flag to select whether the variance, standard deviation or standard
         error of the mean is computed across cycles (Default value = 'variance')
    bin_edges : ndarray (optional)
         Optional set of bin edges to override automatic bin specification (Default value = None)

    Returns
    -------
    avg : ndarray
        Vector containing the average across cycles as a function of phase
    var : ndarray
        Vector containing the selected variance metric across cycles as a
        function of phase
    bin_centres : ndarray
        Vector of bin centres

    """

    if bin_edges is None:
        bin_edges, bin_centres = spectra.define_hist_bins(0, 2 * np.pi, nbins)
    else:
        nbins = len(bin_edges) - 1
        bin_centres = bin_edges[:-1] + np.diff(bin_edges) / 2

    bin_inds = np.digitize(ip, bin_edges)[:, 0]

    out_dims = list((nbins, *x.shape[1:]))
    avg = np.zeros(out_dims) * np.nan
    var = np.zeros(out_dims) * np.nan
    for ii in range(1, nbins):
        inds = bin_inds == ii
        if weights is None:
            avg[ii - 1, ...] = np.average(x[inds, ...], axis=0)
            v = np.average(
                (x[inds, ...] - np.repeat(avg[None, ii - 1, ...], np.sum(inds), axis=0))**2, axis=0)
        else:
            if inds.sum() > 0:
                avg[ii - 1, ...] = np.average(x[inds, ...], axis=0,
                                              weights=weights[inds].dot(np.ones((1, x.shape[1]))))
                v = np.average((x[inds, ...] - np.repeat(avg[None, ii - 1, ...], np.sum(inds), axis=0))**2,
                               weights=weights[inds].dot(np.ones((1, x.shape[1]))), axis=0)
            else:
                v = np.nan

        if variance_metric == 'variance':
            var[ii - 1, ...] = v
        elif variance_metric == 'std':
            var[ii - 1, ...] = np.sqrt(v)
        elif variance_metric == 'sem':
            var[ii - 1, ...] = np.sqrt(v) / np.repeat(np.sqrt(inds.sum()
                                                              [None, ...]), x.shape[0], axis=0)

    return avg, var, bin_centres
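

# Illustrative usage sketch (not part of the library): bin a synthetic,
# phase-locked amplitude signal by its instantaneous phase. The phase and
# amplitude vectors below are placeholders for values obtained from a real
# frequency transform of an IMF.
def _example_bin_by_phase():
    phase = np.linspace(0, 20 * np.pi, 5000) % (2 * np.pi)  # ten wrapped cycles
    amp = 1 + 0.5 * np.cos(phase)                            # amplitude co-varying with phase
    avg, var, centres = bin_by_phase(phase[:, None], amp[:, None], nbins=24)
    return avg, var, centres                                 # per-bin mean, variance & bin centres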


def phase_align(ip, x, cycles=None, npoints=48, interp_kind='linear'):
    """
    Compute phase alignment of a vector of observed values across a set of
    cycles.

    Parameters
    ----------
    ip : ndarray
        Input array of Instantaneous Phase values to base alignment on
    x : ndarray
        Input array of observed values to phase align
    cycles : ndarray (optional)
        Optional set of cycles within IP to use (Default value = None)
    npoints : int
        Number of points in the phase cycle to align to (Default = 48)
    interp_kind : {'linear','nearest','zero','slinear', 'quadratic','cubic','previous', 'next'}
        Type of interpolation to perform. Argument is passed onto
        scipy.interpolate.interp1d. (Default = 'linear')

    Returns
    -------
    ndarray
        array containing the phase aligned observations

    """
    logger.info('STARTED: phase-align cycles')

    phase_edges, phase_bins = spectra.define_hist_bins(0, 2 * np.pi, npoints)

    if cycles is None:
        cycles = get_cycle_inds(ip)

    if ip.ndim == 2 and ip.shape[1] > 1:
        # too many imfs - error
        msg = 'emd.cycles.phase_align only works on a single IMF - input IP dims are {0}'.format(ip.shape)
        logger.warning(msg)
        raise ValueError(msg)
    elif ip.ndim == 1:
        ip = ip[:, None]

    if x.ndim == 2 and x.shape[1] > 1:
        # too many imfs - error
        msg = 'emd.cycles.phase_align only works on a single IMF - input x dims are {0}'.format(x.shape)
        logger.warning(msg)
        raise ValueError(msg)
    elif x.ndim == 1:
        x = x[:, None]

    if cycles.ndim == 2 and cycles.shape[1] > 1:
        # too many imfs - error
        msg = 'emd.cycles.phase_align only works on a single IMF - input cycles dims are {0}'.format(cycles.shape)
        logger.warning(msg)
        raise ValueError(msg)
    elif cycles.ndim == 1:
        cycles = cycles[:, None]

    msg = 'aligning {0} cycles over {1} phase points with {2} interpolation'
    logger.debug(msg.format(cycles.max(), npoints, interp_kind))

    ncycles = cycles.max()
    avg = np.zeros((npoints, ncycles))
    for ii in range(1, ncycles + 1):

        phase_data = ip[cycles[:] == ii]
        x_data = x[cycles[:] == ii]

        f = interp.interp1d(phase_data, x_data, kind=interp_kind,
                            bounds_error=False, fill_value='extrapolate')

        avg[:, ii - 1] = f(phase_bins)

    logger.info('COMPLETED: phase-align cycles')
    return avg
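

# Illustrative usage sketch (not part of the library): phase-align a toy
# signal across a handful of identical synthetic cycles. The wrapped phase
# and signal below are placeholders for the instantaneous phase and, for
# example, the instantaneous amplitude of a real IMF.
def _example_phase_align():
    ip = np.tile(np.linspace(0, 2 * np.pi, 100)[:-1], 8)[:, None]  # eight clean cycles
    x = np.sin(ip) + 0.1                                           # toy signal to align
    aligned = phase_align(ip, x, npoints=48)                       # (48, ncycles) array
    return aligned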


def get_cycle_inds(phase, return_good=True, mask=None,
                   imf=None, phase_step=1.5 * np.pi,
                   phase_edge=np.pi / 12):
    """
    Identify cycles within an instantaneous phase time-course and, optionally,
    remove 'bad' cycles by a number of criteria.

    Parameters
    ----------
    phase : ndarray
        Input vector of Instantaneous Phase values
    return_good : bool
        Boolean indicating whether 'bad' cycles should be removed
        (Default value = True)
    mask : ndarray
        Vector of mask values that should be ignored (Default value = None)
    imf : ndarray
        Optional array of IMFs used for control point identification when
        identifying good/bad cycles (Default value = None)
    phase_step : scalar
        Minimum value in the differential of the wrapped phase to identify a
        cycle transition (Default value = 1.5*np.pi)
    phase_edge : scalar
        Maximum distance from 0 or 2pi for the first and last phase value in a
        good cycle. Only used when return_good is True
        (Default value = np.pi/12)

    Returns
    -------
    ndarray
        Vector of integers indexing the location of each cycle

    Notes
    -----
    Good cycles are those with:

    1 : A strictly positively increasing phase
    2 : A phase starting within phase_edge of zero (ie 0 < x < phase_edge)
    3 : A phase ending within phase_edge of 2pi (ie 2pi-phase_edge < x < 2pi)
    4 : A set of 4 unique control points (ascending zero, peak, descending
        zero & trough)

    Good cycles can be identified with:

    >>> good_cycles = emd.cycles.get_cycle_inds(phase)

    The total number of cycles is then:

    >>> good_cycles.max()

    Indices where good_cycles is zero do not contain a valid cycle:

    >>> bad_segments = good_cycles == 0

    A single cycle can be isolated by matching its index, eg for the 5th cycle:

    >>> cycle_5_inds = good_cycles == 5

    """
    logger.info('STARTED: get cycle indices')
    logger.debug('computing on {0} samples over {1} IMFs'.format(phase.shape[0],
                                                                 phase.shape[1]))
    if mask is not None:
        logger.debug('{0} ({1}%) samples masked out'.format(mask.sum(),
                                                            np.round(100*(mask.sum()/phase.shape[0]), 2)))

    if phase.max() > 2 * np.pi:
        print('Wrapping phase')
        phase = utils.wrap_phase(phase)

    cycles = np.zeros_like(phase, dtype=int)

    for ii in range(phase.shape[1]):

        inds = np.where(np.abs(np.diff(phase[:, ii])) > phase_step)[0] + 1

        # No cycles to be found
        if len(inds) == 0:
            continue

        # Include first and last cycles,
        # These are likely to be bad/incomplete in real data but we should
        # check anyway
        if inds[0] >= 1:
            inds = np.r_[0, inds]
        if inds[-1] <= phase.shape[0] - 1:
            inds = np.r_[inds, phase.shape[0] - 1]

        unwrapped = np.unwrap(phase[:, ii], axis=0)

        count = 1
        for jj in range(len(inds) - 1):

            if mask is not None:
                # Ignore cycle if a part of it is masked out
                if any(~mask[inds[jj]:inds[jj + 1]]):
                    continue

            dat = unwrapped[inds[jj]:inds[jj + 1]]

            if return_good:
                cycle_checks = np.zeros((4,), dtype=bool)

                # Check for positively increasing phase
                if all(np.diff(dat) > 0):
                    cycle_checks[0] = True

                # Check that start of cycle is close to 0
                if (phase[inds[jj], ii] >= 0 and phase[inds[jj], ii] <= phase_edge):
                    cycle_checks[1] = True

                # Check that end of cycle is close to 2pi
                if (phase[inds[jj + 1] - 1, ii] <= 2 * np.pi) and \
                   (phase[inds[jj + 1] - 1, ii] >= 2 * np.pi - phase_edge):
                    cycle_checks[2] = True

                if imf is not None:
                    # Check we find 5 sensible control points if imf is provided
                    try:
                        cycle = imf[inds[jj]:inds[jj + 1]]
                        # Should extend this to cope with multiple peaks etc
                        ctrl = (0, sift.find_extrema(cycle)[0][0],
                                np.where(np.gradient(np.sign(cycle)) == -1)[0][0],
                                sift.find_extrema(-cycle)[0][0],
                                len(cycle))
                        if len(ctrl) == 5 and np.all(np.sign(np.diff(ctrl))):
                            cycle_checks[3] = True
                    except IndexError:
                        # Sometimes we don't find any candidate for a control point
                        cycle_checks[3] = False
                else:
                    # No time-series so assume everything is fine
                    cycle_checks[3] = True

            else:
                # Pretend everything is ok
                cycle_checks = np.ones((3,), dtype=bool)

            # Add cycle to list if the checks are good
            if all(cycle_checks):
                cycles[inds[jj]:inds[jj + 1], ii] = count
                count += 1

        logger.info('found {0} cycles in IMF-{1}'.format(cycles[:, ii].max(), ii))

    logger.info('COMPLETED: get cycle indices')
    return cycles
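

# Illustrative usage sketch (not part of the library): find good cycles in a
# synthetic wrapped-phase vector and isolate the samples of one cycle, as
# described in the docstring above.
def _example_get_cycle_inds():
    ip = np.tile(np.linspace(0, 2 * np.pi, 100)[:-1], 5)[:, None]    # five clean cycles
    good_cycles = get_cycle_inds(ip)
    n_cycles = good_cycles.max()                                     # total number of cycles found
    cycle_2_inds = np.where(good_cycles[:, 0] == 2)[0]               # sample indices of the 2nd cycle
    return n_cycles, cycle_2_inds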


def get_cycle_stat(cycles, values, mode='compressed', func=np.mean):
    """
    Compute the average of a set of observations for each cycle.

    Parameters
    ----------
    cycles : ndarray
        array whose content index cycle locations
    values : ndarray
        array of observations to average within each cycle
    mode : {'compressed','full'}
        Flag to indicate whether to return a single value per cycle or the
        average values filled within a vector of the same size as values
        (Default value = 'compressed')
    func : function
        Function to call on the data in values for each cycle (Default
        np.mean). This can be any function, built-in or user defined, that
        processes a single vector of data returning a single value.

    Returns
    -------
    ndarray
        Array containing the cycle-averaged values

    """
    if (cycles.ndim > 1) and (cycles.shape[1] > 1):
        raise ValueError('Cycles for {0} IMFs passed in, \
please input the cycles for a single IMF'.format(cycles.shape[1]))

    logger.info('STARTED: get cycle stats')
    logger.debug('computing stats for {0} cycles over {1} samples'.format(cycles.max(),
                                                                          cycles.shape[0]))
    logger.debug('computing metric {0} and returning {1}-array'.format(func, mode))

    if mode == 'compressed':
        out = np.zeros((cycles.max() + 1, )) * np.nan
    elif mode == 'full':
        out = np.zeros_like(values) * np.nan

    for cind in range(1, cycles.max() + 1):

        stat = func(values[cycles == cind])

        if mode == 'compressed':
            out[cind] = stat
        elif mode == 'full':
            out[cycles == cind] = stat

    # Currently including the first value as the stat for 'non-cycles' in
    # compressed mode for backwards compatibility with earlier work, might be
    # confusing overall - should probably rethink whether this makes any sense
    if mode == 'compressed':
        out[0] = func(values[cycles == 0])

    logger.info('COMPLETED: get cycle stats')
    return out
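

# Illustrative usage sketch (not part of the library): compute one summary
# value per cycle - here the within-cycle maximum of a placeholder amplitude
# vector.
def _example_get_cycle_stat():
    ip = np.tile(np.linspace(0, 2 * np.pi, 100)[:-1], 5)[:, None]   # five clean cycles
    amp = 1 + 0.1 * np.cos(ip[:, 0])                                # placeholder amplitude values
    cycles = get_cycle_inds(ip)[:, 0]
    max_amp = get_cycle_stat(cycles, amp, mode='compressed', func=np.max)
    return max_amp  # index 0 holds the stat for non-cycle samples, then one value per cycle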


def get_control_points(x, good_cycles):
    """
    Identify sets of control points from identified cycles. The control points
    are the ascending zero, peak, descending zero & trough.

    Parameters
    ----------
    x : ndarray
        Input array of oscillatory data
    good_cycles : ndarray
        array whose content index cycle locations

    Returns
    -------
    ndarray
        The control points for each cycle in x

    """
    ctrl = list()
    for ii in range(1, good_cycles.max() + 1):
        cycle = x[good_cycles == ii]

        # Peak
        pk = sift.find_extrema(cycle)[0]
        # Descending zero-crossing (sign change from positive to negative)
        asc = np.where(np.diff(np.sign(cycle)) == -2)[0]
        # Trough
        tr = sift.find_extrema(-cycle)[0]

        # Replace values with nan if more or less than 1 ctrl point is found
        if len(pk) == 1:
            pk = pk[0]
        else:
            pk = np.nan
        if len(tr) == 1:
            tr = tr[0]
        else:
            tr = np.nan
        if len(asc) == 1:
            asc = asc[0]
        else:
            asc = np.nan

        # Append to list
        ctrl.append((0, pk, asc, tr, len(cycle)-1))

    # Return as array
    return np.array(ctrl)


def get_cycle_chain(cycles, min_chain=1, drop_first=False, drop_last=False):
    """
    Identify chains of valid cycles in a set of cycles.

    Parameters
    ----------
    cycles : ndarray
        array whose content index cycle locations
    min_chain : integer
        Minimum length of chain to return (Default value = 1)
    drop_first : {bool, integer}
        Number of cycles to remove from start of chain (default is False)
    drop_last : {bool, integer}
        Number of cycles to remove from end of chain (default is False)

    Returns
    -------
    list
        nested list of cycle numbers within each chain

    """
    if cycles.ndim == 1:
        cycles = cycles[:, None]

    if drop_first is True:
        drop_first = 1
    if drop_last is True:
        drop_last = 1

    chains = list()
    chn = None

    # get diff to next cycle for each cycle
    for ii in range(1, cycles.max() + 1):

        if chn is None:
            # Start new chain if there isn't one
            chn = [ii]
        else:
            # We're currently in a chain - test whether current cycle is
            # directly after previous cycle
            if cycles[np.where(cycles == ii)[0][0] - 1][0] == 0:
                # Start of new chain - store previous chain and start new one
                if len(chn) >= min_chain:
                    # Drop chains which are too short
                    if drop_first > 0:
                        chn = chn[drop_first:]
                    if drop_last > 0:
                        chn = chn[:-drop_last]
                    chains.append(chn)
                # Initialise next chain
                chn = [ii]
            else:
                # Continuation of previous chain
                chn.append(ii)

    # If we're at the end - store what we have
    if len(chn) >= min_chain:
        # Drop chains which are too short
        if drop_first > 0:
            chn = chn[drop_first:]
        if drop_last > 0:
            chn = chn[:-drop_last]
        chains.append(chn)

    return chains
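

# Illustrative usage sketch (not part of the library): a hand-made cycle
# vector containing two runs of contiguous cycles, separated by samples that
# belong to no cycle (zeros).
def _example_get_cycle_chain():
    cycles = np.array([0, 1, 1, 2, 2, 0, 0, 3, 3, 4, 4, 5, 5, 0])
    chains = get_cycle_chain(cycles, min_chain=2)
    return chains  # expected to be [[1, 2], [3, 4, 5]]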


def mean_vector(IP, X, mask=None):
    """
    Compute the mean vector of a set of values wrapped around the unit circle.

    Parameters
    ----------
    IP : ndarray
        Instantaneous Phase values
    X : ndarray
        Observations corresponding to IP values
    mask :
        (Default value = None)

    Returns
    -------
    mv : ndarray
        Set of mean vectors

    """
    phi = np.cos(IP) + 1j * np.sin(IP)
    mv = phi[:, None] * X
    return mv.mean(axis=0)


def basis_project(X, ncomps=1, ret_basis=False):
    """
    Express a set of signals in a simple sine-cosine basis set.

    Parameters
    ----------
    X : ndarray
        Observations to project onto the basis; the first dimension is treated
        as spanning one full cycle
    ncomps : int
        Number of sine-cosine pairs to express signal in (default=1)
    ret_basis : bool
        Flag indicating whether to return basis set (default=False)

    Returns
    -------
    basis : ndarray
        Set of values in basis dimensions

    """
    nsamples = X.shape[0]
    basis = np.c_[np.cos(np.linspace(0, 2 * np.pi, nsamples)),
                  np.sin(np.linspace(0, 2 * np.pi, nsamples))]

    if ncomps > 1:
        for ii in range(1, ncomps + 1):
            basis = np.c_[basis,
                          np.cos(np.linspace(0, 2 * (ii + 1) * np.pi, nsamples)),
                          np.sin(np.linspace(0, 2 * (ii + 1) * np.pi, nsamples))]
    basis = basis.T

    if ret_basis:
        return basis.dot(X), basis
    else:
        return basis.dot(X)


def kdt_match(x, y, K=15, distance_upper_bound=np.inf):
    """
    Find unique nearest-neighbours between two n-dimensional feature sets.
    Useful for matching two sets of cycles on one or more features (ie
    amplitude and average frequency).

    Rows in x are matched to rows in y. As such - it is good to have (many)
    more rows in y than x if possible.

    This uses a k-dimensional tree to query for the K nearest neighbours and
    returns the closest unique neighbour. If no unique match is found - the
    row is not returned. Increasing K will find more matches but allow matches
    between more distant observations.

    Not advisable for use with more than a handful of features.

    Parameters
    ----------
    x : ndarray
        [ num observations x num features ] array to match to
    y : ndarray
        [ num observations x num features ] array of potential matches
    K : int
        number of potential nearest-neighbours to query

    Returns
    -------
    ndarray
        indices of matched observations in x
    ndarray
        indices of matched observations in y

    """
    if x.ndim == 1:
        x = x[:, None]
    if y.ndim == 1:
        y = y[:, None]

    # logging.info('Starting KD-Tree Match')
    msg = 'Matching {0} features from y ({1} observations) to x ({2} observations)'
    logging.info(msg.format(x.shape[1], y.shape[0], x.shape[0]))
    logging.debug('K: {0}, distance_upper_bound: {1}'.format(K, distance_upper_bound))

    # Initialise Tree and find nearest neighbours
    from scipy import spatial
    kdt = spatial.cKDTree(y)
    D, inds = kdt.query(x, k=K, distance_upper_bound=distance_upper_bound)

    II = np.zeros_like(inds)
    selected = []
    for ii in range(K):
        # Find unique values and their indices in this column
        uni, uni_inds = _unique_inds(inds[:, ii])
        # Get index of lowest distance match amongst occurrences of each unique value
        ix = [np.argmin(D[uni_inds[jj], ii]) for jj in range(len(uni))]
        # Map closest match index to full column index
        closest_uni_inds = [uni_inds[jj][ix[jj]] for jj in range(len(uni))]

        # Remove duplicates and -1s (-1 indicates distance to neighbour is
        # above threshold)
        uni = uni[(uni != np.inf)]
        # Remove previously selected
        bo = np.array([u in selected for u in uni])
        uni = uni[bo == False]  # noqa: E712

        # Find indices of matches between uniques and values in col
        uni_matches = np.zeros((inds.shape[0],))
        uni_matches[closest_uni_inds] = np.sum(inds[closest_uni_inds, ii, None] == uni, axis=1)

        # Remove matches which are selected in previous columns
        uni_matches[II[:, :ii].sum(axis=1) > 0] = 0

        # Mark remaining matches with 1s in this col
        II[np.where(uni_matches)[0], ii] = 1
        selected.extend(inds[np.where(uni_matches)[0], ii])

        msg = '{0} Matches in layer {1}'
        logging.debug(msg.format(np.sum(uni_matches), ii))

    # Find column index of left-most choice per row (ie closest unique neighbour)
    winner = np.argmax(II, axis=1)

    # Find row index of winner
    final = np.zeros((II.shape[0],), dtype=int)
    for ii in range(II.shape[0]):
        if (np.sum(II[ii, :]) == 1) and (winner[ii] < y.shape[0]) and \
                (inds[ii, winner[ii]] < y.shape[0]):
            final[ii] = inds[ii, winner[ii]]
        else:
            final[ii] = -1  # No good match

    # Remove failed matches
    uni, cnt = np.unique(final, return_counts=True)
    x_inds = np.where(final > -1)[0]
    y_inds = final[x_inds]

    # logging.info('Returning {0} matched observations'.format(x_inds.shape[0]))

    return x_inds, y_inds
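

# Illustrative usage sketch (not part of the library): match each row of a
# small feature set to its closest unique neighbour in a larger candidate
# pool. The random features are placeholders standing in for per-cycle
# descriptors such as amplitude and average frequency.
def _example_kdt_match():
    rng = np.random.default_rng(42)
    x = rng.normal(size=(20, 2))    # 20 cycles to match, 2 features each
    y = rng.normal(size=(200, 2))   # 200 candidate cycles to match against
    x_inds, y_inds = kdt_match(x, y, K=5)
    return x[x_inds], y[y_inds]     # matched pairs of observations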


def _unique_inds(ar):
    """
    Find the unique elements of an array, ignoring shape.

    Adapted from numpy.lib.arraysetops._unique1d
    Original function only returns index of first occurrence of unique value
    """
    ar = np.asanyarray(ar).flatten()
    ar.sort()

    aux = ar
    mask = np.empty(aux.shape, dtype=np.bool_)
    mask[:1] = True
    mask[1:] = aux[1:] != aux[:-1]

    ar_inds = [np.where(ar == ii)[0] for ii in ar[mask]]

    return ar[mask], ar_inds