chapmanb / synbio (http://bcbio.wordpress.com/)

Python Synthetic Biology libraries

Clone this repository (size: 8.4 MB): HTTPS / SSH
$ hg clone http://bitbucket.org/chapmanb/synbio/
commit 2: 775d9e9a5c4d
parent 1: d62271144b61
branch: default
tags: tip
Fix setup to reflect actual modules. Add in SQLAlchemy database models.
cha...@sobchak.mgh.harvard.edu
10 months ago
synbio / SynBio / Codons / ProblemRemap.py
r2:775d9e9a5c4d 614 loc 23.7 KB embed / history / annotate / raw /
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
"""Remap a problematic repeat region to eliminate cross-hybridization issues.

This code deals with problematic amino acid repeats like the following:

    KGEDD KGEDD KGEDN KGEDN

It deals with exact repeat regions, so this repeat would be described as KGED*
with the star representing any potential amino acid at that point. More
complicated possibilities like specifying the pattern with regular
expression-like syntax are not (yet) implemented.
"""
from Bio.Seq import Seq
from Bio.Alphabet.IUPAC import unambiguous_dna
from Bio import SeqUtils
from CodingRegion import FullSequence

class WordDistance:
    """Perform Levenshtein calculation of the distance between two words.

    In this case we are using this as a rapid calculation of the difference
    between two nucleotide sequences we are trying to remap.

    The implementation of the Levenshtein algorithm is by Magnus Lie Hetland:

    http://hetland.org/python/
    """
    def __init__(self):
        pass
    
    def distance(self, a, b):
        """Calculates the Levenshtein distance between a and b.
        """
        n, m = len(a), len(b)
        if n > m:
            # Make sure n <= m, to use O(min(n,m)) space
            a,b = b,a
            n,m = m,n
            
        current = range(n+1)
        for i in range(1,m+1):
            previous, current = current, [i]+[0]*n
            for j in range(1,n+1):
                add, delete = previous[j]+1, current[j-1]+1
                change = previous[j-1]
                if a[j-1] != b[i-1]:
                    change = change + 1
                current[j] = min(add, delete, change)
                
        return current[n]

    def all_orientation_distance(self, a, b):
        """Calculate the minimum distance with all orientations considered.
        """
        a_rc = Seq(a, unambiguous_dna).reverse_complement().data
        b_rc = Seq(b, unambiguous_dna).reverse_complement().data
        ds = []
        for a_t in [a, a_rc]:
            for b_t in [b, b_rc]:
                ds.append(self.distance(a_t, b_t))
        return min(ds)

class CodonChoiceException(Exception):
    pass

class AminoAcidRepeatDesign:
    """Design a region with amino acid repeats to limit cross hybridization.

    This uses an approach of setting a threshold that defines a region as
    being problematic, and then finding a set of codon mappings for repeat
    regions which are below this threshold. So the basic algorithm is at
    follows:

    1. Split sequence region into aa_repeats.
    2. For each repeat, generate all possible DNA sequences which can generate
    that given protein sequence, making use of codon degeneracy.
    3. For each repeat, choose a DNA sequence choice that minimizes cross
    reactivity across the entire region.

    Part 3 is where all the tricky details are in the implementation. This
    approach uses a threshold, defined as a minimum cross-reactivity to avoid,
    and then the number of allowable violations of that in all comparisons.
    This is the quickest to implement and test out, but we'd be much better
    with something fancier which searches for best matches. That's in the
    future for the moment.
    """
    def __init__(self, aa_repeat, codon_table, codon_usage,
            gc_min, gc_max, max_single_nts, min_distance = 4,
            no_use_thresh = 0.2, count_compensate = False, logger = None):
        self._aa_repeat = aa_repeat
        self._codon_table = codon_table
        self._codon_usage = codon_usage
        self._no_use_thresh = no_use_thresh
        self._count_compensate = count_compensate
        self._distance_calc = WordDistance()
        self._min_distance = min_distance

        self._gc_min = gc_min
        self._gc_max = gc_max
        self._max_single_nts = max_single_nts
        self._logger = logger

    def design_regions(self, dna_regions):
        """Design several regions to minimize cross reactivity.

        Each region is expected to contain our defined repeat.
        """
        all_repeats = []
        all_coding_regions = []
        full_seqs = []
        for region in dna_regions:
            full_seq, coding_region, new_repeat = \
                    self._get_repeat_region(region, self._aa_repeat)
            all_repeats.append(new_repeat)
            all_coding_regions.append(coding_region)
            full_seqs.append(full_seq)
        
        choice_list = []
        for repeat in all_repeats:
            choice_list.append(repeat.all_dna_seqs())
        
        final_codons = []
        final_choices = self._pick_different_choices(choice_list)

        # if we've got nothing to work with, return the original regions
        if len(final_choices) == 0:
            return dna_regions
        # otherwise return replacement regions
        else:
            assert len(final_choices) == len(all_repeats)
            for index, final_choice in enumerate(final_choices):
                repeat = all_repeats[index]
                repeat.update_with_choice(final_choice)
                all_coding_regions[index].update_codons(repeat.get_codons())
            replacement_seqs = []
            for index, full_seq in enumerate(full_seqs):
                full_seq.add_modified_cds(all_coding_regions[index])
                final_seq = full_seq.get_sequence()
                replacement_seqs.append(final_seq)
            return replacement_seqs

    def _get_repeat_region(self, dna_region, aa_repeat):
        """Retrieve a _CodonRepeat region corresponding to a given AA repeat.
        """
        self._logger.info(dna_region)
        full_seq = FullSequence(dna_region, [[(0, len(dna_region))]],
                self._codon_table, self._codon_usage)
        non_cds = full_seq.retrieve_non_cds()
        assert len(non_cds) == 0
        cds_regions = full_seq.check_out_cds_iterator()
        coding_region = cds_regions.next()
        try:
            cds_regions.next()
            raise ValueError('Expect only one')
        except StopIteration:
            pass
        initial_protein = coding_region.protein_seq()
        self._logger.info(coding_region.protein_seq())
        codons = coding_region.get_codons()
        assert len(codons) == len(aa_repeat)
        new_repeat = _CodonRepeat(aa_repeat, codons,
                self._no_use_thresh,
                self._count_compensate, self._gc_min, self._gc_max,
                self._max_single_nts)
        return full_seq, coding_region, new_repeat

    def design_region(self, dna_region):
        """Design a specific DNA region to remove any issues.
        """
        self._logger.info(dna_region)
        full_seq = FullSequence(dna_region, [(0, len(dna_region))],
                self._codon_table, self._codon_usage)
        non_cds = full_seq.retrieve_non_cds()
        assert len(non_cds) == 0
        cds_regions = full_seq.check_out_cds_iterator()
        assert len(cds_regions) == 1
        coding_region = cds_regions.next()
        initial_protein = coding_region.protein_seq()
        self._logger.info(coding_region.protein_seq())
        new_region = self._design_with_repeat(coding_region, self._aa_repeat)
        self._logger.info(new_region.get_sequence())
        self._logger.info(new_region.protein_seq())
        assert new_region.protein_seq() == initial_protein

        full_seq.add_modified_cds(new_region)
        final_seq = full_seq.get_sequence()
        assert len(final_seq) == len(dna_region)
        return final_seq

    def _design_with_repeat(self, coding_region, aa_repeat):
        """Design a coding region with repeat problems
        """
        codons = coding_region.get_codons()
        assert len(codons) % len(aa_repeat) == 0

        all_repeats = []
        num_repeats = len(codons) / len(aa_repeat)
        for start_index in range(num_repeats):
            repeat_codons = codons[start_index * len(aa_repeat):
                                   (start_index + 1) * len(aa_repeat)]
            all_repeats.append(_CodonRepeat(aa_repeat, repeat_codons,
                self._no_use_thresh,
                self._count_compensate, self._gc_min, self._gc_max,
                self._max_single_nts))

        choice_list = []
        for repeat in all_repeats:
            choice_list.append(repeat.all_dna_seqs())
        
        final_codons = []
        final_choices = self._pick_different_choices(choice_list)
        assert len(final_choices) == len(all_repeats)
        for index, final_choice in enumerate(final_choices):
            repeat = all_repeats[index]
            repeat.update_with_choice(final_choice)
            final_codons.extend(repeat.get_codons())

        coding_region.update_codons(final_codons)
        return coding_region
    
    def _max_variable_cmp(self, one, two):
        """Sort choices based on the amount of variability in a choice.

        This is designed to get the most variable regions first in a list of
        choices. Variable regions are defined by the total number of different
        codons present in a sequence, and the counts of each.
        """
        assert len(one) == len(two)
        one_var = self._calc_variability(one)
        two_var = self._calc_variability(two)
        return cmp(-one_var, -two_var)

    def _calc_variability(self, sequence):
        """Perform the actual calculation of variability.
        """
        assert len(sequence) % 3 == 0
        score = 0
        codons = []
        last_codon = None
        for index in range(len(sequence) / 3):
            codon = sequence[index * 3:(index + 1) * 3]
            if codon != last_codon:
                score += 0.5
            if codon not in codons:
                score += 0.5
                codons.append(codon)
            last_codon = codon
        return score

    def _pick_different_choices(self, choice_list):
        """Find a choice from each repeat region which lessens cross-reactivity
        """
        assert len(choice_list) > 0
        all_choices = set(choice_list[0])
        for other_choices in choice_list[1:]:
            all_choices = all_choices & set(other_choices)
        all_choices = list(all_choices)
        all_choices.sort(self._max_variable_cmp)
        self._logger.info("Initial choices: %s" % len(all_choices))
        # base case, no choices
        if len(all_choices) == 1:
            cur_distance = 0
        else:
            cur_distance = 11

        num_needed = len(choice_list)
        final_choices = []
        # move down distances until we've got enough different choices
        # to fill up our regions
        while len(final_choices) < num_needed and cur_distance > 1:
            cur_distance -= 1
            final_choices = self._get_choices(all_choices[:], cur_distance,
                    num_needed)

        if cur_distance >= self._min_distance:
            # get the final set of choices we are going to use, and place one
            # in each of the regions we want to have differences in
            self._logger.info("Using distance %s; %s choices for %s" % (
                cur_distance, len(final_choices), len(choice_list)))
            self._logger.info(str(final_choices))
            return_choices = final_choices
            while len(return_choices) < len(choice_list):
                return_choices.extend(final_choices)
            return return_choices[:len(choice_list)]
        else:
            return []

    def _get_choices(self, all_choices, distance_cutoff, num_needed):
        """Retrieve a list of choices which are different at a given cutoff.

        This pulls out a set of choices at least as large as num_needed that
        all differ by the given cutoff.
        """
        final_choices = []
        while (len(final_choices) < num_needed) and (len(all_choices) > 0):
            choice = all_choices.pop(0)
            is_good = True
            for two_choice in final_choices:
                distance = self._distance_calc.distance(choice, two_choice)
                if distance < distance_cutoff:
                    is_good = False
                    break
            if is_good:
                final_choices.append(choice)

        return final_choices

class _CodonRepeat:
    """Represent a repeat region of codons.
    """
    def __init__(self, aa_repeat, codons, no_use_thresh,
            count_compensate, gc_min, gc_max, max_single_nts,
            codon_indexes = None):
        self._verify_codons(aa_repeat, codons)

        self._codons = codons
        self._codon_indexes = codon_indexes
        self._no_use_thresh = no_use_thresh
        self._count_compensate = count_compensate

        self._gc_min = gc_min
        self._gc_max = gc_max
        self._max_single_nts = max_single_nts

    def _verify_codons(self, aa_repeat, codons):
        """Do a sanity check to be sure that the supplied codons match an AA.
        """
        all_aas = []
        for codon in codons:
            all_aas.append(codon.get_aa())
        for index, aa in enumerate(all_aas):
            if aa_repeat[index] != "*":
                assert aa_repeat[index] == aa, (aa_repeat[index], aa)

    def update_with_choice(self, change_seq):
        """Update the codons in the repeat with the newly supplied sequence.
        """
        assert len(change_seq) % 3 == 0
        assert len(change_seq) / 3 == len(self._codons)
        for index in range(len(change_seq) / 3):
            new_codon = change_seq[index * 3:(index + 1) * 3]
            self._codons[index].change_codon(new_codon)

    def get_codons(self):
        return self._codons

    def get_codons_with_indexes(self):
        assert self._codon_indexes is not None
        assert len(self._codons) == len(self._codon_indexes)
        final = []
        for i in range(len(self._codons)):
            final.append((self._codon_indexes[i], self._codons[i]))
        return final

    def all_dna_seqs(self):
        """Return a list of all possible DNA sequences for the repeat.
        """
        cur_parts = [""]
        for codon in self._codons:
            new_parts = []
            choices = codon.all_choices(self._no_use_thresh,
                    self._count_compensate)
            for new_choice in choices:
                for built_part in cur_parts:
                    new_parts.append(built_part + new_choice)
            cur_parts = new_parts
       
        possible_seqs = list(set(cur_parts))
        final_seqs = self._filter_seqs(possible_seqs)
        # do some sanity checking that all created sequences are correct
        for final_part in final_seqs:
            assert len(final_part) == len(self._codons) * 3
            for index, codon in enumerate(self._codons):
                valid_choices = codon.all_choices(self._no_use_thresh,
                        self._count_compensate)
                seq_part = final_part[index * 3:(index + 1) * 3]
                assert seq_part in valid_choices

        return final_seqs

    def _filter_seqs(self, input_seqs):
        """Filter the given input sequences to be sure they are sane and useful.
        """
        final_seqs = []
        for test_seq in input_seqs:
            test_gc = SeqUtils.GC(test_seq)
            if test_gc >= self._gc_min and test_gc <= self._gc_max:
                single_rpt_size = self._single_repeat(test_seq)
                if single_rpt_size < self._max_single_nts:
                    final_seqs.append(test_seq)
        # if len(final_seqs) == 0:
        #     raise ValueError("Filtered out all possible sequences")
        return final_seqs
    
    def _single_repeat(self, seq):
        """Look for long repeats of a single nucleotide which mess up synthesis.
        """
        counts = []
        cur_base = seq[0]
        cur_count = 1
        for base in seq[1:]:
            if base == cur_base:
                cur_count += 1
            else:
                counts.append(cur_count)
                cur_count = 1
                cur_base = base
        counts.append(cur_count)
        return max(counts)

class ChangeCodons:
    """Modify codons in a region to be different than the base sequence.
    """
    def __init__(self, no_use_thresh, count_compensate):
        self._no_use_thresh = no_use_thresh
        self._count_compensate = count_compensate
        self._distance_finder = WordDistance()

    def modify_codons(self, codons):
        """Modify the given set of codons to have new choices if possible.
        """
        final_codons = []
        for index, codon in codons:
            choices = codon.all_choices(self._no_use_thresh,
                    self._count_compensate)
            if codon.cur_codon in choices:
                choices.remove(codon.cur_codon)
            if len(choices) > 0:
                codon.cur_codon = self._pick_different_choice(codon.cur_codon,
                        choices)
            final_codons.append((index, codon))
        return final_codons

    def _pick_different_choice(self, initial_choice, choices):
        """Return the best usage choice at the largest distance from the initial 
        """
        distance_dict = {}
        for tchoice in choices:
            distance = self._distance_finder.distance(initial_choice, tchoice)
            try:
                distance_dict[distance].append(tchoice)
            except KeyError:
                distance_dict[distance] = [tchoice]
        final_dis = max(distance_dict.keys())
        return distance_dict[final_dis][0]

class RemapProblemGC:
    """Remap problematic regions of GC in a coding sequence.
    """
    def __init__(self, gc_min, gc_max, no_use_thresh, count_compensate):
        """Search for and normalize GC to within given thresholds.
        """
        self._gc_min = gc_min
        self._gc_max = gc_max
        self._no_use_thresh = no_use_thresh
        self._count_compensate = count_compensate

    def remap_gc(self, cds_region, aa_window):
        """Remap GC content in the given coding sequence region.
        """
        protein_seq = cds_region.protein_seq()
        new_codons = cds_region.get_codons()
        codon_count = len(new_codons)

        half_window = aa_window / 2 # int division
        cur_pos = 0
        while cur_pos < codon_count:
            end = min(cur_pos + aa_window, codon_count)
            cur_codons = new_codons[cur_pos:end]
            if len(cur_codons) > half_window:
                updated_codons = self._normalize_gc(cur_codons)
                new_codons = (new_codons[:cur_pos] + updated_codons +
                        new_codons[end:])
            cur_pos += half_window
            
        cds_region.update_codons(new_codons)
        new_protein_seq = cds_region.protein_seq()
        assert new_protein_seq == protein_seq

        return cds_region

    def remap_ends(self, cds_region, end_aa_window, gc_difference):
        """Remap GC at the ends of the given coding sequence.
        """
        protein_seq = cds_region.protein_seq()
        new_codons = cds_region.get_codons()

        five_codons = new_codons[:end_aa_window]
        three_codons = new_codons[len(new_codons) - end_aa_window:]
        assert len(five_codons) == len(three_codons)
        # first get both end GCs within our min/max range
        five_codons = self._normalize_gc(five_codons)
        three_codons = self._normalize_gc(three_codons)

        adjust_five = True
        num_tries = 0
        while 1:
            five_gc = self._codons_gc(five_codons)
            three_gc = self._codons_gc(three_codons)

            cur_gc_diff = five_gc - three_gc
            if (abs(cur_gc_diff) <= gc_difference):
                break
            # the five end has a higher GC
            if cur_gc_diff > 0:
                if adjust_five:
                    five_codons = self._decrease_region_gc(five_codons,
                            three_gc + gc_difference)
                else:
                    three_codons = self._increase_region_gc(three_codons,
                            five_gc - gc_difference, True)
            # the three end has a higher GC
            else:
                if adjust_five:
                    five_codons = self._increase_region_gc(five_codons,
                            three_gc - gc_difference)
                else:
                    three_codons = self._decrease_region_gc(three_codons,
                            five_gc + gc_difference, True)
            # switch back and forth between adjusting the 5' and 3' codons
            adjust_five = not adjust_five
            num_tries += 1
            # we did the best we could here -- not all codons can be adjusted
            if num_tries > 6:
                break

        print "Finished", five_gc, three_gc
        #assert five_gc >= self._gc_min and five_gc <= self._gc_max, (five_gc)
        #assert three_gc >= self._gc_min and three_gc <= self._gc_max, (three_gc)

        final_codons = five_codons + new_codons[end_aa_window:len(new_codons) -
                end_aa_window] + three_codons

        cds_region.update_codons(final_codons)
        new_protein_seq = cds_region.protein_seq()
        assert new_protein_seq == protein_seq

        return cds_region

    def _normalize_gc(self, cur_codons):
        """Normalize the GC over a given set of codons
        """
        full_seq_gc = self._codons_gc(cur_codons)
        if (full_seq_gc < self._gc_min):
            return self._increase_region_gc(cur_codons, self._gc_min)
        if (full_seq_gc > self._gc_max):
            return self._decrease_region_gc(cur_codons, self._gc_max)
        else:
            return cur_codons

    def _codons_gc(self, cur_codons):
        """Return the GC of a list of codons.
        """
        all_seqs = []
        for codon in cur_codons:
            all_seqs.append(codon.get_sequence())
        full_seq = "".join(all_seqs)
        return SeqUtils.GC(full_seq)

    def _increase_region_gc(self, cur_codons, gc_min, reverse = False):
        """Increase the GC content of the given region of codons.

        We can walk either left to right or right to left fixing the CDS issues.
        """
        if reverse:
            codon_index = len(cur_codons) - 1
        else:
            codon_index = 0

        cur_gc = self._codons_gc(cur_codons)
        # work through each codon and try and replace with a good codon
        # that increases GC
        while (cur_gc < gc_min) and ((reverse and codon_index >= 0) or
                (not reverse and codon_index < len(cur_codons))):
            work_codon = cur_codons[codon_index]
            high_gc_choices = work_codon.increase_gc_choices(self._no_use_thresh,
                    self._count_compensate)
            # if we have some choices for higher GC -- take the best and use it
            if len(high_gc_choices) > 0:
                cur_codons[codon_index].change_codon(high_gc_choices[0])
            cur_gc = self._codons_gc(cur_codons)
            if reverse:
                codon_index -= 1
            else:
                codon_index += 1
        final_gc = self._codons_gc(cur_codons)

        return cur_codons

    def _decrease_region_gc(self, cur_codons, gc_max, reverse = False):
        """Decrease the GC content of the given region of codons.
        """
        if reverse:
            codon_index = len(cur_codons) - 1
        else:
            codon_index = 0
        cur_gc = self._codons_gc(cur_codons)
        # work through each codon and try and replace with a good codon
        # that decreases GC
        while (cur_gc > gc_max) and ((reverse and codon_index >= 0) or
                (not reverse and codon_index < len(cur_codons))):
            work_codon = cur_codons[codon_index]
            low_gc_choices = work_codon.decrease_gc_choices(self._no_use_thresh,
                    self._count_compensate)
            # if we have some choices for low GC -- take the best and use it
            if len(low_gc_choices) > 0:
                cur_codons[codon_index].change_codon(low_gc_choices[0])
            cur_gc = self._codons_gc(cur_codons)
            if reverse:
                codon_index -= 1
            else:
                codon_index += 1
        final_gc = self._codons_gc(cur_codons)

        return cur_codons