"""Code to trim vector sequence from a sequencing read.
Lucy does not do a good job with tricky reads -- like those with small inserts
or those with lots of vector read. It ends up tossing them away as vector,
when in fact they contain useful sequence. This code tries to handle vector
trimming in a smarter way which is more in line with what we'd like to do
during sequencing.
"""
import os
import subprocess
from Bio import Fasta
from Bio.Blast import NCBIStandalone
class VectorTrimmer:
def __init__(self, min_insert, work_dir, seq_db, start_dir, base_dir,
data_dir, blastcmd, logger):
"""Initialize with a minimum insert size and vector split site.
"""
self._min_insert = min_insert
self._work_dir = work_dir
self._blastcmd = blastcmd
self._logger = logger
self._seq_db = seq_db
self._start_dir = start_dir
self._data_dir = data_dir
self._base_dir = base_dir
self._blast_dbs = {}
self._min_match = 20
def _get_blast_db(self, plate_db, design_vector):
"""Retrieve a blast-db for the vector specified on the given plate.
"""
# Don't get vector information from the plate any longer
#vector_plate = plate_db.get_vector()
#vector_names = vector_plate.split(",")
#assert design_vector is not None, "Need a design specified vector"
if design_vector == 'None' or design_vector is None:
vector_names = []
else:
vector_names = [design_vector]
self._logger.info("^^ Trimming with vector database: %s" % vector_names)
try:
return self._blast_dbs[tuple(vector_names)]
except KeyError:
# get all of the vector splice sites
all_splices = []
for vector_name in vector_names:
vector_splice = self._retrieve_vector_splice(self._seq_db,
vector_name, self._base_dir, self._data_dir)
all_splices.append(vector_splice)
blast_db = self._setup_vector_db(self._work_dir, all_splices,
vector_names)
self._blast_dbs[tuple(vector_names)] = blast_db
return blast_db
def _retrieve_vector_splice(self, seq_db, vector_name, base_dir, data_dir):
"""Retrieve the location of the vector splice file given the name.
From the name, splice location information is taken from the database,
and then used with standard data directories to retrieve the location.
"""
sql = "SELECT splice_file FROM vector WHERE vector_name = %s"
results = seq_db.do_select(sql, [vector_name])
if len(results) == 0:
raise ValueError("Could not find information on vector %s" %
(vector_name))
elif len(results) > 1:
raise ValueError("Multiple vectors")
else:
splice_location = results[0][0]
return os.path.join(base_dir, data_dir, splice_location)
def _setup_vector_db(self, work_dir, all_vector_splices, vector_names,
num_tries = 0):
"""Setup a vector database in our work directory.
This retrieves records from all of our various splice files, and
sets up a working vector splice database of the concatenation of all
of them.
"""
vector_str = "_".join(vector_names)
blast_db = os.path.join(work_dir, "vector_db-%s.fa" % vector_str)
log_file = os.path.join(work_dir, "formatdb.log")
output_handle = open(blast_db, "w")
recs = []
for vector_splice in all_vector_splices:
orig_splice = open(vector_splice)
iterator = Fasta.Iterator(orig_splice, Fasta.RecordParser())
for rec in iterator:
recs.append(rec)
orig_splice.close()
for rec in recs:
output_handle.write(str(rec) + "\n")
output_handle.close()
cl = "formatdb -i %s -p F -l %s" % (blast_db, log_file)
retval = subprocess.call(cl.split())
# look for failures, which seem to be caused by issues with formatdb and
# the samba mounted work directory
error_lines = []
log_handle = open(log_file)
for line in log_handle.xreadlines():
if line.find("failure") >= 0:
error_lines.append(line)
# check for errors, trying again if we have any
if len(error_lines) > 0:
if num_tries >= 10:
raise ValueError("Could not fix formatdb in %s" % work_dir)
self._logger.info("formatdb failures : \n%s" %
("\n".join(error_lines)))
self._setup_vector_db(work_dir, all_vector_splices, vector_names,
num_tries + 1)
return blast_db
def screen_read(self, name, plate_db, seq_list, qual_list,
design_vector, min_match = None, at_ends = False):
"""Screen a read to remove vector sequence, using blast.
design_vector is the string name of the vector planned to
be cloned in from the process details in the design database.
"""
blast_db = self._get_blast_db(plate_db, design_vector)
if min_match is None:
min_match = self._min_match
blast_rec = self._do_blast(seq_list, blast_db)
seq_full = "".join(seq_list)
vec_locs = []
if blast_rec:
for alignment in blast_rec.alignments:
hsp = alignment.hsps[0] # grab the longest hsp
match_no_gap = hsp.query.replace("-", "").upper()
# no ambiguities in the sequence
if match_no_gap.find("N") == -1:
start = seq_full.find(match_no_gap)
# otherwise, trust the blast start
else:
start = hsp.query_start - 1
assert start >= 0, (start, match_no_gap, seq_full)
end = start + len(match_no_gap)
if end - start > min_match:
vec_locs.append((start, end))
#self._logger.info("Original vector: %s" % vec_locs)
if len(vec_locs) > 1:
vec_locs = self._combine_vector_locations(vec_locs)
self._logger.info("-> BLAST vector locations: %s" % vec_locs)
# we are required to be at the ends of the sequence
if at_ends:
good_vec_locs = []
for start, end in vec_locs:
if start == 0 or end == (len(seq_list)):
good_vec_locs.append((start, end))
vec_locs = good_vec_locs
self._logger.info(" Vector regions: %s" % vec_locs)
vec_locs.sort()
# if we have extra vector due to something like a duplicated gateway
# site, take the first item and deal from there
if len(vec_locs) > 2:
assert len(vec_locs) <= 3
vec_locs = vec_locs[:1]
trim_seq_list, trim_qual = self._do_trim(seq_list, qual_list, vec_locs)
if len(trim_seq_list) > 0:
full_seq = "".join(seq_list)
trim_seq = "".join(trim_seq_list)
start = full_seq.find(trim_seq)
assert start >= 0, "Can't find trimmed sequence in full seq"
end = start + len(trim_seq)
self._logger.info(" Final trimmed region: %s to %s" % (start, end))
else:
self._logger.info(" No sequence after vector trim")
return trim_seq_list, trim_qual
def _combine_vector_locations(self, vec_locs):
"""Combine any vector locations which overlap with each other.
"""
assert len(vec_locs) >= 2
vec_locs.sort()
final_locs = []
prev_start, prev_end = vec_locs.pop(0)
new_start, new_end = vec_locs.pop(0)
while 1:
if (new_start >= prev_start and new_start <= prev_end) or \
(prev_start >= new_start and prev_start <= new_end):
min_start = min(new_start, prev_start)
max_end = max(new_end, prev_end)
prev_start, prev_end = min_start, max_end
else:
final_locs.append((prev_start, prev_end))
prev_start, prev_end = new_start, new_end
try:
new_start, new_end = vec_locs.pop(0)
except IndexError:
# done, add the last item
final_locs.append((prev_start, prev_end))
break
return final_locs
def _do_trim(self, seq_list, qual_list, vec_locs):
"""Trim the sequence based on location of vector sequences.
"""
assert len(seq_list) == len(qual_list)
# case one -- we have a sequence with both vector ends
if len(vec_locs) == 2:
good_start = vec_locs[0][1]
good_end = vec_locs[1][0]
if good_end - good_start > self._min_insert:
return seq_list[good_start:good_end], \
qual_list[good_start:good_end]
else:
return [], []
# no vector sequence found
elif len(vec_locs) == 0:
good_start = 0
good_end = len(seq_list)
# only one vector found -- decide which side to trim on
# basic idea is to trim the minimum amount
else:
assert len(vec_locs) == 1
vec_start, vec_end = vec_locs[0]
five_potential_remove = vec_end
three_potential_remove = len(seq_list) - vec_start
# if we are removing less from the five side, trim that side
if five_potential_remove < three_potential_remove:
good_start = vec_end
good_end = len(seq_list)
# if we are removing less from the three side, do that
elif three_potential_remove < five_potential_remove:
good_start = 0
good_end = vec_start
# case where the entire insert is vector
elif five_potential_remove == len(seq_list) and \
three_potential_remove == len(seq_list):
good_start = 0
good_end = 0
# not sure what to do in the case where they are the same
# just do not trim -- all vector won't match later
else:
good_start = 0
good_end = len(seq_list)
if good_end - good_start > self._min_insert:
return seq_list[good_start:good_end], \
qual_list[good_start:good_end]
else:
return [], []
def _do_blast(self, seq_list, blast_db):
sequence_file = os.path.join(self._work_dir, "vector_check.fa")
output_handle = open(sequence_file, "w")
rec = Fasta.Record()
rec.title = "seq-vec-check"
rec.sequence = "".join(seq_list)
output_handle.write(str(rec) + "\n")
output_handle.close()
blast_out, error = NCBIStandalone.blastall(blastcmd = self._blastcmd,
program = 'blastn', database = blast_db,
infile = sequence_file)
parser = NCBIStandalone.BlastErrorParser()
try:
blast_rec = parser.parse(blast_out)
except NCBIStandalone.LowQualityBlastError:
blast_rec = None
return blast_rec