import os
import re
import numpy as np
import warnings
from io import BytesIO
from scipy.stats import gaussian_kde
from scipy.optimize import curve_fit
from latools.helpers.helpers import Bunch, fastgrad, fastsmooth, findmins, bool_2_indices
from latools.helpers.stat_fns import gauss
# Functions to work with laser ablation signals
# Despiking functions
[docs]def noise_despike(sig, win=3, nlim=24., maxiter=4):
"""
Apply standard deviation filter to remove anomalous values.
Parameters
----------
win : int
The window used to calculate rolling statistics.
nlim : float
The number of standard deviations above the rolling
mean above which data are considered outliers.
Returns
-------
None
"""
if win % 2 != 1:
win += 1 # win must be odd
kernel = np.ones(win) / win # make convolution kernel
over = np.ones(len(sig), dtype=bool) # initialize bool array
# pad edges to avoid edge-effects
npad = int((win - 1) / 2)
over[:npad] = False
over[-npad:] = False
# set up monitoring
nloops = 0
# do the despiking
while any(over) and (nloops < maxiter):
rmean = np.convolve(sig, kernel, 'valid') # mean by convolution
rstd = rmean**0.5 # std = sqrt(signal), because count statistics
# identify where signal > mean + std * nlim (OR signa < mean - std *
# nlim)
# | (sig[npad:-npad] < rmean - nlim * rstd)
over[npad:-npad] = (sig[npad:-npad] > rmean + nlim * rstd)
# if any are over, replace them with mean of neighbours
if any(over):
# replace with values either side
# sig[over] = sig[np.roll(over, -1) | np.roll(over, 1)].reshape((sum(over), 2)).mean(1)
# replace with mean
sig[npad:-npad][over[npad:-npad]] = rmean[over[npad:-npad]]
nloops += 1
# repeat until no more removed.
return sig
[docs]def expdecay_despike(sig, expdecay_coef, tstep, maxiter=3):
"""
Apply exponential decay filter to remove physically impossible data based on instrumental washout.
The filter is re-applied until no more points are removed, or maxiter is reached.
Parameters
----------
exponent : float
Exponent used in filter
tstep : float
The time increment between data points.
maxiter : int
The maximum number of times the filter should be applied.
Returns
-------
None
"""
# determine rms noise of data
noise = np.std(sig[:5]) # initially, calculated based on first 5 points
# expand the selection up to 50 points, unless it dramatically increases
# the std (i.e. catches the 'laser on' region)
for i in [10, 20, 30, 50]:
inoise = np.std(sig[:i])
if inoise < 1.5 * noise:
noise = inoise
rms_noise3 = 3 * noise
i = 0
f = True
while (i < maxiter) and f:
# calculate low and high possibles values based on exponential decay
siglo = np.roll(sig * np.exp(tstep * expdecay_coef), 1)
sighi = np.roll(sig * np.exp(-tstep * expdecay_coef), -1)
# identify points that are outside these limits, beyond what might be explained
# by noise in the data
loind = (sig < siglo - rms_noise3) & (sig < np.roll(sig, -1) - rms_noise3)
hiind = (sig > sighi + rms_noise3) & (sig > np.roll(sig, 1) + rms_noise3)
# replace all such values with their preceding
sig[loind] = sig[np.roll(loind, -1)]
sig[hiind] = sig[np.roll(hiind, -1)]
f = any(np.concatenate([loind, hiind]))
i += 1
return sig
[docs]def read_data(data_file, dataformat, name_mode):
"""
Load data_file described by a dataformat dict.
Parameters
----------
data_file : str
Path to data file, including extension.
dataformat : dict
A dataformat dict, see example below.
name_mode : str
How to identyfy sample names. If 'file_names' uses the
input name of the file, stripped of the extension. If
'metadata_names' uses the 'name' attribute of the 'meta'
sub-dictionary in dataformat. If any other str, uses this
str as the sample name.
Example
-------
>>>
{'genfromtext_args': {'delimiter': ',',
'skip_header': 4}, # passed directly to np.genfromtxt
'column_id': {'name_row': 3, # which row contains the column names
'delimiter': ',', # delimeter between column names
'timecolumn': 0, # which column contains the 'time' variable
'pattern': '([A-z]{1,2}[0-9]{1,3})'}, # a regex pattern which captures the column names
'meta_regex': { # a dict of (line_no: ([descriptors], [regexs])) pairs
0: (['path'], '(.*)'),
2: (['date', 'method'], # MUST include date
'([A-Z][a-z]+ [0-9]+ [0-9]{4}[ ]+[0-9:]+ [amp]+).* ([A-z0-9]+\.m)')
}
}
Returns
-------
sample, analytes, data, meta : tuple
"""
with open(data_file) as f:
lines = f.readlines()
if 'meta_regex' in dataformat.keys():
meta = Bunch()
for k, v in dataformat['meta_regex'].items():
out = re.search(v[-1], lines[int(k)]).groups()
for i in np.arange(len(v[0])):
meta[v[0][i]] = out[i]
# sample name
if name_mode == 'file_names':
sample = os.path.basename(data_file).split('.')[0]
elif name_mode == 'metadata_names':
sample = meta['name']
else:
sample = name_mode
# column and analyte names
columns = np.array(lines[dataformat['column_id']['name_row']].strip().split(
dataformat['column_id']['delimiter']))
if 'pattern' in dataformat['column_id'].keys():
pr = re.compile(dataformat['column_id']['pattern'])
analytes = [pr.match(c).groups()[0] for c in columns if pr.match(c)]
# do any required pre-formatting
if 'preformat_replace' in dataformat.keys():
with open(data_file) as f:
fbuffer = f.read()
for k, v in dataformat['preformat_replace'].items():
fbuffer = re.sub(k, v, fbuffer)
# dead data
read_data = np.genfromtxt(BytesIO(fbuffer.encode()),
**dataformat['genfromtext_args']).T
else:
# read data
read_data = np.genfromtxt(data_file,
**dataformat['genfromtext_args']).T
# data dict
dind = np.ones(read_data.shape[0], dtype=bool)
dind[dataformat['column_id']['timecolumn']] = False
data = Bunch()
data['Time'] = read_data[dataformat['column_id']['timecolumn']]
# convert raw data into counts
# TODO: Is this correct? Should actually be per-analyte dwell?
# if 'unit' in dataformat:
# if dataformat['unit'] == 'cps':
# tstep = data['Time'][1] - data['Time'][0]
# read_data[dind] *= tstep
# else:
# pass
data['rawdata'] = Bunch(zip(analytes, read_data[dind]))
data['total_counts'] = read_data[dind].sum(0)
return sample, analytes, data, meta
[docs]def autorange(t, sig, gwin=7, swin=None, win=30,
on_mult=(1.5, 1.), off_mult=(1., 1.5),
nbin=10, thresh=None):
"""
Automatically separates signal and background in an on/off data stream.
**Step 1: Thresholding.**
The background signal is determined using a gaussian kernel density
estimator (kde) of all the data. Under normal circumstances, this
kde should find two distinct data distributions, corresponding to
'signal' and 'background'. The minima between these two distributions
is taken as a rough threshold to identify signal and background
regions. Any point where the trace crosses this thrshold is identified
as a 'transition'.
**Step 2: Transition Removal.**
The width of the transition regions between signal and background are
then determined, and the transitions are excluded from analysis. The
width of the transitions is determined by fitting a gaussian to the
smoothed first derivative of the analyte trace, and determining its
width at a point where the gaussian intensity is at at `conf` time the
gaussian maximum. These gaussians are fit to subsets of the data
centered around the transitions regions determined in Step 1, +/- `win`
data points. The peak is further isolated by finding the minima and
maxima of a second derivative within this window, and the gaussian is
fit to the isolated peak.
Parameters
----------
t : array-like
Independent variable (usually time).
sig : array-like
Dependent signal, with distinctive 'on' and 'off' regions.
gwin : int
The window used for calculating first derivative.
Defaults to 7.
swin : int
The window ised for signal smoothing. If None, ``gwin // 2``.
win : int
The width (c +/- win) of the transition data subsets.
Defaults to 20.
on_mult and off_mult : tuple, len=2
Control the width of the excluded transition regions, which is defined
relative to the peak full-width-half-maximum (FWHM) of the transition
gradient. The region n * FHWM below the transition, and m * FWHM above
the tranision will be excluded, where (n, m) are specified in `on_mult`
and `off_mult`.
`on_mult` and `off_mult` apply to the off-on and on-off transitions,
respectively.
Defaults to (1.5, 1) and (1, 1.5).
nbin : ind
Used to calculate the number of bins in the data histogram.
``bins = len(sig) // nbin``
Returns
-------
fbkg, fsig, ftrn, failed : tuple
where fbkg, fsig and ftrn are boolean arrays the same length as sig,
that are True where sig is background, signal and transition, respecively.
failed contains a list of transition positions where gaussian fitting
has failed.
"""
if swin is None:
swin = gwin // 2
failed = []
# smooth signal
sigs = fastsmooth(sig, swin)
if thresh is None:
# bins = 50
bins = sig.size // nbin
kde_x = np.linspace(sig.min(), sig.max(), bins)
kde = gaussian_kde(sigs)
yd = kde.pdf(kde_x)
mins = findmins(kde_x, yd) # find minima in kde
if len(mins) > 0:
bkg = sigs < (mins[0]) # set background as lowest distribution
else:
bkg = np.ones(sig.size, dtype=bool)
# bkg[0] = True # the first value must always be background
else:
bkg = sigs < thresh
# assign rough background and signal regions based on kde minima
fbkg = bkg
fsig = ~bkg
# remove transitions by fitting a gaussian to the gradients of
# each transition
# 1. calculate the absolute gradient of the target trace.
g = abs(fastgrad(sigs, gwin))
# 2. determine the approximate index of each transition
zeros = bool_2_indices(fsig)
if zeros is not None:
zeros = zeros.flatten()
for z in zeros: # for each approximate transition
# isolate the data around the transition
if z - win < 0:
lo = gwin // 2
hi = int(z + win)
elif z + win > (len(sig) - gwin // 2):
lo = int(z - win)
hi = len(sig) - gwin // 2
else:
lo = int(z - win)
hi = int(z + win)
xs = t[lo:hi]
ys = g[lo:hi]
# determine type of transition (on/off)
mid = (hi + lo) // 2
tp = sigs[mid + 3] > sigs[mid - 3] # True if 'on' transition.
# fit a gaussian to the first derivative of each
# transition. Initial guess parameters:
# - A: maximum gradient in data
# - mu: c
# - width: 2 * time step
# The 'sigma' parameter of curve_fit:
# This weights the fit by distance from c - i.e. data closer
# to c are more important in the fit than data further away
# from c. This allows the function to fit the correct curve,
# even if the data window has captured two independent
# transitions (i.e. end of one ablation and start of next)
# ablation are < win time steps apart).
c = t[z] # center of transition
width = (t[1] - t[0]) * 2
try:
pg, _ = curve_fit(gauss, xs, ys,
p0=(np.nanmax(ys),
c,
width),
sigma=(xs - c)**2 + .01)
# get the x positions when the fitted gaussian is at 'conf' of
# maximum
# determine transition FWHM
fwhm = abs(2 * pg[-1] * np.sqrt(2 * np.log(2)))
# apply on_mult or off_mult, as appropriate.
if tp:
lim = np.array([-fwhm, fwhm]) * on_mult + pg[1]
else:
lim = np.array([-fwhm, fwhm]) * off_mult + pg[1]
fbkg[(t > lim[0]) & (t < lim[1])] = False
fsig[(t > lim[0]) & (t < lim[1])] = False
except RuntimeError:
failed.append([c, tp])
pass
ftrn = ~fbkg & ~fsig
# if there are any failed transitions, exclude the mean transition width
# either side of the failures
if len(failed) > 0:
trns = t[bool_2_indices(ftrn)]
tr_mean = (trns[:, 1] - trns[:, 0]).mean() / 2
for f, tp in failed:
if tp:
ind = (t >= f - tr_mean *
on_mult[0]) & (t <= f + tr_mean * on_mult[0])
else:
ind = (t >= f - tr_mean *
off_mult[0]) & (t <= f + tr_mean * off_mult[0])
fsig[ind] = False
fbkg[ind] = False
ftrn[ind] = False
return fbkg, fsig, ftrn, [f[0] for f in failed]