Commits

Ronny Pfannschmidt  committed bc47da1

fix bugs in the task queue dependency tracking

  • Participants
  • Parent commits e43e4f1

Comments (0)

Files changed (3)

File pu/task_queue.py

         self.depends = {}
         self.completed = set()
         self.runnable = set()
+        self.taken = set()
+        self.running = set()
 
     def find_and_add_new_runnable(self):
         found = set()
+
         for k, v in self.depends.items():
+            if k in self.completed:
+                continue
+            if k in self.running:
+                continue
             if hasattr(k, '__iter__'):
-                new_items = list(k)
-                for item in new_items:
-                    self.add(new, parent, l)
+                new_item = next(k, None)
+                if new_item is not None:
+                    self.add(new_item, parent=k)
+                    found.add(new_item)
             if not v-self.completed:
                 self.runnable.add(k)
+        return bool(found)
+
+
 
     def report_failure(self, t):
-        self.completed.add(t)
+        self.completed.add(t) #XXX: evil
+        self.running.remove(t)
 
     def report_sucess(self, t):
         self.completed.add(t)
+        self.running.remove(t)
 
     def next(self):
         if not self.runnable:
-            self.find_and_add_new_runnable() #XXX: expensive
-            if not set(self.depends)-self.completed:
-                raise StopIteration
+            f = self.find_and_add_new_runnable() #XXX: expensive
+            while f and not self.runnable:
+                f = self.find_and_add_new_runnable() #XXX: expensive
+            
+            print 'dep', set(self.depends)
+            print 'don', self.completed
+            print 'run', self.runnable
+        
+        if not self.runnable:
+            raise StopIteration
 
-
-        return self.runnable.pop()
-
+        result = self.runnable.pop()
+        self.running.add(result)
+        return result
 
     def __iter__(self):
         return self
     def __len__(self):
         return len(self.depends) - len(self.completed)
 
-    def add(self, task, requires=None):
+    def add(self, task, parent=None):
         if task not in self.depends:
             self.depends[task] = set()
-        if requires:
-            self.depends[task].add(requires)
+        if parent:
+            self.depends[parent].add(task)
 
 
     def run_all(self):
             try:
                 item()
                 self.report_sucess(item)
-            except:
+                print 'all:', self.depends.keys()
+                print 'completed', self.completed
+            except RuntimeError:
                 #reraise if report_failure tells us to do so
                 if self.report_failure(item):
                     raise

File tests/tasks/test_build.py

     queue.add(copy)
 
     compile = CompileByteCode(build_lib=build_lib)
-    queue.add(compile, requires=copy)
+    queue.add(compile)
+    queue.add(copy, parent=compile)
 
     queue.run_all()
     assert build_lib.join('testpkg/__init__.py').check()

File tests/test_task_queue.py

+import py
 from pu.task_queue import Queue
 from pu.tasks.install import LinkPTH
 
 class SimpleTask(object):
     def __init__(self, number):
         self.number = number
+        self.ndeps = number
+        self.called = False
 
     def __hash__(self):
         return hash(self.number)
 
+    def __repr__(self):
+        return '<ST %d>' % self.number
+
+    def __iter__(self):
+        return self
+
+    def next(self):
+        if self.ndeps:
+            self.ndeps -= 1
+            return SimpleTask(self.ndeps)
+        raise StopIteration
+
     def __eq__(self, other):
         return type(self) == type(other) and self.number == other.number
 
     def __call__(self):
+        assert not self.called
+        self.called = True
         print self.number
 
 
 
 def test_simple():
     queue = Queue()
-    queue.add(SimpleTask(1))
+    queue.add(SimpleTask(1)) # will create SimpleTask(0) as dependency
     first_task = next(queue)
-    assert first_task.number == 1
+    assert first_task.number == 0
+
+    py.test.raises(StopIteration, next, queue)
+
     assert isinstance(first_task, SimpleTask)
 
+    queue.report_sucess(first_task)
+    next_task = next(queue)
+    assert next_task.number == 1
+    queue.report_sucess(next_task)
+    py.test.raises(StopIteration, next, queue)
+
 def test_adding_the_same_twice_has_no_effect():
     queue = Queue()
-    queue.add(SimpleTask(1))
-    queue.add(SimpleTask(1))
+    queue.add(SimpleTask(0))
+    queue.add(SimpleTask(0))
     assert len(queue) == 1
 
 
     assert site.join(task.pth_name).check()
 
 
+def test_dependencies():
+    queue = Queue()
+    queue.add(SimpleTask(3))
+    queue.run_all()
 
+