1. binet
  2. mana-core-athenamp

Commits

wl...@4525493e-7705-40b1-a816-d608a930855b  committed ac59783

better error handling; enabled MpProcessing

  • Participants
  • Parent commits 9863666
  • Branches default

Comments (0)

Files changed (8)

File ChangeLog

View file
+2012-01-19  Wim Lavrijsen <WLavrijsen@lbl.gov>
+        * More improved error propagation for corner cases. Now enabled MpProcessing.
+
 2012-01-18  Wim Lavrijsen <WLavrijsen@lbl.gov>
         * More exit code handling: different paths through PyComps give completely different
           results. (Note that the current multiprocessing based code hangs, crashes, or does

File python/PyComps.py

View file
 
 #-----Python imports---#
 import os, sys, time
-import multiprocessing as mp
-#import AthenaMP.MpProcessing as mp
+#import multiprocessing as mp
+import AthenaMP.MpProcessing as mp
 
 #-----Athena imports---#
 import AthenaCommon.SystemOfUnits as Units

File src/_athenamp/AthenaMP/Process.h

View file
 
    bool connectIn( const SharedQueue& queue );
    bool connectOut( const IdentifiedSharedQueue& queue );
-   int schedule(
+   bool schedule(
       const std::string& func, const std::string& args = "" );
 
    int mainloop();

File src/_athenamp/Process.cxx

View file
    return false;
 }
 
-int Process::schedule( const std::string& func, const std::string& args )
+bool Process::schedule( const std::string& func, const std::string& args )
 {
-   int result = 0;
-   try {
-   // Added to the inbox, typically from the mother; taken out of the inbox in
-   // the mainloop in the worker.
-      m_inbox.try_send( func );
-      m_inbox.try_send( args );
-   } catch ( interprocess_exception& e ) {
-      result = -1;
-   }
+// Added to the inbox, typically from the mother; taken out of the inbox in
+// the mainloop in the worker.
+    bool send_ok = m_inbox.try_send( func );
+    if ( send_ok ) send_ok = m_inbox.try_send( args );
 
-   return result;
+   return send_ok;
 }
 
 int Process::mainloop() {

File src/_athenamp/ProcessGroup.cxx

View file
    for ( std::vector< Process >::iterator iproc = m_processes.begin();
          iproc != m_processes.end(); ++iproc ) {
 
-      if ( iproc->schedule( func, args ) != 0 ) {
+      if ( ! iproc->schedule( func, args ) ) {
       // stopping the scheduling on all other processes is debatable ...
          return -1;
       }
          result = errno;
          break;
       }
-      ProcessResult p = { child, WEXITSTATUS( child_status ), { 0, 0 } };
+      child_status = WIFSIGNALED(child_status) ? WTERMSIG(child_status) : WEXITSTATUS(child_status);
+      ProcessResult p = { child, child_status, { 0, 0 } };
       status.push_back( p );
    }
 

File src/_athenamp/PySharedQueue.cxx

View file
 
 
 //- module classes -----------------------------------------------------------
-AthenaMP::SharedQueueProxy* AthenaMP::SharedQueueProxy_New( const std::string& name )
+AthenaMP::SharedQueueProxy* AthenaMP::SharedQueueProxy_New(
+      const std::string& name, int max_msg, bool do_unlink )
 {
    SharedQueueProxy* pysq = (SharedQueueProxy*)SharedQueueProxy_Type.tp_new( &SharedQueueProxy_Type, 0, 0 );
-   new( (void*)&pysq->m_sharedqueue ) SharedQueue( name );
+   new( (void*)&pysq->m_sharedqueue ) SharedQueue( name, max_msg, do_unlink );
    return pysq;
 }
 

File src/_athenamp/PySharedQueue.h

View file
 }
 
 //- creation -----------------------------------------------------------------
-SharedQueueProxy* SharedQueueProxy_New( const std::string& name );
+SharedQueueProxy* SharedQueueProxy_New(
+   const std::string& name, int max_msg = SHAREDQUEUE_MAX_MSG, bool do_unlink = true );
 SharedQueueProxy* SharedQueueProxy_New( const SharedQueue& );
 
 } // namespace AthenaMP

File src/_athenamp/SharedQueue.cxx

View file
 using namespace boost::interprocess;
 
 
-const std::size_t MAX_MSG_SIZE = 4096;
+const std::size_t MAX_MSG_SIZE = 256;  // may be too big, but is on mmapped memory
 
 namespace AthenaMP {   
 
 
       if ( result ) {
          if ( recvd_size > MAX_MSG_SIZE )
-            recvd_size = MAX_MSG_SIZE;
+            recvd_size = MAX_MSG_SIZE;      // this is debatable, but send
+                                            // should have failed already, so
+                                            // "can not happen" applies
          return std::string( buf, recvd_size );
       }
    } catch ( interprocess_exception& e ) {