chapmanb / synbio (http://bcbio.wordpress.com/)

Python Synthetic Biology libraries

Clone this repository (size: 8.4 MB): HTTPS / SSH
$ hg clone http://bitbucket.org/chapmanb/synbio/
commit 2: 775d9e9a5c4d
parent 1: d62271144b61
branch: default
tags: tip
Fix setup to reflect actual modules. Add in SQLAlchemy database models.
cha...@sobchak.mgh.harvard.edu
10 months ago
synbio / SynBio / Cloning / TypeIIsSplitters.py
r2:775d9e9a5c4d 430 loc 19.0 KB embed / history / annotate / raw /
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
"""Flavors of TypeIIS-based splitters.

These splitters break segments into pieces which can be assembled via typeIIs
digestion and ligation.
"""
import logging

from SynBio.Cloning.TypeIIs import (TypeIISDesignError, TypeIISDesigner,
        TypeIISVectorFinder)
from SynBio.OligoSequences.Assembly import SplitTandemInverted

class MaxTypeIISBasedSplitter:
    """Provide splitting trying to maximize the use of typeIIs sites.
    """
    def __init__(self, check_size, min_size, max_size, diff_finder,
            cloning_group, vectors, pre_builder, post_builder,
            assembly_level, max_parts = 4, lower_limits = None,
            hints = None):
        self._min_size = min_size
        self._max_size = max_size
        self._check_size = check_size
        self._diff_finder = diff_finder
        self._base_vectors = vectors
        self._assembly_level = assembly_level
        self._max_parts = max_parts
        self._pre_builder = pre_builder
        self._post_builder = post_builder
        self._cloning_group = cloning_group
        self._hint_finder = AdvancedTypeIISHintFinder(min_size, max_size,
                cloning_group, max_parts)
        self._vector_finder = TypeIISVectorFinder(diff_finder, assembly_level,
                lower_limits)
        self._end_typeIIS = True

        self._hints = hints
    
    def set_melt_calc(self, melt_calc):
        pass
    
    def get_smaller_splitter(self):
        raise TypeIISDesignError('No smaller splitters available')
    
    def get_min_max(self, convervative = False):
        return self._min_size, self._max_size
    
    def get_builders(self):
        pre = self._pre_builder.copy()
        post = self._post_builder.copy()
        pre.use_pam_backup(False)
        post.use_pam_backup(False)
        return pre, post
    
    def split(self, name, sequence):
        designer = TypeIISDesigner(self._base_vectors, self._vector_finder,
                                   self._cloning_group, self._end_typeIIS,
                                   is_psa = False)
        final_hints = self._hint_finder.find_split_hints(sequence)
        parts = designer.design(name, sequence, final_hints, False)
        min_size, max_size = self.get_min_max()
        for part in parts:
            if ((part.bad_size(min_size, max_size) != 0) and
                    (len(sequence) >= min_size)):
                raise TypeIISDesignError("Not in size range: %s" % part)
        if self._max_parts is not None:
            if len(parts) > self._max_parts:
                raise TypeIISDesignError("Too many parts: %s" % len(parts))
        return parts

class AdvancedTypeIISHintFinder:
    """Find hints for splitting based only on typeIIs sites.

    This is meant for a high level breakdown where the most important thing
    is to utilize typeIIs enzymes efficiently.
    """
    def __init__(self, min_size, max_size, cloning_group, hint_limit):
        self._min_size = min_size
        self._max_size = max_size
        self._cloning_group = cloning_group
        self._hint_limit = hint_limit
        
        self._enzyme_buffer = 20

    def find_split_hints(self, seq):
        """Find a set of hints for breaking based on typeIIs distribution.
        """
        enzymes = self._cloning_group.get_end_enzymes()
        enzyme_blocks = {}
        for enzyme in enzymes:
            blocks = enzyme.get_okay_blocks(seq, self._enzyme_buffer)
            enzyme_blocks[enzyme.get_name()] = blocks
        hints = []
        import pprint
        pprint.pprint(enzyme_blocks)
        cur_pos = 0
        while 1:
            print 'current', cur_pos
            pos_breaks = []
            num_beyond = 0
            for enzyme, blocks in enzyme_blocks.iteritems():
                for bstart, bend in blocks:
                    if bstart <= cur_pos:
                        if ((bend > cur_pos + self._min_size) and
                                (bend < cur_pos + self._max_size)):
                            print enzyme, bstart, bend
                            pos_breaks.append(bend)
                        elif (bend >= cur_pos + self._max_size):
                            num_beyond += 1
            # if we have any that stretch beyond the end, pick an
            # appropriate start point for these based on the next region
            next_break_info = []
            for enzyme, blocks in enzyme_blocks.iteritems():
                for bstart, bend in blocks:
                    if ((bstart > cur_pos + self._min_size) and
                            (bstart < cur_pos + self._max_size)):
                        if (bend - self._min_size) > bstart:
                            next_break_info.append((bend - bstart, bstart))
            # sort by size, we want longest first
            next_break_info.sort()
            next_break_info.reverse()
            # pull these breaks for each of the extending enzymes
            all_next_breaks = [y for x, y in next_break_info]
            next_breaks = all_next_breaks[:num_beyond]
            print next_break_info
            print cur_pos, pos_breaks, next_breaks
            all_breaks = pos_breaks + next_breaks
            to_break = self._pick_break_choice(cur_pos, all_breaks, hints)
            # deal with any end problems
            if ((len(seq) - to_break) < self._min_size):
                min_choice = min(all_breaks)
                if len(seq) in all_breaks:
                    to_break = len(seq)
                elif (((len(seq) - min_choice) >= self._min_size) and
                        len(all_next_breaks) > 0):
                    to_break = all_next_breaks[0]
                elif ((len(seq) - cur_pos) >= self._min_size * 2):
                    to_break = len(seq) - self._min_size - self._enzyme_buffer
                else:
                    raise TypeIISDesignError('Problem at end of sequence')
            cur_pos = to_break
            if cur_pos == len(seq):
                break
            else:
                hints.append(cur_pos)
        print 'Smart typeIIs hints', hints
        return hints

    def _pick_break_choice(self, cur_pos, choices, cur_hints):
        """Pick a choice of a break from several choices.
        """
        # take care of obvious ones
        if len(choices) == 0:
            raise TypeIISDesignError('No breaks from position %s' % cur_pos)
        # a choice poor region, keep it small
        # or if we are running out of hints
        if len(choices) <= 2 and (len(cur_hints) <= self._hint_limit / 2):
            return min(choices)
        # many choices -- we can go bigger now
        else:
            return max(choices)

class DifficultyBasedSplitter:
    """Split a region into segments based on difficulty.

    Difficult regions are assembled by a different method than non-difficult
    regions, and thus have different sizes. This walks across a sequence from 5'
    to 3' and splits it based on difficulty -- difficult regions have their
    (smaller) size, and non-difficult regions having their (larger) size.
    """
    def __init__(self, check_size, min_maxes, diff_finder, cloning_group,
            check_cloning_group, vectors,
                 pre_builder, post_builder, assembly_level = 'sub01-pairwise',
                 max_parts = None, lower_limits = None,
                 last_chance = None):
        """Intialize with splitter details.

        min_maxes is a 2D list of min and max values for different complexities
        and their sizes. Currently there are only two complexities
        (corresponding to AMA and PAM), so exactly two sizes are expected.
        These should be ordered by difficulty, hardest sizes first.
        """
        self._min_maxes = min_maxes[0]
        assert len(self._min_maxes) == 2, \
           'Can only handle two options for difficulty sizes'
        self._backup_min_maxes = min_maxes[1:]
        self._diff_finder = diff_finder
        self._cloning_group = cloning_group
        self._check_cloning_group = check_cloning_group
        self._overhang = cloning_group.max_overhang()
        self._base_vectors = vectors
        self._assembly_level = assembly_level
        self._end_typeIIS = True
        self._check_size = check_size
        self._pre_builder = pre_builder
        self._post_builder = post_builder
        self._buffer = 15
        self._max_parts = max_parts
        self._last_chance = last_chance
        self._lower_limits = lower_limits
        self._vector_finder = TypeIISVectorFinder(diff_finder,
                assembly_level, lower_limits)
        if assembly_level in ['sub01', 'sub02']:
            self._is_psa = False
        elif assembly_level in ['sub01-pairwise']:
            self._is_psa = True
        else:
            raise ValueError('Cannot classify level: %s' % assembly_level)
        self._logger = logging.getLogger('summary.log')

    def set_melt_calc(self, melt_calc):
        pass

    def get_smaller_splitter(self):
        if len(self._backup_min_maxes) > 0:
            return DifficultyBasedSplitter(self._check_size,
                    self._backup_min_maxes, self._diff_finder,
                    self._cloning_group, self._check_cloning_group,
                    self._base_vectors, self._pre_builder, self._post_builder,
                    self._assembly_level, self._max_parts, self._lower_limits,
                    self._last_chance)
        elif self._last_chance is not None:
            return self._last_chance
        else:
            raise TypeIISDesignError('No smaller splitters available')

    def get_min_max(self, conservative = False):
        min_parts = [x[0] for x in self._min_maxes]
        max_parts = [x[-1] for x in self._min_maxes]
        if conservative:
            return min(min_parts), min(max_parts)
        else:
            return min(min_parts), max(max_parts)
    
    def get_builders(self):
        pre = self._pre_builder.copy()
        post = self._post_builder.copy()
        pre.use_pam_backup(False)
        post.use_pam_backup(False)
        return pre, post
    
    def split(self, name, sequence):
        """Split a sequence into chunks of low and high complexity.
        """
        self._logger.info('^^> Difficulty based split: %s' %
                self._min_maxes)
        designer = TypeIISDesigner(self._base_vectors, self._vector_finder,
                                   self._cloning_group, self._end_typeIIS,
                                   is_psa = self._is_psa)
        final_hints = self._get_split_hints(sequence)
        parts = designer.design(name, sequence, final_hints, False)
        min_size, max_size = self.get_min_max()
        for part in parts:
            if ((part.bad_size(min_size, max_size) != 0) and
                    (len(sequence) >= min_size)):
                raise TypeIISDesignError("Not in size range: %s" % part)
        if self._max_parts is not None and len(parts) > self._max_parts:
            raise TypeIISDesignError("Too many parts: %s" % len(parts))
        return parts

    def _get_split_hints(self, sequence):
        """Retrieve the regions to split the sequence at, based on complexity.
        """
        easy_min, easy_direct, easy_max = self._min_maxes[-1]
        easy_min += self._buffer
        easy_max -= self._buffer
        easy_hint_finder = SplitTandemInverted(self._check_size, 0,
                                               easy_max - easy_min, 6)
        hard_min, hard_direct, hard_max = self._min_maxes[0]
        hard_min += self._buffer
        hard_max -= self._buffer
        hard_hint_finder = SplitTandemInverted(self._check_size, 0,
                                               hard_max - hard_min, 6)
        cur_pos = 0
        hints = []
        while cur_pos < len(sequence):
            self._logger.info('***Splitting at position: %s' % cur_pos)
            easy_final_check_seq = self._find_easy_stretch(sequence, cur_pos,
                                    easy_min, easy_max, easy_hint_finder)
            complexity = self._diff_finder.summary_difficulty('',
                                                        easy_final_check_seq)
            easy_check_pos = cur_pos + len(easy_final_check_seq)
            # easy case -- we've found a breakpoint and a low complexity region
            # so we can add this region to be split and keep moving
            if complexity in ['low'] and self._end_okay(cur_pos, easy_check_pos,
                    sequence, easy_direct):
                final_pos = cur_pos + len(easy_final_check_seq)
            # hard case -- get a segment for assembly via the complex AMA
            # method
            else:
                final_pos = self._get_easy_breakpoint(sequence, cur_pos,
                                                      hard_max, easy_min)
                if final_pos and not self._end_okay(cur_pos, final_pos, sequence,
                        easy_direct):
                    final_pos = None
                # finally -- give up and get a segment for assembly via the
                # complex AMA method
                if final_pos is None:
                    final_pos = self._get_hard_breakpoint(sequence, cur_pos,
                                     hard_min, hard_max, hard_direct,
                                     hard_hint_finder)
            assert final_pos is not None
            hints.append(final_pos)
            cur_pos = final_pos
        # chop off the last hint, which is just the end of the sequence
        assert hints[-1] == len(sequence)
        hints = hints[:-1]
        self._logger.info('Final hints: %s' % hints)
        return hints

    def _end_okay(self, start_pos, cur_pos, sequence, direct_check):
        """Check if the current position is okay with the sequence end.

        Also check if the end has any issues with the cloning group used
        downstream, which would mean it needs to go direct to full length.
        """
        # check if we are okay with the end
        total_size = len(sequence)
        our_min, our_max = self.get_min_max()
        distance = total_size - cur_pos
        is_okay = ((distance >= our_min * 2) or 
                (distance >= our_min and distance <= our_max)
                or distance == 0)
        # print distance, our_min, our_max, is_okay
        # check if the ends are okay with downstream restriction choices
        if is_okay:
            # if we are so small we don't need downstream enzymes we are okay
            check_min, check_max = self.get_min_max(True)
            if (cur_pos - start_pos) < check_max:
                res_okay = True
            # otherwise make sure we have something to work with
            else:
                res_okay = False
            # print check_min, check_max, cur_pos, start_pos, res_okay
            end_enzymes = self._cloning_group.get_end_enzymes()
            while len(end_enzymes) > 0 and not res_okay:
                end_enzyme = end_enzymes.pop(0)
                region = sequence[start_pos:cur_pos]
                if len(region) >= direct_check:
                    end_check = int(round(direct_check / 1.5))
                    res_okay = self._check_IIs_ends(region, end_enzyme,
                            end_check, self._check_cloning_group)
                else:
                    res_okay = True
            is_okay = res_okay
        #self._logger.info('Checking: %s %s %s' % (cur_pos, total_size, is_okay))
        return is_okay

    def _check_IIs_ends(self, region, end_enzyme, end_check, check_group):
        """Check whether we are okay with IIs ends at the end of this sequence.
        """
        full_region = end_enzyme.get_five_end('') + region + \
                end_enzyme.get_three_end('')
        five_end = full_region[:end_check]
        three_end = full_region[-end_check:]
        # see if we can find an enzyme for the five end
        five_okay = False
        for check_enzyme in check_group.get_end_enzymes():
            if check_enzyme.check_seq(five_end):
                five_okay = True
        # find an enzyme for the three end
        if five_okay:
            for check_enzyme in check_group.get_end_enzymes():
                if check_enzyme.check_seq(three_end):
                    return True
        # if we got here couldn't find an okay enzyme
        return False

    def _get_easy_breakpoint(self, sequence, cur_pos, min_size, max_size):
        """Find a breakpoint that creates an easy to assemble segment.

        This searches for a region that was not our ideal PAM splitting size,
        but that could be still dealt with via PAM. It helps prevent creeping
        along good sequence to find a tough stretch further along.
        """
        move_size = 150
        cur_size = max_size
        while cur_size > min_size + move_size:
            end = min(cur_pos + cur_size + self._overhang, len(sequence))
            test_seq = sequence[cur_pos:end]
            complexity = self._diff_finder.summary_difficulty('', test_seq)
            if complexity in ['low']:
                return cur_pos + cur_size
            cur_size -= move_size
        # if we got here, we couldn't find a breakpoint
        return None

    def _get_hard_breakpoint(self, sequence, cur_pos, hard_min, hard_max,
                             hard_direct, hard_hint_finder):
        """Retrieve a breakpoint on a region designed for difficult assembly.
        """
        # base case -- we are at the end of the sequence
        if (cur_pos + hard_max >= len(sequence) or
            cur_pos + hard_min >= len(sequence)):
            return len(sequence)
        hard_check_seq = sequence[cur_pos + hard_min:cur_pos + hard_max]
        hard_hints = hard_hint_finder.find_split_hints(hard_check_seq,
                                                       len(hard_check_seq))
        if len(hard_hints) == 1:
            hard_size = hard_min + hard_hints[0].breakpoint
        elif len(hard_hints) == 0:
            hard_size = hard_max
        else:
            raise ValueError('Not expecting multiple hints')
        hard_break = cur_pos + hard_size
        if not self._end_okay(cur_pos, hard_break, sequence, hard_direct):
            hard_break = cur_pos + hard_min
        return hard_break
           
    def _find_easy_stretch(self, sequence, cur_pos, easy_min, easy_max,
                           easy_hint_finder):
        """Retrieve a length of sequence designed for easy assembly.

        This checks for repeats and designs a breakpoint in a region meant to be
        amenable to assembly via standard PAM methods.
        """
        # base case -- we are at the end of the sequence
        if (cur_pos + easy_min >= len(sequence) or
            cur_pos + easy_max >= len(sequence)):
            return sequence[cur_pos:]
        easy_check_seq = sequence[cur_pos + easy_min:cur_pos + easy_max]
        easy_hints = easy_hint_finder.find_split_hints(easy_check_seq,
                                                       len(easy_check_seq))
        if len(easy_hints) == 1:
            easy_size = easy_min + easy_hints[0].breakpoint
        elif len(easy_hints) == 0:
            easy_size = easy_max
        else:
            raise ValueError('Not expecting multiple hints')
        end = cur_pos + easy_size + self._overhang
        easy_final_check_seq = sequence[cur_pos:end]
        return easy_final_check_seq