bx-python / lib / bx / align / tools / tile.py

"""
Tools for tiling / projecting alignments onto an interval of a sequence.
"""

import bx.align as align
from bx import misc
import bx.seq.nib
import os
import string
import sys


def tile_interval( sources, index, ref_src, start, end, seq_db=None ):
    """
    Tile maf blocks onto an interval. The resulting block will span the interval
    exactly and contain the column from the highest scoring alignment at each
    position.

    `sources`: list of sequence source names to include in final block
    `index`: an instnace that can return maf blocks overlapping intervals
    `ref_src`: source name of the interval (ie, hg17.chr7)
    `start`: start of interval
    `end`: end of interval
    `seq_db`: a mapping for source names in the reference species to nib files
    """
    # First entry in sources should also be on the reference species
    assert sources[0].split('.')[0] == ref_src.split('.')[0], \
        "%s != %s" % ( sources[0].split('.')[0], ref_src.split('.')[0] )
    base_len = end - start
    blocks = index.get( ref_src, start, end )
    # From low to high score
    blocks.sort(key=lambda t: t.score)
    mask = [ -1 ] * base_len
    ref_src_size = None
    for i, block in enumerate( blocks ):
        ref = block.get_component_by_src_start( ref_src )
        ref_src_size = ref.src_size
        assert ref.strand == "+"
        slice_start = max( start, ref.start )
        slice_end = min( end, ref.end )
        for j in range( slice_start, slice_end ):
            mask[j-start] = i
    tiled = []
    for i in range( len( sources ) ):
        tiled.append( [] )
    for ss, ee, index in intervals_from_mask( mask ):
        # Interval with no covering alignments
        if index < 0:
            # Get sequence if available, otherwise just use 'N'
            if seq_db:
                tiled[0].append( bx.seq.nib.NibFile( open( seq_db[ ref_src ] ) ).get( start+ss, ee-ss ) )
            else:
                tiled[0].append( "N" * (ee-ss) )
            # Gaps in all other species
            for row in tiled[1:]:
                row.append( "-" * ( ee - ss ) )
        else:
            slice_start = start + ss
            slice_end = start + ee
            block = blocks[index]
            ref = block.get_component_by_src_start( ref_src )
            sliced = block.slice_by_component( ref, slice_start, slice_end )
            sliced = sliced.limit_to_species( sources )
            sliced.remove_all_gap_columns()
            for i, src in enumerate( sources ):
                comp = sliced.get_component_by_src_start( src )
                if comp:
                    tiled[i].append( comp.text )
                else:
                    tiled[i].append( "-" * sliced.text_size )
    return [ "".join( t ) for t in tiled ]

def intervals_from_mask( mask ):
    start = 0
    last = mask[0]
    for i in range( 1, len( mask ) ):
        if mask[i] != last:
            yield start, i, last
            start = i
            last = mask[i]
    yield start, len(mask), last
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.