Source

mana-core-athenamp / src / _athenamp / Process.cxx

#include "PyAthenaMP.h"      // TODO: this is for compat and needs resolving

#include "AthenaMP/Process.h"

#include <boost/interprocess/ipc/message_queue.hpp>

#include <signal.h>
#include <sys/prctl.h>

#include <iostream>

using namespace boost::interprocess;


namespace AthenaMP {   

Process Process::launch()
{
   std::cout.flush();
   std::cerr.flush();

   pid_t pid = fork();
   if ( pid == 0 )
      prctl( PR_SET_PDEATHSIG, SIGHUP );          // Linux only

   return Process( pid );
}


//- construction/destruction -------------------------------------------------
Process::Process( pid_t pid ) : m_queue(), m_pid( pid )
{

}

Process::Process( const Process& other ) : m_queue( other.m_queue), m_pid( other.m_pid )
{

}

Process& Process::operator=( const Process& other )
{
   if ( this != &other ) {
      m_queue = other.m_queue;
      m_pid = other.m_pid;
   }
   return *this;
}

Process::~Process()
{

}


//- public member functions --------------------------------------------------
pid_t Process::getProcessID() const
{
   return m_pid;
}

bool Process::connect( const SharedQueue& queue )
{
   if ( ! queue.name().empty() ) {
      m_queue = queue;
   /* TODO: check that the queue is valid and can receive message */
      return true;
   }

   return false;
}

int Process::schedule( const std::string& func )
{
   int result = 0;
   try {
      m_queue->try_send( func.data(), func.length(), 1 );
   } catch ( interprocess_exception& e ) {
      result = 1;
   }

   return result;
}

int Process::mainloop() {
   if ( m_pid != 0 )
      return 1;

   while ( true ) {
      std::string func = m_queue.receive();
      if ( 4 <= func.length() && func.substr( 0, 3 ) == "py:" ) {
      // TODO: factor this out of existence
         PyObject* mod = 0;
         std::string::size_type pos = func.rfind( '.' );
         if ( pos == std::string::npos ) {
             mod = PyImport_AddModule( "__main__" );
             Py_XINCREF( mod );
             pos = 3;
         } else {
             mod = PyImport_ImportModuleNoBlock( func.substr(3, pos).c_str() );
         }

         if ( ! mod ) {
            PyErr_Print();
            continue;
         }

         PyObject* pyfunc = PyObject_GetAttrString( mod, func.substr(pos, std::string::npos).c_str() );
         if ( ! pyfunc ) {
            PyErr_Print();
            continue;
         }

         if ( ! PyCallable_Check( pyfunc ) ) {
            PyErr_Format( PyExc_TypeError,
               "received non-callable %s", func.substr(pos, std::string::npos).c_str() );
            PyErr_Print();
            continue;
         }

         PyObject* args = PyTuple_New( 0 );
         PyObject* pyresult = PyObject_Call( pyfunc, args, NULL );
         Py_DECREF( args );

         if ( ! pyresult )
            PyErr_Print();

         Py_DECREF( pyfunc );
         Py_DECREF( mod );

      } else {
      // use dlsym to locate C function
      }

      if ( func == "exit" )
         break;         // drop out of mainloop, will result in exit of process
   }

   return 0;
}

} // namespace AthenaMP