chapmanb / synbio (http://bcbio.wordpress.com/)

Python Synthetic Biology libraries

Clone this repository (size: 8.4 MB): HTTPS / SSH
$ hg clone http://bitbucket.org/chapmanb/synbio/
commit 2: 775d9e9a5c4d
parent 1: d62271144b61
branch: default
tags: tip
Fix setup to reflect actual modules. Add in SQLAlchemy database models.
cha...@sobchak.mgh.harvard.edu
10 months ago
r2:775d9e9a5c4d 286 loc 11.1 KB embed / history / annotate / raw /
"""Code to trim vector sequence from a sequencing read.

Lucy does not do a good job with tricky reads -- like those with small inserts
or those with lots of vector read. It ends up tossing them away as vector,
when in fact they contain useful sequence. This code tries to handle vector
trimming in a smarter way which is more in line with what we'd like to do
during sequencing.
"""
import os
import subprocess

from Bio import Fasta
from Bio.Blast import NCBIStandalone

class VectorTrimmer:
    def __init__(self, min_insert, work_dir, seq_db, start_dir, base_dir,
            data_dir, blastcmd, logger):
        """Initialize with a minimum insert size and vector split site.
        """
        self._min_insert = min_insert
        self._work_dir = work_dir
        self._blastcmd = blastcmd
        self._logger = logger

        self._seq_db = seq_db

        self._start_dir = start_dir
        self._data_dir = data_dir
        self._base_dir = base_dir

        self._blast_dbs = {}

        self._min_match = 20

    def _get_blast_db(self, plate_db, design_vector):
        """Retrieve a blast-db for the vector specified on the given plate.
        """
        # Don't get vector information from the plate any longer
        #vector_plate = plate_db.get_vector()
        #vector_names = vector_plate.split(",")
        #assert design_vector is not None, "Need a design specified vector"
        if design_vector == 'None' or design_vector is None:
            vector_names = []
        else:
            vector_names = [design_vector]
        self._logger.info("^^ Trimming with vector database: %s" % vector_names)

        try:
            return self._blast_dbs[tuple(vector_names)]
        except KeyError:
            # get all of the vector splice sites
            all_splices = []
            for vector_name in vector_names:
                vector_splice = self._retrieve_vector_splice(self._seq_db,
                        vector_name, self._base_dir, self._data_dir)
                all_splices.append(vector_splice)
            blast_db = self._setup_vector_db(self._work_dir, all_splices,
                    vector_names)
            self._blast_dbs[tuple(vector_names)] = blast_db
            return blast_db

    def _retrieve_vector_splice(self, seq_db, vector_name, base_dir, data_dir):
        """Retrieve the location of the vector splice file given the name.

        From the name, splice location information is taken from the database,
        and then used with standard data directories to retrieve the location.
        """
        sql = "SELECT splice_file FROM vector WHERE vector_name = %s"
        results = seq_db.do_select(sql, [vector_name])
        if len(results) == 0:
            raise ValueError("Could not find information on vector %s" %
                    (vector_name))
        elif len(results) > 1:
            raise ValueError("Multiple vectors")
        else:
            splice_location = results[0][0]

        return os.path.join(base_dir, data_dir, splice_location)

    def _setup_vector_db(self, work_dir, all_vector_splices, vector_names,
            num_tries = 0):
        """Setup a vector database in our work directory.

        This retrieves records from all of our various splice files, and
        sets up a working vector splice database of the concatenation of all
        of them.
        """
        vector_str = "_".join(vector_names)
        blast_db = os.path.join(work_dir, "vector_db-%s.fa" % vector_str)
        log_file = os.path.join(work_dir, "formatdb.log")
        output_handle = open(blast_db, "w")

        recs = []
        for vector_splice in all_vector_splices:
            orig_splice = open(vector_splice)
            iterator = Fasta.Iterator(orig_splice, Fasta.RecordParser())
            for rec in iterator:
                recs.append(rec)
            orig_splice.close()

        for rec in recs:
            output_handle.write(str(rec) + "\n")
        output_handle.close()
        cl = "formatdb -i %s -p F -l %s" % (blast_db, log_file)
        retval = subprocess.call(cl.split())

        # look for failures, which seem to be caused by issues with formatdb and
        # the samba mounted work directory
        error_lines = []
        log_handle = open(log_file)
        for line in log_handle.xreadlines():
            if line.find("failure") >= 0:
                error_lines.append(line)

        # check for errors, trying again if we have any
        if len(error_lines) > 0:
            if num_tries >= 10:
                raise ValueError("Could not fix formatdb in %s" % work_dir)

            self._logger.info("formatdb failures : \n%s" %
                    ("\n".join(error_lines)))
            self._setup_vector_db(work_dir, all_vector_splices, vector_names,
                    num_tries + 1)

        return blast_db

    def screen_read(self, name, plate_db, seq_list, qual_list,
            design_vector, min_match = None, at_ends = False):
        """Screen a read to remove vector sequence, using blast.

        design_vector is the string name of the vector planned to
        be cloned in from the process details in the design database.
        """
        blast_db = self._get_blast_db(plate_db, design_vector)

        if min_match is None:
            min_match = self._min_match
        blast_rec = self._do_blast(seq_list, blast_db)
        seq_full = "".join(seq_list)
        vec_locs = []
        if blast_rec:
            for alignment in blast_rec.alignments:
                hsp = alignment.hsps[0] # grab the longest hsp
                match_no_gap = hsp.query.replace("-", "").upper()
                # no ambiguities in the sequence
                if match_no_gap.find("N") == -1:
                    start = seq_full.find(match_no_gap)
                # otherwise, trust the blast start
                else:
                    start = hsp.query_start - 1
                assert start >= 0, (start, match_no_gap, seq_full)
                end = start + len(match_no_gap)
                if end - start > min_match:
                    vec_locs.append((start, end))

        #self._logger.info("Original vector: %s" % vec_locs)
        if len(vec_locs) > 1:
            vec_locs = self._combine_vector_locations(vec_locs)
        self._logger.info("-> BLAST vector locations: %s" % vec_locs)

        # we are required to be at the ends of the sequence
        if at_ends:
            good_vec_locs = []
            for start, end in vec_locs:
                if start == 0 or end == (len(seq_list)):
                    good_vec_locs.append((start, end))
            vec_locs = good_vec_locs
        
        self._logger.info(" Vector regions: %s" % vec_locs)
   
        vec_locs.sort()
        # if we have extra vector due to something like a duplicated gateway
        # site, take the first item and deal from there
        if len(vec_locs) > 2:
            assert len(vec_locs) <= 3
            vec_locs = vec_locs[:1]
        trim_seq_list, trim_qual = self._do_trim(seq_list, qual_list, vec_locs)
        
        if len(trim_seq_list) > 0:
            full_seq = "".join(seq_list)
            trim_seq = "".join(trim_seq_list)
            start = full_seq.find(trim_seq)
            assert start >= 0, "Can't find trimmed sequence in full seq"
            end = start + len(trim_seq)
            self._logger.info(" Final trimmed region: %s to %s" % (start, end))
        else:
            self._logger.info(" No sequence after vector trim")

        return trim_seq_list, trim_qual

    def _combine_vector_locations(self, vec_locs):
        """Combine any vector locations which overlap with each other.
        """
        assert len(vec_locs) >= 2
        vec_locs.sort()
        final_locs = []
        prev_start, prev_end = vec_locs.pop(0)
        new_start, new_end = vec_locs.pop(0)
        while 1:
            if (new_start >= prev_start and new_start <= prev_end) or \
                    (prev_start >= new_start and prev_start <= new_end):
                min_start = min(new_start, prev_start)
                max_end = max(new_end, prev_end)
                prev_start, prev_end = min_start, max_end
            else:
                final_locs.append((prev_start, prev_end))
                prev_start, prev_end = new_start, new_end

            try:
                new_start, new_end = vec_locs.pop(0)
            except IndexError:
                # done, add the last item
                final_locs.append((prev_start, prev_end))
                break

        return final_locs

    def _do_trim(self, seq_list, qual_list, vec_locs):
        """Trim the sequence based on location of vector sequences.
        """
        assert len(seq_list) == len(qual_list)
        # case one -- we have a sequence with both vector ends
        if len(vec_locs) == 2:
            good_start = vec_locs[0][1]
            good_end = vec_locs[1][0]
            if good_end - good_start > self._min_insert:
                return seq_list[good_start:good_end], \
                        qual_list[good_start:good_end]
            else:
                return [], []
        # no vector sequence found
        elif len(vec_locs) == 0:
            good_start = 0
            good_end = len(seq_list)
        # only one vector found -- decide which side to trim on
        # basic idea is to trim the minimum amount
        else:
            assert len(vec_locs) == 1
            vec_start, vec_end = vec_locs[0]
            five_potential_remove = vec_end
            three_potential_remove = len(seq_list) - vec_start
            # if we are removing less from the five side, trim that side
            if five_potential_remove < three_potential_remove:
                good_start = vec_end
                good_end = len(seq_list)
            # if we are removing less from the three side, do that
            elif three_potential_remove < five_potential_remove:
                good_start = 0
                good_end = vec_start
            # case where the entire insert is vector
            elif five_potential_remove == len(seq_list) and \
                 three_potential_remove == len(seq_list):
                good_start = 0
                good_end = 0
            # not sure what to do in the case where they are the same
            # just do not trim -- all vector won't match later
            else:
                good_start = 0
                good_end = len(seq_list)

        if good_end - good_start > self._min_insert:
            return seq_list[good_start:good_end], \
                    qual_list[good_start:good_end]
        else:
            return [], []

    def _do_blast(self, seq_list, blast_db):
        sequence_file = os.path.join(self._work_dir, "vector_check.fa")
        output_handle = open(sequence_file, "w")
        rec = Fasta.Record()
        rec.title = "seq-vec-check"
        rec.sequence = "".join(seq_list)
        output_handle.write(str(rec) + "\n")
        output_handle.close()

        blast_out, error = NCBIStandalone.blastall(blastcmd = self._blastcmd,
                program = 'blastn', database = blast_db,
                infile = sequence_file)
        
        parser = NCBIStandalone.BlastErrorParser()
        try:
            blast_rec = parser.parse(blast_out)
        except NCBIStandalone.LowQualityBlastError:
            blast_rec = None

        return blast_rec