# bubble-economy/dispatchers_basic.py

import multiprocessing
import time
from path import path
import settings
from warehouse import Box, digest
import warnings

def safe_call(fn, box, *args, **kwargs):
    try:
        return fn(box=box, *args, **kwargs)
    except Exception, e:
        #convert stats errors into warnings, because exceptions raised at run
        #time poison concurrency
        warnings.warn(str(e))
        return box

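# --- Illustrative sketch (not part of the original module) ------------------
# safe_call turns a crashing task into a warning plus the untouched box, so
# one bad parameter set cannot take down a whole batch. `_failing_task` and
# the dict stand-in for the box are hypothetical; real tasks take
# (box, **params) and fill a warehouse.Box.
def _example_safe_call():
    def _failing_task(box, **params):
        raise ValueError("stats blew up for %r" % (params,))
    box = {}  #stand-in for a warehouse.Box; safe_call only passes it through
    result = safe_call(_failing_task, box, alpha=0.1)
    assert result is box  #the original box comes back; a warning was emitted
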
class InProcessDispatcher(object):
    """Dispatchers are responsible both for the warehouse that results are
    kept in and for actually running the task.
    
    This parent implementation does not actually multiprocess. Rather,
    calculations run in *this* process, in *this* thread, so step debugging
    and exception handling work as usual."""
    
    def __init__(self,
            warehouse,
            task,
            identifier="",
            *args, **kwargs):
        self.identifier = identifier
        self.warehouse = warehouse
        self.task = task
        self.outbox = {} #dict mapping arg hash to result box
        self.inbox = {} #dict mapping arg hash to queued box (unused in this base implementation)
        #This would be a logical place to resurrect dispatchers from previous
        #experiment runs in order to resume state.
        
    def batch_dispatch(self, param_iterable):
        """
        The main method. iterate over this to get a lot of stuff done at once.
        """
        for params in param_iterable:
            self._maybe_sow(**params)
        self._tend()
        for box in self._reap_all():
            yield box
    
    def _sow(self, box, **params):
        """Queue, or process, one job, providing a Box to stash results in."""
        box.store()
        box.clear_cache()
        self.task(box=box, **params)
        self.outbox[box._basename] = box
        
    def _maybe_sow(self, box=None, **params):
        """Queue, or process, one job, if it has not been done already"""
        #set the force_update flag to indicate that we should push ahead even
        # if the box already has content
        force_update = True
        if box is None:
            box = self.warehouse.Box(**params)
            force_update = False
        
        #Now, if this has successfully completed before, we 
        #are done. ("Slack memoization")
        if force_update or len(box)==0:
            #No data. queue for processing
            self._sow(box=box, **params)
        else:
            #skip straight to the chase
            self.outbox[box._basename] = box
        return box._basename
    
    def _tend(self):
        """do whatever massaging is necessary to get the queued jobs into the
        outbox"""
        pass
        
    def _reap_all(self):
        """get all results back"""
        for k in self.outbox.keys():
            box = self.outbox.pop(k)
            box.store()
            yield box
    
    def dispatch(self, more_params, **params):
        """helper to to one-off dispatches."""
        params.update(more_params)
        return self.batch_dispatch([params])

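# --- Illustrative usage sketch (not part of the original module) ------------
# How batch_dispatch is meant to be driven, assuming the warehouse exposes a
# Box(**params) factory and tasks take (box, **params). The stub classes and
# the task below are hypothetical: they mimic just enough of the warehouse.Box
# API (store, clear_cache, _basename, __len__) for the dispatcher to run.
def _example_in_process_usage():
    class _StubBox(dict):
        """Bare-bones stand-in for warehouse.Box, for illustration only."""
        def __init__(self, **params):
            dict.__init__(self)
            self._basename = repr(sorted(params.items()))
        def store(self):
            pass
        def clear_cache(self):
            pass
    
    class _StubWarehouse(object):
        Box = _StubBox
    
    def _square_task(box, **params):
        box['square'] = params['x'] ** 2
    
    dispatcher = InProcessDispatcher(
        warehouse=_StubWarehouse(),
        task=_square_task,
        identifier="example-run")
    #batch_dispatch is a generator: boxes only come back as we iterate.
    #Boxes that already contain data are skipped ("slack memoization").
    for box in dispatcher.batch_dispatch([{'x': x} for x in range(4)]):
        print box._basename, box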

class LocalDispatcher(InProcessDispatcher):
    """Native local dispatch with a global process pool.
    
    Easier to debug than the picloud one, as it has better docs and fewer
    inscrutable external config options intervening in its behaviour"""
    def __init__(self, processes=2, *args, **kwargs):
        self.in_process = {} #dict mapping arg hash to jid
        self.pool = multiprocessing.Pool(processes=processes)
        super(LocalDispatcher, self).__init__(*args, **kwargs)
        self.outqueue = multiprocessing.JoinableQueue()
            
    def _sow(self, box, **params):
        """Queue, or process, one job, providing a Box to stash results
        in."""
        self.outbox[box._basename] = self.pool.apply_async(
          safe_call,
          args=[self.task, box],
          kwds=params)
            
    def _reap_all(self):
        """get all results back. This is awkward because serialization woes
        get in the way of the usual machinery."""
        print "kv", self.outbox
        while self.outbox:
            ready = []
            finished = []
            for k, v in self.outbox.iteritems():
                #why do we end up with both Boxes and AsyncResults in the
                #queue? Handle both here in any case.
                if hasattr(v, 'ready') and v.ready():
                    ready.append(k)
                if isinstance(v, Box):
                    finished.append(k)
            for k in ready:
                box = self.outbox.pop(k).get(0)
                box.store()
                yield box
            for k in finished:
                box = self.outbox.pop(k)
                box.store()
                yield box
            if self.outbox:
                time.sleep(10)

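# --- Illustrative sketch (not part of the original module) ------------------
# The polling loop above relies on the standard multiprocessing API:
# Pool.apply_async returns an AsyncResult whose .ready() reports completion
# and whose .get(timeout) retrieves the value (re-raising if the job failed).
# A toy demonstration, independent of the warehouse machinery:
def _example_async_result_polling():
    pool = multiprocessing.Pool(processes=2)
    pending = dict((i, pool.apply_async(pow, args=(i, 2))) for i in range(4))
    results = {}
    while pending:
        for key in list(pending.keys()):
            if pending[key].ready():
                results[key] = pending.pop(key).get(0)
        time.sleep(0.1)
    pool.close()
    pool.join()
    return results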
          
class PbsDispatcher(InProcessDispatcher):
    """PBS CLI dispatch, with both the individual jobs and their concurrency
    handled by the PBS server.
    """
    def __init__(self, *args, **qsub_kwargs):
        import pbs
        qsub_args = dict(getattr(settings, 'BASE_QSUB_ARGS', {}))
        qsub_args.update(qsub_kwargs)
        self.qsub = pbs.Qsub(**qsub_args)
        self.in_process = {} #dict mapping arg hash to jid
        super(PbsDispatcher, self).__init__(*args, **qsub_kwargs)
        
    def _sow(self, box, **params):
        """Queue, or process, one job, providing a Box to stash results
        in."""
        import pickle
        
        #make sure the scratch directory exists; makedirs raises if it
        #already does
        if not settings.TEMP_ARGS_PATH.exists():
            settings.TEMP_ARGS_PATH.makedirs()
        file_prefix = settings.TEMP_ARGS_PATH.abspath()/box._basename
        task_file = file_prefix + '.task.pickle'
        box_file = file_prefix + '.box.pickle'
        kwargs_file = file_prefix + '.kwargs.pickle'
        with open(task_file, 'wb') as f:
            pickle.dump(self.task, f)
        with open(box_file, 'wb') as f:
            pickle.dump(box, f)
        with open(kwargs_file, 'wb') as f:
            pickle.dump(params, f)
        
        full_script_command = [str(path('./main.py').realpath().abspath())]
        full_script_command.append(str(task_file))
        full_script_command.append(str(box_file))
        full_script_command.append(str(kwargs_file))
        
        self.outbox[box._basename] = self.qsub(full_script_command)
        
    def _reap_all(self):
        """get all results back. This is not really implemented, because I
        can't be bothered writing a real API to parse the OpenPBS CLI
        scripts' output robustly"""
        print "kv", self.outbox
        # while self.outbox:
        #     ready = []
        #     finished = []
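
# --- Illustrative sketch (not part of the original module) ------------------
# PbsDispatcher._sow pickles the task, the Box and the kwargs to three files
# and submits `./main.py <task-file> <box-file> <kwargs-file>` via qsub.
# main.py itself is not shown in this file; presumably the worker side does
# roughly the following (an assumption, written here only to document the
# file-based hand-off):
def _example_worker_side(task_file, box_file, kwargs_file):
    import pickle
    with open(task_file, 'rb') as f:
        task = pickle.load(f)
    with open(box_file, 'rb') as f:
        box = pickle.load(f)
    with open(kwargs_file, 'rb') as f:
        kwargs = pickle.load(f)
    #run the job with the same error-to-warning wrapper used locally, then
    #persist the results so they can be found in the warehouse later.
    box = safe_call(task, box, **kwargs)
    box.store()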