Commits

Dmitry Vakhrushev committed c5216ce Draft

Initial code base

Comments (0)

Files changed (8)

+syntax: glob
+
+build
+dist
+*.egg-info
+*.pyc
+*.sublime-*
+Copyright (c) 2012, Dmitry Vakhrushev <self@kr41.net>
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without modification,
+are permitted provided that the following conditions are met:
+
+1.  Redistributions of source code must retain the above copyright notice, this
+    list of conditions and the following disclaimer.
+2.  Redistributions in binary form must reproduce the above copyright notice,
+    this list of conditions and the following disclaimer in the documentation
+    and/or other materials provided with the distribution.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+include *.txt *.rst *.cfg
+This library provides utilities for creating data-processing pipelines based
+on coroutines and generators.  The idea is described in David Beazley's
+presentations:
+
+*   `Generator Tricks for Systems Programmers
+    <http://www.dabeaz.com/generators/>`_
+*   `A Curious Course on Coroutines and Concurrency
+    <http://www.dabeaz.com/coroutines/>`_
+
+For example, suppose you need to process a log file::
+
+    >>> from io import StringIO
+    >>> source = StringIO('''
+    ...     01-01-2000 12:00 [INFO] Info message 1
+    ...     01-01-2000 12:10 [WARN] Warning message 1
+    ...     01-01-2000 12:20 [WARN] Warning message 2
+    ...     01-01-2000 12:30 [INFO] Info message 2
+    ...     01-01-2000 12:40 [ERROR] Error message 1
+    ...     01-01-2000 12:50 [INFO] Info message 3
+    ...     01-01-2000 13:00 [ERROR] Error message 2
+    ... ''')
+
+Create some workers::
+
+    >>> from copipes import coroutine
+    >>> @coroutine
+    ... def parse(next):
+    ...     while True:
+    ...         item = yield
+    ...         date, time, type, msg = item.strip().split(' ', 3)
+    ...         next.send({
+    ...             'date': date,
+    ...             'time': time,
+    ...             'type': type,
+    ...             'msg': msg,
+    ...         })
+    ...
+    >>> @coroutine
+    ... def filter(cond, next):
+    ...     while True:
+    ...         item = yield
+    ...         if cond(item):
+    ...             next.send(item)
+    ...
+    >>> @coroutine
+    ... def split(info, warn, error):
+    ...     map = {
+    ...         '[INFO]': info,
+    ...         '[WARN]': warn,
+    ...         '[ERROR]': error,
+    ...     }
+    ...     while True:
+    ...         item = yield
+    ...         map[item['type']].send(item)
+    ...
+    >>> @coroutine
+    ... def echo(next, prefix=''):
+    ...     while True:
+    ...         item = yield
+    ...         print(prefix + item['msg'])
+    ...         next.send(item)
+    ...
+    >>> @coroutine
+    ... def write(f, next):
+    ...     while True:
+    ...         item = yield
+    ...         f.write(' '.join((item['date'], item['time'],
+    ...                           item['type'], item['msg'])))
+    ...         f.write('\n')
+    ...         next.send(item)
+    ...
+
+Connect the workers into a pipeline::
+
+    >>> from copipes import pipeline
+    >>> output = StringIO()
+    >>> p = pipeline(
+    ...     filter.params(lambda line: line.strip()),
+    ...     parse,
+    ... )
+    ...
+    >>> with p.fork(split, 3) as (info, warn, error):
+    ...     info.plug()
+    ...     error.connect(
+    ...         echo.params(prefix='! ')
+    ...     )
+    ...
+    >>> p.connect(write.params(output))
+    >>> print(repr(p))                                   # doctest: +ELLIPSIS
+    filter.params(<function <lambda> at ...>)
+    parse
+    split:
+        -->
+            null
+        -->
+            <empty pipeline>
+        -->
+            echo.params(prefix='! ')
+    write.params(<...>)
+    >>> p.feed(source)
+    ! Error message 1
+    ! Error message 2
+    >>> print(output.getvalue())
+    01-01-2000 12:10 [WARN] Warning message 1
+    01-01-2000 12:20 [WARN] Warning message 2
+    01-01-2000 12:40 [ERROR] Error message 1
+    01-01-2000 13:00 [ERROR] Error message 2
+    <BLANKLINE>

copipes/__init__.py

+from contextlib import contextmanager
+from functools import update_wrapper
+from os import linesep
+from sys import version_info
+
+
+__all__ = ['coroutine', 'pipeline']  # ``fork`` and ``null`` are not exported
+__version__ = '0.1'  # read by setup.py to fill in the package version
+__author__ = 'Dmitry Vakhrushev <self@kr41.net>'
+__license__ = 'BSD'
+
+
+is2 = version_info[0] == 2  # True when the interpreter is Python 2
+
+
+class coroutine(object):
+    """ Decorator that wraps a generator function into a coroutine factory
+
+    Calling the wrapper creates the generator and advances it to its first
+    ``yield``, so the returned object is immediately ready for ``send()``.
+    Arguments stored by ``params`` are combined with the call arguments:
+    stored positional arguments come first, call keywords override stored
+    keywords.
+    """
+
+    def __init__(self, func):
+        self.func = func
+        self.args = ()
+        self.kw = {}
+        # Copies ``__name__``, ``__doc__`` etc. of ``func`` onto the wrapper
+        update_wrapper(self, func)
+
+    def __call__(self, *args, **kw):
+        """ Creates the generator and primes it up to the first ``yield`` """
+        merged_kw = dict(self.kw, **kw)
+        gen = self.func(*(self.args + args), **merged_kw)
+        # The method advancing a generator is named differently in Python 2
+        advance = gen.next if is2 else gen.__next__
+        advance()
+        return gen
+
+    def __repr__(self):
+        parts = [repr(arg) for arg in self.args]
+        parts += ['{0!s}={1!r}'.format(k, v) for k, v in self.kw.items()]
+        if not parts:
+            return '{0}'.format(self.__name__)
+        return '{0}.params({1})'.format(self.__name__, ', '.join(parts))
+
+    def params(self, *args, **kw):
+        """ Returns a copy of the coroutine bound to the given arguments """
+        clone = self.__class__(self.func)
+        clone.args, clone.kw = args, kw
+        return clone
+
+
+class pipeline(object):
+    """ An ordered chain of workers that is built into coroutines on demand
+
+    Workers are callables (usually ``coroutine`` objects) which are called
+    with the downstream stage and return an object with a ``send`` method.
+    Nothing is instantiated until the pipeline itself is called.
+    """
+
+    def __init__(self, *workers):
+        self.pipe = []
+        self.connect(*workers)
+
+    def __call__(self, next=None):
+        """ Instantiates the workers and returns the head of the chain
+
+        Workers are created from the last one to the first one, so each of
+        them receives the already created downstream stage.  ``next`` is
+        the stage following the pipeline; a ``null`` sink is used when it
+        is omitted.  An empty pipeline returns ``next`` itself.
+        """
+        # Test against ``None`` explicitly: a valid sink may be falsy
+        if next is None:
+            next = null()
+        for worker in reversed(self.pipe):
+            next = worker(next)
+        return next
+
+    def __repr__(self):
+        # Lines are joined with '\n', not ``os.linesep``: the platform
+        # separator is meant for binary output and would put '\r\n' into
+        # an in-memory string on Windows
+        return '\n'.join(repr(worker) for worker in self.pipe) or \
+               '<empty pipeline>'
+
+    def connect(self, *workers):
+        """ Appends ``workers`` to the end of the pipeline """
+        self.pipe.extend(workers)
+
+    def plug(self):
+        """ Appends a ``null`` sink, which ignores the stage after it """
+        self.pipe.append(null)
+
+    @contextmanager
+    def fork(self, worker, pipes_count):
+        """ Context manager that splits the pipeline into branches
+
+        Yields a tuple of ``pipes_count`` empty pipelines to be filled in
+        the ``with`` block.  When the block finishes normally, a ``fork``
+        of ``worker`` over these branches is connected to the pipeline.
+        """
+        pipes = tuple(pipeline() for i in range(pipes_count))
+        yield pipes
+        self.connect(fork(worker, *pipes))
+
+    def feed(self, source):
+        """ Builds the pipeline, sends each item of ``source``, closes it """
+        p = self()
+        for item in source:
+            p.send(item)
+        p.close()
+
+
+class fork(object):
+    """ A worker that dispatches items between several branch pipelines """
+
+    def __init__(self, worker, *pipes):
+        self.worker = worker
+        self.pipes = pipes
+
+    def __call__(self, next):
+        # Every branch is built on top of the same downstream stage, and
+        # the dispatching worker receives the heads of all branches
+        pipes = (pipe(next) for pipe in self.pipes)
+        return self.worker(*pipes)
+
+    def __repr__(self):
+        # Uses '\n' to agree with ``pipeline.__repr__`` on every platform
+        result = [repr(self.worker) + ':']
+        for pipe in self.pipes:
+            result.append('    -->')
+            result.extend(' ' * 8 + wr for wr in repr(pipe).split('\n'))
+        return '\n'.join(result)
+
+
+@coroutine
+def null(*args):  # arguments (e.g. a downstream stage) are accepted but unused
+    while True:
+        yield  # receive a sent item and drop it
+from textwrap import dedent
+
+from nose import tools
+
+from copipes import coroutine, pipeline
+
+
+@coroutine
+def collect(to, next):
+    """ Appends each received item to ``to`` and passes it downstream """
+    while True:
+        received = yield
+        to.append(received)
+        next.send(received)
+
+
+@coroutine
+def add(value, next):
+    """ Adds specified ``value`` to each item passed to pipeline """
+    # The docstring above is compared verbatim by the name/docstring test
+    while True:
+        received = yield
+        next.send(received + value)
+
+
+@coroutine
+def multiply(value, next):
+    """ Sends each received item multiplied by ``value`` downstream """
+    while True:
+        received = yield
+        next.send(received * value)
+
+
+@coroutine
+def split(even, odd):
+    """ Routes even items to ``even`` and odd items to ``odd`` """
+    while True:
+        number = yield
+        target = odd if number % 2 else even
+        target.send(number)
+
+
+def coroutine_preserves_name_and_docstring_test():
+    # The wrapper must expose the metadata of the decorated function
+    expected_doc = (' Adds specified ``value`` to each'
+                    ' item passed to pipeline ')
+    tools.eq_(add.__name__, 'add')
+    tools.eq_(add.__doc__, expected_doc)
+    tools.eq_(repr(add), 'add')
+
+
+def parametrized_coroutine_test():
+    plus_five, plus_three = add.params(5), add.params(3)
+
+    # A parametrized coroutine has a readable representation
+    tools.eq_(repr(plus_five), 'add.params(5)')
+    tools.eq_(repr(plus_three), 'add.params(3)')
+
+    # Each copy applies its own argument
+    got_five, got_three = [], []
+    source = [1, 2, 3]
+    pipeline(plus_five, collect.params(got_five)).feed(source)
+    pipeline(plus_three, collect.params(got_three)).feed(source)
+    tools.eq_(got_five, [6, 7, 8])
+    tools.eq_(got_three, [4, 5, 6])
+
+
+def straight_forward_pipeline_test():
+    # Workers passed to the constructor and to ``connect`` run in order
+    collected = []
+    p = pipeline(multiply.params(10), add.params(5))
+    p.connect(add.params(1), collect.params(collected))
+    p.feed([1, 2, 3, 4])
+    tools.eq_(collected, [16, 26, 36, 46])
+
+
+def forked_pipeline_test():
+    # ``split`` sends every item to the branch matching its parity
+    evens, odds = [], []
+    p = pipeline()
+    with p.fork(split, 2) as (even, odd):
+        even.connect(multiply.params(10), collect.params(evens))
+        odd.connect(add.params(10), collect.params(odds))
+    p.feed([1, 2, 3, 4])
+    tools.eq_(evens, [20, 40])
+    tools.eq_(odds, [11, 13])
+
+
+def plugged_pipeline_test():
+    # Workers connected after ``plug`` must not receive any items
+    before_plug = []
+    after_plug = []
+    p = pipeline(multiply.params(10), collect.params(before_plug))
+    p.plug()
+    p.connect(add.params(1), collect.params(after_plug))
+    p.feed([1, 2, 3, 4])
+    tools.eq_(before_plug, [10, 20, 30, 40])
+    tools.eq_(after_plug, [])
+
+
+def complex_pipeline_test():
+    # Both branches of the fork feed the stage connected after it
+    evens, odds, merged = [], [], []
+    p = pipeline(add.params(1))
+    with p.fork(split, 2) as (even, odd):
+        even.connect(
+            collect.params(evens), multiply.params(2), add.params(5))
+        odd.connect(
+            collect.params(odds), multiply.params(5), add.params(2))
+    p.connect(collect.params(merged))
+    p.feed([1, 2, 3, 4])
+    tools.eq_(evens, [2, 4])
+    tools.eq_(odds, [3, 5])
+    tools.eq_(merged, [9, 17, 13, 27])
+
+
+def pipeline_representation_test():
+    # ``repr`` lists one worker per line and indents the fork branches
+    p = pipeline(add.params(1))
+    with p.fork(split, 2) as (even, odd):
+        even.connect(multiply.params(2), add.params(5))
+        odd.connect(multiply.params(5), add.params(2))
+    p.connect(add.params(2))
+    expected = dedent("""
+    add.params(1)
+    split:
+        -->
+            multiply.params(2)
+            add.params(5)
+        -->
+            multiply.params(5)
+            add.params(2)
+    add.params(2)
+    """).strip()
+    tools.eq_(repr(p).strip(), expected)
+[nosetests]
+verbosity=3
+with-doctest=1
+from os import path
+from setuptools import setup
+
+# The README doubles as the long description of the distribution.  It is
+# read inside a ``with`` block so that the file handle is closed right away.
+with open(path.join(path.dirname(__file__), 'README.rst')) as readme_file:
+    readme = readme_file.read()
+
+# The package is imported only to reuse its ``__version__`` below
+import copipes
+
+setup(
+    name='CoPipes',
+    version=copipes.__version__,
+    description="A coroutine based utils for processing data via pipelines",
+    long_description=readme,
+    classifiers=[
+        'License :: OSI Approved :: BSD License',
+        'Programming Language :: Python',
+        'Programming Language :: Python :: 2.6',
+        'Programming Language :: Python :: 2.7',
+        'Programming Language :: Python :: 3',
+        'Topic :: Software Development :: Libraries :: Python Modules',
+    ],
+    keywords='',
+    author='Dmitry Vakhrushev',
+    author_email='self@kr41.net',
+    url='https://bitbucket.org/kr41/copipes',
+    download_url='https://bitbucket.org/kr41/copipes/downloads',
+    license='BSD',
+    packages=['copipes'],
+    include_package_data=True,
+    zip_safe=True,
+)