Commits

Dmitry Vakhrushev committed 6f5b5c1 Draft

Updated doc-strings

Comments (0)

Files changed (1)

         ...     while True:
         ...         item = yield
         ...         next.send(item + 1)
+        >>> inc = increment()
+        >>> inc.send(1)         # No exception is raised
 
     Also it's converted to boolean as ``False``:
 
         return self
 
     def __nonzero__(self):
-        """ Python 2.x boolean representation """
+        # Python 2.x boolean representation
         return False
 
     def __bool__(self):
-        """ Python 3.x boolean representation """
+        # Python 3.x boolean representation
         return False
 
     def __repr__(self):
-        """ String representation """
         return 'null'
 
     def send(self, *args, **kw):
         return c
 
     def __repr__(self):
-        """ Returns string representation """
         params = [repr(a) for a in self.args]
         params.extend('{0!s}={1!r}'.format(k, v) for k, v in self.kw.items())
         params = ', '.join(params)
 
 
 class pipeline(object):
-    """ Coroutine pipeline. """
+    """
+    Coroutine pipeline is utility class to connect number of coroutines into
+    single pipeline.  The main goal is readability.  The following examples
+    of code are equal:
+
+    ..  code-block:: pycon
+
+        >>> @coroutine
+        ... def collect(target, next=null):
+        ...     while True:
+        ...         item = yield
+        ...         target.append(item)
+        ...         next.send(item)
+
+        >>> @coroutine
+        ... def increment(next=null):
+        ...     while True:
+        ...         item = yield
+        ...         next.send(item + 1)
+
+        >>> # Without pipelines
+        >>> result = []
+        >>> p = increment(collect(result))
+        >>> for i in [1, 2, 3]:
+        ...     p.send(i)
+        >>> p.close()
+
+        >>> # With pipelines
+        >>> result = []
+        >>> p = pipeline(
+        ...     increment,
+        ...     collect.params(result),
+        ... )
+        >>> p.feed([1, 2, 3])
+
+    Pipeline also provides a readable representation, which is useful in debug:
+
+    ..  code-block:: pycon
+        >>> p
+        increment
+        collect.params([2, 3, 4])
+
+    """
 
     def __init__(self, *workers):
         self.pipe = []
         return next
 
     def __repr__(self):
-        """ Returns string representation """
         return linesep.join(repr(worker) for worker in self.pipe) or \
                '<empty pipeline>'
 
     def connect(self, *workers):
+        """
+        Connect to pipeline passed coroutines.
+
+        Examples:
+
+        ..  code-block:: pycon
+
+            >>> @coroutine
+            ... def collect(target, next=null):
+            ...     while True:
+            ...         item = yield
+            ...         target.append(item)
+            ...         next.send(item)
+
+            >>> @coroutine
+            ... def increment(next=null):
+            ...     while True:
+            ...         item = yield
+            ...         next.send(item + 1)
+
+            >>> result = []
+            >>> p = pipeline()
+            >>> p.connect(increment, collect.params(result))
+            >>> p.feed([1, 2, 3])
+            >>> result
+            [2, 3, 4]
+
+        """
         self.pipe.extend(workers)
 
     def plug(self):
+        """ Plug pipeline, i.e. connect ``null`` to pipeline """
         self.pipe.append(null)
 
     @contextmanager
     def fork(self, worker, pipes_count):
+        """
+        Connect to pipeline forked coroutine.
+
+        Examples:
+
+        ..  code-block:: pycon
+
+            >>> @coroutine
+            ... def collect(target, next=null):
+            ...     while True:
+            ...         item = yield
+            ...         target.append(item)
+            ...         next.send(item)
+
+            >>> @coroutine
+            ... def split(even=null, odd=null):
+            ...     while True:
+            ...         item = yield
+            ...         next = odd if item % 2 else even
+            ...         next.send(item)
+
+            >>> evens = []
+            >>> odds = []
+            >>> p = pipeline()
+            >>> with p.fork(split, 2) as (even, odd):
+            ...     even.connect(collect.params(evens))
+            ...     odd.connect(collect.params(odds))
+
+            >>> p.feed([1, 2, 3, 4])
+            >>> evens
+            [2, 4]
+            >>> odds
+            [1, 3]
+
+        Note, forked pipelines are joined at the end of fork.  This means that
+        if you connect some coroutines after fork, it will be connected to each
+        of forked pipeline.  To prevent this behavior call :meth:`plug` on the
+        forked pipeline.
+
+        ..  code-block:: pycon
+
+            >>> @coroutine
+            ... def broadcast(*next):
+            ...     while True:
+            ...         item = yield
+            ...         for n in next:
+            ...             n.send(item)
+
+            >>> @coroutine
+            ... def increment(next=null):
+            ...     while True:
+            ...         item = yield
+            ...         next.send(item + 1)
+
+            >>> @coroutine
+            ... def decrement(next=null):
+            ...     while True:
+            ...         item = yield
+            ...         next.send(item - 1)
+
+            >>> incremented = []
+            >>> decremented = []
+            >>> original = []
+            >>> result = []
+            >>> p = pipeline()
+            >>> with p.fork(broadcast, 3) as (inc, dec, orig):
+            ...     inc.connect(increment, collect.params(incremented))
+            ...     dec.connect(decrement, collect.params(decremented))
+            ...     orig.connect(collect.params(original))
+            ...     orig.plug()
+            >>> p.connect(collect.params(result))
+
+            >>> p.feed([1, 2, 3])
+            >>> incremented
+            [2, 3, 4]
+            >>> decremented
+            [0, 1, 2]
+            >>> original
+            [1, 2, 3]
+            >>> result
+            [2, 0, 3, 1, 4, 2]
+
+        The representation of forked pipeline looks like:
+
+        ..  code-block:: pycon
+
+            >>> p
+            broadcast:
+                -->
+                    increment
+                    collect.params([2, 3, 4])
+                -->
+                    decrement
+                    collect.params([0, 1, 2])
+                -->
+                    collect.params([1, 2, 3])
+                    null
+            collect.params([2, 0, 3, 1, 4, 2])
+
+        """
         pipes = tuple(pipeline() for i in range(pipes_count))
         yield pipes
         self.connect(_fork(worker, *pipes))
 
     def feed(self, source):
+        """
+        Feed pipeline using items from ``source``.
+
+        The method initializes pipeline, feeds it, then closes it.
+
+        """
         p = self()
         for item in source:
             p.send(item)
         return self.worker(*pipes)
 
     def __repr__(self):
-        """ Returns string representation """
         result = [repr(self.worker) + ':']
         for pipe in self.pipes:
             result.append('    -->')