chapmanb / synbio (http://bcbio.wordpress.com/)

Python Synthetic Biology libraries

Clone this repository (size: 8.4 MB): HTTPS / SSH
$ hg clone http://bitbucket.org/chapmanb/synbio/
commit 2: 775d9e9a5c4d
parent 1: d62271144b61
branch: default
tags: tip
Fix setup to reflect actual modules. Add in SQLAlchemy database models.
cha...@sobchak.mgh.harvard.edu
10 months ago
synbio / SynBio / Codons / AlternativeORF.py
r2:775d9e9a5c4d 219 loc 9.1 KB embed / history / annotate / raw /
"""Find alternative ORFs (not in the coding frame) and remove them.

This searches for ORFs longer than a defined number of amino acids in the
+2, +3, -1, -2, -3 strands. If found, the start codon is removed to prevent
expression of spurious protein products.
"""
from CodingRegion import FullSequence
from Bio.Seq import Seq
from Bio.Alphabet.IUPAC import unambiguous_dna, ambiguous_dna

from RestrictionChange import CutSite

class RemovalError(Exception):
    """Exception related to being unable to remove a site.
    """
    pass

class AlternativeORFRemover:
    """Remove alternative ORFs from coding regions.
    """
    def __init__(self, min_size, no_use_thresh, count_compensate):
        self._min_size = min_size
        self._no_use_thresh = no_use_thresh
        self._count_compensate = count_compensate
        self._logger = None

    def set_logger(self, logger):
        self._logger = logger

    def get_updated_sequence(self, sequence, cds_regions, codon_table,
            codon_usage):
        full_seq = FullSequence(sequence, cds_regions, codon_table,
                codon_usage)
        cds_iterator = full_seq.check_out_cds_iterator()
        for cds in cds_iterator:
            c_table = cds.codon_table()
            protein_seq = cds.protein_seq()
            ignore_starts = []
            while 1:
                nt_seq = cds.get_cdsonly_sequence()
                alt_starts = self._get_alt_starts(['ATG'],
                        c_table.stop_codons, nt_seq)
                for ig_start in ignore_starts:
                    if ig_start in alt_starts:
                        alt_starts.remove(ig_start)
                if len(alt_starts) == 0:
                    break
                # work on the first one we find
                site = alt_starts[0]
                self._logger.info("\tFound site %s in CDS(%s to %s): %s-%s" %
                        (site.site, cds.start, cds.end, site.start, site.end))
                try:
                    cds = self._remove_alt_site(cds, site)
                except RemovalError:
                    self._logger.info('\tCould not remove site: ignoring')
                    ignore_starts.append(site)
            new_protein_seq = cds.protein_seq()
            assert new_protein_seq == protein_seq
            full_seq.add_modified_cds(cds)
        final_seq = full_seq.get_sequence()
        return final_seq
    
    def _remove_alt_site(self, cds, alt_site):
        """Remove a defined site from the given coding region.
        """
        # retrieve all codons in the region, getting forward and reverse
        # codons to allow us to check if we accidentally add one site while
        # removing another one
        if cds.get_sequence()[:3] == 'ATG':
            to_start = 4
        else:
            to_start = 0
        codon_info = cds.get_codons_in_region(
                max(alt_site.start - 1, to_start),
                min(alt_site.end + 1, len(cds.get_sequence()) - 1))
        initial_seq = cds.codon_list_seq(codon_info)
        self._logger.info("\t Start region: %s" % (initial_seq))
        assert initial_seq.find(alt_site.site) >= 0
        # adjust sites while we have problems
        # we adjust each codon trying to fix it be replacing with the most
        # common codon at that position. If that can't work, then we
        # progressively move down codon frequencies until we find a codon
        # distribution that will work
        change_type_index = 1
        cur_codon_index = 0
        cur_cai_index = 0
        while 1:
            work_codons = []
            start_codons = []
            for update_i in range(change_type_index):
                work_codon_index, w_codon = \
                        codon_info[cur_codon_index + update_i]
                work_codons.append(w_codon)
                start_codons.append(w_codon.cur_codon)
            try:
                for work_codon in work_codons:
                    replace_codon = work_codon.get_codon_at_frequency(
                            cur_cai_index, self._no_use_thresh,
                            self._count_compensate)
                    work_codon.change_codon(replace_codon)
            # no codon to change to at the current choice frequency
            except IndexError:
                pass
            # if we fixed the issue, be done with this
            final_seq = cds.codon_list_seq(codon_info)
            if final_seq.find(alt_site.site) == -1:
                break
            # if not, revert the codon and keep trying
            else:
                for i, work_codon in enumerate(work_codons):
                    start_codon = start_codons[i]
                    work_codon.change_codon(start_codon)
            # update the codon to try; if we've reached past all codons to try
            # then increase to the next codon frequency
            cur_codon_index += 1
            if cur_codon_index > (len(codon_info) - change_type_index):
                cur_codon_index = 0
                cur_cai_index += 1
            # we can't have more than 6 choices at a position, meaning we've
            # exhausted all choices and not fixed anything. Raise an error.
            if cur_cai_index > 6:
                if change_type_index >= len(codon_info):
                    raise RemovalError("Could not remove site")
                else:
                    cur_codon_index = 0
                    change_type_index += 1
                    cur_cai_index = 0

        assert final_seq.find(alt_site.site) == -1
        self._logger.info("\t Final region: %s" % (final_seq))
        for index, codon in codon_info:
            cds.update_codon_index(index, codon)

        return cds

    def _get_alt_starts(self, start_codons, stop_codons, nt_seq):
        """Find any alternative coding regions longer then our minimum size.
        """
        assert len(nt_seq) % 3 == 0
        rc_seq = Seq(nt_seq, unambiguous_dna).reverse_complement().data
        sites_2 = self._find_alt_starts(nt_seq[1:-2], 1, False, start_codons,
                stop_codons)
        sites_3 = self._find_alt_starts(nt_seq[2:-1], 2, False, start_codons,
                stop_codons)
        sites_m1 = self._find_alt_starts(rc_seq[:], 0, True, start_codons,
                stop_codons)
        sites_m2 = self._find_alt_starts(rc_seq[1:-2], 1, True, start_codons,
                stop_codons)
        sites_m3 = self._find_alt_starts(rc_seq[2:-1], 2, True, start_codons,
                stop_codons)

        all_sites = sites_2 + sites_3 + sites_m1 + sites_m2 + sites_m3
        all_sites = list(set(all_sites))
        all_sites.sort()
        return all_sites

    def _find_alt_starts(self, alt_region, offset, is_rc, start_codons,
            stop_codons):
        """Return alternative starts in the given coding frame.
        """
        assert len(alt_region) % 3 == 0
        # break into codons
        cur_pos = 0
        codons = []
        while cur_pos < len(alt_region) / 3:
            cur_start = cur_pos * 3
            codons.append(alt_region[cur_start:cur_start + 3])
            cur_pos += 1
        assert len(codons) == len(alt_region) / 3
        # find all start and stop positions in these codons
        all_starts = []
        for start in start_codons:
            start_pos = 0
            while start_pos >= 0:
                try:
                    start_find = codons.index(start, start_pos)
                    all_starts.append(start_find)
                    start_pos = start_find + 1
                except ValueError:
                    start_pos = -1
        all_stops = []
        for stop in stop_codons:
            stop_pos = 0
            while stop_pos >= 0:
                try:
                    stop_find = codons.index(stop, stop_pos)
                    all_stops.append(stop_find)
                    stop_pos = stop_find + 1
                except ValueError:
                    stop_pos = -1
        # now get all of the too long problem regions and convert back
        # to initial coordinates
        sites = []
        for start in all_starts:
            following_stops = [x for x in all_stops if x >= start]
            if len(following_stops) > 1:
                follow_stop = following_stops[0]
            else:
                follow_stop = len(codons)
            assert follow_stop > start
            if follow_stop - start >= self._min_size:
                rel_start = start * 3
                rel_end = rel_start + 3
                bad_start = alt_region[rel_start:rel_end]
                assert bad_start in start_codons
                if not is_rc:
                    final_start = rel_start + offset
                    final_end = rel_end + offset
                    sites.append(CutSite(final_start, final_end, bad_start))
                else:
                    tmp = Seq(bad_start, unambiguous_dna)
                    bad_start_rc = tmp.reverse_complement().data
                    total_size = len(alt_region) + offset
                    rc_start = rel_start + offset
                    rc_end = rel_end + offset
                    final_start = total_size - rc_end
                    final_end = total_size - rc_start
                    sites.append(CutSite(final_start, final_end, bad_start_rc))
        return sites