Commits

Boris FELD committed 5d67d00

Rename Recipe to Scheduler and udpate others files.

  • Participants
  • Parent commits 497af56
  • Branches feature/FixTerminology

Comments (0)

Files changed (5)

File pytasks/Recipe.py

-from .Store import Store
-from .Status import (NormalStatus, SkipStatus, DependencySkipStatus,
-                     FailStatus, TooMuchItemStatus, StdCatch)
-from .Exceptions import (DependenceNameAlreadyDeclared, ArgumentException,
-                         CircularReference, DependenceMissing, SkipException)
-from .Item import Item
-
-class Recipe(object):
-    def __init__(self):
-        self._tasks = {}
-        self._status = {}
-        self._executed = set()
-        self._finished = set()
-        self._output_tags = {}
-        self._store = None
-        self._filters = {}
-
-    def store(self, task):
-        if not hasattr(task, 'name') or not hasattr(task, 'dependencies'):
-            raise ArgumentException
-        if task.name in self._tasks:
-            raise DependenceNameAlreadyDeclared
-        self._tasks[task.name] = task
-
-    def add_tag(self, task, *tags):
-        self._output_tags.setdefault(task.name, [])
-        self._output_tags[task.name].extend(tags)
-
-    def filter(self, task, input_name, tag):
-        self._filters.setdefault(task.name, {})
-        self._filters[task.name].setdefault(input_name, set())
-        self._filters[task.name][input_name].add(tag)
-
-    def execute(self, base_inputs = None):
-
-        self._executed = set()
-        self._finished = set()
-
-        self._store = Store()
-
-        if base_inputs is not None:
-            for input_type, inputs in base_inputs.iteritems():
-                for input_value in inputs:
-                    if isinstance(input_value, Item):
-                        self._store.store(input_type, input_value)
-                    else:
-                        self._store.store(input_type, Item(input_value))
-
-        task = self._choose_task()
-
-        while(task != None):
-            self._execute_task(task)
-            task = self._choose_task()
-
-        self._mark_remaining_tasks_skipped()
-
-    def _choose_task(self):
-        for task in (task for task_name, task in self._tasks.items() if not
-                     task_name in self._executed):
-
-
-            if len(task.dependencies) == 0:
-                return task
-
-            task_filters = self._filters.get(task.name, {})
-
-            valid = True
-
-            for dependence_name, dependence_type in task.dependencies.iteritems():
-
-                dependence_filters = task_filters.get(dependence_name, frozenset())
-
-                values = self._store.number_of_values(dependence_type,
-                                                          dependence_filters)
-
-                if values > 1:
-                    valid = False
-                    self._executed.add(task.name)
-                    self._status[task.name] = TooMuchItemStatus(dependence_name)
-                    break
-
-                if values != 1:
-                    valid = False
-                    break
-
-            if valid:
-                return task
-
-    def _execute_task(self, task):
-        inputs = {}
-        returns = {}
-
-        task_filters = self._filters.get(task.name, {})
-
-        for dependencie_name, dependencie_type in task.dependencies.iteritems():
-            dependencie_filters = task_filters.get(dependencie_name,
-                                                   frozenset())
-
-
-            values = [value for value in self._store.get(dependencie_type)
-                      if value.has_tags(dependencie_filters)]
-
-            value, = values
-            inputs[dependencie_name] = value.get_value()
-
-        try:
-            std_catcher = StdCatch()
-            std_catcher.setup()
-            task(**inputs)
-            returns = task.get_outputs()
-
-        except SkipException as exception:
-            self._status[task.name] = SkipStatus(exception.skip_message,
-                                                std_catcher)
-        except Exception as exception:
-            self._status[task.name] = FailStatus(exception, std_catcher)
-        else:
-            self._status[task.name] = NormalStatus(std_catcher)
-            self._finished.add(task.name)
-            std_catcher.uninstall()
-
-            if returns != None:
-                for output_type, outputs in returns.iteritems():
-                    for output in outputs:
-
-                        output_object = Item(output)
-
-                        #Add tags
-                        for tag in self._output_tags.get(task.name, []):
-                            output_object.add_tag(tag)
-
-                        self._store.store(output_type, output_object)
-
-        finally:
-            self._executed.add(task.name)
-            std_catcher.uninstall()
-
-    def _mark_remaining_tasks_skipped(self):
-        for task_name in self._tasks.iterkeys():
-            if not task_name in self._executed:
-                self._status[task_name] = DependencySkipStatus()
-
-    def get_status(self):
-        return self._status
-
-    def get_tasks(self):
-        return self._tasks
-
-    def get_messenger(self):
-        return self._store

File pytasks/Scheduler.py

+from .Store import Store
+from .Status import (NormalStatus, SkipStatus, DependencySkipStatus,
+                     FailStatus, TooMuchItemStatus, StdCatch)
+from .Exceptions import (DependenceNameAlreadyDeclared, ArgumentException,
+                         CircularReference, DependenceMissing, SkipException)
+from .Item import Item
+
+class Scheduler(object):
+    def __init__(self):
+        self._tasks = {}
+        self._status = {}
+        self._executed = set()
+        self._finished = set()
+        self._output_tags = {}
+        self._store = None
+        self._filters = {}
+
+    def store(self, task):
+        if not hasattr(task, 'name') or not hasattr(task, 'dependencies'):
+            raise ArgumentException
+        if task.name in self._tasks:
+            raise DependenceNameAlreadyDeclared
+        self._tasks[task.name] = task
+
+    def add_tag(self, task, *tags):
+        self._output_tags.setdefault(task.name, [])
+        self._output_tags[task.name].extend(tags)
+
+    def filter(self, task, input_name, tag):
+        self._filters.setdefault(task.name, {})
+        self._filters[task.name].setdefault(input_name, set())
+        self._filters[task.name][input_name].add(tag)
+
+    def execute(self, base_inputs = None):
+
+        self._executed = set()
+        self._finished = set()
+
+        self._store = Store()
+
+        if base_inputs is not None:
+            for input_type, inputs in base_inputs.iteritems():
+                for input_value in inputs:
+                    if isinstance(input_value, Item):
+                        self._store.store(input_type, input_value)
+                    else:
+                        self._store.store(input_type, Item(input_value))
+
+        task = self._choose_task()
+
+        while(task != None):
+            self._execute_task(task)
+            task = self._choose_task()
+
+        self._mark_remaining_tasks_skipped()
+
+    def _choose_task(self):
+        for task in (task for task_name, task in self._tasks.items() if not
+                     task_name in self._executed):
+
+
+            if len(task.dependencies) == 0:
+                return task
+
+            task_filters = self._filters.get(task.name, {})
+
+            valid = True
+
+            for dependence_name, dependence_type in task.dependencies.iteritems():
+
+                dependence_filters = task_filters.get(dependence_name, frozenset())
+
+                values = self._store.number_of_values(dependence_type,
+                                                          dependence_filters)
+
+                if values > 1:
+                    valid = False
+                    self._executed.add(task.name)
+                    self._status[task.name] = TooMuchItemStatus(dependence_name)
+                    break
+
+                if values != 1:
+                    valid = False
+                    break
+
+            if valid:
+                return task
+
+    def _execute_task(self, task):
+        inputs = {}
+        returns = {}
+
+        task_filters = self._filters.get(task.name, {})
+
+        for dependencie_name, dependencie_type in task.dependencies.iteritems():
+            dependencie_filters = task_filters.get(dependencie_name,
+                                                   frozenset())
+
+
+            values = [value for value in self._store.get(dependencie_type)
+                      if value.has_tags(dependencie_filters)]
+
+            value, = values
+            inputs[dependencie_name] = value.get_value()
+
+        try:
+            std_catcher = StdCatch()
+            std_catcher.setup()
+            task(**inputs)
+            returns = task.get_outputs()
+
+        except SkipException as exception:
+            self._status[task.name] = SkipStatus(exception.skip_message,
+                                                std_catcher)
+        except Exception as exception:
+            self._status[task.name] = FailStatus(exception, std_catcher)
+        else:
+            self._status[task.name] = NormalStatus(std_catcher)
+            self._finished.add(task.name)
+            std_catcher.uninstall()
+
+            if returns != None:
+                for output_type, outputs in returns.iteritems():
+                    for output in outputs:
+
+                        output_object = Item(output)
+
+                        #Add tags
+                        for tag in self._output_tags.get(task.name, []):
+                            output_object.add_tag(tag)
+
+                        self._store.store(output_type, output_object)
+
+        finally:
+            self._executed.add(task.name)
+            std_catcher.uninstall()
+
+    def _mark_remaining_tasks_skipped(self):
+        for task_name in self._tasks.iterkeys():
+            if not task_name in self._executed:
+                self._status[task_name] = DependencySkipStatus()
+
+    def get_status(self):
+        return self._status
+
+    def get_tasks(self):
+        return self._tasks
+
+    def get_messenger(self):
+        return self._store

File pytasks/__init__.py

 from .Store import Store
-from .Recipe import Recipe
+from .Scheduler import Scheduler
 from .Status import *
 from .Task import Task
 
-__all__ = [Store, Recipe]
+__all__ = [Store, Scheduler]

File tests/TestRecipe.py

-from __future__ import print_function
-
-import os.path
-import sys
-sys.path.insert(0, os.path.realpath('../'))
-
-import unittest
-from pytasks import Recipe
-from pytasks import Task
-from pytasks.Exceptions import (ArgumentException,
-                                DependenceNameAlreadyDeclared, SkipException)
-from pytasks.Status import (NormalStatus, SkipStatus, DependencySkipStatus,
-                            FailStatus, TooMuchItemStatus)
-from pytasks.Item import Item
-from pytasks.Store import Store
-
-class RecipeStoreTestCase(unittest.TestCase):
-    def setUp(self):
-        self.recipe = Recipe()
-
-    def tearDown(self):
-        self.recipe = None
-
-    def testSimpleStorage(self):
-        task_name = 'name'
-        mock = Task(task_name)
-        self.recipe.store(mock)
-        self.assertEqual({task_name: mock}, self.recipe.get_tasks())
-
-    def testSameNameStorage(self):
-        task_name = 'name'
-        t1 = Task(task_name)
-        t2 = Task(task_name)
-        self.recipe.store(t1)
-        self.assertRaises(DependenceNameAlreadyDeclared,
-                          lambda : self.recipe.store(t2))
-        self.assertEqual({task_name: t1}, self.recipe.get_tasks())
-
-
-    def testFailNoCompatibleObject(self):
-        self.assertRaises(ArgumentException, lambda:
-                          self.recipe.store('string'))
-
-class RecipeExecutionTestCase(unittest.TestCase):
-
-    def setUp(self):
-        self.recipe = Recipe()
-
-    def tearDown(self):
-        self.recipe = None
-
-    def testSimpleExecution(self):
-
-        class SimpleTask(Task):
-            def initialize(self):
-                self.executed = False
-
-            def __call__(self):
-                self.executed = True
-
-        task = SimpleTask("sample_task")
-
-        self.recipe.store(task)
-        self.recipe.execute()
-
-        self.assertTrue(task.executed)
-
-    def testSimpleCommunication(self):
-
-        class SimpleSender(Task):
-            def __init__(self, name, message, dependencies = None):
-                super(SimpleSender, self).__init__(name, dependencies)
-                self.message = message
-
-            def __call__(self):
-                self.output("message", self.message)
-
-        class SimpleReceiver(Task):
-            def initialize(self):
-                self.message = ''
-
-            def __call__(self, message):
-                self.message = message
-
-        message = 'testMessage'
-        sender = SimpleSender("simple_sender", message)
-        receiver = SimpleReceiver("simple_receiver", {"message" : "message"})
-
-        self.recipe.store(sender)
-        self.recipe.store(receiver)
-        self.recipe.execute()
-
-        self.assertEqual(receiver.message, message)
-
-
-    def testSimpleStatus(self):
-
-        task_name = "simple"
-        message = "sample"
-
-        class SimpleTask(Task):
-            def __call__(self):
-                print(message)
-
-        task = SimpleTask(task_name)
-
-        self.recipe.store(task)
-        self.recipe.execute()
-
-        self.assertEquals(self.recipe.get_status(), {task_name : NormalStatus()})
-
-        status = self.recipe.get_status()[task_name]
-        self.assertEquals(status.stdout, message + "\n")
-
-    def testFailStatus(self):
-
-        task_name = "fail"
-        message = "fail"
-        exception = Exception("exception message")
-
-        class FailTask(Task):
-            def __call__(self):
-                print(message, file = sys.stderr)
-                raise exception
-
-        task = FailTask(task_name)
-
-        self.recipe.store(task)
-        self.recipe.execute()
-
-        self.assertEquals(self.recipe.get_status(), {task_name : FailStatus()})
-
-        status = self.recipe.get_status()[task_name]
-        self.assertEquals(status.stderr, message + "\n")
-        self.assertEquals(status.exception, exception)
-
-    def testSkipStatus(self):
-
-        task_name = "skip"
-        message = "skip"
-        skip_message = "skipMessage"
-
-        class SkipTask(Task):
-            def __call__(self):
-                print("skip")
-                raise SkipException(skip_message)
-
-        task = SkipTask(task_name)
-
-        self.recipe.store(task)
-        self.recipe.execute()
-
-        self.assertEquals(self.recipe.get_status(), {task_name : SkipStatus()})
-
-        status = self.recipe.get_status()[task_name]
-        self.assertEquals(status.stdout, message + "\n")
-        self.assertEquals(status.skip_message, skip_message)
-
-    def testSkipDependFail(self):
-
-        class SimpleTask(Task):
-            def initialize(self):
-                self.executed = False
-
-            def __call__(self):
-                self.executed = True
-                self.output(self.name, "value")
-
-        class FailTask(Task):
-            def __call__(self, fail):
-                self.executed = True
-                raise Exception
-
-        task1 = SimpleTask("task1")
-        task2 = SimpleTask("task2", {"fail" : "fail"})
-        fail_task = FailTask("fail_task", {"fail" : "task1"})
-
-        self.recipe.store(task1)
-        self.recipe.store(task2)
-        self.recipe.store(fail_task)
-        self.recipe.execute()
-
-        status = self.recipe.get_status()
-
-        self.assertEquals(status, {"task1" : NormalStatus(),
-                                   "task2" : DependencySkipStatus(),
-                                   "fail_task" : FailStatus()})
-
-        self.assertTrue(task1.executed)
-        self.assertTrue(fail_task.executed)
-        self.assertFalse(task2.executed)
-
-    def testSkipDependSkip(self):
-        '''
-        task1 -> fail -> task2
-        '''
-
-        class SimpleTask(Task):
-            def initialize(self):
-                self.executed = False
-
-            def __call__(self):
-                self.executed = True
-                self.output(self.name, "value")
-
-        class SkipTask(Task):
-            def __call__(self, skip):
-                self.executed = True
-                raise SkipException
-
-        task1 = SimpleTask("task1")
-        task2 = SimpleTask("task2", {"skip" : "skip"})
-        skip_task = SkipTask("skip_task", {"skip" : "task1"})
-
-        self.recipe.store(task1)
-        self.recipe.store(task2)
-        self.recipe.store(skip_task)
-        self.recipe.execute()
-
-        status = self.recipe.get_status()
-
-        self.assertEquals(status, {"task1" : NormalStatus(),
-                                   "task2" : DependencySkipStatus(),
-                                   "skip_task" : SkipStatus()})
-
-        self.assertTrue(task1.executed)
-        self.assertTrue(skip_task.executed)
-        self.assertFalse(task2.executed)
-
-    def testBaseInput(self):
-
-        class SumTask(Task):
-
-            def __init__(self):
-                super(SumTask, self).__init__("sum",
-                                              {"n1" : "number.operande1",
-                                               "n2" : "number.operande2"})
-
-            def __call__(self, n1, n2):
-                n3 = int(n1) + int(n2)
-                self.output("number.sum", n3)
-
-        sum = SumTask()
-
-        self.recipe.store(sum)
-        self.recipe.execute({"number.operande1" : [1], "number.operande2" : [2]})
-
-        self.assertEqual(self.recipe.get_status()['sum'], NormalStatus())
-        output = self.recipe.get_messenger().get("number.sum", True)
-        self.assertEqual(output.get_value(), 3)
-
-    def testMultipleItemsBaseInput(self):
-
-        class SimpleTask(Task):
-
-            def __init__(self):
-                super(SimpleTask, self).__init__("simple",
-                                              {"input" : "item"})
-                self.input = None
-
-            def __call__(self, input):
-                self.input = input
-
-        simple = SimpleTask()
-
-        self.recipe.store(simple)
-        self.recipe.execute({"item" : ["value1", "value2"]})
-        self.assertEquals(simple.input, None)
-        status = self.recipe.get_status()
-        self.assertEquals(status["simple"], TooMuchItemStatus())
-
-class RecipeTagTestCase(unittest.TestCase):
-
-    def setUp(self):
-        self.recipe = Recipe()
-
-    def tearDown(self):
-        self.recipe = None
-
-    def testOneItemOneTag(self):
-
-        class SimpleTask(Task):
-
-            def __call__(self):
-                self.output("output", "value")
-
-        simple = SimpleTask("simple")
-
-        tag = "tag"
-
-        self.recipe.store(simple)
-        self.recipe.add_tag(simple, tag)
-
-        self.recipe.execute()
-        output = self.recipe.get_messenger().get("output", True)
-        self.assertTrue(output.has_tag(tag))
-
-    def testOneItemMultipleTag(self):
-
-        class SimpleTask(Task):
-
-            def __call__(self):
-                self.output("output", "value")
-
-        simple = SimpleTask("simple")
-
-        tags = ["tag1", "tag2"]
-
-        self.recipe.store(simple)
-        self.recipe.add_tag(simple, *tags)
-
-        self.recipe.execute()
-        output = self.recipe.get_messenger().get("output", True)
-        for tag in tags:
-            self.assertTrue(output.has_tag(tag))
-
-    def testMultipleItemTag(self):
-
-        class SimpleTask(Task):
-
-            def __call__(self):
-                self.output("output1", "value")
-                self.output("output2", "value")
-
-        simple = SimpleTask("simple")
-
-        tag = "tag"
-
-        self.recipe.store(simple)
-        self.recipe.add_tag(simple, tag)
-
-        self.recipe.execute()
-
-        for output in ["output1", "output2"]:
-            output_object = self.recipe.get_messenger().get(output, True)
-            self.assertTrue(output_object.has_tag(tag))
-
-    def testSimpleFilterTag(self):
-
-        class SimpleFiltering(Task):
-
-            def __init__(self, type):
-                super(SimpleFiltering, self).__init__("filter",
-                                                       {"input" : type,
-                                                        "prod1" : "prod1",
-                                                        "prod2" : "prod2"})
-                self.input = None
-
-            def __call__(self, input, prod1, prod2):
-                self.input = input
-
-        class SimpleProducer(Task):
-
-            def __init__(self, name, type, value):
-                super(SimpleProducer, self).__init__(name)
-                self.name = name
-                self.type = type
-                self.value = value
-
-            def __call__(self):
-                self.output(self.type, self.value)
-                self.output(self.name, self.name)
-
-        type = "output"
-
-        value1 = "value1"
-        value2 = "value2"
-
-        tag1 = "tag1"
-        tag2 = "tag2"
-
-        fil = SimpleFiltering(type)
-
-        prod1 = SimpleProducer("prod1", type, value1)
-        prod2 = SimpleProducer("prod2", type, value2)
-
-        self.recipe.store(prod1)
-        self.recipe.add_tag(prod1, tag1)
-        self.recipe.store(prod2)
-        self.recipe.add_tag(prod2, tag2)
-
-        self.recipe.store(fil)
-        self.recipe.filter(fil, "input", tag1)
-
-        self.recipe.execute()
-
-        self.assertEquals(fil.input, value1)
-        self.assertEquals(self.recipe.get_status()["filter"], NormalStatus())
-
-    def testBaseInputsTag(self):
-
-        class SimpleFiltering(Task):
-
-            def __init__(self, type):
-                super(SimpleFiltering, self).__init__("filter",
-                                                      {"input" : type})
-                self.input = None
-
-            def __call__(self, input):
-                self.input = input
-
-        type = "item"
-
-        value1 = "value1"
-        value2 = "value2"
-
-        tag1 = "tag1"
-        tag2 = "tag2"
-
-        fil = SimpleFiltering(type)
-
-        self.recipe.store(fil)
-        self.recipe.filter(fil, "input", tag1)
-
-        base_input1 = Item(value1)
-        base_input1.add_tag(tag1)
-
-        base_input2 = Item(value2)
-        base_input2.add_tag(tag2)
-
-        self.recipe.execute({type : [base_input1, base_input2]})
-
-        self.assertEquals(fil.input, value1)
-        self.assertEquals(self.recipe.get_status()["filter"], NormalStatus())
-
-if __name__ == '__main__':
-    unittest.main()

File tests/TestScheduler.py

+from __future__ import print_function
+
+import os.path
+import sys
+sys.path.insert(0, os.path.realpath('../'))
+
+import unittest
+from pytasks import Scheduler
+from pytasks import Task
+from pytasks.Exceptions import (ArgumentException,
+                                DependenceNameAlreadyDeclared, SkipException)
+from pytasks.Status import (NormalStatus, SkipStatus, DependencySkipStatus,
+                            FailStatus, TooMuchItemStatus)
+from pytasks.Item import Item
+from pytasks.Store import Store
+
+class SchedulerStoreTestCase(unittest.TestCase):
+    def setUp(self):
+        self.scheduler = Scheduler()
+
+    def tearDown(self):
+        self.scheduler = None
+
+    def testSimpleStorage(self):
+        task_name = 'name'
+        mock = Task(task_name)
+        self.scheduler.store(mock)
+        self.assertEqual({task_name: mock}, self.scheduler.get_tasks())
+
+    def testSameNameStorage(self):
+        task_name = 'name'
+        t1 = Task(task_name)
+        t2 = Task(task_name)
+        self.scheduler.store(t1)
+        self.assertRaises(DependenceNameAlreadyDeclared,
+                          lambda : self.scheduler.store(t2))
+        self.assertEqual({task_name: t1}, self.scheduler.get_tasks())
+
+
+    def testFailNoCompatibleObject(self):
+        self.assertRaises(ArgumentException, lambda:
+                          self.scheduler.store('string'))
+
+class SchedulerExecutionTestCase(unittest.TestCase):
+
+    def setUp(self):
+        self.scheduler = Scheduler()
+
+    def tearDown(self):
+        self.scheduler = None
+
+    def testSimpleExecution(self):
+
+        class SimpleTask(Task):
+            def initialize(self):
+                self.executed = False
+
+            def __call__(self):
+                self.executed = True
+
+        task = SimpleTask("sample_task")
+
+        self.scheduler.store(task)
+        self.scheduler.execute()
+
+        self.assertTrue(task.executed)
+
+    def testSimpleCommunication(self):
+
+        class SimpleSender(Task):
+            def __init__(self, name, message, dependencies = None):
+                super(SimpleSender, self).__init__(name, dependencies)
+                self.message = message
+
+            def __call__(self):
+                self.output("message", self.message)
+
+        class SimpleReceiver(Task):
+            def initialize(self):
+                self.message = ''
+
+            def __call__(self, message):
+                self.message = message
+
+        message = 'testMessage'
+        sender = SimpleSender("simple_sender", message)
+        receiver = SimpleReceiver("simple_receiver", {"message" : "message"})
+
+        self.scheduler.store(sender)
+        self.scheduler.store(receiver)
+        self.scheduler.execute()
+
+        self.assertEqual(receiver.message, message)
+
+
+    def testSimpleStatus(self):
+
+        task_name = "simple"
+        message = "sample"
+
+        class SimpleTask(Task):
+            def __call__(self):
+                print(message)
+
+        task = SimpleTask(task_name)
+
+        self.scheduler.store(task)
+        self.scheduler.execute()
+
+        self.assertEquals(self.scheduler.get_status(), {task_name : NormalStatus()})
+
+        status = self.scheduler.get_status()[task_name]
+        self.assertEquals(status.stdout, message + "\n")
+
+    def testFailStatus(self):
+
+        task_name = "fail"
+        message = "fail"
+        exception = Exception("exception message")
+
+        class FailTask(Task):
+            def __call__(self):
+                print(message, file = sys.stderr)
+                raise exception
+
+        task = FailTask(task_name)
+
+        self.scheduler.store(task)
+        self.scheduler.execute()
+
+        self.assertEquals(self.scheduler.get_status(), {task_name : FailStatus()})
+
+        status = self.scheduler.get_status()[task_name]
+        self.assertEquals(status.stderr, message + "\n")
+        self.assertEquals(status.exception, exception)
+
+    def testSkipStatus(self):
+
+        task_name = "skip"
+        message = "skip"
+        skip_message = "skipMessage"
+
+        class SkipTask(Task):
+            def __call__(self):
+                print("skip")
+                raise SkipException(skip_message)
+
+        task = SkipTask(task_name)
+
+        self.scheduler.store(task)
+        self.scheduler.execute()
+
+        self.assertEquals(self.scheduler.get_status(), {task_name : SkipStatus()})
+
+        status = self.scheduler.get_status()[task_name]
+        self.assertEquals(status.stdout, message + "\n")
+        self.assertEquals(status.skip_message, skip_message)
+
+    def testSkipDependFail(self):
+
+        class SimpleTask(Task):
+            def initialize(self):
+                self.executed = False
+
+            def __call__(self):
+                self.executed = True
+                self.output(self.name, "value")
+
+        class FailTask(Task):
+            def __call__(self, fail):
+                self.executed = True
+                raise Exception
+
+        task1 = SimpleTask("task1")
+        task2 = SimpleTask("task2", {"fail" : "fail"})
+        fail_task = FailTask("fail_task", {"fail" : "task1"})
+
+        self.scheduler.store(task1)
+        self.scheduler.store(task2)
+        self.scheduler.store(fail_task)
+        self.scheduler.execute()
+
+        status = self.scheduler.get_status()
+
+        self.assertEquals(status, {"task1" : NormalStatus(),
+                                   "task2" : DependencySkipStatus(),
+                                   "fail_task" : FailStatus()})
+
+        self.assertTrue(task1.executed)
+        self.assertTrue(fail_task.executed)
+        self.assertFalse(task2.executed)
+
+    def testSkipDependSkip(self):
+        '''
+        task1 -> fail -> task2
+        '''
+
+        class SimpleTask(Task):
+            def initialize(self):
+                self.executed = False
+
+            def __call__(self):
+                self.executed = True
+                self.output(self.name, "value")
+
+        class SkipTask(Task):
+            def __call__(self, skip):
+                self.executed = True
+                raise SkipException
+
+        task1 = SimpleTask("task1")
+        task2 = SimpleTask("task2", {"skip" : "skip"})
+        skip_task = SkipTask("skip_task", {"skip" : "task1"})
+
+        self.scheduler.store(task1)
+        self.scheduler.store(task2)
+        self.scheduler.store(skip_task)
+        self.scheduler.execute()
+
+        status = self.scheduler.get_status()
+
+        self.assertEquals(status, {"task1" : NormalStatus(),
+                                   "task2" : DependencySkipStatus(),
+                                   "skip_task" : SkipStatus()})
+
+        self.assertTrue(task1.executed)
+        self.assertTrue(skip_task.executed)
+        self.assertFalse(task2.executed)
+
+    def testBaseInput(self):
+
+        class SumTask(Task):
+
+            def __init__(self):
+                super(SumTask, self).__init__("sum",
+                                              {"n1" : "number.operande1",
+                                               "n2" : "number.operande2"})
+
+            def __call__(self, n1, n2):
+                n3 = int(n1) + int(n2)
+                self.output("number.sum", n3)
+
+        sum = SumTask()
+
+        self.scheduler.store(sum)
+        self.scheduler.execute({"number.operande1" : [1], "number.operande2" : [2]})
+
+        self.assertEqual(self.scheduler.get_status()['sum'], NormalStatus())
+        output = self.scheduler.get_messenger().get("number.sum", True)
+        self.assertEqual(output.get_value(), 3)
+
+    def testMultipleItemsBaseInput(self):
+
+        class SimpleTask(Task):
+
+            def __init__(self):
+                super(SimpleTask, self).__init__("simple",
+                                              {"input" : "item"})
+                self.input = None
+
+            def __call__(self, input):
+                self.input = input
+
+        simple = SimpleTask()
+
+        self.scheduler.store(simple)
+        self.scheduler.execute({"item" : ["value1", "value2"]})
+        self.assertEquals(simple.input, None)
+        status = self.scheduler.get_status()
+        self.assertEquals(status["simple"], TooMuchItemStatus())
+
+class SchedulerTagTestCase(unittest.TestCase):
+
+    def setUp(self):
+        self.scheduler = Scheduler()
+
+    def tearDown(self):
+        self.scheduler = None
+
+    def testOneItemOneTag(self):
+
+        class SimpleTask(Task):
+
+            def __call__(self):
+                self.output("output", "value")
+
+        simple = SimpleTask("simple")
+
+        tag = "tag"
+
+        self.scheduler.store(simple)
+        self.scheduler.add_tag(simple, tag)
+
+        self.scheduler.execute()
+        output = self.scheduler.get_messenger().get("output", True)
+        self.assertTrue(output.has_tag(tag))
+
+    def testOneItemMultipleTag(self):
+
+        class SimpleTask(Task):
+
+            def __call__(self):
+                self.output("output", "value")
+
+        simple = SimpleTask("simple")
+
+        tags = ["tag1", "tag2"]
+
+        self.scheduler.store(simple)
+        self.scheduler.add_tag(simple, *tags)
+
+        self.scheduler.execute()
+        output = self.scheduler.get_messenger().get("output", True)
+        for tag in tags:
+            self.assertTrue(output.has_tag(tag))
+
+    def testMultipleItemTag(self):
+
+        class SimpleTask(Task):
+
+            def __call__(self):
+                self.output("output1", "value")
+                self.output("output2", "value")
+
+        simple = SimpleTask("simple")
+
+        tag = "tag"
+
+        self.scheduler.store(simple)
+        self.scheduler.add_tag(simple, tag)
+
+        self.scheduler.execute()
+
+        for output in ["output1", "output2"]:
+            output_object = self.scheduler.get_messenger().get(output, True)
+            self.assertTrue(output_object.has_tag(tag))
+
+    def testSimpleFilterTag(self):
+
+        class SimpleFiltering(Task):
+
+            def __init__(self, type):
+                super(SimpleFiltering, self).__init__("filter",
+                                                       {"input" : type,
+                                                        "prod1" : "prod1",
+                                                        "prod2" : "prod2"})
+                self.input = None
+
+            def __call__(self, input, prod1, prod2):
+                self.input = input
+
+        class SimpleProducer(Task):
+
+            def __init__(self, name, type, value):
+                super(SimpleProducer, self).__init__(name)
+                self.name = name
+                self.type = type
+                self.value = value
+
+            def __call__(self):
+                self.output(self.type, self.value)
+                self.output(self.name, self.name)
+
+        type = "output"
+
+        value1 = "value1"
+        value2 = "value2"
+
+        tag1 = "tag1"
+        tag2 = "tag2"
+
+        fil = SimpleFiltering(type)
+
+        prod1 = SimpleProducer("prod1", type, value1)
+        prod2 = SimpleProducer("prod2", type, value2)
+
+        self.scheduler.store(prod1)
+        self.scheduler.add_tag(prod1, tag1)
+        self.scheduler.store(prod2)
+        self.scheduler.add_tag(prod2, tag2)
+
+        self.scheduler.store(fil)
+        self.scheduler.filter(fil, "input", tag1)
+
+        self.scheduler.execute()
+
+        self.assertEquals(fil.input, value1)
+        self.assertEquals(self.scheduler.get_status()["filter"], NormalStatus())
+
+    def testBaseInputsTag(self):
+
+        class SimpleFiltering(Task):
+
+            def __init__(self, type):
+                super(SimpleFiltering, self).__init__("filter",
+                                                      {"input" : type})
+                self.input = None
+
+            def __call__(self, input):
+                self.input = input
+
+        type = "item"
+
+        value1 = "value1"
+        value2 = "value2"
+
+        tag1 = "tag1"
+        tag2 = "tag2"
+
+        fil = SimpleFiltering(type)
+
+        self.scheduler.store(fil)
+        self.scheduler.filter(fil, "input", tag1)
+
+        base_input1 = Item(value1)
+        base_input1.add_tag(tag1)
+
+        base_input2 = Item(value2)
+        base_input2.add_tag(tag2)
+
+        self.scheduler.execute({type : [base_input1, base_input2]})
+
+        self.assertEquals(fil.input, value1)
+        self.assertEquals(self.scheduler.get_status()["filter"], NormalStatus())
+
+if __name__ == '__main__':
+    unittest.main()