Source

python-split / split.py

Full commit
"""Functions to split or partition sequences."""

import collections
import platform

# Public API: the names exported by a star-import of this module.
__all__ = [ "chop", "partition", "split" ]
# Module metadata.
__author__ = "Sergey Astanin"
__license__ = "MIT"
__version__ = "0.3"

if platform.python_version_tuple()[0] == "2":
    _range = xrange
else:
    _range = range

class _SubSequencer:
    """
    Lazily process a sequence in single pass and split into many.
    Compute all output sequences even if only one of them is consumed.
    Maintain as many subsequence queues as there are predicate values.
    """
    def __init__(self, predicate, sequence):
        self.predicate = predicate
        self.subseqs = dict()  # mapping of { predicate_value: subseq }
        self.seq = iter(sequence)

    def subqueue(self, predicate_value):
        "Get or create a subsequence queue."
        if not predicate_value in self.subseqs:
            subseq = self.subseqs[predicate_value] = collections.deque([])
        else:
            subseq = self.subseqs[predicate_value]
        return subseq

    def subappend(self, predicate_value, item):
        "Append a processed item to subsequence queue."
        subseq = self.subqueue(predicate_value)
        subseq.append(item)

    def subnext(self, predicate_value):
        "Next value in the subsequence corresponding to predicate_value."
        subseq = self.subqueue(predicate_value)
        if subseq:
            return subseq.popleft()
        else:  # subsequence queue is empty
            while True:  # exit on StopIteration
                n = next(self.seq)
                pv = self.predicate(n)
                if pv == predicate_value:
                    return n
                else:
                    self.subappend(pv, n)

def partition(condition, sequence):
    """
    Split a sequence into two subsequences, in single-pass and preserving order.

    Arguments:

    condition   a function of one item; its result is interpreted by truth
                value.  If condition is None, items are split by their own
                truth value.
    sequence    an iterable object

    Return a pair of generators (seq_true, seq_false). The first one
    builds a subsequence for which the condition holds, the second one
    builds a subsequence for which the condition doesn't hold.

    Both generators draw from the same single pass over the input, so
    consuming only one of them makes the items that belong to the other
    one accumulate in memory until they are consumed.

    It is similar to Data.List.partition in Haskell, partition-by in
    Clojure, or running two complementary filters:

       from itertools import ifilter, ifilterfalse
       (ifilter(condition, sequence), ifilterfalse(condition, sequence))

    >>> def odd(x): return x%2 != 0
    >>> odds, evens = partition(odd, range(10))
    >>> next(odds)
    1
    >>> next(odds)
    3
    >>> list(evens)
    [0, 2, 4, 6, 8]
    >>> list(odds)
    [5, 7, 9]

    The condition may return any value, not only True or False:

    >>> nonmultiples, multiples = partition(lambda x: x % 3, range(7))
    >>> list(nonmultiples), list(multiples)
    ([1, 2, 4, 5], [0, 3, 6])

    """
    predicate = condition or bool  # eval as bool if condition is None

    def cond(item):
        # Normalise to a real bool.  The sub-sequencer matches predicate
        # values by equality, so a truthy result such as 2 or "x" would
        # otherwise equal neither True nor False and the item would be lost.
        return bool(predicate(item))

    ss = _SubSequencer(cond, sequence)

    def subsequence(wanted):
        while True:
            try:
                item = ss.subnext(wanted)
            except StopIteration:
                # End the generator normally.  Letting StopIteration escape
                # from a generator body is turned into RuntimeError on
                # Python 3.7+ (PEP 479).
                return
            yield item

    return subsequence(True), subsequence(False)

def _take(n, sequence):
    """Take the first n elements of the sequence.
    Return head and tail sequences.

    >>> head, tail = _take(3, range(10))
    >>> list(head), list(tail)
    ([0, 1, 2], [3, 4, 5, 6, 7, 8, 9])

    >>> head, tail = _take(13, range(10))
    >>> list(head), list(tail)
    ([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [])
    """
    head = []
    xs = iter(sequence)
    try:
        for i in _range(n):
            x = next(xs)
            head.append(x)
        return head, xs
    except StopIteration:
        return head, ()

def chop(n, sequence, truncate=False):
    """
    Split a sequence into chunks of size n.
    Return an iterator over chunks; every chunk is a list.

    Arguments:

    n           chunk size, a positive number
    sequence    an iterable object
    truncate    if True, truncate sequence length to the multiple of n;
                if False, the size of last chunk may be less than n.

    Raise AssertionError if n is not positive.  The check runs when chop()
    is called, not when the first chunk is requested.

    It is similar to partition and partition-all in Clojure.

    >>> list(chop(3, range(10)))
    [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
    >>> list(chop(3, range(10), truncate=True))
    [[0, 1, 2], [3, 4, 5], [6, 7, 8]]
    >>> list(chop(3, range(6)))
    [[0, 1, 2], [3, 4, 5]]
    >>> list(chop(1, range(3)))
    [[0], [1], [2]]

    This function is lazy and produces new chunks only on demand:

    >>> if platform.python_version_tuple()[0] > '2': xrange=range
    >>> chunks = chop(3, xrange(int(1e9)))
    >>> next(chunks)
    [0, 1, 2]

    """
    # A chunk size of 1 is positive and valid; only zero and negative sizes
    # are rejected.  Kept as an assert so the exception type is unchanged.
    assert n > 0, "chunk size is not positive"

    def chopper():
        # Iterate the input directly instead of testing its truth value, so
        # any iterable works and the input is read exactly once.
        chunk = []
        for item in sequence:
            chunk.append(item)
            if len(chunk) >= n:
                yield chunk
                chunk = []
        # Whatever is left is a short trailing chunk.
        if chunk and not truncate:
            yield chunk
    return chopper()

def _nextByDelim(delimiter, seq):
    "Next chunk from from the sequence seq, and sequence tail."
    iseq = iter(seq)
    chunk = []
    try:
        while True:
            x = next(iseq)
            if x != delimiter:
                chunk.append(x)
            else:
                break
        return chunk, iseq
    except StopIteration:
        return chunk, ()

def split(delimiter, sequence, maxsplit=None):
    """
    Break a sequence on elements equal to delimiter.
    Return an iterator over chunks (delimiters excluded).

    Arguments:

    delimiter   value the elements are compared with
    sequence    an iterable object
    maxsplit    maximum number of splits; None (the default) means no limit

    Every chunk is a list.  If maxsplit is given, at most maxsplit splits
    are done and whatever follows the last split is yielded as one final
    item without being scanned: an iterator over the remaining elements,
    or the sequence object itself when maxsplit is zero or negative.
    Wrap the items in list() as in the example below to get lists only.

    An empty container yields no chunks at all.

    >>> list(split(0, [1,2,3,0,4,5,0,0,6]))
    [[1, 2, 3], [4, 5], [], [6]]

    >>> list(map(list, split(0, [1,2,3,0,4,5,0,0,6], maxsplit=2)))
    [[1, 2, 3], [4, 5], [0, 6]]

    >>> list(map(list, split(0, [1,0,2], maxsplit=0)))
    [[1, 0, 2]]

    This function is lazy and produces new chunks only on demand:

    >>> if platform.python_version_tuple()[0] > '2': xrange=range
    >>> chunks = split(9, xrange(int(1e9)))
    >>> next(chunks)
    [0, 1, 2, 3, 4, 5, 6, 7, 8]

    """
    def splitter():
        # Long-standing behaviour: a falsy input (e.g. an empty list)
        # produces nothing, whereas an empty iterator, which is always
        # truthy, produces a single empty chunk.
        if not sequence:
            return
        # Compare against None so that maxsplit=0 means "do not split"
        # instead of being mistaken for "no limit".
        limited = maxsplit is not None
        if limited and maxsplit <= 0:
            yield sequence
            return
        items = iter(sequence)
        splits = 0
        while True:
            chunk = []
            for item in items:
                if item != delimiter:
                    chunk.append(item)
                else:
                    break
            else:
                # Input ended without another delimiter: last chunk.
                yield chunk
                return
            yield chunk
            splits += 1
            if limited and splits >= maxsplit:
                # Hand over the unscanned remainder lazily.
                yield items
                return
    return splitter()

if __name__ == "__main__":
    # Executed as a script: run the examples embedded in the docstrings.
    from doctest import testmod
    testmod()