import re
import os
import datetime
import dateutil
import numpy as np
import pandas as pd
from warnings import warn
from ..processes import read_data, autorange
from ..helpers.helpers import analyte_2_namemass
def by_regex(file, outdir=None, split_pattern=None, global_header_rows=0, fname_pattern=None, trim_tail_lines=0, trim_head_lines=0):
    """
    Split one long analysis file into multiple smaller ones.

    Parameters
    ----------
    file : str
        The path to the file you want to split.
    outdir : str
        The directory to save the split files to.
        If None, files are saved to a new directory
        called 'split', which is created inside the
        data directory.
    split_pattern : regex string
        A regular expression that will match lines in the
        file that mark the start of a new section. Does
        not have to match the whole line, but must provide
        a positive match to the lines containing the pattern.
    global_header_rows : int
        How many rows at the start of the file to include
        in each new sub-file.
    fname_pattern : regex string
        A regular expression that identifies a new file name
        in the lines identified by split_pattern. The first
        capture group is used as the name (or the whole match
        if the pattern has no groups). If None, or if the
        pattern does not match, files will be called
        'no_name_N'. The extension of the main file will be
        used for all sub-files.
    trim_head_lines : int
        If greater than zero, this many lines are removed from
        the start of each segment.
    trim_tail_lines : int
        If non-zero, this many lines are removed from the end
        of each segment. The sign is ignored.

    Returns
    -------
    Path to new directory : str

    Raises
    ------
    TypeError
        If split_pattern is None.
    """
    if split_pattern is None:
        raise TypeError('split_pattern must be a regular expression string, not None.')

    # create output directory (including any missing parents)
    if outdir is None:
        outdir = os.path.join(os.path.dirname(file), 'split')
    os.makedirs(outdir, exist_ok=True)

    # read input file
    with open(file, 'r') as f:
        lines = f.readlines()

    # get file extension
    extension = os.path.splitext(file)[-1]

    # grab global header rows
    global_header = lines[:global_header_rows]

    # find indices of lines containing split_pattern
    split_re = re.compile(split_pattern)
    starts = [i for i, line in enumerate(lines) if split_re.search(line)]
    starts.append(len(lines))  # sentinel: end of the last segment

    name_re = re.compile(fname_pattern) if fname_pattern is not None else None

    # Number of lines to drop from each end of a segment. The previous
    # implementation used trim_tail_lines directly as a slice end, which
    # only behaved as documented for negative values; taking the absolute
    # value keeps that usage working while making positive values (and the
    # default of zero) behave as documented.
    n_head = max(trim_head_lines, 0)
    n_tail = abs(trim_tail_lines)

    # split lines into segments based on positions of regex
    splits = {}
    for i, (start, stop) in enumerate(zip(starts[:-1], starts[1:])):
        fname = None
        m = name_re.search(lines[start]) if name_re is not None else None
        if m:
            # first capture group if there is one, otherwise the whole match
            captured = m.group(1) if name_re.groups else m.group(0)
            if captured is not None:
                fname = captured.strip()
        if not fname:
            fname = 'no_name_{:}'.format(i)

        # never let two segments with the same name overwrite each other
        unique = fname
        duplicate = 0
        while unique in splits:
            duplicate += 1
            unique = '{}_{}'.format(fname, duplicate)

        segment = lines[start:stop]
        # clamp so over-trimming yields an empty segment, not a wrapped slice
        end = max(len(segment) - n_tail, 0)
        splits[unique] = global_header + segment[n_head:end]

    # write files
    print('Writing files to: {:}'.format(outdir))
    for k, v in splits.items():
        fname = (k + extension).replace(' ', '_')
        with open(os.path.join(outdir, fname), 'w') as f:
            f.writelines(v)
        print('  {:}'.format(fname))
    print('Done.')

    return outdir
def long_file(data_file, dataformat, sample_list, savedir=None, srm_id=None, **autorange_args):
    """
    Split one long data file containing many ablations into one csv per sample.

    The file is read with `read_data`, ablations are located by running
    `autorange` on the total counts, and each ablation is assigned in order
    to the names in `sample_list`. Consecutive ablations with the same name
    are kept together in one section. A name that re-appears later gets a
    numeric suffix ('_1', '_2', ...).

    Parameters
    ----------
    data_file : str
        Path to the long data file. Passed to `read_data`.
    dataformat : str or dict
        Data format description, passed unchanged to `read_data`.
    sample_list : array_like or str
        Names of the samples, in the order they were ablated, or the
        path to a text file containing them (read with `np.genfromtxt`).
    savedir : str
        Directory to save the split files to. If None, a directory named
        '<data file name>_split' is created next to `data_file`.
    srm_id : str
        If given, every sample name that contains this string is
        replaced by `srm_id` itself.
    **autorange_args
        Additional keyword arguments passed to `autorange`.

    Returns
    -------
    None

    Raises
    ------
    ValueError
        If `sample_list` is a path that does not exist, is not a
        list, array or path, or is empty.
    """
    if isinstance(sample_list, str):
        if os.path.exists(sample_list):
            # atleast_1d: a file with a single name would otherwise be 0-d
            sample_list = np.atleast_1d(np.genfromtxt(sample_list, dtype=str))
        else:
            raise ValueError('File {} not found.'.format(sample_list))
    elif not isinstance(sample_list, (list, np.ndarray)):
        raise ValueError('sample_list should be an array_like or a file.')

    if len(sample_list) == 0:
        raise ValueError('sample_list is empty.')

    if srm_id is not None:
        # collapse all names containing srm_id to the same identifier
        sample_list = [srm_id if srm_id in s else s for s in sample_list]

    _, _, dat, meta = read_data(data_file, dataformat=dataformat, name_mode='file')

    if 'date' in meta:
        # `import dateutil` alone does not load the parser submodule
        from dateutil import parser as date_parser
        d = date_parser.parse(meta['date'])
    else:
        d = datetime.datetime.now()

    # autorange
    _, sig, _, _ = autorange(dat['Time'], dat['total_counts'], **autorange_args)

    # label every signal point with the (1-based) number of its ablation:
    # a rising edge is a True point whose predecessor is False
    ns = np.zeros(sig.size)
    ns[sig] = np.cumsum((sig ^ np.roll(sig, 1)) & sig)[sig]
    n = int(ns.max())

    if len(sample_list) != n:
        warn('Length of sample list does not match number of ablations in file.\n' +
             'We will continue, but please make sure the assignments are correct.')

    # Only ablations that have both a name and a detected signal can be
    # assigned; this keeps the promise in the warning above to continue
    # rather than failing with an IndexError on a short sample list.
    n_assign = min(n, len(sample_list))

    # calculate split boundaries
    time = dat['Time']
    bounds = []
    lower = 0
    for ni in range(n_assign - 1):
        sample = sample_list[ni]
        next_sample = sample_list[ni + 1]

        if sample != next_sample:
            # split half way between the end of this ablation and the
            # start of the next one
            current_end = np.flatnonzero(time == time[ns == ni + 1].max())[0]
            next_start = np.flatnonzero(time == time[ns == ni + 2].min())[0]
            upper = int((current_end + next_start) // 2)

            bounds.append((sample, (lower, upper)))
            lower = upper + 1

    # The last section runs from the last boundary to the end of the data.
    # Using `lower` (rather than the loop-local `upper`) also covers the case
    # where no boundary was found at all. When n_assign is 0 the index -1
    # selects the last name.
    bounds.append((sample_list[n_assign - 1], (lower, len(ns))))

    # split up data
    sections = {}
    seen = {}
    for s, (lo, hi) in bounds:
        if s not in seen:
            seen[s] = 0
        else:
            seen[s] += 1
            s += '_{}'.format(seen[s])
        otime = time[lo:hi]
        tstart = np.nanmin(otime)
        sections[s] = {
            'oTime': otime,
            'Time': otime - tstart,
            'rawdata': {k: v[lo:hi] for k, v in dat['rawdata'].items()},
            'starttime': d + datetime.timedelta(seconds=tstart),
        }

    # save output
    if savedir is None:
        savedir = os.path.join(os.path.dirname(os.path.abspath(data_file)),
                               os.path.splitext(os.path.basename(data_file))[0] + '_split')
    if not os.path.isdir(savedir):
        os.makedirs(savedir)

    header = ['# Long data file split by latools on {}'.format(datetime.datetime.now().strftime('%Y:%m:%d %H:%M:%S'))]
    if 'date' not in meta:
        header.append('# Warning: No date specified in file - Analysis Times are date file was split. ')
    else:
        header.append('# ')
    header.append('# ')
    header.append('# ')

    flist = [savedir]
    for s, section in sections.items():
        iheader = header.copy()
        iheader.append('# Sample: {}'.format(s))
        iheader.append('# Analysis Time: {}'.format(section['starttime'].strftime('%Y-%m-%d %H:%M:%S')))
        iheader = '\n'.join(iheader) + '\n'

        out = pd.DataFrame({analyte_2_namemass(k): v for k, v in section['rawdata'].items()},
                           index=section['Time'])
        out.index.name = 'Time'
        csv = out.to_csv()

        with open(os.path.join(savedir, '{}.csv'.format(s)), 'w') as f:
            f.write(iheader)
            f.write(csv)
        flist.append('   {}.csv'.format(s))

    print("File split into {} sections.\n Saved to: {}\n\n Import using the 'REPRODUCE' configuration.".format(len(sections), '\n'.join(flist)))
    return None