Source code for emd.sift

#!/usr/bin/python

# vim: set expandtab ts=4 sw=4:

"""
Implementations of the sift algorithm for Empirical Mode Decomposition.

Main Routines:
  sift                    - The classic sift algorithm
  ensemble_sift           - Noise-assisted sift algorithm
  complete_ensemble_sift  - Adapted noise-assisted sift algorithm
  mask_sift               - Sift with masks to separate very sparse or nonlinear components
  iterated_mask_sift      - Sift which automatically identifies optimal masks
  sift_second_layer       - Apply sift to amplitude envelope of a set of IMFs

Sift Helper Routines:
  get_next_imf
  get_next_imf_mask
  get_mask_freqs
  energy_difference
  stop_imf_energy
  stop_imf_sd
  stop_imf_rilling
  stop_imf_fixed_iter

Sift Config:
  get_config
  SiftConfig

"""

import collections
import functools
import inspect
import logging
import sys

import numpy as np
import yaml
from scipy.stats import zscore

from ._sift_core import (_find_extrema, get_padded_extrema, interp_envelope,
                         zero_crossing_count)
from .logger import sift_logger, wrap_verbose
from .spectra import frequency_transform
from .support import (EMDSiftCovergeError, ensure_1d_with_singleton, ensure_2d,
                      ensure_equal_dims, run_parallel)

# Housekeeping for logging
logger = logging.getLogger(__name__)


##################################################################
# Basic SIFT

# Utilities

def get_next_imf(X, env_step_size=1, max_iters=1000, energy_thresh=50,
                 stop_method='sd', sd_thresh=.1, rilling_thresh=(0.05, 0.5, 0.05),
                 envelope_opts=None, extrema_opts=None):
    """Compute the next IMF from a data set.

    This is a helper function used within the more general sifting functions.

    Parameters
    ----------
    X : ndarray [nsamples x 1]
        1D input array containing the time-series data to be decomposed
    env_step_size : float
        Scaling of envelope prior to removal at each iteration of sift. The
        average of the upper and lower envelope is multiplied by this value
        before being subtracted from the data. Values should be between
        0 > x >= 1 (Default value = 1)
    max_iters : int > 0
        Maximum number of iterations to compute before throwing an error
    energy_thresh : float > 0
        Threshold for energy difference (in decibels) between IMF and residual
        to suggest stopping overall sift. Only used when stop_method is
        'energy'. (Default value = 50)
    stop_method : {'sd','rilling','energy','fixed'}
        Flag indicating which metric to use to stop sifting and return an IMF.
    sd_thresh : float
        Used if 'stop_method' is 'sd'. The threshold at which the sift of each
        IMF will be stopped. (Default value = .1)
    rilling_thresh : tuple
        Used if 'stop_method' is 'rilling', needs to contain three values
        (sd1, sd2, alpha). An evaluation function (E) is defined by dividing
        the residual by the mode amplitude. The sift continues until E < sd1
        for the fraction (1-alpha) of the data, and E < sd2 for the remainder.
        See section 3.2 of http://perso.ens-lyon.fr/patrick.flandrin/NSIP03.pdf

    Returns
    -------
    proto_imf : ndarray
        1D vector containing the next IMF extracted from X
    continue_flag : bool
        Boolean indicating whether the sift can be continued beyond this IMF

    Other Parameters
    ----------------
    envelope_opts : dict
        Optional dictionary of keyword arguments to be passed to emd.interp_envelope
    extrema_opts : dict
        Optional dictionary of keyword options to be passed to emd.get_padded_extrema

    See Also
    --------
    emd.sift.sift
    emd.sift.interp_envelope

    """
    X = ensure_1d_with_singleton([X], ['X'], 'get_next_imf')

    if envelope_opts is None:
        envelope_opts = {}

    proto_imf = X.copy()

    continue_imf = True
    # TODO - assess this properly here, return input if already passing!
    continue_flag = True
    niters = 0
    while continue_imf:

        # Guard against non-convergence, unless the user has asked for a fixed
        # iteration count ('fixed' terminates itself via stop_imf_fixed_iter).
        if stop_method != 'fixed':
            if niters == 3*max_iters//4:
                logger.debug('Sift reached {0} iterations, taking a long time to coverge'.format(niters))
            elif niters > max_iters:
                msg = 'Sift failed. No covergence after {0} iterations'.format(niters)
                raise EMDSiftCovergeError(msg)
        niters += 1

        # Compute envelopes, local mean and next proto imf
        upper, lower = interp_envelope(proto_imf, mode='both',
                                       **envelope_opts, extrema_opts=extrema_opts)

        # If upper or lower are None we should stop sifting altogether
        if upper is None or lower is None:
            continue_flag = False
            continue_imf = False
            logger.debug('Finishing sift: IMF has no extrema')
            continue

        # Find local mean
        avg = np.mean([upper, lower], axis=0)[:, None]

        # Remove (a scaled fraction of) the local mean estimate from proto imf
        next_proto_imf = proto_imf - (env_step_size*avg)

        # Evaluate if we should stop the sift - methods are very different in
        # requirements here...

        # Stop sifting if we pass threshold
        if stop_method == 'sd':
            # Cauchy criterion
            stop, _ = stop_imf_sd(proto_imf, next_proto_imf, sd=sd_thresh, niters=niters)
        elif stop_method == 'rilling':
            # Rilling et al 2003 - this actually evaluates proto_imf NOT next_proto_imf
            stop, _ = stop_imf_rilling(upper, lower, niters=niters,
                                       sd1=rilling_thresh[0],
                                       sd2=rilling_thresh[1],
                                       tol=rilling_thresh[2])
            if stop:
                # Rilling assessed the CURRENT proto-IMF, so return that one
                # rather than the freshly-updated candidate.
                next_proto_imf = proto_imf
        elif stop_method == 'energy':
            # Rato et al 2008
            # Compare energy of signal at start of sift with energy of envelope average
            stop, _ = stop_imf_energy(X, avg, thresh=energy_thresh, niters=niters)
        elif stop_method == 'fixed':
            stop = stop_imf_fixed_iter(niters, max_iters)
        else:
            raise ValueError("stop_method '{0}' not recogised".format(stop_method))

        proto_imf = next_proto_imf

        if stop:
            continue_imf = False
            continue

    # Ensure a column-vector return shape to match the other sift helpers
    if proto_imf.ndim == 1:
        proto_imf = proto_imf[:, None]

    return proto_imf, continue_flag
def _energy_difference(imf, residue): """Compute energy change in IMF during a sift. Parameters ---------- imf : ndarray IMF to be evaluated residue : ndarray Remaining signal after IMF removal Returns ------- float Energy difference in decibels Notes ----- This function is used during emd.sift.stop_imf_energy to implement the energy-difference sift-stopping method defined in section 3.2.4 of https://doi.org/10.1016/j.ymssp.2007.11.028 """ sumsqr = np.sum(imf**2) imf_energy = 20 * np.log10(sumsqr, where=sumsqr > 0) sumsqr = np.sum(residue ** 2) resid_energy = 20 * np.log10(sumsqr, where=sumsqr > 0) return imf_energy-resid_energy
def stop_imf_energy(imf, residue, thresh=50, niters=None):
    """Compute energy change in IMF during a sift.

    The energy in the IMFs are compared to the energy at the start of sifting.
    The sift terminates once this ratio reaches a predefined threshold.

    Parameters
    ----------
    imf : ndarray
        IMF to be evaluated
    residue : ndarray
        Average of the upper and lower envelopes
    thresh : float
        Energy ratio threshold (default=50)
    niters : int
        Number of sift iterations currently completed

    Returns
    -------
    bool
        A flag indicating whether to stop sifting
    float
        Energy difference in decibels

    Notes
    -----
    This function implements the energy-difference sift-stopping method
    defined in section 3.2.4 of https://doi.org/10.1016/j.ymssp.2007.11.028

    """
    diff = _energy_difference(imf, residue)
    should_stop = bool(diff > thresh)

    # Log the outcome either way - message differs by whether we stop.
    if should_stop:
        msg = 'Sift stopped by Energy Ratio in {0} iters with difference of {1}dB'
    else:
        msg = 'Energy Ratio evaluated at iter {0} is : {1}dB'
    logger.debug(msg.format(niters, diff))

    return should_stop, diff
def stop_imf_sd(proto_imf, prev_imf, sd=0.2, niters=None):
    """Compute the sd sift stopping metric.

    Parameters
    ----------
    proto_imf : ndarray
        A signal which may be an IMF
    prev_imf : ndarray
        The previously identified IMF
    sd : float
        The stopping threshold
    niters : int
        Number of sift iterations currently completed

    Returns
    -------
    bool
        A flag indicating whether to stop sifting
    float
        The SD metric value

    """
    # Normalised squared difference between successive sift iterations
    # (Cauchy-style convergence criterion).
    metric = np.sum((prev_imf - proto_imf)**2) / np.sum(prev_imf**2)

    # Cast to a plain bool for consistency with the other stop_imf_* helpers
    # (previously returned a numpy bool).
    stop = bool(metric < sd)

    if stop:
        logger.verbose('Sift stopped by SD-thresh in {0} iters with sd {1}'.format(niters, metric))
    else:
        logger.debug('SD-thresh stop metric evaluated at iter {0} is : {1}'.format(niters, metric))

    return stop, metric
def stop_imf_rilling(upper_env, lower_env, sd1=0.05, sd2=0.5, tol=0.05, niters=None):
    """Compute the Rilling et al 2003 sift stopping metric.

    This metric tries to guarantee globally small fluctuations in the IMF mean
    while taking into account locally large excursions that may occur in noisy
    signals.

    Parameters
    ----------
    upper_env : ndarray
        The upper envelope of a proto-IMF
    lower_env : ndarray
        The lower envelope of a proto-IMF
    sd1 : float
        The maximum threshold for globally small differences from zero-mean
    sd2 : float
        The maximum threshold for locally large differences from zero-mean
    tol : float (0 < tol < 1)
        (1-tol) defines the proportion of time which may contain large
        deviations from zero-mean
    niters : int
        Number of sift iterations currently completed

    Returns
    -------
    bool
        A flag indicating whether to stop sifting
    float
        The SD metric value

    Notes
    -----
    This method is described in section 3.2 of:
    Rilling, G., Flandrin, P., & Goncalves, P. (2003, June). On empirical mode
    decomposition and its algorithms. In IEEE-EURASIP workshop on nonlinear
    signal and image processing (Vol. 3, No. 3, pp. 8-11). NSIP-03, Grado (I).
    http://perso.ens-lyon.fr/patrick.flandrin/NSIP03.pdf

    """
    avg_env = (upper_env+lower_env)/2
    amp = np.abs(upper_env-lower_env)/2

    # Evaluation function: envelope mean relative to mode amplitude
    eval_metric = np.abs(avg_env)/amp

    # Fraction of samples whose deviation exceeds the 'globally small' bound
    metric = np.mean(eval_metric > sd1)
    continue1 = metric > tol
    # Any sample exceeding the 'locally large' bound also keeps the sift going
    continue2 = np.any(eval_metric > sd2)

    # Stop only when neither continuation condition holds
    # (previously written as `(continue1 or continue2) == False`).
    stop = not (continue1 or continue2)

    if stop:
        logger.verbose('Sift stopped by Rilling-metric in {0} iters (val={1})'.format(niters, metric))
    else:
        logger.debug('Rilling stop metric evaluated at iter {0} is : {1}'.format(niters, metric))

    return stop, metric
def stop_imf_fixed_iter(niters, max_iters):
    """Compute the fixed-iteration sift stopping metric.

    Parameters
    ----------
    niters : int
        Number of sift iterations currently completed
    max_iters : int
        Maximum number of sift iterations to be completed

    Returns
    -------
    bool
        A flag indicating whether to stop sifting

    """
    # Keep sifting until we hit exactly max_iters iterations.
    if niters != max_iters:
        return False

    logger.debug('Sift stopped at fixed number of {0} iterations'.format(niters))
    return True
def _nsamples_warn(N, max_imfs): if max_imfs is None: return if N < 2**(max_imfs+1): msg = 'Inputs samples ({0}) is small for specified max_imfs ({1})' msg += ' very likely that {2} or fewer imfs are returned' logger.warning(msg.format(N, max_imfs, np.floor(np.log2(N)).astype(int)-1)) def _set_rilling_defaults(rilling_thresh): rilling_thresh = (0.05, 0.5, 0.05) if rilling_thresh is True else rilling_thresh return rilling_thresh # SIFT implementation
@wrap_verbose
@sift_logger('sift')
def sift(X, sift_thresh=1e-8, energy_thresh=50, rilling_thresh=None,
         max_imfs=None, verbose=None, return_residual=True,
         imf_opts=None, envelope_opts=None, extrema_opts=None):
    """Compute Intrinsic Mode Functions from an input data vector.

    This function implements the original sift algorithm [1]_.

    Parameters
    ----------
    X : ndarray
        1D input array containing the time-series data to be decomposed
    sift_thresh : float
        The threshold at which the overall sifting process will stop.
        (Default value = 1e-8)
    max_imfs : int
        The maximum number of IMFs to compute. (Default value = None)

    Returns
    -------
    imf: ndarray
        2D array [samples x nimfs] containing the Intrinsic Mode Functions
        from the decomposition of X.

    Other Parameters
    ----------------
    imf_opts : dict
        Optional dictionary of keyword options to be passed to emd.get_next_imf
    envelope_opts : dict
        Optional dictionary of keyword options to be passed to emd.interp_envelope
    extrema_opts : dict
        Optional dictionary of keyword options to be passed to emd.get_padded_extrema
    verbose : {None,'CRITICAL','WARNING','INFO','DEBUG'}
        Option to override the EMD logger level for a call to this function.

    See Also
    --------
    emd.sift.get_next_imf
    emd.sift.get_config

    Notes
    -----
    The classic sift is computed by passing an input vector with all options
    left to default

    >>> imf = emd.sift.sift(x)

    The sift can be customised by passing additional options, here we only
    compute the first four IMFs.

    >>> imf = emd.sift.sift(x, max_imfs=4)

    More detailed options are passed as dictionaries which are passed to the
    relevant lower-level functions. For instance `imf_opts` are passed to
    `get_next_imf`.

    >>> imf_opts = {'env_step_size': 1/3, 'stop_method': 'rilling'}
    >>> imf = emd.sift.sift(x, max_imfs=4, imf_opts=imf_opts)

    A modified dictionary of all options can be created using `get_config`.
    This can be modified and used by unpacking the options into a `sift` call.

    >>> conf = emd.sift.get_config('sift')
    >>> conf['max_imfs'] = 4
    >>> conf['imf_opts'] = imf_opts
    >>> imfs = emd.sift.sift(x, **conf)

    References
    ----------
    .. [1] Huang, N. E., Shen, Z., Long, S. R., Wu, M. C., Shih, H. H., Zheng,
       Q., … Liu, H. H. (1998). The empirical mode decomposition and the
       Hilbert spectrum for nonlinear and non-stationary time series analysis.
       Proceedings of the Royal Society of London. Series A: Mathematical,
       Physical and Engineering Sciences, 454(1971), 903–995.
       https://doi.org/10.1098/rspa.1998.0193

    """
    # NOTE(review): `return_residual` is accepted but never read in this
    # function body - the final residual is always appended when non-zero.
    # Confirm whether callers rely on it before removing.
    if not imf_opts:
        imf_opts = {'env_step_size': 1,
                    'sd_thresh': .1}
    rilling_thresh = _set_rilling_defaults(rilling_thresh)

    X = ensure_1d_with_singleton([X], ['X'], 'sift')

    _nsamples_warn(X.shape[0], max_imfs)

    layer = 0
    # Only evaluate peaks and if already an IMF if rilling is specified.
    # sift_thresh/energy_thresh are disabled for this initial check.
    continue_sift = check_sift_continue(X, X, layer,
                                        max_imfs=max_imfs,
                                        sift_thresh=None,
                                        energy_thresh=None,
                                        rilling_thresh=rilling_thresh,
                                        envelope_opts=envelope_opts,
                                        extrema_opts=extrema_opts,
                                        merge_tests=True)
    # NOTE(review): if this initial check returns False, the loop below never
    # runs and `imf` is undefined when the residual is appended - a
    # NameError would follow. Confirm whether that path is reachable.

    proto_imf = X.copy()

    while continue_sift:
        logger.info('sifting IMF : {0}'.format(layer))

        next_imf, continue_sift = get_next_imf(proto_imf,
                                               envelope_opts=envelope_opts,
                                               extrema_opts=extrema_opts,
                                               **imf_opts)

        # Accumulate IMFs as columns of a 2D array
        if layer == 0:
            imf = next_imf
        else:
            imf = np.concatenate((imf, next_imf), axis=1)

        # Residual is the input minus everything extracted so far
        proto_imf = X - imf.sum(axis=1)[:, None]
        layer += 1

        # Check if sifting should continue - all metrics whose thresh is not
        # None will be assessed and sifting will stop if any metric says so
        continue_sift = check_sift_continue(X, proto_imf, layer,
                                            max_imfs=max_imfs,
                                            sift_thresh=sift_thresh,
                                            energy_thresh=energy_thresh,
                                            rilling_thresh=rilling_thresh,
                                            envelope_opts=envelope_opts,
                                            extrema_opts=extrema_opts,
                                            merge_tests=True)

    # Append final residual as last mode - unless its empty
    if np.sum(np.abs(proto_imf)) != 0:
        imf = np.c_[imf, proto_imf]

    return imf
def check_sift_continue(X, residual, layer, max_imfs=None, sift_thresh=1e-8,
                        energy_thresh=50, rilling_thresh=None,
                        envelope_opts=None, extrema_opts=None, merge_tests=True):
    """Run checks to see if sifting should continue into another layer.

    Parameters
    ----------
    X : ndarray
        1D array containing the data being decomposed
    residual : ndarray
        1D array containing the current residuals (X - imfs so far)
    layer : int
        Current IMF number being decomposed
    max_imfs : int
        Largest number of IMFs to compute
    sift_thresh : float
        The threshold at which the overall sifting process will stop.
        (Default value = 1e-8)
    energy_thresh : float
        The difference in energy between the raw data and the residuals in
        decibels at which we stop sifting (default = 50).
    rilling_thresh : tuple or None
        Tuple (or tuple-like) containing three values (sd1, sd2, alpha). An
        evaluation function (E) is defined by dividing the residual by the
        mode amplitude. The sift continues until E < sd1 for the fraction
        (1-alpha) of the data, and E < sd2 for the remainder. See section 3.2
        of http://perso.ens-lyon.fr/patrick.flandrin/NSIP03.pdf
    envelope_opts : dict or None
        Optional dictionary of keyword options to be passed to emd.interp_envelope
    extrema_opts : dict or None
        Optional dictionary of keyword options to be passed to emd.get_padded_extrema
    merge_tests : bool
        If True (default) return a single bool that is True only when no test
        voted to stop; if False return the raw list of five per-test flags
        (None for tests that were not run).

    Returns
    -------
    bool or list
        Flag indicating whether to continue sifting, or the per-test flags
        when merge_tests is False.

    """
    # One slot per test: [max_imfs, enough-extrema, sift_thresh,
    # energy_thresh, rilling]. None = test not run.
    continue_sift = [None, None, None, None, None]

    # Check if we've reached the pre-specified number of IMFs
    if max_imfs is not None and layer == max_imfs:
        logger.info('Finishing sift: reached max number of imfs ({0})'.format(layer))
        continue_sift[0] = False
    else:
        continue_sift[0] = True

    # Check if residual has enough peaks to sift again
    pks, _ = _find_extrema(residual)
    trs, _ = _find_extrema(-residual)
    if len(pks) < 2 or len(trs) < 2:
        logger.info('Finishing sift: {0} peaks {1} trough in residual'.format(len(pks), len(trs)))
        continue_sift[1] = False
    else:
        continue_sift[1] = True

    # Optional: Check if the sum-sqr of the resduals is below the sift_thresh
    # NOTE(review): despite the name, this is the sum of absolute values, not
    # the sum of squares - confirm which is intended before renaming.
    sumsq_resid = np.abs(residual).sum()
    if sift_thresh is not None and sumsq_resid < sift_thresh:
        logger.info('Finishing sift: reached threshold {0}'.format(sumsq_resid))
        continue_sift[2] = False
    else:
        continue_sift[2] = True

    # Optional: Check if energy_ratio of residual to original signal is below thresh
    energy_ratio = _energy_difference(X, residual)
    if energy_thresh is not None and energy_ratio > energy_thresh:
        logger.info('Finishing sift: reached energy ratio {0}'.format(energy_ratio))
        continue_sift[3] = False
    else:
        continue_sift[3] = True

    # Optional: Check if the residual is already an IMF with Rilling method -
    # only run if we have enough extrema
    # NOTE(review): `envelope_opts` is unpacked with ** here, so passing
    # rilling_thresh together with envelope_opts=None would raise - confirm.
    if rilling_thresh is not None and continue_sift[1]:
        upper, lower = interp_envelope(residual, mode='both',
                                       **envelope_opts, extrema_opts=extrema_opts)
        rilling_continue_sift, rilling_metric = stop_imf_rilling(upper, lower, niters=-1)
        # NOTE(review): stop_imf_rilling returns a STOP flag (True = stop), so
        # finishing when it is False looks inverted - verify against the
        # intended semantics before changing.
        if rilling_continue_sift is False:
            logger.info('Finishing sift: reached rilling {0}'.format(rilling_metric))
            continue_sift[4] = False
        else:
            continue_sift[4] = True

    if merge_tests:
        # Merge tests that aren't none - return False for any Falses
        return np.any([x == False for x in continue_sift if x is not None]) == False  # noqa: E712
    else:
        return continue_sift


##################################################################
# Ensemble SIFT variants
# Utilities

def _sift_with_noise(X, noise_scaling=None, noise=None, noise_mode='single',
                     sift_thresh=1e-8, max_imfs=None, job_ind=1,
                     imf_opts=None, envelope_opts=None, extrema_opts=None):
    """Apply white noise to a signal prior to computing a sift.

    Parameters
    ----------
    X : ndarray
        1D input array containing the time-series data to be decomposed
    noise_scaling : float
        Standard deviation of noise to add to each ensemble (Default value =
        None)
    noise : ndarray
        array of noise values the same size as X to add prior to sift (Default
        value = None)
    noise_mode : {'single','flip'}
        Flag indicating whether to compute each ensemble with noise once or
        twice with the noise and sign-flipped noise (Default value = 'single')
    sift_thresh : float
        The threshold at which the overall sifting process will stop.
        (Default value = 1e-8)
    max_imfs : int
        The maximum number of IMFs to compute. (Default value = None)
    job_ind : 1
        Optional job index value for display in logger (Default value = 1)

    Returns
    -------
    imf: ndarray
        2D array [samples x nimfs] containing the Intrinsic Mode Functions
        from the decomposition of X.

    Other Parameters
    ----------------
    imf_opts : dict
        Optional dictionary of arguments to be passed to emd.get_next_imf
    envelope_opts : dict
        Optional dictionary of keyword options to be passed to emd.interp_envelope
    extrema_opts : dict
        Optional dictionary of keyword options to be passed to emd.get_padded_extrema

    See Also
    --------
    emd.sift.ensemble_sift
    emd.sift.complete_ensemble_sift
    emd.sift.get_next_imf

    """
    if job_ind is not None:
        logger.info('Starting SIFT Ensemble: {0}'.format(job_ind))

    # Generate fresh white noise when the caller has not supplied any
    if noise is None:
        noise = np.random.randn(*X.shape)

    X = ensure_1d_with_singleton([X], ['X'], 'sift')
    ensure_equal_dims([X, noise], ['X', 'noise'], '_sift_with_noise', dim=0)

    if noise_scaling is not None:
        noise = noise * noise_scaling

    # All sift calls in this function share the same options
    sift_kwargs = dict(sift_thresh=sift_thresh, max_imfs=max_imfs,
                       imf_opts=imf_opts, envelope_opts=envelope_opts,
                       extrema_opts=extrema_opts)

    imf = sift(X.copy() + noise, **sift_kwargs)

    if noise_mode == 'single':
        return imf
    elif noise_mode == 'flip':
        # Average the decompositions of the +noise and -noise realisations
        imf = imf + sift(X.copy() - noise, **sift_kwargs)
        return imf / 2


# Implementation
@wrap_verbose
@sift_logger('ensemble_sift')
def ensemble_sift(X, nensembles=4, ensemble_noise=.2, noise_mode='single',
                  noise_seed=None, nprocesses=1, sift_thresh=1e-8, max_imfs=None,
                  verbose=None, imf_opts=None, envelope_opts=None, extrema_opts=None):
    """Compute Intrinsic Mode Functions with the ensemble EMD.

    This function implements the ensemble empirical model decomposition
    algorithm defined in [1]_. This approach sifts an ensemble of signals with
    white-noise added and treats the mean IMFs as the result. The resulting
    IMFs from the ensemble sift resembles a dyadic filter [2]_.

    Parameters
    ----------
    X : ndarray
        1D input array containing the time-series data to be decomposed
    nensembles : int
        Integer number of different ensembles to compute the sift across.
    ensemble_noise : float
        Standard deviation of noise to add to each ensemble (Default value = .2)
    noise_mode : {'single','flip'}
        Flag indicating whether to compute each ensemble with noise once or
        twice with the noise and sign-flipped noise (Default value = 'single')
    noise_seed : int
        seed value to use for random noise generation.
    nprocesses : int
        Integer number of parallel processes to compute. Each process computes
        a single realisation of the total ensemble (Default value = 1)
    sift_thresh : float
        The threshold at which the overall sifting process will stop.
        (Default value = 1e-8)
    max_imfs : int
        The maximum number of IMFs to compute. (Default value = None)

    Returns
    -------
    imf : ndarray
        2D array [samples x nimfs] containing the Intrinsic Mode Functions
        from the decomposition of X.

    Other Parameters
    ----------------
    imf_opts : dict
        Optional dictionary of keyword options to be passed to emd.get_next_imf.
    envelope_opts : dict
        Optional dictionary of keyword options to be passed to emd.interp_envelope
    extrema_opts : dict
        Optional dictionary of keyword options to be passed to emd.get_padded_extrema
    verbose : {None,'CRITICAL','WARNING','INFO','DEBUG'}
        Option to override the EMD logger level for a call to this function.

    See Also
    --------
    emd.sift.get_next_imf

    References
    ----------
    .. [1] Wu, Z., & Huang, N. E. (2009). Ensemble Empirical Mode
       Decomposition: A Noise-Assisted Data Analysis Method. Advances in
       Adaptive Data Analysis, 1(1), 1–41.
       https://doi.org/10.1142/s1793536909000047
    .. [2] Wu, Z., & Huang, N. E. (2004). A study of the characteristics of
       white noise using the empirical mode decomposition method. Proceedings
       of the Royal Society of London. Series A: Mathematical, Physical and
       Engineering Sciences, 460(2046), 1597–1611.
       https://doi.org/10.1098/rspa.2003.1221

    """
    if noise_mode not in ['single', 'flip']:
        raise ValueError(
            'noise_mode: {0} not recognised, please use \'single\' or \'flip\''.format(noise_mode))

    X = ensure_1d_with_singleton([X], ['X'], 'sift')

    _nsamples_warn(X.shape[0], max_imfs)

    # Noise is defined with respect to variance in the data
    noise_scaling = X.std() * ensemble_noise

    if noise_seed is not None:
        np.random.seed(noise_seed)

    # Create partial function containing everything we need to run one iteration
    # (noise=None, so each call draws its own white-noise realisation)
    pfunc = functools.partial(_sift_with_noise, X, noise_scaling=noise_scaling,
                              noise=None, noise_mode=noise_mode, sift_thresh=sift_thresh,
                              max_imfs=max_imfs, imf_opts=imf_opts,
                              envelope_opts=envelope_opts, extrema_opts=extrema_opts)

    # Run the actual sifting - in parallel if requested
    args = [[] for ii in range(nensembles)]
    res = run_parallel(pfunc, args, nprocesses=nprocesses)

    # Keep largest group of ensembles with matching number of imfs.
    nimfs = [r.shape[1] for r in res]
    uni, unic = np.unique(nimfs, return_counts=True)
    target_imfs = uni[np.argmax(unic)]

    # Adjust for max_imfs if it was defined
    if (max_imfs is not None) and (target_imfs > max_imfs):
        target_imfs = max_imfs

    msg = 'Retaining {0} ensembles ({1}%) each with {2} IMFs'
    logger.info(msg.format(np.max(unic), 100*(np.max(unic)/nensembles), target_imfs))

    # Take average across ensembles - ensembles with at least target_imfs
    # modes contribute their first target_imfs columns
    imfs = np.zeros((X.shape[0], target_imfs))
    for ii in range(target_imfs):
        imfs[:, ii] = np.array([r[:, ii] for r in res if r.shape[1] >= target_imfs]).mean(axis=0)

    return imfs
@wrap_verbose
@sift_logger('complete_ensemble_sift')
def complete_ensemble_sift(X, nensembles=4, ensemble_noise=.2, nprocesses=1,
                           noise_seed=None, sift_thresh=1e-8, energy_thresh=50,
                           rilling_thresh=None, max_imfs=None, verbose=None,
                           imf_opts=None, envelope_opts=None, extrema_opts=None):
    """Compute Intrinsic Mode Functions with complete ensemble EMD.

    This function implements the complete ensemble empirical model
    decomposition algorithm defined in [1]_. This approach sifts an ensemble
    of signals with white-noise added taking a single IMF across all ensembles
    at before moving to the next IMF.

    Parameters
    ----------
    X : ndarray
        1D input array containing the time-series data to be decomposed
    nensembles : int
        Integer number of different ensembles to compute the sift across.
    ensemble_noise : float
        Standard deviation of noise to add to each ensemble (Default value = .2)
    nprocesses : int
        Integer number of parallel processes to compute. Each process computes
        a single realisation of the total ensemble (Default value = 1)
    noise_seed : int
        seed value to use for random noise generation.
    sift_thresh : float
        The threshold at which the overall sifting process will stop.
        (Default value = 1e-8)
    max_imfs : int
        The maximum number of IMFs to compute. (Default value = None)

    Returns
    -------
    imf: ndarray
        2D array [samples x nimfs] containing the Intrinsic Mode Functions
        from the decomposition of X.

    Other Parameters
    ----------------
    imf_opts : dict
        Optional dictionary of keyword options to be passed to emd.get_next_imf.
    envelope_opts : dict
        Optional dictionary of keyword options to be passed to emd.interp_envelope
    extrema_opts : dict
        Optional dictionary of keyword options to be passed to emd.get_padded_extrema
    verbose : {None,'CRITICAL','WARNING','INFO','DEBUG'}
        Option to override the EMD logger level for a call to this function.

    See Also
    --------
    emd.sift.get_next_imf

    References
    ----------
    .. [1] Torres, M. E., Colominas, M. A., Schlotthauer, G., & Flandrin, P.
       (2011). A complete ensemble empirical mode decomposition with adaptive
       noise. In 2011 IEEE International Conference on Acoustics, Speech and
       Signal Processing (ICASSP). IEEE.
       https://doi.org/10.1109/icassp.2011.5947265

    """
    X = ensure_1d_with_singleton([X], ['X'], 'sift')

    imf_opts = {} if imf_opts is None else imf_opts
    envelope_opts = {} if envelope_opts is None else envelope_opts

    _nsamples_warn(X.shape[0], max_imfs)

    # Work with normalised units internally - easier for noise scaling
    Xstd = X.std()
    X = X / Xstd

    # Compute white noise
    if noise_seed is not None:
        np.random.seed(noise_seed)
    white_noise = zscore(np.random.randn(nensembles, X.shape[0]), axis=1)

    # Compute white noise modes - sift each to completion
    modes_white_noise = [sift(white_noise[ii, :], imf_opts=imf_opts,
                              envelope_opts=envelope_opts,
                              extrema_opts=extrema_opts) for ii in range(nensembles)]

    # Define the core sifting func and options - this is applied to compute
    # successive IMFs in the main loop
    pfunc = functools.partial(get_next_imf, envelope_opts=envelope_opts,
                              extrema_opts=extrema_opts, **imf_opts)

    # Wrapper to return local mean terms rather than IMFs - could make this an
    # option in get_next_imf in future
    def get_next_local_mean(X):
        X = ensure_1d_with_singleton([X], ['X'], 'get_next_local_mean')
        imf, flag = pfunc(X)
        return X - imf, flag

    # Get first local mean from across ensemble
    args = []
    for ii in range(nensembles):
        # Scale the first white-noise mode to the requested ensemble noise level
        scaled_noise = ensemble_noise*modes_white_noise[ii][:, 0]/modes_white_noise[ii][:, 0].std()
        args.append([X + scaled_noise[:, np.newaxis]])

    res = run_parallel(get_next_local_mean, args, nprocesses=nprocesses)

    # Finaly local mean is average across all
    local_mean = np.array([r[0] for r in res]).mean(axis=0)

    # IMF is data minus final local mean
    imf = X - local_mean
    residue = local_mean

    # Prep for loop
    layer = 1
    # continue_sift = _ceemdan_check_continue(local_mean, sift_thresh)
    continue_sift = check_sift_continue(X, local_mean, layer,
                                        max_imfs=max_imfs,
                                        sift_thresh=sift_thresh,
                                        energy_thresh=energy_thresh,
                                        rilling_thresh=rilling_thresh,
                                        envelope_opts=envelope_opts,
                                        extrema_opts=extrema_opts,
                                        merge_tests=True)

    snrflag = 1
    while continue_sift:

        # Prepare noise for ensembles
        # NOTE(review): modes_white_noise[ii][:, layer] assumes each noise
        # decomposition yielded more than `layer` modes - confirm an
        # IndexError cannot occur here for long sifts.
        args = []
        for ii in range(nensembles):
            noise = modes_white_noise[ii][:, layer].copy()
            if snrflag == 2:
                noise = noise / noise.std()
            noise = ensemble_noise * noise
            # Sift current local-mean + each noise process
            args.append([local_mean[:, 0]+noise*local_mean.std()])

        res = run_parallel(get_next_local_mean, args, nprocesses=nprocesses)

        # New local mean is the mean of local means (resid_i - imf_i) across ensemble
        local_mean = np.array([r[0] for r in res]).mean(axis=0)

        # New IMF is current residue minus new local mean
        imf = np.c_[imf, (residue[:, -1] - local_mean[:, 0])[:, None]]

        # Next residue is current new local mean
        residue = np.c_[residue, local_mean]

        # Check if sifting should continue - all metrics whose thresh is not
        # None will be assessed and sifting will stop if any metric says so
        continue_sift = check_sift_continue(X, local_mean, layer,
                                            max_imfs=max_imfs,
                                            sift_thresh=sift_thresh,
                                            energy_thresh=energy_thresh,
                                            rilling_thresh=rilling_thresh,
                                            envelope_opts=envelope_opts,
                                            extrema_opts=extrema_opts,
                                            merge_tests=True)
        layer += 1

    # Concatenate final IMF
    imf = np.c_[imf, local_mean]

    # Reinstate original variance
    imf = imf * Xstd

    return imf
################################################################## # Mask SIFT implementations # Utilities
def get_next_imf_mask(X, z, amp, nphases=4, nprocesses=1,
                      imf_opts=None, envelope_opts=None, extrema_opts=None):
    """Compute the next IMF from a data set a mask sift.

    This is a helper function used within the more general sifting functions.

    Parameters
    ----------
    X : ndarray
        1D input array containing the time-series data to be decomposed
    z : float
        Mask frequency as a proportion of the sampling rate, values between
        0->z->.5
    amp : float
        Mask amplitude
    nphases : int > 0
        The number of separate sinusoidal masks to apply for each IMF, the
        phase of masks are uniformly spread across a 0<=p<2pi range
        (Default=4).
    nprocesses : int
        Integer number of parallel processes to compute. Each process computes
        an IMF from the signal plus a mask. nprocesses should be less than or
        equal to nphases, no additional benefit from setting nprocesses >
        nphases (Default value = 1)

    Returns
    -------
    proto_imf : ndarray
        1D vector containing the next IMF extracted from X
    continue_sift : bool
        Boolean indicating whether the sift can be continued beyond this IMF

    Other Parameters
    ----------------
    imf_opts : dict
        Optional dictionary of keyword arguments to be passed to emd.get_next_imf
    envelope_opts : dict
        Optional dictionary of keyword options to be passed to emd.interp_envelope
    extrema_opts : dict
        Optional dictionary of keyword options to be passed to emd.get_padded_extrema

    See Also
    --------
    emd.sift.mask_sift
    emd.sift.get_next_imf

    """
    X = ensure_1d_with_singleton([X], ['X'], 'get_next_imf_mask')

    if imf_opts is None:
        imf_opts = {}

    logger.info("Defining masks with freq {0} and amp {1} at {2} phases".format(z, amp, nphases))

    # Create normalised freq
    zf = z * 2 * np.pi

    # Create time matrix including mask phase-shifts
    t = np.repeat(np.arange(X.shape[0])[:, np.newaxis], nphases, axis=1)
    phases = np.linspace(0, (2*np.pi), nphases+1)[:nphases]

    # Create masks
    m = amp * np.cos(zf * t + phases)

    # Work with a partial function to make the parallel loop cleaner
    # This partial function contains all the settings which will be constant
    # across jobs.
    pfunc = functools.partial(get_next_imf, **imf_opts,
                              envelope_opts=envelope_opts,
                              extrema_opts=extrema_opts)

    args = [[X+m[:, ii, np.newaxis]] for ii in range(nphases)]

    res = run_parallel(pfunc, args, nprocesses=nprocesses)

    # Collate results
    imfs = [r[0] for r in res]
    continue_flags = [r[1] for r in res]

    # star map should preserve the order of outputs so we can remove masks easily
    imfs = np.concatenate(imfs, axis=1) - m

    # Average the de-masked proto-IMFs; continue if ANY phase says continue
    logger.verbose('Averaging across {0} proto IMFs'.format(imfs.shape[1]))
    return imfs.mean(axis=1)[:, np.newaxis], np.any(continue_flags)
def get_mask_freqs(X, first_mask_mode='zc', imf_opts=None): """Determine mask frequencies for a sift. Parameters ---------- X : ndarray Vector time-series first_mask_mode : (str, float<0.5) Either a string denoting a method {'zc', 'if'} or a float determining and initial frequency. See notes for more details. imf_opts : dict Options to be passed to get_next_imf if first_mask_mode is 'zc' or 'if'. Returns ------- float Frequency for the first mask in normalised units. """ if imf_opts is None: imf_opts = {} if first_mask_mode in ('zc', 'if'): logger.info('Computing first mask frequency with method {0}'.format(first_mask_mode)) logger.info('Getting first IMF with no mask') # First IMF is computed normally imf, _ = get_next_imf(X, **imf_opts) # Compute first mask frequency from first IMF if first_mask_mode == 'zc': num_zero_crossings = zero_crossing_count(imf)[0, 0] z = num_zero_crossings / imf.shape[0] / 2 logger.info('Found first mask frequency of {0}'.format(z)) elif first_mask_mode == 'if': _, IF, IA = frequency_transform(imf[:, 0, None], 1, 'nht', smooth_phase=3) z = np.average(IF, weights=IA) logger.info('Found first mask frequency of {0}'.format(z)) elif isinstance(first_mask_mode, (int, float)): if first_mask_mode <= 0 or first_mask_mode > .5: raise ValueError("The frequency of the first mask must be 0 <= x < 0.5") logger.info('Using specified first mask frequency of {0}'.format(first_mask_mode)) z = first_mask_mode return z # Implementation
@wrap_verbose
@sift_logger('mask_sift')
def mask_sift(X, mask_amp=1, mask_amp_mode='ratio_sig',
              mask_freqs='zc', mask_step_factor=2,
              ret_mask_freq=False, max_imfs=9, sift_thresh=1e-8,
              nphases=4, nprocesses=1, verbose=None,
              imf_opts=None, envelope_opts=None, extrema_opts=None):
    """Compute Intrinsic Mode Functions using a mask sift.

    This function implements a masked sift from a dataset using a set of
    masking signals to reduce mixing of components between modes [1]_, multiple
    masks of different phases can be applied when isolating each IMF [2]_.

    This function can either compute the mask frequencies based on the fastest
    dynamics in the data (the properties of the first IMF from a standard sift)
    or apply a pre-specified set of masks.

    Parameters
    ----------
    X : ndarray
        1D input array containing the time-series data to be decomposed
    mask_amp : float or array_like
        Amplitude of mask signals as specified by mask_amp_mode. If float the
        same value is applied to all IMFs, if an array is passed each value is
        applied to each IMF in turn (Default value = 1)
    mask_amp_mode : {'abs','ratio_imf','ratio_sig'}
        Method for computing mask amplitude. Either in absolute units ('abs'),
        or as a ratio of the standard deviation of the input signal
        ('ratio_sig') or previous imf ('ratio_imf') (Default value = 'ratio_sig')
    mask_freqs : {'zc','if',float,array_like}
        Define the set of mask frequencies to use. If 'zc' or 'if' are passed,
        the frequency of the first mask is taken from either the zero-crossings
        or instantaneous frequency of the first IMF of a standard sift on the
        data. If a float is passed this is taken as the first mask frequency.
        Subsequent masks are defined by the mask_step_factor. If an array_like
        vector is passed, the values in the vector will specify the mask
        frequencies.
    mask_step_factor : float
        Step in frequency between successive masks (Default value = 2)
    nphases : int > 0
        The number of separate sinusoidal masks to apply for each IMF, the
        phase of masks are uniformly spread across a 0<=p<2pi range (Default=4).
    nprocesses : int
        Integer number of parallel processes used when extracting each IMF
        (Default value = 1)
    ret_mask_freq : bool
        Boolean flag indicating whether mask frequencies are returned (Default
        value = False)
    max_imfs : int
        The maximum number of IMFs to compute. (Default value = 9)
    sift_thresh : float
        The threshold at which the overall sifting process will stop.
        (Default value = 1e-8)

    Returns
    -------
    imf : ndarray
        2D array [samples x nimfs] containing the Intrinsic Mode Functions
        from the decomposition of X.
    mask_freqs : ndarray
        1D array of mask frequencies, if ret_mask_freq is set to True.

    Raises
    ------
    ValueError
        If mask_freqs or mask_amp_mode is not a recognised option.

    Other Parameters
    ----------------
    imf_opts : dict
        Optional dictionary of keyword arguments to be passed to emd.get_next_imf
    envelope_opts : dict
        Optional dictionary of keyword options to be passed to emd.interp_envelope
    extrema_opts : dict
        Optional dictionary of keyword options to be passed to emd.get_padded_extrema
    verbose : {None,'CRITICAL','WARNING','INFO','DEBUG'}
        Option to override the EMD logger level for a call to this function.

    Notes
    -----
    Here are some example mask_sift variants you can run:

    A mask sift in which the mask frequencies are determined with
    zero-crossings and mask amplitudes by a ratio with the amplitude of the
    previous IMF:

    >>> imf = emd.sift.mask_sift(X, mask_amp_mode='ratio_imf', mask_freqs='zc')

    A mask sift in which the first mask is set at .4 of the sampling rate and
    subsequent masks found by successive division of this mask_freq by 3:

    >>> imf = emd.sift.mask_sift(X, mask_freqs=.4, mask_step_factor=3)

    A mask sift using user specified frequencies and amplitudes:

    >>> mask_freqs = np.array([.4,.2,.1,.05,.025,0])
    >>> mask_amps = np.array([2,2,1,1,.5,.5])
    >>> imf = emd.sift.mask_sift(X, mask_freqs=mask_freqs, mask_amp=mask_amps, mask_amp_mode='abs')

    See Also
    --------
    emd.sift.get_next_imf
    emd.sift.get_next_imf_mask

    References
    ----------
    .. [1] Ryan Deering, & James F. Kaiser. (2005). The Use of a Masking
       Signal to Improve Empirical Mode Decomposition. In Proceedings.
       (ICASSP '05). IEEE International Conference on Acoustics, Speech, and
       Signal Processing, 2005. IEEE. https://doi.org/10.1109/icassp.2005.1416051
    .. [2] Tsai, F.-F., Fan, S.-Z., Lin, Y.-S., Huang, N. E., & Yeh, J.-R.
       (2016). Investigating Power Density and the Degree of Nonlinearity in
       Intrinsic Components of Anesthesia EEG by the Hilbert-Huang Transform:
       An Example Using Ketamine and Alfentanil. PLOS ONE, 11(12), e0168108.
       https://doi.org/10.1371/journal.pone.0168108

    """
    X = ensure_1d_with_singleton([X], ['X'], 'sift')

    # Resolve the set of mask frequencies
    if isinstance(mask_freqs, (list, tuple, np.ndarray)):
        logger.info('Using user specified masks')
        if len(mask_freqs) < max_imfs:
            max_imfs = len(mask_freqs)
            logger.info("Reducing max_imfs to {0} as len(mask_freqs) < max_imfs".format(max_imfs))
    elif mask_freqs in ['zc', 'if'] or isinstance(mask_freqs, (int, float)):
        # if first mask is 'if' or 'zc' - compute first imf as normal and get freq
        z = get_mask_freqs(X, mask_freqs, imf_opts=imf_opts)
        mask_freqs = np.array([z/mask_step_factor**ii for ii in range(max_imfs)])
    else:
        # Previously an unrecognised input fell through and failed later with
        # an opaque TypeError
        raise ValueError("'mask_freqs' input {0} not recognised".format(mask_freqs))

    _nsamples_warn(X.shape[0], max_imfs)

    # Initialise mask amplitudes
    if mask_amp_mode == 'ratio_imf':
        sd = X.std()  # Take ratio of input signal for first IMF
    elif mask_amp_mode == 'ratio_sig':
        sd = X.std()
    elif mask_amp_mode == 'abs':
        sd = 1
    else:
        # Previously an unrecognised mode left sd unbound -> NameError
        raise ValueError("'mask_amp_mode' input {0} not recognised".format(mask_amp_mode))

    continue_sift = True
    imf_layer = 0
    proto_imf = X.copy()
    imf = []
    while continue_sift:

        # Update mask amplitudes if needed
        if mask_amp_mode == 'ratio_imf' and imf_layer > 0:
            sd = imf[:, -1].std()

        if isinstance(mask_amp, (int, float)):
            amp = mask_amp * sd
        else:
            # Should be array_like if not a single number
            amp = mask_amp[imf_layer] * sd

        logger.info('Sifting IMF-{0}'.format(imf_layer))

        next_imf, continue_sift = get_next_imf_mask(proto_imf, mask_freqs[imf_layer], amp,
                                                    nphases=nphases,
                                                    nprocesses=nprocesses,
                                                    imf_opts=imf_opts,
                                                    envelope_opts=envelope_opts,
                                                    extrema_opts=extrema_opts)

        if imf_layer == 0:
            imf = next_imf
        else:
            imf = np.concatenate((imf, next_imf), axis=1)

        # The residual is always the original signal minus all IMFs so far
        proto_imf = X - imf.sum(axis=1)[:, None]

        if max_imfs is not None and imf_layer == max_imfs-1:
            logger.info('Finishing sift: reached max number of imfs ({0})'.format(imf.shape[1]))
            continue_sift = False

        if np.abs(next_imf).sum() < sift_thresh:
            continue_sift = False

        imf_layer += 1

    if ret_mask_freq:
        return imf, mask_freqs
    else:
        return imf
@wrap_verbose
@sift_logger('iterated_mask_sift')
def iterated_mask_sift(X,
                       # Iterated mask sift arguments
                       mask_0='zc', w_method='power', max_iter=15,
                       iter_th=0.1, N_avg=1, exclude_edges=False,
                       sample_rate=1.0, seed=None,
                       # Standard mask sift arguments - specify a couple which need defaults.
                       max_imfs=6, ret_mask_freq=False,
                       mask_amp_mode='ratio_imf',
                       **kwargs):
    """Compute Intrinsic Mode Functions using an iterated mask sift.

    This function implements a masked sift from a dataset using a set of
    masking signals to reduce mixing of components between modes [1]_, multiple
    masks of different phases can be applied when isolating each IMF [2]_. Mask
    frequencies are determined automatically by an iterative process [3]_.

    The iteration can be started with either a random mask, a mask based on the
    fastest dynamics (same as 'zc' in mask_sift), or a pre-specified mask.

    Parameters
    ----------
    X : ndarray
        1D input array containing the time-series data to be decomposed
    mask_0 : {array_like, float, 'zc', 'if', 'random'}
        Initial mask for the iteration process, can be one of:

        * 'zc' or 'if' initialises with the masks chosen by the zero-crossing
          count or instantaneous frequency method in the standard mask sift.
        * 'random' chooses random integers between 0 and sample_rate/4 as the
          starting mask. seed=int can be optionally passed to control the
          random seed in numpy.
        * A single float is expanded into a geometric series of max_imfs masks
          using mask_step_factor (mirroring mask_sift behaviour).
        * array-like needs to be in normalised units, i.e. divided by the
          sample rate.

        (Default value = 'zc')
    w_method : {'amplitude', 'power', float, 'avg', None}
        Weighting method to use in the iteration process. 'amplitude' weights
        frequencies by the instantaneous amplitude, 'power' by its square. If a
        float is passed, the amplitude is raised to that exponent before
        averaging. 'avg' or None performs a simple average without weighting.
        (Default value = 'power')
    max_imfs : int
        The maximum number of IMFs to compute. (Default value = 6)
    max_iter : int
        The maximum number of iterations to compute. (Default value = 15)
    iter_th : float
        Relative mask variability threshold below which iteration is stopped.
        (Default value = 0.1)
    N_avg : int
        Number of iterations to average after convergence is reached.
        (Default value = 1)
    exclude_edges : bool
        If True, excludes first and last 2.5% of frequency data during the
        iteration process to avoid edge effects. (Default value = False)
    sample_rate : float
        Sampling rate of the data in Hz (Default value = 1.0)
    seed : int or None
        Random seed to use for random initial mask selection when
        mask_0 = 'random'
    ret_mask_freq : bool
        Boolean flag indicating whether mask frequencies are returned
        (Default value = False)
    **kwargs
        Any additional arguments for the standard emd.sift.mask_sift can be
        specified - see the documentation for emd.sift.mask_sift for more
        details.

    Returns
    -------
    imf : ndarray
        2D array [samples x nimfs] containing the Intrinsic Mode Functions
        from the decomposition of X.
    mask_freqs : ndarray
        1D array of mask frequencies, if ret_mask_freq is set to True.

    Raises
    ------
    ValueError
        If mask_0 or w_method is not a recognised option.

    Notes
    -----
    Here are some example iterated_mask_sift variants you can run:

    An iterated mask sift in which the mask frequencies are determined with
    zero-crossings and iteration stops at 15 iterations or if masks stabilize
    to within 10% (note - this is also the default):

    >>> imf = emd.sift.iterated_mask_sift(X, sample_rate=sample_rate, mask_0='zc', max_iter=15, iter_th=0.1)

    An iterated mask sift in which a custom initial mask is used and after
    convergence 5 further iterations are averaged:

    >>> imf = emd.sift.iterated_mask_sift(X, sample_rate=sample_rate, mask_0=[10, 5, 3, 1]/sample_rate, N_avg=5)

    An iterated mask sift weighted by instantaneous amplitude that also returns
    the automatically determined mask and excludes 5% of edge data to avoid
    edge effects:

    >>> imf, mask = emd.sift.iterated_mask_sift(X, sample_rate=sample_rate, w_method='amplitude',
    ...                                         exclude_edges=True, ret_mask_freq=True)

    See Also
    --------
    emd.sift.mask_sift
    emd.sift.get_next_imf_mask

    References
    ----------
    .. [1] Ryan Deering, & James F. Kaiser. (2005). The Use of a Masking
       Signal to Improve Empirical Mode Decomposition. In Proceedings.
       (ICASSP '05). IEEE International Conference on Acoustics, Speech, and
       Signal Processing, 2005. IEEE. https://doi.org/10.1109/icassp.2005.1416051
    .. [2] Tsai, F.-F., Fan, S.-Z., Lin, Y.-S., Huang, N. E., & Yeh, J.-R.
       (2016). Investigating Power Density and the Degree of Nonlinearity in
       Intrinsic Components of Anesthesia EEG by the Hilbert-Huang Transform:
       An Example Using Ketamine and Alfentanil. PLOS ONE, 11(12), e0168108.
       https://doi.org/10.1371/journal.pone.0168108
    .. [3] Marco S. Fabus, Andrew J. Quinn, Catherine E. Warnaby, and Mark W.
       Woolrich (2021). Automatic decomposition of electrophysiological data
       into distinct nonsinusoidal oscillatory modes. Journal of
       Neurophysiology 2021 126:5, 1670-1684.
       https://doi.org/10.1152/jn.00315.2021

    """
    # Housekeeping
    X = ensure_1d_with_singleton([X], ['X'], 'sift')
    _nsamples_warn(X.shape[0], max_imfs)
    nsamples = X.shape[0]

    # Add explicitly specified mask_sift kwargs into full dict for use later
    kwargs['max_imfs'] = max_imfs
    kwargs['mask_amp_mode'] = mask_amp_mode

    # Main switch initialising the mask frequency set
    if isinstance(mask_0, (list, tuple, np.ndarray)):
        # User has provided a full set of masks
        logger.info('Initialising masks with user specified frequencies')
        mask = np.asarray(mask_0, dtype=float)
        if len(mask) < max_imfs:
            max_imfs = len(mask)
            logger.info("Reducing max_imfs to {0} as len(mask_freqs) < max_imfs".format(max_imfs))
        elif len(mask) > max_imfs:
            # Trim surplus masks so they fit the preallocated iteration arrays
            mask = mask[:max_imfs]
    elif isinstance(mask_0, (int, float)):
        logger.info('Initialising masks with user specified single frequency')
        # Expand the single frequency into the geometric series mask_sift
        # would build internally - a bare scalar would crash the iteration
        # loop below (floats have no len() or .copy()).
        step = kwargs.get('mask_step_factor', 2)
        mask = np.array([mask_0 / step**ii for ii in range(max_imfs)])
    elif mask_0 in ('zc', 'if'):
        logger.info('Initialising masks with mask_sift default mask_freqs={0}'.format(mask_0))
        # if first mask is 'if' or 'zc' - compute first imf as normal and get freq
        _, mask = mask_sift(X, mask_freqs=mask_0, ret_mask_freq=True, **kwargs)
    elif mask_0 == 'random':
        logger.info('Initialising masks with random values')
        if seed is not None:
            np.random.seed(seed)
        mask = np.random.randint(0, sample_rate/4, size=max_imfs) / sample_rate
    else:
        raise ValueError("'mask_0' input {0} not recognised - cannot initialise mask frequencies".format(mask_0))

    # Preallocate arrays for loop process
    mask_all = np.zeros((max_iter+N_avg, max_imfs))
    imf_all = np.zeros((max_iter+N_avg, nsamples, max_imfs))

    # Start counters
    niters = 0
    niters_c = 0
    maxiter_flag = 0
    continue_iter = True
    converged = False

    # Main loop
    while continue_iter:
        if not converged:
            logger.info('Computing iteration number ' + str(niters))
        else:
            logger.info('Converged, averaging... ' + str(niters_c) + ' / ' + str(N_avg))

        # Update masks
        mask_prev = mask.copy()
        mask_all[niters+niters_c, :len(mask)] = mask.copy()

        # Compute mask sift
        imf = mask_sift(X, mask_freqs=mask, **kwargs)
        imf_all[niters+niters_c, :, :imf.shape[1]] = imf

        # Compute IMF frequencies
        IP, IF, IA = frequency_transform(imf, sample_rate, 'nht')

        # Trim IMF edges if requested - avoids edge effects distorting IF average
        if exclude_edges:
            logger.info('Excluding 5% of edge frequencies in mask estimation.')
            ex = int(0.025*nsamples)
            samples_included = list(range(ex, nsamples-ex))  # Edge effects ignored
        else:
            samples_included = list(range(nsamples))  # All, default

        # find weighted IF average as the next mask
        if w_method == 'amplitude':
            # IF weighted by amplitude values in IA
            IF_weighted = np.average(IF[samples_included, :], 0, weights=IA[samples_included, :])
        elif w_method == 'power':
            # IF weighted by power values from IA**2
            IF_weighted = np.average(IF[samples_included, :], 0, weights=IA[samples_included, :]**2)
        elif isinstance(w_method, float):
            # IF weighted by amplitude raised to user specified power
            IF_weighted = np.average(IF[samples_included, :], 0, weights=IA[samples_included, :]**w_method)
        elif w_method == 'avg' or w_method is None:
            # IF average not weighted - None accepted as documented
            IF_weighted = np.mean(IF[samples_included, :], axis=0)
        else:
            raise ValueError("w_method '{0}' not recognised".format(w_method))

        # Compute new mask frequencies and variances
        mask = IF_weighted/sample_rate
        n_common = min(len(mask), len(mask_prev))
        mask_variance = np.abs((mask[:n_common] - mask_prev[:n_common]) / mask_prev[:n_common])

        # Check convergence
        if np.all(mask_variance[~np.isnan(mask_variance)] < iter_th) or converged:
            converged = True
            logger.info('Finishing iteration process: convergence reached in {0} iterations '.format(niters))
            if niters_c < N_avg:
                niters_c += 1
            else:
                continue_iter = False

        if not converged:
            niters += 1
            if niters >= max_iter:
                logger.info('Finishing iteration process: reached max number of iterations: {0}'.format(max_iter))
                maxiter_flag = 1
                continue_iter = False

    # Average IMFs across iterations after convergence
    imf_final = np.nanmean(imf_all[niters:niters+N_avg, :, :], axis=0)
    IF_final = np.nanmean(mask_all[niters:niters+N_avg, :], axis=0)*sample_rate
    IF_std_final = np.nanstd(mask_all[niters:niters+N_avg, :], axis=0)*sample_rate

    if maxiter_flag:
        # Never converged - return the last completed iteration instead
        imf_final = imf_all[niters-1, :, :]
        IF_final = mask
        IF_std_final = mask_variance

    # If we are not averaging, output relative change from last mask instead
    if N_avg == 1:
        IF_std_final = mask_variance

    N_imf_final = int(np.sum(~np.isnan(mask_all[niters-1, :])))
    imf_final = imf_final[:, :N_imf_final]
    IF_final = IF_final[:N_imf_final]
    IF_std_final = IF_std_final[:N_imf_final]

    imf = imf_final

    logger.info('Final mask variability: %s', str(IF_std_final))
    logger.info('COMPLETED: iterated mask sift')

    if ret_mask_freq:
        return imf, IF_final
    else:
        return imf
################################################################## # Second Layer SIFT
[docs] @sift_logger('second_layer') def sift_second_layer(IA, sift_func=sift, sift_args=None): """Compute second layer intrinsic mode functions. This function implements a second-layer sift to be appliede to the amplitude envelopes of a set of first layer IMFs [1]_. Parameters ---------- IA : ndarray Input array containing a set of first layer IMFs sift_func : function Sift function to apply sift_args : dict Dictionary of sift options to be passed into sift_func Returns ------- imf2 : ndarray 3D array [samples x first layer imfs x second layer imfs ] containing the second layer IMFs References ---------- .. [1] Huang, N. E., Hu, K., Yang, A. C. C., Chang, H.-C., Jia, D., Liang, W.-K., … Wu, Z. (2016). On Holo-Hilbert spectral analysis: a full informational spectral representation for nonlinear and non-stationary data. Philosophical Transactions of the Royal Society A: Mathematical, Physical and Engineering Sciences, 374(2065), 20150206. https://doi.org/10.1098/rsta.2015.0206 """ IA = ensure_2d([IA], ['IA'], 'sift_second_layer') if (sift_args is None) or ('max_imfs' not in sift_args): max_imfs = IA.shape[1] elif 'max_imfs' in sift_args: max_imfs = sift_args['max_imfs'] imf2 = np.zeros((IA.shape[0], IA.shape[1], max_imfs)) for ii in range(max_imfs): tmp = sift_func(IA[:, ii], **sift_args) imf2[:, ii, :tmp.shape[1]] = tmp return imf2
[docs] @sift_logger('mask_sift_second_layer') def mask_sift_second_layer(IA, mask_freqs, sift_args=None): """Compute second layer IMFs using a mask sift. Second layer IMFs are computed from the amplitude envelopes of a set of first layer IMFs [1]_.A single set of masks is applied across all IMFs with the highest frequency mask dropped for each successive first level IMF. Parameters ---------- IA : ndarray Input array containing a set of first layer IMFs mask_freqs : function Sift function to apply sift_args : dict Dictionary of sift options to be passed into sift_func Returns ------- imf2 : ndarray 3D array [samples x first layer imfs x second layer imfs ] containing the second layer IMFs References ---------- .. [1] Huang, N. E., Hu, K., Yang, A. C. C., Chang, H.-C., Jia, D., Liang, W.-K., … Wu, Z. (2016). On Holo-Hilbert spectral analysis: a full informational spectral representation for nonlinear and non-stationary data. Philosophical Transactions of the Royal Society A: Mathematical, Physical and Engineering Sciences, 374(2065), 20150206. https://doi.org/10.1098/rsta.2015.0206 """ IA = ensure_2d([IA], ['IA'], 'sift_second_layer') if (sift_args is None): sift_args = {'max_imfs': IA.shape[1]} elif ('max_imfs' not in sift_args): sift_args['max_imfs'] = IA.shape[1] imf2 = np.zeros((IA.shape[0], IA.shape[1], sift_args['max_imfs'])) for ii in range(IA.shape[1]): sift_args['mask_freqs'] = mask_freqs[ii:] tmp = mask_sift(IA[:, ii], **sift_args) imf2[:, ii, :tmp.shape[1]] = tmp return imf2
##################################################################
# SIFT Estimation Utilities

##################################################################
# SIFT Config Utilities


class SiftConfig(collections.abc.MutableMapping):
    """A dictionary-like object specifying keyword arguments configuring a sift.

    Options live in a nested dict (``self.store``). Keys can be addressed with
    a '/'-separated shorthand of up to three levels, e.g.
    ``config['extrema_opts/loc_pad_opts/mode']``. The config can be saved to
    and loaded from yaml, and turned into a ready-to-call sift function with
    :meth:`get_func`.
    """

    def __init__(self, name='sift', *args, **kwargs):
        """Specify keyword arguments configuring a sift."""
        # Internal mapping holding the actual options
        self.store = dict()
        # Name of the sift variant this config targets (e.g. 'mask_sift')
        self.sift_type = name
        self.update(dict(*args, **kwargs))  # use the free update to set keys

    def __getitem__(self, key):
        """Return an item from the internal store."""
        # __keytransform__ returns a list for nested '/'-separated keys
        key = self.__keytransform__(key)
        if isinstance(key, list):
            if len(key) == 2:
                return self.store[key[0]][key[1]]
            elif len(key) == 3:
                return self.store[key[0]][key[1]][key[2]]
        else:
            return self.store[key]

    def __setitem__(self, key, value):
        """Set or change the value of an item in the internal store."""
        key = self.__keytransform__(key)
        if isinstance(key, list):
            if len(key) == 2:
                self.store[key[0]][key[1]] = value
            elif len(key) == 3:
                self.store[key[0]][key[1]][key[2]] = value
        else:
            self.store[key] = value

    def __delitem__(self, key):
        """Remove an item from the internal store."""
        key = self.__keytransform__(key)
        if isinstance(key, list):
            if len(key) == 2:
                del self.store[key[0]][key[1]]
            elif len(key) == 3:
                del self.store[key[0]][key[1]][key[2]]
        else:
            del self.store[key]

    def __iter__(self):
        """Iterate through items in the internal store."""
        return iter(self.store)

    def __str__(self):
        """Print summary of internal store."""
        out = []
        # These sub-dicts get an indented, per-key listing
        lower_level = ['imf_opts', 'envelope_opts', 'extrema_opts']
        for stage in self.store.keys():
            if stage not in lower_level:
                out.append('{0} : {1}'.format(stage, self.store[stage]))
            else:
                out.append(stage + ':')
                for key in self.store[stage].keys():
                    out.append('    {0} : {1}'.format(key, self.store[stage][key]))
        return '%s %s\n%s' % (self.sift_type, self.__class__, '\n'.join(out))

    def __repr__(self):
        """Print summary of internal store."""
        return "<{0} ({1})>".format(self.__module__ + '.' + type(self).__name__, self.sift_type)

    def _repr_html_(self):
        """Return an HTML summary of the internal store (for notebook display)."""
        _str_html = "<h3><b>%s %s</b></h3><hr><ul>" % (self.sift_type, self.__class__)
        lower_level = ['imf_opts', 'envelope_opts', 'extrema_opts']
        for stage in self.store.keys():
            if stage not in lower_level:
                _str_html += '<li><b>{0}</b> : {1}</li>'.format(stage, self.store[stage])
            else:
                # Nested options are rendered as a sub-list
                outer_list = '<li><b>{0}</b></li>%s'.format(stage)
                inner_list = '<ul>'
                for key in self.store[stage].keys():
                    inner_list += '<li><i>{0}</i> : {1}</li>'.format(key, self.store[stage][key])
                _str_html += outer_list % (inner_list + '</ul>')
        return _str_html + '</ul>'

    def __len__(self):
        """Return number of items in internal store."""
        return len(self.store)

    def __keytransform__(self, key):
        """Split a merged dictionary key into separate levels.

        'a/b/c' becomes ['a', 'b', 'c']; a key without '/' is returned
        unchanged as a plain string.
        """
        key = key.split('/')
        if len(key) == 1:
            return key[0]
        else:
            if len(key) > 3:
                raise ValueError("Requested key is nested too deep. Should be a \
                                  maximum of three levels separated by '/'")
            return key

    def _get_yamlsafe_dict(self):
        """Return copy of internal store with values prepped for saving into yaml format."""
        # NOTE(review): this is a shallow copy - nested dicts are converted
        # in place by _array_or_tuple_to_list
        conf = self.store.copy()
        conf = _array_or_tuple_to_list(conf)
        # Two yaml documents: the sift type, then the options themselves
        return [{'sift_type': self.sift_type}, conf]

    def to_yaml_text(self):
        """Return a copy of the internal store in yaml-text format."""
        return yaml.dump(self._get_yamlsafe_dict(), sort_keys=False)

    def to_yaml_file(self, fname):
        """Save a copy of the internal store in a specified yaml file."""
        with open(fname, 'w') as f:
            yaml.dump_all(self._get_yamlsafe_dict(), f, sort_keys=False)
        logger.info("Saved SiftConfig ({0}) to {1}".format(self, fname))

    @classmethod
    def from_yaml_file(cls, fname):
        """Create and return a new SiftConfig object with options loaded from a yaml file."""
        ret = cls()
        with open(fname, 'r') as f:
            cfg = [d for d in yaml.load_all(f, Loader=yaml.FullLoader)]
        if len(cfg) == 1:
            # Single-document file: options only, sift variant unknown
            ret.store = cfg[0]
            ret.sift_type = 'Unknown'
        else:
            # Two-document file as written by to_yaml_file
            ret.sift_type = cfg[0]['sift_type']
            ret.store = cfg[1]
        logger.info("Loaded SiftConfig ({0}) from {1}".format(ret, fname))
        return ret

    @classmethod
    def from_yaml_stream(cls, stream):
        """Create and return a new SiftConfig object with options loaded from a yaml stream."""
        # NOTE(review): unlike from_yaml_file this only reads a single yaml
        # document, so sift_type keeps the default - confirm intended
        ret = cls()
        ret.store = yaml.load(stream, Loader=yaml.FullLoader)
        return ret

    def get_func(self):
        """Get a partial-function coded with the options from this config."""
        # Resolve the sift function by name from this module
        mod = sys.modules[__name__]
        func = getattr(mod, self.sift_type)
        return functools.partial(func, **self.store)
def get_config(siftname='sift'):
    """Return a SiftConfig with default options for a specified sift variant.

    Helper function for specifying config objects specifying parameters to be
    used in a sift. The functions used during the sift are inspected
    automatically and default values are populated into a nested dictionary
    which can be modified and used as input to one of the sift functions.

    Parameters
    ----------
    siftname : str
        Name of the sift function to find configuration from

    Returns
    -------
    SiftConfig
        A modified dictionary containing the sift specification

    Raises
    ------
    AttributeError
        If siftname is not a recognised sift variant.

    Notes
    -----
    The sift config acts as a nested dictionary which can be modified to
    specify parameters for different parts of the sift. This is initialised
    using this function

    >>> config = emd.sift.get_config()

    The first level of the dictionary contains three sub-dicts configuring
    different parts of the algorithm:

    >>> config['imf_opts'] # options passed to `get_next_imf`
    >>> config['envelope_opts'] # options passed to interp_envelope
    >>> config['extrema_opts'] # options passed to get_padded_extrema

    Specific values can be modified in the dictionary

    >>> config['extrema_opts']['parabolic_extrema'] = True

    or using this shorthand

    >>> config['imf_opts/env_step_factor'] = 1/3

    Finally, the SiftConfig dictionary should be nested before being passed as
    keyword arguments to a sift function.

    >>> imfs = emd.sift.sift(X, **config)

    """
    # Extrema padding opts are hard-coded for the moment, these run through
    # np.pad which has a complex signature
    mag_pad_opts = {'mode': 'median', 'stat_length': 1}
    loc_pad_opts = {'mode': 'reflect', 'reflect_type': 'odd'}

    # Get defaults for extrema detection and padding
    extrema_opts = _get_function_opts(get_padded_extrema, ignore=['X',
                                                                  'mag_pad_opts',
                                                                  'loc_pad_opts',
                                                                  'mode'])

    # Get defaults for envelope interpolation
    envelope_opts = _get_function_opts(interp_envelope, ignore=['X',
                                                                'extrema_opts',
                                                                'mode',
                                                                'ret_extrema',
                                                                'trim'])

    # Get defaults for computing IMFs
    imf_opts = _get_function_opts(get_next_imf, ignore=['X',
                                                        'envelope_opts',
                                                        'extrema_opts'])

    # Get defaults for the given sift variant
    sift_types = ['sift', 'ensemble_sift', 'complete_ensemble_sift',
                  'mask_sift', 'iterated_mask_sift']
    if siftname in sift_types:
        mod = sys.modules[__name__]
        # NOTE: a missing comma previously fused 'imf_opts' 'envelope_opts'
        # into one string, so neither was actually ignored
        sift_opts = _get_function_opts(getattr(mod, siftname), ignore=['X',
                                                                       'imf_opts',
                                                                       'envelope_opts',
                                                                       'extrema_opts',
                                                                       'kwargs'])
        if siftname == 'iterated_mask_sift':
            # Add options for mask sift as well
            mask_opts = _get_function_opts(getattr(mod, 'mask_sift'), ignore=['X',
                                                                              'imf_opts',
                                                                              'envelope_opts',
                                                                              'extrema_opts',
                                                                              'mask_freqs',
                                                                              'mask_step_factor'])
            sift_opts = {**sift_opts, **mask_opts}
    else:
        raise AttributeError('Sift siftname not recognised: please use one of {0}'.format(sift_types))

    out = SiftConfig(siftname)
    for key in sift_opts:
        out[key] = sift_opts[key]
    out['imf_opts'] = imf_opts
    out['envelope_opts'] = envelope_opts
    out['extrema_opts'] = extrema_opts
    out['extrema_opts/mag_pad_opts'] = mag_pad_opts
    out['extrema_opts/loc_pad_opts'] = loc_pad_opts

    return out
def _get_function_opts(func, ignore=None): """Inspect a function and extract its keyword arguments and their default values. Parameters ---------- func : function handle for the function to be inspected ignore : {None or list} optional list of keyword argument names to be ignored in function signature Returns ------- dict Dictionary of keyword arguments with keyword keys and default value values. """ if ignore is None: ignore = [] out = {} sig = inspect.signature(func) for p in sig.parameters: if p not in out.keys() and p not in ignore: out[p] = sig.parameters[p].default return out def _array_or_tuple_to_list(conf): """Convert an input array or tuple to list (for yaml_safe dict creation.""" for key, val in conf.items(): if isinstance(val, np.ndarray): conf[key] = val.tolist() elif isinstance(val, dict): conf[key] = _array_or_tuple_to_list(conf[key]) elif isinstance(val, tuple): conf[key] = list(val) return conf