Source code for latools.helpers.signal

"""
Helper functions for signal separation and identification.

(c) Oscar Branson : https://github.com/oscarbranson
"""
import numpy as np
from .stat_fns import nominal_values
from .utils import Bunch

[docs]def bool_transitions(a): """ Return indices where a boolean array changes from True to False """ return np.where(a[:-1] != a[1:])[0]
[docs]def bool_2_indices(a): """ Convert boolean array into a 2D array of (start, stop) pairs. """ if any(a): lims = [] lims.append(np.where(a[:-1] != a[1:])[0]) if a[0]: lims.append([0]) if a[-1]: lims.append([len(a) - 1]) lims = np.concatenate(lims) lims.sort() return np.reshape(lims, (lims.size // 2, 2)) else: return None
[docs]def enumerate_bool(bool_array, nstart=0): """ Consecutively numbers contiguous booleans in array. i.e. a boolean sequence, and resulting numbering T F T T T F T F F F T T F 0-1 1 1 - 2 ---3 3 - where ' - ' Parameters ---------- bool_array : array_like Array of booleans. nstart : int The number of the first boolean group. """ ind = bool_2_indices(bool_array) ns = np.full(bool_array.size, nstart, dtype=int) for n, lims in enumerate(ind): ns[lims[0]:lims[-1] + 1] = nstart + n + 1 return ns
[docs]def tuples_2_bool(tuples, x): """ Generate boolean array from list of limit tuples. Parameters ---------- tuples : array_like [2, n] array of (start, end) values x : array_like x scale the tuples are mapped to Returns ------- array_like boolean array, True where x is between each pair of tuples. """ if np.ndim(tuples) == 1: tuples = [tuples] out = np.zeros(x.size, dtype=bool) for l, u in tuples: out[(x > l) & (x < u)] = True return out
[docs]def rolling_window(a, window, window_mode='mid', pad=None): """ Returns (win, len(a)) rolling - window array of data. Parameters ---------- a : array_like Array to calculate the rolling window of window : int Description of `window`. window_mode : str Describes the jusitification of the rolling window relative to the returned values. Can be 'left', 'mid' or 'right'. pad : same as dtype(a) How to pad the ends of the array such that shape[0] of the returned array is the same as len(a). Can be 'ends', 'mean_ends' or 'repeat_ends'. 'ends' just extends the start or end value across all the extra windows. 'mean_ends' extends the mean value of the end windows. 'repeat_ends' repeats the end window to completion. Returns ------- array_like An array of shape (n, window), where n is either len(a) - window if pad is None, or len(a) if pad is not None. """ shape = a.shape[:-1] + (a.shape[-1] - window + 1, window) strides = a.strides + (a.strides[-1], ) out = np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides) # pad shape if window_mode == 'mid': if window % 2 == 0: npre = window // 2 - 1 npost = window // 2 else: npre = npost = window // 2 elif window_mode == 'right': npre = window - 1 npost = 0 elif window_mode == 'left': npre = 0 npost = window - 1 else: raise ValueError("`window_mode` must be 'left', 'mid' or 'right'.") if isinstance(pad, str): if pad == 'ends': prepad = np.full((npre, window), a[0]) postpad = np.full((npost, window), a[-1]) elif pad == 'mean_ends': prepad = np.full((npre, window), np.mean(a[:(window // 2)])) postpad = np.full((npost, window), np.mean(a[-(window // 2):])) elif pad == 'repeat_ends': prepad = np.full((npre, window), out[0]) postpad = np.full((npost, window), out[-1]) else: raise ValueError("`pad` must be either 'ends', 'mean_ends' or 'repeat_ends'.") return np.concatenate((prepad, out, postpad)) elif pad is not None: pre_blankpad = np.empty(((npre, window))) pre_blankpad[:] = pad post_blankpad = np.empty(((npost, window))) post_blankpad[:] = pad return np.concatenate([pre_blankpad, out, post_blankpad]) else: return out
[docs]def fastsmooth(a, win=11): """ Returns rolling - window smooth of a. Function to efficiently calculate the rolling mean of a numpy array using 'stride_tricks' to split up a 1D array into an ndarray of sub - sections of the original array, of dimensions [len(a) - win, win]. Parameters ---------- a : array_like The 1D array to calculate the rolling gradient of. win : int The width of the rolling window. Returns ------- array_like Gradient of a, assuming as constant integer x - scale. """ # check to see if 'window' is odd (even does not work) if win % 2 == 0: win += 1 # add 1 to window if it is even. kernel = np.ones(win) / win npad = int((win - 1) / 2) spad = np.full(npad + 1, np.mean(a[:(npad + 1)])) epad = np.full(npad - 1, np.mean(a[-(npad - 1):])) return np.concatenate([spad, np.convolve(a, kernel, 'valid'), epad])
[docs]def fastgrad(a, win=11, win_mode='mid'): """ Returns rolling - window gradient of a. Function to efficiently calculate the rolling gradient of a numpy array using 'stride_tricks' to split up a 1D array into an ndarray of sub - sections of the original array, of dimensions [len(a) - win, win]. Parameters ---------- a : array_like The 1D array to calculate the rolling gradient of. win : int The width of the rolling window. win_mode : str Describes the jusitification of the rolling window relative to the returned values. Can be 'left', 'mid' or 'right'. Returns ------- array_like Gradient of a, assuming as constant integer x - scale. """ # check to see if 'window' is odd (even does not work) if win % 2 == 0: win += 1 # add 1 to window if it is even. # trick for efficient 'rolling' computation in numpy # shape = a.shape[:-1] + (a.shape[-1] - win + 1, win) # strides = a.strides + (a.strides[-1], ) # wins = np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides) wins = rolling_window(a, win, pad='ends', window_mode=win_mode) # apply rolling gradient to data a = map(lambda x: np.polyfit(np.arange(win), x, 1)[0], wins) return np.array(list(a))
[docs]def calc_grads(x, dat, keys=None, win=5, win_mode='mid'): """ Calculate gradients of values in dat. Parameters ---------- x : array like Independent variable for items in dat. dat : dict {key: dependent_variable} pairs keys : str or array-like Which keys in dict to calculate the gradient of. win : int The side of the rolling window for gradient calculation win_mode : str Describes the jusitification of the rolling window relative to the returned values. Can be 'left', 'mid' or 'right'. Returns ------- dict of gradients """ if keys is None: keys = dat.keys() def grad(xy): idx = np.isfinite(xy[1]) if sum(idx) > 2: try: return np.polyfit(xy[0][idx], xy[1][idx], 1)[0] except ValueError: return np.nan else: return np.nan xs = rolling_window(x, win, pad='repeat_ends', window_mode=win_mode) grads = Bunch() for k in keys: d = nominal_values(rolling_window(dat[k], win, pad='repeat_ends', window_mode=win_mode)) grads[k] = np.array(list(map(grad, zip(xs, d)))) return grads
[docs]def findmins(x, y): """ Function to find local minima. Parameters ---------- x, y : array_like 1D arrays of the independent (x) and dependent (y) variables. Returns ------- array_like Array of points in x where y has a local minimum. """ return x[np.r_[False, y[1:] < y[:-1]] & np.r_[y[:-1] < y[1:], False]]