// Source
//
// mana-core-athenamp / src / _athenamp / PySharedQueue.cxx

#include "PyAthenaMP.h"
#include "PySharedQueue.h"

#include "marshal.h"

#include <boost/interprocess/ipc/message_queue.hpp>

#include <sstream>
#include <new>

using namespace boost::interprocess;


//- module classes -----------------------------------------------------------
// Create a SharedQueueProxy whose embedded SharedQueue is built from a name.
//
// The proxy is allocated through the type's tp_new slot; the SharedQueue
// member is then constructed in place with placement new.
//
// name   string handed to the SharedQueue constructor
//
// Returns the new proxy, or NULL if tp_new returned NULL.
AthenaMP::SharedQueueProxy* AthenaMP::SharedQueueProxy_New( const std::string& name )
{
   SharedQueueProxy* pysq = (SharedQueueProxy*)SharedQueueProxy_Type.tp_new( &SharedQueueProxy_Type, 0, 0 );
   if ( ! pysq )
      return 0;    // allocation failed: nothing to construct the queue into

   new( (void*)&pysq->m_sharedqueue ) SharedQueue( name );
   return pysq;
}

// Create a SharedQueueProxy whose embedded SharedQueue is a copy of sq.
//
// The proxy is allocated through the type's tp_new slot; the SharedQueue
// member is then copy-constructed in place with placement new.
//
// sq     queue passed to the SharedQueue copy constructor
//
// Returns the new proxy, or NULL if tp_new returned NULL.
AthenaMP::SharedQueueProxy* AthenaMP::SharedQueueProxy_New( const AthenaMP::SharedQueue& sq )
{
   SharedQueueProxy* pysq = (SharedQueueProxy*)SharedQueueProxy_Type.tp_new( &SharedQueueProxy_Type, 0, 0 );
   if ( ! pysq )
      return 0;    // allocation failed: nothing to construct the queue into

   new( (void*)&pysq->m_sharedqueue ) SharedQueue( sq );
   return pysq;
}


using namespace AthenaMP;

//__________________
// tp_new slot: allocate a GC-managed SharedQueueProxy and register it with the
// garbage collector. All three arguments are ignored. The embedded
// m_sharedqueue is NOT constructed here; that is left to the caller
// (sq_init or SharedQueueProxy_New).
//
// Returns the new object, or NULL if the allocation failed.
static SharedQueueProxy* sq_new( PyTypeObject*, PyObject*, PyObject* )
{
   SharedQueueProxy* pysq = PyObject_GC_New( SharedQueueProxy, &SharedQueueProxy_Type );
   if ( ! pysq )
      return NULL;    // never hand a null object to PyObject_GC_Track

   PyObject_GC_Track( pysq );
   return pysq;
}

//__________________
// tp_init slot: construct the embedded SharedQueue under a generated name.
//
// Accepts one optional integer argument, "maxsize" (positional or keyword),
// defaulting to SHAREDQUEUE_MAX_MSG. The queue name has the form
// "athenamp_queue_<pid>_<n>", where <n> is a per-process counter that is
// advanced on every call so that successive queues get distinct names.
//
// Returns 0 on success, -1 (with the Python error set by the argument parser)
// if the arguments could not be parsed.
static int sq_init( SharedQueueProxy* self, PyObject* args, PyObject* kwds )
{
   static int count = 0;
   int maxsize = SHAREDQUEUE_MAX_MSG;
   static const char* kwlist[] = { "maxsize", NULL };
   if ( ! PyArg_ParseTupleAndKeywords( args, kwds, (char*)"|i:SharedQueue", (char**)kwlist, &maxsize ) ) {
      return -1;
   }

   // No std::ends here: on an ostringstream it would append a literal '\0'
   // character to the name rather than terminate it.
   std::ostringstream s;
   s << "athenamp_queue_" << getpid() << "_" << count++;

   // The object was allocated raw by sq_new, so build the member in place.
   new( (void*)&self->m_sharedqueue ) SharedQueue( s.str(), maxsize );
   return 0;
}

//__________________
// tp_traverse slot: visits nothing and always reports success (0).
static int sq_traverse( SharedQueueProxy* /*self*/, visitproc /*visit*/, void* /*arg*/ )
{
   const int nothing_to_visit = 0;
   return nothing_to_visit;
}

//__________________
// tp_clear slot: drops nothing and always reports success (0).
static int sq_clear( SharedQueueProxy* /*self*/ )
{
   const int nothing_to_clear = 0;
   return nothing_to_clear;
}

//__________________
// tp_dealloc slot: tear down a SharedQueueProxy.
//
// Order: remove the object from GC tracking, run the SharedQueue destructor
// explicitly, then release the object's memory.
static void sq_dealloc( SharedQueueProxy* pysq )
{
   // take the object out of the collector's view before dismantling it
   PyObject_GC_UnTrack( pysq );

   // the member is created with placement new elsewhere in this file, so its
   // destructor has to be invoked by hand
   SharedQueue& queue = pysq->m_sharedqueue;
   queue.~SharedQueue();

   PyObject_GC_Del( pysq );
}

//__________________
// Non-blocking get: take the bytes produced by SharedQueue::try_receive() and
// unmarshal them into a Python object.
//
// Returns the unmarshalled object, or NULL (with the error set by the marshal
// reader) if the bytes cannot be decoded.
// NOTE(review): what try_receive() yields for an empty queue is not visible
// here -- confirm against SharedQueue.
static PyObject* sq_get_nowait( SharedQueueProxy* pysq )
{
   const std::string& payload = pysq->m_sharedqueue.try_receive();
   return PyMarshal_ReadObjectFromString( (char*)payload.c_str(), payload.size() );
}

//__________________
// Blocking get (per the method table's description): take the bytes produced
// by SharedQueue::receive() and unmarshal them into a Python object.
//
// Returns the unmarshalled object, or NULL (with the error set by the marshal
// reader) if the bytes cannot be decoded.
static PyObject* sq_get( SharedQueueProxy* pysq )
{
   const std::string& payload = pysq->m_sharedqueue.receive();
   return PyMarshal_ReadObjectFromString( (char*)payload.c_str(), payload.size() );
}

//__________________
// Non-blocking put: marshal pyobj and pass the resulting bytes to
// SharedQueue::try_send().
//
// pysq    proxy whose m_sharedqueue receives the message
// pyobj   object to send; serialized with PyMarshal_WriteObjectToString
//
// Returns None on success. Returns NULL with a Python error set when
// marshalling fails (error as left by the marshal call) or when try_send()
// reports failure (OverflowError).
static PyObject* sq_put_nowait( SharedQueueProxy* pysq, PyObject* pyobj )
{
   PyObject* pybuf = PyMarshal_WriteObjectToString( pyobj, Py_MARSHAL_VERSION );
   if ( ! pybuf )
      return NULL;

   bool send_ok = pysq->m_sharedqueue.try_send(
      std::string( PyString_AS_STRING( pybuf ), PyString_GET_SIZE( pybuf ) ) );
   Py_DECREF( pybuf );

   if ( ! send_ok ) {
      // str(pyobj) can itself fail; never feed a NULL into the formatter or
      // into Py_DECREF while reporting the send failure.
      PyObject* rep = PyObject_Str( pyobj );
      const char* text = rep ? PyString_AsString( rep ) : NULL;
      if ( text ) {
         PyErr_Format( PyExc_OverflowError, "failed to send object %s", text );
      } else {
         // replaces whatever error the failed str() conversion left behind
         PyErr_SetString( PyExc_OverflowError, "failed to send object <unprintable object>" );
      }
      Py_XDECREF( rep );
      return NULL;
   }

   Py_INCREF( Py_None );
   return Py_None;
}

//____________________________________________________________________________
// Method table for SharedQueueProxy: put_nowait takes exactly one object
// (METH_O); get_nowait and get take no arguments (METH_NOARGS). The all-NULL
// entry terminates the table.
static PyMethodDef sq_methods[] = {
   {
      const_cast<char*>( "put_nowait" ),
      (PyCFunction)sq_put_nowait,
      METH_O,
      const_cast<char*>( "non-blocking put on queue" )
   },
   {
      const_cast<char*>( "get_nowait" ),
      (PyCFunction)sq_get_nowait,
      METH_NOARGS,
      const_cast<char*>( "non-blocking get on queue" )
   },
   {
      const_cast<char*>( "get" ),
      (PyCFunction)sq_get,
      METH_NOARGS,
      const_cast<char*>( "blocking get on queue" )
   },
   { (char*)NULL, NULL, 0, NULL }    // sentinel
};

//__________________
// Attribute (get/set) table for SharedQueueProxy: it holds only the
// terminating all-NULL entry, i.e. no computed attributes are exposed.
static PyGetSetDef sq_getset[] = {
   {
      (char*)NULL,    // name
      NULL,           // get
      NULL,           // set
      NULL,           // doc
      NULL            // closure
   }
};

//__________________
// Python type object for _athenamp.SharedQueueProxy.
//
// The initializer is positional: every entry must stay in the slot order of
// PyTypeObject, which is why unused slots are spelled out as 0. The slots that
// are actually filled in are the name, instance size, deallocator, flags, doc
// string, GC traverse/clear hooks, the method and get/set tables, tp_init and
// tp_new. The two trailing slots are only emitted for Python versions that
// have them (see the PY_VERSION_HEX guards).
PyTypeObject AthenaMP::SharedQueueProxy_Type = {
   PyVarObject_HEAD_INIT( &PyType_Type, 0 )
   (char*)"_athenamp.SharedQueueProxy", // tp_name
   sizeof(SharedQueueProxy),  // tp_basicsize
   0,                         // tp_itemsize
   (destructor)sq_dealloc,    // tp_dealloc
   0,                         // tp_print
   0,                         // tp_getattr
   0,                         // tp_setattr
   0,                         // tp_compare
   0,                         // tp_repr
   0,                         // tp_as_number
   0,                         // tp_as_sequence
   0,                         // tp_as_mapping
   0,                         // tp_hash
   0,                         // tp_call
   0,                         // tp_str
   0,                         // tp_getattro
   0,                         // tp_setattro
   0,                         // tp_as_buffer
   Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC,      // tp_flags (GC-enabled: instances come from PyObject_GC_New in sq_new)
   (char*)"AthenaMP shared queue proxy",         // tp_doc
   (traverseproc)sq_traverse, // tp_traverse
   (inquiry)sq_clear,         // tp_clear
   0,                         // tp_richcompare
   0,                         // tp_weaklistoffset
   0,                         // tp_iter
   0,                         // tp_iternext
   sq_methods,                // tp_methods
   0,                         // tp_members
   sq_getset,                 // tp_getset
   0,                         // tp_base
   0,                         // tp_dict
   0,                         // tp_descr_get
   0,                         // tp_descr_set
   0,                         // tp_dictoffset
   (initproc)sq_init,         // tp_init
   0,                         // tp_alloc
   (newfunc)sq_new,           // tp_new
   0,                         // tp_free
   0,                         // tp_is_gc
   0,                         // tp_bases
   0,                         // tp_mro
   0,                         // tp_cache
   0,                         // tp_subclasses
   0                          // tp_weaklist
#if PY_VERSION_HEX >= 0x02030000
   , 0                        // tp_del
#endif
#if PY_VERSION_HEX >= 0x02060000
   , 0                        // tp_version_tag
#endif
};