"""
Functions for splitting long files into multiple short ones.
(c) Oscar Branson : https://github.com/oscarbranson
"""
import re
import os
import json
import datetime
import dateutil
import textwrap
import numpy as np
import pandas as pd
import pkg_resources as pkgrs
from warnings import warn
from ..processes import read_data, autorange
from ..helpers.signal import bool_2_indices
from ..helpers.analytes import analyte_2_namemass
from ..helpers.io import read_dataformat
import matplotlib.pyplot as plt
textwidth = 70 # characters, for printing
[docs]def by_regex(file, outdir=None, split_pattern=None, global_header_rows=0, fname_pattern=None, trim_tail_lines=0, trim_head_lines=0):
"""
Split one long analysis file into multiple smaller ones.
Parameters
----------
file : str
The path to the file you want to split.
outdir : str
The directory to save the split files to.
If None, files are saved to a new directory
called 'split', which is created inside the
data directory.
split_pattern : regex string
A regular expression that will match lines in the
file that mark the start of a new section. Does
not have to match the whole line, but must provide
a positive match to the lines containing the pattern.
global_header_rows : int
How many rows at the start of the file to include
in each new sub-file.
fname_pattern : regex string
A regular expression that identifies a new file name
in the lines identified by split_pattern. If none,
files will be called 'noname_N'. The extension of the
main file will be used for all sub-files.
trim_head_lines : int
If greater than zero, this many lines are removed from the start of each segment
trim_tail_lines : int
If greater than zero, this many lines are removed from the end of each segment
Returns
-------
Path to new directory : str
"""
# create output sirectory
if outdir is None:
outdir = os.path.join(os.path.dirname(file), 'split')
if not os.path.exists(outdir):
os.mkdir(outdir)
# read input file
with open(file, 'r') as f:
lines = f.readlines()
# get file extension
extension = os.path.splitext(file)[-1]
# grab global header rows
global_header = lines[:global_header_rows]
# find indices of lines containing split_pattern
starts = []
for i, line in enumerate(lines):
if re.search(split_pattern, line):
starts.append(i)
starts.append(len(lines)) # get length of lines
# split lines into segments based on positions of regex
splits = {}
for i in range(len(starts) - 1):
m = re.search(fname_pattern, lines[starts[i]])
if m:
fname = m.groups()[0].strip()
else:
fname = 'no_name_{:}'.format(i)
splits[fname] = global_header + lines[starts[i]:starts[i+1]][trim_head_lines:trim_tail_lines]
# write files
print('Writing files to: {:}'.format(outdir))
for k, v in splits.items():
fname = (k + extension).replace(' ', '_')
with open(os.path.join(outdir, fname), 'w') as f:
f.writelines(v)
print(' {:}'.format(fname))
print('Done.')
return outdir
[docs]def long_file(data_file, dataformat, sample_list, analyte='total_counts', savedir=None, srm_id=None, combine_same_name=True, defrag_to_match_sample_list=True, min_points=0, plot=True, passthrough=False, **autorange_args):
"""
Split single long files containing multiple analyses into multiple files containing single analyses.
Imports a long datafile and uses `latools.processes.autorange` to
identify ablations in the long file based on your chosen analyte.
The data are then saved as multiple files each containing a single
ablation, named using the list of names you provide.
Data will be saved in latools' 'REPRODUCE' format.
TODO: Check for existing files in savedir, don't overwrite?
Parameters
----------
data_file : str
The path to the data file you want to read.
dataformat : dataformat dict
A valid dataformat dict. See online documentation for more details.
sample_list : array-like
A list of strings that will be used to name the individual files.
One sample name can contain a 'wildcard' character '+' or '*'. If
we find more ablations than the number of names in sample_list, we'll
expand this wildcard to name each unlabelled ablation. If the wildcard
is '+' the abltations are given unique numbered names and split up into
separate files, whereas for '*' the ablations are given the same and
saved into a single file. One *one* sample name can contain a wildcard.
analyte : str
The analyte that autorange uses to identify ablations. Can be any valid
analyte in the data. Defaults to 'total_counts'.
savedir : str
The directory to save the data in. Defaults to the name of the data_file,
appended with '_split'.
srm_id : str
If given, all file names containing srm_id will be replaced with srm_id.
passthrough : bool
If False data are saved, if True data are yielded in correct format for
loading by latools.analyse object.
**autorange_args
Additional arguments passed to la.processes.autorange used for identifying ablations.
Returns
-------
None
"""
if isinstance(sample_list, str):
if os.path.exists(sample_list):
sample_list = np.genfromtxt(sample_list, dtype=str)
else:
raise ValueError('File {} not found.')
else:
sample_list = np.asanyarray(sample_list)
# detect wildcard samples
# * = multiple analyses to be numbered
# + = multiple analyses to be combined
mode = 'strict'
wilds = []
for s in sample_list:
if '*' in s:
mode = '*'
wilds.append(s)
if '+' in s:
mode = '+'
wilds.append(s)
if len(wilds) > 1:
errmsg = (
["More than one sample name contains a wildcard:"] +
[f" {w}" for w in wilds] +
["I don't know how to cope with this..."])
raise ValueError('\n'.join(errmsg))
if len(wilds) == 1:
wildind = np.argwhere(sample_list == wilds[0]).item()
wildsample = wilds[0].replace('+','').replace('*','')
if srm_id is not None:
srm_replace = []
for s in sample_list:
if srm_id in s:
s = srm_id
srm_replace.append(s)
sample_list = srm_replace
dataformat = read_dataformat(dataformat, silent=False)
_, _, dat, meta = read_data(data_file, dataformat=dataformat, name_mode='file')
if 'date' in meta:
d = dateutil.parser.parse(meta['date'])
else:
d = datetime.datetime.now()
# analyte handling
if analyte == 'total_counts':
y_data = dat['total_counts']
elif analyte in dat['rawdata'].keys():
y_data = dat['rawdata'][analyte]
else:
valid = list(dat['rawdata'].keys()) + ['total_counts']
raise ValueError("'{}' is not a valid analyte. Please use one of:\n {}".format(analyte, valid))
# autorange
bkg, sig, _, _ = autorange(dat['Time'], y_data, **autorange_args)
ns = np.zeros(sig.size)
ns[sig] = np.cumsum((sig ^ np.roll(sig, 1)) & sig)[sig]
n = int(max(ns))
nsamples = len(sample_list)
# deal with wildcards
if nsamples <= n and mode != 'strict':
msg = f"There are more ablations than samples in list, but you've given wildcard '{mode}' for sample '{wildsample}'..."
print('\n'.join(textwrap.wrap(msg, textwidth)))
pre = sample_list[:wildind]
post = sample_list[wildind + 1:]
nnew = n - nsamples + 1
# deal with '*' wildcard
if mode == '+':
newnames = [f"{wildsample}_{n}" for n in range(nnew)]
msg = f' -> These {nnew} ablations will be prepended with consecutive numbers and split into separate files.'
# deal with '+' wildcard
if mode == '*':
newnames = [f"{wildsample}" for n in range(nnew)]
msg = f' -> These {nnew} ablations will be given the same name and combined into a single file.'
print('\n '.join(textwrap.wrap(msg, textwidth - 5)))
sample_list = np.concatenate([pre, newnames, post])
nsamples = len(sample_list)
if nsamples != n:
print('Number of samples in list ({}) does not match number of ablations ({}).'.format(nsamples, n))
if nsamples < n:
print(' -> There are more ablations than samples...')
if defrag_to_match_sample_list:
print(' Removing data fragments to match sample list length.')
while nsamples < n:
min_points += 1
sig = sig & np.roll(sig, min_points)
ns = np.zeros(sig.size)
ns[sig] = np.cumsum((sig ^ np.roll(sig, 1)) & sig)[sig]
n = int(max(ns))
print(' (Removed data fragments < {} points long)'.format(min_points))
elif isinstance(min_points, (int, float)):
# minimum point filter
sig = sig & np.roll(sig, min_points)
ns = np.zeros(sig.size)
ns[sig] = np.cumsum((sig ^ np.roll(sig, 1)) & sig)[sig]
n = int(max(ns))
else:
print(' -> There are more samples than ablations...')
print(' Check your sample list is correct. If so, consider')
print(' adding autorange_params to change the signal detection.')
return
minn = min([len(sample_list), n])
# calculate split boundaries
bounds = []
lower = 0
sn = 0
next_sample = ''
for ni in range(minn-1):
sample = sample_list[sn]
next_sample = sample_list[sn + 1]
if not combine_same_name or sample != next_sample:
current_end = np.argwhere(dat['Time'] == dat['Time'][ns == ni + 1].max())[0]
next_start = np.argwhere(dat['Time'] == dat['Time'][ns == ni + 2].min())[0]
upper = (current_end + next_start) // 2
bounds.append((sample, (int(lower), int(upper))))
lower = upper + 1
sn += 1
if len(sample_list) == 1:
bounds.append((sample_list[-1], (0, len(dat['Time']))))
else:
bounds.append((sample_list[-1], (int(upper) + 1, len(ns))))
# split up data
sections = {}
seen = {}
for s, (lo, hi) in bounds:
if s not in seen:
seen[s] = 0
else:
seen[s] += 1
s += '_{}'.format(seen[s])
sections[s] = {'oTime': dat['Time'][lo:hi]}
sections[s]['Time'] = sections[s]['oTime'] - np.nanmin(sections[s]['oTime'])
sections[s]['rawdata'] = {}
for k, v in dat['rawdata'].items():
sections[s]['rawdata'][k] = v[lo:hi]
sections[s]['starttime'] = d + datetime.timedelta(seconds=np.nanmin(sections[s]['oTime']))
sections[s]['total_counts'] = dat['total_counts'][lo:hi]
# save output
if passthrough:
print(f"Success! {n} ablations identified.")
for sample, sdat in sections.items():
sanalytes = list(sdat['rawdata'].keys())
sdata = {
'Time': sdat['Time'],
'rawdata': sdat['rawdata'],
'total_counts': sdat['total_counts']
}
# minimal meta - datetime only
smeta = {
'date': sdat['starttime']
}
yield data_file, sample, sanalytes, sdata, smeta
# yield file : str, sample : str, analytes : set, data : dict, meta : dict
else:
if savedir is None:
savedir = os.path.join(os.path.dirname(os.path.abspath(data_file)), os.path.splitext(os.path.basename(data_file))[0] + '_split')
if not os.path.isdir(savedir):
os.makedirs(savedir)
header = ['# Long data file split by latools on {}'.format(datetime.datetime.now().strftime('%Y:%m:%d %H:%M:%S'))]
if 'date' not in meta:
header.append('# Warning: No date specified in file - Analysis Times are date file was split. ')
else:
header.append('# ')
header.append('# ')
header.append('# ')
flist = []
for s, sdat in sections.items():
iheader = header.copy()
iheader.append('# Sample: {}'.format(s))
iheader.append('# Analysis Time: {}'.format(sdat['starttime'].strftime('%Y-%m-%d %H:%M:%S')))
iheader = '\n'.join(iheader) + '\n'
out = pd.DataFrame({analyte_2_namemass(k): v for k, v in sdat['rawdata'].items()}, index=sdat['Time'])
out.index.name = 'Time'
csv = out.to_csv()
with open('{}/{}.csv'.format(savedir, s), 'w') as f:
f.write(iheader)
f.write(csv)
flist.append(' {}.csv'.format(s))
print("Success! File split into {} sections.".format(n))
print("New files saved to:\n{}/\n{}\n\nImport the split files using the 'REPRODUCE' configuration.".format(os.path.relpath(savedir), '\n'.join(flist)))
if plot:
return plot_long_file_split(dat, sig, bkg, sections)
else:
return None
# return dat, sig, sections
[docs]def plot_long_file_split(dat, sig, bkg, sections):
n = len(sections)
fig, ax = plt.subplots(1, 1, figsize=(n * 1.5, 2.5))
ax.plot(dat['Time'], dat['total_counts'], c=(0,0,0,0))
ax.set_yscale('log')
ax.set_xlim(dat['Time'].min(), dat['Time'].max())
ylim = ax.get_ylim()
yrng = np.ptp(ylim)
xlim = ax.get_xlim()
xrng = np.ptp(xlim)
for s, d in sections.items():
line = ax.plot(d['oTime'], d['total_counts'])
ax.axvline(d['oTime'][0], color=line[0].get_color())
ax.text(d['oTime'][0] + 0.02 * xrng / n, ylim[0] + 0.95 * yrng, s, rotation=90, ha='left', va='top', color=line[0].get_color())
sigs = bool_2_indices(sig)
for slo, shi in sigs:
ax.axvspan(dat['Time'][slo], dat['Time'][shi], zorder=-2, color=(.8,.7,0,0.15), lw=0)
bkgs = bool_2_indices(bkg)
for blo, bhi in bkgs:
ax.axvspan(dat['Time'][blo], dat['Time'][bhi], zorder=-2, color=(.2,.2,0,0.1), lw=0)
fig.tight_layout()
return fig, ax