Source code for emd.sift

#!/usr/bin/python

# vim: set expandtab ts=4 sw=4:

"""
Implementations of the SIFT algorithm for Empirical Mode Decomposition.

Routines:

sift
ensemble_sift
complete_ensemble_sift
sift_second_layer
mask_sift_adaptive
mask_sift_specified
get_next_imf
get_next_imf_mask

"""


import logging
import warnings
import numpy as np
import collections
from scipy import signal
from scipy import interpolate as interp

from . import spectra
from .logger import sift_logger

# Housekeeping for logging
logger = logging.getLogger(__name__)


##################################################################
# Basic SIFT

# Utilities

[docs]def get_next_imf(X, sd_thresh=.1, env_step_size=1, envelope_opts={}, extrema_opts={}): """ Compute the next IMF from a data set. This is a helper function used within the more general sifting functions. Parameters ---------- X : ndarray [nsamples x 1] 1D input array containing the time-series data to be decomposed sd_thresh : scalar The threshold at which the sift of each IMF will be stopped. (Default value = .1) env_step_size : float Scaling of envelope prior to removal at each iteration of sift. The average of the upper and lower envelope is muliplied by this value before being subtracted from the data. Values should be between 0 > x >= 1 (Default value = 1) Returns ------- proto_imf : ndarray 1D vector containing the next IMF extracted from X continue_flag : bool Boolean indicating whether the sift can be continued beyond this IMF Other Parameters ---------------- envelope_opts : dict Optional dictionary of keyword arguments to be passed to emd.interp_envelope extrema_opts : dict Optional dictionary of keyword options to be passed to emd.get_padded_extrema See Also -------- emd.sift.sift emd.sift.interp_envelope """ proto_imf = X.copy() continue_imf = True continue_flag = True niters = 0 while continue_imf: niters += 1 upper = interp_envelope(proto_imf, mode='upper', **envelope_opts) lower = interp_envelope(proto_imf, mode='lower', **envelope_opts) # If upper or lower are None we should stop sifting altogether if upper is None or lower is None: continue_flag = False continue_imf = False logger.debug('Finishing sift: IMF has no extrema') continue # Find local mean avg = np.mean([upper, lower], axis=0)[:, None] # Remove local mean estimate from proto imf x1 = proto_imf - avg # Stop sifting if we pass threshold sd = np.sum((proto_imf - x1)**2) / np.sum(proto_imf**2) if sd < sd_thresh: proto_imf = x1 continue_imf = False logger.debug('Completed in {0} iters with sd {1}'.format(niters, sd)) continue proto_imf = proto_imf - (env_step_size*avg) if proto_imf.ndim == 1: proto_imf = proto_imf[:, None] return proto_imf, continue_flag
# SIFT implementation
[docs]@sift_logger('sift') def sift(X, sift_thresh=1e-8, max_imfs=None, imf_opts={}, envelope_opts={}, extrema_opts={}): """ Compute Intrinsic Mode Functions from an input data vector using the original sift algorithm [1]_. Parameters ---------- X : ndarray 1D input array containing the time-series data to be decomposed sift_thresh : scalar The threshold at which the overall sifting process will stop. (Default value = 1e-8) max_imfs : int The maximum number of IMFs to compute. (Default value = None) Returns ------- imf: ndarray 2D array [samples x nimfs] containing he Intrisic Mode Functions from the decomposition of X. Other Parameters ---------------- imf_opts : dict Optional dictionary of keyword options to be passed to emd.get_next_imf envelope_opts : dict Optional dictionary of keyword options to be passed to emd.interp_envelope extrema_opts : dict Optional dictionary of keyword options to be passed to emd.get_padded_extrema See Also -------- emd.sift.get_next_imf References ---------- .. [1] Huang, N. E., Shen, Z., Long, S. R., Wu, M. C., Shih, H. H., Zheng, Q., … Liu, H. H. (1998). The empirical mode decomposition and the Hilbert spectrum for nonlinear and non-stationary time series analysis. Proceedings of the Royal Society of London. Series A: Mathematical, Physical and Engineering Sciences, 454(1971), 903–995. https://doi.org/10.1098/rspa.1998.0193 """ if not imf_opts: imf_opts = {'env_step_size': 1, 'sd_thresh': .1} if X.ndim == 1: # add dummy dimension X = X[:, None] continue_sift = True layer = 0 proto_imf = X.copy() while continue_sift: next_imf, continue_sift = get_next_imf(proto_imf, envelope_opts=envelope_opts, extrema_opts=extrema_opts, **imf_opts) if layer == 0: imf = next_imf else: imf = np.concatenate((imf, next_imf), axis=1) proto_imf = X - imf.sum(axis=1)[:, None] layer += 1 if max_imfs is not None and layer == max_imfs: logger.info('Finishing sift: reached max number of imfs ({0})'.format(layer)) continue_sift = False if np.abs(next_imf).sum() < sift_thresh: logger.info('Finishing sift: reached threshold {0}'.format(np.abs(next_imf).sum())) continue_sift = False return imf
################################################################## # Ensemble SIFT variants # Utilities def _sift_with_noise(X, noise_scaling=None, noise=None, noise_mode='single', sift_thresh=1e-8, max_imfs=None, job_ind=1, imf_opts={}, envelope_opts={}, extrema_opts={}): """ Helper function for applying white noise to a signal prior to computing the sift. Parameters ---------- X : ndarray 1D input array containing the time-series data to be decomposed noise_scaling : scalar Standard deviation of noise to add to each ensemble (Default value = None) noise : ndarray array of noise values the same size as X to add prior to sift (Default value = None) noise_mode : {'single','flip'} Flag indicating whether to compute each ensemble with noise once or twice with the noise and sign-flipped noise (Default value = 'single') sift_thresh : scalar The threshold at which the overall sifting process will stop. (Default value = 1e-8) max_imfs : int The maximum number of IMFs to compute. (Default value = None) job_ind : 1 Optional job index value for display in logger (Default value = 1) Returns ------- imf: ndarray 2D array [samples x nimfs] containing he Intrisic Mode Functions from the decomposition of X. Other Parameters ---------------- imf_opts : dict Optional dictionary of arguments to be passed to emd.get_next_imf envelope_opts : dict Optional dictionary of keyword options to be passed to emd.interp_envelope extrema_opts : dict Optional dictionary of keyword options to be passed to emd.get_padded_extrema See Also -------- emd.sift.ensemble_sift emd.sift.complete_ensemble_sift emd.sift.get_next_imf """ if job_ind is not None: from multiprocessing import current_process p = current_process() logger.info('Starting SIFT Ensemble: {0} on process {1}'.format(job_ind, p._identity[0])) if noise is None: noise = np.random.randn(*X.shape) if noise_scaling is not None: noise = noise * noise_scaling ensX = X.copy() + noise imf = sift(ensX, sift_thresh=sift_thresh, max_imfs=max_imfs, imf_opts=imf_opts, envelope_opts=envelope_opts, extrema_opts=extrema_opts) if noise_mode == 'single': return imf elif noise_mode == 'flip': ensX = X.copy() - noise imf += sift(ensX, sift_thresh=sift_thresh, max_imfs=max_imfs, imf_opts=imf_opts, envelope_opts=envelope_opts, extrema_opts=extrema_opts) return imf / 2 # Implementation
[docs]@sift_logger('ensemble_sift') def ensemble_sift(X, nensembles=4, ensemble_noise=.2, noise_mode='single', nprocesses=1, sift_thresh=1e-8, max_imfs=None, imf_opts={}, envelope_opts={}, extrema_opts={}): """ Compute Intrinsic Mode Functions from an input data vector using the ensemble empirical model decomposition algorithm [1]_. This approach sifts an ensemble of signals with white-noise added and treats the mean IMFs as the result. The resulting IMFs from the ensemble sift resembles a dyadic filter [2]_. Parameters ---------- X : ndarray 1D input array containing the time-series data to be decomposed nensembles : int Integer number of different ensembles to compute the sift across. ensemble_noise : scalar Standard deviation of noise to add to each ensemble (Default value = .2) noise_mode : {'single','flip'} Flag indicating whether to compute each ensemble with noise once or twice with the noise and sign-flipped noise (Default value = 'single') nprocesses : integer Integer number of parallel processes to compute. Each process computes a single realisation of the total ensemble (Default value = 1) sift_thresh : scalar The threshold at which the overall sifting process will stop. (Default value = 1e-8) max_imfs : int The maximum number of IMFs to compute. (Default value = None) Returns ------- imf : ndarray 2D array [samples x nimfs] containing he Intrisic Mode Functions from the decomposition of X. Other Parameters ---------------- imf_opts : dict Optional dictionary of keyword options to be passed to emd.get_next_imf. envelope_opts : dict Optional dictionary of keyword options to be passed to emd.interp_envelope extrema_opts : dict Optional dictionary of keyword options to be passed to emd.get_padded_extrema See Also -------- emd.sift.get_next_imf References ---------- .. [1] Wu, Z., & Huang, N. E. (2009). Ensemble Empirical Mode Decomposition: A Noise-Assisted Data Analysis Method. Advances in Adaptive Data Analysis, 1(1), 1–41. https://doi.org/10.1142/s1793536909000047 .. [2] Wu, Z., & Huang, N. E. (2004). A study of the characteristics of white noise using the empirical mode decomposition method. Proceedings of the Royal Society of London. Series A: Mathematical, Physical and Engineering Sciences, 460(2046), 1597–1611. https://doi.org/10.1098/rspa.2003.1221 """ if noise_mode not in ['single', 'flip']: raise ValueError( 'noise_mode: {0} not recognised, please use \'single\' or \'flip\''.format(noise_mode)) # Noise is defined with respect to variance in the data noise_scaling = X.std() * ensemble_noise import multiprocessing as mp p = mp.Pool(processes=nprocesses) noise = None args = [(X, noise_scaling, noise, noise_mode, sift_thresh, max_imfs, ii, imf_opts, envelope_opts, extrema_opts) for ii in range(nensembles)] res = p.starmap(_sift_with_noise, args) p.close() if max_imfs is None: max_imfs = res[0].shape[1] imfs = np.zeros((X.shape[0], max_imfs)) for ii in range(max_imfs): imfs[:, ii] = np.array([r[:, ii] for r in res]).mean(axis=0) return imfs
[docs]@sift_logger('complete_ensemble_sift') def complete_ensemble_sift(X, nensembles=4, ensemble_noise=.2, noise_mode='single', nprocesses=1, sift_thresh=1e-8, max_imfs=None, imf_opts={}, envelope_opts={}, extrema_opts={}): """ Compute Intrinsic Mode Functions from an input data vector using the complete ensemble empirical model decomposition algorithm [1]_. This approach sifts an ensemble of signals with white-noise added taking a single IMF across all ensembles at before moving to the next IMF. Parameters ---------- X : ndarray 1D input array containing the time-series data to be decomposed nensembles : int Integer number of different ensembles to compute the sift across. ensemble_noise : scalar Standard deviation of noise to add to each ensemble (Default value = .2) noise_mode : {'single','flip'} Flag indicating whether to compute each ensemble with noise once or twice with the noise and sign-flipped noise (Default value = 'single') nprocesses : integer Integer number of parallel processes to compute. Each process computes a single realisation of the total ensemble (Default value = 1) sift_thresh : scalar The threshold at which the overall sifting process will stop. (Default value = 1e-8) max_imfs : int The maximum number of IMFs to compute. (Default value = None) Returns ------- imf: ndarray 2D array [samples x nimfs] containing he Intrisic Mode Functions from the decomposition of X. noise: array_like The Intrisic Mode Functions from the decomposition of X. Other Parameters ---------------- imf_opts : dict Optional dictionary of keyword options to be passed to emd.get_next_imf. envelope_opts : dict Optional dictionary of keyword options to be passed to emd.interp_envelope extrema_opts : dict Optional dictionary of keyword options to be passed to emd.get_padded_extrema See Also -------- emd.sift.get_next_imf References ---------- .. [1] Torres, M. E., Colominas, M. A., Schlotthauer, G., & Flandrin, P. (2011). A complete ensemble empirical mode decomposition with adaptive noise. In 2011 IEEE International Conference on Acoustics, Speech and Signal Processing (ICASSP). IEEE. https://doi.org/10.1109/icassp.2011.5947265 """ import multiprocessing as mp p = mp.Pool(processes=nprocesses) if X.ndim == 1: # add dummy dimension X = X[:, None] # Noise is defined with respect to variance in the data noise_scaling = X.std() * ensemble_noise continue_sift = True layer = 0 # Compute the noise processes - large matrix here... noise = np.random.random_sample((X.shape[0], nensembles)) * noise_scaling # Do a normal ensemble sift to obtain the first IMF args = [(X, noise_scaling, noise[:, ii, None], noise_mode, sift_thresh, 1, ii, imf_opts, envelope_opts, extrema_opts) for ii in range(nensembles)] res = p.starmap(_sift_with_noise, args) imf = np.array([r for r in res]).mean(axis=0) args = [(noise[:, ii, None], sift_thresh, 1, imf_opts) for ii in range(nensembles)] res = p.starmap(sift, args) noise = noise - np.array([r[:, 0] for r in res]).T while continue_sift: proto_imf = X - imf.sum(axis=1)[:, None] args = [(proto_imf, None, noise[:, ii, None], noise_mode, sift_thresh, 1, ii, imf_opts, envelope_opts, extrema_opts) for ii in range(nensembles)] res = p.starmap(_sift_with_noise, args) next_imf = np.array([r for r in res]).mean(axis=0) imf = np.concatenate((imf, next_imf), axis=1) args = [(noise[:, ii, None], sift_thresh, 1, imf_opts) for ii in range(nensembles)] res = p.starmap(sift, args) noise = noise - np.array([r[:, 0] for r in res]).T pks, locs = find_extrema(imf[:, -1]) if len(pks) < 2: continue_sift = False if max_imfs is not None and layer == max_imfs: continue_sift = False if np.abs(next_imf).mean() < sift_thresh: continue_sift = False layer += 1 p.close() return imf, noise
################################################################## # Mask SIFT implementations # Utilities
[docs]def get_next_imf_mask(X, z, amp, mask_type='all', imf_opts={}, envelope_opts={}, extrema_opts={}): """ Compute the next IMF from a data set using the mask sift appraoch. This is a helper function used within the more general sifting functions. Parameters ---------- X : ndarray 1D input array containing the time-series data to be decomposed z : scalar Mask frequency as a proportion of the sampling rate, values between 0->z->.5 amp : scalar Mask amplitude mask_type : {'all','sine','cosine'} Flag indicating whether to apply sine, cosine or all masks (Default value = 'all') Returns ------- proto_imf : ndarray 1D vector containing the next IMF extracted from X Other Parameters ---------------- imf_opts : dict Optional dictionary of keyword arguments to be passed to emd.get_next_imf envelope_opts : dict Optional dictionary of keyword options to be passed to emd.interp_envelope extrema_opts : dict Optional dictionary of keyword options to be passed to emd.get_padded_extrema See Also -------- emd.sift.mask_sift emd.sift.get_next_imf """ if mask_type not in ['all', 'sine', 'cosine']: raise ValueError("Invalid mask type") z = z * 2 * np.pi if mask_type == 'all' or mask_type == 'cosine': mask = amp * np.cos(z * np.arange(X.shape[0]))[:, None] next_imf_up_c, continue_sift = get_next_imf(X + mask, envelope_opts=envelope_opts, extrema_opts=extrema_opts, **imf_opts) next_imf_up_c -= mask next_imf_down_c, continue_sift = get_next_imf(X - mask, envelope_opts=envelope_opts, extrema_opts=extrema_opts, **imf_opts) next_imf_down_c += mask if mask_type == 'all' or mask_type == 'sine': mask = amp * np.sin(z * np.arange(X.shape[0]))[:, None] next_imf_up_s, continue_sift = get_next_imf(X + mask, envelope_opts=envelope_opts, extrema_opts=extrema_opts, **imf_opts) next_imf_up_s -= mask next_imf_down_s, continue_sift = get_next_imf(X - mask, envelope_opts=envelope_opts, extrema_opts=extrema_opts, **imf_opts) next_imf_down_s += mask if mask_type == 'all': return (next_imf_up_c + next_imf_down_c + next_imf_up_s + next_imf_down_s) / 4. elif mask_type == 'sine': return (next_imf_up_s + next_imf_down_s) / 2. elif mask_type == 'cosine': return (next_imf_up_c + next_imf_down_c) / 2.
def get_mask_freqs(X, first_mask_mode='zc', imf_opts={}): if (first_mask_mode == 'zc') or (first_mask_mode == 'if'): logger.info('Computing first mask frequency with method {0}'.format(first_mask_mode)) logger.info('Getting first IMF with no mask') # First IMF is computed normally imf, _ = get_next_imf(X, **imf_opts) # Compute first mask frequency from first IMF if first_mask_mode == 'zc': num_zero_crossings = zero_crossing_count(imf)[0, 0] z = num_zero_crossings / imf.shape[0] / 4 logger.info('Found first mask frequency of {0}'.format(z)) elif first_mask_mode == 'if': _, IF, IA = spectra.frequency_stats(imf[:, 0, None], 1, 'nht', smooth_phase=3) z = np.average(IF, weights=IA) logger.info('Found first mask frequency of {0}'.format(z)) elif first_mask_mode < .5: if first_mask_mode <= 0 or first_mask_mode >= .5: raise ValueError("The frequency of the first mask must be 0<x<.5") logger.info('Using specified first mask frequency of {0}'.format(first_mask_mode)) z = first_mask_mode return z # Implementation @sift_logger('mask_sift') def mask_sift(X, mask_amp=1, mask_amp_mode='ratio_imf', mask_freqs='zc', mask_step_factor=2, mask_type='all', ret_mask_freq=False, max_imfs=9, sift_thresh=1e-8, imf_opts={}, envelope_opts={}, extrema_opts={}): """ Compute Intrinsic Mode Functions from a dataset using a set of masking signals to reduce mixing of components between modes [1]_. This function can either compute the mask frequencies based on the fastest dynamics in the data (the properties of the first IMF from a standard sift) or apply a pre-specified set of masks. Parameters ---------- X : ndarray 1D input array containing the time-series data to be decomposed mask_amp : scalar or array_like Amplitude of mask signals as specified by mask_amp_mode. If scalar the same value is applied to all IMFs, if an array is passed each value is applied to each IMF in turn (Default value = 1) mask_amp_mode : {'abs','ratio_imf','ratio_sig'} Method for computing mask amplitude. Either in absolute units ('abs'), or as a ratio of the amplitude of the input signal ('ratio_signal') or previous imf ('ratio_imf') (Default value = 'ratio_imf') mask_freqs : {'zc','if',float,,array_like} Define the set of mask frequencies to use. If 'zc' or 'if' are passed, the frequency of the first mask is taken from either the zero-crossings or instantaneous frequnecy the first IMF of a standard sift on the data. If a float is passed this is taken as the first mask frequency. Subsequent masks are defined by the mask_step_factor. If an array_like vector is passed, the values in the vector will specify the mask frequencies. mask_step_factor : scalar Step in frequency between successive masks (Default value = 2) mask_type : {'all','sine','cosine'} Which type of masking signal to use. 'sine' or 'cosine' options return the average of a +ve and -ve flipped wave. 'all' applies four masks: sine and cosine with +ve and -ve sign and returns the average of all four. ret_mask_freq : bool Boolean flag indicating whether mask frequencies are returned (Default value = False) max_imfs : int The maximum number of IMFs to compute. (Default value = None) sift_thresh : scalar The threshold at which the overall sifting process will stop. (Default value = 1e-8) Returns ------- imf : ndarray 2D array [samples x nimfs] containing he Intrisic Mode Functions from the decomposition of X. mask_freqs : ndarray 1D array of mask frequencies, if ret_mask_freq is set to True. Other Parameters ---------------- imf_opts : dict Optional dictionary of keyword arguments to be passed to emd.get_next_imf envelope_opts : dict Optional dictionary of keyword options to be passed to emd.interp_envelope extrema_opts : dict Optional dictionary of keyword options to be passed to emd.get_padded_extrema Notes ----- Here are some example mask_sift variants you can run: A mask sift in which the mask frequencies are determined with zero-crossings and mask amplitudes by a ratio with the amplitude of the previous IMF (note - this is also the default): >> imf = emd.sift.mask_sift(X, mask_amp_mode='ratio_imf', mask_freqs='zc') A mask sift in which the first mask is set at .4 of the sampling rate and subsequent masks found by successive division of this mask_freq by 3: >> imf = emd.sift.mask_sift(X, mask_freqs=.4, mask_step_factor=3) A mask sift using user specified frequencies and amplitudes: >> mask_freqs = np.array([.4,.2,.1,.05,.025,0]) >> mask_amps = np.array([2,2,1,1,.5,.5]) >> imf = emd.sift.mask_sift(X, mask_freqs=mask_freqs, mask_amp=mask_amps, mask_amp_mode='abs') See Also -------- emd.sift.get_next_imf emd.sift.get_next_imf_mask References ---------- .. [1] Ryan Deering, & James F. Kaiser. (2005). The Use of a Masking Signal to Improve Empirical Mode Decomposition. In Proceedings. (ICASSP ’05). IEEE International Conference on Acoustics, Speech, and Signal Processing, 2005. IEEE. https://doi.org/10.1109/icassp.2005.1416051 """ # initialise if X.ndim == 1: # add dummy dimension X = X[:, None] # if first mask is if or zc - compute first imf as normal and get freq if isinstance(mask_freqs, (list, tuple, np.ndarray)): logger.info('Using user specified masks') elif mask_freqs in ['zc', 'if'] or isinstance(mask_freqs, float): z = get_mask_freqs(X, mask_freqs, imf_opts=imf_opts) mask_freqs = np.array([z/mask_step_factor**ii for ii in range(max_imfs)]) # Initialise mask amplitudes if mask_amp_mode == 'ratio_imf': sd = X.std() # Take ratio of input signal for first IMF elif mask_amp_mode == 'ratio_sig': sd = X.std() elif mask_amp_mode == 'abs': sd = 1 continue_sift = True imf_layer = 0 proto_imf = X.copy() imf = [] while continue_sift: # Update mask amplitudes if needed if mask_amp_mode == 'ratio_imf' and imf_layer > 0: sd = imf[:, -1].std() if isinstance(mask_amp, int) or isinstance(mask_amp, float): amp = mask_amp * sd else: # Should be array_like if not a single number amp = mask_amp[imf_layer] * sd logging.info('Sift IMF-{0} with mask-freq {1} and amp {2}'.format(imf_layer, mask_freqs[imf_layer], amp)) next_imf = get_next_imf_mask(proto_imf, mask_freqs[imf_layer], amp, mask_type=mask_type, imf_opts=imf_opts, envelope_opts=envelope_opts, extrema_opts=extrema_opts) if imf_layer == 0: imf = next_imf else: imf = np.concatenate((imf, next_imf), axis=1) proto_imf = X - imf.sum(axis=1)[:, None] if max_imfs is not None and imf_layer == max_imfs-1: continue_sift = False if np.abs(next_imf).sum() < sift_thresh: continue_sift = False imf_layer += 1 if ret_mask_freq: return imf, mask_freqs else: return imf
[docs]@sift_logger('mask_sift_adaptive') def mask_sift_adaptive(X, sift_thresh=1e-8, max_imfs=None, mask_amp=1, mask_amp_mode='ratio_imf', mask_step_factor=2, ret_mask_freq=False, first_mask_mode='if', imf_opts={}, envelope_opts={}, extrema_opts={}): """ Compute Intrinsic Mode Functions from a dataset using a set of masking signals to reduce mixing of components between modes. The simplest masking signal approach uses single mask for each IMF after the first is computed as normal [1]_. This has since been expanded to the complete mask sift which uses a set of positive and negative sign sine and cosine signals as masks for each IMF. The mean of the four is taken as the IMF. Parameters ---------- X : ndarray 1D input array containing the time-series data to be decomposed sd_thresh : scalar The threshold at which the sift of each IMF will be stopped. (Default value = .1) sift_thresh : scalar The threshold at which the overall sifting process will stop. (Default value = 1e-8) max_imfs : int The maximum number of IMFs to compute. (Default value = None) mask_amp : scalar or array_like Amplitude of mask signals as specified by mask_amp_mode. If scalar the same value is applied to all IMFs, if an array is passed each value is applied to each IMF in turn (Default value = 1) mask_amp_mode : {'abs','imf_ratio','sig_ratio'} Method for computing mask amplitude. Either in absolute units ('abs'), or as a ratio of the amplitude of the input signal ('ratio_signal') or previous imf ('ratio_imf') (Default value = 'ratio_imf') mask_step_factor : scalar Step in frequency between successive masks (Default value = 2) ret_mask_freq : bool Boolean flag indicating whether mask frequencies are returned (Default value = False) mask_initial_freq : scalar Frequency of initial mask as a proportion of the sampling frequency (Default value = None) interp_method : {'mono_pchip','splrep','pchip'} The interpolation method used when computing upper and lower envelopes (Default value = 'mono_pchip') Returns ------- imf : ndarray 2D array [samples x nimfs] containing he Intrisic Mode Functions from the decomposition of X. mask_freqs : ndarray 1D array of mask frequencies, if ret_mask_freq is set to True. Other Parameters ---------------- imf_opts : dict Optional dictionary of keyword arguments to be passed to emd.get_next_imf envelope_opts : dict Optional dictionary of keyword options to be passed to emd.interp_envelope extrema_opts : dict Optional dictionary of keyword options to be passed to emd.get_padded_extrema References ---------- .. [1] Ryan Deering, & James F. Kaiser. (2005). The Use of a Masking Signal to Improve Empirical Mode Decomposition. In Proceedings. (ICASSP ’05). IEEE International Conference on Acoustics, Speech, and Signal Processing, 2005. IEEE. https://doi.org/10.1109/icassp.2005.1416051 """ warnings.warn("'emd.sift.mask_sift_adaptive' is deprecated and will be \ removed in the next version of EMD.\nPlease switch to use \ 'emd.sift.mask_sift' to remove this warning", DeprecationWarning) if X.ndim == 1: # add dummy dimension X = X[:, None] continue_sift = True layer = 0 # Initialise mask amplitudes if mask_amp_mode == 'ratio_imf': sd = X.std() # Take ratio of input signal for first IMF elif mask_amp_mode == 'ratio_sig': sd = X.std() elif mask_amp_mode == 'abs': sd = 1 if isinstance(mask_amp, int) or isinstance(mask_amp, float): amp = mask_amp * sd else: # Should be array_like if not a single number amp = mask_amp[layer] * sd if (first_mask_mode == 'zc') or (first_mask_mode == 'if'): logger.info('Sift IMF-{0} with no mask'.format(layer)) # First IMF is computed normally imf, _ = get_next_imf(X, **imf_opts) # Compute first mask frequency from first IMF if first_mask_mode == 'zc': num_zero_crossings = zero_crossing_count(imf)[0, 0] w = X.shape[0] / (num_zero_crossings / 2) z = num_zero_crossings / imf.shape[0] / 4 zs = [z] elif first_mask_mode == 'if': _, IF, IA = spectra.frequency_stats(imf[:, 0, None], 1, 'nht', smooth_phase=3) w = np.average(IF, weights=IA) z = 2 * np.pi * w / mask_step_factor z = w / 2 zs = [z] elif first_mask_mode < .5: z = first_mask_mode amp = amp * X.std() logging.info('Sift IMF-{0} with mask-freq {1} and amp {2}'.format(layer, z, amp)) imf = get_next_imf_mask(X, z, amp, mask_type='all', imf_opts=imf_opts, envelope_opts=extrema_opts, extrema_opts=extrema_opts) zs = [z] z = z / mask_step_factor zs.append(z) layer = 1 proto_imf = X.copy() - imf while continue_sift: # Update mask amplitude if needed if mask_amp_mode == 'ratio_imf': sd = imf[:, -1].std() if isinstance(mask_amp, int) or isinstance(mask_amp, float): amp = mask_amp * sd else: # Should be array_like if not a single number amp = mask_amp[layer] * sd logging.info('Sift IMF-{0} with mask-freq {1} and amp {2}'.format(layer, z, amp)) next_imf = get_next_imf_mask(proto_imf, z, amp, mask_type='all', imf_opts=imf_opts, envelope_opts=extrema_opts, extrema_opts=extrema_opts) imf = np.concatenate((imf, next_imf), axis=1) proto_imf = X - imf.sum(axis=1)[:, None] z = z / mask_step_factor zs.append(z) layer += 1 if max_imfs is not None and layer == max_imfs: continue_sift = False if ret_mask_freq: return imf, np.array(zs) else: return imf
[docs]@sift_logger('mask_sift_specified') def mask_sift_specified(X, sd_thresh=.1, max_imfs=None, mask_amp=1, mask_amp_mode='ratio_imf', mask_step_factor=2, ret_mask_freq=False, mask_initial_freq=None, mask_freqs=None, mask_amps=None, imf_opts={}, envelope_opts={}, extrema_opts={}): """ Compute Intrinsic Mode Functions from a dataset using a set of masking signals to reduce mixing of components between modes. The simplest masking signal approach uses single mask for each IMF after the first is computed as normal [1]_. This has since been expanded to the complete mask sift which uses a set of positive and negative sign sine and cosine signals as masks for each IMF. The mean of the four is taken as the IMF. Parameters ---------- X : ndarray 1D input array containing the time-series data to be decomposed sd_thresh : scalar The threshold at which the sift of each IMF will be stopped. (Default value = .1) sift_thresh : scalar The threshold at which the overall sifting process will stop. (Default value = 1e-8) max_imfs : int The maximum number of IMFs to compute. (Default value = None) mask_amp : scalar or array_like Amplitude of mask signals as specified by mask_amp_mode. If scalar the same value is applied to all IMFs, if an array is passed each value is applied to each IMF in turn (Default value = 1) mask_amp_mode : {'abs','imf_ratio','sig_ratio'} Method for computing mask amplitude. Either in absolute units ('abs'), or as a ratio of the amplitude of the input signal ('ratio_signal') or previous imf ('ratio_imf') (Default value = 'ratio_imf') mask_step_factor : scalar Step in frequency between successive masks (Default value = 2) ret_mask_freq : bool Boolean flag indicating whether mask frequencies are returned (Default value = False) mask_initial_freq : scalar Frequency of initial mask as a proportion of the sampling frequency (Default value = None) mask_freqs : array_like 1D array, list or tuple of mask frequencies as a proportion of the sampling frequency (Default value = None) interp_method : {'mono_pchip','splrep','pchip'} The interpolation method used when computing upper and lower envelopes (Default value = 'mono_pchip') Returns ------- imf : ndarray 2D array [samples x nimfs] containing he Intrisic Mode Functions from the decomposition of X. mask_freqs : ndarray 1D array of mask frequencies, if ret_mask_freq is set to True. Other Parameters ---------------- imf_opts : dict Optional dictionary of keyword arguments to be passed to emd.get_next_imf envelope_opts : dict Optional dictionary of keyword options to be passed to emd.interp_envelope extrema_opts : dict Optional dictionary of keyword options to be passed to emd.get_padded_extrema References ---------- .. [1] Ryan Deering, & James F. Kaiser. (2005). The Use of a Masking Signal to Improve Empirical Mode Decomposition. In Proceedings. (ICASSP ’05). IEEE International Conference on Acoustics, Speech, and Signal Processing, 2005. IEEE. https://doi.org/10.1109/icassp.2005.1416051 """ warnings.warn("'emd.sift.mask_sift_specified' is deprecated and will be \ removed in the next version of EMD.\nPlease switch to use \ 'emd.sift.mask_sift' to remove this warning", DeprecationWarning) if X.ndim == 1: # add dummy dimension X = X[:, None] continue_sift = True layer = 0 # First sift z = mask_freqs[0] zs = [z] # Store mask freqs for return later # Initialise mask amplitudes if mask_amp_mode == 'ratio_imf': sd = X.std() # Take ratio of input signal for first IMF elif mask_amp_mode == 'ratio_sig': sd = X.std() elif mask_amp_mode == 'abs': sd = 1 if isinstance(mask_amp, int) or isinstance(mask_amp, float): amp = mask_amp * sd else: # Should be array_like if not a single number amp = mask_amp[layer] * sd logging.info('Sift IMF-{0} with mask-freq {1} and amp {2}'.format(1, z, amp)) imf = get_next_imf_mask(X, z, amp, mask_type='all', imf_opts=imf_opts, envelope_opts=extrema_opts, extrema_opts=extrema_opts) layer = 1 proto_imf = X.copy() - imf while continue_sift: z = mask_freqs[layer] zs.append(z) layer += 1 # Update mask amplitudes if needed if mask_amp_mode == 'ratio_imf': sd = imf[:, -1].std() if isinstance(mask_amp, int) or isinstance(mask_amp, float): amp = mask_amp * sd else: # Should be array_like if not a single number amp = mask_amp[layer] * sd logging.info('Sift IMF-{0} with mask-freq {1} and amp {2}'.format(layer, z, amp)) next_imf = get_next_imf_mask(proto_imf, z, amp, mask_type='all', imf_opts=imf_opts, envelope_opts=extrema_opts, extrema_opts=extrema_opts) imf = np.concatenate((imf, next_imf), axis=1) proto_imf = X - imf.sum(axis=1)[:, None] if max_imfs is not None and layer == max_imfs: continue_sift = False if ret_mask_freq: return imf, np.array(zs) else: return imf
################################################################## # Second Layer SIFT @sift_logger('second_layer_sift') def sift_second_layer(IA, sift_func=sift, sift_args=None): """ Compute second layer IMFs from the amplitude envelopes of a set of first layer IMFs [1]_. Parameters ---------- IA : ndarray Input array containing a set of first layer IMFs sd_thresh : scalar The threshold at which the sift of each IMF will be stopped. (Default value = .1) sift_thresh : scalar The threshold at which the overall sifting process will stop. (Default value = 1e-8) max_imfs : int The maximum number of IMFs to compute. (Default value = None) Returns ------- imf2 : ndarray 3D array [samples x first layer imfs x second layer imfs ] containing the second layer IMFs References ---------- .. [1] Huang, N. E., Hu, K., Yang, A. C. C., Chang, H.-C., Jia, D., Liang, W.-K., … Wu, Z. (2016). On Holo-Hilbert spectral analysis: a full informational spectral representation for nonlinear and non-stationary data. Philosophical Transactions of the Royal Society A: Mathematical, Physical and Engineering Sciences, 374(2065), 20150206. https://doi.org/10.1098/rsta.2015.0206 """ if (sift_args is None) or ('max_imfs' not in sift_args): max_imfs = IA.shape[1] elif 'max_imfs' in sift_args: max_imfs = sift_args['max_imfs'] imf2 = np.zeros((IA.shape[0], IA.shape[1], max_imfs)) for ii in range(max_imfs): tmp = sift_func(IA[:, ii], **sift_args) imf2[:, ii, :tmp.shape[1]] = tmp return imf2 ################################################################## # SIFT Estimation Utilities def get_padded_extrema(X, pad_width=2, combined_upper_lower=False, loc_pad_opts={}, mag_pad_opts={}, parabolic_extrema=False): """ Return a set of extrema from a signal including padded extrema at the edges of the signal. Parameters ---------- X : ndarray Input signal combined_upper_lower : bool Flag to indicate whether both upper and lower extrema should be considered (Default value = False) Returns ------- max_locs : ndarray location of extrema in samples max_pks : ndarray Magnitude of each extrema """ if not loc_pad_opts: # Empty dict evaluates to False loc_pad_opts = {'mode': 'reflect', 'reflect_type': 'odd'} else: loc_pad_opts = loc_pad_opts.copy() # Don't work in place... loc_pad_mode = loc_pad_opts.pop('mode') if not mag_pad_opts: # Empty dict evaluates to False mag_pad_opts = {'mode': 'median', 'stat_length': 1} else: mag_pad_opts = mag_pad_opts.copy() # Don't work in place... mag_pad_mode = mag_pad_opts.pop('mode') if X.ndim == 2: X = X[:, 0] if combined_upper_lower: max_locs, max_pks = find_extrema(np.abs(X)) else: max_locs, max_pks = find_extrema(X) if parabolic_extrema: y = np.c_[X[max_locs-1], X[max_locs], X[max_locs+1]].T max_locs, max_pks = compute_parabolic_extrema(y, max_locs) # Return nothing we don't have enough extrema if max_locs.size <= 1: return None, None # Determine how much padding to use if max_locs.size < pad_width: pad_width = max_locs.size # Pad peak locations ret_max_locs = np.pad(max_locs, pad_width, loc_pad_mode, **loc_pad_opts) # Pad peak magnitudes ret_max_pks = np.pad(max_pks, pad_width, mag_pad_mode, **mag_pad_opts) while max(ret_max_locs) < len(X) or min(ret_max_locs) >= 0: ret_max_locs = np.pad(ret_max_locs, pad_width, loc_pad_mode, **loc_pad_opts) ret_max_pks = np.pad(ret_max_pks, pad_width, mag_pad_mode, **mag_pad_opts) return ret_max_locs, ret_max_pks def compute_parabolic_extrema(y, locs): """ Compute a parabolic approximation of the extrema of in triplets of points based on section 3.2.1 from Rato 2008 [1]_. Parameters ---------- y : array_like A [3 x nextrema] array containing the points immediately around the extrema in a time-series. locs : array_like A [nextrema] length vector containing x-axis positions of the extrema Returns ------- numpy array The estimated y-axis values of the interpolated extrema numpy array The estimated x-axis values of the interpolated extrema References ---------- .. [1] Rato, R. T., Ortigueira, M. D., & Batista, A. G. (2008). On the HHT, its problems, and some solutions. Mechanical Systems and Signal Processing, 22(6), 1374–1394. https://doi.org/10.1016/j.ymssp.2007.11.028 """ # Parabola equation parameters for computing y from parameters a, b and c # w = np.array([[1, 1, 1], [4, 2, 1], [9, 3, 1]]) # ... and its inverse for computing a, b and c from y w_inv = np.array([[.5, -1, .5], [-5/2, 4, -3/2], [3, -3, 1]]) abc = w_inv.dot(y) # Find co-ordinates of extrema from parameters abc tp = - abc[1, :] / (2*abc[0, :]) t = tp - 2 + locs y_hat = tp*abc[1, :]/2 + abc[2, :] return t, y_hat def interp_envelope(X, mode='upper', interp_method='splrep', extrema_opts={}, ret_extrema=False): """ Interpolate the amplitude envelope of a signal. Parameters ---------- X : ndarray Input signal mode : {'upper','lower','combined'} Flag to set which envelope should be computed (Default value = 'upper') interp_method : {'splrep','pchip','mono_pchip'} Flag to indicate which interpolation method should be used (Default value = 'splrep') Returns ------- ndarray Interpolated amplitude envelope """ if not extrema_opts: # Empty dict evaluates to False extrema_opts = {'pad_width': 2, 'loc_pad_opts': {}, 'mag_pad_opts': {}} else: extrema_opts = extrema_opts.copy() # Don't work in place... if interp_method not in ['splrep', 'mono_pchip', 'pchip']: raise ValueError("Invalid interp_method value") if mode == 'upper': locs, pks = get_padded_extrema(X, combined_upper_lower=False, **extrema_opts) elif mode == 'lower': locs, pks = get_padded_extrema(-X, combined_upper_lower=False, **extrema_opts) elif mode == 'combined': locs, pks = get_padded_extrema(X, combined_upper_lower=True, **extrema_opts) else: raise ValueError('Mode not recognised. Use mode= \'upper\'|\'lower\'|\'combined\'') if locs is None: return None # Run interpolation on envelope t = np.arange(locs[0], locs[-1]) if interp_method == 'splrep': f = interp.splrep(locs, pks) env = interp.splev(t, f) elif interp_method == 'mono_pchip': pchip = interp.PchipInterpolator(locs, pks) env = pchip(t) elif interp_method == 'pchip': pchip = interp.pchip(locs, pks) env = pchip(t) t_max = np.arange(locs[0], locs[-1]) tinds = np.logical_and((t_max >= 0), (t_max < X.shape[0])) env = np.array(env[tinds]) if env.shape[0] != X.shape[0]: raise ValueError('Envelope length does not match input data {0} {1}'.format( env.shape[0], X.shape[0])) if mode == 'lower': env = -env pks = -pks if ret_extrema: return env, (locs, pks) else: return env def find_extrema(X, ret_min=False): """ Identify extrema within a time-course and reject extrema whose magnitude is below a set threshold. Parameters ---------- X : ndarray Input signal ret_min : bool Flag to indicate whether maxima (False) or minima (True) should be identified(Default value = False) Returns ------- locs : ndarray Location of extrema in samples extrema : ndarray Value of each extrema """ if ret_min: ind = signal.argrelmin(X, order=1)[0] else: ind = signal.argrelmax(X, order=1)[0] # Only keep peaks with magnitude above machine precision if len(ind) / X.shape[0] > 1e-3: good_inds = ~(np.isclose(X[ind], X[ind - 1]) * np.isclose(X[ind], X[ind + 1])) ind = ind[good_inds] # if ind[0] == 0: # ind = ind[1:] # if ind[-1] == X.shape[0]: # ind = ind[:-2] return ind, X[ind] def zero_crossing_count(X): """ Count the number of zero-crossings within a time-course through differentiation of the sign of the signal. Parameters ---------- X : ndarray Input array Returns ------- int Number of zero-crossings """ if X.ndim == 2: X = X[:, None] return (np.diff(np.sign(X), axis=0) != 0).sum(axis=0) ################################################################## # SIFT Config Utilities class SiftConfig(collections.abc.MutableMapping): """ A dictionary like object specifying keyword arguments configuring a sift. """ def __init__(self, name='sift', *args, **kwargs): self.store = dict() self.name = name self.update(dict(*args, **kwargs)) # use the free update to set keys def __getitem__(self, key): key = self.__keytransform__(key) if isinstance(key, list): if len(key) == 2: return self.store[key[0]][key[1]] elif len(key) == 3: return self.store[key[0]][key[1]][key[2]] else: return self.store[key] def __setitem__(self, key, value): key = self.__keytransform__(key) if isinstance(key, list): if len(key) == 2: self.store[key[0]][key[1]] = value elif len(key) == 3: self.store[key[0]][key[1]][key[2]] = value else: self.store[key] = value def __delitem__(self, key): key = self.__keytransform__(key) if isinstance(key, list): if len(key) == 2: del self.store[key[0]][key[1]] elif len(key) == 3: del self.store[key[0]][key[1]][key[2]] else: del self.store[key] def __iter__(self): return iter(self.store) def __str__(self): out = [] lower_level = ['imf_opts', 'envelope_opts', 'extrema_opts'] for stage in self.store.keys(): if stage not in lower_level: out.append('{0} : {1}'.format(stage, self.store[stage])) else: out.append(stage + ':') for key in self.store[stage].keys(): out.append(' {0} : {1}'.format(key, self.store[stage][key])) return '%s %s\n%s' % (self.name, self.__class__, '\n'.join(out)) def __len__(self): return len(self.store) def __keytransform__(self, key): key = key.split('/') if len(key) == 1: return key[0] else: if len(key) > 3: raise ValueError("Requested key is nested too deep. Should be a \ maximum of three levels separated by '/'") return key def to_yaml(self): import yaml return yaml.dump(self.store, sort_keys=False) def to_opts(self, stage='sift'): if stage == 'sift': out = self.store['sift'] out['imf_opts'] = self.store['imf'] out['imf_opts']['envelope_opts'] = self.store['envelope'] out['imf_opts']['envelope_opts']['extrema_opts'] = self.store['extrema'] out['imf_opts']['envelope_opts']['extrema_opts']['mag_pad_opts'] = self.store['mag_pad'] out['imf_opts']['envelope_opts']['extrema_opts']['loc_pad_opts'] = self.store['loc_pad'] elif stage == 'imf': out = self.store['imf'] out['envelope_opts'] = self.store['envelope'] out['envelope_opts']['extrema_opts'] = self.store['extrema'] out['envelope_opts']['extrema_opts']['mag_pad_opts'] = self.store['mag_pad'] out['envelope_opts']['extrema_opts']['loc_pad_opts'] = self.store['loc_pad'] elif stage == 'envelope': out = self.store['envelope'] out['extrema_opts'] = self.store['extrema'] out['extrema_opts']['mag_pad_opts'] = self.store['mag_pad'] out['extrema_opts']['loc_pad_opts'] = self.store['loc_pad'] elif stage == 'extrema': out = self.store['extrema'] out['mag_pad_opts'] = self.store['mag_pad'] out['loc_pad_opts'] = self.store['loc_pad'] else: raise TypeError("stage ({}) not recognised, use 'sift', 'imf', 'envelope' or 'extrema'".format(stage)) return out.copy() def get_config(siftname='sift'): """ Helper function for specifying config objects specifying parameters to be used in a sift. The functions used during the sift areinspected automatically and default values are populated into a nested dictionary which can be modified and used as input to one of the sift functions. Parameters ---------- siftname : str Name of the sift function to find configuration from Returns ------- SiftConfig A modified dictionary containing the sift specification Notes ----- The sift config acts as a nested dictionary which can be modified to specify parameters for different parts of the sift. This is initialised using this function: config = emd.sift.get_config() The first level of the dictionary contains six sub-dicts configuring different parts of the algorithm: config['sift'] - top level sift options, mostly specific to the particular sift algorithm config['imf'] - options for detecting IMFs config['envelope'] - options for upper and lower envelope interpolation config['extrema'] - options for extrema detection config['mag_pad'] - options for y-values of padded extrema at edges config['loc_pad'] - options for x-values of padded extrema at edges Specific values can be modified in the dictionary config['extrema']['parabolic_extrema'] = True or using this shorthand config['imf/env_step_factor'] = 1/3 Finally, the SiftConfig dictionary should be nested before being passed as keyword arguments to a sift function. imfs = emd.sift.sift(X, **config.nest()) """ # Extrema padding opts are hard-coded for the moment, these run through # np.pad which has a complex signature mag_pad_opts = {'mode': 'median', 'stat_length': 1} loc_pad_opts = {'mode': 'reflect', 'reflect_type': 'odd'} # Get defaults for extrema detection and padding extrema_opts = _get_function_opts(get_padded_extrema, ignore=['X', 'mag_pad_opts', 'loc_pad_opts', 'combined_upper_lower']) # Get defaults for envelope interpolation envelope_opts = _get_function_opts(interp_envelope, ignore=['X', 'extrema_opts', 'mode', 'ret_extrema']) # Get defaults for computing IMFs imf_opts = _get_function_opts(get_next_imf, ignore=['X', 'envelope_opts', 'extrema_opts']) # Get defaults for the given sift variant sift_types = ['sift', 'ensemble_sift', 'complete_ensemble_sift', 'mask_sift', 'mask_sift_adaptive', 'mask_sift_specified'] if siftname in sift_types: import sys mod = sys.modules[__name__] sift_opts = _get_function_opts(getattr(mod, siftname), ignore=['X', 'imf_opts' 'envelope_opts', 'extrema_opts']) else: raise AttributeError('Sift siftname not recognised: please use one of {0}'.format(sift_types)) print(sift_opts) out = SiftConfig(siftname) for key in sift_opts: out[key] = sift_opts[key] out['imf_opts'] = imf_opts out['envelope_opts'] = envelope_opts out['extrema_opts'] = extrema_opts out['extrema_opts/mag_pad_opts'] = mag_pad_opts out['extrema_opts/loc_pad_opts'] = loc_pad_opts return out def _get_function_opts(func, ignore=None): """ Helper function for inspecting a function and extracting its keyword arguments and their default values Parameters ---------- func : function handle for the function to be inspected ignore : {None or list} optional list of keyword argument names to be ignored in function signature Returns ------- dict Dictionary of keyword arguments with keyword keys and default value values. """ if ignore is None: ignore = [] import inspect out = {} sig = inspect.signature(func) for p in sig.parameters: if p not in out.keys() and p not in ignore: out[p] = sig.parameters[p].default return out