Anonymous avatar Anonymous committed ab066b0

create the Job class

Comments (0)

Files changed (4)

+# This script makes it possible to run a test without building first
+export PYTHONPATH=`pwd`/src
+python $1
 scons/Builder.py
 scons/Defaults.py
 scons/Environment.py
+scons/Job.py
 scons/Node/__init__.py
 scons/Node/FS.py
 scons/Sig/__init__.py
+"""scons.Job
+
+This module defines the Serial and Parallel classes that execute tasks to
+complete a build.
+
+"""
+
+__revision__ = "Job.py __REVISION__ __DATE__ __DEVELOPER__"
+
+class Serial:
+    """This class is used to execute tasks in series, and is more efficient
+    than Parallel, but is only appropriate for non-parallel builds. Only
+    one instance of this class should be in existence at a time.
+
+    This class is not thread safe.
+    """
+
+    def __init__(self, taskmaster):
+        """Create a new serial job given a taskmaster. 
+
+        The taskmaster's next_task() method should return the next task that
+        needs to be executed, or None if there are no more tasks. The
+        taskmaster's executed() method will be called for each task when it is
+        finished being executed. The taskmaster's is_blocked() method will not
+        be called.
+        """
+        
+        self.taskmaster = taskmaster
+
+    def start(self):
+        
+        """Start the job. This will begin pulling tasks from the taskmaster
+        and executing them, and return when there are no more tasks.  """
+        
+        while 1:
+            task = self.taskmaster.next_task()
+
+            if task is None:
+                break
+ 
+            task.execute()
+            self.taskmaster.executed(task)
+
+    def stop(self):
+        """Serial jobs are always finished when start() returns, so there
+        is nothing to do here"""
+        
+        pass
+
+    def wait(self):
+        """Serial jobs are always finished when start() returns, so there
+        is nothing to do here"""
+        pass
+
+
+# The will hold a condition variable once the first parallel task
+# is created.
+cv = None
+
+class Parallel:
+    """This class is used to execute tasks in parallel, and is less
+    efficient than Serial, but is appropriate for parallel builds. Create
+    an instance of this class for each job or thread you want.
+
+    This class is thread safe.
+    """
+
+
+    def __init__(self, taskmaster):
+
+        """Create a new parallel job given a taskmaster. Multiple jobs will
+        be using the taskmaster in parallel, but all method calls to taskmaster
+        methods are serialized by the jobs themselves.
+
+        The taskmaster's next_task() method should return the next task
+        that needs to be executed, or None if there are no more tasks. The
+        taskmaster's executed() method will be called for each task when it
+        is finished being executed. The taskmaster's is_blocked() method
+        should return true iff there are more tasks, but they can't be
+        executed until one or more other tasks have been
+        executed. next_task() will be called iff is_blocked() returned
+        false.
+
+        Note: calls to taskmaster are serialized, but calls to execute() on
+        distinct tasks are not serialized, because that is the whole point
+        of parallel jobs: they can execute multiple tasks
+        simultaneously. """
+
+        global cv
+        
+        # import threading here so that everything in the Job module
+        # but the Parallel class will work if the interpreter doesn't
+        # support threads
+        import threading
+        
+        self.taskmaster = taskmaster
+        self.thread = threading.Thread(None, self.__run)
+        self.stop_running = 0
+
+        if cv is None:
+            cv = threading.Condition()
+
+    def start(self):
+        """Start the job. This will spawn a thread that will begin pulling
+        tasks from the task master and executing them. This method returns
+        immediately and doesn't wait for the jobs to be executed.
+
+        To stop the job, call stop().
+        To wait for the job to finish, call wait().
+        """
+        self.thread.start()
+
+    def stop(self):
+        """Stop the job. This will cause the job to finish after the
+        currently executing task is done. A job that has been stopped can
+        not be restarted.
+
+        To wait for the job to finish, call wait().
+        """
+        self.stop_running = 1
+
+    def wait(self):
+        """Wait for the job to finish. A job is finished when either there
+        are no more tasks or the job has been stopped and it is no longer
+        executing a task.
+
+        This method should only be called after start() has been called.
+
+        To stop the job, call stop().
+        """
+        self.thread.join()
+
+    def __run(self):
+        """private method that actually executes the tasks"""
+
+        cv.acquire()
+
+        try:
+
+            while 1:
+                while self.taskmaster.is_blocked():
+                    cv.wait(None)
+
+                task = self.taskmaster.next_task()
+
+                if task == None or self.stop_running:
+                    break
+
+                cv.release()
+                task.execute()
+                cv.acquire()
+
+                self.taskmaster.executed(task)
+
+                if not self.taskmaster.is_blocked():
+                    cv.notifyAll()
+
+        finally:
+            cv.release()
+
+
+
+

src/scons/JobTests.py

+__revision__ = "JobTests.py __REVISION__ __DATE__ __DEVELOPER__"
+
+import unittest
+import random
+import math
+import scons.Job
+import sys
+
+# a large number
+num_sines = 10000
+
+# how many parallel jobs to perform for the test
+num_jobs = 11
+
+# how many tasks to perform for the test
+num_tasks = num_jobs*5
+
+class DummyLock:
+    "fake lock class to use if threads are not supported"
+    def acquire(self):
+        pass
+
+    def release(self):
+        pass
+
+class NoThreadsException:
+    "raised by the ParallelTestCase if threads are not supported"
+
+    def __str__(self):
+        return "the interpreter doesn't support threads"
+
+class Task:
+    """A dummy task class for testing purposes."""
+
+    def __init__(self, i, taskmaster):
+        self.i = i
+        self.taskmaster = taskmaster
+        self.was_executed = 0
+        
+    def execute(self):
+        self.taskmaster.guard.acquire()
+        self.taskmaster.begin_list.append(self.i)
+        self.taskmaster.guard.release()
+
+        # do something that will take some random amount of time:
+        for i in range(random.randrange(0, num_sines, 1)):
+            x = math.sin(i)
+
+        self.was_executed = 1
+
+        self.taskmaster.guard.acquire()
+        self.taskmaster.end_list.append(self.i)
+        self.taskmaster.guard.release()
+
+class Taskmaster:
+    """A dummy taskmaster class for testing the job classes."""
+
+    def __init__(self, n, test_case):
+        """n is the number of dummy tasks to perform."""
+
+        self.test_case = test_case
+        self.num_tasks = n
+        self.num_iterated = 0
+        self.num_executed = 0
+        # 'guard' guards 'task_begin_list' and 'task_end_list'
+        try:
+            import threading
+            self.guard = threading.Lock()
+        except:
+            self.guard = DummyLock()
+
+        # keep track of the order tasks are begun in
+        self.begin_list = []
+
+        # keep track of the order tasks are completed in
+        self.end_list = []
+
+
+    def next_task(self):
+        if self.all_tasks_are_iterated():
+            return None
+        else:
+            self.num_iterated = self.num_iterated + 1
+            return Task(self.num_iterated, self)
+
+    def all_tasks_are_executed(self):
+        return self.num_executed == self.num_tasks
+
+    def all_tasks_are_iterated(self):
+        return self.num_iterated == self.num_tasks
+
+    def executed(self, task):
+        self.num_executed = self.num_executed + 1
+
+        self.test_case.failUnless(task.was_executed,
+                                  "the task wasn't really executed")
+        self.test_case.failUnless(task.__class__ is Task,
+                                  "the task wasn't really a Task instance")
+                                
+        
+    def is_blocked(self):
+        # simulate blocking tasks
+        return self.num_iterated - self.num_executed >= max(num_jobs/2, 2)
+
+    def tasks_where_serial(self):
+        "analyze the task order to see if they were serial"
+        serial = 1 # assume the tasks where serial
+        for i in range(num_tasks):
+            serial = serial and (self.begin_list[i]
+                                 == self.end_list[i]
+                                 == (i + 1))
+        return serial
+
+class ParallelTestCase(unittest.TestCase):
+    def runTest(self):
+        "test parallel jobs"
+        
+        try:
+            import threading
+        except:
+            raise NoThreadsException()
+
+        taskmaster = Taskmaster(num_tasks, self)
+        jobs = []
+        for i in range(num_jobs):
+            jobs.append(scons.Job.Parallel(taskmaster))
+
+        for job in jobs:
+            job.start()
+
+        for job in jobs:
+            job.wait()
+
+        self.failUnless(not taskmaster.tasks_where_serial(),
+                        "the tasks where not executed in parallel")
+        self.failUnless(taskmaster.all_tasks_are_executed(),
+                        "all the tests where not executed")
+        self.failUnless(taskmaster.all_tasks_are_iterated(),
+                        "all the tests where not iterated over")
+
+class SerialTestCase(unittest.TestCase):
+    def runTest(self):
+        "test a serial job"
+
+        taskmaster = Taskmaster(num_tasks, self)
+        job = scons.Job.Serial(taskmaster)
+        job.start()
+        self.failUnless(taskmaster.tasks_where_serial(),
+                        "the tasks where not executed in series")
+        self.failUnless(taskmaster.all_tasks_are_executed(),
+                        "all the tests where not executed")
+        self.failUnless(taskmaster.all_tasks_are_iterated(),
+                        "all the tests where not iterated over")
+
+def suite():
+    suite = unittest.TestSuite()
+    suite.addTest(ParallelTestCase())
+    suite.addTest(SerialTestCase())
+    return suite
+
+if __name__ == "__main__":
+    runner = unittest.TextTestRunner()
+    result = runner.run(suite())
+    if (len(result.failures) == 0
+        and len(result.errors) == 1
+        and type(result.errors[0][0]) == SerialTestCase
+        and type(result.errors[0][1][0]) == NoThreadsException):
+        sys.exit(2)
+    elif not result.wasSuccessful():
+        sys.exit(1)
+
+            
+
+        
+    
+    
+    
+    
+
+
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.