Commits

Anonymous committed a8afeda

further (still disabled) adiabatic changes

Comments (0)

Files changed (8)

-2011-12-22  Wim Lavijsen <WLavrijsen@lbl.gov>
+2012-01-10  Wim Lavrijsen <WLavrijsen@lbl.gov>
+        * Expansion of adiabatic changes (still disabled) with initializers and
+          better result returns
+
+2011-12-22  Wim Lavrijsen <WLavrijsen@lbl.gov>
         * First codes for an attempt to adiabatically remove the unmaintainable
 	  python/C++ hybrid prototype code in order to be able to run AthenaMP
 	  in production. For now, this new code is disabled in use (modify

python/MpProcessing.py

     return list()
 
 
+##### class Queue
+Queue = amp.SharedQueue
+
+
 ##### class Pool
-class dummy_results( object ):
+class MapResults( object ):
+    def __init__( self, group ):
+        self._group = group
+
     def get( self, *args, **kw ):
-        return [[(12656, 10, 0, 'OK'), (12656, -1, 0, 'OK')], [(12657, 10, 0, 'OK'), (12657, -1, 0, 'OK')]]
+        status = self._group.wait()
+     # TODO: figure out the full needed info
+        result = []
+        for cresult in status:
+            result.append( [(cresult[0], 10, cresult[1], 'OK'), (cresult[0], -1, cresult[1], 'OK')] )
+        return result
 
 class Pool( object ):
     def __init__( self, processes = None, initializer = None, initargs = (),
                   maxtasksperchild = None ):
-        # TODO: schedule initializer
+ 
+        if not callable( initializer ):
+            raise TypeError( 'initializer must be a callable' )
+
+      # TODO: remove this workaround
+        def packaged_call( initializer = initializer, initargs = initargs ):
+            return initializer( *initargs )
+        import __main__
+        setattr( __main__, '__pool_%s' % initializer.__name__, packaged_call )
+
+        self._initializer = initializer
+        self._initargs = initargs
 
         global _current_group
         if _current_group:
         self._group = amp.ProcessGroup( processes )
         _current_group = self._group
 
-    def map_async( self, func, iterable, chunksize=1):
-        self._group.map_async( func.__name__ )
-        return dummy_results()
+    def map_async( self, func, iterable, chunksize=1 ):
+     # TODO: hand over iterable
+        self._group.map_async( 'py:__pool_%s' % self._initializer.__name__ )
+        self._group.map_async( 'py:' + func.__name__ )
+        return MapResults( self._group )
 
     def close( self ):
         pass  # seems unnecessary, perhaps to exit all members of the group?

share/tests/AMP_basictests.py

       group = _athenamp.ProcessGroup( 4 )
       group.map_async( "exit" )
       self.assertEqual( len(group._children()), 4 )   # now instantiated
-      self.assertEqual( group.wait(), 4*(0,) )
+
+      status = group.wait()
+      self.assertEqual( [ x[1] for x in status ], 4*[0,] )
 
    def test03PythonTaskMapAsync( self ):
       """Test running a python task via map_async on a worker group"""
 
       group = _athenamp.ProcessGroup( 4 )
       group.map_async( "py:myfunc" )
-      self.assertEqual( group.wait(), 4*(0,) )
+
+      status = group.wait()
+      self.assertEqual( [ x[1] for x in status ], 4*[0,] )
 
 
 ### basic group usage test cases =============================================

src/_athenamp/AthenaMP/ProcessGroup.h

 
 namespace AthenaMP {
 
+struct ProcessResult {
+   pid_t pid;
+   int   exitcode;
+};
+
 class ProcessGroup {
 public:
    explicit ProcessGroup( int nprocs = -1 );
    pid_t getGroupID() const;
 
    int map_async( const std::string& func );
-   int wait( std::vector< int >& status, int options = 0 );
+   int wait( std::vector< ProcessResult >& status, int options = 0 );
 
 // the following should be temporary (is needed for the python
 // interface, but it is better broken up into the actual needs)

src/_athenamp/Process.cxx

 #include <signal.h>
 #include <sys/prctl.h>
 
+#include <iostream>
+
 using namespace boost::interprocess;
 
 
 
 Process Process::launch()
 {
+   std::cout.flush();
+   std::cerr.flush();
+
    pid_t pid = fork();
    if ( pid == 0 )
       prctl( PR_SET_PDEATHSIG, SIGHUP );          // Linux only
          }
 
          PyObject* args = PyTuple_New( 0 );
-         PyObject_Call( pyfunc, args, NULL );
+         PyObject* pyresult = PyObject_Call( pyfunc, args, NULL );
          Py_DECREF( args );
 
+         if ( ! pyresult )
+            PyErr_Print();
+
          Py_DECREF( pyfunc );
          Py_DECREF( mod );
 

src/_athenamp/ProcessGroup.cxx

    return result;
 }
 
-int ProcessGroup::wait( std::vector< int >& status, int options )
+int ProcessGroup::wait( std::vector< ProcessResult >& status, int options )
 {
 // Wait for all child processes and return their status codes in 'status'.
    if ( ! status.empty() )
          result = (int)child;
          break;
       }
-      status.push_back( WEXITSTATUS( child_status ) );
+      ProcessResult p = { child, WEXITSTATUS( child_status ) };
+      status.push_back( p );
    }
 
    return result;

src/_athenamp/PyProcessGroup.cxx

    if ( !PyArg_ParseTupleAndKeywords( args, kwds, (char*)"|i:wait", (char**)kwlist, &options ) )
       return NULL;
 
-   std::vector< int > status;
+   std::vector< ProcessResult > status;
    int result = self->m_group.wait( status, options );
    if ( result == 0 ) {
       PyObject* pystatus = PyTuple_New( status.size() );
-      for ( int i = 0; i < (int)status.size(); ++i )
-         PyTuple_SET_ITEM( pystatus, i, PyInt_FromLong( status[ i ] ) );
+      for ( int i = 0; i < (int)status.size(); ++i ) {
+         PyObject* itm = PyTuple_New( 2 );
+         PyTuple_SET_ITEM( itm, 0, PyInt_FromLong( (long)status[ i ].pid ) );
+         PyTuple_SET_ITEM( itm, 1, PyInt_FromLong( status[ i ].exitcode ) );
+         PyTuple_SET_ITEM( pystatus, i, itm );
+      }
       return pystatus;
    }
 

src/_athenamp/PySharedQueue.cxx

 }
 
 //__________________
+static PyObject* sq_get( SharedQueueProxy* pysq )
+{
+   const std::string& buf = pysq->m_sharedqueue.receive();
+   PyObject* pyobj = PyMarshal_ReadObjectFromString( (char*)buf.c_str(), buf.size() );
+
+   return pyobj;
+}
+
+//__________________
 static PyObject* sq_put_nowait( SharedQueueProxy* pysq, PyObject* pyobj )
 {
    PyObject* pybuf = PyMarshal_WriteObjectToString( pyobj, Py_MARSHAL_VERSION );
         (char*)"non-blocking put on queue" },
    { (char*)"get_nowait", (PyCFunction)sq_get_nowait, METH_NOARGS,
         (char*)"non-blocking get on queue" },
+   { (char*)"get",        (PyCFunction)sq_get,        METH_NOARGS,
+        (char*)"blocking get on queue" },
    { (char*)NULL, NULL, 0, NULL }
 };