Commits

wl...@4525493e-7705-40b1-a816-d608a930855b  committed ccccd9f

improved return code handling

  • Participants
  • Parent commits 92ff05c

Comments (0)

Files changed (13)

+2012-01-17  Wim Lavrijsen <WLavrijsen@lbl.gov>
+        * Finalized some of the worker return handling (still following the old reporting
+          style, even as it is no longer necessary). Adiabatic expansion still disabled.
+
 2012-01-13  Wim Lavrijsen <WLavrijsen@lbl.gov>
         * Further expansion of adiabatic changes (still disabled). Reworked python
           function processing and argument passing. Started worker -> mother communication.

File python/MpProcessing.py

 
     def get( self, *args, **kw ):
         status = self._group.wait()
-     # TODO: figure out the full needed info
+
+     # there are two parts to the exit code: the reported result from the worker
+     # function and the process' exit code
+
+     # TODO: handle process' exit code in a cleaner way (fix in PyComps?), as this
+     # obviously does not work in general ...
+
         result = []
-        for cresult in status:
-            result.append( [(cresult[0], 10, cresult[1], 'OK'), (cresult[0], -1, cresult[1], 'OK')] )
+        for proc_result in status:
+            r = proc_result[ 2 ]
+            if ( r[0][2] == 0 ): r[0] = ( r[0][0], r[0][1], proc_result[1], r[0][3] )
+            if ( r[1][2] == 0 ): r[1] = ( r[1][0], r[1][1], proc_result[1], r[1][3] )
+            result.append( proc_result[ 2 ] )
         return result
 
 class Pool( object ):

File share/tests/AMP_basictests.py

 
       def myfunc():
       #  print 'called myfunc'
-         pass
+          return 1
       __main__.myfunc = myfunc
 
+    # existing function with return value
       group = _athenamp.ProcessGroup( 4 )
       group.map_async( "myfunc" )
+      status = group.wait()
 
+      self.assertEqual( [ x[1] for x in status ], 4*[0,] )
+      self.assertEqual( [ x[2] for x in status ], 4*[1,] )
+
+    # non-existing function, leading to failure
+      group = _athenamp.ProcessGroup( 4 )
+      group.map_async( "no_such_func" )
       status = group.wait()
-      self.assertEqual( [ x[1] for x in status ], 4*[0,] )
+
+      self.assertEqual( [ x[1] for x in status ], 4*[0x0B,] )
 
 
 ### basic group usage test cases =============================================

File src/_athenamp/AthenaMP/IdentifiedSharedQueue.h

+#ifndef ATHENAMP_IDENTIFIEDSHAREDQUEUE_H
+#define ATHENAMP_IDENTIFIEDSHAREDQUEUE_H
+
+#include "AthenaMP/SharedQueue.h"
+
+#include <unistd.h>
+
+namespace AthenaMP {
+
+// A SharedQueue whose messages carry the process id of the sender. The send
+// functions take the payload only; the receive overloads taking a pid_t&
+// additionally report which process posted the message.
+class IdentifiedSharedQueue : public SharedQueue {
+public:
+   IdentifiedSharedQueue();
+   IdentifiedSharedQueue( const std::string& name,
+                int max_msg = SHAREDQUEUE_MAX_MSG, bool do_unlink = true );
+
+public:
+   virtual bool try_send( const std::string& );     // non-blocking
+   virtual bool send( const std::string& );
+
+// the overloads without an id argument discard the sender's id
+   virtual std::string try_receive();                // non-blocking
+   virtual std::string try_receive( pid_t& id );     // non-blocking; id is set to the sender's pid
+   virtual std::string receive();
+   virtual std::string receive( pid_t& id );         // id is set to the sender's pid
+};
+
+} // namespace AthenaMP
+
+#endif // !ATHENAMP_IDENTIFIEDSHAREDQUEUE_H

File src/_athenamp/AthenaMP/Process.h

 #define ATHENAMP_PROCESS_H
 
 #include "AthenaMP/SharedQueue.h"
+#include "AthenaMP/IdentifiedSharedQueue.h"
+
 #include <string>
 #include <unistd.h>
 
 
 namespace AthenaMP {
 
-typedef void (*scheduled_func)( void* databuf, int size );
+// Buffer descriptor handed to a scheduled_func and returned by it.
+struct ScheduledWork {
+   void* data;     // start of the buffer
+   int size;       // length of the buffer pointed to by data
+};
+
+typedef ScheduledWork* (*scheduled_func)( ScheduledWork* databuf );
 
 class Process {
 public:
    pid_t getProcessID() const;
 
    bool connectIn( const SharedQueue& queue );
-   bool connectOut( const SharedQueue& queue );
+   bool connectOut( const IdentifiedSharedQueue& queue );
    int schedule(
       const std::string& func, const std::string& args = "" );
 
    int mainloop();
 
 private:
-   SharedQueue m_inbox;
-   SharedQueue m_outbox;
+   SharedQueue           m_inbox;
+   IdentifiedSharedQueue m_outbox;
    pid_t m_pid;
 };
 

File src/_athenamp/AthenaMP/ProcessGroup.h

 #define ATHENAMP_PROCESSGROUP_H
 
 #include "AthenaMP/Process.h"
-#include "AthenaMP/SharedQueue.h"
+#include "AthenaMP/IdentifiedSharedQueue.h"
 #include <vector>
 
 
 struct ProcessResult {
    pid_t pid;
    int   exitcode;
+   ScheduledWork output;
 };
 
 class ProcessGroup {
 
 private:
    std::vector< Process > m_processes;
-   SharedQueue m_inbox;
+   IdentifiedSharedQueue m_inbox;
    int m_nprocs;
    pid_t m_pgid;
 };

File src/_athenamp/AthenaMP/SharedQueue.h

 public:
    std::string name() const;
 
-   bool try_send( const std::string& );     // non-blocking
-   bool send( const std::string& );
+   virtual bool try_send( const std::string& );     // non-blocking
+   virtual bool send( const std::string& );
 
-   std::string try_receive();               // non-blocking
-   std::string receive();
-
-   boost::interprocess::message_queue* operator->() {
-      return m_queue;
-   }
+   virtual std::string try_receive();               // non-blocking
+   virtual std::string receive();
 
    operator bool() const {
       return (bool)m_queue;
    }
 
+protected:
+   boost::interprocess::message_queue* operator->() {
+      return m_queue;
+   }
+
 private:
    void copy( const SharedQueue& other );
    void destroy();

File src/_athenamp/IdentifiedSharedQueue.cxx

+#include "AthenaMP/IdentifiedSharedQueue.h"
+
+#include <boost/interprocess/ipc/message_queue.hpp>
+
+#include <sstream>
+
+using namespace boost::interprocess;
+
+
+const std::size_t MAX_MSG_SIZE = 4096;
+
+namespace AthenaMP {   
+
+//- construction/destruction -------------------------------------------------
+IdentifiedSharedQueue::IdentifiedSharedQueue()
+      : SharedQueue()
+{
+// nothing beyond default construction of the base queue
+}
+
+IdentifiedSharedQueue::IdentifiedSharedQueue(
+      const std::string& name, int max_msg, bool do_unlink )
+      : SharedQueue( name, max_msg, do_unlink )
+{
+// all arguments are forwarded unchanged to the base queue
+}
+
+//- public member functions --------------------------------------------------
+// Build the message that is actually put on the queue: the pid of the calling
+// process in decimal, a single '\0' separator byte, then the payload verbatim.
+//
+//   buf   payload to be sent (may contain arbitrary bytes)
+//
+// Returns the tagged message.
+inline std::string add_pid( const std::string& buf )
+{
+   std::ostringstream s;
+// deliberately no trailing std::ends: it would append an extra '\0' that the
+// receiver cannot tell apart from payload data
+   s << static_cast< long >( getpid() ) << '\0' << buf;
+   return s.str();
+}
+
+bool IdentifiedSharedQueue::try_send( const std::string& buf )
+{
+// tag the payload with our pid, then post it through the base class (non-blocking)
+   const std::string tagged = add_pid( buf );
+   return SharedQueue::try_send( tagged );
+}
+
+bool IdentifiedSharedQueue::send( const std::string& buf )
+{
+// tag the payload with our pid, then post it through the base class
+   const std::string tagged = add_pid( buf );
+   return SharedQueue::send( tagged );
+}
+
+// Split a received message into the sender's pid and the payload.
+//
+//   buf   raw message: a decimal pid, one separator byte, then the payload
+//   pid   output; the parsed pid, or -1 if buf does not start with a number
+//         (in particular when buf is empty)
+//
+// Returns the bytes following the separator; empty if there are none or if
+// the message could not be parsed.
+inline std::string get_pid( const std::string& buf, pid_t &pid )
+{
+   pid = (pid_t)-1;
+
+   std::istringstream s( buf );
+   long id = -1;
+   if ( ! ( s >> id ) )
+      return "";       // nothing parseable: leave pid at -1 and report no payload
+   pid = (pid_t)id;
+
+// tellg() can report failure (-1), e.g. when the number ran up to the end of
+// the buffer; check before converting, as -1 would wrap around in size_type
+   const std::istringstream::pos_type cur = s.tellg();
+   if ( cur == std::istringstream::pos_type( -1 ) )
+      return "";
+
+// the payload starts one byte past the number, skipping the separator
+   const std::string::size_type pos = cur;
+   if ( (pos+1) <= buf.size() )
+      return buf.substr( pos+1 );
+   return "";
+}
+  
+
+std::string IdentifiedSharedQueue::try_receive()
+{
+// caller is not interested in the sender: receive into a throw-away id
+   pid_t ignored;
+   return this->try_receive( ignored );
+}
+
+std::string IdentifiedSharedQueue::try_receive( pid_t& id )
+{
+// fetch the raw message from the base queue (non-blocking), then split off the pid
+   const std::string raw( SharedQueue::try_receive() );
+   return get_pid( raw, id );
+}
+
+std::string IdentifiedSharedQueue::receive()
+{
+// caller is not interested in the sender: receive into a throw-away id
+   pid_t ignored;
+   return this->receive( ignored );
+}
+
+std::string IdentifiedSharedQueue::receive( pid_t& id )
+{
+// fetch the raw message from the base queue, then split off the pid
+   const std::string raw( SharedQueue::receive() );
+   return get_pid( raw, id );
+}
+
+} // namespace AthenaMP

File src/_athenamp/Process.cxx

    return false;
 }
 
-bool Process::connectOut( const SharedQueue& queue )
+bool Process::connectOut( const IdentifiedSharedQueue& queue )
 {
    if ( ! queue.name().empty() ) {
       m_outbox = queue;
 }
 
 int Process::mainloop() {
+// TODO: the exit codes used here continue where the "miscellaneous" exit codes
+// in AthenaCommon/ExitCodes.py end. There's nothing to enforce that, though,
+// and it's not documented either.
    if ( m_pid != 0 )
       return 1;
 
+   int exit_code = 0; 
+
    while ( true ) {
       std::string func = m_inbox.receive();
       std::string args = m_inbox.receive();
       if ( ! sf ) {
       // TODO: msgsvc this
          std::cerr << "(_athenamp) ignoring function: " << func << std::endl;
+         exit_code = 0x09;
          continue;
       }
 
-      sf( (void*)args.c_str(), args.size() );
-   // TODO: handle and post result
+      ScheduledWork inwork = { (void*)args.c_str(), args.size() };
+      ScheduledWork* outwork = sf( &inwork );
+
+      if ( outwork ) {
+         bool posted = m_outbox.try_send( std::string( (char*)outwork->data, outwork->size ) );
+         free( outwork->data );
+         free( outwork );
+
+         if ( ! posted ) {
+            exit_code = 0x0A;
+            break;
+         }
+      } else {
+      // assume reportable as an error
+         exit_code = 0x0B;
+      }
    }
 
-   return 0;
+   return exit_code;     // TODO: standardize/document exit code
 }
 
 } // namespace AthenaMP

File src/_athenamp/ProcessGroup.cxx

 #include "AthenaMP/ProcessGroup.h"
+#include "AthenaMP/SharedQueue.h"
 
 #include <boost/interprocess/ipc/message_queue.hpp>
 
       return false;
 
 // a single queue for posting back onto the mother
-   if ( ! m_inbox )
-      m_inbox = create_queue( "mother", 0 );
+   if ( ! m_inbox ) {
+      std::ostringstream s;
+      s << "athenamp_mother_" << getpid() << std::ends;
+      m_inbox = AthenaMP::IdentifiedSharedQueue( s.str() );
+   }
 
 // create process group leader
    SharedQueue queue = create_queue( "child", 0 );
          result = errno;
          break;
       }
-      ProcessResult p = { child, WEXITSTATUS( child_status ) };
+      ProcessResult p = { child, WEXITSTATUS( child_status ), { 0, 0 } };
       status.push_back( p );
    }
 
+// retrieve the last posted returns from our inbox
+   while ( true ) {
+      pid_t pid = -1;
+      std::string buf = m_inbox.try_receive( pid );
+      if ( pid < 0 )
+         break;
+
+   // this is silly, but nprocs is presumably a rather small number
+      for ( std::vector< ProcessResult >::iterator
+               istat = status.begin(); istat < status.end(); ++istat ) {
+         if ( istat->pid == pid ) {
+            free( istat->output.data );
+            istat->output.data = malloc( buf.size() );
+            memcpy( istat->output.data, buf.data(), buf.size() );
+            istat->output.size = buf.size();
+            break;
+         }
+      }
+
+   }
+
 // Processes are dead now
    m_processes.clear();
 

File src/_athenamp/PyProcessGroup.cxx

 {
    int nprocs = -1;
    static const char* kwlist[] = { "nprocs", NULL };
-   if ( !PyArg_ParseTupleAndKeywords( args, kwds, (char*)"|i:Process", (char**)kwlist, &nprocs ) ) {
+   if ( ! PyArg_ParseTupleAndKeywords( args, kwds, (char*)"|i:Process", (char**)kwlist, &nprocs ) ) {
       new( (void*)&self->m_group ) ProcessGroup( 0 );
       return -1;
    }
 
 
 //__________________
-void AthenaMP_MappedPyFunction( void* databuf, int size ) {
+ScheduledWork* AthenaMP_MappedPyFunction( ScheduledWork* databuf ) {
 
-   PyObject* pydata = PyMarshal_ReadObjectFromString( (char*)databuf, size );
-   if ( !pydata || !PyTuple_Check( pydata ) ) {
+   PyObject* pydata = PyMarshal_ReadObjectFromString( (char*)databuf->data, databuf->size );
+   if ( ! pydata || ! PyTuple_Check( pydata ) ) {
       PyErr_Print();
-      return;
+      return 0;
    }
 
 // one string entry in pydata is guaranteed by pg_map_async
       pos = pos + 1;
    }
 
-   if ( !pymod ) {
+   if ( ! pymod ) {
       PyErr_Print();
       Py_DECREF( pydata );
-      return;
+      return 0;
    }
 
    PyObject* pyfunc = PyObject_GetAttrString( pymod, func.substr(pos, std::string::npos).c_str() );
-   if ( !pyfunc ) {
+   if ( ! pyfunc ) {
       PyErr_Print();
       Py_DECREF( pymod );
       Py_DECREF( pydata );
-      return;
+      return 0;
    }
 
-   if ( !PyCallable_Check( pyfunc ) ) {
+   if ( ! PyCallable_Check( pyfunc ) ) {
       PyErr_Format( PyExc_TypeError,
          "received non-callable %s", func.substr(pos, std::string::npos).c_str() );
       PyErr_Print();
       Py_DECREF( pyfunc );
       Py_DECREF( pymod );
       Py_DECREF( pydata );
-      return;
+      return 0;
    }
 
    PyObject* pyargs = PyTuple_GetSlice( pydata, 1, PyTuple_GET_SIZE( pydata ) );
    PyObject* pyresult = PyObject_Call( pyfunc, pyargs, NULL );
    Py_DECREF( pyargs );
 
-   if ( !pyresult ) {
+   ScheduledWork* result = 0;
+   if ( pyresult ) {
+      PyObject* pybuf = PyMarshal_WriteObjectToString( pyresult, Py_MARSHAL_VERSION );
+      if ( pybuf ) {
+         result = (ScheduledWork*)malloc( sizeof( ScheduledWork ) );
+         result->data = malloc( PyString_GET_SIZE( pybuf ) );
+         memcpy( result->data, PyString_AS_STRING( pybuf ), PyString_GET_SIZE( pybuf ) );
+         result->size = PyString_GET_SIZE( pybuf );
+         Py_DECREF( pybuf );
+      } else {
+         PyErr_Print();
+      }
+
+      Py_DECREF( pyresult );
+
+   } else {
       PyErr_Print();
-   } else {
-   // TODO: return python result
-      Py_DECREF( pyresult );
    }
 
    Py_DECREF( pyfunc );
    Py_DECREF( pymod );
    Py_DECREF( pydata );
+
+   return result;
 }
 
 static PyObject* pg_map_async( ProcessGroupProxy* self, PyObject* args )
 {
    int status = -1;
 
-   if ( !args || !PyTuple_Size( args ) || !PyString_Check( PyTuple_GET_ITEM( args, 0 ) ) ) {
+   if ( ! (args && PyTuple_Size( args ) && PyString_Check( PyTuple_GET_ITEM( args, 0 ) )) ) {
       PyErr_SetString( PyExc_TypeError, "map_async expects at least one (string) argument" );
       return NULL;
    }
 {
    int options = 0;
    static const char* kwlist[] = { "options", NULL };
-   if ( !PyArg_ParseTupleAndKeywords( args, kwds, (char*)"|i:wait", (char**)kwlist, &options ) )
+   if ( ! PyArg_ParseTupleAndKeywords( args, kwds, (char*)"|i:wait", (char**)kwlist, &options ) )
       return NULL;
 
    std::vector< ProcessResult > status;
    if ( result == 0 ) {
       PyObject* pystatus = PyTuple_New( status.size() );
       for ( int i = 0; i < (int)status.size(); ++i ) {
-         PyObject* itm = PyTuple_New( 2 );
-         PyTuple_SET_ITEM( itm, 0, PyInt_FromLong( (long)status[ i ].pid ) );
-         PyTuple_SET_ITEM( itm, 1, PyInt_FromLong( status[ i ].exitcode ) );
+         ProcessResult& ipr = status[ i ];
+
+         PyObject* itm = PyTuple_New( 3 );
+         PyTuple_SET_ITEM( itm, 0, PyInt_FromLong( (long)ipr.pid ) );
+         PyTuple_SET_ITEM( itm, 1, PyInt_FromLong( ipr.exitcode ) );
+         if ( ipr.output.data )
+            PyTuple_SET_ITEM( itm, 2,
+                PyMarshal_ReadObjectFromString( (char*)ipr.output.data, ipr.output.size ) );
+         else {
+            Py_INCREF( Py_None );
+            PyTuple_SET_ITEM( itm, 2, Py_None );
+         }
+
          PyTuple_SET_ITEM( pystatus, i, itm );
       }
       return pystatus;

File src/_athenamp/PyProcessGroup.h

 #define ATHENAMP_PY_PROCESSGROUP_H
 
 #include "AthenaMP/ProcessGroup.h"
+#include "AthenaMP/Process.h"
 
 #include <string>
 
 
 extern "C" {   // only to prevent mangling, for dlsym lookup
-   void AthenaMP_MappedPyFunction( void* databuf, int size );
+   AthenaMP::ScheduledWork* AthenaMP_MappedPyFunction( AthenaMP::ScheduledWork* databuf );
 }
 
 namespace AthenaMP {

File src/_athenamp/SharedQueue.cxx

 
 #include <boost/interprocess/ipc/message_queue.hpp>
 
-#include <sstream>
-
 using namespace boost::interprocess;
 
 
-const std::size_t MAX_MSG_SIZE = 4096;
+const std::size_t MAX_MSG_SIZE = 40960;
 
 namespace AthenaMP {   
 
 //- construction/destruction -------------------------------------------------
 SharedQueue::SharedQueue() : m_queue( 0 ), m_name( 0 ), m_count( 0 )
 {
-
+/* empty */
 }
 
 SharedQueue::SharedQueue( const std::string& name, int max_msg, bool do_unlink ) :