Commits

Boris FELD committed 9b16e0f Merge

Mark Feature FixTerminology as finished.

Comments (0)

Files changed (11)

+class Item(object):
+
+    def __init__(self, value):
+        self._value = value
+        self._tags = set()
+
+    def add_tag(self, tag):
+        self._tags.add(tag)
+
+    def has_tag(self, tag):
+        return tag in self._tags
+
+    def has_tags(self, tags):
+        return self._tags.issuperset(tags)
+
+    def get_tags(self):
+        return self._tags
+
+    def get_value(self):
+        return self._value
+
+    def __str__(self):
+        return "Output(value = %s, tags = %s)" % (self._value, self._tags)
+
+    def __repr__(self):
+        return self.__str__()

pytasks/Messenger.py

-class Messenger(object):
-
-    def __init__(self):
-        self.__values = {}
-
-    def store(self, key, value):
-        for k in _split_sub_type(key):
-            self.__values.setdefault(k, set())
-            self.__values[k].add(value)
-
-    def get(self, key, only_one = False):
-        if only_one == False:
-            return self.__values[key]
-        else:
-            return list(self.__values[key])[0]
-
-    def __contains__(self, key):
-        return key in self.__values.keys()
-
-    def number_of_values(self, key, tags = None):
-
-        if key in self.__values.keys():
-            if tags == None:
-                return len(self.__values[key])
-
-            else:
-                matched = 0
-                for value in self.__values[key]:
-                    if value.has_tags(tags):
-                        matched += 1
-                return matched
-
-        else:
-            return 0
-
-    def get_keys(self):
-        return self.__values.keys()
-
-def _split_sub_type(sub_type):
-    splitted = str(sub_type).split('.')
-    for i in xrange(len(splitted)):
-        yield '.'.join(splitted[0:i + 1])

pytasks/Output.py

-class Output(object):
-
-    def __init__(self, value):
-        self._value = value
-        self._tags = set()
-
-    def add_tag(self, tag):
-        self._tags.add(tag)
-
-    def has_tag(self, tag):
-        return tag in self._tags
-
-    def has_tags(self, tags):
-        return self._tags.issuperset(tags)
-
-    def get_tags(self):
-        return self._tags
-
-    def get_value(self):
-        return self._value
-
-    def __str__(self):
-        return "Output(value = %s, tags = %s)" % (self._value, self._tags)
-
-    def __repr__(self):
-        return self.__str__()

pytasks/Recipe.py

-from .Messenger import Messenger
-from .Status import (NormalStatus, SkipStatus, DependencySkipStatus,
-                     FailStatus, TooMuchItemStatus, StdCatch)
-from .Exceptions import (DependenceNameAlreadyDeclared, ArgumentException,
-                         CircularReference, DependenceMissing, SkipException)
-from .Output import Output
-
-class Recipe(object):
-    def __init__(self):
-        self._tasks = {}
-        self._status = {}
-        self._executed = set()
-        self._finished = set()
-        self._output_tags = {}
-        self._messenger = None
-        self._filters = {}
-
-    def store(self, task):
-        if not hasattr(task, 'name') or not hasattr(task, 'dependencies'):
-            raise ArgumentException
-        if task.name in self._tasks:
-            raise DependenceNameAlreadyDeclared
-        self._tasks[task.name] = task
-
-    def add_tag(self, task, *tags):
-        self._output_tags.setdefault(task.name, [])
-        self._output_tags[task.name].extend(tags)
-
-    def filter(self, task, input_name, tag):
-        self._filters.setdefault(task.name, {})
-        self._filters[task.name].setdefault(input_name, set())
-        self._filters[task.name][input_name].add(tag)
-
-    def execute(self, base_inputs = None):
-
-        self._executed = set()
-        self._finished = set()
-
-        self._messenger = Messenger()
-
-        if base_inputs is not None:
-            for input_type, inputs in base_inputs.iteritems():
-                for input_value in inputs:
-                    if isinstance(input_value, Output):
-                        self._messenger.store(input_type, input_value)
-                    else:
-                        self._messenger.store(input_type, Output(input_value))
-
-        task = self._choose_task()
-
-        while(task != None):
-            self._execute_task(task)
-            task = self._choose_task()
-
-        self._mark_remaining_tasks_skipped()
-
-    def _choose_task(self):
-        for task in (task for task_name, task in self._tasks.items() if not
-                     task_name in self._executed):
-
-
-            if len(task.dependencies) == 0:
-                return task
-
-            task_filters = self._filters.get(task.name, {})
-
-            valid = True
-
-            for dependence_name, dependence_type in task.dependencies.iteritems():
-
-                dependence_filters = task_filters.get(dependence_name, frozenset())
-
-                values = self._messenger.number_of_values(dependence_type,
-                                                          dependence_filters)
-
-                if values > 1:
-                    valid = False
-                    self._executed.add(task.name)
-                    self._status[task.name] = TooMuchItemStatus(dependence_name)
-                    break
-
-                if values != 1:
-                    valid = False
-                    break
-
-            if valid:
-                return task
-
-    def _execute_task(self, task):
-        inputs = {}
-        returns = {}
-
-        task_filters = self._filters.get(task.name, {})
-
-        for dependencie_name, dependencie_type in task.dependencies.iteritems():
-            dependencie_filters = task_filters.get(dependencie_name,
-                                                   frozenset())
-
-
-            values = [value for value in self._messenger.get(dependencie_type)
-                      if value.has_tags(dependencie_filters)]
-
-            value, = values
-            inputs[dependencie_name] = value.get_value()
-
-        try:
-            std_catcher = StdCatch()
-            std_catcher.setup()
-            task(**inputs)
-            returns = task.get_outputs()
-
-        except SkipException as exception:
-            self._status[task.name] = SkipStatus(exception.skip_message,
-                                                std_catcher)
-        except Exception as exception:
-            self._status[task.name] = FailStatus(exception, std_catcher)
-        else:
-            self._status[task.name] = NormalStatus(std_catcher)
-            self._finished.add(task.name)
-            std_catcher.uninstall()
-
-            if returns != None:
-                for output_type, outputs in returns.iteritems():
-                    for output in outputs:
-
-                        output_object = Output(output)
-
-                        #Add tags
-                        for tag in self._output_tags.get(task.name, []):
-                            output_object.add_tag(tag)
-
-                        self._messenger.store(output_type, output_object)
-
-        finally:
-            self._executed.add(task.name)
-            std_catcher.uninstall()
-
-    def _mark_remaining_tasks_skipped(self):
-        for task_name in self._tasks.iterkeys():
-            if not task_name in self._executed:
-                self._status[task_name] = DependencySkipStatus()
-
-    def get_status(self):
-        return self._status
-
-    def get_tasks(self):
-        return self._tasks
-
-    def get_messenger(self):
-        return self._messenger

pytasks/Scheduler.py

+from .Store import Store
+from .Status import (NormalStatus, SkipStatus, DependencySkipStatus,
+                     FailStatus, TooMuchItemStatus, StdCatch)
+from .Exceptions import (DependenceNameAlreadyDeclared, ArgumentException,
+                         CircularReference, DependenceMissing, SkipException)
+from .Item import Item
+
+class Scheduler(object):
+    def __init__(self):
+        self._steps = {}
+        self._status = {}
+        self._executed = set()
+        self._finished = set()
+        self._output_tags = {}
+        self._store = None
+        self._filters = {}
+
+    def store(self, task):
+        if not hasattr(task, 'name') or not hasattr(task, 'dependencies'):
+            raise ArgumentException
+        if task.name in self._steps:
+            raise DependenceNameAlreadyDeclared
+        self._steps[task.name] = task
+
+    def add_tag(self, task, *tags):
+        self._output_tags.setdefault(task.name, [])
+        self._output_tags[task.name].extend(tags)
+
+    def filter(self, task, input_name, tag):
+        self._filters.setdefault(task.name, {})
+        self._filters[task.name].setdefault(input_name, set())
+        self._filters[task.name][input_name].add(tag)
+
+    def execute(self, base_inputs = None):
+
+        self._executed = set()
+        self._finished = set()
+
+        self._store = Store()
+
+        if base_inputs is not None:
+            for input_type, inputs in base_inputs.iteritems():
+                for input_value in inputs:
+                    if isinstance(input_value, Item):
+                        self._store.store(input_type, input_value)
+                    else:
+                        self._store.store(input_type, Item(input_value))
+
+        task = self._choose_task()
+
+        while(task != None):
+            self._execute_task(task)
+            task = self._choose_task()
+
+        self._mark_remaining_tasks_skipped()
+
+    def _choose_task(self):
+        for task in (task for task_name, task in self._steps.items() if not
+                     task_name in self._executed):
+
+
+            if len(task.dependencies) == 0:
+                return task
+
+            task_filters = self._filters.get(task.name, {})
+
+            valid = True
+
+            for dependence_name, dependence_type in task.dependencies.iteritems():
+
+                dependence_filters = task_filters.get(dependence_name, frozenset())
+
+                values = self._store.number_of_values(dependence_type,
+                                                          dependence_filters)
+
+                if values > 1:
+                    valid = False
+                    self._executed.add(task.name)
+                    self._status[task.name] = TooMuchItemStatus(dependence_name)
+                    break
+
+                if values != 1:
+                    valid = False
+                    break
+
+            if valid:
+                return task
+
+    def _execute_task(self, task):
+        inputs = {}
+        returns = {}
+
+        task_filters = self._filters.get(task.name, {})
+
+        for dependencie_name, dependencie_type in task.dependencies.iteritems():
+            dependencie_filters = task_filters.get(dependencie_name,
+                                                   frozenset())
+
+
+            values = [value for value in self._store.get(dependencie_type)
+                      if value.has_tags(dependencie_filters)]
+
+            value, = values
+            inputs[dependencie_name] = value.get_value()
+
+        try:
+            std_catcher = StdCatch()
+            std_catcher.setup()
+            task(**inputs)
+            returns = task.get_outputs()
+
+        except SkipException as exception:
+            self._status[task.name] = SkipStatus(exception.skip_message,
+                                                std_catcher)
+        except Exception as exception:
+            self._status[task.name] = FailStatus(exception, std_catcher)
+        else:
+            self._status[task.name] = NormalStatus(std_catcher)
+            self._finished.add(task.name)
+            std_catcher.uninstall()
+
+            if returns != None:
+                for output_type, outputs in returns.iteritems():
+                    for output in outputs:
+
+                        output_object = Item(output)
+
+                        #Add tags
+                        for tag in self._output_tags.get(task.name, []):
+                            output_object.add_tag(tag)
+
+                        self._store.store(output_type, output_object)
+
+        finally:
+            self._executed.add(task.name)
+            std_catcher.uninstall()
+
+    def _mark_remaining_tasks_skipped(self):
+        for task_name in self._steps.iterkeys():
+            if not task_name in self._executed:
+                self._status[task_name] = DependencySkipStatus()
+
+    def get_status(self):
+        return self._status
+
+    def get_tasks(self):
+        return self._steps
+
+    def get_messenger(self):
+        return self._store
+class Store(object):
+
+    def __init__(self):
+        self.__values = {}
+
+    def store(self, key, value):
+        for k in _split_sub_type(key):
+            self.__values.setdefault(k, set())
+            self.__values[k].add(value)
+
+    def get(self, key, only_one = False):
+        if only_one == False:
+            return self.__values[key]
+        else:
+            return list(self.__values[key])[0]
+
+    def __contains__(self, key):
+        return key in self.__values.keys()
+
+    def number_of_values(self, key, tags = None):
+
+        if key in self.__values.keys():
+            if tags == None:
+                return len(self.__values[key])
+
+            else:
+                matched = 0
+                for value in self.__values[key]:
+                    if value.has_tags(tags):
+                        matched += 1
+                return matched
+
+        else:
+            return 0
+
+    def get_keys(self):
+        return self.__values.keys()
+
+def _split_sub_type(sub_type):
+    splitted = str(sub_type).split('.')
+    for i in xrange(len(splitted)):
+        yield '.'.join(splitted[0:i + 1])

pytasks/__init__.py

-from .Messenger import Messenger
-from .Recipe import Recipe
+from .Store import Store
+from .Scheduler import Scheduler
 from .Status import *
 from .Task import Task
 
-__all__ = [Messenger, Recipe]
+__all__ = [Store, Scheduler]

tests/TestMessenger.py

-import sys
-import os.path
-sys.path.insert(0, os.path.realpath('../'))
-
-from pytasks.Messenger import (Messenger, _split_sub_type)
-from pytasks.Output import Output
-import unittest
-
-class MessengerStoreTestCase(unittest.TestCase):
-
-    def setUp(self):
-        self.messenger = Messenger()
-
-    def tearDown(self):
-        self.messenger = None
-
-    def testStore(self):
-        value = 'value'
-        key = 'simple'
-        self.messenger.store(key, value)
-        self.assertEquals(self.messenger.get(key), set([value]))
-
-    def testNoModification(self):
-        value = 'value'
-        value2 = 'value2'
-        key = 'nomodification'
-
-        self.messenger.store(key, value)
-        self.messenger.store(key, value2)
-        self.assertEqual(self.messenger.get(key), set([value, value2]))
-
-    def testStoreSubType(self):
-        value = 'subtype_value'
-        key = 'type.subtype.subsubtype'
-
-        key1 = 'type'
-        key2 = 'type.subtype'
-
-        self.messenger.store(key, value)
-
-        self.assertEqual(self.messenger.get(key1), set([value]))
-        self.assertEqual(self.messenger.get(key2), set([value]))
-        self.assertEqual(self.messenger.get(key), set([value]))
-
-    def testSplitSubType(self):
-        subtype = 'file.archive'
-        subtype2 = 'file.archive.sdist.python'
-
-        self.assertEquals(list(_split_sub_type(subtype)),
-                          ['file', 'file.archive'])
-        self.assertEquals(list(_split_sub_type(subtype2)),
-                          ['file', 'file.archive', 'file.archive.sdist',
-                           'file.archive.sdist.python'])
-
-    def testNumberOfValues(self):
-
-        value = "value"
-
-        not_a_key = "non"
-        one_value = "one"
-        mutliple_value = "multiple"
-
-        tag1 = "tag1"
-        tag2 = "tag2"
-
-        output_tag1 = Output(value)
-        output_tag1.add_tag(tag1)
-
-        output_tags = Output(value)
-        output_tags.add_tag(tag1)
-        output_tags.add_tag(tag2)
-
-        #Add some sample values
-        self.messenger.store(one_value, Output(value))
-        self.messenger.store(mutliple_value, Output(value))
-        self.messenger.store(mutliple_value, output_tag1)
-        self.messenger.store(mutliple_value, output_tags)
-
-        self.assertEquals(self.messenger.number_of_values(not_a_key), 0)
-        self.assertEquals(self.messenger.number_of_values(one_value), 1)
-        self.assertEquals(self.messenger.number_of_values(mutliple_value), 3)
-        self.assertEquals(self.messenger.number_of_values(mutliple_value, [tag1]), 2)
-        self.assertEquals(self.messenger.number_of_values(mutliple_value, [tag1, tag2]), 1)
-
-if __name__ == '__main':
-    unittest.main()

tests/TestRecipe.py

-from __future__ import print_function
-
-import os.path
-import sys
-sys.path.insert(0, os.path.realpath('../'))
-
-import unittest
-from pytasks import Recipe
-from pytasks import Task
-from pytasks.Exceptions import (ArgumentException,
-                                DependenceNameAlreadyDeclared, SkipException)
-from pytasks.Status import (NormalStatus, SkipStatus, DependencySkipStatus,
-                            FailStatus, TooMuchItemStatus)
-from pytasks.Output import Output
-from pytasks.Messenger import Messenger
-
-class RecipeStoreTestCase(unittest.TestCase):
-    def setUp(self):
-        self.recipe = Recipe()
-
-    def tearDown(self):
-        self.recipe = None
-
-    def testSimpleStorage(self):
-        task_name = 'name'
-        mock = Task(task_name)
-        self.recipe.store(mock)
-        self.assertEqual({task_name: mock}, self.recipe.get_tasks())
-
-    def testSameNameStorage(self):
-        task_name = 'name'
-        t1 = Task(task_name)
-        t2 = Task(task_name)
-        self.recipe.store(t1)
-        self.assertRaises(DependenceNameAlreadyDeclared,
-                          lambda : self.recipe.store(t2))
-        self.assertEqual({task_name: t1}, self.recipe.get_tasks())
-
-
-    def testFailNoCompatibleObject(self):
-        self.assertRaises(ArgumentException, lambda:
-                          self.recipe.store('string'))
-
-class RecipeExecutionTestCase(unittest.TestCase):
-
-    def setUp(self):
-        self.recipe = Recipe()
-
-    def tearDown(self):
-        self.recipe = None
-
-    def testSimpleExecution(self):
-
-        class SimpleTask(Task):
-            def initialize(self):
-                self.executed = False
-
-            def __call__(self):
-                self.executed = True
-
-        task = SimpleTask("sample_task")
-
-        self.recipe.store(task)
-        self.recipe.execute()
-
-        self.assertTrue(task.executed)
-
-    def testSimpleCommunication(self):
-
-        class SimpleSender(Task):
-            def __init__(self, name, message, dependencies = None):
-                super(SimpleSender, self).__init__(name, dependencies)
-                self.message = message
-
-            def __call__(self):
-                self.output("message", self.message)
-
-        class SimpleReceiver(Task):
-            def initialize(self):
-                self.message = ''
-
-            def __call__(self, message):
-                self.message = message
-
-        message = 'testMessage'
-        sender = SimpleSender("simple_sender", message)
-        receiver = SimpleReceiver("simple_receiver", {"message" : "message"})
-
-        self.recipe.store(sender)
-        self.recipe.store(receiver)
-        self.recipe.execute()
-
-        self.assertEqual(receiver.message, message)
-
-
-    def testSimpleStatus(self):
-
-        task_name = "simple"
-        message = "sample"
-
-        class SimpleTask(Task):
-            def __call__(self):
-                print(message)
-
-        task = SimpleTask(task_name)
-
-        self.recipe.store(task)
-        self.recipe.execute()
-
-        self.assertEquals(self.recipe.get_status(), {task_name : NormalStatus()})
-
-        status = self.recipe.get_status()[task_name]
-        self.assertEquals(status.stdout, message + "\n")
-
-    def testFailStatus(self):
-
-        task_name = "fail"
-        message = "fail"
-        exception = Exception("exception message")
-
-        class FailTask(Task):
-            def __call__(self):
-                print(message, file = sys.stderr)
-                raise exception
-
-        task = FailTask(task_name)
-
-        self.recipe.store(task)
-        self.recipe.execute()
-
-        self.assertEquals(self.recipe.get_status(), {task_name : FailStatus()})
-
-        status = self.recipe.get_status()[task_name]
-        self.assertEquals(status.stderr, message + "\n")
-        self.assertEquals(status.exception, exception)
-
-    def testSkipStatus(self):
-
-        task_name = "skip"
-        message = "skip"
-        skip_message = "skipMessage"
-
-        class SkipTask(Task):
-            def __call__(self):
-                print("skip")
-                raise SkipException(skip_message)
-
-        task = SkipTask(task_name)
-
-        self.recipe.store(task)
-        self.recipe.execute()
-
-        self.assertEquals(self.recipe.get_status(), {task_name : SkipStatus()})
-
-        status = self.recipe.get_status()[task_name]
-        self.assertEquals(status.stdout, message + "\n")
-        self.assertEquals(status.skip_message, skip_message)
-
-    def testSkipDependFail(self):
-
-        class SimpleTask(Task):
-            def initialize(self):
-                self.executed = False
-
-            def __call__(self):
-                self.executed = True
-                self.output(self.name, "value")
-
-        class FailTask(Task):
-            def __call__(self, fail):
-                self.executed = True
-                raise Exception
-
-        task1 = SimpleTask("task1")
-        task2 = SimpleTask("task2", {"fail" : "fail"})
-        fail_task = FailTask("fail_task", {"fail" : "task1"})
-
-        self.recipe.store(task1)
-        self.recipe.store(task2)
-        self.recipe.store(fail_task)
-        self.recipe.execute()
-
-        status = self.recipe.get_status()
-
-        self.assertEquals(status, {"task1" : NormalStatus(),
-                                   "task2" : DependencySkipStatus(),
-                                   "fail_task" : FailStatus()})
-
-        self.assertTrue(task1.executed)
-        self.assertTrue(fail_task.executed)
-        self.assertFalse(task2.executed)
-
-    def testSkipDependSkip(self):
-        '''
-        task1 -> fail -> task2
-        '''
-
-        class SimpleTask(Task):
-            def initialize(self):
-                self.executed = False
-
-            def __call__(self):
-                self.executed = True
-                self.output(self.name, "value")
-
-        class SkipTask(Task):
-            def __call__(self, skip):
-                self.executed = True
-                raise SkipException
-
-        task1 = SimpleTask("task1")
-        task2 = SimpleTask("task2", {"skip" : "skip"})
-        skip_task = SkipTask("skip_task", {"skip" : "task1"})
-
-        self.recipe.store(task1)
-        self.recipe.store(task2)
-        self.recipe.store(skip_task)
-        self.recipe.execute()
-
-        status = self.recipe.get_status()
-
-        self.assertEquals(status, {"task1" : NormalStatus(),
-                                   "task2" : DependencySkipStatus(),
-                                   "skip_task" : SkipStatus()})
-
-        self.assertTrue(task1.executed)
-        self.assertTrue(skip_task.executed)
-        self.assertFalse(task2.executed)
-
-    def testBaseInput(self):
-
-        class SumTask(Task):
-
-            def __init__(self):
-                super(SumTask, self).__init__("sum",
-                                              {"n1" : "number.operande1",
-                                               "n2" : "number.operande2"})
-
-            def __call__(self, n1, n2):
-                n3 = int(n1) + int(n2)
-                self.output("number.sum", n3)
-
-        sum = SumTask()
-
-        self.recipe.store(sum)
-        self.recipe.execute({"number.operande1" : [1], "number.operande2" : [2]})
-
-        self.assertEqual(self.recipe.get_status()['sum'], NormalStatus())
-        output = self.recipe.get_messenger().get("number.sum", True)
-        self.assertEqual(output.get_value(), 3)
-
-    def testMultipleItemsBaseInput(self):
-
-        class SimpleTask(Task):
-
-            def __init__(self):
-                super(SimpleTask, self).__init__("simple",
-                                              {"input" : "item"})
-                self.input = None
-
-            def __call__(self, input):
-                self.input = input
-
-        simple = SimpleTask()
-
-        self.recipe.store(simple)
-        self.recipe.execute({"item" : ["value1", "value2"]})
-        self.assertEquals(simple.input, None)
-        status = self.recipe.get_status()
-        self.assertEquals(status["simple"], TooMuchItemStatus())
-
-class RecipeTagTestCase(unittest.TestCase):
-
-    def setUp(self):
-        self.recipe = Recipe()
-
-    def tearDown(self):
-        self.recipe = None
-
-    def testOneOutputOneTag(self):
-
-        class SimpleTask(Task):
-
-            def __call__(self):
-                self.output("output", "value")
-
-        simple = SimpleTask("simple")
-
-        tag = "tag"
-
-        self.recipe.store(simple)
-        self.recipe.add_tag(simple, tag)
-
-        self.recipe.execute()
-        output = self.recipe.get_messenger().get("output", True)
-        self.assertTrue(output.has_tag(tag))
-
-    def testOneOutputMultipleTag(self):
-
-        class SimpleTask(Task):
-
-            def __call__(self):
-                self.output("output", "value")
-
-        simple = SimpleTask("simple")
-
-        tags = ["tag1", "tag2"]
-
-        self.recipe.store(simple)
-        self.recipe.add_tag(simple, *tags)
-
-        self.recipe.execute()
-        output = self.recipe.get_messenger().get("output", True)
-        for tag in tags:
-            self.assertTrue(output.has_tag(tag))
-
-    def testMultipleOutputTag(self):
-
-        class SimpleTask(Task):
-
-            def __call__(self):
-                self.output("output1", "value")
-                self.output("output2", "value")
-
-        simple = SimpleTask("simple")
-
-        tag = "tag"
-
-        self.recipe.store(simple)
-        self.recipe.add_tag(simple, tag)
-
-        self.recipe.execute()
-
-        for output in ["output1", "output2"]:
-            output_object = self.recipe.get_messenger().get(output, True)
-            self.assertTrue(output_object.has_tag(tag))
-
-    def testSimpleFilterTag(self):
-
-        class SimpleFiltering(Task):
-
-            def __init__(self, type):
-                super(SimpleFiltering, self).__init__("filter",
-                                                       {"input" : type,
-                                                        "prod1" : "prod1",
-                                                        "prod2" : "prod2"})
-                self.input = None
-
-            def __call__(self, input, prod1, prod2):
-                self.input = input
-
-        class SimpleProducer(Task):
-
-            def __init__(self, name, type, value):
-                super(SimpleProducer, self).__init__(name)
-                self.name = name
-                self.type = type
-                self.value = value
-
-            def __call__(self):
-                self.output(self.type, self.value)
-                self.output(self.name, self.name)
-
-        type = "output"
-
-        value1 = "value1"
-        value2 = "value2"
-
-        tag1 = "tag1"
-        tag2 = "tag2"
-
-        fil = SimpleFiltering(type)
-
-        prod1 = SimpleProducer("prod1", type, value1)
-        prod2 = SimpleProducer("prod2", type, value2)
-
-        self.recipe.store(prod1)
-        self.recipe.add_tag(prod1, tag1)
-        self.recipe.store(prod2)
-        self.recipe.add_tag(prod2, tag2)
-
-        self.recipe.store(fil)
-        self.recipe.filter(fil, "input", tag1)
-
-        self.recipe.execute()
-
-        self.assertEquals(fil.input, value1)
-        self.assertEquals(self.recipe.get_status()["filter"], NormalStatus())
-
-    def testBaseInputsTag(self):
-
-        class SimpleFiltering(Task):
-
-            def __init__(self, type):
-                super(SimpleFiltering, self).__init__("filter",
-                                                      {"input" : type})
-                self.input = None
-
-            def __call__(self, input):
-                self.input = input
-
-        type = "item"
-
-        value1 = "value1"
-        value2 = "value2"
-
-        tag1 = "tag1"
-        tag2 = "tag2"
-
-        fil = SimpleFiltering(type)
-
-        self.recipe.store(fil)
-        self.recipe.filter(fil, "input", tag1)
-
-        base_input1 = Output(value1)
-        base_input1.add_tag(tag1)
-
-        base_input2 = Output(value2)
-        base_input2.add_tag(tag2)
-
-        self.recipe.execute({type : [base_input1, base_input2]})
-
-        self.assertEquals(fil.input, value1)
-        self.assertEquals(self.recipe.get_status()["filter"], NormalStatus())
-
-if __name__ == '__main__':
-    unittest.main()

tests/TestScheduler.py

+from __future__ import print_function
+
+import os.path
+import sys
+sys.path.insert(0, os.path.realpath('../'))
+
+import unittest
+from pytasks import Scheduler
+from pytasks import Task
+from pytasks.Exceptions import (ArgumentException,
+                                DependenceNameAlreadyDeclared, SkipException)
+from pytasks.Status import (NormalStatus, SkipStatus, DependencySkipStatus,
+                            FailStatus, TooMuchItemStatus)
+from pytasks.Item import Item
+from pytasks.Store import Store
+
+class SchedulerStoreTestCase(unittest.TestCase):
+    def setUp(self):
+        self.scheduler = Scheduler()
+
+    def tearDown(self):
+        self.scheduler = None
+
+    def testSimpleStorage(self):
+        task_name = 'name'
+        mock = Task(task_name)
+        self.scheduler.store(mock)
+        self.assertEqual({task_name: mock}, self.scheduler.get_tasks())
+
+    def testSameNameStorage(self):
+        task_name = 'name'
+        t1 = Task(task_name)
+        t2 = Task(task_name)
+        self.scheduler.store(t1)
+        self.assertRaises(DependenceNameAlreadyDeclared,
+                          lambda : self.scheduler.store(t2))
+        self.assertEqual({task_name: t1}, self.scheduler.get_tasks())
+
+
+    def testFailNoCompatibleObject(self):
+        self.assertRaises(ArgumentException, lambda:
+                          self.scheduler.store('string'))
+
+class SchedulerExecutionTestCase(unittest.TestCase):
+
+    def setUp(self):
+        self.scheduler = Scheduler()
+
+    def tearDown(self):
+        self.scheduler = None
+
+    def testSimpleExecution(self):
+
+        class SimpleTask(Task):
+            def initialize(self):
+                self.executed = False
+
+            def __call__(self):
+                self.executed = True
+
+        task = SimpleTask("sample_task")
+
+        self.scheduler.store(task)
+        self.scheduler.execute()
+
+        self.assertTrue(task.executed)
+
+    def testSimpleCommunication(self):
+
+        class SimpleSender(Task):
+            def __init__(self, name, message, dependencies = None):
+                super(SimpleSender, self).__init__(name, dependencies)
+                self.message = message
+
+            def __call__(self):
+                self.output("message", self.message)
+
+        class SimpleReceiver(Task):
+            def initialize(self):
+                self.message = ''
+
+            def __call__(self, message):
+                self.message = message
+
+        message = 'testMessage'
+        sender = SimpleSender("simple_sender", message)
+        receiver = SimpleReceiver("simple_receiver", {"message" : "message"})
+
+        self.scheduler.store(sender)
+        self.scheduler.store(receiver)
+        self.scheduler.execute()
+
+        self.assertEqual(receiver.message, message)
+
+
+    def testSimpleStatus(self):
+
+        task_name = "simple"
+        message = "sample"
+
+        class SimpleTask(Task):
+            def __call__(self):
+                print(message)
+
+        task = SimpleTask(task_name)
+
+        self.scheduler.store(task)
+        self.scheduler.execute()
+
+        self.assertEquals(self.scheduler.get_status(), {task_name : NormalStatus()})
+
+        status = self.scheduler.get_status()[task_name]
+        self.assertEquals(status.stdout, message + "\n")
+
+    def testFailStatus(self):
+
+        task_name = "fail"
+        message = "fail"
+        exception = Exception("exception message")
+
+        class FailTask(Task):
+            def __call__(self):
+                print(message, file = sys.stderr)
+                raise exception
+
+        task = FailTask(task_name)
+
+        self.scheduler.store(task)
+        self.scheduler.execute()
+
+        self.assertEquals(self.scheduler.get_status(), {task_name : FailStatus()})
+
+        status = self.scheduler.get_status()[task_name]
+        self.assertEquals(status.stderr, message + "\n")
+        self.assertEquals(status.exception, exception)
+
+    def testSkipStatus(self):
+
+        task_name = "skip"
+        message = "skip"
+        skip_message = "skipMessage"
+
+        class SkipTask(Task):
+            def __call__(self):
+                print("skip")
+                raise SkipException(skip_message)
+
+        task = SkipTask(task_name)
+
+        self.scheduler.store(task)
+        self.scheduler.execute()
+
+        self.assertEquals(self.scheduler.get_status(), {task_name : SkipStatus()})
+
+        status = self.scheduler.get_status()[task_name]
+        self.assertEquals(status.stdout, message + "\n")
+        self.assertEquals(status.skip_message, skip_message)
+
+    def testSkipDependFail(self):
+
+        class SimpleTask(Task):
+            def initialize(self):
+                self.executed = False
+
+            def __call__(self):
+                self.executed = True
+                self.output(self.name, "value")
+
+        class FailTask(Task):
+            def __call__(self, fail):
+                self.executed = True
+                raise Exception
+
+        task1 = SimpleTask("task1")
+        task2 = SimpleTask("task2", {"fail" : "fail"})
+        fail_task = FailTask("fail_task", {"fail" : "task1"})
+
+        self.scheduler.store(task1)
+        self.scheduler.store(task2)
+        self.scheduler.store(fail_task)
+        self.scheduler.execute()
+
+        status = self.scheduler.get_status()
+
+        self.assertEquals(status, {"task1" : NormalStatus(),
+                                   "task2" : DependencySkipStatus(),
+                                   "fail_task" : FailStatus()})
+
+        self.assertTrue(task1.executed)
+        self.assertTrue(fail_task.executed)
+        self.assertFalse(task2.executed)
+
+    def testSkipDependSkip(self):
+        '''
+        task1 -> fail -> task2
+        '''
+
+        class SimpleTask(Task):
+            def initialize(self):
+                self.executed = False
+
+            def __call__(self):
+                self.executed = True
+                self.output(self.name, "value")
+
+        class SkipTask(Task):
+            def __call__(self, skip):
+                self.executed = True
+                raise SkipException
+
+        task1 = SimpleTask("task1")
+        task2 = SimpleTask("task2", {"skip" : "skip"})
+        skip_task = SkipTask("skip_task", {"skip" : "task1"})
+
+        self.scheduler.store(task1)
+        self.scheduler.store(task2)
+        self.scheduler.store(skip_task)
+        self.scheduler.execute()
+
+        status = self.scheduler.get_status()
+
+        self.assertEquals(status, {"task1" : NormalStatus(),
+                                   "task2" : DependencySkipStatus(),
+                                   "skip_task" : SkipStatus()})
+
+        self.assertTrue(task1.executed)
+        self.assertTrue(skip_task.executed)
+        self.assertFalse(task2.executed)
+
+    def testBaseInput(self):
+
+        class SumTask(Task):
+
+            def __init__(self):
+                super(SumTask, self).__init__("sum",
+                                              {"n1" : "number.operande1",
+                                               "n2" : "number.operande2"})
+
+            def __call__(self, n1, n2):
+                n3 = int(n1) + int(n2)
+                self.output("number.sum", n3)
+
+        sum = SumTask()
+
+        self.scheduler.store(sum)
+        self.scheduler.execute({"number.operande1" : [1], "number.operande2" : [2]})
+
+        self.assertEqual(self.scheduler.get_status()['sum'], NormalStatus())
+        output = self.scheduler.get_messenger().get("number.sum", True)
+        self.assertEqual(output.get_value(), 3)
+
+    def testMultipleItemsBaseInput(self):
+
+        class SimpleTask(Task):
+
+            def __init__(self):
+                super(SimpleTask, self).__init__("simple",
+                                              {"input" : "item"})
+                self.input = None
+
+            def __call__(self, input):
+                self.input = input
+
+        simple = SimpleTask()
+
+        self.scheduler.store(simple)
+        self.scheduler.execute({"item" : ["value1", "value2"]})
+        self.assertEquals(simple.input, None)
+        status = self.scheduler.get_status()
+        self.assertEquals(status["simple"], TooMuchItemStatus())
+
+class SchedulerTagTestCase(unittest.TestCase):
+
+    def setUp(self):
+        self.scheduler = Scheduler()
+
+    def tearDown(self):
+        self.scheduler = None
+
+    def testOneItemOneTag(self):
+
+        class SimpleTask(Task):
+
+            def __call__(self):
+                self.output("output", "value")
+
+        simple = SimpleTask("simple")
+
+        tag = "tag"
+
+        self.scheduler.store(simple)
+        self.scheduler.add_tag(simple, tag)
+
+        self.scheduler.execute()
+        output = self.scheduler.get_messenger().get("output", True)
+        self.assertTrue(output.has_tag(tag))
+
+    def testOneItemMultipleTag(self):
+
+        class SimpleTask(Task):
+
+            def __call__(self):
+                self.output("output", "value")
+
+        simple = SimpleTask("simple")
+
+        tags = ["tag1", "tag2"]
+
+        self.scheduler.store(simple)
+        self.scheduler.add_tag(simple, *tags)
+
+        self.scheduler.execute()
+        output = self.scheduler.get_messenger().get("output", True)
+        for tag in tags:
+            self.assertTrue(output.has_tag(tag))
+
+    def testMultipleItemTag(self):
+
+        class SimpleTask(Task):
+
+            def __call__(self):
+                self.output("output1", "value")
+                self.output("output2", "value")
+
+        simple = SimpleTask("simple")
+
+        tag = "tag"
+
+        self.scheduler.store(simple)
+        self.scheduler.add_tag(simple, tag)
+
+        self.scheduler.execute()
+
+        for output in ["output1", "output2"]:
+            output_object = self.scheduler.get_messenger().get(output, True)
+            self.assertTrue(output_object.has_tag(tag))
+
+    def testSimpleFilterTag(self):
+
+        class SimpleFiltering(Task):
+
+            def __init__(self, type):
+                super(SimpleFiltering, self).__init__("filter",
+                                                       {"input" : type,
+                                                        "prod1" : "prod1",
+                                                        "prod2" : "prod2"})
+                self.input = None
+
+            def __call__(self, input, prod1, prod2):
+                self.input = input
+
+        class SimpleProducer(Task):
+
+            def __init__(self, name, type, value):
+                super(SimpleProducer, self).__init__(name)
+                self.name = name
+                self.type = type
+                self.value = value
+
+            def __call__(self):
+                self.output(self.type, self.value)
+                self.output(self.name, self.name)
+
+        type = "output"
+
+        value1 = "value1"
+        value2 = "value2"
+
+        tag1 = "tag1"
+        tag2 = "tag2"
+
+        fil = SimpleFiltering(type)
+
+        prod1 = SimpleProducer("prod1", type, value1)
+        prod2 = SimpleProducer("prod2", type, value2)
+
+        self.scheduler.store(prod1)
+        self.scheduler.add_tag(prod1, tag1)
+        self.scheduler.store(prod2)
+        self.scheduler.add_tag(prod2, tag2)
+
+        self.scheduler.store(fil)
+        self.scheduler.filter(fil, "input", tag1)
+
+        self.scheduler.execute()
+
+        self.assertEquals(fil.input, value1)
+        self.assertEquals(self.scheduler.get_status()["filter"], NormalStatus())
+
+    def testBaseInputsTag(self):
+
+        class SimpleFiltering(Task):
+
+            def __init__(self, type):
+                super(SimpleFiltering, self).__init__("filter",
+                                                      {"input" : type})
+                self.input = None
+
+            def __call__(self, input):
+                self.input = input
+
+        type = "item"
+
+        value1 = "value1"
+        value2 = "value2"
+
+        tag1 = "tag1"
+        tag2 = "tag2"
+
+        fil = SimpleFiltering(type)
+
+        self.scheduler.store(fil)
+        self.scheduler.filter(fil, "input", tag1)
+
+        base_input1 = Item(value1)
+        base_input1.add_tag(tag1)
+
+        base_input2 = Item(value2)
+        base_input2.add_tag(tag2)
+
+        self.scheduler.execute({type : [base_input1, base_input2]})
+
+        self.assertEquals(fil.input, value1)
+        self.assertEquals(self.scheduler.get_status()["filter"], NormalStatus())
+
+if __name__ == '__main__':
+    unittest.main()

tests/TestStore.py

+import sys
+import os.path
+sys.path.insert(0, os.path.realpath('../'))
+
+from pytasks.Store import (Store, _split_sub_type)
+from pytasks.Item import Item
+import unittest
+
+class StoreTestCase(unittest.TestCase):
+
+    def setUp(self):
+        self.store = Store()
+
+    def tearDown(self):
+        self.store = None
+
+    def testStore(self):
+        value = 'value'
+        key = 'simple'
+        self.store.store(key, value)
+        self.assertEquals(self.store.get(key), set([value]))
+
+    def testNoModification(self):
+        value = 'value'
+        value2 = 'value2'
+        key = 'nomodification'
+
+        self.store.store(key, value)
+        self.store.store(key, value2)
+        self.assertEqual(self.store.get(key), set([value, value2]))
+
+    def testStoreSubType(self):
+        value = 'subtype_value'
+        key = 'type.subtype.subsubtype'
+
+        key1 = 'type'
+        key2 = 'type.subtype'
+
+        self.store.store(key, value)
+
+        self.assertEqual(self.store.get(key1), set([value]))
+        self.assertEqual(self.store.get(key2), set([value]))
+        self.assertEqual(self.store.get(key), set([value]))
+
+    def testSplitSubType(self):
+        subtype = 'file.archive'
+        subtype2 = 'file.archive.sdist.python'
+
+        self.assertEquals(list(_split_sub_type(subtype)),
+                          ['file', 'file.archive'])
+        self.assertEquals(list(_split_sub_type(subtype2)),
+                          ['file', 'file.archive', 'file.archive.sdist',
+                           'file.archive.sdist.python'])
+
+    def testNumberOfValues(self):
+
+        value = "value"
+
+        not_a_key = "non"
+        one_value = "one"
+        mutliple_value = "multiple"
+
+        tag1 = "tag1"
+        tag2 = "tag2"
+
+        output_tag1 = Item(value)
+        output_tag1.add_tag(tag1)
+
+        output_tags = Item(value)
+        output_tags.add_tag(tag1)
+        output_tags.add_tag(tag2)
+
+        #Add some sample values
+        self.store.store(one_value, Item(value))
+        self.store.store(mutliple_value, Item(value))
+        self.store.store(mutliple_value, output_tag1)
+        self.store.store(mutliple_value, output_tags)
+
+        self.assertEquals(self.store.number_of_values(not_a_key), 0)
+        self.assertEquals(self.store.number_of_values(one_value), 1)
+        self.assertEquals(self.store.number_of_values(mutliple_value), 3)
+        self.assertEquals(self.store.number_of_values(mutliple_value, [tag1]), 2)
+        self.assertEquals(self.store.number_of_values(mutliple_value, [tag1, tag2]), 1)
+
+if __name__ == '__main':
+    unittest.main()