Commits

Anonymous committed 92ff05c

reworked python function processing

  • Participants
  • Parent commits 1c87279

Comments (0)

Files changed (12)

+2012-01-13  Wim Lavrijsen <WLavrijsen@lbl.gov>
+        * Further expansion of adiabatic changes (still disabled). Reworked python
+          function processing and argument passing. Started worker -> mother communication.
+
 2012-01-11  Vakho Tsulaia  <tsulaia@mail.cern.ch>
 
 	* Merger for DRAW files
 # not having a set of linkopts for the module only
 #macro_append _athenamp_use_linkopts " -lrt"
 macro_append AthenaMP_linkopts " -lrt"
+macro_append AthenaMP_cppflags " -fno-strict-aliasing"
 
 end_private
 

python/MpProcessing.py

         return result
 
 class Pool( object ):
+    packaged_count = 0
+
     def __init__( self, processes = None, initializer = None, initargs = (),
                   maxtasksperchild = None ):
  
         if not callable( initializer ):
             raise TypeError( 'initializer must be a callable' )
 
-      # TODO: remove this workaround
-        def packaged_call( initializer = initializer, initargs = initargs ):
+      # this workaround is needed b/c initargs can (and do) contain an amp.SharedQueue,
+      # which can not be marshalled, but COW will deal with it properly by binding it
+      # into this local 'packaged_initializer'
+        def packaged_initializer( initializer = initializer, initargs = initargs ):
             return initializer( *initargs )
+
+        self.packaged_count += 1
+        self._initializer = '_amp_pool_%s_%d' % (initializer.__name__,self.packaged_count)
+
         import __main__
-        setattr( __main__, '__pool_%s' % initializer.__name__, packaged_call )
-
-        self._initializer = initializer
-        self._initargs = initargs
+        setattr( __main__, self._initializer, packaged_initializer )
 
         global _current_group
         if _current_group:
         _current_group = self._group
 
     def map_async( self, func, iterable, chunksize=1 ):
-     # TODO: hand over iterable
-        self._group.map_async( 'py:__pool_%s' % self._initializer.__name__ )
-        self._group.map_async( 'py:' + func.__name__ )
+     # NOTE: initializer results are ignored (same as in multiprocessing)
+        self._group.map_async( self._initializer )
+
+     # TODO: hand over iterable properly (it just so happens that in AthenaMP, it is
+     # a repeated list of MaxEvent, for use of reading from the queue)
+        self._group.map_async( '%s.%s' % (func.__module__,func.__name__), iterable[0] )
         return MapResults( self._group )
 
     def close( self ):
-        pass  # seems unnecessary, perhaps to exit all members of the group?
+        self._group.map_async( 'exit' )
 
     def join( self ):
         pass  # alternative functionality for now

share/tests/AMP_basictests.py

       __main__.myfunc = myfunc
 
       group = _athenamp.ProcessGroup( 4 )
-      group.map_async( "py:myfunc" )
+      group.map_async( "myfunc" )
 
       status = group.wait()
       self.assertEqual( [ x[1] for x in status ], 4*[0,] )

src/_athenamp/AthenaMP/Process.h

 
 namespace AthenaMP {
 
+typedef void (*scheduled_func)( void* databuf, int size );
+
 class Process {
 public:
    static Process launch();
 
    pid_t getProcessID() const;
 
-   bool connect( const SharedQueue& queue );
-   int schedule( const std::string& func );
+   bool connectIn( const SharedQueue& queue );
+   bool connectOut( const SharedQueue& queue );
+   int schedule(
+      const std::string& func, const std::string& args = "" );
 
    int mainloop();
 
 private:
-   SharedQueue m_queue;
+   SharedQueue m_inbox;
+   SharedQueue m_outbox;
    pid_t m_pid;
 };
 

src/_athenamp/AthenaMP/ProcessGroup.h

 #define ATHENAMP_PROCESSGROUP_H
 
 #include "AthenaMP/Process.h"
+#include "AthenaMP/SharedQueue.h"
 #include <vector>
 
 
 
    pid_t getGroupID() const;
 
-   int map_async( const std::string& func );
+   int map_async( const std::string& func, const std::string& args = "" );
    int wait( std::vector< ProcessResult >& status, int options = 0 );
 
 // the following should be temporary (is needed for the python
 
 private:
    std::vector< Process > m_processes;
+   SharedQueue m_inbox;
    int m_nprocs;
    pid_t m_pgid;
 };

src/_athenamp/AthenaMP/SharedQueue.h

       return m_queue;
    }
 
+   operator bool() const {
+      return (bool)m_queue;
+   }
+
 private:
    void copy( const SharedQueue& other );
    void destroy();

src/_athenamp/Process.cxx

-#include "PyAthenaMP.h"      // TODO: this is for compat and needs resolving
-
 #include "AthenaMP/Process.h"
 
 #include <boost/interprocess/ipc/message_queue.hpp>
 
+#include <sys/prctl.h>
+#include <dlfcn.h>
 #include <signal.h>
-#include <sys/prctl.h>
 
 #include <iostream>
 
 
 
 //- construction/destruction -------------------------------------------------
-Process::Process( pid_t pid ) : m_queue(), m_pid( pid )
+Process::Process( pid_t pid ) : m_inbox(), m_outbox(), m_pid( pid )
 {
 
 }
 
-Process::Process( const Process& other ) : m_queue( other.m_queue), m_pid( other.m_pid )
+Process::Process( const Process& other ) :
+      m_inbox( other.m_inbox ), m_outbox( other.m_outbox ), m_pid( other.m_pid )
 {
 
 }
 Process& Process::operator=( const Process& other )
 {
    if ( this != &other ) {
-      m_queue = other.m_queue;
+      m_inbox = other.m_inbox;
+      m_outbox = other.m_outbox;
       m_pid = other.m_pid;
    }
    return *this;
    return m_pid;
 }
 
-bool Process::connect( const SharedQueue& queue )
+bool Process::connectIn( const SharedQueue& queue )
 {
    if ( ! queue.name().empty() ) {
-      m_queue = queue;
-   /* TODO: check that the queue is valid and can receive message */
+      m_inbox = queue;
+   /* TODO: check that the queue is valid and can receive messages */
       return true;
    }
 
    return false;
 }
 
-int Process::schedule( const std::string& func )
+bool Process::connectOut( const SharedQueue& queue )
+{
+   if ( ! queue.name().empty() ) {
+      m_outbox = queue;
+   /* TODO: check that the queue is valid and can send messages */
+      return true;
+   }
+
+   return false;
+}
+
+int Process::schedule( const std::string& func, const std::string& args )
 {
    int result = 0;
    try {
-      m_queue->try_send( func.data(), func.length(), 1 );
+   // Added to the inbox, typically from the mother; taken out of the inbox in
+   // the mainloop in the worker.
+      m_inbox.try_send( func );
+      m_inbox.try_send( args );
    } catch ( interprocess_exception& e ) {
-      result = 1;
+      result = -1;
    }
 
    return result;
       return 1;
 
    while ( true ) {
-      std::string func = m_queue.receive();
-      if ( 4 <= func.length() && func.substr( 0, 3 ) == "py:" ) {
-      // TODO: factor this out of existence
-         PyObject* mod = 0;
-         std::string::size_type pos = func.rfind( '.' );
-         if ( pos == std::string::npos ) {
-             mod = PyImport_AddModule( "__main__" );
-             Py_XINCREF( mod );
-             pos = 3;
-         } else {
-             mod = PyImport_ImportModuleNoBlock( func.substr(3, pos).c_str() );
-         }
-
-         if ( ! mod ) {
-            PyErr_Print();
-            continue;
-         }
-
-         PyObject* pyfunc = PyObject_GetAttrString( mod, func.substr(pos, std::string::npos).c_str() );
-         if ( ! pyfunc ) {
-            PyErr_Print();
-            continue;
-         }
-
-         if ( ! PyCallable_Check( pyfunc ) ) {
-            PyErr_Format( PyExc_TypeError,
-               "received non-callable %s", func.substr(pos, std::string::npos).c_str() );
-            PyErr_Print();
-            continue;
-         }
-
-         PyObject* args = PyTuple_New( 0 );
-         PyObject* pyresult = PyObject_Call( pyfunc, args, NULL );
-         Py_DECREF( args );
-
-         if ( ! pyresult )
-            PyErr_Print();
-
-         Py_DECREF( pyfunc );
-         Py_DECREF( mod );
-
-      } else {
-      // use dlsym to locate C function
-      }
+      std::string func = m_inbox.receive();
+      std::string args = m_inbox.receive();
 
       if ( func == "exit" )
          break;         // drop out of mainloop, will result in exit of process
+      
+      scheduled_func sf;
+      *(void**)(&sf) = dlsym( RTLD_DEFAULT, func.c_str() );
+      if ( ! sf ) {
+      // TODO: msgsvc this
+         std::cerr << "(_athenamp) ignoring function: " << func << std::endl;
+         continue;
+      }
+
+      sf( (void*)args.c_str(), args.size() );
+   // TODO: handle and post result
    }
 
    return 0;

src/_athenamp/ProcessGroup.cxx

 
 #include <boost/interprocess/ipc/message_queue.hpp>
 
-#include <sstream>
+#include <sys/wait.h>
+#include <errno.h>
 #include <stdlib.h>
 #include <unistd.h>
-#include <sys/wait.h>
+
+#include <sstream>
 
 using namespace boost::interprocess;
 
 
 //- helper -------------------------------------------------------------------
-static inline AthenaMP::SharedQueue create_queue( int count )
+static inline AthenaMP::SharedQueue create_queue( const std::string& owner, int count )
 {
    std::ostringstream s;
-   s << "athenamp_child_" << getpid() << "_" << count << std::ends;
+   s << "athenamp_" << owner << '_' << getpid() << '_' << count << std::ends;
    AthenaMP::SharedQueue queue = AthenaMP::SharedQueue( s.str() );
    return queue;
 }
    if ( m_nprocs <= 0 )
       return false;
 
+// a single queue for posting back onto the mother
+   if ( ! m_inbox )
+      m_inbox = create_queue( "mother", 0 );
+
 // create process group leader
-   SharedQueue queue = create_queue( 0 );
+   SharedQueue queue = create_queue( "child", 0 );
    Process leader = Process::launch();
-   leader.connect( queue );
+   leader.connectIn( queue );
+   leader.connectOut( m_inbox );
    pid_t lpid = leader.getProcessID();
    if ( lpid == 0 ) {
       int status = leader.mainloop();
 
 // create rest of the group
    for ( int i = 1; i < m_nprocs; ++i ) {
-      SharedQueue queue = create_queue( i );
+      SharedQueue queue = create_queue( "child", i );
       Process p = Process::launch();
-      p.connect( queue );
+      p.connectIn( queue );
+      p.connectOut( m_inbox );
       if ( p.getProcessID() == 0 ) {
          int status = p.mainloop();
          exit( status );
    return m_pgid;
 }
 
-int ProcessGroup::map_async( const std::string& func ) {
-// Map the named (C++) function 'func' onto all current child processes. Does
+int ProcessGroup::map_async( const std::string& func, const std::string& args ) {
+// Map the named (C) function 'func' onto all current child processes. Does
 // not wait for the results, but will return success only if the writing to
-// the child queues succeeds.
+// all the child queues succeeds.
 
    if ( m_processes.empty() ) {
       if ( ! create() )
-         return 1;
+         return -1;
    }
 
-   int result = 0;
    for ( std::vector< Process >::iterator iproc = m_processes.begin();
          iproc != m_processes.end(); ++iproc ) {
-      result += iproc->schedule( func );
+
+      if ( iproc->schedule( func, args ) != 0 ) {
+      // stopping the scheduling on all other processes is debatable ...
+         return -1;
+      }
    }
 
-   return result;
+   return 0;
 }
 
 int ProcessGroup::wait( std::vector< ProcessResult >& status, int options )
    if ( m_processes.empty() )
       return 0;
 
-// Schedule an exit it we are to wait forever
+// Schedule an exit if we are to wait forever
    if ( ! options & WNOHANG )
       map_async( "exit" );
 
       int child_status = 0;
       pid_t child = waitpid( -m_pgid, &child_status, options );
       if ( child < 0 ) {
-         result = (int)child;
+         result = errno;
          break;
       }
       ProcessResult p = { child, WEXITSTATUS( child_status ) };
       status.push_back( p );
    }
 
+// Processes are dead now
+   m_processes.clear();
+
    return result;
 }
 

src/_athenamp/PyProcessGroup.cxx

 #include "PyProcessGroup.h"
 #include "PyProcess.h"
 
+#include <string.h>
+
 #include <new>
 
+#include <marshal.h>
+
 
 //- module classes -----------------------------------------------------------
 AthenaMP::ProcessGroupProxy* AthenaMP::ProcessGroupProxy_New( int nprocs )
 
 
 //__________________
-static PyObject* pg_map_async( ProcessGroupProxy* self, PyObject* args, PyObject* kwds )
+void AthenaMP_MappedPyFunction( void* databuf, int size ) {
+
+   PyObject* pydata = PyMarshal_ReadObjectFromString( (char*)databuf, size );
+   if ( !pydata || !PyTuple_Check( pydata ) ) {
+      PyErr_Print();
+      return;
+   }
+
+// one string entry in pydata is guaranteed by pg_map_async
+   std::string func = PyString_AsString( PyTuple_GET_ITEM( pydata, 0 ) );
+
+   PyObject* pymod = 0;
+   std::string::size_type pos = func.rfind( '.' );
+   if ( pos == std::string::npos ) {
+      pymod = PyImport_AddModule( "__main__" );
+      Py_XINCREF( pymod );
+      pos = 0;
+   } else {
+      pymod = PyImport_ImportModuleNoBlock( func.substr(0, pos).c_str() );
+      pos = pos + 1;
+   }
+
+   if ( !pymod ) {
+      PyErr_Print();
+      Py_DECREF( pydata );
+      return;
+   }
+
+   PyObject* pyfunc = PyObject_GetAttrString( pymod, func.substr(pos, std::string::npos).c_str() );
+   if ( !pyfunc ) {
+      PyErr_Print();
+      Py_DECREF( pymod );
+      Py_DECREF( pydata );
+      return;
+   }
+
+   if ( !PyCallable_Check( pyfunc ) ) {
+      PyErr_Format( PyExc_TypeError,
+         "received non-callable %s", func.substr(pos, std::string::npos).c_str() );
+      PyErr_Print();
+      Py_DECREF( pyfunc );
+      Py_DECREF( pymod );
+      Py_DECREF( pydata );
+      return;
+   }
+
+   PyObject* pyargs = PyTuple_GetSlice( pydata, 1, PyTuple_GET_SIZE( pydata ) );
+   PyObject* pyresult = PyObject_Call( pyfunc, pyargs, NULL );
+   Py_DECREF( pyargs );
+
+   if ( !pyresult ) {
+      PyErr_Print();
+   } else {
+   // TODO: return python result
+      Py_DECREF( pyresult );
+   }
+
+   Py_DECREF( pyfunc );
+   Py_DECREF( pymod );
+   Py_DECREF( pydata );
+}
+
+static PyObject* pg_map_async( ProcessGroupProxy* self, PyObject* args )
 {
-   static const char* kwlist[] = { "func", NULL };
-   char* func;
-   if ( !PyArg_ParseTupleAndKeywords( args, kwds, (char*)"s:func", (char**)kwlist, &func ) )
+   int status = -1;
+
+   if ( !args || !PyTuple_Size( args ) || !PyString_Check( PyTuple_GET_ITEM( args, 0 ) ) ) {
+      PyErr_SetString( PyExc_TypeError, "map_async expects at least one (string) argument" );
       return NULL;
+   }
 
-   int status = self->m_group.map_async( func );
+   std::string func = PyString_AsString( PyTuple_GET_ITEM( args, 0 ) );
+
+   if ( strcmp( PyString_AS_STRING( PyTuple_GET_ITEM( args, 0 ) ), "exit" ) == 0 )
+      status = self->m_group.map_async( "exit" );
+   else {
+      PyObject* pybuf = PyMarshal_WriteObjectToString( args, Py_MARSHAL_VERSION );
+
+      if ( ! pybuf )
+         return NULL;
+
+      status = self->m_group.map_async( "AthenaMP_MappedPyFunction",
+         std::string( PyString_AS_STRING( pybuf ), PyString_GET_SIZE( pybuf ) ) );
+   }
 
    return PyInt_FromLong( status );
 }
 
 //____________________________________________________________________________
 static PyMethodDef pg_methods[] = {
-   { (char*)"map_async", (PyCFunction)pg_map_async, METH_VARARGS | METH_KEYWORDS,
+   { (char*)"map_async", (PyCFunction)pg_map_async, METH_VARARGS,
         (char*)"wait for all processes in group" },
    { (char*)"wait", (PyCFunction)pg_wait, METH_VARARGS | METH_KEYWORDS,
         (char*)"wait for all processes in group" },

src/_athenamp/PyProcessGroup.h

 
 #include "AthenaMP/ProcessGroup.h"
 
+#include <string>
+
+
+extern "C" {   // only to prevent mangling, for dlsym lookup
+   void AthenaMP_MappedPyFunction( void* databuf, int size );
+}
 
 namespace AthenaMP {
 

src/_athenamp/SharedQueue.cxx

 
 }
 
-   SharedQueue::SharedQueue( const std::string& name, int max_msg, bool do_unlink ) :
+SharedQueue::SharedQueue( const std::string& name, int max_msg, bool do_unlink ) :
       m_queue( 0 ), m_name( 0 ), m_count( 0 )
 {
    m_queue = new message_queue( open_or_create, name.c_str(), max_msg, MAX_MSG_SIZE );
       else
          result = mq->try_receive( buf, MAX_MSG_SIZE, recvd_size, priority );
 
-      if ( result && recvd_size ) {
+      if ( result ) {
          if ( recvd_size > MAX_MSG_SIZE )
             recvd_size = MAX_MSG_SIZE;
          return std::string( buf, recvd_size );