Commits

Lynn Rees  committed 31843d8

- initial for thingpipe

  • Participants
  • Parent commits 381af4a
  • Branches next

Comments (0)

Files changed (69)

 include README.rst
 include packages
 include requirements.txt
-recursive-include thingq *.py
+recursive-include thingpipe *.py
-*thingq* chains iterators together using a double-headed input and output queue,
+*thingpipe* chains iterators together using a double-headed input and output queue,
 that can be shifted or synced as needed.
 
-*thingq* exposes some useful features buried in the standard library and 
+*thingpipe* exposes some useful features buried in the standard library and 
 includes some Python 2 <-> 3 compatibility aids.
 
-Mirrored on https://github.com/kwarterthieves/thingq/
+Mirrored on https://github.com/kwarterthieves/thingpipe/

File docs/conf.py

 # -*- coding: utf-8 -*-
 #
-# thingq documentation build configuration file, created by
+# thingpipe documentation build configuration file, created by
 # sphinx-quickstart on Mon Mar 19 15:41:40 2012.
 #
 # This file is execfile()d with the current directory set to its containing dir.
 master_doc = 'index'
 
 # General information about the project.
-project = u'thingq'
+project = u'thingpipe'
 copyright = u'2012, L. C. Rees'
 
 # The version info for the project you're documenting, acts as replacement for
 #html_file_suffix = None
 
 # Output file base name for HTML help builder.
-htmlhelp_basename = 'thingqdoc'
+htmlhelp_basename = 'thingpipedoc'
 
 
 # -- Options for LaTeX output --------------------------------------------------
 # Grouping the document tree into LaTeX files. List of tuples
 # (source start file, target name, title, author, documentclass [howto/manual]).
 latex_documents = [
-  ('index', 'thingq.tex', u'thingq Documentation',
+  ('index', 'thingpipe.tex', u'thingpipe Documentation',
    u'L. C. Rees', 'manual'),
 ]
 
 # One entry per manual page. List of tuples
 # (source start file, name, description, authors, manual section).
 man_pages = [
-    ('index', 'thingq', u'thingq Documentation',
+    ('index', 'thingpipe', u'thingpipe Documentation',
      [u'L. C. Rees'], 1)
 ]
 
 # (source start file, target name, title, author,
 #  dir menu entry, description, category)
 texinfo_documents = [
-  ('index', 'thingq', u'thingq Documentation',
-   u'L. C. Rees', 'thingq', 'One line description of project.',
+  ('index', 'thingpipe', u'thingpipe Documentation',
+   u'L. C. Rees', 'thingpipe', 'One line description of project.',
    'Miscellaneous'),
 ]
 

File docs/index.rst

-.. thingq documentation master file, created by
+.. thingpipe documentation master file, created by
    sphinx-quickstart on Mon Mar 19 15:41:40 2012.
    You can adapt this file completely to your liking, but it should at least
    contain the root `toctree` directive.
 
-Welcome to thingq's documentation!
+Welcome to thingpipe's documentation!
 ================================
 
 Contents:
-'''thingq fabfile'''
+'''thingpipe fabfile'''
 
 from fabric.api import prompt, local, settings, env
 
 
 
 def tox():
-    '''test thingq'''
+    '''test thingpipe'''
     local('tox')
 
 
 def tox_recreate():
-    '''recreate thingq test env'''
+    '''recreate thingpipe test env'''
     prompt(
         'Enter testenv: [py26, py27, py32]',
         'testenv',
 
 
 def release():
-    '''release thingq'''
+    '''release thingpipe'''
     local('hg update pu')
     local('hg update next')
     local('hg merge pu; hg ci -m automerge')
     prompt('Enter tag: ', 'tag')
     with settings(warn_only=True):
         local('hg tag "%(tag)s"' % env)
-        local('hg push ssh://hg@bitbucket.org/lcrees/thingq')
-        local('hg push git+ssh://git@github.com:kwarterthieves/thingq.git')
+        local('hg push ssh://hg@bitbucket.org/lcrees/thingpipe')
+        local('hg push git+ssh://git@github.com:kwarterthieves/thingpipe.git')
     local('./setup.py register sdist --format=bztar,gztar,zip upload')
     local('rm -rf dist')
 
 
 def release_next():
-    '''release thingq from next branch'''
+    '''release thingpipe from next branch'''
     local('hg update maint')
     local('hg merge default; hg ci -m automerge')
     local('hg update default')
     prompt('Enter tag: ', 'tag')
     with settings(warn_only=True):
         local('hg tag "%(tag)s"' % env)
-        local('hg push ssh://hg@bitbucket.org/lcrees/thingq')
-        local('hg push git+ssh://git@github.com:kwarterthieves/thingq.git')
+        local('hg push ssh://hg@bitbucket.org/lcrees/thingpipe')
+        local('hg push git+ssh://git@github.com:kwarterthieves/thingpipe.git')
     local('./setup.py register sdist --format=bztar,gztar,zip upload')
     local('rm -rf dist')

File packages

-thingq
-thingq.active
-thingq.lazy
 #! /usr/bin/env python
 # -*- coding: utf-8 -*-
-'''setup for thingq'''
+'''setup for thingpipe'''
 
 from os import getcwd
 from os.path import join
 ).readlines())
 
 setup(
-    name='thingq',
+    name='thingpipe',
     version='0.5.0',
-    description='iterator chaining, underscored by a two-headed queue',
+    description='Things go in. Things happen. Things go out. Vaguely inspired'
+        ' by underscore.js',
     long_description=open(join(getcwd(), 'README.rst'), 'r').read(),
-    keywords='queue generator utility iterator functional programming',
+    keywords='pipe flow ETL iterator functional fluent chaining',
     license='BSD',
     author='L. C. Rees',
     author_email='lcrees@gmail.com',
-    url='https://bitbucket.org/lcrees/thingq',
-    packages=[
-        l.strip() for l in open(join(getcwd(), 'packages'), 'r').readlines()
-    ],
-    test_suite='thingq.tests',
+    url='https://bitbucket.org/lcrees/thingpipe',
+    packages=['thingpipe'],
+    test_suite='thingpipe.tests',
     zip_safe=False,
     install_requires=install_requires,
     classifiers=[
-        'Development Status :: 3 - Alpha',
+        'Development Status :: 4 - Beta',
         'Intended Audience :: Developers',
         'License :: OSI Approved :: BSD License',
         'Natural Language :: English',

File thingpipe/__init__.py

+# -*- coding: utf-8 -*-
+'''Things go in. Things happen. Things go out.'''
+
+from thingpipe.compat import port
+from thingpipe.lazy import lazypipe
+from thingpipe.active import activepipe
+
+thingpipe = activepipe
+
+__all__ = ('thingpipe', 'activepipe', 'lazypipe' 'port')
+__version__ = (0, 5, 0)

File thingpipe/active.py

+# -*- coding: utf-8 -*-
+'''actively evaluated pipelines'''
+
+from copy import copy
+from itertools import repeat
+from collections import deque
+from contextlib import contextmanager
+
+from stuf.utils import clsname
+
+from thingpipe.transform import StringMixin
+from thingpipe.mapping import RepeatMixin, MapMixin
+from thingpipe.order import RandomMixin, OrderMixin
+from thingpipe.base import SLOTS, PipeMixin, ResultsMixin
+from thingpipe.reducing import MathMixin, TruthMixin, ReduceMixin
+from thingpipe.filtering import FilterMixin, ExtractMixin, SetMixin, SliceMixin
+
+
+class ActiveMixin(PipeMixin):
+
+    '''base active pipeline'''
+
+    def __init__(self, *things, **kw):
+        try:
+            incoming = deque(things[0]) if len(things) == 1 else deque(things)
+        except TypeError:
+            incoming = deque()
+            incoming.extend(things)
+        super(ActiveMixin, self).__init__(incoming, deque())
+        # set iterator
+        self._iterator = self._iterexcept
+        # work things
+        self._work = deque()
+        # utility things
+        self._util = deque()
+
+    ###########################################################################
+    ## mode things ############################################################
+    ###########################################################################
+
+    def ro(self):
+        '''switch to read-only mode'''
+        with self.ctx3(outq=self._UTILVAR, savepoint=False):
+            self._xtend(self._iterable)
+        with self.ctx1(hard=True, workq=self._UTILVAR, savepoint=False):
+            self.current_mode = self._RO
+            return self
+
+    ###########################################################################
+    ## context things #######################################################
+    ###########################################################################
+
+    @contextmanager
+    def ctx2(self, **kw):
+        '''swap for two-armed context'''
+        self.swap(
+            outq=kw.get(self._OUTCFG, self._INVAR), context=self.ctx2(), **kw
+        )
+        getr_ = lambda x: getattr(self, x)
+        outq = getr_(self._OUTQ)
+        utilq = getr_(self._UTILQ)
+        workq = getr_(self._WORKQ)
+        # clear all work things
+        workq.clear()
+        # extend work things with outgoing things
+        workq.extend(outq)
+        # swap iterator
+        self._iterator = self._breakcount
+        yield
+        # clear outgoing things if so configured
+        if self._clearout:
+            outq.clear()
+        # extend outgoing things with utility things
+        outq.extend(utilq)
+        # clear utility things
+        utilq.clear()
+        # return to global context
+        self.reswap()
+
+    @contextmanager
+    def ctx3(self, **kw):
+        '''swap for three-armed context'''
+        self.swap(
+            utilq=kw.get(self._WORKCFG, self._WORKVAR), context=self.ctx3, **kw
+        )
+        getr_ = lambda x: getattr(self, x)
+        outq = getr_(self._OUTQ)
+        utilq = getr_(self._UTILQ)
+        workq = getr_(self._WORKQ)
+        # clear work things
+        workq.clear()
+        # extend work things with incoming things
+        workq.extend(getr_(self._INQ))
+        # swap iterators
+        self._iterator = self._breakcount
+        yield
+        # clear outgoing things if so configured
+        if self._clearout:
+            outq.clear()
+        # extend outgoing things with utility things
+        outq.extend(utilq)
+        # clear utility things
+        utilq.clear()
+        # return to global context
+        self.reswap()
+
+    @contextmanager
+    def ctx4(self, **kw):
+        '''swap for four-armed context'''
+        self.swap(context=self.ctx4, **kw)
+        getr_ = lambda x: getattr(self, x)
+        outq = getr_(self._OUTQ)
+        utilq = getr_(self._UTILQ)
+        workq = getr_(self._WORKQ)
+        # clear work things
+        workq.clear()
+        # extend work things with incoming things
+        workq.extend(getr_(self._INQ))
+        # swap iterators
+        self._iterator = self._iterexcept
+        yield
+        # clear outgoing things if so configured
+        if self._clearout:
+            outq.clear()
+        # extend outgoing things with utility things
+        outq.extend(utilq)
+        # clear utility things
+        utilq.clear()
+        # return to global context
+        self.reswap()
+
+    @contextmanager
+    def autoctx(self, **kw):
+        '''swap for auto-synchronizing context'''
+        self.swap(context=self.autoctx, **kw)
+        getr_ = lambda x: getattr(self, x)
+        inq, workq = getr_(self._INQ), getr_(self._WORKQ)
+        utilq,  outq = getr_(self._UTILQ), getr_(self._OUTQ)
+        # clear work things
+        workq.clear()
+        # extend work things with incoming things
+        workq.extend(inq)
+        # swap iterators
+        self._iterator = self._iterexcept
+        yield
+        # clear outgoing things if so configured
+        if self._clearout:
+            outq.clear()
+        outq.extend(utilq)
+        # clear incoming things
+        inq.clear()
+        inq.extend(utilq)
+        # clear utility things
+        utilq.clear()
+        # return to global context
+        self.reswap()
+
+    ###########################################################################
+    ## savepoint for things ###################################################
+    ###########################################################################
+
+    def _savepoint(self):
+        '''take savepoint of incoming things'''
+        self._savepoints.append(copy(getattr(self, self._INQ)))
+        return self
+
+    ###########################################################################
+    ## iterate things #########################################################
+    ###########################################################################
+
+    @property
+    def _iterable(self):
+        '''iterable'''
+        return self._iterator(self._WORKQ)
+
+    def _breakcount(self, attr='_UTILQ'):
+        '''
+        breakcount iterator
+
+        @param attr: things to iterate over
+        '''
+        dq = getattr(self, attr)
+        length, call = len(dq), dq.popleft
+        for i in repeat(None, length):  # @UnusedVariable
+            yield call()
+
+    def _iterexcept(self, attr='_UTILQ'):
+        '''
+        call a function repeatedly until an exception is raised
+
+        Converts a call-until-exception interface to an iterator interface.
+        Like `iter(call, sentinel)` but uses an exception instead of a sentinel
+        to end the loop.
+
+        Raymond Hettinger, Python Cookbook recipe # 577155
+        '''
+        call = getattr(self, attr).popleft
+        try:
+            while 1:
+                yield call()
+        except IndexError:
+            pass
+
+    ###########################################################################
+    ## extend things ##########################################################
+    ###########################################################################
+
+    def _xtend(self, things):
+        '''extend utility things with `things` wrapped'''
+        getattr(self, self._UTILQ).extend(things)
+        return self
+
+    def _xtendleft(self, things):
+        '''extend left side of utility things with `things`'''
+        getattr(self, self._UTILQ).extendleft(things)
+        return self
+
+    def _iter(self, things):
+        '''extend work things with `things` wrapped in iterator'''
+        getattr(self, self._UTILQ).extend(iter(things))
+        return self
+
+    ###########################################################################
+    ## append things ##########################################################
+    ###########################################################################
+
+    def _append(self, things):
+        '''append `things` to utility things'''
+        getattr(self, self._UTILQ).append(things)
+        return self
+
+    def _appendleft(self, things):
+        '''append `things` to left side of utility things'''
+        getattr(self, self._UTILQ).appendleft(things)
+        return self
+
+    ###########################################################################
+    ## know things ############################################################
+    ###########################################################################
+
+    def __repr__(self):
+        getr_, list_ = lambda x: getattr(self, x), list
+        return self._repr(
+            self.__module__,
+            clsname(self),
+            self.current_mode.upper(),
+            self._INQ,
+            list_(getr_(self._INQ)),
+            self._WORKQ,
+            list_(getr_(self._WORKQ)),
+            self._UTILQ,
+            list_(getr_(self._UTILQ)),
+            self._OUTQ,
+            list_(getr_(self._OUTQ)),
+            id(self),
+        )
+
+    def __len__(self):
+        '''number of incoming things'''
+        return len(self.incoming)
+    
+    count = __len__
+
+    def countout(self):
+        '''number of outgoing things'''
+        return len(self.outgoing)
+
+    ###########################################################################
+    ## clear things ###########################################################
+    ###########################################################################
+
+    def _clearu(self):
+        '''clear utility things'''
+        self._util.clear()
+        return self
+
+    def _clearw(self):
+        '''clear work things'''
+        self._work.clear()
+        return self
+
+    def clearin(self):
+        '''clear incoming things'''
+        self.incoming.clear()
+        return self
+
+    def clearout(self):
+        '''clear outgoing things'''
+        self.outgoing.clear()
+        return self
+
+
+class EndMixin(ResultsMixin):
+
+    '''result things mixin'''
+    
+    def __iter__(self):
+        '''yield outgoing things, clearing outgoing things as it iterates'''
+        return self._iterexcept(self._OUTQ)
+    
+    results = __iter__
+
+    def end(self):
+        '''return outgoing things then clear wrap everything'''
+        # return to default context
+        self.unswap()
+        wrap, outgoing = self._wrapper, self.outgoing
+        wrap = self.outgoing.pop() if len(outgoing) == 1 else wrap(outgoing)
+        # clear every last thing
+        self.clear()._clearsp()
+        return wrap
+
+    def snapshot(self):
+        '''snapshot of current outgoing things'''
+        wrap = copy(self.outgoing)
+        return wrap.pop() if len(wrap) == 1 else self._wrapper(wrap)
+
+    def out(self):
+        '''return outgoing things and clear outgoing things'''
+        # return to default context
+        self.unswap()
+        wrap, outgoing = self._wrapper, self.outgoing
+        wrap = outgoing.pop() if len(outgoing) == 1 else wrap(outgoing)
+        # clear outgoing things
+        self.clearout()
+        return wrap
+
+
+class ResultMixin(ActiveMixin, EndMixin):
+
+    '''pipeline with with results extractor mixin'''
+
+
+class activepipe(
+    ResultMixin, FilterMixin, MapMixin, ReduceMixin, OrderMixin,
+    ExtractMixin, SetMixin, SliceMixin, TruthMixin, MathMixin, RepeatMixin,
+    RandomMixin, StringMixin,
+):
+
+    '''active pipeline'''
+
+    __slots__ = SLOTS
+
+
+class collectpipe(ResultMixin, ExtractMixin):
+
+    '''collecting pipeline'''
+
+    __slots__ = SLOTS
+
+
+class setpipe(ResultMixin, SetMixin):
+
+    '''set pipeline'''
+
+    __slots__ = SLOTS
+
+
+class slicepipe(ResultMixin, SliceMixin):
+
+    '''slice pipeline'''
+
+    __slots__ = SLOTS
+
+
+class filterpipe(ResultMixin, FilterMixin, ExtractMixin, SetMixin, SliceMixin):
+
+    '''filter pipeline'''
+
+    __slots__ = SLOTS
+
+
+class repeatpipe(ResultMixin, RepeatMixin):
+
+    '''repeat pipeline'''
+
+    __slots__ = SLOTS
+
+
+class mappipe(ResultMixin, RepeatMixin, MapMixin):
+
+    '''mapping pipeline'''
+
+    __slots__ = SLOTS
+
+
+class randompipe(ResultMixin, RandomMixin):
+
+    '''randomizing pipeline'''
+
+    __slots__ = SLOTS
+
+
+class sortpipe(ResultMixin, OrderMixin, RandomMixin):
+
+    '''ordering pipeline'''
+
+    __slots__ = SLOTS
+
+
+class mathpipe(ResultMixin, MathMixin):
+
+    '''auto-balancing math pipeline'''
+
+    __slots__ = SLOTS
+
+
+class truthpipe(ResultMixin, TruthMixin):
+
+    '''auto-balancing truth pipeline'''
+
+    __slots__ = SLOTS
+
+
+class reducepipe(ResultMixin, MathMixin, TruthMixin, ReduceMixin):
+
+    '''auto-balancing reduce pipeline'''
+
+    __slots__ = SLOTS
+
+
+class stringpipe(ResultMixin, StringMixin):
+
+    '''auto-balancing string transformation pipeline'''
+
+    __slots__ = SLOTS
+
+
+class transformpipe(ResultMixin, StringMixin):
+
+    '''auto-balancing transformation pipeline'''
+
+    __slots__ = SLOTS

File thingpipe/base.py

+# -*- coding: utf-8 -*-
+'''base mixins'''
+
+from threading import local
+from operator import truth
+from collections import deque
+from contextlib import contextmanager
+
+from stuf.utils import OrderedDict
+from stuf.core import stuf, frozenstuf, orderedstuf
+
+SLOTS = [
+    '_work', 'outgoing', '_util', 'incoming', '_call', '_alt', '_wrapper',
+    '_args', '_kw', '_clearout', '_context', '_CONFIG', '_INQ', '_WORKQ',
+    '_UTILQ', '_OUTQ', '_iterator', 'current_mode', '_savepoints', '_start',
+    '_true'
+]
+
+
+class PipeMixin(local):
+
+    '''pipeline mixin'''
+
+    def __init__(self, incoming, outgoing, **kw):
+        '''
+        init
+
+        @param incoming: incoming things
+        @param outgoing: outgoing things
+        '''
+        super(PipeMixin, self).__init__()
+        # set if manually balanced or automatically balanced pipeline
+        manual = kw.pop('manual', False)
+        self._DEFAULT_CONTEXT = 'ctx4' if manual else 'autoctx'
+        # incoming things
+        self.incoming = incoming
+        # outgoing things
+        self.outgoing = outgoing
+        # preferred mode
+        self.current_mode = self._RW
+        #######################################################################
+        ## context defaults ###################################################
+        #######################################################################
+        # preferred context
+        self._context = getattr(self, self._DEFAULT_CONTEXT)
+        # default context settings
+        self._CONFIG = {}
+        # 1. default incoming things
+        self._INQ = self._INVAR
+        # 2. default work things
+        self._WORKQ = self._WORKVAR
+        # 3. default utility things
+        self._UTILQ = self._UTILVAR
+        # 4. default outgoing things
+        self._OUTQ = self._OUTVAR
+        # clear outgoing things before extending/appending to them?
+        self._clearout = True
+        #######################################################################
+        ## savepoint defaults #################################################
+        #######################################################################
+        # number of savepoints to retain (default: 5)
+        maxlen = kw.pop('savepoints', 5)
+        # create stack for savepoint things
+        self._savepoints = deque(maxlen=maxlen) if maxlen is not None else None
+        # savepoint of original incoming things
+        if self._savepoints is not None:
+            self._original()
+        #######################################################################
+        ## callable defaults ##################################################
+        #######################################################################
+        # current callable (default: `None`)
+        self._call = None
+        # current alternate callable (default: `None`)
+        self._alt = None
+        # iterable export wrapper (default: `list`)
+        self._wrapper = list
+        # postition arguments (default: `tuple`)
+        self._args = ()
+        # keyword arguments (default: `dict`)
+        self._kw = {}
+
+    ###########################################################################
+    ## mode things ############################################################
+    ###########################################################################
+
+    # read/write mode
+    _RW = 'read/write'
+    # read-only mode
+    _RO = 'read-only'
+
+    def rw(self):
+        '''switch to read/write mode'''
+        self.current_mode = self._RW
+        # clear utility queue
+        return self._clearu().unswap()
+
+    ###########################################################################
+    ## context things #EE######################################################
+    ###########################################################################
+
+    # 1. incoming things
+    _INCFG = 'inq'
+    _INVAR = 'incoming'
+    # 2. utility things
+    _UTILCFG = 'utilq'
+    _UTILVAR = '_util'
+    # 3. work things
+    _WORKCFG = 'workq'
+    _WORKVAR = '_work'
+    # 4. outgoing things
+    _OUTCFG = 'outq'
+    _OUTVAR = 'outgoing'
+
+    def swap(self, **kw):
+        '''swap context'''
+        # savepoint
+        savepoint = kw.pop('savepoint', True)
+        if savepoint:
+            self._savepoint()
+        # keep context-specific settings between context swaps
+        self._CONFIG = kw if kw.get('hard', False) else {}
+        # set context
+        self._context = kw.get('context', getattr(self, self._DEFAULT_CONTEXT))
+        # clear outgoing things before extending them?
+        self._clearout = kw.get('clearout', True)
+        # 1. incoming things
+        self._INQ = kw.get(self._INCFG, self._INVAR)
+        # 2. work things
+        self._WORKQ = kw.get(self._WORKCFG, self._WORKVAR)
+        # 3. utility things
+        self._UTILQ = kw.get(self._UTILCFG, self._UTILVAR)
+        # 4. outgoing things
+        self._OUTQ = kw.get(self._OUTCFG, self._OUTVAR)
+        return self
+
+    def reswap(self):
+        '''swap for current default context'''
+        return self.swap(savepoint=False, **self._CONFIG)
+
+    def unswap(self):
+        '''swap for preferred context'''
+        return self.swap(savepoint=False)
+
+    @contextmanager
+    def ctx1(self, **kw):
+        '''swap for one-armed context'''
+        q = kw.pop(self._WORKCFG, self._INVAR)
+        self.swap(workq=q, utilq=q, context=self.ctx1, **kw)
+        yield
+        self.reswap()
+
+    ###########################################################################
+    ## savepoint for things ##################################################
+    ###########################################################################
+
+    def _original(self):
+        '''preserve original incoming things'''
+        self._savepoint()
+        # preserve from savepoint stack
+        self._start = self._savepoints.pop()
+        return self
+
+    def undo(self, index=0, everything=False):
+        '''
+        revert to previous savepoint
+
+        @param index: index of savepoint (default: 0)
+        @param everything: undo everything and return things to original state
+        '''
+        if everything:
+            # clear everything plus savepoints
+            self.clear()._clearsp()
+            # restore original incoming things
+            self.incoming = self._start
+            # take new snapshot for incoming things
+            self._original()
+            return self
+        self.clear()
+        # by default use most recent savepoint
+        if not index:
+            self.incoming = self._savepoints.pop()
+        # if specified, use savepoint at specific index
+        else:
+            self.incoming = deque(reversed(self._savepoints))[index]
+        return self._savepoint()
+
+    ###########################################################################
+    ## rotate things ##########################################################
+    ###########################################################################
+
+    def reup(self):
+        '''put incoming things in incoming things as one incoming thing'''
+        with self.ctx2(savepoint=False):
+            return self._append(list(self._iterable))
+
+    def sync(self):
+        '''shift outgoing things to incoming things'''
+        with self.autoctx(inq=self._OUTVAR, outq=self._INVAR, savepoint=False):
+            return self._xtend(self._iterable)
+
+    def syncout(self):
+        '''shift incoming things to outgoing things'''
+        with self.autoctx(savepoint=False):
+            return self._xtend(self._iterable)
+
+    ###########################################################################
+    ## extend incoming things #################################################
+    ###########################################################################
+
+    def extend(self, things):
+        '''
+        extend after incoming things
+
+        @param thing: some things
+        '''
+        with self.ctx1():
+            return self._xtend(things)
+
+    def extendleft(self, things):
+        '''
+        extend before incoming things
+
+        @param thing: some things
+        '''
+        with self.ctx1():
+            return self._xtendleft(things)
+
+    def extendout(self, things):
+        '''
+        extend right side of outgoing things
+
+        @param thing: some things
+        '''
+        with self.ctx1(workq=self._OUTVAR):
+            return self._xtend(things)
+
+    ###########################################################################
+    ## append incoming things #################################################
+    ###########################################################################
+
+    def append(self, thing):
+        '''
+        append after incoming things
+
+        @param thing: some thing
+        '''
+        with self.ctx1():
+            return self._append(thing)
+
+    def prepend(self, thing):
+        '''
+        append before incoming things
+
+        @param thing: some thing
+        '''
+        with self.ctx1():
+            return self._appendleft(thing)
+
+    ###########################################################################
+    ## call things ############################################################
+    ###########################################################################
+
+    @property
+    def _truth(self):
+        return self._call if self._call is not None else truth
+
+    @property
+    def _identity(self):
+        return self._call if self._call is not None else lambda x: x
+
+    def args(self, *args, **kw):
+        '''set arguments for current or alternative callable'''
+        # set position arguments
+        self._args = args
+        # set keyword arguemnts
+        self._kw = kw
+        return self
+
+    def tap(self, call, alt=None, factory=False):
+        '''
+        set current callable
+
+        @param call: a callable
+        @param alt: an alternative callable (default: None)
+        @param factor: call is a factory? (default: False)
+        '''
+        # reset postition arguments
+        self._args = ()
+        # reset keyword arguments
+        self._kw = {}
+        # set factory for building current callable
+        if factory:
+            def factory(*args, **kw):
+                return call(*args, **kw)
+            self._call = factory
+        else:
+            # set current callable
+            self._call = call
+        # set alternative callable
+        self._alt = alt
+        return self
+
+    def untap(self):
+        '''clear current callable'''
+        # reset postition arguments
+        self._args = ()
+        # reset keyword arguments
+        self._kw = {}
+        # reset current callable (default is identity)
+        self._call = None
+        # reset alternative callable
+        self._alt = None
+        return self
+
+    ###########################################################################
+    ## know things ############################################################
+    ###########################################################################
+
+    @staticmethod
+    def _repr(*args):
+        '''queue representation'''
+        return (
+            '<{0}.{1}<<{2}>>([IN: {3}({4}) => WORK: {5}({6}) => UTIL: {7}({8})'
+            ' => OUT: {9}: ({10})]) at {11}>'
+        ).format(*args)
+
+    @property
+    def balanced(self):
+        '''queues are balanced?'''
+        return self.countout() == self.__len__()
+
+    ###########################################################################
+    ## clear things ###########################################################
+    ###########################################################################
+
+    def _clearsp(self):
+        '''clear out savepoints'''
+        self._savepoints.clear()
+        return self
+
+    def clear(self):
+        '''clear out everything'''
+        return self.untap().unwrap().clearout().clearin()._clearw()._clearu()
+
+
+class ResultsMixin(local):
+
+    '''result of things mixin'''
+
+    ###########################################################################
+    ## export outgoing things #################################################
+    ###########################################################################
+
+    def peek(self):
+        '''results from read-only context'''
+        self.ro()
+        out = self._wrapper(self._util)
+        results = out[0] if len(out) == 1 else out
+        self.rw()
+        return results
+
+    ###########################################################################
+    ## wrap outgoing things ###################################################
+    ###########################################################################
+
+    def wrap(self, wrapper):
+        '''
+        wrapper for outgoing things
+
+        @param wrapper: an iterator class
+        '''
+        self._wrapper = wrapper
+        return self
+
+    def tupleout(self):
+        '''set wrapper to `tuple`'''
+        return self.wrap(tuple)
+
+    def setout(self):
+        '''set wrapper to `set`'''
+        return self.wrap(set)
+
+    def dequeout(self):
+        '''set wrapper to `deque`'''
+        return self.wrap(deque)
+
+    def dictout(self):
+        '''set wrapper to `dict`'''
+        return self.wrap(dict)
+
+    def fsetout(self):
+        '''set wrapper to `frozenset`'''
+        return self.wrap(frozenset)
+
+    def fstufout(self):
+        '''set wrapper to `frozenstuf`'''
+        return self.wrap(frozenstuf)
+
+    def odictout(self):
+        '''set wrapper to `OrderedDict`'''
+        return self.wrap(OrderedDict)
+
+    def ostufout(self):
+        '''set wrapper to `orderedstuf`'''
+        return self.wrap(orderedstuf)
+
+    def stufout(self):
+        '''set wrapper to `stuf`'''
+        return self.wrap(stuf)
+
+    def listout(self):
+        '''clear current wrapper'''
+        return self.wrap(list)
+
+    unwrap = listout

File thingpipe/compat.py

+# -*- coding: utf-8 -*-
+'''thingpipe support'''
+
+from itertools import chain
+try:
+    import unittest2 as unittest
+except ImportError:
+    import unittest  # @UnusedImport
+
+from stuf import six
+# pylint: disable-msg=f0401,w0611
+from stuf.six.moves import (
+    map, filterfalse, filter, zip, zip_longest, xrange)  # @UnresolvedImport @UnusedImport @IgnorePep8
+# pylint: enable-msg=f0401
+
+items = six.items
+ichain = chain.from_iterable
+range = xrange
+imap = map
+ifilter = filter
+
+
+class port(object):
+
+    '''python 2/3 helper'''
+
+    # is python 3?
+    PY3 = six.PY3
+    # types
+    BINARY = six.binaries
+    CLASS = six.classes
+    INTEGER = six.integers
+    MAXSIZE = six.MAXSIZE
+    STRING = six.strings
+    UNICODE = six.texts
+    # classes
+    BytesIO = six.BytesIO
+    StringIO = six.StringIO
+    # character data
+    b = staticmethod(six.b)
+    int2byte = staticmethod(six.int2byte)
+    u = staticmethod(six.u)
+    # dictionary
+    items = staticmethod(six.items)
+    keys = staticmethod(six.keys)
+    values = staticmethod(six.values)
+    # iterables
+    iterator = staticmethod(six.advance_iterator)
+    # classes
+    metaclass = staticmethod(six.with_metaclass)
+    # methods
+    code = staticmethod(six.function_code)
+    defaults = staticmethod(six.function_defaults)
+    method_function = staticmethod(six.method_function)
+    method_self = staticmethod(six.method_self)
+    unbound = staticmethod(six.get_unbound_function)
+    # exception
+    reraise = staticmethod(six.reraise)
+
+    @classmethod
+    def isbinary(cls, value):
+        '''is binary?'''
+        return isinstance(value, cls.BINARY)
+
+    @classmethod
+    def isclass(cls, value):
+        '''is class?'''
+        return isinstance(value, cls.CLASS)
+
+    @classmethod
+    def iscall(cls, value):
+        '''is callable?'''
+        return six.callable(value)
+
+    @classmethod
+    def isgtemax(cls, value):
+        '''greater than max size?'''
+        return value > cls.MAXSIZE
+
+    @classmethod
+    def isinteger(cls, value):
+        '''is integer?'''
+        return isinstance(value, cls.INTEGER)
+
+    @classmethod
+    def isltemax(cls, value):
+        '''less than max size?'''
+        return value < cls.MAXSIZE
+
+    @classmethod
+    def isstring(cls, value):
+        '''is string'''
+        return isinstance(value, cls.STRING)
+
+    @classmethod
+    def isunicode(cls, value):
+        '''is text?'''
+        return isinstance(value, cls.UNICODE)
+
+    @staticmethod
+    def printf(*args, **kw):
+        '''print output'''
+        return six.printf(*args, **kw)
+
+
+isbinary = port.isbinary
+isstring = port.isstring
+isunicode = port.isunicode
+texts = six.texts
+
+
+def tounicode(thing, encoding='utf-8', errors='strict'):
+    return (
+        thing.decode(encoding, errors) if isbinary(thing) else
+        texts(texts(thing).encode(encoding, errors), encoding, errors)
+    )
+
+
+def tobytes(thing, encoding='utf-8', errors='strict'):
+    return (
+        texts(thing).encode(encoding, errors) if not isbinary(thing) else thing
+    )
+
+
+import sys
+if not sys.version_info[0] == 2 and sys.version_info[1] < 7:
+    from collections import Counter  # @UnresolvedImport
+else:
+    import heapq
+    from operator import itemgetter
+
+    class Counter(dict):
+
+        '''dict subclass for counting hashable items'''
+
+        def __init__(self, iterable=None, **kw):
+            '''
+            If given, count elements from an input iterable. Or, initialize
+            count from another mapping of elements to their counts.
+            '''
+            super(Counter, self).__init__()
+            self.update(iterable, **kw)
+
+        def most_common(self, n=None):
+            '''
+            list the n most common elements and their counts from the most
+            common to the least
+
+            If n is None, then list all element counts.
+            '''
+            # Emulate Bag.sortedByCount from Smalltalk
+            if n is None:
+                return sorted(items(self), key=itemgetter(1), reverse=True)
+            return heapq.nlargest(n, self.iteritems(), key=itemgetter(1))
+
+        # Override dict methods where necessary
+
+        def update(self, iterable=None, **kw):
+            '''like dict.update() but add counts instead of replacing them'''
+            if iterable is not None:
+                self_get = self.get
+                for elem in iterable:
+                    self[elem] = self_get(elem, 0) + 1

File thingpipe/filtering.py

+# -*- coding: utf-8 -*-
+'''thingpipe filtering mixins'''
+
+import re
+from inspect import getmro
+from threading import local
+from functools import reduce
+from collections import deque
+from itertools import tee, islice
+from operator import attrgetter, itemgetter, truth
+
+from thingpipe.compat import ifilter, ichain, imap, filterfalse
+
+
+class ExtractMixin(local):
+
+    '''collecting mixin'''
+
+    def attributes(self, *names):
+        '''extract object attributes from incoming things by their `*names`'''
+        def pick(names, iterable):
+            attrfind = attrgetter(*names)
+            for thing in iterable:
+                try:
+                    yield attrfind(thing)
+                except AttributeError:
+                    pass
+        with self._context():
+            return self._xtend(pick(names, self._iterable))
+
+    def items(self, *keys):
+        '''extract object items from incoming things by item `*keys`'''
+        def pluck(keys, iterable, _itemgetter=itemgetter):
+            itemfind = _itemgetter(*keys)
+            IndexErr_, KeyErr_, TypeErr_ = IndexError, KeyError, TypeError
+            for thing in iterable:
+                try:
+                    yield itemfind(thing)
+                except (IndexErr_, KeyErr_, TypeErr_):
+                    pass
+        with self._context():
+            return self._xtend(pluck(keys, self._iterable))
+
+    def members(self):
+        '''extract object members from incoming things'''
+        call_, alt_, wrap_ = self._call, self._alt, self._wrapper
+        def members(truth, iterable): #@IgnorePep8
+            f, s, t, i = truth, alt_, wrap_, iterable
+            d, w, g, e = dir, extract, getattr, AttributeError
+            test = lambda x: x.startswith('__') or x.startswith('mro')
+            for k in filterfalse(test, d(i)):
+                try:
+                    v = g(i, k)
+                except e:
+                    pass
+                else:
+                    yield k, t(w(f, v)) if s(v) else k, v
+        def extract(truth, iterable, ifilter_=ifilter, members_=members):
+            for member in ifilter_(truth, members_(truth, iterable)):
+                yield member
+        with self._context():
+            return self._xtend(ichain(imap(
+                lambda x: extract(call_, x), self._iterable,
+            )))
+
+    def mro(self):
+        '''extract ancestors of things by method resolution order'''
+        with self._context():
+            return self._xtend(ichain(getmro(i) for i in self._iterable))
+
+    def extract(self, pattern, flags=0, *things):
+        '''
+        extract patterns from incoming strings
+        
+        @param pattern: search pattern 
+        '''
+        search = re.compile(pattern, flags).search
+        def find(x):
+            results = search(x)
+            if not results:
+                return (), {}
+            # extract any named results
+            named = results.groupdict()
+            # extract any positional arguments
+            positions = tuple(i for i in results.groups() if i not in named)
+            return positions, named
+        with self._context():
+            return self._xtend(ifilter(
+                lambda x, y: truth(x and y), imap(find, self._iterable),
+            ))
+
+
+class FilterMixin(local):
+
+    '''filter mixin'''
+
+    def filter(self, pattern=None, flags=0, *things):
+        '''
+        incoming things for which current callable returns `True`
+        
+        @param pattern: search pattern expression (default: None)
+        '''
+        if pattern is not None:
+            call = re.compile(pattern, flags).search
+        elif things:
+            call = lambda y: y in things
+        else:
+            call = self._call if self._call is not None else truth
+        with self._context():
+            return self._xtend(ifilter(call, self._iterable))
+
+    def find(self):
+        '''first incoming thing for which current callable returns `True`'''
+        with self._context():
+            return self._append(
+                next(ifilter(self._call, self._iterable))
+            )
+        
+    def partition(self):
+        '''
+        split incoming things into `True` and `False` things based on results
+        of call
+        '''
+        list_, call_ = list, self._call
+        with self._context():
+            falsy, truey = tee(self._iterable)
+            return self._xtend(iter([
+                list_(filterfalse(call_, falsy)), list_(ifilter(call_, truey)),
+            ]))
+
+    def replace(self, pattern, new, count=0, flags=0):
+        '''
+        replace incoming strings matching pattern with replacement string
+        
+        @param pattern: search pattern 
+        @param new: replacement string
+        '''
+        sub = re.compile(pattern, flags).sub
+        with self._context():
+            return self._xtend(imap(
+                lambda x: sub(new, x, count), self._iterable,
+            ))
+
+    def filterfalse(self, pattern=None, flags=0, *things):
+        '''strip things from incoming things'''
+        if pattern is not None:
+            call = re.compile(pattern, flags).search
+        elif things:
+            call = lambda y: y in things
+        else:
+            call = self._call if self._call is not None else truth
+        with self._context():
+            return self._xtend(filterfalse(call, self._iterable))
+
+
+class SetMixin(local):
+
+    '''set and uniqueness mixin'''
+
+    def difference(self, symmetric=False):
+        '''
+        difference between incoming things
+        
+        @param symmetric: use symmetric difference
+        '''
+        with self._context():
+            test = (
+                lambda x, y: set(x).difference(y) if symmetric else
+                lambda x, y: set(x).symmetric_difference(y)
+            )
+            return self._xtend(reduce(test, self._iterable))
+
+    def disjointed(self):
+        '''disjoint between incoming things'''
+        with self._context():
+            return self._append(reduce(
+                lambda x, y: set(x).isdisjoint(y), self._iterable,
+            ))
+
+    def intersection(self):
+        '''intersection between incoming things'''
+        with self._context():
+            return self._xtend(reduce(
+                lambda x, y: set(x).intersection(y), self._iterable,
+            ))
+
+    def subset(self):
+        '''incoming things that are subsets of incoming things'''
+        with self._context():
+            return self._append(reduce(
+                lambda x, y: set(x).issubset(y), self._iterable,
+            ))
+
+    def superset(self):
+        '''incoming things that are supersets of incoming things'''
+        with self._context():
+            return self._append(reduce(
+                lambda x, y: set(x).issubset(y), self._iterable
+            ))
+
+    def union(self):
+        '''union between incoming things'''
+        with self._context():
+            return self._xtend(
+                reduce(lambda x, y: set(x).union(y), self._iterable)
+            )
+
+    def unique(self):
+        '''
+        list unique incoming things, preserving order and remember all incoming
+        things ever seen
+        '''
+        def unique(iterable, key=None):
+            seen = set()
+            seen_add_, key_ = seen.add, key
+            for element in iterable:
+                k = key_(element)
+                if k not in seen:
+                    seen_add_(k)
+                    yield element
+        with self._context():
+            return self._iter(unique(self._iterable, self._call))
+
+
+class SliceMixin(local):
+
+    '''slicing mixin'''
+
+    def first(self, n=0):
+        '''
+        first `n` things of incoming things or just the first thing
+
+        @param n: number of things (default: 0)
+        '''
+        with self._context():
+            if n:
+                return self._xtend(islice(self._iterable, n))
+            return self._append(next(self._iterable))
+
+    def last(self, n=0):
+        '''
+        last `n` things of incoming things or just the last thing
+
+        @param n: number of things (default: 0)
+        '''
+        with self._context():
+            i1, i2 = tee(self._iterable)
+            if n:
+                return self._xtend(islice(i1, len(list(i2)) - n, None))
+            return self._append(deque(i1, maxlen=1).pop())
+
+    def nth(self, n, default=None):
+        '''
+        `nth` incoming thing in incoming things or default thing
+
+        @param n: number of things
+        @param default: default thing (default: None)
+        '''
+        with self._context():
+            return self._append(
+                next(islice(self._iterable, n, None), default)
+            )
+
+    def head(self):
+        '''all incoming things except the last thing'''
+        with self._context():
+            i1, i2 = tee(self._iterable)
+            return self._xtend(islice(i1, len(list(i2)) - 1))
+
+    def rest(self):
+        '''all incoming things except the first thing'''
+        with self._context():
+            return self._xtend(islice(self._iterable, 1, None))

File thingpipe/lazy.py

+# -*- coding: utf-8 -*-
+'''lazily evaluated pipelines'''
+
+from itertools import tee, chain
+from contextlib import contextmanager
+
+from stuf.utils import clsname
+
+from thingpipe.transform import StringMixin
+from thingpipe.mapping import RepeatMixin, MapMixin
+from thingpipe.order import RandomMixin, OrderMixin
+from thingpipe.base import SLOTS, ResultsMixin, PipeMixin
+from thingpipe.reducing import MathMixin, TruthMixin, ReduceMixin
+from thingpipe.filtering import FilterMixin, ExtractMixin, SetMixin, SliceMixin
+
+
+class LazyMixin(PipeMixin):
+
+    '''base lazy pipeline'''
+
+    def __init__(self, *things, **kw):
+        incoming = iter([things[0]]) if len(things) == 1 else iter(things)
+        super(LazyMixin, self).__init__(incoming, iter([]))
+        # work things
+        self._work = iter([])
+        # utility things
+        self._util = iter([])
+
+    def __repr__(self):
+        list_, tee_ = list, tee
+        setr_ = lambda x, y: setattr(self, x, y)
+        getr_ = lambda x: getattr(self, x)
+        in1, in2 = tee_(getr_(self._INQ))
+        setr_(self._INQ, in1)
+        out1, out2 = tee_(getr_(self._OUTQ))
+        setr_(self._OUTQ, out1)
+        work1, work2 = tee_(getr_(self._WORKQ))
+        setr_(self._WORKQ, work1)
+        util1, util2 = tee_(getr_(self._UTILQ))
+        setr_(self._UTILQ, util1)
+        return self._repr(
+            self.__module__,
+            clsname(self),
+            self.current_mode.upper(),
+            self._INQ,
+            list_(in2),
+            self._WORKQ,
+            list_(work2),
+            self._UTILQ,
+            list_(util2),
+            self._OUTQ,
+            list_(out2),
+            id(self),
+        )
+
+    ###########################################################################
+    ## mode things ############################################################
+    ###########################################################################
+
+    def ro(self):
+        '''switch to read-only mode'''
+        with self.ctx3(outq=self._UTILVAR, savepoint=False):
+            self._xreplace(self._iterable)
+        with self.ctx1(hard=True, workq=self._UTILVAR, savepoint=False):
+            self.current_mode = self._RO
+            return self
+
+    ###########################################################################
+    ## context things #########################################################
+    ###########################################################################
+
+    @contextmanager
+    def ctx2(self, **kw):
+        '''swap for two-armed context'''
+        self.swap(
+            context=self.ctx2, outq=kw.get(self._OUTCFG, self._INVAR), **kw
+        )._clearworking()
+        setr_ = lambda x, y: setattr(self, x, y)
+        getr_ = lambda x: getattr(self, x)
+        OUTQ = self._OUTQ
+        # extend work things with outgoing things
+        work, wrap = tee(getr_(OUTQ))
+        setr_(self._WORKQ, work)
+        setr_(OUTQ, wrap)
+        yield
+        # extend outgoing things with utility things
+        util = getr_(self._UTILQ)
+        setr_(
+            self._OUTQ,
+            util if self._clearout else chain(util, getr_(self._OUTQ)),
+        )
+        self._clearworking()
+        # return to global context
+        self.reswap()
+
+    @contextmanager
+    def ctx3(self, **kw):
+        '''swap for three-armed context'''
+        self.swap(
+            utilq=kw.get(self._WORKCFG, self._WORKVAR), context=self.ctx3, **kw
+        )._clearworking()
+        setr_ = lambda x, y: setattr(self, x, y)
+        getr_ = lambda x: getattr(self, x)
+        INQ = self._INQ
+        # extend work things with incoming things
+        work, inq = tee(getr_(INQ))
+        setr_(self._WORKQ, work)
+        setr_(INQ, inq)
+        yield
+        # extend outgoing things with utility things
+        util = getr_(self._UTILQ)
+        setr_(
+            self._OUTQ,
+            util if self._clearout else chain(util, getr_(self._OUTQ)),
+        )
+        self._clearworking()
+        # return to global context
+        self.reswap()
+
+    @contextmanager
+    def ctx4(self, **kw):
+        '''swap for four-armed context'''
+        self.swap(context=self.ctx4, **kw)._clearworking()
+        setr_ = lambda x, y: setattr(self, x, y)
+        getr_ = lambda x: getattr(self, x)
+        INQ = self._INQ
+        # extend work things with incoming things
+        work, inq = tee(getr_(INQ))
+        setr_(self._WORKQ, work)
+        setr_(INQ, inq)
+        yield
+        # extend outgoing things with utility things
+        util = getr_(self._UTILQ)
+        setr_(
+            self._OUTQ,
+            util if self._clearout else chain(util, getr_(self._OUTQ)),
+        )
+        self._clearworking()
+        # return to global context
+        self.reswap()
+
+    @contextmanager
+    def autoctx(self, **kw):
+        '''swap for auto-synchronizing context'''
+        self.swap(context=self.autoctx, **kw)._clearworking()
+        setr_ = lambda x, y: setattr(self, x, y)
+        getr_ = lambda x: getattr(self, x)
+        INQ = self._INQ
+        # extend work things with incoming things
+        work, inq = tee(getr_(INQ))
+        setr_(self._WORKQ, work)
+        setr_(INQ, inq)
+        yield
+        # extend incoming things and outgoing things with utility things
+        inq, wrap = tee(getr_(self._UTILQ))
+        setr_(
+            self._OUTQ,
+            wrap if self._clearout else chain(wrap, getr_(self._OUTQ)),
+        )
+        setr_(INQ, inq)
+        self._clearworking()
+        # return to global context
+        self.reswap()
+
+    ###########################################################################
+    ## savepoint for things ###################################################
+    ###########################################################################
+
+    def _savepoint(self):
+        '''make savepoint of incoming things'''
+        savepoint, self.incoming = tee(getattr(self, self._INQ))
+        self._savepoints.append(savepoint)
+        return self
+
+    ###########################################################################
+    ## iterate things #########################################################
+    ###########################################################################
+
+    @property
+    def _iterable(self):
+        '''iterable'''
+        return getattr(self, self._WORKQ)
+
+    ###########################################################################
+    ## extend things ##########################################################
+    ###########################################################################
+
+    def _xtend(self, thing):
+        '''build chain'''
+        UTILQ = self._UTILQ
+        setattr(self, UTILQ, chain(thing, getattr(self, UTILQ)))
+        return self
+
+    __buildchain = _xtend
+
+    def _xtendleft(self, things):
+        '''extend left side of work things with `things`'''
+        return self.__buildchain(reversed(things))
+
+    def _xreplace(self, thing):
+        '''build chain'''
+        setattr(self, self._UTILQ, thing)
+        return self
+
+    def _iter(self, things):
+        '''extend work things with `things` wrapped in iterator'''
+        return self.__buildchain(iter(things))
+
+    ###########################################################################
+    ## append things ##########################################################
+    ###########################################################################
+
+    def _append(self, things):
+        '''append `things` to work things'''
+        UTILQ = self._UTILQ
+        setattr(self, UTILQ, chain(getattr(self, UTILQ), iter([things])))
+        return self
+
+    def _appendleft(self, things):
+        '''append `things` to left side of work things'''
+        return self.__buildchain(iter([things]))
+
+    ###########################################################################
+    ## know things ############################################################
+    ###########################################################################
+
+    def __len__(self):
+        '''number of incoming things'''
+        self.incoming, incoming = tee(self.incoming)
+        return len(list(incoming))
+    
+    count = __len__
+
+    def countout(self):
+        '''number of outgoing things'''
+        self.outgoing, outgoing = tee(self.outgoing)
+        return len(list(outgoing))
+
+    ###########################################################################
+    ## clear things ###########################################################
+    ###########################################################################
+
+    def _clearworking(self):
+        '''clear work things and utility things'''
+        iter_ = iter
+        setr_ = lambda x, y: setattr(self, x, y)
+        delr_ = lambda x: delattr(self, x)
+        WORKQ, UTILQ = self._WORKQ, self._UTILQ
+        # clear work things
+        delr_(WORKQ)
+        setr_(WORKQ, iter_([]))
+        # clear utility things
+        delr_(UTILQ)
+        setr_(UTILQ, iter_([]))
+        return self
+
+    def _clearu(self):
+        '''clear utility things'''
+        UTILQ = self._UTILQ
+        delattr(self, UTILQ)
+        setattr(self, UTILQ, iter([]))
+        return self
+
+    def _clearw(self):
+        '''clear work things'''
+        WORKQ = self._WORKQ
+        delattr(self, WORKQ)
+        setattr(self, WORKQ, iter([]))
+        return self
+
+    def clearin(self):
+        '''clear incoming things'''
+        INQ = self._INQ
+        delattr(self, INQ)
+        setattr(self, INQ, iter([]))
+        return self
+
+    def clearout(self):
+        '''clear outgoing things'''
+        OUTQ = self._OUTQ
+        delattr(self, OUTQ)
+        setattr(self, OUTQ, iter([]))
+        return self
+
+
+class EndMixin(ResultsMixin):
+
+    '''result things mixin'''
+    
+    def __iter__(self):
+        '''yield outgoing things, clearing outgoing things as it iterates'''
+        return getattr(self, self._OUTQ)
+    
+    results = __iter__
+
+    def end(self):
+        '''return outgoing things then clear out everything'''
+        # swap for default context
+        self.unswap()
+        out, tell = tee(self.outgoing)
+        wrap = self._wrapper
+        wrap = next(out) if len(wrap(tell)) == 1 else wrap(out)
+        # clear every last thing
+        self.clear()
+        return wrap
+
+    def snapshot(self):
+        '''snapshot of outgoing things'''
+        out, tell, self.outgoing = tee(getattr(self, self._OUTQ), 3)
+        wrap = self._wrapper
+        return out.pop() if len(wrap(tell)) == 1 else wrap(out)
+
+    def out(self):
+        '''return outgoing things and clear outgoing things'''
+        # swap for default context
+        self.unswap()
+        out, tell = tee(self.outgoing)
+        wrap = self._wrapper
+        wrap = next(out) if len(wrap(tell)) == 1 else wrap(out)
+        # clear outgoing things
+        self.clearout()
+        return wrap
+
+
+class ResultMixin(EndMixin, LazyMixin):
+
+    '''pipeline with results extraction mixin'''
+
+
+class lazypipe(
+    ResultMixin, FilterMixin, MapMixin, ReduceMixin, OrderMixin,
+    ExtractMixin, SetMixin, SliceMixin, TruthMixin, MathMixin, RepeatMixin,
+    RandomMixin, StringMixin,
+):
+
+    '''pipeline with results'''
+
+    __slots__ = SLOTS
+
+
+class collectpipe(ResultMixin, ExtractMixin):
+
+    '''collecting pipeline'''
+
+    __slots__ = SLOTS
+
+
+class setpipe(ResultMixin, SetMixin):
+
+    '''set pipeline'''
+
+    __slots__ = SLOTS
+
+
+class slicepipe(ResultMixin, SliceMixin):
+
+    '''slice pipeline'''
+
+    __slots__ = SLOTS
+
+
+class filterpipe(ResultMixin, FilterMixin, ExtractMixin, SetMixin, SliceMixin):
+
+    '''filter pipeline'''
+
+    __slots__ = SLOTS
+
+
+class repeatpipe(ResultMixin, RepeatMixin):
+
+    '''repeat pipeline'''
+
+    __slots__ = SLOTS
+
+
+class mappipe(ResultMixin, RepeatMixin, MapMixin):
+
+    '''map pipeline'''
+
+    __slots__ = SLOTS
+
+
+class randompipe(ResultMixin, RandomMixin):
+
+    '''random pipeline'''
+
+    __slots__ = SLOTS
+
+
+class sortpipe(ResultMixin, OrderMixin, RandomMixin):
+
+    '''order pipeline'''
+
+    __slots__ = SLOTS
+
+
+class mathpipe(ResultMixin, MathMixin):
+
+    '''math pipeline'''
+
+    __slots__ = SLOTS
+
+
+class truthpipe(ResultMixin, TruthMixin):
+
+    '''truth pipeline'''
+
+    __slots__ = SLOTS
+
+
+class reducepipe(ResultMixin, MathMixin, TruthMixin, ReduceMixin):
+
+    '''reduce pipeline'''
+
+    __slots__ = SLOTS
+
+
+class stringpipe(ResultMixin, StringMixin):
+
+    '''string transformation pipeline'''
+
+    __slots__ = SLOTS
+
+
+class transformpipe(ResultMixin, StringMixin):
+
+    '''transformation pipeline'''

File thingpipe/mapping.py

+# -*- coding: utf-8 -*-
+'''thingpipe mapping mixins'''
+
+from time import sleep
+from copy import deepcopy
+from threading import local
+from operator import methodcaller
+from itertools import starmap, repeat, product, combinations, permutations
+
+from stuf.six import keys, values, items
+from thingpipe.compat import imap, ichain, xrange
+
+
+class RepeatMixin(local):
+
+    '''repetition mixin'''
+
+    def combinations(self, n):
+        '''
+        repeat every combination for `n` of incoming things
+
+        @param n: number of repetitions
+        '''
+        with self._context():
+            return self._xtend(combinations(self._iterable, n))
+
+    def copy(self):
+        '''copy each incoming thing'''
+        with self._context():
+            return self._xtend(imap(deepcopy, self._iterable))
+
+    def product(self, n=1):
+        '''
+        nested for each loops repeated `n` times
+
+        @param n: number of repetitions (default: 1)
+        '''
+        with self._context():
+            return self._xtend(product(*self._iterable, repeat=n))
+
+    def permutations(self, n):
+        '''
+        repeat every permutation for every `n` of incoming things
+
+        @param n: length of thing to permutate
+        '''
+        with self._context():
+            return self._xtend(permutations(self._iterable, n))
+
+    def range(self, start, stop=0, step=1):
+        '''
+        put sequence of numbers in incoming things
+
+        @param start: number to start with
+        @param stop: number to stop with (default: 0)
+        @param step: number of steps to advance per iteration (default: 1)
+        '''
+        with self._context():
+            return self._xtend(
+                xrange(start, stop, step) if stop else xrange(start)
+            )
+
+    def repeat(self, n):
+        '''
+        repeat incoming things `n` times
+
+        @param n: number of times to repeat
+        '''
+        with self._context():
+            return self._xtend(repeat(tuple(self._iterable), n))
+
+    def times(self, n=None):
+        '''
+        repeat call with incoming things `n` times
+
+        @param n: repeat call n times on incoming things (default: None)
+        '''
+        with self._context():
+            if n is None:
+                return self._xtend(starmap(
+                    self._call, repeat(list(self._iterable)),
+                ))
+            return self._xtend(starmap(
+                self._call, repeat(list(self._iterable), n),
+            ))
+
+
+class MapMixin(local):
+
+    '''mapping mixin'''
+
+    def each(self, wait=0):
+        '''
+        invoke call with passed arguments, keywords in incoming things
+
+        @param wait: time in seconds (default: 0)
+        '''
+        call = self._call
+        if wait:
+            def delay_each(x, y, wait=0, caller=None):
+                sleep(wait)
+                return caller(*x, **y)
+            de = delay_each
+            call_ = lambda x, y: de(x, y, wait, call)
+        else:
+
+            call_ = lambda x, y: call(*x, **y)
+        with self._context():
+            return self._xtend(starmap(call_, self._iterable))
+
+    def invoke(self, name, wait=0):
+        '''
+        invoke method `name` on each incoming thing with passed arguments,
+        keywords but return incoming thing instead if method returns `None`
+
+        @param name: name of method
+        @param wait: time in seconds (default: 0)
+        '''
+        caller = methodcaller(name, *self._args, **self._kw)
+        def invoke(thing):
+            results = caller(thing)
+            return thing if results is None else results
+        if wait:
+            def invoke(x, wait=0):
+                sleep(wait)
+                results = caller(x)
+                return x if results is None else results
+        with self._context():
+            return self._xtend(imap(invoke, self._iterable))
+
+    def items(self):
+        '''invoke call on each mapping to get key, value pairs'''
+        with self._context():
+            return self._xtend(starmap(
+                self._call, ichain(imap(items, self._iterable))
+            ))
+
+    def keys(self):
+        '''invoke call on each mapping to get keys'''
+        with self._context():
+            return self._xtend(starmap(
+                self._call, ichain(imap(keys, self._iterable))
+            ))
+
+    def map(self, wait=0):
+        '''
+        invoke call on each incoming thing
+        
+        @param wait: time in seconds (default: 0)
+        '''