Commits

Christoph Schindler committed 78e71a5

Initial import of code by Andrew Snare.

Comments (0)

Files changed (2)

+TypoGenetics
+============
+
+Attribution
+-----------
+
+> “Originally proposed in Hofstadter's ‘Gödel, Escher, Bach’, typogenetics
+> is a formal system intended to capture the essence of what was known about
+> biological genetics at the time (1979).
+>
+> The system was the topic for my research project undertaken as part of the
+> honours programme within the School of Computer Science and Software
+> ngineering (CSSE), Monash University, during 1999.” -- Andrew Snare
+
+For further information and the full text of the thesis, please refer to
+[his website][1].
+
+Published curtesy of Andrew Snare.
+
+About
+-----
+
+We (a group of nerds at the local hackspace) are currently reading and
+discussing the book “Gödel, Escher Bach” by Douglas Hofstadter. In Chapter XVI,
+the author proposes a set of rules for a game called “Typogenetics.”
+
+Given the date of publication, the system was probably meant to be played with
+pen and paper. Thirty odd years later, computer hardware is sufficiently
+advanced to tackle this beast on a computer, though. What should I say… we are
+a lazy bunch ;)
+
+Big thanks goes out to Andrew Snare who let us publish this. Revision 1 of this
+repository represents the code as extracted from the PDF of his thesis. In
+addition, helped our understanding of the chapter mentioned above a lot.
+
+Requirements
+------------
+
+This code depends on the PQueue Python module, written by Andrew Snare and
+released under the LGPL license.
+
+Source can be downloaded from [his website][1] as well.
+
+Debian maintains a package for this module under the name python-pqueue.
+
+  [1]: http://www.csse.monash.edu.au/hons/projects/1999/Andrew.Snare/
+
+#!/usr/bin/env python
+""" This module exports routines useful in modelling the Typogenetics
+system, as put forward by Hofstadter in his book "Goedel, Escher,
+Bach".
+
+Classes exported:
+        Strand          -- models a DNA strand
+        StrandFactory   -- enumerates all possible Strands
+        Enzyme          -- models an enzyme
+        Applicator      -- provides contexts for applying enzymes to strands
+        PrintApplicator -- noisy specialization of Applicator
+
+----
+
+Methods exported:
+        translate -- translates a DNA strand into an enzyme
+        mogrify -- applies an enzyme to a strand, returning results
+"""
+
+import array, types
+import sys, string, time
+import bisect
+from pqueue import PQueue
+
+# Constants to represent the available bases
+A = 0 #was ord("A")
+C = 1 #was ord("C")
+G = 2 #was ord("G")
+T = 3 #was ord("T")
+BLANK = 32 #was ord(" ")
+
+
+class Strand:
+    "Concrete strand that models a DNA strand."
+    # Mapping table from real letters to bases
+    basemap = {'A':A, 'C':C, 'G':G, 'T':T, 'a':A, 'c':C, 'g':G, 't':T}
+
+    # Mapping for next base
+    nextbase = [C,G,T,A]
+
+    # Used to translate a strand into a human-readable string
+    transbase = "ACGT"
+    transbase = transbase + '?'*(256-len(transbase))
+
+    def __init__(self,init=""):
+        "Initializes the value of the strand."
+        # We're using an array of bytes to represent a strand
+        #print "type(self):",type(self)
+        #print "type(init):",type(init)
+        if type(init) == type(self):
+            # Copy constructor
+            self.array = array.array('b',init.array.tolist())
+        elif type(init) == types.ListType or \
+             type(init) == types.TupleType or \
+             type(init) == types.SliceType:
+            # Sequence constructor
+            self.array = array.array('b',list(init))
+        elif type(init) == array.ArrayType:
+            # Array constructor
+            self.array = array.array('b',init.tolist())
+        elif type(init) == types.StringType:
+            # String constructor
+            self.array = array.array('b')
+            for char in init:
+                self.array.append(self.basemap[char])
+        else:
+            raise TypeError, \
+                'argument 1: expected array or sequence or string, %s found' %\
+                type(init).__name__
+
+        # This is a cache of the positions of each base in the strand
+        self._basepositions = None
+        return
+
+    def __repr__(self):
+        "String representation of strand. This is as fast as I can make it."
+        return "Strand('%s')" % str(self)
+
+    def __str__(self):
+        "Informal string representation of the strand."
+        return string.translate(self.array.tostring(),self.transbase)
+
+    def __len__(self):
+        "Return the length of our strand, when necessary."
+        return len(self.array)
+
+    def __cmp__(self,other):
+        "Compares the strand to something else."
+        if type(self) == type(other):
+            # Strand comparison
+            return cmp(self.array,other.array)
+        else:
+            # Try to convert to a strand then compare
+            try:
+                other = Strand(other)
+            except TypeError:
+                # Meaningful conversion not possible. Doh.
+                return -1
+            # With the conversion done, recursively compare
+            return cmp(self,other)
+
+        # Should never reach here
+        return
+
+    def __getitem__(self,x):
+        "Return a given unit in the strand."
+        return self.array[x]
+
+    def __setitem__(self,x,value):
+        "Set (and return) a given unit in the strand."
+        self.array[x] = value
+        self._basepositions = None # Throw out the base-position cache
+        return
+
+    def __delitem__(self,x):
+        "Delete a given unit from the strand."
+        del self.array[x]
+        self._basepositions = None # Throw out the base-position cache
+        return
+
+    def __getslice__(self,i,j):
+        "Return a slice of the strand."
+        return Strand(self.array[i:j].tolist())
+
+    def __setslice__(self,i,j,sequence):
+        "Sets a slice of the strand."
+        self.array[i:j] = sequence
+        self._basepositions = None # Throw out the base-position cache
+        return None
+
+    def __delslice__(self,i,j):
+        "Deletes a slice of the strand."
+        del self.array[i:j]
+        self._basepositions = None # Throw out the base-position cache
+        return
+
+    def insert(self,index,base):
+        "Inserts the given base at the given index."
+        self._basepositions = None # Throw out the base-position cache
+        return self.array.insert(index,base)
+
+    def reverse(self):
+        "Reverses the strand."
+        self._basepositions = None # Throw out the base-position cache
+        return self.array.reverse()
+
+    def increment(self):
+        "Increment the strand to the next position."
+        unit = len(self.array)
+        nocarry = 0 # To force initial loop eval
+        while not nocarry and unit: # While there's carry and not at start
+            unit = unit - 1
+            self.array[unit] = nocarry = self.nextbase[self.array[unit]]
+        else:
+            if not nocarry:
+                self.array.insert(0, A)
+        self._basepositions = None # Throw out base-position cache
+        return
+
+    def basepositions(self):
+        "Calculate the positions of each base in the strand."
+        # Generate a list of the positions for each base
+        if not self._basepositions:
+            positions = [[],[],[],[],[]] # List of positions of each base
+            for i in range(len(self)):
+                try:
+                    positions[self[i]].append(i)
+                except IndexError: # We hit a gap, remap to [4]
+                    positions[4].append(i)
+            self._basepositions = positions
+        return self._basepositions
+
+
+class StrandFactory:
+    "Class that returns all strands one-by-one."
+    def __init__(self,start=Strand('A')):
+        "Initialize our enumeration state."
+        self.strand = start
+        self.pqueue = PQueue()
+        return
+
+    def pop(self):
+        "Return a priority/strand tuple."
+        if len(self.strand)*10 < self.pqueue.peek():
+            retval = (len(self.strand), Strand(self.strand))
+            self.strand.increment()
+        else:
+            retval = self.pqueue.pop()
+        return retval
+
+    def __getitem__(self,key):
+        return self.pqueue[key]
+
+    def __setitem__(self,key,value):
+        self.pqueue[key] = value
+
+
+class Enzyme:
+    "Concrete class that models an enzyme."
+    # This is the Typogenetic Code, which maps base-pairs to both
+    # an operation and a 'kink' direction for the secondary structure.
+
+    # Direction constants, for readability only:
+    S=0; L=-1; R=1
+
+    # Enzyme hints, which can be ORed together.
+    NOC = 0x00 # Instruction does not change strand
+    NEW = 0x01 # Instruction will cause a new strand
+    MOD = 0x02 # Instruction will modify strands
+
+    # This is the code itself
+    TypogeneticCode = \
+            [[(None, S,NOC), ('cut', S,NEW), ('nuk', S,MOD|NEW), ('swi', R,NOC)],
+             [('mvr', S,NOC), ('mvl', S,NOC), ('cop', R,NEW   ), ('off', L,NOC)],
+             [('ina', S,MOD), ('inc', R,MOD), ('ing', R,MOD   ), ('int', L,MOD)],
+             [('rpy', R,NOC), ('rpu', L,NOC), ('lpy', L,NOC   ), ('lpu', L,NOC)]]
+
+
+    # Reverse mapping of the typogenetic code, alas sometimes needed.
+    # Generate this dynamically, for maintenance purposes
+    TypogeneticOdec = {}
+
+    for x1 in xrange(len(TypogeneticCode)):
+        for x2 in xrange(len(TypogeneticCode[x1])):
+            TypogeneticOdec[TypogeneticCode[x1][x2][0]] = (x1,x2)
+    # And this maps which direction binds to which base preferentially
+    BindPref = [A,G,T,C]
+
+    def __init__(self,acids=[],quirk=0):
+        """
+        Initialize the enzyme. The quirk flag determines if use the
+        Morris/Varetto convention for determining binding preference
+        instead of the original Hoftstadter method.
+        """
+        # Initialize the data members
+        self.acids=[]
+        self.dirs=[]
+        self.__bindcache = None
+        self.hint=0
+        self.quirk=quirk
+
+        # Add the initializer amino-acids
+        for acid in acids:
+            self.addAminoAcid(self.TypogeneticOdec[acid])
+        return
+
+    def __repr__(self):
+        "Returns a string representation of the enzyme."
+        return "Enzyme(%s)" % repr(self.acids)
+
+    def __str__(self):
+        "Returns a pretty representation of the enzyme."
+        return "%s: %s" % (Strand.transbase[self.binding()],
+                           string.join(self.acids))
+
+    def __cmp__(self,other):
+        "Compares two enzymes for equality."
+        return cmp(self.acids,other.acids)
+
+    def addAminoAcid(self,basepair):
+        "Adds new amino-acid to our enzyme. The basepair is a tuple of bases."
+        # Retrieve the amino-acid and stuff
+        (acid,direction,hint) = \
+            self.TypogeneticCode[basepair[0]][basepair[1]]
+        if not acid:
+            # Return false to indicate punctuation
+            return 0
+
+        # Add the amino-acid, invalidate the binding cache, update the hint
+        self.acids.append(acid)
+        self.dirs.append(direction)
+        self.__bindcache = None
+        self.hint = self.hint | hint
+        return 1
+
+    def binding(self):
+        "Returns the binding of the enzyme."
+
+        if (self.__bindcache is None) or (self.__bindcache[0] != self.quirk):
+            # The Varetto/Morris/quirky method uses the full amino-acid
+            # # list... the real way ignores the first and last amino-acids.
+            if self.quirk:
+                directions = self.dirs
+            else: directions = self.dirs[1:-1]
+            # This just sums the series
+            binding = (reduce(lambda x,y:x+y,directions,0) % 4)
+            self.__bindcache = (self.quirk, self.BindPref[binding])
+
+        return self.__bindcache[1]
+
+def translate(strand,quirk=0):
+    """
+    Translates a strand into a list of enzymes.
+    The optional flag determines whether we produce quirky Enzymes or not.
+    """
+    enzymes = [Enzyme(quirk=quirk)]
+    for x in xrange(0,len(strand)-1,2): # For each base-pair...
+        basepair = (strand[x],strand[x+1])
+        # Add the basepair, and create a new enzyme if it was punctuation
+        if not enzymes[-1].addAminoAcid(basepair):
+            enzymes.append(Enzyme(quirk=quirk))
+    # Remove all empty amino-acids and return the rest
+    return filter(lambda e: e.acids,enzymes)
+
+class Applicator:
+    "Used to model the application of amino-acids to a DNA strand."
+    # Mapping for complementary bases -- probably the wrong place
+    # to have these things, OO-speaking.
+    compbase = [T,G,C,A]
+    purines = [1,0,1,0] # Could use odd() but this is faster
+
+    # Map for showing where we're bound (using the Morris convention)
+    lowerbase = {'A':'a', 'C':'c', 'G':'g', 'T':'t', '?':'.'}
+
+    def __init__(self,strand):
+        """
+        Initializes the application context, with a default starting
+        position of 0.
+        """
+        # The two strands which will be manipulated (a copy of the
+        # strand is made so that we won't modify it)
+        self.strands = [strand[:],Strand([32]*len(strand))]
+        # Reset our state
+        self.reset()
+        return
+
+    def __str__(self):
+        "Returns a convenient displayable form of the applicator state."
+        strands = map(str,self.strands)
+        # Use the Morris convention and make the bound base
+        # lower-case (if possible)
+        try:
+            strands[self.bound] = strands[self.bound][:self.pos] + \
+                    self.lowerbase[strands[self.bound][self.pos]]+ \
+                    strands[self.bound][self.pos+1:]
+        except IndexError: pass
+        # Flip them so that they're displayed in the right order
+        strands.reverse()
+        if self.copying:
+            strands.append('[Copying: On]')
+        else:
+            strands.append('[Copying: Off]')
+
+        return string.join(strands,'\n')
+
+    def reset(self,position=0):
+        "Reset the applicator state, prior to an enzyme being bound."
+        # Applicator state:
+        #   self.done - flag that's set when enzyme should cease
+        #   self.bound - which strand we're bound to
+        #   self.copying - flag for if we're in copy mode
+        #   self.pos - position we're bound to
+        self.done = self.bound = self.copying = 0
+        self.pos = position
+        return
+
+    def extractstrands(self):
+        "Returns a list of strands represented by the current state."
+        strands = []
+        # These ones don't need to be flipped
+        for substrand in string.split(self.strands[0].array.tostring()):
+            # Simply convert to a Strand object, and add to list
+            s = Strand('')
+            s.array.fromstring(substrand)
+            strands.append(s)
+        # These ones do need to be flipped
+        for substrand in string.split(self.strands[1].array.tostring()):
+            s = Strand('')
+            s.array.fromstring(substrand)
+            s.reverse()
+            strands.append(s)
+        return strands
+
+    def apply(self,enzyme,bindpos=-1):
+        "Binds the the enzyme to the first strand and applies each amino-acid."
+        # Save the current strand we're bound to
+        bound,position = self.bound, self.pos
+
+        # Reset the applicator state
+        self.reset()
+
+        # Fix up bindpos for if on the top strand
+        if bound:
+            bindpos = -bindpos - 1
+
+        # Find the locations of all the bases
+        bp = self.strands[bound].basepositions()
+        candidates = bp[enzyme.binding()]
+        # This trickery restricts the bases we can match to the ones
+        # in the strand we were last bound to.
+        hi = bisect.bisect(bp[4],position)
+
+        try:
+            if hi:
+                lo = bp[4][hi-1]
+            else:
+                lo = -1
+            hi = bp[4][hi]
+            candidates = filter(lambda y,x=lo,z=hi:x<y<z, candidates)
+        except IndexError: # Either no gaps, or end gap
+            if hi:
+                lo = bp[4][hi-1]
+                hi = len(self.strands[0])
+                candidates = filter(lambda y,x=lo,z=hi:x<y<z, candidates)
+            # Bind the enzyme to the correct location
+            self.bound = bound
+            try:
+                self.moveto(candidates[bindpos])
+            except IndexError: # Nothing to bind to, exit straight away
+                return
+
+            # Apply each amino-acid in the enzyme
+            for acid in enzyme.acids:
+                self.execute(acid)
+                # Exit loop if we are done prematurely for some reason
+                if self.done:
+                    break
+
+            return
+
+    def moveto(self,position,checkdouble=0):
+        "Moves the current position we'll act on."
+        self.pos = position
+        self.done = not 0 <= position < len(self.strands[0]) or \
+                self.isblank(checkdouble)
+        if self.copying and not self.done:
+            self.compcopyunit()
+        return
+
+    def execute(self,op):
+        "Causes the given operation to be executed if possible."
+        if not self.done and hasattr(self,op):
+            apply(getattr(self,op),())
+        return
+
+    def compcopyunit(self):
+        "Performs a complementary copy of the current unit."
+        # Note: we're assuming bases are always lined up. One of the
+        # two strands can contain a blank (doesn't matter which) but
+        # not both.
+        if self.strands[0][self.pos] == BLANK:
+            self.strands[0][self.pos] = \
+                self.compbase[self.strands[1][self.pos]]
+        else:
+            self.strands[1][self.pos] = \
+                self.compbase[self.strands[0][self.pos]]
+        return
+
+    def insert(self,base):
+        "Inserts a base to the right of the bound unit."
+        # position to insert the character at
+        inspos = self.pos + (not self.bound)
+
+        # Do the insert for the string we're bound to
+        self.strands[self.bound].insert(inspos,base)
+
+        # Update the unit we're bound to, as is appropriate
+        self.moveto(self.pos + self.bound)
+
+        # Work out whether to insert the complementary base or a blank
+        # and do it.
+        if self.copying:
+            self.strands[not self.bound].insert(inspos,self.compbase[base])
+        else:
+            self.strands[not self.bound].insert(inspos,BLANK)
+        return
+
+    def purine(self):
+        "Returns true if we're bound to a puride."
+        try:
+            return self.purines[self.strands[self.bound][self.pos]]
+        except IndexError:
+            return 0
+
+    def pyrimidine(self):
+        "Returns true if we're bound to a pyrimidine."
+        try:
+            return not self.purines[self.strands[self.bound][self.pos]]
+        except IndexError:
+            return 0
+
+    def isblank(self,checkdouble=0):
+        "Returns true if we're bound to a blank space (ie, nothing at all)."
+        # Note: optimized in light of short-circuit analysis for the
+        # default case of checkdouble=0.
+        return (self.strands[self.bound][self.pos] == BLANK) and \
+                ((not checkdouble or \
+                self.strands[not self.bound][self.pos] == BLANK))
+
+    def search(self,mover,predicate):
+        """
+        Searches for a unit of the given type, moving using the supplied
+        function, and terminating when predicate returns true.
+        """
+        # Ugh this is ugly. Basically the problem that
+        # complementary stuff is done when we move, but the
+        # predicate needs to be checked after the move but before
+        # the complementary copy is done. Kludge it by saving the copy
+        # state, turning it off, and doing it manually at the right
+        # time if need be. The only obvious alternate solution would
+        # prevent us from searching across blanks.
+        copysave = self.copying
+        self.copying = 0
+        found = 0
+        while not found:
+            mover()
+
+            if self.done: break
+            found = predicate()
+            if copysave:
+                self.compcopyunit()
+        # Restore the original copying state
+        self.copying = copysave
+        return
+
+    def cut(self):
+        """
+        Cut both strands to the right of the current position, leaving
+        us attached to the left-hand side.
+        """
+        # We can simulate a cut by inserting a double-blank (which we
+        # can't ever move past). This is very similar to the above
+        # insert() method.
+        #
+        # Position to cut the strands at
+        # (Note: This works due to the way slicing boundaries work)
+        pos = self.pos + (not self.bound)
+
+        # Insert the blanks
+        self.strands[0].insert(pos,BLANK)
+        self.strands[1].insert(pos,BLANK)
+
+        # Update the unit we're bound to, as is appropriate
+        self.moveto(self.pos + self.bound)
+
+        return
+
+    # Since del is a reserved word, we use an allowed name here. Further
+    # down below we force "del" into the class manually -- it's a bit
+    # dodgy but it works
+    def nuk(self):
+        "Deletes the unit at the current position."
+        # Proper behaviour of "del" isn't known. It's only meant to
+        # delete the bound base, not the complementary one as
+        # well. It's not clear if the base should be removed (and
+        # everything slid along to fill the gap) or replaced by a
+        # gap. Replacement by a gap keeps complementary bases lined up
+        # always, so we'll do that.
+        self.strands[self.bound][self.pos] = BLANK
+
+        # Then move to the right
+        self.mvr()
+        return
+
+    def swi(self):
+        "Switches the enzyme to be bound to the other strand."
+        self.bound = not self.bound
+        # Swap the mvr and mvl functions
+        (self.mvr,self.mvl) = (self.mvl,self.mvr)
+        # Blanks block a switch
+        if self.isblank():
+            self.done = 1
+        return
+
+    def mvr(self,checkdouble=0):
+        "Moves the enzyme one unit to the right."
+        self.moveto(self.pos+1,checkdouble)
+        return
+
+    def mvl(self,checkdouble=0):
+        "Moves the enzyme one unit to the left."
+        self.moveto(self.pos-1,checkdouble)
+        return
+
+    def cop(self):
+        "Turns on strand-copying mode."
+        self.copying = 1
+        self.compcopyunit()
+        return
+
+    def off(self):
+        "Turns off strand-copying mode."
+        self.copying = 0
+        return
+
+    def ina(self):
+        "Inserts an A into the strand."
+        self.insert(A)
+        return
+
+    def inc(self):
+        "Inserts a C into the strand."
+        self.insert(C)
+        return
+
+    def ing(self):
+        "Inserts a G into the strand."
+        self.insert(G)
+        return
+
+    def int(self):
+        "Inserts a T into the strand."
+        self.insert(T)
+        return
+
+    def rpy(self):
+        "Search for the nearest pyrimidine to the right."
+        self.search(lambda f=self.mvr: f(1),self.pyrimidine)
+        return
+
+    def rpu(self):
+        "Search for the nearest purine to the right."
+        self.search(lambda f=self.mvr: f(1),self.purine)
+        return
+
+    def lpy(self):
+        "Search for the nearest pyrimidine to the left."
+        self.search(lambda f=self.mvl: f(1),self.pyrimidine)
+        return
+
+    def lpu(self):
+        "Search for the nearest purine to the left."
+        self.search(lambda f=self.mvl: f(1),self.purine)
+        return
+
+# We can't do a "def del(self):" since del is a reserved word. This
+# is a kludge to force del into the Applicator class method table.
+# ie, Applicator.del is an alias for Applicator.nuk
+Applicator.__dict__['del'] = Applicator.__dict__['nuk']
+
+
+class PrintApplicator(Applicator):
+    """
+    This is a superclass of the existing Applicator. It overloads some
+    methods to provide for debugging.
+    """
+    def __init__(self,strand,file=sys.stdout):
+        """
+        Initializes the Applicator. Identical to previous constructor,
+        but we also take a file-like object that we print things to.
+        """
+        # Call the inherited constructor
+        Applicator.__init__(self,strand)
+        # Store the file object for later use.
+        self._file = file
+        return
+
+    # Overloaded methods are below. These simply call the inherited
+    # methods, but also report status stuff out to the file.
+
+    def execute(self,op):
+        # Report the operator we're executing, and the state afterwards
+        self._file.write("Executing operation: %s\n" % str(op))
+        Applicator.execute(self,op)
+        self._file.write(str(self)+'\n')
+        return
+
+    def apply(self,enzyme,bindpos=-1):
+        # Report the applicator state before starting the enzyme
+        self._file.write('--SOE--\n')
+        Applicator.apply(self,enzyme,bindpos)
+        self._file.write('--EOE--\n')
+        return
+
+    def moveto(self,position,checkdouble=0):
+        # Report the applicator state after each move
+        Applicator.moveto(self,position,checkdouble)
+        self._file.write(str(self)+'\n')
+        return
+
+def mogrify(strand, enzymes, binding=-1, debug=0):
+    "Applies a series of enzymes to a strand, and returns the results."
+    if debug:
+        a = PrintApplicator(strand)
+    else:
+        a = Applicator(strand)
+    for enzyme in enzymes:
+        a.apply(enzyme,binding)
+
+    # Return the resulting strands
+    return a.extractstrands()
+
+# Self-test code, for when we're executed standalone instead of imported.
+if __name__ == "__main__":
+    for strand in ['TAGATCCAGTCCACATCGA', 'CGGATACTAAACCGA']:
+        print strand, 'translates to:', translate(Strand(strand))
+
+        # Test our strand-space generator
+        strand = Strand()
+        startc = time.clock()
+        startt = time.time()
+        try:
+            for x in xrange(1000):
+                print strand, translate(strand)
+                strand.increment()
+        except KeyboardInterrupt:
+            pass
+
+        print " CPU time elapsed:", time.clock()-startc, "seconds"
+        print "Real time elapsed:", time.time()-startt, "seconds"
+
+# End.
+
+