Source

bx-python / lib / bx / align / maf.py

Full commit
James Taylor ff9456b 






James Taylor 250c585 

James Taylor 5121970 
James Taylor 5d24e10 
James Taylor 5121970 
James Taylor 250c585 


James Taylor 5d24e10 

James Taylor 2210cf4 









James Taylor e20fb32 









James Taylor 250c585 
James Taylor e20fb32 






James Taylor 8e5aaa2 
James Taylor 250c585 
James Taylor e20fb32 

James Taylor 250c585 

James Taylor 6345de1 



James Taylor 250c585 
James Taylor 6345de1 
James Taylor 250c585 

mruffalo a3b93fa 
James Taylor 250c585 


James Taylor 6345de1 
James Taylor 250c585 







James Taylor 6345de1 


James Taylor 250c585 




























James Taylor e20fb32 




James Taylor 250c585 
James Taylor 870c9bf 


James Taylor e20fb32 



James Taylor 250c585 




James Taylor 2210cf4 
James Taylor 250c585 
James Taylor 6345de1 

James Taylor 5121970 
James Taylor 6345de1 





James Taylor 250c585 




mruffalo a3b93fa 
James Taylor 250c585 
Daniel Blankenbe… a4a3620 




James Taylor 250c585 
James Taylor 2210cf4 
James Taylor 250c585 




James Taylor 2210cf4 
James Taylor 250c585 
James Taylor 2210cf4 












James Taylor 6345de1 



James Taylor e20fb32 
James Taylor 6345de1 




James Taylor e20fb32 
James Taylor 6345de1 





James Taylor 2210cf4 





James Taylor 870c9bf 



James Taylor 2210cf4 
James Taylor 250c585 





James Taylor e20fb32 
James Taylor 250c585 



























"""
Support for the `MAF`_ multiple sequence alignment format used by `multiz`_.

.. _MAF: http://genome.ucsc.edu/FAQ/FAQformat.html#format5
.. _multiz: http://www.bx.psu.edu/miller_lab/
"""

from bx.align import *

from StringIO import StringIO
import os

import itertools
from bx import interval_index_file

from bx.misc.seekbzip2 import SeekableBzip2File

MAF_INVERSE_STATUS = 'V'
MAF_INSERT_STATUS = 'I'
MAF_CONTIG_STATUS = 'C'
MAF_CONTIG_NESTED_STATUS = 'c'
MAF_NEW_STATUS = 'N'
MAF_NEW_NESTED_STATUS = 'n'
MAF_MAYBE_NEW_STATUS = 'S'
MAF_MAYBE_NEW_NESTED_STATUS = 's'
MAF_MISSING_STATUS = 'M'

class MAFIndexedAccess( interval_index_file.AbstractIndexedAccess ):
    """
    Indexed access to a MAF file.
    """
    def read_at_current_offset( self, file, **kwargs ):
        """
        Read the MAF block at the current position in `file` and return an
        instance of `Alignment`.
        """
        return read_next_maf( file, **kwargs )

class MAFMultiIndexedAccess( interval_index_file.AbstractMultiIndexedAccess ):
    """
    Indexed access to multiple MAF files.
    """
    indexed_access_class = MAFIndexedAccess
      
Indexed = MAFIndexedAccess
"""Deprecated: `MAFIndexedAccess` is also available under the name `Indexed`."""

MultiIndexed = MAFMultiIndexedAccess
"""Deprecated: `MAFMultiIndexedAccess` is also available under the name `MultiIndexed`."""

class Reader( object ):
    """
    Iterate over all maf blocks in a file in order
    """
    def __init__( self, file, **kwargs ):
        self.file = file
        self.maf_kwargs = kwargs
        # Read and verify maf header, store any attributes
        fields = self.file.readline().split()
        if fields[0] != '##maf': raise Exception("File does not have MAF header")
        self.attributes = parse_attributes( fields[1:] )

    def next( self ):
        return read_next_maf( self.file, **self.maf_kwargs )

    def __iter__( self ):
        return ReaderIter( self )

    def close( self ):
        self.file.close()

class ReaderIter( object ):
    """
    Adapts a `Reader` to the iterator protocol.
    """
    def __init__( self, reader ):
        self.reader = reader
    def __iter__( self ): 
        return self
    def next( self ):
        v = self.reader.next()
        if not v: raise StopIteration
        return v

class Writer( object ):

    def __init__( self, file, attributes={} ):
        self.file = file
        # Write header, Webb's maf code wants version first, we accomodate
        if not attributes.has_key('version'): attributes['version'] = 1
        self.file.write( "##maf version=%s" % attributes['version'] )
        for key in attributes: 
            if key == 'version': continue
            self.file.writelines( " %s=%s" % ( key, attributes[key] ) )
        self.file.write( "\n" )

    def write( self, alignment ):
        self.file.write( "a score=" + str( alignment.score ) )
        for key in alignment.attributes:
            self.file.write( " %s=%s" % ( key, alignment.attributes[key] ) )
        self.file.write( "\n" )
        # Components
        rows = []
        for c in alignment.components:
            # "Empty component" generates an 'e' row
            if c.empty:
                rows.append( ( "e", c.src, str( c.start ), str( c.size ), c.strand, str( c.src_size ), c.synteny_empty ) )
                continue
            # Regular component
            rows.append( ( "s", c.src, str( c.start ), str( c.size ), c.strand, str( c.src_size ), c.text ) )
            # If component has quality, write a q row
            if c.quality is not None:
                rows.append( ( "q", c.src, "", "", "", "", c.quality ) )
            # If component has synteny follow up with an 'i' row
            if c.synteny_left and c.synteny_right:
                rows.append( ( "i", c.src, "", "", "", "", " ".join( map( str, c.synteny_left + c.synteny_right ) ) ) )
        self.file.write( format_tabular( rows, "llrrrrl" ) )
        self.file.write( "\n" )

    def close( self ):
        self.file.close()

# ---- Helper methods -------------------------------------------------------

def from_string( string, **kwargs ):
    return read_next_maf( StringIO( string ), **kwargs )

def read_next_maf( file, species_to_lengths=None, parse_e_rows=False ):
    """
    Read the next MAF block from `file` and return as an `Alignment` 
    instance. If `parse_i_rows` is true, empty components will be created 
    when e rows are encountered.
    """
    alignment = Alignment(species_to_lengths=species_to_lengths)
    # Attributes line
    line = readline( file, skip_blank=True )
    if not line: return None
    fields = line.split() 
    if fields[0] != 'a': raise Exception("Expected 'a ...' line")
    alignment.attributes = parse_attributes( fields[1:] )
    if 'score' in alignment.attributes:
        alignment.score = alignment.attributes['score']
        del alignment.attributes['score']
    else:
        alignment.score = 0
    # Sequence lines
    last_component = None
    while 1:
        line = readline( file )
        # EOF or Blank line terminates alignment components
        if not line or line.isspace(): break
        if line.isspace(): break 
        # Parse row
        fields = line.split()
        if fields[0] == 's':
            # An 's' row contains sequence for a component
            component = Component()
            component.src = fields[1]
            component.start = int( fields[2] )
            component.size = int( fields[3] )
            component.strand = fields[4]
            component.src_size = int( fields[5] )
            if len(fields) > 6: component.text = fields[6].strip()
            # Add to set
            alignment.add_component( component )
            last_component = component
        elif fields[0] == 'e':
            # An 'e' row, when no bases align for a given species this tells
            # us something about the synteny 
            if parse_e_rows:
                component = Component()
                component.empty = True
                component.src = fields[1]
                component.start = int( fields[2] )
                component.size = int( fields[3] )
                component.strand = fields[4]
                component.src_size = int( fields[5] )
                component.text = None
                synteny = fields[6].strip()
                assert len( synteny ) == 1, \
                    "Synteny status in 'e' rows should be denoted with a single character code"
                component.synteny_empty = synteny
                alignment.add_component( component )
                last_component = component
        elif fields[0] == 'i':
            # An 'i' row, indicates left and right synteny status for the 
            # previous component, we hope ;)
            assert fields[1] == last_component.src, "'i' row does not follow matching 's' row"
            last_component.synteny_left = ( fields[2], int( fields[3] ) )
            last_component.synteny_right = ( fields[4], int( fields[5] ) )
        elif fields[0] == 'q':
            assert fields[1] == last_component.src, "'q' row does not follow matching 's' row"
            # TODO: Should convert this to an integer array?
            last_component.quality = fields[2]
            
    return alignment

def readline( file, skip_blank=False ):
    """Read a line from provided file, skipping any blank or comment lines"""
    while 1:
        line = file.readline()
        #print "every line: %r" % line
        if not line: return None 
        if line[0] != '#' and not ( skip_blank and line.isspace() ):
            return line

def parse_attributes( fields ):
    """Parse list of key=value strings into a dict"""
    attributes = {}
    for field in fields:
        pair = field.split( '=' )
        attributes[ pair[0] ] = pair[1]
    return attributes

def format_tabular( rows, align=None ):
    if len( rows ) == 0: return ""
    lengths = [ len( col ) for col in rows[ 0 ] ]
    for row in rows[1:]:
        for i in range( 0, len( row ) ):
            lengths[ i ] = max( lengths[ i ], len( row[ i ] ) )
    rval = ""
    for row in rows:
        for i in range( 0, len( row ) ):
            if align and align[ i ] == "l":
                rval += row[ i ].ljust( lengths[ i ] )
            else:
                rval += row[ i ].rjust( lengths[ i ] )
            rval += " "
        rval += "\n"
    return rval