Source

TypoGenetics / tg.py

Full commit
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
#!/usr/bin/env python
""" This module exports routines useful in modelling the Typogenetics
system, as put forward by Hofstadter in his book "Goedel, Escher,
Bach".

Classes exported:
        Strand          -- models a DNA strand
        StrandFactory   -- enumerates all possible Strands
        Enzyme          -- models an enzyme
        Applicator      -- provides contexts for applying enzymes to strands
        PrintApplicator -- noisy specialization of Applicator

----

Methods exported:
        translate -- translates a DNA strand into an enzyme
        mogrify -- applies an enzyme to a strand, returning results
"""

import array, types
import sys, string, time
import bisect
from pqueue import PQueue

# Constants to represent the available bases
A = 0 #was ord("A")
C = 1 #was ord("C")
G = 2 #was ord("G")
T = 3 #was ord("T")
BLANK = 32 #was ord(" ")


class Strand:
    "Concrete strand that models a DNA strand."
    # Mapping table from real letters to bases
    basemap = {'A':A, 'C':C, 'G':G, 'T':T, 'a':A, 'c':C, 'g':G, 't':T}

    # Mapping for next base
    nextbase = [C,G,T,A]

    # Used to translate a strand into a human-readable string
    transbase = "ACGT"
    transbase = transbase + '?'*(256-len(transbase))

    def __init__(self,init=""):
        "Initializes the value of the strand."
        # We're using an array of bytes to represent a strand
        #print "type(self):",type(self)
        #print "type(init):",type(init)
        if type(init) == type(self):
            # Copy constructor
            self.array = array.array('b',init.array.tolist())
        elif type(init) == types.ListType or \
             type(init) == types.TupleType or \
             type(init) == types.SliceType:
            # Sequence constructor
            self.array = array.array('b',list(init))
        elif type(init) == array.ArrayType:
            # Array constructor
            self.array = array.array('b',init.tolist())
        elif type(init) == types.StringType:
            # String constructor
            self.array = array.array('b')
            for char in init:
                self.array.append(self.basemap[char])
        else:
            raise TypeError, \
                'argument 1: expected array or sequence or string, %s found' %\
                type(init).__name__

        # This is a cache of the positions of each base in the strand
        self._basepositions = None
        return

    def __repr__(self):
        "String representation of strand. This is as fast as I can make it."
        return "Strand('%s')" % str(self)

    def __str__(self):
        "Informal string representation of the strand."
        return string.translate(self.array.tostring(),self.transbase)

    def __len__(self):
        "Return the length of our strand, when necessary."
        return len(self.array)

    def __cmp__(self,other):
        "Compares the strand to something else."
        if type(self) == type(other):
            # Strand comparison
            return cmp(self.array,other.array)
        else:
            # Try to convert to a strand then compare
            try:
                other = Strand(other)
            except TypeError:
                # Meaningful conversion not possible. Doh.
                return -1
            # With the conversion done, recursively compare
            return cmp(self,other)

        # Should never reach here
        return

    def __getitem__(self,x):
        "Return a given unit in the strand."
        return self.array[x]

    def __setitem__(self,x,value):
        "Set (and return) a given unit in the strand."
        self.array[x] = value
        self._basepositions = None # Throw out the base-position cache
        return

    def __delitem__(self,x):
        "Delete a given unit from the strand."
        del self.array[x]
        self._basepositions = None # Throw out the base-position cache
        return

    def __getslice__(self,i,j):
        "Return a slice of the strand."
        return Strand(self.array[i:j].tolist())

    def __setslice__(self,i,j,sequence):
        "Sets a slice of the strand."
        self.array[i:j] = sequence
        self._basepositions = None # Throw out the base-position cache
        return None

    def __delslice__(self,i,j):
        "Deletes a slice of the strand."
        del self.array[i:j]
        self._basepositions = None # Throw out the base-position cache
        return

    def insert(self,index,base):
        "Inserts the given base at the given index."
        self._basepositions = None # Throw out the base-position cache
        return self.array.insert(index,base)

    def reverse(self):
        "Reverses the strand."
        self._basepositions = None # Throw out the base-position cache
        return self.array.reverse()

    def increment(self):
        "Increment the strand to the next position."
        unit = len(self.array)
        nocarry = 0 # To force initial loop eval
        while not nocarry and unit: # While there's carry and not at start
            unit = unit - 1
            self.array[unit] = nocarry = self.nextbase[self.array[unit]]
        else:
            if not nocarry:
                self.array.insert(0, A)
        self._basepositions = None # Throw out base-position cache
        return

    def basepositions(self):
        "Calculate the positions of each base in the strand."
        # Generate a list of the positions for each base
        if not self._basepositions:
            positions = [[],[],[],[],[]] # List of positions of each base
            for i in range(len(self)):
                try:
                    positions[self[i]].append(i)
                except IndexError: # We hit a gap, remap to [4]
                    positions[4].append(i)
            self._basepositions = positions
        return self._basepositions


class StrandFactory:
    "Class that returns all strands one-by-one."
    def __init__(self,start=Strand('A')):
        "Initialize our enumeration state."
        self.strand = start
        self.pqueue = PQueue()
        return

    def pop(self):
        "Return a priority/strand tuple."
        if len(self.strand)*10 < self.pqueue.peek():
            retval = (len(self.strand), Strand(self.strand))
            self.strand.increment()
        else:
            retval = self.pqueue.pop()
        return retval

    def __getitem__(self,key):
        return self.pqueue[key]

    def __setitem__(self,key,value):
        self.pqueue[key] = value


class Enzyme:
    "Concrete class that models an enzyme."
    # This is the Typogenetic Code, which maps base-pairs to both
    # an operation and a 'kink' direction for the secondary structure.

    # Direction constants, for readability only:
    S=0; L=-1; R=1

    # Enzyme hints, which can be ORed together.
    NOC = 0x00 # Instruction does not change strand
    NEW = 0x01 # Instruction will cause a new strand
    MOD = 0x02 # Instruction will modify strands

    # This is the code itself
    TypogeneticCode = \
            [[(None, S,NOC), ('cut', S,NEW), ('nuk', S,MOD|NEW), ('swi', R,NOC)],
             [('mvr', S,NOC), ('mvl', S,NOC), ('cop', R,NEW   ), ('off', L,NOC)],
             [('ina', S,MOD), ('inc', R,MOD), ('ing', R,MOD   ), ('int', L,MOD)],
             [('rpy', R,NOC), ('rpu', L,NOC), ('lpy', L,NOC   ), ('lpu', L,NOC)]]


    # Reverse mapping of the typogenetic code, alas sometimes needed.
    # Generate this dynamically, for maintenance purposes
    TypogeneticOdec = {}

    for x1 in xrange(len(TypogeneticCode)):
        for x2 in xrange(len(TypogeneticCode[x1])):
            TypogeneticOdec[TypogeneticCode[x1][x2][0]] = (x1,x2)
    # And this maps which direction binds to which base preferentially
    BindPref = [A,G,T,C]

    def __init__(self,acids=[],quirk=0):
        """
        Initialize the enzyme. The quirk flag determines if use the
        Morris/Varetto convention for determining binding preference
        instead of the original Hoftstadter method.
        """
        # Initialize the data members
        self.acids=[]
        self.dirs=[]
        self.__bindcache = None
        self.hint=0
        self.quirk=quirk

        # Add the initializer amino-acids
        for acid in acids:
            self.addAminoAcid(self.TypogeneticOdec[acid])
        return

    def __repr__(self):
        "Returns a string representation of the enzyme."
        return "Enzyme(%s)" % repr(self.acids)

    def __str__(self):
        "Returns a pretty representation of the enzyme."
        return "%s: %s" % (Strand.transbase[self.binding()],
                           string.join(self.acids))

    def __cmp__(self,other):
        "Compares two enzymes for equality."
        return cmp(self.acids,other.acids)

    def addAminoAcid(self,basepair):
        "Adds new amino-acid to our enzyme. The basepair is a tuple of bases."
        # Retrieve the amino-acid and stuff
        (acid,direction,hint) = \
            self.TypogeneticCode[basepair[0]][basepair[1]]
        if not acid:
            # Return false to indicate punctuation
            return 0

        # Add the amino-acid, invalidate the binding cache, update the hint
        self.acids.append(acid)
        self.dirs.append(direction)
        self.__bindcache = None
        self.hint = self.hint | hint
        return 1

    def binding(self):
        "Returns the binding of the enzyme."

        if (self.__bindcache is None) or (self.__bindcache[0] != self.quirk):
            # The Varetto/Morris/quirky method uses the full amino-acid
            # # list... the real way ignores the first and last amino-acids.
            if self.quirk:
                directions = self.dirs
            else: directions = self.dirs[1:-1]
            # This just sums the series
            binding = (reduce(lambda x,y:x+y,directions,0) % 4)
            self.__bindcache = (self.quirk, self.BindPref[binding])

        return self.__bindcache[1]

def translate(strand,quirk=0):
    """
    Translates a strand into a list of enzymes.
    The optional flag determines whether we produce quirky Enzymes or not.
    """
    enzymes = [Enzyme(quirk=quirk)]
    for x in xrange(0,len(strand)-1,2): # For each base-pair...
        basepair = (strand[x],strand[x+1])
        # Add the basepair, and create a new enzyme if it was punctuation
        if not enzymes[-1].addAminoAcid(basepair):
            enzymes.append(Enzyme(quirk=quirk))
    # Remove all empty amino-acids and return the rest
    return filter(lambda e: e.acids,enzymes)

class Applicator:
    "Used to model the application of amino-acids to a DNA strand."
    # Mapping for complementary bases -- probably the wrong place
    # to have these things, OO-speaking.
    compbase = [T,G,C,A]
    purines = [1,0,1,0] # Could use odd() but this is faster

    # Map for showing where we're bound (using the Morris convention)
    lowerbase = {'A':'a', 'C':'c', 'G':'g', 'T':'t', '?':'.'}

    def __init__(self,strand):
        """
        Initializes the application context, with a default starting
        position of 0.
        """
        # The two strands which will be manipulated (a copy of the
        # strand is made so that we won't modify it)
        self.strands = [strand[:],Strand([32]*len(strand))]
        # Reset our state
        self.reset()
        return

    def __str__(self):
        "Returns a convenient displayable form of the applicator state."
        strands = map(str,self.strands)
        # Use the Morris convention and make the bound base
        # lower-case (if possible)
        try:
            strands[self.bound] = strands[self.bound][:self.pos] + \
                    self.lowerbase[strands[self.bound][self.pos]]+ \
                    strands[self.bound][self.pos+1:]
        except IndexError: pass
        # Flip them so that they're displayed in the right order
        strands.reverse()
        if self.copying:
            strands.append('[Copying: On]')
        else:
            strands.append('[Copying: Off]')

        return string.join(strands,'\n')

    def reset(self,position=0):
        "Reset the applicator state, prior to an enzyme being bound."
        # Applicator state:
        #   self.done - flag that's set when enzyme should cease
        #   self.bound - which strand we're bound to
        #   self.copying - flag for if we're in copy mode
        #   self.pos - position we're bound to
        self.done = self.bound = self.copying = 0
        self.pos = position
        return

    def extractstrands(self):
        "Returns a list of strands represented by the current state."
        strands = []
        # These ones don't need to be flipped
        for substrand in string.split(self.strands[0].array.tostring()):
            # Simply convert to a Strand object, and add to list
            s = Strand('')
            s.array.fromstring(substrand)
            strands.append(s)
        # These ones do need to be flipped
        for substrand in string.split(self.strands[1].array.tostring()):
            s = Strand('')
            s.array.fromstring(substrand)
            s.reverse()
            strands.append(s)
        return strands

    def apply(self,enzyme,bindpos=-1):
        "Binds the the enzyme to the first strand and applies each amino-acid."
        # Save the current strand we're bound to
        bound,position = self.bound, self.pos

        # Reset the applicator state
        self.reset()

        # Fix up bindpos for if on the top strand
        if bound:
            bindpos = -bindpos - 1

        # Find the locations of all the bases
        bp = self.strands[bound].basepositions()
        candidates = bp[enzyme.binding()]
        # This trickery restricts the bases we can match to the ones
        # in the strand we were last bound to.
        hi = bisect.bisect(bp[4],position)

        try:
            if hi:
                lo = bp[4][hi-1]
            else:
                lo = -1
            hi = bp[4][hi]
            candidates = filter(lambda y,x=lo,z=hi:x<y<z, candidates)
        except IndexError: # Either no gaps, or end gap
            if hi:
                lo = bp[4][hi-1]
                hi = len(self.strands[0])
                candidates = filter(lambda y,x=lo,z=hi:x<y<z, candidates)
            # Bind the enzyme to the correct location
            self.bound = bound
            try:
                self.moveto(candidates[bindpos])
            except IndexError: # Nothing to bind to, exit straight away
                return

            # Apply each amino-acid in the enzyme
            for acid in enzyme.acids:
                self.execute(acid)
                # Exit loop if we are done prematurely for some reason
                if self.done:
                    break

            return

    def moveto(self,position,checkdouble=0):
        "Moves the current position we'll act on."
        self.pos = position
        self.done = not 0 <= position < len(self.strands[0]) or \
                self.isblank(checkdouble)
        if self.copying and not self.done:
            self.compcopyunit()
        return

    def execute(self,op):
        "Causes the given operation to be executed if possible."
        if not self.done and hasattr(self,op):
            apply(getattr(self,op),())
        return

    def compcopyunit(self):
        "Performs a complementary copy of the current unit."
        # Note: we're assuming bases are always lined up. One of the
        # two strands can contain a blank (doesn't matter which) but
        # not both.
        if self.strands[0][self.pos] == BLANK:
            self.strands[0][self.pos] = \
                self.compbase[self.strands[1][self.pos]]
        else:
            self.strands[1][self.pos] = \
                self.compbase[self.strands[0][self.pos]]
        return

    def insert(self,base):
        "Inserts a base to the right of the bound unit."
        # position to insert the character at
        inspos = self.pos + (not self.bound)

        # Do the insert for the string we're bound to
        self.strands[self.bound].insert(inspos,base)

        # Update the unit we're bound to, as is appropriate
        self.moveto(self.pos + self.bound)

        # Work out whether to insert the complementary base or a blank
        # and do it.
        if self.copying:
            self.strands[not self.bound].insert(inspos,self.compbase[base])
        else:
            self.strands[not self.bound].insert(inspos,BLANK)
        return

    def purine(self):
        "Returns true if we're bound to a puride."
        try:
            return self.purines[self.strands[self.bound][self.pos]]
        except IndexError:
            return 0

    def pyrimidine(self):
        "Returns true if we're bound to a pyrimidine."
        try:
            return not self.purines[self.strands[self.bound][self.pos]]
        except IndexError:
            return 0

    def isblank(self,checkdouble=0):
        "Returns true if we're bound to a blank space (ie, nothing at all)."
        # Note: optimized in light of short-circuit analysis for the
        # default case of checkdouble=0.
        return (self.strands[self.bound][self.pos] == BLANK) and \
                ((not checkdouble or \
                self.strands[not self.bound][self.pos] == BLANK))

    def search(self,mover,predicate):
        """
        Searches for a unit of the given type, moving using the supplied
        function, and terminating when predicate returns true.
        """
        # Ugh this is ugly. Basically the problem that
        # complementary stuff is done when we move, but the
        # predicate needs to be checked after the move but before
        # the complementary copy is done. Kludge it by saving the copy
        # state, turning it off, and doing it manually at the right
        # time if need be. The only obvious alternate solution would
        # prevent us from searching across blanks.
        copysave = self.copying
        self.copying = 0
        found = 0
        while not found:
            mover()

            if self.done: break
            found = predicate()
            if copysave:
                self.compcopyunit()
        # Restore the original copying state
        self.copying = copysave
        return

    def cut(self):
        """
        Cut both strands to the right of the current position, leaving
        us attached to the left-hand side.
        """
        # We can simulate a cut by inserting a double-blank (which we
        # can't ever move past). This is very similar to the above
        # insert() method.
        #
        # Position to cut the strands at
        # (Note: This works due to the way slicing boundaries work)
        pos = self.pos + (not self.bound)

        # Insert the blanks
        self.strands[0].insert(pos,BLANK)
        self.strands[1].insert(pos,BLANK)

        # Update the unit we're bound to, as is appropriate
        self.moveto(self.pos + self.bound)

        return

    # Since del is a reserved word, we use an allowed name here. Further
    # down below we force "del" into the class manually -- it's a bit
    # dodgy but it works
    def nuk(self):
        "Deletes the unit at the current position."
        # Proper behaviour of "del" isn't known. It's only meant to
        # delete the bound base, not the complementary one as
        # well. It's not clear if the base should be removed (and
        # everything slid along to fill the gap) or replaced by a
        # gap. Replacement by a gap keeps complementary bases lined up
        # always, so we'll do that.
        self.strands[self.bound][self.pos] = BLANK

        # Then move to the right
        self.mvr()
        return

    def swi(self):
        "Switches the enzyme to be bound to the other strand."
        self.bound = not self.bound
        # Swap the mvr and mvl functions
        (self.mvr,self.mvl) = (self.mvl,self.mvr)
        # Blanks block a switch
        if self.isblank():
            self.done = 1
        return

    def mvr(self,checkdouble=0):
        "Moves the enzyme one unit to the right."
        self.moveto(self.pos+1,checkdouble)
        return

    def mvl(self,checkdouble=0):
        "Moves the enzyme one unit to the left."
        self.moveto(self.pos-1,checkdouble)
        return

    def cop(self):
        "Turns on strand-copying mode."
        self.copying = 1
        self.compcopyunit()
        return

    def off(self):
        "Turns off strand-copying mode."
        self.copying = 0
        return

    def ina(self):
        "Inserts an A into the strand."
        self.insert(A)
        return

    def inc(self):
        "Inserts a C into the strand."
        self.insert(C)
        return

    def ing(self):
        "Inserts a G into the strand."
        self.insert(G)
        return

    def int(self):
        "Inserts a T into the strand."
        self.insert(T)
        return

    def rpy(self):
        "Search for the nearest pyrimidine to the right."
        self.search(lambda f=self.mvr: f(1),self.pyrimidine)
        return

    def rpu(self):
        "Search for the nearest purine to the right."
        self.search(lambda f=self.mvr: f(1),self.purine)
        return

    def lpy(self):
        "Search for the nearest pyrimidine to the left."
        self.search(lambda f=self.mvl: f(1),self.pyrimidine)
        return

    def lpu(self):
        "Search for the nearest purine to the left."
        self.search(lambda f=self.mvl: f(1),self.purine)
        return

# We can't do a "def del(self):" since del is a reserved word. This
# is a kludge to force del into the Applicator class method table.
# ie, Applicator.del is an alias for Applicator.nuk
Applicator.__dict__['del'] = Applicator.__dict__['nuk']


class PrintApplicator(Applicator):
    """
    This is a superclass of the existing Applicator. It overloads some
    methods to provide for debugging.
    """
    def __init__(self,strand,file=sys.stdout):
        """
        Initializes the Applicator. Identical to previous constructor,
        but we also take a file-like object that we print things to.
        """
        # Call the inherited constructor
        Applicator.__init__(self,strand)
        # Store the file object for later use.
        self._file = file
        return

    # Overloaded methods are below. These simply call the inherited
    # methods, but also report status stuff out to the file.

    def execute(self,op):
        # Report the operator we're executing, and the state afterwards
        self._file.write("Executing operation: %s\n" % str(op))
        Applicator.execute(self,op)
        self._file.write(str(self)+'\n')
        return

    def apply(self,enzyme,bindpos=-1):
        # Report the applicator state before starting the enzyme
        self._file.write('--SOE--\n')
        Applicator.apply(self,enzyme,bindpos)
        self._file.write('--EOE--\n')
        return

    def moveto(self,position,checkdouble=0):
        # Report the applicator state after each move
        Applicator.moveto(self,position,checkdouble)
        self._file.write(str(self)+'\n')
        return

def mogrify(strand, enzymes, binding=-1, debug=0):
    "Applies a series of enzymes to a strand, and returns the results."
    if debug:
        a = PrintApplicator(strand)
    else:
        a = Applicator(strand)
    for enzyme in enzymes:
        a.apply(enzyme,binding)

    # Return the resulting strands
    return a.extractstrands()

# Self-test code, for when we're executed standalone instead of imported.
if __name__ == "__main__":
    for strand in ['TAGATCCAGTCCACATCGA', 'CGGATACTAAACCGA']:
        print strand, 'translates to:', translate(Strand(strand))

        # Test our strand-space generator
        strand = Strand()
        startc = time.clock()
        startt = time.time()
        try:
            for x in xrange(1000):
                print strand, translate(strand)
                strand.increment()
        except KeyboardInterrupt:
            pass

        print " CPU time elapsed:", time.clock()-startc, "seconds"
        print "Real time elapsed:", time.time()-startt, "seconds"

# End.