Source code for latools.latools

"""
Main functions for interacting with LAtools.

(c) Oscar Branson : https://github.com/oscarbranson
"""

import configparser
import itertools
import inspect
import json
import os
import re
import time
import warnings
import dateutil
import textwrap

import matplotlib.pyplot as plt
import matplotlib as mpl
import numpy as np
import pandas as pd
import pkg_resources as pkgrs

import uncertainties as unc
import uncertainties.unumpy as un
from uncertainties.unumpy import nominal_values, std_devs

from sklearn.preprocessing import minmax_scale, scale
from sklearn.cluster import KMeans
from scipy.optimize import curve_fit

from .helpers import plot
from .filtering import filters
from .filtering.classifier_obj import classifier

from .processes import read_data
from .preprocessing.split import long_file

from .D_obj import D
from .helpers import Bunch
from .helpers.plot import rangecalc
from .helpers.signal import rolling_window, enumerate_bool, calc_grads
from .helpers import logging
from .helpers.logging import _log
from .helpers.config import read_configuration, config_locator
from .helpers.stat_fns import gauss_weighted_stats, R2calc, stderr, un_interp1d, H15_mean, H15_std, H15_se
from .helpers import utils
from .helpers import srm as srms
from .helpers.progressbars import progressbar
from .helpers.chemistry import analyte_mass, decompose_molecule
from .helpers.analytes import get_analyte_name, analyte_2_massname, pretty_element, unitpicker, analyte_sort_fn, analyte_checker, split_analyte_ratios
from .helpers.io import get_date

idx = pd.IndexSlice  # multi-index slicing!

# deactivate IPython deprecations warnings
warnings.filterwarnings('ignore', category=DeprecationWarning)
# deactivate numpy invalid comparison warnings
np.seterr(invalid='ignore')


# TODO: Allow full sklearn integration by allowing sample-wise application of custom classifiers. i.e. Provide data collection (get_data) ajd filter addition API.
# Especially: PCA, Gaussian Mixture Models

# TODO: Move away from single `internal_standard` specification towards specifying multiple internal standards.

# TODO: Add 'smooth all' function.

[docs]class analyse(object): """ For processing and analysing whole LA - ICPMS datasets. Parameters ---------- data_path : str The path to a directory containing multiple data files. errorhunt : bool If True, latools prints the name of each file before it imports the data. This is useful for working out which data file is causing the import to fail. config : str The name of the configuration to use for the analysis. This determines which configuration set from the latools.cfg file is used, and overrides the default configuration setup. You might sepcify this if your lab routinely uses two different instruments. dataformat : str or dict Either a path to a data format file, or a dataformat dict. See documentation for more details. extension : str The file extension of your data files. Defaults to '.csv'. srm_identifier : str A string used to separate samples and standards. srm_identifier must be present in all standard measurements. Defaults to 'STD'. cmap : dict A dictionary of {analyte: colour} pairs. Colour can be any valid matplotlib colour string, RGB or RGBA sequence, or hex string. time_format : str A regex string identifying the time format, used by pandas when created a universal time scale. If unspecified (None), pandas attempts to infer the time format, but in some cases this might not work. internal_standard : str The name of the analyte used as an internal standard throughout analysis. file_structure : str This specifies whether latools should expect multiplte files in a folder ('multi') or a single file containing multiple analyses ('long'). Default is 'multi'. names : str or array-like If file_structure is 'multi', this should be either: * 'file_names' : use the file names as labels (default) * 'metadata_names' : used the 'names' attribute of metadata as the name anything else : use numbers. If file_structure is 'long', this should be a list of names for the ablations in the file. The wildcards '+' and '*' are supported in file names, and are used when the number of ablations does not match the number of sample names provided. If a sample name contains '+', all ablations that are not specified in the list are combined into a single file and given this name. If a sample name contains '*' these are analyses are numbered sequentially and split into separate files. For example, if you have 5 ablations with one standard at the start and stop you could provide one of: * names = ['std', 'sample+', 'std'], which would divide the long file into [std, sample (containing three ablations), std]. * names = ['std', 'sample+', 'std'], which would divide the long file into [std, sample0, sample1, sample2, std], where each name is associated with a single ablation. split_kwargs : dict Arguments to pass to latools.split.long_file() Attributes ---------- path : str Path to the directory containing the data files, as specified by `data_path`. dirname : str The name of the directory containing the data files, without the entire path. files : array_like A list of all files in `folder`. param_dir : str The directory where parameters are stored. report_dir : str The directory where plots are saved. data : dict A dict of `latools.D` data objects, labelled by sample name. samples : array_like A list of samples. analytes : array_like A list of analytes measured. stds : array_like A list of the `latools.D` objects containing hte SRM data. These must contain srm_identifier in the file name. srm_identifier : str A string present in the file names of all standards. cmaps : dict An analyte - specific colour map, used for plotting. """ def __init__(self, data_path, errorhunt=False, config='DEFAULT', dataformat=None, extension='.csv', srm_identifier='STD', cmap=None, time_format=None, internal_standard='Ca43', file_structure='multi', names='file_names', srm_file=None, pbar=None, split_kwargs={}): """ For processing and analysing whole LA - ICPMS datasets. """ # initialise log params = {k: v for k, v in locals().items() if k not in ['self', 'pbar']} self.log = ['__init__ :: args=() kwargs={}'.format(str(params))] # assign file paths self.path = os.path.realpath(data_path) self.parent_folder = os.path.dirname(self.path) # set line length for outputs self._line_width = 80 # make output directories self.report_dir = re.sub('//', '/', os.path.join(self.parent_folder, os.path.splitext(os.path.basename(self.path))[0] + '_reports/')) if not os.path.isdir(self.report_dir): os.mkdir(self.report_dir) self.export_dir = re.sub('//', '/', os.path.join(self.parent_folder, os.path.splitext(os.path.basename(self.path))[0] + '_export/')) if not os.path.isdir(self.export_dir): os.mkdir(self.export_dir) # set up file paths self._file_internal_standard_massfrac = os.path.join(self.export_dir, 'internal_standard_massfrac.csv') # load configuration parameters self.config = read_configuration(config) # print some info about the analysis and setup. startmsg = self._fill_line('-') + 'Starting analysis:' if srm_file is None or dataformat is None: startmsg += '\n Using {} configuration'.format(self.config['config']) if config == 'DEFAULT': startmsg += ' (default).' else: startmsg += '.' pretext = ' with' else: pretext = 'Using' if srm_file is not None: startmsg += '\n ' + pretext + ' custom srm_file ({})'.format(srm_file) if isinstance(dataformat, str): startmsg += '\n ' + pretext + ' custom dataformat file ({})'.format(dataformat) elif isinstance(dataformat, dict): startmsg += '\n ' + pretext + ' custom dataformat dict' print(startmsg) self._load_srmfile(srm_file) self._load_dataformat(dataformat) # link up progress bars if pbar is None: self.pbar = progressbar() else: self.pbar = pbar if file_structure == 'multi': self.files = np.array([f for f in os.listdir(self.path) if extension in f]) # load data into list (initialise D objects) with self.pbar.set(total=len(self.files), desc='Loading Data') as prog: data = [None] * len(self.files) for i, f in enumerate(self.files): data_passthrough = read_data(data_file=os.path.join(self.path, f), dataformat=self.dataformat, name_mode=names) data[i] = D(passthrough=(f, *data_passthrough)) # data[i] = (D(os.path.join(self.path, f), # dataformat=self.dataformat, # errorhunt=errorhunt, # cmap=cmap, # internal_standard=internal_standard, # name=names)) prog.update() elif file_structure == 'long': data = [] print(self.path) for data_passthrough in long_file(data_file=self.path, dataformat=self.dataformat, sample_list=names, passthrough=True, **split_kwargs): data.append(D(passthrough=data_passthrough)) # create universal time scale if 'date' in data[0].meta: if (time_format is None) and ('time_format' in self.dataformat): time_format = self.dataformat['time_format'] start_times = [] for d in data: start_times.append(get_date(d.meta['date'], time_format)) min_time = min(start_times) for d, st in zip(data, start_times): d.uTime = d.Time + (st - min_time).seconds else: ts = 0 for d in data: d.uTime = d.Time + ts ts += d.Time[-1] msg = self._wrap_text( "Time not determined from dataformat. Universal time scale " + "approximated as continuously measured samples. " + "Samples might not be in the right order. " "Background correction and calibration may not behave " + "as expected.") warnings.warn(self._wrap_msg(msg, '*')) self.max_time = max([d.uTime.max() for d in data]) # sort data by uTime data.sort(key=lambda d: d.uTime[0]) # process sample names if (names == 'file_names') | (names == 'metadata_names'): samples = np.array([s.sample.replace(' ', '') for s in data], dtype=object) # get all sample names # if duplicates, rename them usamples, ucounts = np.unique(samples, return_counts=True) if usamples.size != samples.size: dups = usamples[ucounts > 1] # identify duplicates nreps = ucounts[ucounts > 1] # identify how many times they repeat for d, n in zip(dups, nreps): # cycle through duplicates new = [d + '_{}'.format(i) for i in range(n)] # append number to duplicate names ind = samples == d samples[ind] = new # rename in samples for s, ns in zip([data[i] for i in np.where(ind)[0]], new): s.sample = ns # rename in D objects elif file_structure == 'long': samples = np.array([s.sample for s in data], dtype=object) else: samples = np.arange(len(data)) # assign a range of numbers for i, s in enumerate(samples): data[i].sample = s self.samples = samples # copy colour map to top level self.cmaps = data[0].cmap # get analytes # TODO: does this preserve the *order* of the analytes? all_analytes = set() extras = set() for d in data: all_analytes.update(d.analytes) extras.update(all_analytes.symmetric_difference(d.analytes)) self.analytes = all_analytes.difference(extras) mismatch = [] if self.analytes != all_analytes: smax = 0 for d in data: if d.analytes != self.analytes: mismatch.append((d.sample, d.analytes.difference(self.analytes))) if len(d.sample) > smax: smax = len(d.sample) msg = (self._fill_line('*') + 'All data files do not contain the same analytes.\n' + 'Only analytes present in all files will be processed.\n' + 'In the following files, these analytes will be excluded:\n') for s, a in mismatch: msg += (' {0: <' + '{:}'.format(smax + 2) + '}: ').format(s) + str(a) + '\n' msg += self._fill_line('*') warnings.warn(msg) # set for recording calculated ratios self.analyte_ratios = set() self.uncalibrated = set() if len(self.analytes) == 0: raise ValueError('No analyte names identified. Please check the \ncolumn_id > pattern ReGeX in your dataformat file.') if internal_standard in self.analytes: self.internal_standard = internal_standard else: self.internal_standard = None warnings.warn( self._wrap_text(f'The specified internal_standard {internal_standard} is not in the list of analytes ({self.analytes}). You will have to specify a valid analyte when calling the `ratio()` function later in the analysis.') ) self.internal_standard_concs = None self.minimal_analytes = set() # record which analytes are needed for calibration self.calibration_analytes = set() # keep record of which stages of processing have been performed self.stages_complete = set(['rawdata']) # From this point on, data stored in dicts self.data = Bunch(zip(self.samples, data)) # remove mismatch analytes - QUICK-FIX - SHOULD BE DONE HIGHER UP? for s, a in mismatch: self.data[s].analytes = self.data[s].analytes.difference(a) # get SRM info self.srm_identifier = srm_identifier self.stds = [] # make this a dict _ = [self.stds.append(s) for s in self.data.values() if self.srm_identifier in s.sample] self.srms_ided = False # set up focus_stage recording self.focus_stage = 'rawdata' self.stat_focus_stage = None self.focus = Bunch() # set up subsets self.clear_subsets() # remove any analytes for which all counts are zero # self.get_focus() # for a in self.analytes: # if np.nanmean(self.focus[a] == 0): # self.analytes.remove(a) # warnings.warn('{} contains no data - removed from analytes') # initialise classifiers self.classifiers = Bunch() # report print(('Loading Data:\n {:d} Data Files Loaded: {:d} standards, {:d} ' 'samples').format(len(self.data), len(self.stds), len(self.data) - len(self.stds))) astr = self._wrap_text('Analytes: ' + ' '.join(self.analytes_sorted())) print(astr) print(' Internal Standard: {}'.format(self.internal_standard)) def _fill_line(self, char, newline=True): """Generate a full line of given character""" if newline: return char * self._line_width + '\n' else: return char * self._line_width def _wrap_text(self, text): """Splits text over multiple lines to fit within self._line_width""" return '\n'.join(textwrap.wrap(text, width=self._line_width, break_long_words=False)) def _wrap_msg(self, msg, char): return self._fill_line(char) + msg + '\n' + self._fill_line(char, False) def _load_dataformat(self, dataformat): """ Load in dataformat. Check dataformat file exists, and store it in a class attribute. If dataformat is not provided during initialisation, assign it fom configuration file """ if dataformat is None: if os.path.exists(self.config['dataformat']): dataformat = self.config['dataformat'] elif os.path.exists(pkgrs.resource_filename('latools', self.config['dataformat'])): dataformat = pkgrs.resource_filename('latools', self.config['dataformat']) else: config_file = config_locator() raise ValueError(('The dataformat file specified in the ' + self.config['config'] + ' configuration cannot be found.\n' 'Please make sure the file exists, and that' 'the path in the config file is correct.\n' 'Your configurations can be found here:' ' {}\n'.format(config_file))) self.dataformat_file = dataformat else: self.dataformat_file = 'None: dict provided' # if it's a string, check the file exists and import it. if isinstance(dataformat, str): if os.path.exists(dataformat): # self.dataformat = eval(open(dataformat).read()) self.dataformat = json.load(open(dataformat)) else: warnings.warn(("The dataformat file (" + dataformat + ") cannot be found.\nPlease make sure the file " "exists, and that the path is correct.\n\nFile " "Path: " + dataformat)) # if it's a dict, just assign it straight away. elif isinstance(dataformat, dict): self.dataformat = dataformat def _load_srmfile(self, srm_file): """ Check srmfile exists, and store it in a class attribute. """ if srm_file is not None: if os.path.exists(srm_file): self.srmfile = srm_file else: raise ValueError(('Cannot find the specified SRM file:\n ' + srm_file + 'Please check that the file location is correct.')) else: if os.path.exists(self.config['srmfile']): self.srmfile = self.config['srmfile'] elif os.path.exists(pkgrs.resource_filename('latools', self.config['srmfile'])): self.srmfile = pkgrs.resource_filename('latools', self.config['srmfile']) else: config_file = config_locator() raise ValueError(('The SRM file specified in the ' + self.config['config'] + ' configuration cannot be found.\n' 'Please make sure the file exists, and that the ' 'path in the config file is correct.\n' 'Your configurations can be found here:' ' {}\n'.format(config_file))) def _get_samples(self, subset=None): """ Helper function to get sample names from subset. Parameters ---------- subset : str Subset name. If None, returns all samples. Returns ------- List of sample names """ if subset is None: samples = self.subsets['All_Samples'] if len(samples) == 0: print('Warning: No samples in dataset. Using all analyses.') samples = self.subsets['All_Analyses'] else: try: samples = self.subsets[subset] except KeyError: raise KeyError(("Subset '{:s}' does not ".format(subset) + "exist.\nUse 'make_subset' to create a" + "subset.")) return samples def _log_header(self): return ['# LATOOLS analysis log saved at {}'.format(time.strftime('%Y:%m:%d %H:%M:%S')), 'data_path :: {}'.format(self.path), '# Analysis Log Start: \n' ] def _analyte_checker(self, analytes=None, check_ratios=True, single=False, focus_stage=None): """ Return valid analytes depending on the analysis stage """ return analyte_checker(self, analytes=analytes, check_ratios=check_ratios, single=single, focus_stage=focus_stage)
[docs] def analytes_sorted(self, analytes=None, check_ratios=True, single=False, focus_stage=None): return sorted(self._analyte_checker(analytes=analytes, check_ratios=check_ratios, single=single, focus_stage=focus_stage), key=analyte_sort_fn)
[docs] @_log def basic_processing(self, noise_despiker=True, despike_win=3, despike_nlim=12., # despike args despike_maxiter=4, autorange_analyte='total_counts', autorange_gwin=5, autorange_swin=3, autorange_win=20, # autorange args autorange_on_mult=[1., 1.5], autorange_off_mult=[1.5, 1], autorange_transform='log', bkg_weight_fwhm=300., # bkg_calc_weightedmean bkg_n_min=20, bkg_n_max=None, bkg_cstep=None, bkg_filter=False, bkg_f_win=7, bkg_f_n_lim=3, bkg_errtype='stderr', # bkg_sub calib_drift_correct=True, # calibrate calib_srms_used=['NIST610', 'NIST612', 'NIST614'], calib_zero_intercept=True, calib_n_min=10, plots=True): self.despike(noise_despiker=noise_despiker, win=despike_win, nlim=despike_nlim, maxiter=despike_maxiter) self.autorange(analyte=autorange_analyte, gwin=autorange_gwin, swin=autorange_swin, win=autorange_win, on_mult=autorange_on_mult, off_mult=autorange_off_mult, transform=autorange_transform) if plots: self.trace_plots(ranges=True) self.bkg_calc_weightedmean(weight_fwhm=bkg_weight_fwhm, n_min=bkg_n_min, n_max=bkg_n_max, cstep=bkg_cstep, bkg_filter=bkg_filter, f_win=bkg_f_win, f_n_lim=bkg_f_n_lim) if plots: self.bkg_plot() self.bkg_subtract(errtype=bkg_errtype) self.ratio() self.calibrate(drift_correct=calib_drift_correct, srms_used=calib_srms_used, zero_intercept=calib_zero_intercept, n_min=calib_n_min) if plots: self.calibration_plot() return
[docs] @_log def autorange(self, analyte='total_counts', gwin=5, swin=3, win=20, on_mult=[1., 1.5], off_mult=[1.5, 1], transform='log', ploterrs=True, focus_stage='despiked', **kwargs): """ Automatically separates signal and background data regions. Automatically detect signal and background regions in the laser data, based on the behaviour of a single analyte. The analyte used should be abundant and homogenous in the sample. **Step 1: Thresholding.** The background signal is determined using a gaussian kernel density estimator (kde) of all the data. Under normal circumstances, this kde should find two distinct data distributions, corresponding to 'signal' and 'background'. The minima between these two distributions is taken as a rough threshold to identify signal and background regions. Any point where the trace crosses this thrshold is identified as a 'transition'. **Step 2: Transition Removal.** The width of the transition regions between signal and background are then determined, and the transitions are excluded from analysis. The width of the transitions is determined by fitting a gaussian to the smoothed first derivative of the analyte trace, and determining its width at a point where the gaussian intensity is at at `conf` time the gaussian maximum. These gaussians are fit to subsets of the data centered around the transitions regions determined in Step 1, +/- `win` data points. The peak is further isolated by finding the minima and maxima of a second derivative within this window, and the gaussian is fit to the isolated peak. Parameters ---------- analyte : str The analyte that autorange should consider. For best results, choose an analyte that is present homogeneously in high concentrations. This can also be 'total_counts' to use the sum of all analytes. gwin : int The smoothing window used for calculating the first derivative. Must be odd. win : int Determines the width (c +/- win) of the transition data subsets. smwin : int The smoothing window used for calculating the second derivative. Must be odd. conf : float The proportional intensity of the fitted gaussian tails that determines the transition width cutoff (lower = wider transition regions excluded). trans_mult : array_like, len=2 Multiples of the peak FWHM to add to the transition cutoffs, e.g. if the transitions consistently leave some bad data proceeding the transition, set trans_mult to [0, 0.5] to ad 0.5 * the FWHM to the right hand side of the limit. focus_stage : str Which stage of analysis to apply processing to. Defaults to 'despiked', or rawdata' if not despiked. Can be one of: * 'rawdata': raw data, loaded from csv file. * 'despiked': despiked data. * 'signal'/'background': isolated signal and background data. Created by self.separate, after signal and background regions have been identified by self.autorange. * 'bkgsub': background subtracted data, created by self.bkg_correct * 'ratios': element ratio data, created by self.ratio. * 'calibrated': ratio data calibrated to standards, created by self.calibrate. Returns ------- Outputs added as instance attributes. Returns None. bkg, sig, trn : iterable, bool Boolean arrays identifying background, signal and transision regions bkgrng, sigrng and trnrng : iterable (min, max) pairs identifying the boundaries of contiguous True regions in the boolean arrays. """ if focus_stage == 'despiked': if 'despiked' not in self.stages_complete: focus_stage = 'rawdata' if analyte in self.analytes: self.minimal_analytes.update([analyte]) fails = {} # list for catching failures. with self.pbar.set(total=len(self.data), desc='AutoRange') as prog: for s, d in self.data.items(): f = d.autorange(analyte=analyte, gwin=gwin, swin=swin, win=win, on_mult=on_mult, off_mult=off_mult, ploterrs=ploterrs, transform=transform, **kwargs) if f is not None: fails[s] = f prog.update() # advance progress bar # handle failures if len(fails) > 0: wstr = ('\n\n' + '*' * 41 + '\n' + ' WARNING\n' + '*' * 41 + '\n' + 'Autorange failed for some samples:\n') kwidth = max([len(k) for k in fails]) + 1 fstr = ' {:' + '{}'.format(kwidth) + 's}: ' for k in sorted(fails.keys()): wstr += fstr.format(k) + ', '.join(['{:.1f}'.format(f) for f in fails[k][-1]]) + '\n' wstr += ('\n*** THIS IS NOT NECESSARILY A PROBLEM ***\n' + 'But please check the plots below to make\n' + 'sure they look OK. Failures are marked by\n' + 'dashed vertical red lines.\n\n' + 'To examine an autorange failure in more\n' + 'detail, use the `autorange_plot` method\n' + 'of the failing data object, e.g.:\n' + "dat.data['Sample'].autorange_plot(params)\n" + '*' * 41 + '\n') warnings.warn(wstr) self.stages_complete.update(['autorange']) return
[docs] def find_expcoef(self, nsd_below=0., plot=False, trimlim=None, autorange_kwargs={}): """ Determines exponential decay coefficient for despike filter. Fits an exponential decay function to the washout phase of standards to determine the washout time of your laser cell. The exponential coefficient reported is `nsd_below` standard deviations below the fitted exponent, to ensure that no real data is removed. Total counts are used in fitting, rather than a specific analyte. Parameters ---------- nsd_below : float The number of standard deviations to subtract from the fitted coefficient when calculating the filter exponent. plot : bool or str If True, creates a plot of the fit, if str the plot is to the location specified in str. trimlim : float A threshold limit used in determining the start of the exponential decay region of the washout. Defaults to half the increase in signal over background. If the data in the plot don't fall on an exponential decay line, change this number. Normally you'll need to increase it. Returns ------- None """ print('Calculating exponential decay coefficient\nfrom SRM washouts...') def findtrim(tr, lim=None): trr = np.roll(tr, -1) trr[-1] = 0 if lim is None: lim = 0.5 * np.nanmax(tr - trr) ind = (tr - trr) >= lim return np.arange(len(ind))[ind ^ np.roll(ind, -1)][0] if not hasattr(self.stds[0], 'trnrng'): for s in self.stds: s.autorange(**autorange_kwargs, ploterrs=False) trans = [] times = [] for v in self.stds: for trnrng in v.trnrng[-1::-2]: tr = minmax_scale(v.data['total_counts'][(v.Time > trnrng[0]) & (v.Time < trnrng[1])]) sm = np.apply_along_axis(np.nanmean, 1, rolling_window(tr, 3, pad=0)) sm[0] = sm[1] trim = findtrim(sm, trimlim) + 2 trans.append(minmax_scale(tr[trim:])) times.append(np.arange(tr[trim:].size) * np.diff(v.Time[1:3])) times = np.concatenate(times) times = np.round(times, 2) trans = np.concatenate(trans) ti = [] tr = [] for t in np.unique(times): ti.append(t) tr.append(np.nanmin(trans[times == t])) def expfit(x, e): """ Exponential decay function. """ return np.exp(e * x) ep, ecov = curve_fit(expfit, ti, tr, p0=(-1.)) eeR2 = R2calc(trans, expfit(times, ep)) if plot: fig, ax = plt.subplots(1, 1, figsize=[6, 4]) ax.scatter(times, trans, alpha=0.2, color='k', marker='x', zorder=-2) ax.scatter(ti, tr, alpha=1, color='k', marker='o') fitx = np.linspace(0, max(ti)) ax.plot(fitx, expfit(fitx, ep), color='r', label='Fit') ax.plot(fitx, expfit(fitx, ep - nsd_below * np.diag(ecov)**.5, ), color='b', label='Used') ax.text(0.95, 0.75, ('y = $e^{%.2f \pm %.2f * x}$\n$R^2$= %.2f \nCoefficient: ' '%.2f') % (ep, np.diag(ecov)**.5, eeR2, ep - nsd_below * np.diag(ecov)**.5), transform=ax.transAxes, ha='right', va='top', size=12) ax.set_xlim(0, ax.get_xlim()[-1]) ax.set_xlabel('Time (s)') ax.set_ylim(-0.05, 1.05) ax.set_ylabel('Proportion of Signal') plt.legend() if isinstance(plot, str): fig.savefig(plot) self.expdecay_coef = ep - nsd_below * np.diag(ecov)**.5 print(' {:0.2f}'.format(self.expdecay_coef[0])) return
[docs] @_log def despike(self, expdecay_despiker=False, exponent=None, noise_despiker=True, win=3, nlim=12., exponentrace_plot=False, maxiter=4, autorange_kwargs={}, focus_stage='rawdata'): """ Despikes data with exponential decay and noise filters. Parameters ---------- expdecay_despiker : bool Whether or not to apply the exponential decay filter. exponent : None or float The exponent for the exponential decay filter. If None, it is determined automatically using `find_expocoef`. tstep : None or float The timeinterval between measurements. If None, it is determined automatically from the Time variable. noise_despiker : bool Whether or not to apply the standard deviation spike filter. win : int The rolling window over which the spike filter calculates the trace statistics. nlim : float The number of standard deviations above the rolling mean that data are excluded. exponentrace_plot : bool Whether or not to show a plot of the automatically determined exponential decay exponent. maxiter : int The max number of times that the fitler is applied. focus_stage : str Which stage of analysis to apply processing to. Defaults to 'rawdata'. Can be one of: * 'rawdata': raw data, loaded from csv file. * 'despiked': despiked data. * 'signal'/'background': isolated signal and background data. Created by self.separate, after signal and background regions have been identified by self.autorange. * 'bkgsub': background subtracted data, created by self.bkg_correct * 'ratios': element ratio data, created by self.ratio. * 'calibrated': ratio data calibrated to standards, created by self.calibrate. Returns ------- None """ if focus_stage != self.focus_stage: self.set_focus(focus_stage) if expdecay_despiker and exponent is None: if not hasattr(self, 'expdecay_coef'): self.find_expcoef(plot=exponentrace_plot, autorange_kwargs=autorange_kwargs) exponent = self.expdecay_coef time.sleep(0.1) with self.pbar.set(total=len(self.data), desc='Despiking') as prog: for d in self.data.values(): d.despike(expdecay_despiker, exponent, noise_despiker, win, nlim, maxiter) prog.update() self.stages_complete.update(['despiked']) self.focus_stage = 'despiked' return
# functions for background correction
[docs] def get_background(self, n_min=10, n_max=None, focus_stage='despiked', bkg_filter=False, f_win=5, f_n_lim=3): """ Extract all background data from all samples on universal time scale. Used by both 'polynomial' and 'weightedmean' methods. Parameters ---------- n_min : int The minimum number of points a background region must have to be included in calculation. n_max : int The maximum number of points a background region must have to be included in calculation. filter : bool If true, apply a rolling filter to the isolated background regions to exclude regions with anomalously high values. If True, two parameters alter the filter's behaviour: f_win : int The size of the rolling window f_n_lim : float The number of standard deviations above the rolling mean to set the threshold. focus_stage : str Which stage of analysis to apply processing to. Defaults to 'despiked' if present, or 'rawdata' if not. Can be one of: * 'rawdata': raw data, loaded from csv file. * 'despiked': despiked data. * 'signal'/'background': isolated signal and background data. Created by self.separate, after signal and background regions have been identified by self.autorange. * 'bkgsub': background subtracted data, created by self.bkg_correct * 'ratios': element ratio data, created by self.ratio. * 'calibrated': ratio data calibrated to standards, created by self.calibrate. Returns ------- pandas.DataFrame object containing background data. """ allbkgs = {'uTime': [], 'ns': []} if focus_stage == 'despiked': if 'despiked' not in self.stages_complete: focus_stage = 'rawdata' for a in self.analytes: allbkgs[a] = [] n0 = 0 for s in self.data.values(): if sum(s.bkg) > 0: allbkgs['uTime'].append(s.uTime[s.bkg]) allbkgs['ns'].append(enumerate_bool(s.bkg, n0)[s.bkg]) n0 = allbkgs['ns'][-1][-1] for a in self.analytes: allbkgs[a].append(s.data[focus_stage][a][s.bkg]) allbkgs.update((k, np.concatenate(v)) for k, v in allbkgs.items()) bkgs = pd.DataFrame(allbkgs) # using pandas here because it's much more efficient than loops. self.bkg = Bunch() # extract background data from whole dataset if n_max is None: self.bkg['raw'] = bkgs.groupby('ns').filter(lambda x: len(x) > n_min) else: self.bkg['raw'] = bkgs.groupby('ns').filter(lambda x: (len(x) > n_min) & (len(x) < n_max)) # calculate per - background region stats self.bkg['summary'] = self.bkg['raw'].groupby('ns').aggregate([np.mean, np.std, stderr]) # sort summary by uTime self.bkg['summary'].sort_values(('uTime', 'mean'), inplace=True) # self.bkg['summary'].index = np.arange(self.bkg['summary'].shape[0]) # self.bkg['summary'].index.name = 'ns' if bkg_filter: # calculate rolling mean and std from summary t = self.bkg['summary'].loc[:, idx[:, 'mean']] r = t.rolling(f_win).aggregate([np.nanmean, np.nanstd]) # calculate upper threshold upper = r.loc[:, idx[:, :, 'nanmean']] + f_n_lim * r.loc[:, idx[:, :, 'nanstd']].values # calculate which are over upper threshold over = r.loc[:, idx[:, :, 'nanmean']] > np.roll(upper.values, 1, 0) # identify them ns_drop = over.loc[over.apply(any, 1), :].index.values # drop them from summary self.bkg['summary'].drop(ns_drop, inplace=True) # remove them from raw ind = np.ones(self.bkg['raw'].shape[0], dtype=bool) for ns in ns_drop: ind = ind & (self.bkg['raw'].loc[:, 'ns'] != ns) self.bkg['raw'] = self.bkg['raw'].loc[ind, :] return
[docs] @_log def bkg_calc_weightedmean(self, analytes=None, weight_fwhm=600, n_min=20, n_max=None, cstep=None, errtype='stderr', bkg_filter=False, f_win=7, f_n_lim=3, focus_stage='despiked'): """ Background calculation using a gaussian weighted mean. Parameters ---------- analytes : str or iterable Which analyte or analytes to calculate. weight_fwhm : float The full-width-at-half-maximum of the gaussian used to calculate the weighted average. n_min : int Background regions with fewer than n_min points will not be included in the fit. cstep : float or None The interval between calculated background points. filter : bool If true, apply a rolling filter to the isolated background regions to exclude regions with anomalously high values. If True, two parameters alter the filter's behaviour: f_win : int The size of the rolling window f_n_lim : float The number of standard deviations above the rolling mean to set the threshold. focus_stage : str Which stage of analysis to apply processing to. Defaults to 'despiked' if present, or 'rawdata' if not. Can be one of: * 'rawdata': raw data, loaded from csv file. * 'despiked': despiked data. * 'signal'/'background': isolated signal and background data. Created by self.separate, after signal and background regions have been identified by self.autorange. * 'bkgsub': background subtracted data, created by self.bkg_correct * 'ratios': element ratio data, created by self.ratio. * 'calibrated': ratio data calibrated to standards, created by self.calibrate. """ if analytes is None: self.bkg = Bunch() analytes = self._analyte_checker(analytes=analytes) self.get_background(n_min=n_min, n_max=n_max, bkg_filter=bkg_filter, f_win=f_win, f_n_lim=f_n_lim, focus_stage=focus_stage) # Gaussian - weighted average if 'calc' not in self.bkg: # create time points to calculate background if cstep is None: cstep = weight_fwhm / 20 elif cstep > weight_fwhm: warnings.warn("\ncstep should be less than weight_fwhm. Your backgrounds\n" + "might not behave as expected.\n") bkg_t = np.linspace(0, self.max_time, int(self.max_time // cstep)) self.bkg['calc'] = Bunch() self.bkg['calc']['uTime'] = bkg_t # TODO : calculation then dict assignment is clumsy... mean, std, stderr = gauss_weighted_stats(x=self.bkg['raw'].uTime.values, yarray=self.bkg['raw'].loc[:, list(analytes)].values, x_new=self.bkg['calc']['uTime'], fwhm=weight_fwhm) self.bkg_interps = {} for i, a in enumerate(analytes): self.bkg['calc'][a] = {'mean': mean[i], 'std': std[i], 'stderr': stderr[i]} self.bkg_interps[a] = un_interp1d(x=self.bkg['calc']['uTime'], y=un.uarray(self.bkg['calc'][a]['mean'], self.bkg['calc'][a][errtype]))
[docs] @_log def bkg_calc_interp1d(self, analytes=None, kind=1, n_min=10, n_max=None, cstep=30, bkg_filter=False, f_win=7, f_n_lim=3, errtype='stderr', focus_stage='despiked'): """ Background calculation using a 1D interpolation. scipy.interpolate.interp1D is used for interpolation. Parameters ---------- analytes : str or iterable Which analyte or analytes to calculate. kind : str or int Integer specifying the order of the spline interpolation used, or string specifying a type of interpolation. Passed to `scipy.interpolate.interp1D`. n_min : int Background regions with fewer than n_min points will not be included in the fit. cstep : float or None The interval between calculated background points. filter : bool If true, apply a rolling filter to the isolated background regions to exclude regions with anomalously high values. If True, two parameters alter the filter's behaviour: f_win : int The size of the rolling window f_n_lim : float The number of standard deviations above the rolling mean to set the threshold. focus_stage : str Which stage of analysis to apply processing to. Defaults to 'despiked' if present, or 'rawdata' if not. Can be one of: * 'rawdata': raw data, loaded from csv file. * 'despiked': despiked data. * 'signal'/'background': isolated signal and background data. Created by self.separate, after signal and background regions have been identified by self.autorange. * 'bkgsub': background subtracted data, created by self.bkg_correct * 'ratios': element ratio data, created by self.ratio. * 'calibrated': ratio data calibrated to standards, created by self.calibrate. """ if analytes is None: self.bkg = Bunch() analytes = self._analyte_checker(analytes=analytes) self.get_background(n_min=n_min, n_max=n_max, bkg_filter=bkg_filter, f_win=f_win, f_n_lim=f_n_lim, focus_stage=focus_stage) def pad(a, lo=None, hi=None): if lo is None: lo = [a[0]] if hi is None: hi = [a[-1]] return np.concatenate((lo, a, hi)) if 'calc' not in self.bkg: # create time points to calculate background bkg_t = pad(np.ravel(self.bkg.raw.loc[:, ['uTime', 'ns']].groupby('ns').aggregate([min, max]))) bkg_t = np.unique(np.sort(np.concatenate([bkg_t, np.arange(0, self.max_time, cstep)]))) self.bkg['calc'] = Bunch() self.bkg['calc']['uTime'] = bkg_t d = self.bkg['summary'] self.bkg_interps = {} with self.pbar.set(total=len(analytes), desc='Calculating Analyte Backgrounds') as prog: for a in analytes: fill_vals = (un.uarray(d.loc[:, (a, 'mean')].iloc[0], d.loc[:, (a, errtype)].iloc[0]), un.uarray(d.loc[:, (a, 'mean')].iloc[-1], d.loc[:, (a, errtype)].iloc[-1])) p = un_interp1d(x=d.loc[:, ('uTime', 'mean')], y=un.uarray(d.loc[:, (a, 'mean')], d.loc[:, (a, errtype)]), kind=kind, bounds_error=False, fill_value=fill_vals) self.bkg_interps[a] = p self.bkg['calc'][a] = {'mean': p.new_nom(self.bkg['calc']['uTime']), errtype: p.new_std(self.bkg['calc']['uTime'])} prog.update() # self.bkg['calc'] return
[docs] @_log def bkg_subtract(self, analytes=None, errtype='stderr', focus_stage='despiked'): """ Subtract calculated background from data. Must run bkg_calc first! Parameters ---------- analytes : str or iterable Which analyte(s) to subtract. errtype : str Which type of error to propagate. default is 'stderr'. focus_stage : str Which stage of analysis to apply processing to. Defaults to 'despiked' if present, or 'rawdata' if not. Can be one of: * 'rawdata': raw data, loaded from csv file. * 'despiked': despiked data. """ analytes = self._analyte_checker(analytes) if focus_stage == 'despiked': if 'despiked' not in self.stages_complete: focus_stage = 'rawdata' # make uncertainty-aware background interpolators # bkg_interps = {} # for a in analytes: # bkg_interps[a] = un_interp1d(x=self.bkg['calc']['uTime'], # y=un.uarray(self.bkg['calc'][a]['mean'], # self.bkg['calc'][a][errtype])) # self.bkg_interps = bkg_interps # apply background corrections with self.pbar.set(total=len(self.data), desc='Background Subtraction') as prog: for d in self.data.values(): # [d.bkg_subtract(a, bkg_interps[a].new(d.uTime), None, focus_stage=focus_stage) for a in analytes] [d.bkg_subtract(a, self.bkg_interps[a].new(d.uTime), ~d.sig, focus_stage=focus_stage) for a in analytes] d.setfocus('bkgsub') prog.update() self.stages_complete.update(['bkgsub']) self.focus_stage = 'bkgsub' return
[docs] @_log def correct_spectral_interference(self, target_analyte, source_analyte, f): """ Correct spectral interference. Subtract interference counts from target_analyte, based on the intensity of a source_analayte and a known fractional contribution (f). Correction takes the form: target_analyte -= source_analyte * f Only operates on background-corrected data ('bkgsub'). To undo a correction, rerun `self.bkg_subtract()`. Example ------- To correct 44Ca+ for an 88Sr++ interference, where both 43.5 and 44 Da peaks are known: f = abundance(88Sr) / (abundance(87Sr) counts(44Ca) = counts(44 Da) - counts(43.5 Da) * f Parameters ---------- target_analyte : str The name of the analyte to modify. source_analyte : str The name of the analyte to base the correction on. f : float The fraction of the intensity of the source_analyte to subtract from the target_analyte. Correction is: target_analyte - source_analyte * f Returns ------- None """ if target_analyte not in self.analytes: raise ValueError('target_analyte: {:} not in available analytes ({:})'.format(target_analyte, ', '.join(self.analytes))) if source_analyte not in self.analytes: raise ValueError('source_analyte: {:} not in available analytes ({:})'.format(source_analyte, ', '.join(self.analytes))) with self.pbar.set(total=len(self.data), desc='Interference Correction') as prog: for d in self.data.values(): d.correct_spectral_interference(target_analyte, source_analyte, f) prog.update()
[docs] @_log def bkg_plot(self, analytes=None, figsize=None, yscale='log', ylim=None, err='stderr', save=True): """ Plot the calculated background. Parameters ---------- analytes : str or iterable Which analyte(s) to plot. figsize : tuple The (width, height) of the figure, in inches. If None, calculated based on number of samples. yscale : str 'log' (default) or 'linear'. ylim : tuple Manually specify the y scale. err : str What type of error to plot. Default is stderr. save : bool If True, figure is saved. Returns ------- fig, ax : matplotlib.figure, matplotlib.axes """ # if not hasattr(self, 'bkg'): # raise ValueError("\nPlease calculate a background before attempting to\n" + # "plot it... either:\n" + # " bkg_calc_interp1d\n" + # " bkg_calc_weightedmean\n") if not hasattr(self, 'bkg'): self.get_background() analytes = self._analyte_checker(analytes) if figsize is None: if len(self.samples) > 50: figsize = (len(self.samples) * 0.2, 5) else: figsize = (10, 5) fig = plt.figure(figsize=figsize) ax = fig.add_axes([.07, .1, .84, .8]) with self.pbar.set(total=len(analytes), desc='Plotting backgrounds') as prog: for a in analytes: # draw data points ax.scatter(self.bkg['raw'].uTime, self.bkg['raw'].loc[:, a], alpha=0.5, s=3, c=self.cmaps[a], lw=0.5) # draw STD boxes for i, r in self.bkg['summary'].iterrows(): x = (r.loc['uTime', 'mean'] - r.loc['uTime', 'std'] * 2, r.loc['uTime', 'mean'] + r.loc['uTime', 'std'] * 2) yl = [r.loc[a, 'mean'] - r.loc[a, err]] * 2 yu = [r.loc[a, 'mean'] + r.loc[a, err]] * 2 ax.fill_between(x, yl, yu, alpha=0.8, lw=0.5, color=self.cmaps[a], zorder=1) prog.update() if yscale == 'log': ax.set_yscale('log') if ylim is not None: ax.set_ylim(ylim) else: ax.set_ylim(ax.get_ylim() * np.array([1, 10])) # x10 to make sample names readable. if 'calc' in self.bkg: for a in analytes: # draw confidence intervals of calculated x = self.bkg['calc']['uTime'] y = self.bkg['calc'][a]['mean'] yl = self.bkg['calc'][a]['mean'] - self.bkg['calc'][a][err] yu = self.bkg['calc'][a]['mean'] + self.bkg['calc'][a][err] # trim values below zero if log scale= if yscale == 'log': yl[yl < ax.get_ylim()[0]] = ax.get_ylim()[0] ax.plot(x, y, c=self.cmaps[a], zorder=2, label=pretty_element(a)) ax.fill_between(x, yl, yu, color=self.cmaps[a], alpha=0.3, zorder=-1) else: for a in analytes: ax.plot([], [], c=self.cmaps[a], label=pretty_element(a)) ax.set_xlabel('Time (s)') ax.set_ylabel('Background Counts') ax.set_title('Points = raw data; Bars = {:s}; Lines = Calculated Background; Envelope = Background {:s}'.format(err, err), fontsize=10) ha, la = ax.get_legend_handles_labels() ax.legend(labels=la[:len(analytes)], handles=ha[:len(analytes)], bbox_to_anchor=(1, 1)) # scale x axis to range ± 2.5% xlim = [0, max([d.uTime[-1] for d in self.data.values()])] ax.set_xlim(xlim) # add sample labels for s, d in self.data.items(): ax.axvline(d.uTime[0], alpha=0.2, color='k', zorder=-1) ax.text(d.uTime[0], ax.get_ylim()[1], s, rotation=90, va='top', ha='left', zorder=-1, fontsize=7) if save: fig.savefig(self.report_dir + '/background.png', dpi=200) return fig, ax
# functions for calculating ratios
[docs] @_log def ratio(self, internal_standard=None, analytes=None, focus_stage=None): """ Calculates the ratio of all analytes to a single analyte. Parameters ---------- internal_standard : str The name of the analyte to divide all other analytes by. Returns ------- None """ if focus_stage is None: focus_stage = self.focus_stage if 'bkgsub' not in self.stages_complete: raise RuntimeError('Cannot calculate ratios before background subtraction.') analytes = self._analyte_checker(analytes, focus_stage=focus_stage) if internal_standard is not None: self.internal_standard = internal_standard if self.internal_standard in self.analytes.union(self.analyte_ratios): self.minimal_analytes.update([internal_standard]) self.calibration_analytes.update([internal_standard]) self.calibration_analytes.update(analytes) else: raise ValueError('The internal standard ({}) is not amongst the '.format(internal_standard) + 'analytes in\nyour data files. Please make sure it is specified correctly.') # check internal_standard is valid internal_standard = self._analyte_checker(self.internal_standard, focus_stage=focus_stage).pop() with self.pbar.set(total=len(self.data), desc='Ratio Calculation') as prog: for s in self.data.values(): s.ratio(internal_standard=internal_standard, analytes=analytes, focus_stage=focus_stage) self.analyte_ratios.update(s.analyte_ratios) self.cmaps.update(s.cmap) prog.update() if self.focus_stage not in ['ratios', 'calibrated', 'mass_fraction']: self.stages_complete.update(['ratios']) self.focus_stage = 'ratios' return
[docs] def srm_load_database(self, srms_used=None, reload=False): if not hasattr(self, 'srmdat') or reload: # load SRM info srmdat = srms.read_table(self.srmfile) srmdat = srmdat.loc[srms_used] srmdat.reset_index(inplace=True) srmdat.set_index(['SRM', 'Item'], inplace=True) # calculate ratios to internal_standard for calibration ratios analyte_srm_link = {} warns = {} self.uncalibrated = set() self._analytes_missing_from_srm = set() # create an empty SRM table srmtab = pd.DataFrame(index=srms_used, columns=pd.MultiIndex.from_product([self.analyte_ratios, ['mean', 'err']])) for srm in srms_used: srm_nocal = set() srmsub = srmdat.loc[srm] # determine analyte - Item pairs in table ad = {} for ar in self.analyte_ratios: a_num, a_denom = ar.split('_') # separate numerator and denominator for a in [a_num, a_denom]: if a in ad: continue # check if there's an exact match of form [Mass][Element] in srmdat mna = analyte_2_massname(a) if mna in srmsub.index: ad[a] = mna else: # if not, match by element name. item = srmsub.index[srmsub.index.str.contains(get_analyte_name(a))].values if len(item) > 1: item = item[item == get_analyte_name(a)] if len(item) == 1: ad[a] = item[0] else: if srm not in warns: warns[srm] = [] warns[srm].append(a) srm_nocal.update([ar]) analyte_srm_link[srm] = ad # build calibration database for given ratios for a in self.analyte_ratios.difference(srm_nocal): a_num, a_denom = a.split('_') # calculate SRM polyatom multiplier (multiplier to account for stoichiometry, # e.g. if internal standard is Na, N will be 2 if measured in SRM as Na2O) N_denom = float(decompose_molecule(ad[a_denom])[get_analyte_name(a_denom)]) N_num = float(decompose_molecule(ad[a_num])[get_analyte_name(a_num)]) # calculate molar ratio srmtab.loc[srm, (a, 'mean')] = ((srmdat.loc[(srm, ad[a_num]), 'mol/g'] * N_num) / (srmdat.loc[(srm, ad[a_denom]), 'mol/g'] * N_denom)) srmtab.loc[srm, (a, 'err')] = (srmtab.loc[srm, (a, 'mean')] * ((srmdat.loc[(srm, ad[a_num]), 'mol/g_err'] / (srmdat.loc[(srm, ad[a_num]), 'mol/g']))**2 + (srmdat.loc[(srm, ad[a_denom]), 'mol/g_err'] / (srmdat.loc[(srm, ad[a_denom]), 'mol/g']))**2)**0.5) # where uncertainties are missing, replace with zeros srmtab[srmtab.loc[:, idx[:, 'err']].isnull()] = 0 # record outputs self.srmdat = srmdat # the full SRM table self._analyte_srmdat_link = analyte_srm_link # dict linking analyte names to rows in srmdat self.srmtab = srmtab.astype(float) # a summary of relevant mol/mol values only # record which analytes have missing CRM data means = self.srmtab.loc[:, idx[:, 'mean']] means.columns = means.columns.droplevel(1) self._analytes_missing_srm = means.columns.values[means.isnull().any()] # analyte ratios missing from SRM table self._srm_id_analyte_ratios = means.columns.values[~means.isnull().any()] # analyte ratioes identified # self._calib_analyte_ratios = means.columns.values[~means.isnull().all()] self.uncalibrated.intersection_update(srm_nocal) self._analytes_missing_from_srm.update(srm_nocal) # Print any warnings if len(warns) > 0: print('WARNING: Some analytes are not present in the SRM database for some standards:') for srm, a in warns.items(): print(f' {srm}: ' + ', '.join(self.analytes_sorted(a, focus_stage='bkgsub'))) if len(self.uncalibrated) > 0: self.analyte_ratios.difference_update(self.uncalibrated) print('WARNING: Some analytes are not present in the SRM database for ANY standards:') print(f'{self.uncalibrated} have been removed from further analysis.')
[docs] def srm_compile_measured(self, n_min=10, focus_stage='ratios'): """ Compile mean and standard errors of measured SRMs Parameters ---------- n_min : int The minimum number of points to consider as a valid measurement. Default = 10. """ warns = [] # compile mean and standard errors of samples for s in self.stds: s_stdtab = pd.DataFrame(columns=pd.MultiIndex.from_product([s.analyte_ratios, ['err', 'mean']])) s_stdtab.index.name = 'uTime' if not s.n > 0: s.stdtab = s_stdtab continue for n in range(1, s.n + 1): ind = s.ns == n if sum(ind) >= n_min: for a in s.analyte_ratios: aind = ind & ~np.isnan(nominal_values(s.data[focus_stage][a])) s_stdtab.loc[np.nanmean(s.uTime[s.ns == n]), (a, 'mean')] = np.nanmean(nominal_values(s.data[focus_stage][a][aind])) s_stdtab.loc[np.nanmean(s.uTime[s.ns == n]), (a, 'err')] = np.nanstd(nominal_values(s.data[focus_stage][a][aind])) / np.sqrt(sum(aind)) else: warns.append(' Ablation {:} of SRM measurement {:} ({:} points)'.format(n, s.sample, sum(ind))) # sort column multiindex s_stdtab = s_stdtab.loc[:, s_stdtab.columns.sort_values()] # sort row index s_stdtab.sort_index(inplace=True) # create 'SRM' column for naming SRM s_stdtab.loc[:, 'STD'] = s.sample s.stdtab = s_stdtab if len(warns) > 0: print('WARNING: Some SRM ablations have been excluded because they do not contain enough data:') print('\n'.join(warns)) print("To *include* these ablations, reduce the value of n_min (currently {:})".format(n_min)) # compile them into a table stdtab = pd.concat([s.stdtab for s in self.stds]).apply(pd.to_numeric, 1, errors='ignore') stdtab = stdtab.reindex(self.analytes_sorted(self.analyte_ratios, focus_stage=focus_stage) + ['STD'], level=0, axis=1) # identify groups of consecutive SRMs ts = stdtab.index.values start_times = [s.uTime[0] for s in self.data.values()] lastpos = sum(ts[0] > start_times) group = [1] for t in ts[1:]: pos = sum(t > start_times) rpos = pos - lastpos if rpos <= 1: group.append(group[-1]) else: group.append(group[-1] + 1) lastpos = pos stdtab.loc[:, 'group'] = group # calculate centre time for the groups stdtab.loc[:, 'gTime'] = np.nan for g, d in stdtab.groupby('group'): ind = stdtab.group == g stdtab.loc[ind, 'gTime'] = stdtab.loc[ind].index.values.mean() # replace zeros with very small number stdtab.replace(0, np.nanmin(stdtab[stdtab != 0].loc[:, idx[:, 'mean']]) * 1e-6, inplace=True) self.stdtab = stdtab
[docs] def srm_id_auto(self, srms_used=['NIST610', 'NIST612', 'NIST614'], analytes=None, n_min=10, reload_srm_database=False): """ Function for automarically identifying SRMs using KMeans clustering. KMeans is performed on the log of SRM composition, which aids separation of relatively similar SRMs within a large compositional range. Parameters ---------- srms_used : iterable Which SRMs have been used. Must match SRM names in SRM database *exactly* (case sensitive!). analytes : array_like Which analyte ratios to base the identification on. If None, all analyte ratios are used (default). n_min : int The minimum number of data points a SRM measurement must contain to be included. reload_srm_database : bool Whether or not to re-load the SRM database before running the function. """ # TODO: srm_id_plot! if isinstance(srms_used, str): srms_used = [srms_used] # reload SRM database (if reloard_srm_databse=True) self.srm_load_database(srms_used, reload_srm_database) analytes = self._analyte_checker(analytes) analytes.difference_update(self._analytes_missing_srm) analytes = list(analytes) # get and scale mean srm values for all analytes srmid = self.srmtab.loc[:, idx[list(analytes), 'mean']] _srmid = scale(np.log(srmid)) srm_labels = srmid.index.values # get and scale measured srm values for all analytes stdid = self.stdtab.loc[:, idx[list(analytes), 'mean']] _stdid = scale(np.log(stdid)) _stdid[np.isnan(_stdid)] = -12 # fit KMeans classifier to srm database classifier = KMeans(len(srms_used)).fit(_srmid) # apply classifier to measured data std_classes = classifier.predict(_stdid) # get srm names from classes std_srm_labels = np.array([srm_labels[np.argwhere(classifier.labels_ == i)][0][0] for i in std_classes]) self.stdtab.loc[:, 'SRM'] = std_srm_labels self._srm_key_dict = {k: v for k, v in zip(self.stdtab.STD, self.stdtab.SRM)} self.srms_ided = True self.srm_build_calib_table()
[docs] def srm_build_calib_table(self): """ Combine SRM database values and identified measured values into a calibration database. """ caltab = self.stdtab.reset_index() caltab.set_index(['gTime', 'uTime'], inplace=True) levels = ['meas_' + c if c != '' else c for c in caltab.columns.levels[1]] caltab.columns = caltab.columns.set_levels(levels, level=1) for a in self.analyte_ratios: caltab.loc[:, (a, 'srm_mean')] = self.srmtab.loc[caltab.SRM, (a, 'mean')].values caltab.loc[:, (a, 'srm_err')] = self.srmtab.loc[caltab.SRM, (a, 'err')].values self.caltab = caltab.reindex(self.stdtab.columns.levels[0], axis=1, level=0)
[docs] def clear_calibration(self): if self.srms_ided: del self.stdtab del self.srmdat del self.srmtab self.srms_ided = False if 'calibrated' in self.stages_complete: del self.calib_params del self.calib_ps self.stages_complete.remove('calibrated') self.focus_stage = 'ratios' self.set_focus('ratios')
# apply calibration to data
[docs] @_log def calibrate(self, analytes=None, drift_correct=True, srms_used=['NIST610', 'NIST612', 'NIST614'], zero_intercept=True, n_min=10, reload_srm_database=False): """ Calibrates the data to measured SRM values. Assumes that y intercept is zero. Parameters ---------- analytes : str or iterable Which analytes you'd like to calibrate. Defaults to all. drift_correct : bool Whether to pool all SRM measurements into a single calibration, or vary the calibration through the run, interpolating coefficients between measured SRMs. srms_used : str or iterable Which SRMs have been measured. Must match names given in SRM data file *exactly*. n_min : int The minimum number of data points an SRM measurement must have to be included. Returns ------- None """ # load SRM database self.srm_load_database(srms_used, reload_srm_database) # compile measured SRM data self.srm_compile_measured(n_min) analytes = self._analyte_checker(analytes) if isinstance(srms_used, str): srms_used = [srms_used] if not hasattr(self, 'srmtabs'): self.srm_id_auto(srms_used=srms_used, n_min=n_min, reload_srm_database=reload_srm_database) # make container for calibration params gTime = np.asanyarray(self.caltab.index.levels[0]) if not hasattr(self, 'calib_params'): self.calib_params = pd.DataFrame(columns=pd.MultiIndex.from_product([list(analytes), ['m']]), index=gTime) if zero_intercept: fn = lambda x, m: x * m else: fn = lambda x, m, c: x * m + c for a in analytes: if zero_intercept: if (a, 'c') in self.calib_params: self.calib_params.drop((a, 'c'), 1, inplace=True) else: self.calib_params.loc[:, (a, 'c')] = 0 self.calib_params.loc[:, (a, 'c')] = self.calib_params[(a, 'c')].astype(object, copy=False) # set new column to objet type if drift_correct: # Fails to calculate errors sometimes (34S in Madi's data) for g in gTime: if self.caltab.loc[g].size == 0: continue meas = self.caltab.loc[g, (a, 'meas_mean')].values srm = self.caltab.loc[g, (a, 'srm_mean')].values viable = ~np.isnan(meas + srm) # remove any nan values meas = meas[viable] srm = srm[viable] meas_err = self.caltab.loc[g, (a, 'meas_err')].values[viable] srm_err = self.caltab.loc[g, (a, 'srm_err')].values[viable] # TODO: replace curve_fit with Sambridge's 2D likelihood function for better uncertainty incorporation? sigma = np.sqrt(meas_err**2 + srm_err**2) if len(meas) > 1: # multiple SRMs - do a regression p, cov = curve_fit(fn, meas, srm, sigma=sigma) pe = unc.correlated_values(p, cov) self.calib_params.loc[g, (a, 'm')] = pe[0] if not zero_intercept: self.calib_params.loc[g, (a, 'c')] = pe[1] else: # deal with case where there's only one datum self.calib_params.loc[g, (a, 'm')] = (un.uarray(srm, srm_err) / un.uarray(meas, meas_err))[0] if not zero_intercept: self.calib_params.loc[g, (a, 'c')] = 0 else: meas = self.caltab.loc[:, (a, 'meas_mean')].values srm = self.caltab.loc[:, (a, 'srm_mean')].values viable = ~np.isnan(meas + srm) # remove any nan values meas = meas[viable] srm = srm[viable] meas_err = self.caltab.loc[:, (a, 'meas_err')].values[viable] srm_err = self.caltab.loc[:, (a, 'srm_err')].values[viable] # TODO: replace curve_fit with Sambridge's 2D likelihood function for better uncertainty incorporation? sigma = np.sqrt(meas_err**2 + srm_err**2) if sum(viable) > 1: p, cov = curve_fit(fn, meas, srm, sigma=sigma) pe = unc.correlated_values(p, cov) self.calib_params.loc[:, (a, 'm')] = pe[0] if not zero_intercept: self.calib_params.loc[:, (a, 'c')] = pe[1] else: self.calib_params.loc[:, (a, 'm')] = (un.uarray(srm, srm_err) / un.uarray(meas, meas_err))[0] if not zero_intercept: self.calib_params.loc[:, (a, 'c')] = 0 if self.calib_params.index.min() == 0: self.calib_params.drop(0, inplace=True) self.calib_params.drop(self.calib_params.index.max(), inplace=True) self.calib_params.loc[0, :] = self.calib_params.loc[self.calib_params.index.min(), :] maxuT = np.max([d.uTime.max() for d in self.data.values()]) # calculate max uTime self.calib_params.loc[maxuT, :] = self.calib_params.loc[self.calib_params.index.max(), :] # sort indices for slice access self.calib_params.sort_index(axis=1, inplace=True) self.calib_params.sort_index(axis=0, inplace=True) # calculcate interpolators for applying calibrations self.calib_ps = Bunch() for a in analytes: # TODO: revisit un_interp1d to see whether it plays well with correlated values. # Possible re-write to deal with covariance matrices? self.calib_ps[a] = {'m': un_interp1d(self.calib_params.index.values, self.calib_params.loc[:, (a, 'm')].values)} if not zero_intercept: self.calib_ps[a]['c'] = un_interp1d(self.calib_params.index.values, self.calib_params.loc[:, (a, 'c')].values) with self.pbar.set(total=len(self.data), desc='Applying Calibrations') as prog: for d in self.data.values(): d.calibrate(self.calib_ps, analytes) d.uncalibrated = self.uncalibrated prog.update() # record SRMs used for plotting markers = 'osDsv<>PX' # for future implementation of SRM-specific markers. if not hasattr(self, 'srms_used'): self.srms_used = set(srms_used) else: self.srms_used.update(srms_used) self.srm_mdict = {k: markers[i] for i, k in enumerate(self.srms_used)} self.stages_complete.update(['calibrated']) self.focus_stage = 'calibrated' return
# data filtering # TODO: Re-factor filtering to use 'classifier' objects? # functions for calculating mass fraction (ppm)
[docs] def get_sample_list(self, save_as=None, overwrite=False): """ Save a csv list of of all samples to be populated with internal standard concentrations. Parameters ---------- save_as : str Location to save the file. Defaults to the export directory. """ if save_as is None: save_as = self._file_internal_standard_massfrac else: self._file_internal_standard_massfrac = save_as if os.path.exists(save_as): if not overwrite: raise IOError(f'File {save_as} exists. Please change the save location or specify overwrite=True') empty = pd.DataFrame(index=self.samples, columns=['int_stand_massfrac']) empty.to_csv(save_as) print(self._wrap_text(f'Sample List saved to {save_as} \nPlease modify and re-import using read_internal_standard_concs()'))
[docs] def read_internal_standard_concs(self, sample_conc_file=None): """ Load in a per-sample list of internal sample concentrations. Parameters ---------- sample_conc_file : str Path to csv file containing internal standard mass fractions. Must contain the sample names in the first column, column names in the first row, and contain a column called 'int_stand_massfrac'. If in doubt, use the `get_sample_list` function to generate a blank template for your samples. """ if sample_conc_file is None: sample_conc_file = self._file_internal_standard_massfrac else: self._file_internal_standard_massfrac = sample_conc_file self.internal_standard_concs = pd.read_csv(sample_conc_file, index_col=0) return self.internal_standard_concs
[docs] @_log def calculate_mass_fraction(self, internal_standard_concs=None, analytes=None, analyte_masses=None): """ Convert calibrated molar ratios to mass fraction. Parameters ---------- internal_standard_concs : float or str The concentration of the internal standard in your samples. If a string, should be the file name pointing towards the [completed] output of get_sample_list(). analytes : str of array_like The analytes you want to calculate. analyte_masses : dict A dict containing the masses to use for each analyte. If None and the analyte names contain a number, that number is used as the mass. If None and the analyte names do *not* contain a number, the average mass for the element is used. """ analytes = self._analyte_checker(analytes, focus_stage='calibrated') if analyte_masses is None: analyte_masses = analyte_mass(self.analytes, False) if isinstance(internal_standard_concs, str): self.internal_standard_concs = self.read_internal_standard_concs(sample_conc_file=internal_standard_concs) elif isinstance(internal_standard_concs, float): self.internal_standard_concs = internal_standard_concs elif not isinstance(self.internal_standard_concs, pd.DataFrame): self.internal_standard_concs = self.read_internal_standard_concs() isc = self.internal_standard_concs if not isinstance(isc, pd.core.frame.DataFrame): with self.pbar.set(total=len(self.data), desc='Calculating Mass Fractions') as prog: for d in self.data.values(): d.calc_mass_fraction(isc, analytes, analyte_masses) prog.update() else: with self.pbar.set(total=len(self.data), desc='Calculating Mass Fractions') as prog: for k, d in self.data.items(): if k in isc.index: d.calc_mass_fraction(isc.loc[k, 'int_stand_massfrac'], analytes, analyte_masses) else: d.calc_mass_fraction(np.nan, analytes, analyte_masses) prog.update() self.stages_complete.update(['mass_fraction']) self.focus_stage = 'mass_fraction'
[docs] @_log def clear_subsets(self): """ Clears all subsets """ self._has_subsets = False self._subset_names = [] self.subsets = Bunch() self.subsets['All_Analyses'] = self.samples self.subsets[self.srm_identifier] = [s for s in self.samples if self.srm_identifier in s] self.subsets['All_Samples'] = [s for s in self.samples if self.srm_identifier not in s] self.subsets['not_in_set'] = self.subsets['All_Samples'].copy()
[docs] @_log def make_subset(self, samples=None, name=None, force=False, silent=False): """ Creates a subset of samples, which can be treated independently. Parameters ---------- samples : str or array_like Name of sample, or list of sample names. name : (optional) str or number The name of the sample group. Defaults to n + 1, where n is the highest existing group number force : bool If there is an existing subset that contains the same samples, a new set is not created unles `force=True`. Default is False. """ if isinstance(samples, str): samples = [samples] # Check if a subset containing the same samples already exists. already_present = False existing_name = '' for k, v in self.subsets.items(): if set(v) == set(samples) and k != 'not_in_set': already_present = True existing_name = k if already_present: if not silent: print('***NOPE***') print(self._wrap_text( f"A subset containing those samples already exists, and is called '{existing_name}'. A new subset has not been created. I suggest you use the existing one. If you'd like to go ahead anyway, set `force=True` to make a new subset with your provided name." )) return existing_name if not force: return not_exists = [s for s in samples if s not in self.subsets['All_Analyses']] if len(not_exists) > 0: raise ValueError(', '.join(not_exists) + ' not in the list of sample names.\nPlease check your sample names.\nNote: Sample names are stored in the .samples attribute of your analysis.') if name is None: name = max([-1] + [x for x in self.subsets if isinstance(x, int)]) + 1 self._subset_names.append(name) if samples is not None: self.subsets[name] = samples for s in samples: try: self.subsets['not_in_set'].remove(s) except ValueError: pass self._has_subsets = True # for subset in np.unique(list(self.subsets.values())): # self.subsets[subset] = sorted([k for k, v in self.subsets.items() if str(v) == subset]) if not silent: print(f'Subset created called {name}.') return name
[docs] @_log def zeroscreen(self, focus_stage=None): """ Remove all points containing data below zero (which are impossible!) """ if focus_stage is None: focus_stage = self.focus_stage for s in self.data.values(): ind = np.ones(len(s.Time), dtype=bool) for v in s.data[focus_stage].values(): ind = ind & (nominal_values(v) > 0) for k in s.data[focus_stage]: s.data[focus_stage][k][~ind] = unc.ufloat(np.nan, np.nan) self.set_focus(focus_stage) return
[docs] @_log def filter_threshold(self, analyte, threshold, samples=None, subset=None): """ Applies a threshold filter to the data. Generates two filters above and below the threshold value for a given analyte. Parameters ---------- analyte : str The analyte that the filter applies to. threshold : float The threshold value. filt : bool Whether or not to apply existing filters to the data before calculating this filter. samples : array_like or None Which samples to apply this filter to. If None, applies to all samples. subset : str or number The subset of samples (defined by make_subset) you want to apply the filter to. Returns ------- None """ if samples is not None: subset = self.make_subset(samples, silent=True) samples = self._get_samples(subset) analyte = self._analyte_checker(analyte, single=True) self.minimal_analytes.update([analyte]) with self.pbar.set(total=len(samples), desc='Threshold Filter') as prog: for s in samples: self.data[s].filter_threshold(analyte, threshold) prog.update()
[docs] @_log def filter_threshold_percentile(self, analyte, percentiles, level='population', filt=False, samples=None, subset=None): """ Applies a threshold filter to the data. Generates two filters above and below the threshold value for a given analyte. Parameters ---------- analyte : str The analyte that the filter applies to. percentiles : float or iterable of len=2 The percentile values. level : str Whether to calculate percentiles from the entire dataset ('population') or for each individual sample ('individual') filt : bool Whether or not to apply existing filters to the data before calculating this filter. samples : array_like or None Which samples to apply this filter to. If None, applies to all samples. subset : str or number The subset of samples (defined by make_subset) you want to apply the filter to. Returns ------- None """ params = locals() del(params['self']) if samples is not None: subset = self.make_subset(samples, silent=True) samples = self._get_samples(subset) analyte = self._analyte_checker(analyte, single=True) self.minimal_analytes.update([analyte]) if isinstance(percentiles, (int, float)): percentiles = [percentiles] if level == 'population': # Get all samples self.get_focus(filt=filt, subset=subset, nominal=True) dat = self.focus[analyte][~np.isnan(self.focus[analyte])] # calculate filter limits lims = np.percentile(dat, percentiles) # Calculate filter for individual samples with self.pbar.set(total=len(samples), desc='Percentile theshold filter') as prog: for s in samples: d = self.data[s] setn = d.filt.maxset + 1 g = d.focus[analyte] if level == 'individual': gt = nominal_values(g) lims = np.percentile(gt[~np.isnan(gt)], percentiles) if len(lims) == 1: above = g >= lims[0] below = g < lims[0] d.filt.add(analyte + '_{:.1f}-pcnt_below'.format(percentiles[0]), below, 'Values below {:.1f}th {:} percentile ({:.2e})'.format(percentiles[0], analyte, lims[0]), params, setn=setn) d.filt.add(analyte + '_{:.1f}-pcnt_above'.format(percentiles[0]), above, 'Values above {:.1f}th {:} percentile ({:.2e})'.format(percentiles[0], analyte, lims[0]), params, setn=setn) elif len(lims) == 2: inside = (g >= min(lims)) & (g <= max(lims)) outside = (g < min(lims)) | (g > max(lims)) lpc = '-'.join(['{:.1f}'.format(p) for p in percentiles]) d.filt.add(analyte + '_' + lpc + '-pcnt_inside', inside, 'Values between ' + lpc + ' ' + analyte + 'percentiles', params, setn=setn) d.filt.add(analyte + '_' + lpc + '-pcnt_outside', outside, 'Values outside ' + lpc + ' ' + analyte + 'percentiles', params, setn=setn) prog.update() return
[docs] @_log def filter_gradient_threshold(self, analyte, threshold, win=15, recalc=True, win_mode='mid', win_exclude_outside=True, absolute_gradient=True, samples=None, subset=None): """ Calculate a gradient threshold filter to the data. Generates two filters above and below the threshold value for a given analyte. Parameters ---------- analyte : str The analyte that the filter applies to. win : int The window over which to calculate the moving gradient threshold : float The threshold value. recalc : bool Whether or not to re-calculate the gradients. win_mode : str Whether the rolling window should be centered on the left, middle or centre of the returned value. Can be 'left', 'mid' or 'right'. win_exclude_outside : bool If True, regions at the start and end where the gradient cannot be calculated (depending on win_mode setting) will be excluded by the filter. absolute_gradient : bool If True, the filter is applied to the absolute gradient (i.e. always positive), allowing the selection of 'flat' vs 'steep' regions regardless of slope direction. If Falose, the sign of the gradient matters, allowing the selection of positive or negative slopes only. samples : array_like or None Which samples to apply this filter to. If None, applies to all samples. subset : str or number The subset of samples (defined by make_subset) you want to apply the filter to. Returns ------- None """ if samples is not None: subset = self.make_subset(samples, silent=True) samples = self._get_samples(subset) analyte = self._analyte_checker(analyte, single=True) self.minimal_analytes.update([analyte]) with self.pbar.set(total=len(samples), desc='Gradient Threshold Filter') as prog: for s in samples: self.data[s].filter_gradient_threshold(analyte=analyte, win=win, threshold=threshold, recalc=recalc, win_mode=win_mode, win_exclude_outside=win_exclude_outside, absolute_gradient=absolute_gradient) prog.update()
[docs] @_log def filter_gradient_threshold_percentile(self, analyte, percentiles, level='population', win=15, filt=False, samples=None, subset=None): """ Calculate a gradient threshold filter to the data. Generates two filters above and below the threshold value for a given analyte. Parameters ---------- analyte : str The analyte that the filter applies to. win : int The window over which to calculate the moving gradient percentiles : float or iterable of len=2 The percentile values. filt : bool Whether or not to apply existing filters to the data before calculating this filter. samples : array_like or None Which samples to apply this filter to. If None, applies to all samples. subset : str or number The subset of samples (defined by make_subset) you want to apply the filter to. Returns ------- None """ params = locals() del(params['self']) if samples is not None: subset = self.make_subset(samples, silent=True) samples = self._get_samples(subset) analyte = self._analyte_checker(analyte, single=True) self.minimal_analytes.update([analyte]) # Calculate gradients of all samples self.get_gradients(analytes=[analyte], win=win, filt=filt, subset=subset) grad = self.gradients[analyte][~np.isnan(self.gradients[analyte])] if isinstance(percentiles, (int, float)): percentiles = [percentiles] if level == 'population': # calculate filter limits lims = np.percentile(grad, percentiles) # Calculate filter for individual samples with self.pbar.set(total=len(samples), desc='Percentile Threshold Filter') as prog: for s in samples: d = self.data[s] setn = d.filt.maxset + 1 g = calc_grads(d.Time, d.focus, [analyte], win)[analyte] if level == 'individual': gt = nominal_values(g) lims = np.percentile(gt[~np.isnan(gt)], percentiles) if len(lims) == 1: above = g >= lims[0] below = g < lims[0] d.filt.add(analyte + '_{:.1f}-grd-pcnt_below'.format(percentiles[0]), below, 'Gradients below {:.1f}th {:} percentile ({:.2e})'.format(percentiles[0], analyte, lims[0]), params, setn=setn) d.filt.add(analyte + '_{:.1f}-grd-pcnt_above'.format(percentiles[0]), above, 'Gradients above {:.1f}th {:} percentile ({:.2e})'.format(percentiles[0], analyte, lims[0]), params, setn=setn) elif len(lims) == 2: inside = (g >= min(lims)) & (g <= max(lims)) outside = (g < min(lims)) | (g > max(lims)) lpc = '-'.join(['{:.1f}'.format(p) for p in percentiles]) d.filt.add(analyte + '_' + lpc + '-grd-pcnt_inside', inside, 'Gradients between ' + lpc + ' ' + analyte + 'percentiles', params, setn=setn) d.filt.add(analyte + '_' + lpc + '-grd-pcnt_outside', outside, 'Gradients outside ' + lpc + ' ' + analyte + 'percentiles', params, setn=setn) prog.update() return
[docs] @_log def filter_clustering(self, analytes, filt=False, normalise=True, method='kmeans', include_time=False, samples=None, sort=True, subset=None, level='sample', min_data=10, **kwargs): """ Applies an n - dimensional clustering filter to the data. Parameters ---------- analytes : str The analyte(s) that the filter applies to. filt : bool Whether or not to apply existing filters to the data before calculating this filter. normalise : bool Whether or not to normalise the data to zero mean and unit variance. Reccomended if clustering based on more than 1 analyte. Uses `sklearn.preprocessing.scale`. method : str Which clustering algorithm to use: * 'meanshift': The `sklearn.cluster.MeanShift` algorithm. Automatically determines number of clusters in data based on the `bandwidth` of expected variation. * 'kmeans': The `sklearn.cluster.KMeans` algorithm. Determines the characteristics of a known number of clusters within the data. Must provide `n_clusters` to specify the expected number of clusters. level : str Whether to conduct the clustering analysis at the 'sample' or 'population' level. include_time : bool Whether or not to include the Time variable in the clustering analysis. Useful if you're looking for spatially continuous clusters in your data, i.e. this will identify each spot in your analysis as an individual cluster. samples : optional, array_like or None Which samples to apply this filter to. If None, applies to all samples. sort : bool Whether or not you want the cluster labels to be sorted by the mean magnitude of the signals they are based on (0 = lowest) min_data : int The minimum number of data points that should be considered by the filter. Default = 10. **kwargs Parameters passed to the clustering algorithm specified by `method`. Meanshift Parameters bandwidth : str or float The bandwith (float) or bandwidth method ('scott' or 'silverman') used to estimate the data bandwidth. bin_seeding : bool Modifies the behaviour of the meanshift algorithm. Refer to sklearn.cluster.meanshift documentation. K-Means Parameters n_clusters : int The number of clusters expected in the data. Returns ------- None """ if samples is not None: subset = self.make_subset(samples, focus_stage=self.focus_stage) samples = self._get_samples(subset) analytes = self.analytes_sorted(analytes) self.minimal_analytes.update(analytes) if level == 'sample': with self.pbar.set(total=len(samples), desc='Clustering Filter') as prog: for s in samples: self.data[s].filter_clustering(analytes=analytes, filt=filt, normalise=normalise, method=method, include_time=include_time, min_data=min_data, sort=sort, **kwargs) prog.update() if level == 'population': if isinstance(sort, bool): sort_by = 0 else: sort_by = sort name = '_'.join(analytes) + '_{}'.format(method) self.fit_classifier(name=name, analytes=analytes, method=method, subset=subset, filt=filt, sort_by=sort_by, **kwargs) self.apply_classifier(name=name, subset=subset)
[docs] @_log def fit_classifier(self, name, analytes, method, samples=None, subset=None, filt=True, sort_by=0, **kwargs): """ Create a clustering classifier based on all samples, or a subset. Parameters ---------- name : str The name of the classifier. analytes : str or iterable Which analytes the clustering algorithm should consider. method : str Which clustering algorithm to use. Can be: 'meanshift' The `sklearn.cluster.MeanShift` algorithm. Automatically determines number of clusters in data based on the `bandwidth` of expected variation. 'kmeans' The `sklearn.cluster.KMeans` algorithm. Determines the characteristics of a known number of clusters within the data. Must provide `n_clusters` to specify the expected number of clusters. samples : iterable list of samples to consider. Overrides 'subset'. subset : str The subset of samples used to fit the classifier. Ignored if 'samples' is specified. sort_by : int Which analyte the resulting clusters should be sorted by - defaults to 0, which is the first analyte. **kwargs : method-specific keyword parameters - see below. Meanshift Parameters bandwidth : str or float The bandwith (float) or bandwidth method ('scott' or 'silverman') used to estimate the data bandwidth. bin_seeding : bool Modifies the behaviour of the meanshift algorithm. Refer to sklearn.cluster.meanshift documentation. K - Means Parameters n_clusters : int The number of clusters expected in the data. Returns ------- name : str """ # isolate data if samples is not None: subset = self.make_subset(samples, silent=True) analytes = self.analytes_sorted(analytes, focus_stage=self.focus_stage) self.minimal_analytes.update(analytes) self.get_focus(subset=subset, filt=filt) # create classifer c = classifier(analytes, sort_by) # fit classifier c.fit(data=self.focus, method=method, **kwargs) self.classifiers[name] = c return name
[docs] @_log def apply_classifier(self, name, samples=None, subset=None): """ Apply a clustering classifier based on all samples, or a subset. Parameters ---------- name : str The name of the classifier to apply. subset : str The subset of samples to apply the classifier to. Returns ------- name : str """ if samples is not None: subset = self.make_subset(samples, silent=True) samples = self._get_samples(subset) c = self.classifiers[name] labs = c.classifier.ulabels_ with self.pbar.set(total=len(samples), desc='Applying ' + name + ' classifier') as prog: for s in samples: d = self.data[s] try: f = c.predict(d.focus) except ValueError: # in case there's no data f = np.array([-2] * len(d.Time)) for l in labs: ind = f == l d.filt.add(name=name + '_{:.0f}'.format(l), filt=ind, info=name + ' ' + c.method + ' classifier', params=(c.analytes, c.method)) prog.update() return name
[docs] @_log def filter_correlation(self, x_analyte, y_analyte, window=None, r_threshold=0.9, p_threshold=0.05, filt=True, samples=None, subset=None): """ Applies a correlation filter to the data. Calculates a rolling correlation between every `window` points of two analytes, and excludes data where their Pearson's R value is above `r_threshold` and statistically significant. Data will be excluded where their absolute R value is greater than `r_threshold` AND the p - value associated with the correlation is less than `p_threshold`. i.e. only correlations that are statistically significant are considered. Parameters ---------- x_analyte, y_analyte : str The names of the x and y analytes to correlate. window : int, None The rolling window used when calculating the correlation. r_threshold : float The correlation index above which to exclude data. Note: the absolute pearson R value is considered, so negative correlations below -`r_threshold` will also be excluded. p_threshold : float The significant level below which data are excluded. filt : bool Whether or not to apply existing filters to the data before calculating this filter. Returns ------- None """ if samples is not None: subset = self.make_subset(samples, silent=True) samples = self._get_samples(subset) x_analyte = self._analyte_checker(x_analyte, single=True) y_analyte = self._analyte_checker(y_analyte, single=True) self.minimal_analytes.update([x_analyte, y_analyte]) with self.pbar.set(total=len(samples), desc='Correlation Filter') as prog: for s in samples: self.data[s].filter_correlation(x_analyte, y_analyte, window=window, r_threshold=r_threshold, p_threshold=p_threshold, filt=filt) prog.update()
[docs] @_log def correlation_plots(self, x_analyte, y_analyte, window=15, filt=True, recalc=False, samples=None, subset=None, outdir=None): """ Plot the local correlation between two analytes. Parameters ---------- x_analyte, y_analyte : str The names of the x and y analytes to correlate. window : int, None The rolling window used when calculating the correlation. filt : bool Whether or not to apply existing filters to the data before calculating this filter. recalc : bool If True, the correlation is re-calculated, even if it is already present. Returns ------- None """ if outdir is None: outdir = self.report_dir + '/correlations/' if not os.path.isdir(outdir): os.mkdir(outdir) x_analyte = self._analyte_checker(x_analyte, single=True) y_analyte = self._analyte_checker(y_analyte, single=True) if subset is not None: samples = self._get_samples(subset) elif samples is None: samples = self.subsets['All_Analyses'] elif isinstance(samples, str): samples = [samples] with self.pbar.set(total=len(samples), desc='Drawing Plots') as prog: for s in samples: f, _ = self.data[s].correlation_plot(x_analyte=x_analyte, y_analyte=y_analyte, window=window, filt=filt, recalc=recalc) f.savefig('{}/{}_{}-{}.pdf'.format(outdir, s, x_analyte, y_analyte)) plt.close(f) prog.update() return
[docs] @_log def filter_on(self, filt=None, analyte=None, samples=None, subset=None, show_status=False): """ Turns data filters on for particular analytes and samples. Parameters ---------- filt : optional, str or array_like Name, partial name or list of names of filters. Supports partial matching. i.e. if 'cluster' is specified, all filters with 'cluster' in the name are activated. Defaults to all filters. analyte : optional, str or array_like Name or list of names of analytes. Defaults to all analytes. samples : optional, array_like or None Which samples to apply this filter to. If None, applies to all samples. Returns ------- None """ if samples is not None: subset = self.make_subset(samples, silent=True) samples = self._get_samples(subset) analyte = self._analyte_checker(analyte) for s in samples: try: self.data[s].filt.on(analyte, filt) except: warnings.warn("filt.on failure in sample " + s) if show_status: self.filter_status(subset=subset) return
[docs] @_log def filter_off(self, filt=None, analyte=None, samples=None, subset=None, show_status=False): """ Turns data filters off for particular analytes and samples. Parameters ---------- filt : optional, str or array_like Name, partial name or list of names of filters. Supports partial matching. i.e. if 'cluster' is specified, all filters with 'cluster' in the name are activated. Defaults to all filters. analyte : optional, str or array_like Name or list of names of analytes. Defaults to all analytes. samples : optional, array_like or None Which samples to apply this filter to. If None, applies to all samples. Returns ------- None """ if samples is not None: subset = self.make_subset(samples, silent=True) samples = self._get_samples(subset) analyte = self._analyte_checker(analyte) for s in samples: try: self.data[s].filt.off(analyte, filt) except: warnings.warn("filt.off failure in sample " + s) if show_status: self.filter_status(subset=subset) return
[docs] def filter_status(self, sample=None, subset=None, stds=False): """ Prints the current status of filters for specified samples. Parameters ---------- sample : str Which sample to print. subset : str Specify a subset stds : bool Whether or not to include standards. """ if sample is None and subset is None: if not self._has_subsets: return self.data[self.subsets['All_Samples'][0]].filt.filter_table else: fdfs = {} for n in sorted(str(sn) for sn in self._subset_names): if n in self.subsets: pass elif int(n) in self.subsets: n = int(n) pass subset_name = str(n) fdfs[subset_name] = self.data[self.subsets[n][0]].filt.filter_table if len(self.subsets['not_in_set']) > 0: fdfs['Not in Subset'] = self.data[self.subsets['not_in_set'][0]].filt.filter_table return pd.concat(fdfs, names=['subset']) elif sample is not None: fdfs = {} fdfs[sample] = self.data[sample].filt.filter_table return pd.concat(fdfs, names=['sample']) elif subset is not None: if isinstance(subset, (str, int, float)): subset = [subset] fdfs = {} for n in subset: subset_name = str(n) fdfs[subset_name] = self.data[self.subsets[n][0]].filt.filter_table return pd.concat(fdfs, names=['subset'])
[docs] @_log def filter_clear(self, samples=None, subset=None): """ Clears (deletes) all data filters. """ if samples is not None: subset = self.make_subset(samples, silent=True) samples = self._get_samples(subset) for s in samples: self.data[s].filt.clear()
[docs] @_log def filter_defragment(self, threshold, mode='include', filt=True, samples=None, subset=None): """ Remove 'fragments' from the calculated filter Parameters ---------- threshold : int Contiguous data regions that contain this number or fewer points are considered 'fragments' mode : str Specifies wither to 'include' or 'exclude' the identified fragments. filt : bool or filt string Which filter to apply the defragmenter to. Defaults to True samples : array_like or None Which samples to apply this filter to. If None, applies to all samples. subset : str or number The subset of samples (defined by make_subset) you want to apply the filter to. Returns ------- None """ if samples is not None: subset = self.make_subset(samples, silent=True) samples = self._get_samples(subset) for s in samples: f = self.data[s].filt.grab_filt(filt) self.data[s].filt.add(name='defrag_{:s}_{:.0f}'.format(mode, threshold), filt=filters.defrag(f, threshold, mode), info='Defrag {:s} filter with threshold {:.0f}'.format(mode, threshold), params=(threshold, mode, filt, samples, subset))
[docs] @_log def filter_exclude_downhole(self, threshold, filt=True, samples=None, subset=None): """ Exclude all points down-hole (after) the first excluded data. Parameters ---------- threhold : int The minimum number of contiguous excluded data points that must exist before downhole exclusion occurs. file : valid filter string or bool Which filter to consider. If True, applies to currently active filters. """ if samples is not None: subset = self.make_subset(samples, silent=True) samples = self._get_samples(subset) for s in samples: self.data[s].filter_exclude_downhole(threshold, filt)
[docs] @_log def filter_trim(self, start=1, end=1, filt=True, samples=None, subset=None): """ Remove points from the start and end of filter regions. Parameters ---------- start, end : int The number of points to remove from the start and end of the specified filter. filt : valid filter string or bool Which filter to trim. If True, applies to currently active filters. """ if samples is not None: subset = self.make_subset(samples, silent=True) samples = self._get_samples(subset) for s in samples: self.data[s].filter_trim(start, end, filt)
[docs] def filter_nremoved(self, filt=True, quiet=False): """ Report how many data are removed by the active filters. """ rminfo = {} for n in self.subsets['All_Samples']: s = self.data[n] rminfo[n] = s.filt_nremoved(filt) if not quiet: maxL = max([len(s) for s in rminfo]) print('{string:{number}s}'.format(string='Sample ', number=maxL + 3) + '{total:4s}'.format(total='tot') + '{removed:4s}'.format(removed='flt') + '{percent:4s}'.format(percent='%rm')) for k, (ntot, nfilt, pcrm) in rminfo.items(): print('{string:{number}s}'.format(string=k, number=maxL + 3) + '{total:4.0f}'.format(total=ntot) + '{removed:4.0f}'.format(removed=nfilt) + '{percent:4.0f}'.format(percent=pcrm)) return rminfo
[docs] @_log def optimise_signal(self, analytes, min_points=5, threshold_mode='kde_first_max', threshold_mult=1., x_bias=0, filt=True, weights=None, mode='minimise', samples=None, subset=None): """ Optimise data selection based on specified analytes. Identifies the longest possible contiguous data region in the signal where the relative standard deviation (std) and concentration of all analytes is minimised. Optimisation is performed via a grid search of all possible contiguous data regions. For each region, the mean std and mean scaled analyte concentration ('amplitude') are calculated. The size and position of the optimal data region are identified using threshold std and amplitude values. Thresholds are derived from all calculated stds and amplitudes using the method specified by `threshold_mode`. For example, using the 'kde_max' method, a probability density function (PDF) is calculated for std and amplitude values, and the threshold is set as the maximum of the PDF. These thresholds are then used to identify the size and position of the longest contiguous region where the std is below the threshold, and the amplitude is either below the threshold. All possible regions of the data that have at least `min_points` are considered. For a graphical demonstration of the action of signal_optimiser, use `optimisation_plot`. Parameters ---------- d : latools.D object An latools data object. analytes : str or array_like Which analytes to consider. min_points : int The minimum number of contiguous points to consider. threshold_mode : str The method used to calculate the optimisation thresholds. Can be 'mean', 'median', 'kde_max' or 'bayes_mvs', or a custom function. If a function, must take a 1D array, and return a single, real number. weights : array_like of length len(analytes) An array of numbers specifying the importance of each analyte considered. Larger number makes the analyte have a greater effect on the optimisation. Default is None. """ if samples is not None: subset = self.make_subset(samples, silent=True) samples = self._get_samples(subset) analytes = self._analyte_checker(analytes) self.minimal_analytes.update(analytes) errs = [] with self.pbar.set(total=len(samples), desc='Optimising Data selection') as prog: for s in samples: e = self.data[s].signal_optimiser(analytes=analytes, min_points=min_points, threshold_mode=threshold_mode, threshold_mult=threshold_mult, x_bias=x_bias, weights=weights, filt=filt, mode=mode) if e != '': errs.append(e) prog.update() if len(errs) > 0: print('\nA Few Problems:\n' + '\n'.join(errs) + '\n\n *** Check Optimisation Plots ***')
[docs] @_log def optimisation_plots(self, overlay_alpha=0.5, samples=None, subset=None, **kwargs): """ Plot the result of signal_optimise. `signal_optimiser` must be run first, and the output stored in the `opt` attribute of the latools.D object. Parameters ---------- d : latools.D object A latools data object. overlay_alpha : float The opacity of the threshold overlays. Between 0 and 1. **kwargs Passed to `trace_plot` """ if samples is not None: subset = self.make_subset(samples, silent=True) samples = self._get_samples(subset) outdir=self.report_dir + '/optimisation_plots/' if not os.path.isdir(outdir): os.mkdir(outdir) with self.pbar.set(total=len(samples), desc='Drawing Plots') as prog: for s in samples: figs = self.data[s].optimisation_plot(overlay_alpha, **kwargs) n = 1 for f, _ in figs: if f is not None: f.savefig(os.path.join(outdir, s + '_optim_{:.0f}.pdf'.format(n))) plt.close(f) n += 1 prog.update() return
# plot calibrations
[docs] @_log def calibration_plot(self, analyte_ratios=None, datarange=True, loglog=False, ncol=3, srm_group=None, percentile_data_cutoff=85, save=True): return plot.calibration_plot(self=self, analyte_ratios=analyte_ratios, datarange=datarange, loglog=loglog, ncol=ncol, srm_group=srm_group, percentile_data_cutoff=percentile_data_cutoff, save=save)
# set the focus attribute for specified samples
[docs] @_log def set_focus(self, focus_stage=None, samples=None, subset=None): """ Set the 'focus' attribute of the data file. The 'focus' attribute of the object points towards data from a particular stage of analysis. It is used to identify the 'working stage' of the data. Processing functions operate on the 'focus' stage, so if steps are done out of sequence, things will break. Names of analysis stages: * 'rawdata': raw data, loaded from csv file when object is initialised. * 'despiked': despiked data. * 'signal'/'background': isolated signal and background data, padded with np.nan. Created by self.separate, after signal and background regions have been identified by self.autorange. * 'bkgsub': background subtracted data, created by self.bkg_correct * 'ratios': element ratio data, created by self.ratio. * 'calibrated': ratio data calibrated to standards, created by self.calibrate. Parameters ---------- focus : str The name of the analysis stage desired. Returns ------- None """ if samples is not None: subset = self.make_subset(samples, silent=True) if subset is None: subset = 'All_Analyses' samples = self._get_samples(subset) if focus_stage is None: focus_stage = self.focus_stage else: self.focus_stage = focus_stage for s in samples: self.data[s].setfocus(focus_stage)
# fetch all the data from the data objects
[docs] def get_focus(self, filt=False, samples=None, subset=None, nominal=False): """ Collect all data from all samples into a single array. Data from standards is not collected. Parameters ---------- filt : str, dict or bool Either logical filter expression contained in a str, a dict of expressions specifying the filter string to use for each analyte or a boolean. Passed to `grab_filt`. samples : str or list which samples to get subset : str or int which subset to get Returns ------- None """ if samples is not None: subset = self.make_subset(samples, silent=True) samples = self._get_samples(subset) focus = {'uTime': []} columns = self._analyte_checker() focus.update({a: [] for a in columns}) for sa in samples: s = self.data[sa] focus['uTime'].append(s.uTime) for a in columns: tmp = s.focus[a].copy() if s.filt is not None: ind = s.filt.grab_filt(filt, a) tmp[~ind] = np.nan focus[a].append(tmp) if nominal: self.focus.update({k: nominal_values(np.concatenate(v)) for k, v, in focus.items()}) else: self.focus.update({k: np.concatenate(v) for k, v, in focus.items()}) # remove old columns for k in list(self.focus): if k not in columns: self.focus.pop(k) return
# fetch all the gradients from the data objects
[docs] def get_gradients(self, analytes=None, win=15, filt=False, samples=None, subset=None, recalc=True): """ Collect all data from all samples into a single array. Data from standards is not collected. Parameters ---------- filt : str, dict or bool Either logical filter expression contained in a str, a dict of expressions specifying the filter string to use for each analyte or a boolean. Passed to `grab_filt`. samples : str or list which samples to get subset : str or int which subset to get Returns ------- None """ analytes = self._analyte_checker(analytes) if samples is not None: subset = self.make_subset(samples, silent=True) samples = self._get_samples(subset) # check if gradients already calculated if all([self.data[s].grads_calced for s in samples]) and hasattr(self, 'gradients'): if not recalc: print("Using existing gradients. Set recalc=True to re-calculate.") return if not hasattr(self, 'gradients'): self.gradients = Bunch() # t = 0 focus = {'uTime': []} focus.update({a: [] for a in analytes}) with self.pbar.set(total=len(samples), desc='Calculating Gradients') as prog: for sa in samples: s = self.data[sa] focus['uTime'].append(s.uTime) ind = s.filt.grab_filt(filt) grads = calc_grads(s.uTime, s.focus, keys=analytes, win=win) for a in analytes: tmp = grads[a] tmp[~ind] = np.nan focus[a].append(tmp) s.grads = tmp s.grads_calced = True prog.update() self.gradients.update({k: np.concatenate(v) for k, v, in focus.items()}) return
[docs] def gradient_histogram(self, analytes=None, win=15, filt=False, bins=None, samples=None, subset=None, recalc=True, ncol=4): """ Plot a histogram of the gradients in all samples. Parameters ---------- filt : str, dict or bool Either logical filter expression contained in a str, a dict of expressions specifying the filter string to use for each analyte or a boolean. Passed to `grab_filt`. bins : None or array_like The bins to use in the histogram samples : str or list which samples to get subset : str or int which subset to get recalc : bool Whether to re-calculate the gradients, or use existing gradients. Returns ------- fig, ax """ analytes = self._analyte_checker(analytes) if not hasattr(self, 'gradients'): self.gradients = Bunch() ncol = int(ncol) n = len(analytes) nrow = plot.calc_nrow(n, ncol) if samples is not None: subset = self.make_subset(samples, silent=True) samples = self._get_samples(subset) self.get_gradients(analytes=analytes, win=win, filt=filt, subset=subset, recalc=recalc) fig, axs = plt.subplots(nrow, ncol, figsize=[3. * ncol, 2.5 * nrow]) if not isinstance(axs, np.ndarray): axs = [axs] i = 0 for a, ax in zip(analytes, axs.flatten()): d = nominal_values(self.gradients[a]) d = d[~np.isnan(d)] m, u = unitpicker(d, focus_stage=self.focus_stage, denominator=self.internal_standard) if bins is None: ibins = np.linspace(*np.percentile(d * m, [1, 99]), 50) else: ibins = bins ax.hist(d * m, bins=ibins, color=self.cmaps[a]) ax.axvline(0, ls='dashed', lw=1, c=(0,0,0,0.7)) ax.set_title(a, loc='left') if ax.get_subplotspec().is_first_col(): ax.set_ylabel('N') ax.set_xlabel(u + '/s') i += 1 if i < ncol * nrow: for ax in axs.flatten()[i:]: ax.set_visible(False) fig.tight_layout() return fig, axs
# crossplot of all data
[docs] @_log def crossplot(self, analytes=None, lognorm=True, bins=25, filt=False, samples=None, subset=None, figsize=(12, 12), save=False, colourful=True, mode='hist2d', **kwargs): """ Plot analytes against each other. Parameters ---------- analytes : optional, array_like or str The analyte(s) to plot. Defaults to all analytes. lognorm : bool Whether or not to log normalise the colour scale of the 2D histogram. bins : int The number of bins in the 2D histogram. filt : str, dict or bool Either logical filter expression contained in a str, a dict of expressions specifying the filter string to use for each analyte or a boolean. Passed to `grab_filt`. figsize : tuple Figure size (width, height) in inches. save : bool or str If True, plot is saves as 'crossplot.png', if str plot is saves as str. colourful : bool Whether or not the plot should be colourful :). mode : str 'hist2d' (default) or 'scatter' Returns ------- (fig, axes) """ analytes = self.analytes_sorted(analytes, focus_stage=self.focus_stage) # sort analytes try: analytes = sorted(analytes, key=lambda x: float(re.findall('[0-9.-]+', x)[0])) except IndexError: analytes = sorted(analytes) self.get_focus(filt=filt, samples=samples, subset=subset) fig, axes = plot.crossplot(dat=self.focus, keys=analytes, lognorm=lognorm, bins=bins, figsize=figsize, colourful=colourful, focus_stage=self.focus_stage, cmap=self.cmaps, mode=mode) if save or isinstance(save, str): if isinstance(save, str): fig.savefig(os.path.join(self.report_dir, save), dpi=200) else: fig.savefig(os.path.join(self.report_dir, 'crossplot.png'), dpi=200) return fig, axes
[docs] @_log def gradient_crossplot(self, analytes=None, win=15, lognorm=True, bins=25, filt=False, samples=None, subset=None, figsize=(12, 12), save=False, colourful=True, mode='hist2d', recalc=True, **kwargs): """ Plot analyte gradients against each other. Parameters ---------- analytes : optional, array_like or str The analyte(s) to plot. Defaults to all analytes. lognorm : bool Whether or not to log normalise the colour scale of the 2D histogram. bins : int The number of bins in the 2D histogram. filt : str, dict or bool Either logical filter expression contained in a str, a dict of expressions specifying the filter string to use for each analyte or a boolean. Passed to `grab_filt`. figsize : tuple Figure size (width, height) in inches. save : bool or str If True, plot is saves as 'crossplot.png', if str plot is saves as str. colourful : bool Whether or not the plot should be colourful :). mode : str 'hist2d' (default) or 'scatter' recalc : bool Whether to re-calculate the gradients, or use existing gradients. Returns ------- (fig, axes) """ analytes = self.analytes_sorted(analytes, focus_stage=self.focus_stage) samples = self._get_samples(subset) # calculate gradients self.get_gradients(analytes=analytes, win=win, filt=filt, subset=subset, recalc=recalc) # self.get_focus(filt=filt, samples=samples, subset=subset) # grads = calc_grads(self.focus.uTime, self.focus, analytes, win) fig, axes = plot.crossplot(dat=self.gradients, keys=analytes, lognorm=lognorm, bins=bins, figsize=figsize, colourful=colourful, focus_stage=self.focus_stage, cmap=self.cmaps, denominator=self.internal_standard, mode=mode) if save: fig.savefig(self.report_dir + '/g_crossplot.png', dpi=200) return fig, axes
[docs] def histograms(self, analytes=None, bins=25, logy=False, samples=None, subset=None, filt=False, colourful=True): """ Plot histograms of analytes. Parameters ---------- analytes : optional, array_like or str The analyte(s) to plot. Defaults to all analytes. bins : int The number of bins in each histogram (default = 25) logy : bool If true, y axis is a log scale. samples : array_like or None Which samples to plot. If None, all samples are plotted. subset : str or number The subset of samples (defined by make_subset) you want to plot. filt : str, dict or bool Either logical filter expression contained in a str, a dict of expressions specifying the filter string to use for each analyte or a boolean. Passed to `grab_filt`. colourful : bool If True, histograms are colourful :) Returns ------- (fig, axes) """ if samples is not None: subset = self.make_subset(samples, silent=True) analytes = self.analytes_sorted(analytes, focus_stage=self.focus_stage) if colourful: cmap = self.cmaps else: cmap = None self.get_focus(filt=filt, subset=subset) fig, axes = plot.histograms(self.focus, keys=analytes, bins=bins, logy=logy, cmap=cmap) return fig, axes
[docs] def filter_effect(self, analytes=None, stats=['mean', 'std'], filt=True): """ Quantify the effects of the active filters. Parameters ---------- analytes : str or list Which analytes to consider. stats : list Which statistics to calculate. file : valid filter string or bool Which filter to consider. If True, applies all active filters. Returns ------- pandas.DataFrame Contains statistics calculated for filtered and unfiltered data, and the filtered/unfiltered ratio. """ analytes = self.analytes_sorted(analytes, focus_stage=self.focus_stage) # calculate filtered and unfiltered stats self.sample_stats(analytes, stats=stats, filt=False) suf = self.stats.copy() self.sample_stats(analytes, stats=stats, filt=filt) sf = self.stats.copy() # create dataframe for results cols = [] for s in self.stats_calced: cols += ['unfiltered_{:}'.format(s), 'filtered_{:}'.format(s)] comp = pd.DataFrame(index=self.samples, columns=pd.MultiIndex.from_arrays([cols, [None] * len(cols)])) # collate stats for k, v in suf.items(): vf = sf[k] for i, a in enumerate(v['analytes']): for s in self.stats_calced: comp.loc[k, ('unfiltered_{:}'.format(s), a)] = v[s][i,0] comp.loc[k, ('filtered_{:}'.format(s), a)] = vf[s][i,0] comp.dropna(0, 'all', inplace=True) comp.dropna(1, 'all', inplace=True) comp.sort_index(1, inplace=True) # calculate filtered/unfiltered ratios rats = [] for s in self.stats_calced: rat = comp.loc[:, 'filtered_{:}'.format(s)] / comp.loc[:, 'unfiltered_{:}'.format(s)] rat.columns = pd.MultiIndex.from_product([['{:}_ratio'.format(s)], rat.columns]) rats.append(rat) # join it all up comp = comp.join(pd.concat(rats, 1)) comp.sort_index(1, inplace=True) return comp.loc[:, (pd.IndexSlice[:], pd.IndexSlice[list(analytes)])]
[docs] def crossplot_filters(self, filter_string, analytes=None, samples=None, subset=None, filt=None): """ Plot the results of a group of filters in a crossplot. Parameters ---------- filter_string : str A string that identifies a group of filters. e.g. 'test' would plot all filters with 'test' in the name. analytes : optional, array_like or str The analyte(s) to plot. Defaults to all analytes. Returns ------- fig, axes objects """ analytes = self.analytes_sorted(analytes, focus_stage=self.focus_stage) if samples is None: samples = self._get_samples(subset) # isolate relevant filters filts = self.data[samples[0]].filt.components.keys() cfilts = [f for f in filts if filter_string in f] flab = re.compile('.*_(.*)$') # regex to get filter name # aggregate data self.get_focus(subset=subset, filt=filt) # set up axes numvars = len(analytes) fig, axes = plt.subplots(nrows=numvars, ncols=numvars, figsize=(12, 12)) fig.subplots_adjust(hspace=0.05, wspace=0.05) for ax in axes.flat: ax.xaxis.set_visible(False) ax.yaxis.set_visible(False) if ax.get_subplotspec().is_first_col(): ax.yaxis.set_ticks_position('left') if ax.get_subplotspec().is_last_col(): ax.yaxis.set_ticks_position('right') if ax.get_subplotspec().is_first_row(): ax.xaxis.set_ticks_position('top') if ax.get_subplotspec().is_last_row(): ax.xaxis.set_ticks_position('bottom') cmlist = ['Blues', 'BuGn', 'BuPu', 'GnBu', 'Greens', 'Greys', 'Oranges', 'OrRd', 'PuBu', 'PuBuGn', 'PuRd', 'Purples', 'RdPu', 'Reds', 'YlGn', 'YlGnBu', 'YlOrBr', 'YlOrRd'] # isolate nominal_values for all analytes focus = {k: nominal_values(v) for k, v in self.focus.items()} # determine units for all analytes udict = {a: unitpicker(np.nanmean(focus[a]), focus_stage=self.focus_stage, denominator=self.internal_standard) for a in analytes} # determine ranges for all analytes rdict = {a: (np.nanmin(focus[a] * udict[a][0]), np.nanmax(focus[a] * udict[a][0])) for a in analytes} for f in cfilts: self.get_focus(f, subset=subset) focus = {k: nominal_values(v) for k, v in self.focus.items()} lab = flab.match(f).groups()[0] axes[0, 0].scatter([], [], s=10, label=lab) for i, j in zip(*np.triu_indices_from(axes, k=1)): # get analytes ai = analytes[i] aj = analytes[j] # remove nan, apply multipliers pi = focus[ai][~np.isnan(focus[ai])] * udict[ai][0] pj = focus[aj][~np.isnan(focus[aj])] * udict[aj][0] # make plot axes[i, j].scatter(pj, pi, alpha=0.4, s=10, lw=0.5, edgecolor='k') axes[j, i].scatter(pi, pj, alpha=0.4, s=10, lw=0.5, edgecolor='k') axes[i, j].set_ylim(*rdict[ai]) axes[i, j].set_xlim(*rdict[aj]) axes[j, i].set_ylim(*rdict[aj]) axes[j, i].set_xlim(*rdict[ai]) # diagonal labels for a, n in zip(analytes, np.arange(len(analytes))): axes[n, n].annotate(a + '\n' + udict[a][1], (0.5, 0.5), xycoords='axes fraction', ha='center', va='center') axes[n, n].set_xlim(*rdict[a]) axes[n, n].set_ylim(*rdict[a]) axes[0, 0].legend(loc='upper left', title=filter_string) # switch on alternating axes for i, j in zip(range(numvars), itertools.cycle((-1, 0))): axes[j, i].xaxis.set_visible(True) for label in axes[j, i].get_xticklabels(): label.set_rotation(90) axes[i, j].yaxis.set_visible(True) return fig, axes
# Plot traces
[docs] @_log def trace_plots(self, analytes=None, samples=None, ranges=False, focus_stage=None, outdir=None, filt=None, scale='log', figsize=[10, 4], stats=False, stat='nanmean', err='nanstd', subset=None): """ Plot analytes as a function of time. Parameters ---------- analytes : optional, array_like or str The analyte(s) to plot. Defaults to all analytes. samples: optional, array_like or str The sample(s) to plot. Defaults to all samples. ranges : bool Whether or not to show the signal/background regions identified by 'autorange'. focus_stage : str The focus 'stage' of the analysis to plot. Can be 'rawdata', 'despiked':, 'signal', 'background', 'bkgsub', 'ratios' or 'calibrated'. outdir : str Path to a directory where you'd like the plots to be saved. Defaults to 'reports/[focus]' in your data directory. filt : str, dict or bool Either logical filter expression contained in a str, a dict of expressions specifying the filter string to use for each analyte or a boolean. Passed to `grab_filt`. scale : str If 'log', plots the data on a log scale. figsize : array_like Array of length 2 specifying figure [width, height] in inches. stats : bool Whether or not to overlay the mean and standard deviations for each trace. stat, err: str The names of the statistic and error components to plot. Deafaults to 'nanmean' and 'nanstd'. Returns ------- None """ if focus_stage is None: focus_stage = self.focus_stage if outdir is None: outdir = os.path.join(self.report_dir, focus_stage) if not os.path.isdir(outdir): os.makedirs(outdir) analytes = self.analytes_sorted(analytes, focus_stage=focus_stage) # if samples is not None: # subset = self.make_subset(samples, silent=True) if subset is not None: samples = self._get_samples(subset) elif samples is None: samples = self.subsets['All_Analyses'] elif isinstance(samples, str): samples = [samples] with self.pbar.set(total=len(samples), desc='Drawing Plots') as prog: for s in samples: f, a = self.data[s].trace_plot(analytes=analytes, figsize=figsize, scale=scale, filt=filt, ranges=ranges, stats=stats, stat=stat, err=err, focus_stage=focus_stage) # ax = fig.axes[0] # for l, u in s.sigrng: # ax.axvspan(l, u, color='r', alpha=0.1) # for l, u in s.bkgrng: # ax.axvspan(l, u, color='k', alpha=0.1) with open(os.path.join(outdir, s + '_traces.pdf'), 'wb') as file: f.savefig(file) # TODO: on older(?) computers raises # 'OSError: [Errno 24] Too many open files' prog.update() return
# Plot gradients
[docs] @_log def gradient_plots(self, analytes=None, win=None, samples=None, ranges=False, focus_stage=None, filt=False, recalc=False, outdir=None, figsize=[10, 4], subset='All_Analyses'): """ Plot analyte gradients as a function of time. Parameters ---------- analytes : optional, array_like or str The analyte(s) to plot. Defaults to all analytes. samples: optional, array_like or str The sample(s) to plot. Defaults to all samples. ranges : bool Whether or not to show the signal/background regions identified by 'autorange'. focus_stage : str The focus 'stage' of the analysis to plot. Can be 'rawdata', 'despiked':, 'signal', 'background', 'bkgsub', 'ratios' or 'calibrated'. outdir : str Path to a directory where you'd like the plots to be saved. Defaults to 'reports/[focus]' in your data directory. filt : str, dict or bool Either logical filter expression contained in a str, a dict of expressions specifying the filter string to use for each analyte or a boolean. Passed to `grab_filt`. scale : str If 'log', plots the data on a log scale. figsize : array_like Array of length 2 specifying figure [width, height] in inches. stats : bool Whether or not to overlay the mean and standard deviations for each trace. stat, err: str The names of the statistic and error components to plot. Deafaults to 'nanmean' and 'nanstd'. Returns ------- None """ if focus_stage is None: focus_stage = self.focus_stage if outdir is None: outdir = os.path.join(self.report_dir, focus_stage + '_gradient') if not os.path.isdir(outdir): os.mkdir(outdir) analytes = self.analytes_sorted(analytes, focus_stage=focus_stage) # if samples is not None: # subset = self.make_subset(samples, silent=True) if subset is not None: samples = self._get_samples(subset) elif samples is None: samples = self.subsets['All_Analyses'] elif isinstance(samples, str): samples = [samples] with self.pbar.set(total=len(samples), desc='Drawing Plots') as prog: for s in samples: f, a = self.data[s].gplot(analytes=analytes, win=win, figsize=figsize, ranges=ranges, focus_stage=focus_stage, filt=filt, recalc=recalc) # ax = fig.axes[0] # for l, u in s.sigrng: # ax.axvspan(l, u, color='r', alpha=0.1) # for l, u in s.bkgrng: # ax.axvspan(l, u, color='k', alpha=0.1) f.savefig(os.path.join(outdir, s + '_gradients.pdf')) # TODO: on older(?) computers raises # 'OSError: [Errno 24] Too many open files' plt.close(f) prog.update() return
[docs] def plot_stackhist(self, subset='All_Samples', samples=None, analytes=None, axs=None, filt=True, **kwargs): """ Plot a stacked histograms of analytes for all given samples (or a pre-defined subset) Parameters ---------- subset : str The subset of samples to plot. Overruled by 'samples', if provided. samples : array-like The samples to plot. If blank, reverts to 'All_Samples' subset. analytes : str or array-like The analytes to plot axs : array-like An array of matplotlib.Axes objects the same length as analytes. **kwargs passed to matplotlib.pyplot.bar() plotting function """ analytes = self.analytes_sorted(analytes, focus_stage=self.focus_stage) if axs is None: fig, axs = plt.subplots(1, len(analytes), figsize=[2 * len(analytes), 2], constrained_layout=True, sharey=True) elif len(axs) != len(analytes): raise ValueError(f'Must provide the same number of axes ({len(axs)}) and analytes ({len(analytes)})') if samples is None: samples = self.subsets[subset] elif isinstance(samples, str): samples = [samples] self.get_focus(filt=filt, samples=samples) for i, a in enumerate(analytes): m, unit = unitpicker(self.focus[a], focus_stage=self.focus_stage) arrays = [] for s in samples: sub = self.data[s].get_individual_ablations(analytes, filt=filt) arrays += [nominal_values(d[a]) * m for d in sub] plot.stackhist(arrays, ax=axs[i], **kwargs) axs[i].set_xlabel(pretty_element(a) + '\n' + unit)
# filter reports
[docs] @_log def filter_reports(self, analytes, filt_str='all', nbin=5, samples=None, outdir=None, subset='All_Samples'): """ Plot filter reports for all filters that contain ``filt_str`` in the name. """ if outdir is None: outdir = self.report_dir + '/filters/' + filt_str if not os.path.isdir(self.report_dir + '/filters'): os.mkdir(self.report_dir + '/filters') if not os.path.isdir(outdir): os.mkdir(outdir) analytes = self.analytes_sorted(analytes, focus_stage=self.focus_stage) if samples is not None: subset = self.make_subset(samples, silent=True) samples = self._get_samples(subset) with self.pbar.set(total=len(samples), desc='Drawing Plots') as prog: for s in samples: _ = self.data[s].filter_report(filt=filt_str, analytes=analytes, savedir=outdir, nbin=nbin) prog.update() # plt.close(fig) return
# def _stat_boostrap(self, analytes=None, filt=True, # stat_fn=np.nanmean, ci=95): # """ # Calculate sample statistics with bootstrapped confidence intervals. # Parameters # ---------- # analytes : optional, array_like or str # The analyte(s) to calculate statistics for. Defaults to # all analytes. # filt : str, dict or bool # Either logical filter expression contained in a str, # a dict of expressions specifying the filter string to # use for each analyte or a boolean. Passed to `grab_filt`. # stat_fns : array_like # list of functions that take a single array_like input, # and return a single statistic. Function should be able # to cope with numpy NaN values. # ci : float # Confidence interval to calculate. # Returns # ------- # None # """ # return
[docs] @_log def sample_stats(self, analytes=None, filt=True, stats=['mean', 'std'], include_srms=False, eachtrace=True, focus_stage=None, csf_dict={}): """ Calculate sample statistics. Returns samples, analytes, and arrays of statistics of shape (samples, analytes). Statistics are calculated from the 'focus' data variable, so output depends on how the data have been processed. Included stat functions: * :func:`~latools.stat_fns.mean`: arithmetic mean * :func:`~latools.stat_fns.std`: arithmetic standard deviation * :func:`~latools.stat_fns.se`: arithmetic standard error * :func:`~latools.stat_fns.H15_mean`: Huber mean (outlier removal) * :func:`~latools.stat_fns.H15_std`: Huber standard deviation (outlier removal) * :func:`~latools.stat_fns.H15_se`: Huber standard error (outlier removal) Parameters ---------- analytes : optional, array_like or str The analyte(s) to calculate statistics for. Defaults to all analytes. filt : str, dict or bool Either logical filter expression contained in a str, a dict of expressions specifying the filter string to use for each analyte or a boolean. Passed to `grab_filt`. stats : array_like or str take a single array_like input, and return a single statistic. list of functions or names (see above) or functions that Function should be able to cope with NaN values. eachtrace : bool Whether to calculate the statistics for each analysis spot individually, or to produce per - sample means. Default is True. focus_stage : str Which stage of analysis to calculate stats for. Defaults to current stage. Can be one of: * 'rawdata': raw data, loaded from csv file. * 'despiked': despiked data. * 'bkgsub': background subtracted data, created by self.bkg_correct * 'ratios': element ratio data, created by self.ratio. * 'calibrated': ratio data calibrated to standards, created by self.calibrate. * 'mass_fraction': mass fraction of each element. Returns ------- None Adds dict to analyse object containing samples, analytes and functions and data. """ if 'autorange' not in self.stages_complete: raise RuntimeError('Cannot calculate statistics until autorange has been run.') analytes = self.analytes_sorted(analytes, focus_stage=focus_stage) if focus_stage is None: focus_stage = self.focus_stage self.stats = Bunch() self.stats_calced = [] stat_fns = Bunch() stat_dict = {'mean': np.nanmean, 'std': np.nanstd, 'nanmean': np.nanmean, 'nanstd': np.nanstd, 'se': stderr, 'H15_mean': H15_mean, 'H15_std': H15_std, 'H15_se': H15_se} if isinstance(stats, str): stats = [stats] for s in stats: if isinstance(s, str): if s in stat_dict: self.stats_calced.append(s) stat_fns[s] = stat_dict[s] if s in csf_dict: self.stats_calced.append(s) exec(csf_dict[s]) stat_fns[s] = eval(s) elif callable(s): self.stats_calced.append(s.__name__) stat_fns[s.__name__] = s if not hasattr(self, 'custom_stat_functions'): self.custom_stat_functions = '' self.custom_stat_functions += inspect.getsource(s) + '\n\n\n\n' # calculate stats for each sample if include_srms: samples = self.samples else: samples = [s for s in self.samples if self.srm_identifier not in s] with self.pbar.set(total=len(samples), desc='Calculating Stats') as prog: for s in samples: self.data[s].sample_stats(analytes, filt=1616, stat_fns=stat_fns, eachtrace=eachtrace, focus_stage=focus_stage) self.stats[s] = self.data[s].stats prog.update() self.stat_focus_stage = focus_stage return
[docs] @_log def ablation_times(self, samples=None, subset=None): if samples is not None: subset = self.make_subset(samples, silent=True) samples = self._get_samples(subset) ats = Bunch() for s in samples: ats[s] = self.data[s].ablation_times() frames = [] for s in samples: d = ats[s] td = pd.DataFrame.from_dict(d, orient='index') td.columns = ['Time'] frames.append(td) out = pd.concat(frames, keys=samples) out.index.names = ['sample', 'rep'] return out
# function for visualising sample statistics
[docs] @_log def statrace_plot(self, analytes=None, samples=None, figsize=None, stat='mean', err='std', subset=None): """ Function for visualising per-ablation and per-sample means. Parameters ---------- analytes : str or iterable Which analyte(s) to plot samples : str or iterable Which sample(s) to plot figsize : tuple Figure (width, height) in inches stat : str Which statistic to plot. Must match the name of the functions used in 'sample_stats'. err : str Which uncertainty to plot. subset : str Which subset of samples to plot. """ if not hasattr(self, 'stats'): self.sample_stats() analytes = self.analytes_sorted(analytes, focus_stage=self.focus_stage) if samples is not None: subset = self.make_subset(samples, silent=True) samples = self._get_samples(subset) if figsize is None: figsize = (1.5 * len(self.stats), 3 * len(analytes)) fig, axs = plt.subplots(len(analytes), 1, figsize=figsize) for ax, an in zip(axs, analytes): i = 0 stab = self.getstats() m, u = unitpicker(np.percentile(stab.loc[:, an].dropna(), 25), 0.1, focus_stage='calibrated', denominator=self.internal_standard) for s in samples: if self.srm_identifier not in s: d = self.stats[s] if d[stat].ndim == 2: n = d[stat].shape[-1] x = np.linspace(i - .1 * n / 2, i + .1 * n / 2, n) else: x = [i] a_ind = d['analytes'] == an # plot individual ablations with error bars ax.errorbar(x, d[stat][a_ind][0] * m, yerr=d[err][a_ind][0] * m, marker='o', color=self.cmaps[an], lw=0, elinewidth=1) ax.set_ylabel('%s / %s (%s )' % (pretty_element(an), pretty_element(self.internal_standard), u)) # plot whole - sample mean if len(x) > 1: # mean calculation with error propagation? # umean = un.uarray(d[stat][a_ind][0] * m, d[err][a_ind][0] * m).mean() # std = un.std_devs(umean) # mean = un.nominal_values(umean) mean = np.nanmean(d[stat][a_ind][0] * m) std = np.nanstd(d[stat][a_ind][0] * m) ax.plot(x, [mean] * len(x), c=self.cmaps[an], lw=2) ax.fill_between(x, [mean + std] * len(x), [mean - std] * len(x), lw=0, alpha=0.2, color=self.cmaps[an]) # highlight each sample if i % 2 == 1: ax.axvspan(i - .5, i + .5, color=(0, 0, 0, 0.05), lw=0) i += 1 ax.set_xticks(np.arange(0, len(self.stats))) ax.set_xlim(-0.5, len(self.stats) - .5) ax.set_xticklabels(samples) return fig, ax
[docs] @_log def getstats(self, filename=None, samples=None, subset=None, ablation_time=False, save=True): """ Return pandas dataframe of all sample statistics. """ slst = [] if samples is None and subset is None: samples = list(self.stats.keys()) if samples is not None: subset = self.make_subset(samples, silent=True) samples = self._get_samples(subset) for s in self.stats_calced: for nm in samples: if self.stats[nm][s].ndim == 2: # make multi - index reps = np.arange(self.stats[nm][s].shape[-1]) ss = np.array([s] * reps.size) nms = np.array([nm] * reps.size) # make sub - dataframe stdf = pd.DataFrame(self.stats[nm][s].T, columns=list(self.stats[nm]['analytes']), index=[ss, nms, reps]) stdf.index.set_names(['statistic', 'sample', 'rep'], inplace=True) else: stdf = pd.DataFrame(self.stats[nm][s], index=list(self.stats[nm]['analytes']), columns=[[s], [nm]]).T stdf.index.set_names(['statistic', 'sample'], inplace=True) slst.append(stdf) out = pd.concat(slst) if ablation_time: ats = self.ablation_times(samples=samples, subset=subset) ats['statistic'] = 'nanmean' ats.set_index('statistic', append=True, inplace=True) ats = ats.reorder_levels(['statistic', 'sample', 'rep']) out = out.join(ats) if filename is not None: out.to_csv(filename) elif save: out.to_csv(os.path.join(self.export_dir, 'stat_export.csv')) self.stats_df = out return out.reindex(self.analytes_sorted(out.columns.values, focus_stage=self.stat_focus_stage), axis=1)
# raw data export function def _minimal_export_traces(self, outdir=None, analytes=None, samples=None, subset='All_Analyses'): """ Used for exporting minimal dataset. DON'T USE. """ if samples is not None: subset = self.make_subset(samples, silent=True) samples = self._get_samples(subset) focus_stage = 'rawdata' # ud = 'counts' if not os.path.isdir(outdir): os.mkdir(outdir) for s in samples: d = self.data[s].data[focus_stage] out = Bunch() for a in analytes: out[a] = d[a] out = pd.DataFrame(out, index=self.data[s].Time) out.index.name = 'Time' d = dateutil.parser.parse(self.data[s].meta['date']) header = ['# Minimal Reproduction Dataset Exported from LATOOLS on %s' % (time.strftime('%Y:%m:%d %H:%M:%S')), "# Analysis described in '../analysis.lalog'", '# Run latools.reproduce to import analysis.', '#', '# Sample: %s' % (s), '# Analysis Time: ' + d.strftime('%Y-%m-%d %H:%M:%S')] header = '\n'.join(header) + '\n' csv = out.to_csv() with open('%s/%s.csv' % (outdir, s), 'w') as f: f.write(header) f.write(csv) return
[docs] @_log def export_traces(self, outdir=None, focus_stage=None, analytes=None, samples=None, subset='All_Analyses', filt=False, zip_archive=False): """ Function to export raw data. Parameters ---------- outdir : str directory to save toe traces. Defaults to 'main-dir-name_export'. focus_stage : str The name of the analysis stage to export. * 'rawdata': raw data, loaded from csv file. * 'despiked': despiked data. * 'background': the calculated background that is subtracted from the data. * 'bkgsub': background subtracted data, created by self.bkg_correct * 'ratios': element ratio data, created by self.ratio. * 'calibrated': ratio data calibrated to standards, created by self.calibrate. * 'mass_fraction': mass fraction of each element. Defaults to the most recent stage of analysis. analytes : str or array_like Either a single analyte, or list of analytes to export. Defaults to all analytes. samples : str or array_like Either a single sample name, or list of samples to export. Defaults to all samples. filt : str, dict or bool Either logical filter expression contained in a str, a dict of expressions specifying the filter string to use for each analyte or a boolean. Passed to `grab_filt`. """ analytes = self.analytes_sorted(analytes, focus_stage=focus_stage) if samples is not None: subset = self.make_subset(samples, silent=True) samples = self._get_samples(subset) if focus_stage is None: focus_stage = self.focus_stage if outdir is None: outdir = os.path.join(self.export_dir, 'trace_export') ud = {'rawdata': 'counts', 'despiked': 'counts', 'bkgsub': 'background corrected counts', 'ratios': 'counts/count', 'calibrated': 'mol/mol', 'mass_fraction': 'mass fraction'} if not os.path.isdir(outdir): os.mkdir(outdir) for s in samples: d = self.data[s].data[focus_stage] out = Bunch() for a in analytes: if a not in d: continue ind = self.data[s].filt.grab_filt(filt, a) out[a] = nominal_values(d[a][ind]) if focus_stage not in ['rawdata', 'despiked']: out[a + '_std'] = std_devs(d[a][ind]) out[a + '_std'][out[a + '_std'] == 0] = np.nan out = pd.DataFrame(out, index=self.data[s].Time[ind]) out.index.name = 'Time' header = ['# Sample: %s' % (s), '# Data Exported from LATOOLS on %s' % (time.strftime('%Y:%m:%d %H:%M:%S')), '# Processed using %s configuration' % (self.config['config']), '# Analysis Stage: %s' % (focus_stage), '# Unit: %s' % ud[focus_stage]] header = '\n'.join(header) + '\n' csv = out.to_csv() with open('%s/%s_%s.csv' % (outdir, s, focus_stage), 'w') as f: f.write(header) f.write(csv) if zip_archive: utils.zipdir(outdir, delete=True) return
[docs] def save_log(self, directory=None, logname=None, header=None): """ Save analysis.lalog in specified location """ if directory is None: directory = self.export_dir if not os.path.isdir(directory): directory = os.path.dirname(directory) if logname is None: logname = 'analysis.lalog' if header is None: header = self._log_header() loc = logging.write_logfile(self.log, header, os.path.join(directory, logname)) return loc
[docs] def minimal_export(self, target_analytes=None, path=None): """ Exports a analysis parameters, standard info and a minimal dataset, which can be imported by another user. Parameters ---------- target_analytes : str or iterable Which analytes to include in the export. If specified, the export will contain these analytes, and all other analytes used during data processing (e.g. during filtering). If not specified, all analytes are exported. path : str Where to save the minimal export. If it ends with .zip, a zip file is created. If it's a folder, all data are exported to a folder. """ target_analytes = self._analyte_checker(target_analytes, check_ratios=False) zip_archive = False # set up data path if path is None: path = self.export_dir + '/minimal_export.zip' if path.endswith('.zip'): path = path.replace('.zip', '') zip_archive = True if not os.path.isdir(path): os.mkdir(path) # parse minimal analytes (exclude ratios, include target_analytes) export_analytes = target_analytes.union(split_analyte_ratios(self.minimal_analytes)) export_analytes = self.analytes_sorted(export_analytes, check_ratios=False) # export data self._minimal_export_traces(path + '/data', analytes=export_analytes) # define analysis_log header log_header = ['# Minimal Reproduction Dataset Exported from LATOOLS on %s' % (time.strftime('%Y:%m:%d %H:%M:%S')), 'data_path :: ./data/'] if hasattr(self, 'srmdat'): log_header.append('srm_table :: ./srm.table') # export srm table items = set() for a in export_analytes: for srm, ad in self._analyte_srmdat_link.items(): items.update([ad[a]]) srmdat = self.srmdat.loc[idx[:, list(items)], :] with open(path + '/srm.table', 'w') as f: f.write(srmdat.to_csv()) # save internal_standard_concs if self.internal_standard_concs is not None: log_header.append('internal_standard_concs :: ./internal_standard_concs.csv') self.internal_standard_concs.to_csv(os.path.join(path, './internal_standard_concs.csv')) # save custom functions (of defined) if hasattr(self, 'custom_stat_functions'): with open(path + '/custom_stat_fns.py', 'w') as f: f.write(self.custom_stat_functions) log_header.append('custom_stat_functions :: ./custom_stat_fns.py') log_header.append('# Analysis Log Start: \n') # format sample_stats correctly lss = [(i, l) for i, l in enumerate(self.log) if 'sample_stats' in l] rep = re.compile("(.*'stats': )(\[.*?\])(.*)") for i, l in lss: self.log[i] = rep.sub(r'\1' + str(self.stats_calced) + r'\3', l) # save log self.save_log(path, 'analysis.lalog', header=log_header) if zip_archive: utils.zipdir(directory=path, delete=True) return
[docs]def reproduce(past_analysis, plotting=False, data_path=None, srm_table=None, internal_standard_concs=None, custom_stat_functions=None): """ Reproduce a previous analysis exported with :func:`latools.analyse.minimal_export` For normal use, supplying `log_file` and specifying a plotting option should be enough to reproduce an analysis. All requisites (raw data, SRM table and any custom stat functions) will then be imported from the minimal_export folder. You may also specify your own raw_data, srm_table and custom_stat_functions, if you wish. Parameters ---------- log_file : str The path to the log file produced by :func:`~latools.analyse.minimal_export`. plotting : bool Whether or not to output plots. data_path : str Optional. Specify a different data folder. Data folder should normally be in the same folder as the log file. srm_table : str Optional. Specify a different SRM table. SRM table should normally be in the same folder as the log file. internal_standard_concs : pandas.DataFrame Optional. Specify internal standard concentrations used to calculate mass fractions. custom_stat_functions : str Optional. Specify a python file containing custom stat functions for use by reproduce. Any custom stat functions should normally be included in the same folder as the log file. """ if '.zip' in past_analysis: dirpath = utils.extract_zipdir(past_analysis) logpath = os.path.join(dirpath, 'analysis.lalog') elif os.path.isdir(past_analysis): if os.path.exists(os.path.join(past_analysis, 'analysis.lalog')): logpath = os.path.join(past_analysis, 'analysis.lalog') elif 'analysis.lalog' in past_analysis: logpath = past_analysis else: raise ValueError(('\n\n{} is not a valid input.\n\n' + 'Must be one of:\n' + ' - A .zip file exported by latools\n' + ' - An analysis.lalog file\n' + ' - A directory containing an analysis.lalog files\n')) runargs, paths = logging.read_logfile(logpath) # parse custom stat functions csfs = Bunch() if custom_stat_functions is None and 'custom_stat_functions' in paths: # load custom functions as a dict with open(paths['custom_stat_functions'], 'r') as f: csf = f.read() fname = re.compile('def (.*)\(.*') for c in csf.split('\n\n\n\n'): if fname.match(c): csfs[fname.match(c).groups()[0]] = c # create analysis object rep = analyse(*runargs[0][-1]['args'], **runargs[0][-1]['kwargs']) # deal with internal standard concentrations if internal_standard_concs is None and 'internal_standard_concs' in paths: rep.read_internal_standard_concs(paths['internal_standard_concs']) # rest of commands for fname, arg in runargs: if fname != '__init__': if 'plot' in fname.lower() and plotting: getattr(rep, fname)(*arg['args'], **arg['kwargs']) elif 'sample_stats' in fname.lower(): rep.sample_stats(*arg['args'], csf_dict=csfs, **arg['kwargs']) else: getattr(rep, fname)(*arg['args'], **arg['kwargs']) return rep