1. Moises Henriquez
  2. vinstall

Source

vinstall / vinstall / core / command.py

# -*- coding: utf-8 -*-


"""Command Pattern"""


from observer import Observable
from threading import Thread, RLock
from Queue import Queue
import unittest
import time
from log import get_logger


# Module-level logger, requested from the project's ``log`` helper under this
# module's name.
LOG = get_logger(__name__)


class Command(object):
    """Bundle a callable with the arguments it will later be called with.

    Instances are callable themselves: calling one invokes the stored
    callable with the stored positional and keyword arguments and returns
    whatever it returns.

    """

    def __init__(self, func, args=None, kwargs=None, description=None):
        """Store the callable and its arguments.

        Arguments:
            func: the callable to invoke later.
            args: sequence of positional arguments; an empty tuple is used
                when omitted (or falsy).
            kwargs: dict of keyword arguments; an empty dict is used when
                omitted (or falsy).
            description: free-form text describing the command; an empty
                string is used when omitted (or falsy).

        Raises:
            AssertionError: if func is not callable (the check is an
                ``assert``, so it is skipped when Python runs with -O).

        Example:
            >>> add = Command(lambda x, y: x + y, (1, 2))
            >>> add()
            3

        """
        # Wrap func in a 1-tuple so the message also formats correctly when
        # func itself happens to be a tuple.
        assert callable(func), "%r is not callable" % (func,)
        self.func = func
        self.args = args or tuple()
        self.kwargs = kwargs or {}
        self.description = description or ""

    def __call__(self):
        """Call the command and return the result of the stored callable.

        """
        # Argument unpacking replaces the deprecated ``apply`` builtin, which
        # no longer exists in Python 3.
        return self.func(*self.args, **self.kwargs)

    def __repr__(self):
        # Not every callable has a __name__ (functools.partial objects and
        # callable instances do not); fall back to the callable's own repr.
        func_name = getattr(self.func, "__name__", None) or repr(self.func)
        return "%s(<%s %r %r>)" % (self.__class__.__name__, func_name,
                                   self.args, self.kwargs)


class ProcessingFacade(object):
    """Front end for queueing commands and running them in a BatchProcessor.

    """

    def __init__(self, input_queue, observer=None):
        """Facade initialization:

            - Create an instance of a BatchProcessor, which is responsible for
              command execution and notifying subscribers.

        Arguments:
            input_queue: queue the commands are put on; the same queue is
                handed to the BatchProcessor created here.
            observer: optional object providing the callbacks
                ``on_item_start``, ``on_item_end``, ``on_job_start`` and
                ``on_job_end``. It is only read by execute_all(), so it may
                also be assigned to the ``observer`` attribute later.

        """
        self.input_queue = input_queue
        self.observer = observer
        self.processing = BatchProcessor(input_queue)

    def add_command(self, func, args=None, kwargs=None, description=None):
        """Create a Command instance and put it on the input queue.

        The arguments are passed to the Command constructor unchanged.

        """
        # The original passed the undefined name ``desc`` here, which made
        # every call fail with NameError.
        command = Command(func, args, kwargs, description)
        self.input_queue.put(command)

    def execute_all(self):
        """Connect the observer (if any) and start the processing thread.

        This returns as soon as the thread has been started; it does not
        wait for the commands to finish. Callers that need to block until
        the queue has been drained can call ``input_queue.join()``.

        """
        if self.observer is not None:
            self.processing.connect("process-item-start",
                    self.observer.on_item_start)
            self.processing.connect("process-item-end",
                    self.observer.on_item_end)
            self.processing.connect("job-start",
                    self.observer.on_job_start)
            self.processing.connect("job-end",
                    self.observer.on_job_end)
        self.processing.start()


class BatchProcessor(Observable, Thread):
    """Process items in a thread and notify observers.
    Subscribers can connect to these signals:

        - "job-start": Emitted with the input queue when run() begins
        - "job-end": Emitted with the input queue once it is empty and has
          no unfinished tasks
        - "process-item-start": Emitted with the item before it is processed
        - "process-item-end": Emitted after a single item was processed, with
          the tuple (item, result, items_processed, total_items)

    """

    signals = ("process-item-start", "process-item-end", "job-start", "job-end")

    def __init__(self, input_queue, output_queue=None):
        """Attach queues to the object.

        Arguments:
            input_queue: queue of callables to process.
            output_queue: optional queue that receives the result of each
                processed item.

        """
        super(BatchProcessor, self).__init__()
        self.input_queue = input_queue
        self.output_queue = output_queue
        # Size of the input queue when run() started; None until then.
        self.total_items = None
        # run() never returns, so the thread is marked as a daemon to keep it
        # from blocking interpreter exit.
        self.daemon = True

    def process_item(self, item):
        """Process a single item.

        The item is called without arguments; its result is put on the output
        queue when one was given. "process-item-start" is emitted before the
        call and "process-item-end" after it.

        """
        LOG.debug("Processing item %s" % (item,))
        self.emit("process-item-start", item)
        result = item()
        # Compare against None explicitly: a queue-like object that is falsy
        # while empty must still receive the results.
        if self.output_queue is not None:
            self.output_queue.put(result)
        self.emit("process-item-end", (item, result, self.items_processed(),
                                       self.total_items))

    def items_processed(self):
        """The approximate number of items already taken off the input queue.

        Returns 0 while run() has not recorded the initial queue size yet.

        """
        if self.total_items is None:
            return 0
        return self.total_items - self.input_queue.qsize()

    def run(self):
        """Overrides Thread.run

        Take items off the input queue one at a time and process them,
        emitting "job-start" first and "job-end" whenever the queue has been
        drained.

        """
        self.total_items = self.input_queue.qsize()
        LOG.debug("Start processing %s items" % self.total_items)
        self.emit("job-start", self.input_queue)
        # NOTE(review): the loop has no exit; after "job-end" the thread goes
        # back to blocking on get(). If the queue is empty when run() starts,
        # "job-end" is never emitted.
        # NOTE(review): an exception raised by an item propagates out of
        # run(), so the thread stops and task_done() is not called for that
        # item; callers blocked in input_queue.join() would then wait forever.
        while True:
            item = self.input_queue.get()
            self.process_item(item)
            self.input_queue.task_done()
            if self.input_queue.empty() and not self.input_queue.unfinished_tasks:
                self.emit("job-end", self.input_queue)


class CommandTestCase(unittest.TestCase):
    """Unit tests for Command."""

    def testCommand(self):
        """Calling a command invokes the callable with the stored arguments."""
        command = Command(lambda x, y: x + y, (1, 2))
        # assertEquals is a deprecated alias of assertEqual.
        self.assertEqual(command(), 3)


class ProcessingFacadeTestCase(unittest.TestCase):
    """Unit tests for ProcessingFacade."""

    def testAddCommand(self):
        """add_command wraps the callable in a Command and queues it."""
        queue = Queue()
        f = ProcessingFacade(queue)
        func = lambda x: x + 1
        f.add_command(func, (1,))

        self.assertEqual(queue.qsize(), 1)
        command = queue.get()
        self.assertTrue(isinstance(command, Command))
        self.assertEqual(command(), 2)

    def testObserver(self):
        """Every observer callback fires while the queue is processed."""
        witness = []

        class ProcessingObserver(object):
            # Each callback records a marker; the emitted payload is ignored.
            def on_job_start(self, payload):
                witness.append(1)

            def on_job_end(self, payload):
                witness.append(2)

            def on_item_start(self, payload):
                witness.append(3)

            def on_item_end(self, payload):
                witness.append(4)

        queue = Queue()
        facade = ProcessingFacade(queue)
        facade.observer = ProcessingObserver()

        for i in range(2):
            queue.put(Command(lambda x: x+1, (i,)))

        facade.execute_all()
        queue.join()
        # "job-end" is emitted after the last task_done(), so join() can
        # return before that handler has run; give the worker time to emit.
        time.sleep(1)
        self.assertEqual(sorted(witness), [1, 2, 3, 3, 4, 4])

    def testExecuteAll(self):
        """Commands added through the facade run in insertion order."""
        results = []

        def f1(x): results.append(x)
        def f2(x): results.append(x**2)

        queue = Queue()
        facade = ProcessingFacade(queue)
        facade.add_command(f1, (2,))
        facade.add_command(f2, (2,))
        facade.execute_all()
        # Each command appends its result before task_done() is called, so
        # join() is enough to know both have run; no fixed sleep is needed.
        queue.join()
        self.assertEqual(results, [2, 4])


class BatchProcessorTestCase(unittest.TestCase):
    """Unit tests for BatchProcessor."""

    def testRun(self):
        """All four signals are emitted while a two-item queue is processed."""
        witness = []

        # Each handler records a marker; the emitted payload is ignored.
        def on_job_start(payload):
            witness.append(1)

        def on_job_stop(payload):
            witness.append(2)

        def on_item_start(payload):
            witness.append(3)

        def on_item_stop(payload):
            witness.append(4)

        queue = Queue()
        bp = BatchProcessor(queue)

        bp.connect("job-start", on_job_start)
        bp.connect("job-end", on_job_stop)
        bp.connect("process-item-start", on_item_start)
        bp.connect("process-item-end", on_item_stop)

        for i in range(2):
            queue.put(Command(lambda x: x+1, (i,)))

        bp.start()
        queue.join()

        # "job-end" is emitted after the last task_done(), so join() can
        # return before that handler has run; give the worker time to emit.
        time.sleep(1)
        # assertEquals is a deprecated alias of assertEqual.
        self.assertEqual(sorted(witness), [1, 2, 3, 3, 4, 4])


# Run the test cases defined in this module when it is executed as a script.
if __name__ == "__main__":
    unittest.main()