Commits

Dmitry Vakhrushev committed 78345dc Draft

Added support for named forked pipelines

  • Participants
  • Parent commits 2e8e42b

Comments (0)

Files changed (3)

File copipes/__init__.py

         self.pipe.append(null)
 
     @contextmanager
-    def fork(self, worker, pipes_count):
+    def fork(self, worker, *pipes):
         """
         Connect to pipeline forked coroutine.
 
             collect.params([2, 0, 3, 1, 4, 2])
 
         """
-        pipes = tuple(pipeline() for i in range(pipes_count))
+        if isinstance(pipes[0], int):
+            pipe_count = pipes[0]
+            pipe_names = None
+        else:
+            pipe_count = len(pipes)
+            pipe_names = pipes
+        pipes = tuple(pipeline() for i in range(pipe_count))
         yield pipes
-        self.connect(_fork(worker, *pipes))
+        if pipe_names:
+            self.connect(_fork(worker, **dict(zip(pipe_names, pipes))))
+        else:
+            self.connect(_fork(worker, *pipes))
 
     def feed(self, source):
         """
 
     """
 
-    def __init__(self, worker, *pipes):
+    def __init__(self, worker, *pipes, **named_pipes):
         self.worker = worker
         self.pipes = pipes
+        self.named_pipes = named_pipes
 
     def __call__(self, next):
         """ Returns initialized forked pipeline """
         pipes = (pipe(next) for pipe in self.pipes)
-        return self.worker(*pipes)
+        named_pipes = dict((name, pipe(next)) for name, pipe
+                                              in self.named_pipes.items())
+        return self.worker(*pipes, **named_pipes)
 
     def __repr__(self):
         result = [repr(self.worker) + ':']
         for pipe in self.pipes:
             result.append('    -->')
             result.extend(' ' * 8 + wr for wr in repr(pipe).split(linesep))
+        for name, pipe in self.named_pipes.items():
+            result.append('    {0} -->'.format(name))
+            result.extend(' ' * 8 + wr for wr in repr(pipe).split(linesep))
         return linesep.join(result)

File copipes/example.py

 
 
 @coroutine
-def split(selector, *channels):
+def split(selector, **channels):
     while True:
         record = yield
         channel = selector(record)
         filter.params(lambda r: r.level != 'DEBUG'),
     )
     with p.fork(broadcast, 2) as (modules, errors):
-        module_names = ('first', 'second', 'third')
-        selector = split.params(lambda r: module_names.index(r.module))
-        with modules.fork(selector, 3) as (first, second, third):
+        module_names = ()
+        with modules.fork(split.params(lambda r: r.module),
+                         'first', 'second', 'third') as (first, second, third):
             first.connect(save.params(first_log))
             second.connect(save.params(second_log))
             third.connect(save.params(third_log))

File copipes/test.py

     tools.eq_(odds, [11, 13])
 
 
+def forked_named_pipeline_test():
+    evens = []
+    odds = []
+    p = pipeline()
+    with p.fork(split, 'odd', 'even') as (odd, even):
+        even.connect(
+            multiply.params(10),
+            collect.params(evens)
+        )
+        odd.connect(
+            add.params(10),
+            collect.params(odds)
+        )
+    p.feed([1, 2, 3, 4])
+    tools.eq_(evens, [20, 40])
+    tools.eq_(odds, [11, 13])
+
+
 def plugged_pipeline_test():
     result = []
     null = []