"""Flavors of TypeIIS-based splitters.
These splitters break segments into pieces which can be assembled via typeIIs
digestion and ligation.
"""
import logging
from SynBio.Cloning.TypeIIs import (TypeIISDesignError, TypeIISDesigner,
TypeIISVectorFinder)
from SynBio.OligoSequences.Assembly import SplitTandemInverted
class MaxTypeIISBasedSplitter:
"""Provide splitting trying to maximize the use of typeIIs sites.
"""
def __init__(self, check_size, min_size, max_size, diff_finder,
cloning_group, vectors, pre_builder, post_builder,
assembly_level, max_parts = 4, lower_limits = None,
hints = None):
self._min_size = min_size
self._max_size = max_size
self._check_size = check_size
self._diff_finder = diff_finder
self._base_vectors = vectors
self._assembly_level = assembly_level
self._max_parts = max_parts
self._pre_builder = pre_builder
self._post_builder = post_builder
self._cloning_group = cloning_group
self._hint_finder = AdvancedTypeIISHintFinder(min_size, max_size,
cloning_group, max_parts)
self._vector_finder = TypeIISVectorFinder(diff_finder, assembly_level,
lower_limits)
self._end_typeIIS = True
self._hints = hints
def set_melt_calc(self, melt_calc):
pass
def get_smaller_splitter(self):
raise TypeIISDesignError('No smaller splitters available')
def get_min_max(self, convervative = False):
return self._min_size, self._max_size
def get_builders(self):
pre = self._pre_builder.copy()
post = self._post_builder.copy()
pre.use_pam_backup(False)
post.use_pam_backup(False)
return pre, post
def split(self, name, sequence):
designer = TypeIISDesigner(self._base_vectors, self._vector_finder,
self._cloning_group, self._end_typeIIS,
is_psa = False)
final_hints = self._hint_finder.find_split_hints(sequence)
parts = designer.design(name, sequence, final_hints, False)
min_size, max_size = self.get_min_max()
for part in parts:
if ((part.bad_size(min_size, max_size) != 0) and
(len(sequence) >= min_size)):
raise TypeIISDesignError("Not in size range: %s" % part)
if self._max_parts is not None:
if len(parts) > self._max_parts:
raise TypeIISDesignError("Too many parts: %s" % len(parts))
return parts
class AdvancedTypeIISHintFinder:
"""Find hints for splitting based only on typeIIs sites.
This is meant for a high level breakdown where the most important thing
is to utilize typeIIs enzymes efficiently.
"""
def __init__(self, min_size, max_size, cloning_group, hint_limit):
self._min_size = min_size
self._max_size = max_size
self._cloning_group = cloning_group
self._hint_limit = hint_limit
self._enzyme_buffer = 20
def find_split_hints(self, seq):
"""Find a set of hints for breaking based on typeIIs distribution.
"""
enzymes = self._cloning_group.get_end_enzymes()
enzyme_blocks = {}
for enzyme in enzymes:
blocks = enzyme.get_okay_blocks(seq, self._enzyme_buffer)
enzyme_blocks[enzyme.get_name()] = blocks
hints = []
import pprint
pprint.pprint(enzyme_blocks)
cur_pos = 0
while 1:
print 'current', cur_pos
pos_breaks = []
num_beyond = 0
for enzyme, blocks in enzyme_blocks.iteritems():
for bstart, bend in blocks:
if bstart <= cur_pos:
if ((bend > cur_pos + self._min_size) and
(bend < cur_pos + self._max_size)):
print enzyme, bstart, bend
pos_breaks.append(bend)
elif (bend >= cur_pos + self._max_size):
num_beyond += 1
# if we have any that stretch beyond the end, pick an
# appropriate start point for these based on the next region
next_break_info = []
for enzyme, blocks in enzyme_blocks.iteritems():
for bstart, bend in blocks:
if ((bstart > cur_pos + self._min_size) and
(bstart < cur_pos + self._max_size)):
if (bend - self._min_size) > bstart:
next_break_info.append((bend - bstart, bstart))
# sort by size, we want longest first
next_break_info.sort()
next_break_info.reverse()
# pull these breaks for each of the extending enzymes
all_next_breaks = [y for x, y in next_break_info]
next_breaks = all_next_breaks[:num_beyond]
print next_break_info
print cur_pos, pos_breaks, next_breaks
all_breaks = pos_breaks + next_breaks
to_break = self._pick_break_choice(cur_pos, all_breaks, hints)
# deal with any end problems
if ((len(seq) - to_break) < self._min_size):
min_choice = min(all_breaks)
if len(seq) in all_breaks:
to_break = len(seq)
elif (((len(seq) - min_choice) >= self._min_size) and
len(all_next_breaks) > 0):
to_break = all_next_breaks[0]
elif ((len(seq) - cur_pos) >= self._min_size * 2):
to_break = len(seq) - self._min_size - self._enzyme_buffer
else:
raise TypeIISDesignError('Problem at end of sequence')
cur_pos = to_break
if cur_pos == len(seq):
break
else:
hints.append(cur_pos)
print 'Smart typeIIs hints', hints
return hints
def _pick_break_choice(self, cur_pos, choices, cur_hints):
"""Pick a choice of a break from several choices.
"""
# take care of obvious ones
if len(choices) == 0:
raise TypeIISDesignError('No breaks from position %s' % cur_pos)
# a choice poor region, keep it small
# or if we are running out of hints
if len(choices) <= 2 and (len(cur_hints) <= self._hint_limit / 2):
return min(choices)
# many choices -- we can go bigger now
else:
return max(choices)
class DifficultyBasedSplitter:
"""Split a region into segments based on difficulty.
Difficult regions are assembled by a different method than non-difficult
regions, and thus have different sizes. This walks across a sequence from 5'
to 3' and splits it based on difficulty -- difficult regions have their
(smaller) size, and non-difficult regions having their (larger) size.
"""
def __init__(self, check_size, min_maxes, diff_finder, cloning_group,
check_cloning_group, vectors,
pre_builder, post_builder, assembly_level = 'sub01-pairwise',
max_parts = None, lower_limits = None,
last_chance = None):
"""Intialize with splitter details.
min_maxes is a 2D list of min and max values for different complexities
and their sizes. Currently there are only two complexities
(corresponding to AMA and PAM), so exactly two sizes are expected.
These should be ordered by difficulty, hardest sizes first.
"""
self._min_maxes = min_maxes[0]
assert len(self._min_maxes) == 2, \
'Can only handle two options for difficulty sizes'
self._backup_min_maxes = min_maxes[1:]
self._diff_finder = diff_finder
self._cloning_group = cloning_group
self._check_cloning_group = check_cloning_group
self._overhang = cloning_group.max_overhang()
self._base_vectors = vectors
self._assembly_level = assembly_level
self._end_typeIIS = True
self._check_size = check_size
self._pre_builder = pre_builder
self._post_builder = post_builder
self._buffer = 15
self._max_parts = max_parts
self._last_chance = last_chance
self._lower_limits = lower_limits
self._vector_finder = TypeIISVectorFinder(diff_finder,
assembly_level, lower_limits)
if assembly_level in ['sub01', 'sub02']:
self._is_psa = False
elif assembly_level in ['sub01-pairwise']:
self._is_psa = True
else:
raise ValueError('Cannot classify level: %s' % assembly_level)
self._logger = logging.getLogger('summary.log')
def set_melt_calc(self, melt_calc):
pass
def get_smaller_splitter(self):
if len(self._backup_min_maxes) > 0:
return DifficultyBasedSplitter(self._check_size,
self._backup_min_maxes, self._diff_finder,
self._cloning_group, self._check_cloning_group,
self._base_vectors, self._pre_builder, self._post_builder,
self._assembly_level, self._max_parts, self._lower_limits,
self._last_chance)
elif self._last_chance is not None:
return self._last_chance
else:
raise TypeIISDesignError('No smaller splitters available')
def get_min_max(self, conservative = False):
min_parts = [x[0] for x in self._min_maxes]
max_parts = [x[-1] for x in self._min_maxes]
if conservative:
return min(min_parts), min(max_parts)
else:
return min(min_parts), max(max_parts)
def get_builders(self):
pre = self._pre_builder.copy()
post = self._post_builder.copy()
pre.use_pam_backup(False)
post.use_pam_backup(False)
return pre, post
def split(self, name, sequence):
"""Split a sequence into chunks of low and high complexity.
"""
self._logger.info('^^> Difficulty based split: %s' %
self._min_maxes)
designer = TypeIISDesigner(self._base_vectors, self._vector_finder,
self._cloning_group, self._end_typeIIS,
is_psa = self._is_psa)
final_hints = self._get_split_hints(sequence)
parts = designer.design(name, sequence, final_hints, False)
min_size, max_size = self.get_min_max()
for part in parts:
if ((part.bad_size(min_size, max_size) != 0) and
(len(sequence) >= min_size)):
raise TypeIISDesignError("Not in size range: %s" % part)
if self._max_parts is not None and len(parts) > self._max_parts:
raise TypeIISDesignError("Too many parts: %s" % len(parts))
return parts
def _get_split_hints(self, sequence):
"""Retrieve the regions to split the sequence at, based on complexity.
"""
easy_min, easy_direct, easy_max = self._min_maxes[-1]
easy_min += self._buffer
easy_max -= self._buffer
easy_hint_finder = SplitTandemInverted(self._check_size, 0,
easy_max - easy_min, 6)
hard_min, hard_direct, hard_max = self._min_maxes[0]
hard_min += self._buffer
hard_max -= self._buffer
hard_hint_finder = SplitTandemInverted(self._check_size, 0,
hard_max - hard_min, 6)
cur_pos = 0
hints = []
while cur_pos < len(sequence):
self._logger.info('***Splitting at position: %s' % cur_pos)
easy_final_check_seq = self._find_easy_stretch(sequence, cur_pos,
easy_min, easy_max, easy_hint_finder)
complexity = self._diff_finder.summary_difficulty('',
easy_final_check_seq)
easy_check_pos = cur_pos + len(easy_final_check_seq)
# easy case -- we've found a breakpoint and a low complexity region
# so we can add this region to be split and keep moving
if complexity in ['low'] and self._end_okay(cur_pos, easy_check_pos,
sequence, easy_direct):
final_pos = cur_pos + len(easy_final_check_seq)
# hard case -- get a segment for assembly via the complex AMA
# method
else:
final_pos = self._get_easy_breakpoint(sequence, cur_pos,
hard_max, easy_min)
if final_pos and not self._end_okay(cur_pos, final_pos, sequence,
easy_direct):
final_pos = None
# finally -- give up and get a segment for assembly via the
# complex AMA method
if final_pos is None:
final_pos = self._get_hard_breakpoint(sequence, cur_pos,
hard_min, hard_max, hard_direct,
hard_hint_finder)
assert final_pos is not None
hints.append(final_pos)
cur_pos = final_pos
# chop off the last hint, which is just the end of the sequence
assert hints[-1] == len(sequence)
hints = hints[:-1]
self._logger.info('Final hints: %s' % hints)
return hints
def _end_okay(self, start_pos, cur_pos, sequence, direct_check):
"""Check if the current position is okay with the sequence end.
Also check if the end has any issues with the cloning group used
downstream, which would mean it needs to go direct to full length.
"""
# check if we are okay with the end
total_size = len(sequence)
our_min, our_max = self.get_min_max()
distance = total_size - cur_pos
is_okay = ((distance >= our_min * 2) or
(distance >= our_min and distance <= our_max)
or distance == 0)
# print distance, our_min, our_max, is_okay
# check if the ends are okay with downstream restriction choices
if is_okay:
# if we are so small we don't need downstream enzymes we are okay
check_min, check_max = self.get_min_max(True)
if (cur_pos - start_pos) < check_max:
res_okay = True
# otherwise make sure we have something to work with
else:
res_okay = False
# print check_min, check_max, cur_pos, start_pos, res_okay
end_enzymes = self._cloning_group.get_end_enzymes()
while len(end_enzymes) > 0 and not res_okay:
end_enzyme = end_enzymes.pop(0)
region = sequence[start_pos:cur_pos]
if len(region) >= direct_check:
end_check = int(round(direct_check / 1.5))
res_okay = self._check_IIs_ends(region, end_enzyme,
end_check, self._check_cloning_group)
else:
res_okay = True
is_okay = res_okay
#self._logger.info('Checking: %s %s %s' % (cur_pos, total_size, is_okay))
return is_okay
def _check_IIs_ends(self, region, end_enzyme, end_check, check_group):
"""Check whether we are okay with IIs ends at the end of this sequence.
"""
full_region = end_enzyme.get_five_end('') + region + \
end_enzyme.get_three_end('')
five_end = full_region[:end_check]
three_end = full_region[-end_check:]
# see if we can find an enzyme for the five end
five_okay = False
for check_enzyme in check_group.get_end_enzymes():
if check_enzyme.check_seq(five_end):
five_okay = True
# find an enzyme for the three end
if five_okay:
for check_enzyme in check_group.get_end_enzymes():
if check_enzyme.check_seq(three_end):
return True
# if we got here couldn't find an okay enzyme
return False
def _get_easy_breakpoint(self, sequence, cur_pos, min_size, max_size):
"""Find a breakpoint that creates an easy to assemble segment.
This searches for a region that was not our ideal PAM splitting size,
but that could be still dealt with via PAM. It helps prevent creeping
along good sequence to find a tough stretch further along.
"""
move_size = 150
cur_size = max_size
while cur_size > min_size + move_size:
end = min(cur_pos + cur_size + self._overhang, len(sequence))
test_seq = sequence[cur_pos:end]
complexity = self._diff_finder.summary_difficulty('', test_seq)
if complexity in ['low']:
return cur_pos + cur_size
cur_size -= move_size
# if we got here, we couldn't find a breakpoint
return None
def _get_hard_breakpoint(self, sequence, cur_pos, hard_min, hard_max,
hard_direct, hard_hint_finder):
"""Retrieve a breakpoint on a region designed for difficult assembly.
"""
# base case -- we are at the end of the sequence
if (cur_pos + hard_max >= len(sequence) or
cur_pos + hard_min >= len(sequence)):
return len(sequence)
hard_check_seq = sequence[cur_pos + hard_min:cur_pos + hard_max]
hard_hints = hard_hint_finder.find_split_hints(hard_check_seq,
len(hard_check_seq))
if len(hard_hints) == 1:
hard_size = hard_min + hard_hints[0].breakpoint
elif len(hard_hints) == 0:
hard_size = hard_max
else:
raise ValueError('Not expecting multiple hints')
hard_break = cur_pos + hard_size
if not self._end_okay(cur_pos, hard_break, sequence, hard_direct):
hard_break = cur_pos + hard_min
return hard_break
def _find_easy_stretch(self, sequence, cur_pos, easy_min, easy_max,
easy_hint_finder):
"""Retrieve a length of sequence designed for easy assembly.
This checks for repeats and designs a breakpoint in a region meant to be
amenable to assembly via standard PAM methods.
"""
# base case -- we are at the end of the sequence
if (cur_pos + easy_min >= len(sequence) or
cur_pos + easy_max >= len(sequence)):
return sequence[cur_pos:]
easy_check_seq = sequence[cur_pos + easy_min:cur_pos + easy_max]
easy_hints = easy_hint_finder.find_split_hints(easy_check_seq,
len(easy_check_seq))
if len(easy_hints) == 1:
easy_size = easy_min + easy_hints[0].breakpoint
elif len(easy_hints) == 0:
easy_size = easy_max
else:
raise ValueError('Not expecting multiple hints')
end = cur_pos + easy_size + self._overhang
easy_final_check_seq = sequence[cur_pos:end]
return easy_final_check_seq