Commits

Jesse Noller committed 1d923eb

issue5738: The distribution example was confusing and out of date. It is also too large to include inline in the docs. It belongs in an add-ons module outside the stdlib. Removing.

  • Participants
  • Parent commits 2e3ec3b
  • Branches legacy-trunk

Comments (0)

Files changed (3)

File Doc/includes/mp_distributing.py

-#
-# Module to allow spawning of processes on foreign host
-#
-# Depends on `multiprocessing` package -- tested with `processing-0.60`
-#
-# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-
-__all__ = ['Cluster', 'Host', 'get_logger', 'current_process']
-
-#
-# Imports
-#
-
-import sys
-import os
-import tarfile
-import shutil
-import subprocess
-import logging
-import itertools
-import Queue
-
-try:
-    import cPickle as pickle
-except ImportError:
-    import pickle
-
-from multiprocessing import Process, current_process, cpu_count
-from multiprocessing import util, managers, connection, forking, pool
-
-#
-# Logging
-#
-
-def get_logger():
-    return _logger
-
-_logger = logging.getLogger('distributing')
-_logger.propagate = 0
-
-_formatter = logging.Formatter(util.DEFAULT_LOGGING_FORMAT)
-_handler = logging.StreamHandler()
-_handler.setFormatter(_formatter)
-_logger.addHandler(_handler)
-
-info = _logger.info
-debug = _logger.debug
-
-#
-# Get number of cpus
-#
-
-try:
-    slot_count = cpu_count()
-except NotImplemented:
-    slot_count = 1
-
-#
-# Manager type which spawns subprocesses
-#
-
-class HostManager(managers.SyncManager):
-    '''
-    Manager type used for spawning processes on a (presumably) foreign host
-    '''
-    def __init__(self, address, authkey):
-        managers.SyncManager.__init__(self, address, authkey)
-        self._name = 'Host-unknown'
-
-    def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
-        if hasattr(sys.modules['__main__'], '__file__'):
-            main_path = os.path.basename(sys.modules['__main__'].__file__)
-        else:
-            main_path = None
-        data = pickle.dumps((target, args, kwargs))
-        p = self._RemoteProcess(data, main_path)
-        if name is None:
-            temp = self._name.split('Host-')[-1] + '/Process-%s'
-            name = temp % ':'.join(map(str, p.get_identity()))
-        p.set_name(name)
-        return p
-
-    @classmethod
-    def from_address(cls, address, authkey):
-        manager = cls(address, authkey)
-        managers.transact(address, authkey, 'dummy')
-        manager._state.value = managers.State.STARTED
-        manager._name = 'Host-%s:%s' % manager.address
-        manager.shutdown = util.Finalize(
-            manager, HostManager._finalize_host,
-            args=(manager._address, manager._authkey, manager._name),
-            exitpriority=-10
-            )
-        return manager
-
-    @staticmethod
-    def _finalize_host(address, authkey, name):
-        managers.transact(address, authkey, 'shutdown')
-
-    def __repr__(self):
-        return '<Host(%s)>' % self._name
-
-#
-# Process subclass representing a process on (possibly) a remote machine
-#
-
-class RemoteProcess(Process):
-    '''
-    Represents a process started on a remote host
-    '''
-    def __init__(self, data, main_path):
-        assert not main_path or os.path.basename(main_path) == main_path
-        Process.__init__(self)
-        self._data = data
-        self._main_path = main_path
-
-    def _bootstrap(self):
-        forking.prepare({'main_path': self._main_path})
-        self._target, self._args, self._kwargs = pickle.loads(self._data)
-        return Process._bootstrap(self)
-
-    def get_identity(self):
-        return self._identity
-
-HostManager.register('_RemoteProcess', RemoteProcess)
-
-#
-# A Pool class that uses a cluster
-#
-
-class DistributedPool(pool.Pool):
-
-    def __init__(self, cluster, processes=None, initializer=None, initargs=()):
-        self._cluster = cluster
-        self.Process = cluster.Process
-        pool.Pool.__init__(self, processes or len(cluster),
-                           initializer, initargs)
-
-    def _setup_queues(self):
-        self._inqueue = self._cluster._SettableQueue()
-        self._outqueue = self._cluster._SettableQueue()
-        self._quick_put = self._inqueue.put
-        self._quick_get = self._outqueue.get
-
-    @staticmethod
-    def _help_stuff_finish(inqueue, task_handler, size):
-        inqueue.set_contents([None] * size)
-
-#
-# Manager type which starts host managers on other machines
-#
-
-def LocalProcess(**kwds):
-    p = Process(**kwds)
-    p.set_name('localhost/' + p.name)
-    return p
-
-class Cluster(managers.SyncManager):
-    '''
-    Represents collection of slots running on various hosts.
-
-    `Cluster` is a subclass of `SyncManager` so it allows creation of
-    various types of shared objects.
-    '''
-    def __init__(self, hostlist, modules):
-        managers.SyncManager.__init__(self, address=('localhost', 0))
-        self._hostlist = hostlist
-        self._modules = modules
-        if __name__ not in modules:
-            modules.append(__name__)
-        files = [sys.modules[name].__file__ for name in modules]
-        for i, file in enumerate(files):
-            if file.endswith('.pyc') or file.endswith('.pyo'):
-                files[i] = file[:-4] + '.py'
-        self._files = [os.path.abspath(file) for file in files]
-
-    def start(self):
-        managers.SyncManager.start(self)
-
-        l = connection.Listener(family='AF_INET', authkey=self._authkey)
-
-        for i, host in enumerate(self._hostlist):
-            host._start_manager(i, self._authkey, l.address, self._files)
-
-        for host in self._hostlist:
-            if host.hostname != 'localhost':
-                conn = l.accept()
-                i, address, cpus = conn.recv()
-                conn.close()
-                other_host = self._hostlist[i]
-                other_host.manager = HostManager.from_address(address,
-                                                              self._authkey)
-                other_host.slots = other_host.slots or cpus
-                other_host.Process = other_host.manager.Process
-            else:
-                host.slots = host.slots or slot_count
-                host.Process = LocalProcess
-
-        self._slotlist = [
-            Slot(host) for host in self._hostlist for i in range(host.slots)
-            ]
-        self._slot_iterator = itertools.cycle(self._slotlist)
-        self._base_shutdown = self.shutdown
-        del self.shutdown
-
-    def shutdown(self):
-        for host in self._hostlist:
-            if host.hostname != 'localhost':
-                host.manager.shutdown()
-        self._base_shutdown()
-
-    def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
-        slot = self._slot_iterator.next()
-        return slot.Process(
-            group=group, target=target, name=name, args=args, kwargs=kwargs
-            )
-
-    def Pool(self, processes=None, initializer=None, initargs=()):
-        return DistributedPool(self, processes, initializer, initargs)
-
-    def __getitem__(self, i):
-        return self._slotlist[i]
-
-    def __len__(self):
-        return len(self._slotlist)
-
-    def __iter__(self):
-        return iter(self._slotlist)
-
-#
-# Queue subclass used by distributed pool
-#
-
-class SettableQueue(Queue.Queue):
-    def empty(self):
-        return not self.queue
-    def full(self):
-        return self.maxsize > 0 and len(self.queue) == self.maxsize
-    def set_contents(self, contents):
-        # length of contents must be at least as large as the number of
-        # threads which have potentially called get()
-        self.not_empty.acquire()
-        try:
-            self.queue.clear()
-            self.queue.extend(contents)
-            self.not_empty.notifyAll()
-        finally:
-            self.not_empty.release()
-
-Cluster.register('_SettableQueue', SettableQueue)
-
-#
-# Class representing a notional cpu in the cluster
-#
-
-class Slot(object):
-    def __init__(self, host):
-        self.host = host
-        self.Process = host.Process
-
-#
-# Host
-#
-
-class Host(object):
-    '''
-    Represents a host to use as a node in a cluster.
-
-    `hostname` gives the name of the host.  If hostname is not
-    "localhost" then ssh is used to log in to the host.  To log in as
-    a different user use a host name of the form
-    "username@somewhere.org"
-
-    `slots` is used to specify the number of slots for processes on
-    the host.  This affects how often processes will be allocated to
-    this host.  Normally this should be equal to the number of cpus on
-    that host.
-    '''
-    def __init__(self, hostname, slots=None):
-        self.hostname = hostname
-        self.slots = slots
-
-    def _start_manager(self, index, authkey, address, files):
-        if self.hostname != 'localhost':
-            tempdir = copy_to_remote_temporary_directory(self.hostname, files)
-            debug('startup files copied to %s:%s', self.hostname, tempdir)
-            p = subprocess.Popen(
-                ['ssh', self.hostname, 'python', '-c',
-                 '"import os; os.chdir(%r); '
-                 'from distributing import main; main()"' % tempdir],
-                stdin=subprocess.PIPE
-                )
-            data = dict(
-                name='BoostrappingHost', index=index,
-                dist_log_level=_logger.getEffectiveLevel(),
-                dir=tempdir, authkey=str(authkey), parent_address=address
-                )
-            pickle.dump(data, p.stdin, pickle.HIGHEST_PROTOCOL)
-            p.stdin.close()
-
-#
-# Copy files to remote directory, returning name of directory
-#
-
-unzip_code = '''"
-import tempfile, os, sys, tarfile
-tempdir = tempfile.mkdtemp(prefix='distrib-')
-os.chdir(tempdir)
-tf = tarfile.open(fileobj=sys.stdin, mode='r|gz')
-for ti in tf:
-    tf.extract(ti)
-print tempdir
-"'''
-
-def copy_to_remote_temporary_directory(host, files):
-    p = subprocess.Popen(
-        ['ssh', host, 'python', '-c', unzip_code],
-        stdout=subprocess.PIPE, stdin=subprocess.PIPE
-        )
-    tf = tarfile.open(fileobj=p.stdin, mode='w|gz')
-    for name in files:
-        tf.add(name, os.path.basename(name))
-    tf.close()
-    p.stdin.close()
-    return p.stdout.read().rstrip()
-
-#
-# Code which runs a host manager
-#
-
-def main():
-    # get data from parent over stdin
-    data = pickle.load(sys.stdin)
-    sys.stdin.close()
-
-    # set some stuff
-    _logger.setLevel(data['dist_log_level'])
-    forking.prepare(data)
-
-    # create server for a `HostManager` object
-    server = managers.Server(HostManager._registry, ('', 0), data['authkey'])
-    current_process()._server = server
-
-    # report server address and number of cpus back to parent
-    conn = connection.Client(data['parent_address'], authkey=data['authkey'])
-    conn.send((data['index'], server.address, slot_count))
-    conn.close()
-
-    # set name etc
-    current_process().set_name('Host-%s:%s' % server.address)
-    util._run_after_forkers()
-
-    # register a cleanup function
-    def cleanup(directory):
-        debug('removing directory %s', directory)
-        shutil.rmtree(directory)
-        debug('shutting down host manager')
-    util.Finalize(None, cleanup, args=[data['dir']], exitpriority=0)
-
-    # start host manager
-    debug('remote host manager starting in %s', data['dir'])
-    server.serve_forever()

File Doc/library/multiprocessing.rst

 
 .. literalinclude:: ../includes/mp_benchmarks.py
 
-An example/demo of how to use the :class:`managers.SyncManager`, :class:`Process`
-and others to build a system which can distribute processes and work via a
-distributed queue to a "cluster" of machines on a network, accessible via SSH.
-You will need to have private key authentication for all hosts configured for
-this to work.
-
-.. literalinclude:: ../includes/mp_distributing.py

File Lib/multiprocessing/queues.py

         if sys.platform != 'win32':
             register_after_fork(self, Queue._after_fork)
 
+        self.getv = 0
+
     def __getstate__(self):
         assert_spawning(self)
         return (self._maxsize, self._reader, self._writer,
         self._poll = self._reader.poll
 
     def put(self, obj, block=True, timeout=None):
+        if not isinstance(obj, list):
+            debug('put: %s', obj)
         assert not self._closed
         if not self._sem.acquire(block, timeout):
             raise Full
             self._notempty.release()
 
     def get(self, block=True, timeout=None):
+        self.getv += 1
+        debug('self.getv: %s', self.getv)
         if block and timeout is None:
             self._rlock.acquire()
             try:
                 res = self._recv()
                 self._sem.release()
+                if not isinstance(res, list):
+                    debug('get: %s', res)
                 return res
             finally:
                 self._rlock.release()
                     raise Empty
                 res = self._recv()
                 self._sem.release()
+                if not isinstance(res, list):
+                    debug('get: %s', res)
                 return res
             finally:
                 self._rlock.release()
                 try:
                     while 1:
                         obj = bpopleft()
+                        if not isinstance(obj, list):
+                            debug('feeder thread got: %s', obj)
                         if obj is sentinel:
                             debug('feeder thread got sentinel -- exiting')
                             close()
                             return
-
                         if wacquire is None:
+                            if not isinstance(obj, list):
+                                debug('sending to pipe: %s', obj)
                             send(obj)
                         else:
-                            wacquire()
+                            debug('waiting on wacquire')
+                            wacquire(timeout=30)
                             try:
+                                if not isinstance(obj, list):
+                                    debug('sending to pipe: %s', obj)
                                 send(obj)
                             finally:
                                 wrelease()