"""
Functions for automatically distinguishing between signal and background
in LA-ICPMS data.
(c) Oscar Branson : https://github.com/oscarbranson
"""
import warnings
import numpy as np
from scipy.stats import gaussian_kde
from scipy.optimize import curve_fit
from sklearn.preprocessing import scale
from sklearn.cluster import KMeans
from ..helpers.utils import Bunch
from ..helpers.signal import fastgrad, fastsmooth, findmins, bool_2_indices, bool_transitions
from ..helpers.stat_fns import gauss
warnings.filterwarnings("ignore")
[docs]def separate_signal(X, transform=None, scaleX=True):
if transform is not None:
X = transform(X)
if scale:
X = scale(X)
init = np.percentile(X, [5, 95], 0)
return KMeans(2, init=init).fit_predict(X)
[docs]def log_nozero(a, **kwargs):
a[a == 0] = a[a != 0].min()
return np.log(a)
[docs]def autorange(xvar, sig, gwin=7, swin=None, win=30,
on_mult=(1.5, 1.), off_mult=(1., 1.5),
nbin=10, transform='log', thresh=None):
"""
Automatically separates signal and background in an on/off data stream.
**Step 1: Thresholding.**
KMeans clustering is used to identify data where the laser is 'on' vs
where the laser is 'off'.
**Step 2: Transition Removal.**
The width of the transition regions between signal and background are
then determined, and the transitions are excluded from analysis. The
width of the transitions is determined by fitting a gaussian to the
smoothed first derivative of the analyte trace, and determining its
width at a point where the gaussian intensity is at at `conf` time the
gaussian maximum. These gaussians are fit to subsets of the data
centered around the transitions regions determined in Step 1, +/- `win`
data points. The peak is further isolated by finding the minima and
maxima of a second derivative within this window, and the gaussian is
fit to the isolated peak.
Parameters
----------
xvar : array-like
Independent variable (usually time).
sig : array-like
Dependent signal, of shape (nsamples, nfeatures). Should be clear
distinction between laser 'on' and 'off' regions.
gwin : int
The window used for calculating first derivative.
Defaults to 7.
swin : int
The window used for signal smoothing. If None, signal is not smoothed.
win : int
The width (c +/- win) of the transition data subsets.
Defaults to 20.
on_mult and off_mult : tuple, len=2
Control the width of the excluded transition regions, which is defined
relative to the peak full-width-half-maximum (FWHM) of the transition
gradient. The region n * FHWM below the transition, and m * FWHM above
the tranision will be excluded, where (n, m) are specified in `on_mult`
and `off_mult`.
`on_mult` and `off_mult` apply to the off-on and on-off transitions,
respectively.
Defaults to (1.5, 1) and (1, 1.5).
transform : str
How to transform the data. Default is 'log'.
Returns
-------
fbkg, fsig, ftrn, failed : tuple
where fbkg, fsig and ftrn are boolean arrays the same length as sig,
that are True where sig is background, signal and transition, respecively.
failed contains a list of transition positions where gaussian fitting
has failed.
"""
failed = []
sig = np.asanyarray(sig)
# smooth signal
if swin is not None:
sigs = fastsmooth(sig, swin)
else:
sigs = sig
# transform signal
if transform == 'log':
tsigs = log_nozero(sigs)
else:
tsigs = sigs
if thresh is None:
if tsigs.ndim == 1:
scale = False
tsigs = tsigs.reshape(-1, 1)
else:
scale = True
fsig = separate_signal(tsigs, scaleX=scale).astype(bool)
else:
if transform == 'log':
thresh = np.log(thresh)
fsig = tsigs > thresh
fsig[0] = False # the first value must always be background
fbkg = ~fsig
# remove transitions by fitting a gaussian to the gradients of
# each transition
# 1. determine the approximate index of each transition
zeros = bool_2_indices(fsig)
if zeros is not None:
zeros = zeros.flatten()
if sigs.ndim > 1:
sigs = sigs.sum(axis=1)
# 2. calculate the absolute gradient of the target trace.
grad = abs(fastgrad(sigs, gwin)) # gradient of untransformed data.
for z in zeros: # for each approximate transition
# isolate the data around the transition
if z - win < 0:
lo = gwin // 2
hi = int(z + win)
elif z + win > (len(sig) - gwin // 2):
lo = int(z - win)
hi = len(sig) - gwin // 2
else:
lo = int(z - win)
hi = int(z + win)
xs = xvar[lo:hi]
ys = grad[lo:hi]
# determine type of transition (on/off)
mid = (hi + lo) // 2
tp = sigs[mid + 3] > sigs[mid - 3] # True if 'on' transition.
# fit a gaussian to the first derivative of each
# transition. Initial guess parameters:
# - A: maximum gradient in data
# - mu: c
# - width: 2 * time step
# The 'sigma' parameter of curve_fit:
# This weights the fit by distance from c - i.e. data closer
# to c are more important in the fit than data further away
# from c. This allows the function to fit the correct curve,
# even if the data window has captured two independent
# transitions (i.e. end of one ablation and start of next)
# ablation are < win time steps apart).
centre = xvar[z] # center of transition
width = (xvar[1] - xvar[0]) * 2
try:
pg, _ = curve_fit(gauss, xs, ys,
p0=(np.nanmax(ys),
centre,
width),
sigma=(xs - centre)**2 + .01)
# get the x positions when the fitted gaussian is at 'conf' of
# maximum
# determine transition FWHM
fwhm = abs(2 * pg[-1] * np.sqrt(2 * np.log(2)))
# apply on_mult or off_mult, as appropriate.
if tp:
lim = np.array([-fwhm, fwhm]) * on_mult + pg[1]
else:
lim = np.array([-fwhm, fwhm]) * off_mult + pg[1]
fbkg[(xvar > lim[0]) & (xvar < lim[1])] = False
fsig[(xvar > lim[0]) & (xvar < lim[1])] = False
except RuntimeError:
failed.append([centre, tp])
pass
ftrn = ~fbkg & ~fsig
# if there are any failed transitions, exclude the mean transition width
# either side of the failures
if len(failed) > 0:
trns = xvar[bool_2_indices(ftrn)]
tr_mean = (trns[:, 1] - trns[:, 0]).mean() / 2
for f, tp in failed:
if tp:
ind = (xvar >= f - tr_mean *
on_mult[0]) & (xvar <= f + tr_mean * on_mult[0])
else:
ind = (xvar >= f - tr_mean *
off_mult[0]) & (xvar <= f + tr_mean * off_mult[0])
fsig[ind] = False
fbkg[ind] = False
ftrn[ind] = False
return fbkg, fsig, ftrn, [f[0] for f in failed]
[docs]def autorange_components(t, sig, transform='log', gwin=7, swin=None,
win=30, on_mult=(1.5, 1.), off_mult=(1., 1.5),
thresh=None):
"""
Returns the components underlying the autorange algorithm.
Returns
-------
t : array-like
Time axis (independent variable)
sig : array-like
Raw signal (dependent variable)
sigs : array-like
Smoothed signal (swin)
tsig : array-like
Transformed raw signal (transform)
tsigs : array-like
Transformed smoothed signal (transform, swin)
kde_x : array-like
kernel density estimate of smoothed signal.
yd : array-like
bins of kernel density estimator.
g : array-like
gradient of smoothed signal (swin, gwin)
trans : dict
per-transition data.
thresh : float
threshold identified from kernel density plot
"""
failed = []
# smooth signal
if swin is not None:
sigs = fastsmooth(sig, swin)
else:
sigs = sig
# transform signal
if transform == 'log':
tsigs = np.log10(sigs)
tsig = np.log10(sig)
else:
tsigs = sigs
tsig = sig
if thresh is None:
bins = 50
kde_x = np.linspace(tsigs.min(), tsigs.max(), bins)
kde = gaussian_kde(tsigs)
yd = kde.pdf(kde_x)
mins = findmins(kde_x, yd) # find minima in kde
if len(mins) > 0:
bkg = tsigs < (mins[0]) # set background as lowest distribution
thresh = mins[0]
else:
bkg = np.ones(tsigs.size, dtype=bool)
else:
bkg = tsigs < thresh
# assign rough background and signal regions based on kde minima
fbkg = bkg
fsig = ~bkg
# remove transitions by fitting a gaussian to the gradients of
# each transition
# 1. determine the approximate index of each transition
zeros = bool_2_indices(fsig)
# 2. calculate the absolute gradient of the target trace.
g = abs(fastgrad(sigs, gwin)) # gradient of untransformed data.
if zeros is not None:
zeros = zeros.flatten()
trans = dict(zeros=zeros.flatten(),
lohi=[],
pgs=[],
excl=[],
tps=[],
failed=[],
xs=[],
ys=[])
for z in zeros: # for each approximate transition
# isolate the data around the transition
if z - win < 0:
lo = gwin // 2
hi = int(z + win)
elif z + win > (len(sig) - gwin // 2):
lo = int(z - win)
hi = len(sig) - gwin // 2
else:
lo = int(z - win)
hi = int(z + win)
xs = t[lo:hi]
ys = g[lo:hi]
trans['xs'].append(xs)
trans['ys'].append(ys)
trans['lohi'].append([lo, hi])
# determine type of transition (on/off)
mid = (hi + lo) // 2
tp = sigs[mid + 3] > sigs[mid - 3] # True if 'on' transition.
trans['tps'].append(tp)
c = t[z] # center of transition
width = (t[1] - t[0]) * 2 # initial width guess
try:
pg, _ = curve_fit(gauss, xs, ys,
p0=(np.nanmax(ys),
c,
width),
sigma=(xs - c)**2 + .01)
trans['pgs'].append(pg)
fwhm = abs(2 * pg[-1] * np.sqrt(2 * np.log(2)))
# apply on_mult or off_mult, as appropriate.
if tp:
lim = np.array([-fwhm, fwhm]) * on_mult + pg[1]
else:
lim = np.array([-fwhm, fwhm]) * off_mult + pg[1]
trans['excl'].append(lim)
fbkg[(t > lim[0]) & (t < lim[1])] = False
fsig[(t > lim[0]) & (t < lim[1])] = False
failed.append(False)
except RuntimeError:
failed.append(True)
trans['lohi'].append([np.nan, np.nan])
trans['pgs'].append([np.nan, np.nan, np.nan])
trans['excl'].append([np.nan, np.nan])
trans['tps'].append(tp)
pass
else:
zeros = []
return t, sig, sigs, tsig, tsigs, kde_x, yd, g, trans, thresh