Source

vinstall / vinstall / core / command.py

Full commit
#-* encoding: utf8 -*-


"""Command Pattern"""


from threading import Thread, Lock
from Queue import Queue
from log import get_logger
import unittest


LOG = get_logger(__name__)


class Command(object):

    def __init__(self, func, args=None, description=None):
        """Store the callable and its arguments.

        Example:
            >>> inspect_command_class = Command(dir, Command)
            >>> inspect_command_class()
            ['__init__', '__call__', ...]

        """
        assert callable(func), "%r is not callable" % func
        self.func = func
        self.args = args or tuple()
        self.description = description or ""

    def __call__(self):
        """Call the command.

        """
        return apply(self.func, self.args)

    def __repr__(self):
        return "%s(<%s %s>)" % (self.__class__.__name__, self.func.__name__,
                repr(self.args))


class CommandExecutor(Thread):
    """Execute commands in another thread.
    Call hooks before and after executing commands, and before and after
    processing the queue

    """
    def __init__(self, delegate=None):
        "Initialize"
        super(CommandExecutor, self).__init__()
        self.executor = Executor()
        self.executor.delegate = self
        self.size = 0
        self.delegate = delegate
        self._lock = Lock()
        self.setDaemon(True)

    def started(self):
        "Execution started"
        if self.delegate is not None:
            with self._lock:
                self.delegate.on_execution_started(self)

    def ended(self):
        "Execution ended"
        if self.delegate is not None:
            with self._lock:
                self.delegate.on_execution_ended(self)

    def command_execution_started(self, command):
        "Command started hook"
        if self.delegate is not None:
            with self._lock:
                self.delegate.on_command_start(command)

    def command_execution_ended(self, command):
        "Command ended hook"
        if self.delegate is not None:
            with self._lock:
                self.delegate.on_command_end(command, self.size,
                        self.executor.items_done)

    def add_command(self, func, args=None, description=None):
        """Create a Command instance and add it to the stack.

        """
        command = Command(func, args, description)
        f = self.executor.submit(command)
        return f

    def execute_all(self, commands):
        self.size = len(commands)
        for command in commands:
            self.add_command(*command)

    def run(self):
        self.started()
        self.executor.start()
        self.executor.join()
        self.ended()


class Executor(Thread):

    def __init__(self, q=None):
        super(Executor, self).__init__()
        if q is None:
            q = Queue()
        self.q = q
        self.items_done = 0
        self.delegate = None
        self.setDaemon(True)

    def submit(self, item):
        self.q.put(item)

    def run(self):
        while not self.q.empty():
            item = self.q.get()
            if self.delegate is not None:
                self.delegate.command_execution_started(item)
            item()
            if self.delegate is not None:
                self.delegate.command_execution_ended(item)
            self.items_done += 1


class CommandTestCase(unittest.TestCase):

    def testCommand(self):

        command = Command(lambda x, y: x + y, (1, 2))
        self.assertEquals(command(), 3)


class CommandExecutorTestCase(unittest.TestCase):

    def testAddCommand(self):
        """Create a Command instance and add it to the stack.

        """
        queue = Queue()
        ce = CommandExecutor()
        func = lambda x: x + 1
        ce.add_command(func, (1,))
        ce.start()
        ce.join()

    def testDelegate(self):

        whitness = []

        class Delegate:
            def on_execution_started(self, ce):
                whitness.append(1)

            def on_execution_ended(self, ce):
                whitness.append(2)

            def on_command_start(self, command):
                whitness.append(3)

            def on_command_end(self, command, total, done):
                whitness.append(4)

        ce = CommandExecutor()
        ce.delegate = Delegate()
        commands = [ (lambda x: x+1, (1,), dict()) for _ in xrange(2) ]
        ce.execute_all(commands)
        ce.start()
        ce.join()
        self.assertEquals(sorted(whitness), [1, 2, 3, 3, 4, 4])


if __name__ == "__main__":
    unittest.main()